Search before asking
Description
Task Overview:
The existing RemoteStorageCleaner always derives the remote storage path from the global remote.data.dir config. With per-table/partition remoteDataDir now tracked in metadata, the cleaner must use the directory that was assigned to each specific table or partition at creation time — otherwise deletion will target the wrong path when multiple remote directories are configured.
Additionally, the previous implementation had a correctness bug: asyncDeleteRemoteDirectory(TablePartition) fetched TableInfo from CoordinatorContext to resolve the table path. Since completeDeleteTable() calls coordinatorContext.removeTable() before all partitions finish their deletion, getTableInfoById() could return null, causing partition remote directory cleanup to be silently skipped. This commit fixes both the bug and the multi-dir limitation in a unified refactor.
Purpose:
- Fix the bug where partition remote directories are leaked when TableInfo is removed from the context before all partitions complete deletion.
- Make RemoteStorageCleaner use the per-table/partition remoteDataDir so it correctly targets the assigned remote directory in a multi-directory deployment.
- Store PartitionInfo (including remoteDataDir) in CoordinatorContext so it is available independently of TableInfo.
Scope:
1. RemoteStorageCleaner — accept explicit remoteDataDir
RemoteStorageCleaner (moved to remote/ package): Remove dependency on Configuration. asyncDeleteTableRemoteDir() and asyncDeletePartitionRemoteDir() now accept String remoteDataDir as an explicit parameter. Each method computes FlussPaths.remoteKvDir(remoteDataDir) / FlussPaths.remoteLogDir(remoteDataDir) on demand.
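A minimal sketch of the cleaner shape described above, assuming string paths and a pluggable delete callback in place of a real file system. `RemotePathsSketch` is a hypothetical stand-in for `FlussPaths.remoteKvDir`/`FlussPaths.remoteLogDir` (the `/kv` and `/log` suffixes and the sub-path argument are assumptions), and `RemoteStorageCleanerSketch` is illustrative, not the actual class.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical stand-ins for FlussPaths.remoteKvDir/remoteLogDir; the real
// helpers may use different sub-directory names and return FsPath.
final class RemotePathsSketch {
    static String remoteKvDir(String remoteDataDir) {
        return remoteDataDir + "/kv";
    }

    static String remoteLogDir(String remoteDataDir) {
        return remoteDataDir + "/log";
    }
}

final class RemoteStorageCleanerSketch {
    private final Executor ioExecutor;
    // Assumption: wraps a recursive file-system delete in the real code.
    private final Consumer<String> deleteRecursively;

    RemoteStorageCleanerSketch(Executor ioExecutor, Consumer<String> deleteRecursively) {
        this.ioExecutor = ioExecutor;
        this.deleteRecursively = deleteRecursively;
    }

    /** No Configuration field: the caller passes the table's own remoteDataDir. */
    CompletableFuture<Void> asyncDeleteTableRemoteDir(String remoteDataDir, String tableSubPath) {
        return deleteUnder(remoteDataDir, tableSubPath);
    }

    /** Same for a partition, using the partition's own remoteDataDir. */
    CompletableFuture<Void> asyncDeletePartitionRemoteDir(String remoteDataDir, String partitionSubPath) {
        return deleteUnder(remoteDataDir, partitionSubPath);
    }

    private CompletableFuture<Void> deleteUnder(String remoteDataDir, String subPath) {
        return CompletableFuture.runAsync(() -> {
            // kv and log roots are computed on demand from the directory passed in.
            deleteRecursively.accept(RemotePathsSketch.remoteKvDir(remoteDataDir) + "/" + subPath);
            deleteRecursively.accept(RemotePathsSketch.remoteLogDir(remoteDataDir) + "/" + subPath);
        }, ioExecutor);
    }
}
```

Because the directory arrives as an argument, two tables assigned to different remote directories are cleaned under their own roots without consulting any global config.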
2. CoordinatorContext — store PartitionInfo independently
CoordinatorContext: Add partitionInfoById map (Map<Long, PartitionInfo>).
- Update putPartition() to also store PartitionInfo (which carries remoteDataDir).
- Add a getPartitionInfoById(long partitionId) accessor.
- Add a hasPartitionsToDelete(long tableId) helper.
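The CoordinatorContext additions could look roughly like the sketch below. `PartitionInfoSketch`, `TablePartitionSketch`, and `queuePartitionDeletion` are hypothetical simplifications; the real PartitionInfo holds a resolved partition spec rather than a plain name, and the real context tracks far more state.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical shapes of the real PartitionInfo / TablePartition.
record PartitionInfoSketch(long partitionId, String partitionName, String remoteDataDir) {}

record TablePartitionSketch(long tableId, long partitionId) {}

final class CoordinatorContextSketch {
    private final Map<Long, String> partitionNameById = new HashMap<>();
    // New: full PartitionInfo, so remoteDataDir survives even if TableInfo is gone.
    private final Map<Long, PartitionInfoSketch> partitionInfoById = new HashMap<>();
    private final Set<TablePartitionSketch> partitionsToBeDeleted = new HashSet<>();

    /** Keeps the existing name mapping and additionally stores the PartitionInfo. */
    void putPartition(PartitionInfoSketch info) {
        partitionNameById.put(info.partitionId(), info.partitionName());
        partitionInfoById.put(info.partitionId(), info);
    }

    PartitionInfoSketch getPartitionInfoById(long partitionId) {
        return partitionInfoById.get(partitionId);
    }

    // Hypothetical helper standing in for however partitions get queued for deletion.
    void queuePartitionDeletion(TablePartitionSketch tp) {
        partitionsToBeDeleted.add(tp);
    }

    /** True while any partition of the table is still awaiting deletion. */
    boolean hasPartitionsToDelete(long tableId) {
        return partitionsToBeDeleted.stream().anyMatch(tp -> tp.tableId() == tableId);
    }

    /** Called once a partition has been fully deleted. */
    void removePartition(TablePartitionSketch tp) {
        partitionsToBeDeleted.remove(tp);
        partitionNameById.remove(tp.partitionId());
        partitionInfoById.remove(tp.partitionId());
    }
}
```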
3. ZooKeeperClient — expose full PartitionRegistration
ZooKeeperClient: Replace getPartitionNameAndIdsForTables() (returning Map<String, Long>) with getPartitionNameAndRegistrationsForTables() (returning Map<String, PartitionRegistration>), so callers can access remoteDataDir from the registration.
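To illustrate why returning the registration is a superset of the old name-to-ID map, the sketch below derives both the old view and the newly reachable remoteDataDir from one result. `PartitionRegistrationSketch` is a hypothetical, trimmed-down record, and the helper methods are illustrative only; the real class and the exact ZooKeeperClient signature may differ.

```java
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical, trimmed-down registration; the real PartitionRegistration has more fields.
record PartitionRegistrationSketch(long tableId, long partitionId, String remoteDataDir) {}

final class PartitionRegistrationsSketch {
    /** What the old Map<String, Long> return value carried: name -> id only. */
    static Map<String, Long> namesToIds(Map<String, PartitionRegistrationSketch> regs) {
        return regs.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().partitionId()));
    }

    /** Information callers can now reach as well: name -> assigned remote directory. */
    static Map<String, String> namesToRemoteDirs(Map<String, PartitionRegistrationSketch> regs) {
        return regs.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().remoteDataDir()));
    }
}
```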
4. CoordinatorEventProcessor — build PartitionInfo with remoteDataDir
- When loading existing partitions at startup: read PartitionRegistration from ZK, construct PartitionInfo(partitionId, resolvedPartitionSpec, remoteDataDir), and pass it to onCreateNewPartition().
- When processing CreatePartitionEvent: build PartitionInfo from createPartitionEvent.getRemoteDataDir().
- After processDropTable() for a partitioned table: call tableManager.resumeTableDeletions() to handle the case where all partitions are already deleted before the drop event is processed.
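Both partition entry points in CoordinatorEventProcessor end up producing the same PartitionInfo. A minimal sketch, assuming a partition name string instead of `ResolvedPartitionSpec` and a plain callback standing in for `TableManager.onCreateNewPartition()`; all `*Sketch` types and method names are hypothetical.

```java
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified shapes; the real classes carry more fields.
record PartitionRegistrationSketch(long partitionId, String remoteDataDir) {}

record PartitionInfoSketch(long partitionId, String partitionName, String remoteDataDir) {}

record CreatePartitionEventSketch(long partitionId, String partitionName, String remoteDataDir) {}

final class PartitionLoaderSketch {
    // Stands in for tableManager::onCreateNewPartition.
    private final Consumer<PartitionInfoSketch> onCreateNewPartition;

    PartitionLoaderSketch(Consumer<PartitionInfoSketch> onCreateNewPartition) {
        this.onCreateNewPartition = onCreateNewPartition;
    }

    /** Startup: one PartitionInfo per registration read from ZooKeeper. */
    void loadExistingPartitions(Map<String, PartitionRegistrationSketch> registrations) {
        registrations.forEach((name, reg) -> onCreateNewPartition.accept(
                new PartitionInfoSketch(reg.partitionId(), name, reg.remoteDataDir())));
    }

    /** Runtime: the event already carries the assigned remoteDataDir. */
    void processCreatePartition(CreatePartitionEventSketch event) {
        onCreateNewPartition.accept(new PartitionInfoSketch(
                event.partitionId(), event.partitionName(), event.remoteDataDir()));
    }
}
```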
5. TableManager — pass remoteDataDir to cleaner
onCreateNewPartition(): accept PartitionInfo instead of (partitionId, partitionName) pair; store it in context via putPartition().
resumeTableDeletions(): made public; add guard — skip completeDeleteTable() if coordinatorContext.hasPartitionsToDelete(tableId) is true.
resumePartitionDeletions(): after completing a partition, set hasPartitionCompleted = true; call resumeTableDeletions() at the end.
asyncDeleteRemoteDirectory(long tableId): pass tableInfo.getRemoteDataDir() to RemoteStorageCleaner.
asyncDeleteRemoteDirectory(TablePartition): no longer depends on TableInfo — retrieves PartitionInfo directly from coordinatorContext.getPartitionInfoById() and passes partitionInfo.getRemoteDataDir() to the cleaner.
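The deletion ordering above can be modeled as a small state machine. This is a simplified sketch, not the real TableManager: state lives in plain collections instead of CoordinatorContext, the cleaner is replaced by a list that records which remote directory each call targets, and partition deletion completes immediately instead of waiting for replicas. Gating the final resumeTableDeletions() call on `hasPartitionCompleted` is an assumption about how the flag is used.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical stand-ins for the real Fluss types.
record TableInfoSketch(long tableId, String remoteDataDir) {}
record PartitionInfoSketch(long partitionId, String remoteDataDir) {}
record TablePartitionSketch(long tableId, long partitionId) {}

final class TableManagerSketch {
    final Map<Long, TableInfoSketch> tableInfoById = new HashMap<>();
    final Map<Long, PartitionInfoSketch> partitionInfoById = new HashMap<>();
    final Set<Long> tablesToBeDeleted = new LinkedHashSet<>();
    final Set<TablePartitionSketch> partitionsToBeDeleted = new LinkedHashSet<>();
    // Records "remoteDataDir -> target" instead of calling a real cleaner.
    final List<String> cleanerCalls = new ArrayList<>();

    boolean hasPartitionsToDelete(long tableId) {
        return partitionsToBeDeleted.stream().anyMatch(tp -> tp.tableId() == tableId);
    }

    public void resumeTableDeletions() {
        for (Long tableId : new ArrayList<>(tablesToBeDeleted)) {
            // Guard: keep TableInfo until every partition has been cleaned up.
            if (hasPartitionsToDelete(tableId)) {
                continue;
            }
            completeDeleteTable(tableId);
        }
    }

    void resumePartitionDeletions() {
        boolean hasPartitionCompleted = false;
        for (TablePartitionSketch tp : new ArrayList<>(partitionsToBeDeleted)) {
            asyncDeleteRemoteDirectory(tp);
            partitionsToBeDeleted.remove(tp);
            partitionInfoById.remove(tp.partitionId());
            hasPartitionCompleted = true;
        }
        // Assumption: the flag gates the follow-up check for tables now free to delete.
        if (hasPartitionCompleted) {
            resumeTableDeletions();
        }
    }

    private void completeDeleteTable(long tableId) {
        asyncDeleteRemoteDirectory(tableId);
        tablesToBeDeleted.remove(tableId);
        tableInfoById.remove(tableId); // models coordinatorContext.removeTable()
    }

    private void asyncDeleteRemoteDirectory(long tableId) {
        TableInfoSketch tableInfo = tableInfoById.get(tableId);
        if (tableInfo != null) {
            cleanerCalls.add(tableInfo.remoteDataDir() + " -> table " + tableId);
        }
    }

    private void asyncDeleteRemoteDirectory(TablePartitionSketch tp) {
        // Reads PartitionInfo directly; no TableInfo lookup that could return null.
        PartitionInfoSketch info = partitionInfoById.get(tp.partitionId());
        if (info != null) {
            cleanerCalls.add(info.remoteDataDir() + " -> partition " + tp.partitionId());
        }
    }
}
```

Calling resumeTableDeletions() while partitions are still pending is a no-op, so the table's TableInfo stays in place; once the partitions finish, each one is cleaned under its own directory and only then is the table itself removed.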
6. CreatePartitionEvent / TableChangeWatcher
CreatePartitionEvent: Add remoteDataDir field.
TableChangeWatcher: Pass remoteDataDir when constructing CreatePartitionEvent.
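The event change is small. A sketch, assuming the watcher callback already has the deserialized registration in hand; `onPartitionCreated`, the event's other fields, and the `emitted` list (standing in for the coordinator's event queue) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical shapes; the real event likely carries more fields (e.g. the table path).
record PartitionRegistrationSketch(long tableId, long partitionId, String remoteDataDir) {}

record CreatePartitionEventSketch(
        long tableId, long partitionId, String partitionName, String remoteDataDir) {}

final class TableChangeWatcherSketch {
    // Stands in for the coordinator event manager the real watcher posts to.
    final List<CreatePartitionEventSketch> emitted = new ArrayList<>();

    /** Called when a partition node appears; forwards the registered remoteDataDir. */
    void onPartitionCreated(String partitionName, PartitionRegistrationSketch reg) {
        emitted.add(new CreatePartitionEventSketch(
                reg.tableId(), reg.partitionId(), partitionName, reg.remoteDataDir()));
    }
}
```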
Willingness to contribute