[CELEBORN-2257] Add reporting of remote disks during registration. #3597
Dzeri96 wants to merge 2 commits into apache:main from
Conversation
Pull request overview
Fixes worker registration disk reporting so the master can see remote storage (HDFS/S3/OSS) immediately (before the first heartbeat), and refactors disk snapshot APIs / slot-allocation logic to distinguish local vs remote disks more clearly.
Changes:
- Renamed disk snapshot / healthy-dir helpers to explicitly mean “local” and added an “all disks” snapshot.
- Updated worker registration/heartbeat disk reporting to incorporate remote disks.
- Simplified master slot-allocation filtering by embedding disk-type metadata into StorageInfo.Type and using it in allocation logic.
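To make the last point concrete, here is a minimal sketch of the idea: each enum constant carries a DFS flag, and allocation code filters on that flag instead of special-casing storage types elsewhere. All names below are illustrative stand-ins, not the actual Celeborn API.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical storage-type enum with per-constant disk-type metadata.
enum StorageType {
  HDD(false), SSD(false), HDFS(true), S3(true), OSS(true);

  private final boolean dfs;

  StorageType(boolean dfs) { this.dfs = dfs; }

  boolean isDFS() { return dfs; }
}

public class AllocationFilterSketch {
  // Slot allocation for local-only placement simply filters on the metadata.
  static List<StorageType> localOnly(List<StorageType> disks) {
    return disks.stream().filter(t -> !t.isDFS()).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<StorageType> all = List.of(StorageType.SSD, StorageType.HDFS, StorageType.S3);
    System.out.println(localOnly(all)); // prints "[SSD]"
  }
}
```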
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala | Updates mocks to the renamed localDisksSnapshot() API. |
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala | Introduces localDisksSnapshot() / allDisksSnapshot() and renames “healthy working dirs” to local-only. |
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala | Switches registration to report all disks; refactors heartbeat disk update flow. |
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala | Uses local-only healthy working dirs check for slot reservation. |
| tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala | Updates test to use localDisksSnapshot(). |
| master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java | Simplifies disk filtering using StorageInfo.Type metadata; refactors usable-slot bookkeeping. |
| common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala | Refactors slot recomputation / propagation logic and uses isDFS. |
| common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java | Adds isDFS + mask metadata into StorageInfo.Type and introduces isAvailable(...). |
Codecov Report
❌ Patch coverage is

Additional details and impacted files:

```
@@ Coverage Diff @@
##             main    #3597      +/-   ##
==========================================
- Coverage   67.13%   67.07%   -0.06%
==========================================
  Files         357      357
  Lines       21860    21935      +75
  Branches     1943     1947       +4
==========================================
+ Hits        14674    14711      +37
- Misses       6166     6213      +47
+ Partials     1020     1011       -9
```

☔ View full report in Codecov by Sentry.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
```scala
private val diskInfos = storageManager
  .allDisksSnapshot()
  .map { diskInfo => diskInfo.mountPoint -> diskInfo }
  .toMap.asJava
```
This PR changes worker registration/heartbeat disk reporting to include remote disks (allDisksSnapshot) and introduces new slot-availability semantics (StorageInfo.isAvailable, Type.isDFS). There doesn’t appear to be a test asserting that remote disk infos are (a) included in the initial registration payload and (b) preserved across subsequent heartbeats so the master can allocate slots from them before/without the first heartbeat. Adding a focused unit/integration test around worker->master disk info propagation would help prevent regressions here.
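The shape of the missing test could look like the following self-contained sketch: the registration payload built from the "all disks" snapshot must contain the remote (DFS) entries, not only the local ones. DiskInfo and registrationDisks here are simplified stand-ins, not the real Celeborn classes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RegistrationPayloadSketch {
  // Simplified stand-in for Celeborn's DiskInfo.
  record DiskInfo(String mountPoint, boolean isDFS) {}

  // Builds the mountPoint -> disk map a worker would send at registration.
  static Map<String, DiskInfo> registrationDisks(List<DiskInfo> allDisks) {
    Map<String, DiskInfo> m = new HashMap<>();
    for (DiskInfo d : allDisks) {
      m.put(d.mountPoint(), d); // include local AND remote disks
    }
    return m;
  }

  public static void main(String[] args) {
    List<DiskInfo> disks = List.of(
        new DiskInfo("/mnt/disk1", false),
        new DiskInfo("hdfs://nn/celeborn", true));
    Map<String, DiskInfo> payload = registrationDisks(disks);
    // The remote disk must be visible to the master from the very first message.
    System.out.println(payload.containsKey("hdfs://nn/celeborn")); // prints "true"
  }
}
```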
This is what I wrote in the original PR. I need someone from the existing community to guide me on writing an integration test.
SteNicholas left a comment:
@Dzeri96, thanks for the contribution. Could you explain which fix is mainly provided in this pull request?
```diff
-  Type(int value) {
+  Type(int value, boolean isDFS, int mask) {
     this.value = value;
+    this.isDFS = isDFS;
```
IMO, it's unnecessary to add the isDFS field. The isDFS method is enough for usage.
So my initial idea for implementing this was a HashMap. I had built a map that was filled in the static block, like the other maps, but then I realized you had to make sure each enum member was in this map, so I wrote a test to enforce it.
In the end I found this static solution much more elegant: the compiler forces you to assign each enum member an isDFS value. It's also less code. Let me know if you want me to change it, though.
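The trade-off being argued can be shown in a few lines (with simplified, illustrative names): a static map compiles fine even when a constant was forgotten, so only a runtime test catches the gap, whereas an extra constructor parameter makes the compiler enforce exhaustiveness.

```java
import java.util.EnumMap;
import java.util.Map;

// Map-based variant: nothing forces the map to stay in sync with the constants.
enum DiskType {
  LOCAL_HDD, LOCAL_SSD, REMOTE_HDFS;

  static final Map<DiskType, Boolean> IS_DFS = new EnumMap<>(DiskType.class);
  static {
    IS_DFS.put(LOCAL_HDD, false);
    IS_DFS.put(LOCAL_SSD, false);
    // Forgetting REMOTE_HDFS here still compiles; only a test would catch it.
  }
}

public class EnumMetadataSketch {
  public static void main(String[] args) {
    // The gap is only visible at runtime:
    System.out.println(DiskType.IS_DFS.containsKey(DiskType.REMOTE_HDFS)); // prints "false"
  }
}
```

With a `DiskType(boolean isDFS)` constructor instead, leaving out the argument for any constant is a compile error, which is the exhaustiveness guarantee described above.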
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 1 comment.
```scala
if (estimatedPartitionSize.nonEmpty && !newDisk.storageType.isDFS) {
  newDisk.maxSlots = newDisk.totalSpace / estimatedPartitionSize.get
  newDisk.availableSlots = newDisk.actualUsableSpace / estimatedPartitionSize.get
}
```
This change introduces new slot-update semantics for DFS vs local disks inside updateThenGetDiskInfos, but there are no unit tests covering that remote/DFS disk slot fields are preserved across successive updates (e.g., registration updateDiskSlots(...) followed by heartbeat updateThenGetDiskInfos(...)). Adding a focused test in WorkerInfoSuite for a DFS DiskInfo would help prevent regressions like remote disks becoming unavailable after the first heartbeat.
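The semantics under discussion can be sketched as follows (names are illustrative, not the real WorkerInfo API): on a heartbeat-style update, local disks get their slot counts recomputed from space, while DFS disks keep the slot values set at registration, which is exactly what a regression test would pin down.

```java
public class SlotUpdateSketch {
  static class Disk {
    boolean isDFS;
    long totalSpace, usableSpace;
    long maxSlots, availableSlots;
  }

  // Recompute slots from space for local disks only; DFS disks are left untouched.
  static void updateSlots(Disk d, long estimatedPartitionSize) {
    if (!d.isDFS) {
      d.maxSlots = d.totalSpace / estimatedPartitionSize;
      d.availableSlots = d.usableSpace / estimatedPartitionSize;
    }
  }

  public static void main(String[] args) {
    Disk dfs = new Disk();
    dfs.isDFS = true;
    dfs.maxSlots = 1000;        // set once at registration
    dfs.availableSlots = 1000;
    updateSlots(dfs, 64);       // heartbeat-style update
    System.out.println(dfs.availableSlots); // prints "1000": preserved, not zeroed
  }
}
```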
@SteNicholas So my changes are explained pretty well in the PR description, I think. While @eolivelli was running his tests, he noticed that the current faulty behaviour presents a problem when auto-scaling spawns new nodes. At that moment, the system is under pressure, and yet the newly spawned nodes don't report remote disks, leading to performance degradation and the need to spawn more nodes. In hindsight, I should have limited the PR to just this fix. It's just that while I was trying to understand the code, I made the other changes to make it more readable for myself. In the end I decided to include them too, since we will be working on this part of the project a lot in the future. Also, don't forget to help me with writing a test!
For the PR, overall LGTM. Additionally, you can use
I'll try to write some tests this week and will ping you when I'm done @RexXiong
```java
          diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots()));
    }
    for (DiskInfo diskInfo : worker.diskInfos().values()) {
      if (diskInfo.status().equals(DiskStatus.HEALTHY)
```
```diff
-      if (diskInfo.status().equals(DiskStatus.HEALTHY)
+      if (DiskStatus.HEALTHY.equals(diskInfo.status())
```
This suggestion avoids a NullPointerException in the case where diskInfo.status() returns null.
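The null-safety of the suggested operand order is easy to demonstrate in isolation (DiskStatus below is a local stand-in enum): calling equals on the constant tolerates a null status and simply returns false, while the original order would dereference null.

```java
public class NullSafeEqualsSketch {
  enum DiskStatus { HEALTHY, HIGH_DISK_USAGE }

  public static void main(String[] args) {
    DiskStatus status = null;
    // Constant-first order is safe: equals(null) returns false per the
    // Object.equals contract.
    System.out.println(DiskStatus.HEALTHY.equals(status)); // prints "false"
    // status.equals(DiskStatus.HEALTHY) would throw NullPointerException here.
  }
}
```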
SteNicholas left a comment:
@Dzeri96, thanks for the update. The changes overall LGTM. Please add an integration test for the changes.
What changes were proposed in this pull request?
Why are the changes needed?
Does this PR resolve a correctness bug?
Not a correctness bug
Does this PR introduce any user-facing change?
No
How was this patch tested?
Important: I want help from the community on how to write tests for this.