[fix][meta] Metadata cache refresh might not take effect #25246

Open
BewareMyPower wants to merge 2 commits into apache:master from BewareMyPower:bewaremypower/fix-metadata-cache-not-updated
@BewareMyPower BewareMyPower commented Feb 13, 2026

Motivation

#25187 introduced a regression in MetadataCacheImpl#refresh. When a path is created or modified, refresh updates objCache with the future returned by readValueFromStore(). However, there is a race:

  1. [current thread] Acquire the lock on the path's node in the ConcurrentMap of objCache, then call readValueFromStore, which calls store.get.
  2. [metadata store worker thread] In the callback of store.get, objCache.getIfPresent calls ConcurrentMap#get to fetch the cached future and checks whether it is the same as the future being inserted into the map.
  3. [current thread] Insert the future returned by readValueFromStore into the ConcurrentMap of objCache and release the lock.

The future inserted in step 3 is not guaranteed to be visible in step 2 because ConcurrentMap#get is lock-free: it does not wait for the lock on the path to be released after step 3, so step 2 can still observe the previously cached future.

        final var cachedFuture = objCache.getIfPresent(path);
        if (cachedFuture != null && cachedFuture != future) {
            if (log.isDebugEnabled()) {
                log.debug("A new read on key {} is in progress or completed, ignore this one", path);
            }
            return cachedFuture;
        }

The code above is only correct if cachedFuture is the future returned by readValueFromStore. Because of the race, it can instead be the previously cached future, in which case the fresh read is discarded and the stale value is returned.
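The visibility issue in steps 1 to 3 can be reproduced in isolation. The following is a minimal sketch, not the Pulsar code: a plain ConcurrentHashMap stands in for the ConcurrentMap view of objCache, and the class name RefreshRaceSketch, the key "/path" and the string values are hypothetical. A worker thread calls get while the current thread is still inside compute for the same key, which mirrors the store.get callback running before the new future has been inserted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the race; not part of MetadataCacheImpl.
final class RefreshRaceSketch {

    /** Returns what a second thread reads while compute() is still running for the same key. */
    static String readDuringCompute() {
        ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();
        cache.put("/path", "old");

        String[] observed = new String[1];
        cache.compute("/path", (key, existing) -> {
            // The current thread holds the lock for this key here (step 1).
            // A worker thread reads the same key (step 2). get() takes no lock,
            // so it returns immediately instead of waiting for compute() to finish.
            observed[0] = CompletableFuture.supplyAsync(() -> cache.get(key)).join();
            // Only after this lambda returns is the new value published (step 3).
            return "new";
        });
        return observed[0];
    }

    public static void main(String[] args) {
        System.out.println("worker observed: " + readDuringCompute());
    }
}
```

In this sketch the worker reads the previous value, which corresponds to the callback comparing against the old cached future rather than the one being inserted.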

The cachedFuture check was originally added because testCloneInReadModifyUpdateOrCreate failed. I thought the failure was caused by duplicated refresh calls, but it was not. The actual root cause was that, at the time it failed, the create method implementation was wrong:

        return serialize(path, value).thenAcceptAsync(content -> store.put(path, content, Optional.of(-1L)))
                .thenApply(stat -> objCache.get(path))
                /* ... */
                .thenApply(__ -> null);

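It should be thenComposeAsync rather than thenAcceptAsync. thenAcceptAsync discards the future returned by store.put, so the next stage can run before the put completes and objCache.get might not see the updated value. This bug was accidentally fixed in my later commits.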
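The difference between the two operators can be shown with a small standalone sketch. Everything here is hypothetical and only illustrates CompletableFuture semantics: FakeStore, its put and finishPut methods, and the class ComposeVsAcceptSketch are not part of Pulsar. The fake put returns a future that stays pending until finishPut is called, so the ordering is deterministic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; FakeStore is not the Pulsar MetadataStore API.
final class ComposeVsAcceptSketch {

    static final class FakeStore {
        final AtomicBoolean stored = new AtomicBoolean();
        final CompletableFuture<Void> pending = new CompletableFuture<>();

        // Simulates an asynchronous write that has not completed yet.
        CompletableFuture<Void> put(String content) {
            return pending;
        }

        // Simulates the write reaching the store.
        void finishPut() {
            stored.set(true);
            pending.complete(null);
        }
    }

    /** thenAcceptAsync drops the future from put, so the next stage does not wait for it. */
    static boolean acceptVariantSeesStoredValue() {
        FakeStore store = new FakeStore();
        CompletableFuture<Boolean> seen = CompletableFuture.completedFuture("content")
                .thenAcceptAsync(content -> store.put(content))
                .thenApply(__ -> store.stored.get());
        boolean result = seen.join(); // completes although the put is still pending
        store.finishPut();
        return result;
    }

    /** thenComposeAsync chains on the future from put, so the next stage waits for it. */
    static boolean composeVariantSeesStoredValue() {
        FakeStore store = new FakeStore();
        CompletableFuture<Boolean> seen = CompletableFuture.completedFuture("content")
                .thenComposeAsync(content -> store.put(content))
                .thenApply(__ -> store.stored.get());
        store.finishPut(); // the chain cannot complete before this call
        return seen.join();
    }

    public static void main(String[] args) {
        System.out.println("accept variant sees stored value: " + acceptVariantSeesStoredValue());
        System.out.println("compose variant sees stored value: " + composeVariantSeesStoredValue());
    }
}
```

In the accept variant the follow-up stage reads the flag while the write is still pending, which is the same shape as objCache.get running before store.put has finished.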

Modifications

  • Revert the cachedFuture-related code. We might need a careful design to reduce unnecessary deserializations when refresh is called multiple times for a single update (e.g. from the callback of MetadataCacheImpl#put and from the notification method AbstractMetadataStore#accept).
  • Add testRefreshRace to verify that this race no longer occurs.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@BewareMyPower BewareMyPower self-assigned this Feb 13, 2026
@github-actions github-actions bot added the doc-not-needed label Feb 13, 2026
@BewareMyPower BewareMyPower added the release/4.0.9, release/4.1.3, type/bug and area/metadata labels and removed the doc-not-needed label Feb 13, 2026
@github-actions github-actions bot added the doc-not-needed label Feb 13, 2026

@lhotari lhotari left a comment

LGTM

lhotari commented Feb 13, 2026

/pulsarbot rerun-failure-checks

@BewareMyPower

@lhotari Do you know why ResourceQuotaCalculatorImplTest now runs in Broker Group 2? I checked the previous workflows and found that it should run in the BROKER_FLAKY workflow. https://github.com/apache/pulsar/actions/runs/21906841628/job/63249299032

Anyway, I pushed a PR to fix this broken test (not flaky): #25247

assertTrue(cache.get(key).get().isEmpty());

store.put(key, "\"value\"".getBytes(StandardCharsets.UTF_8), Optional.empty()).get();
Awaitility.await().pollInterval(Duration.ofMillis(1)).atMost(Duration.ofSeconds(3)).untilAsserted(() ->

Why does the test need to wait here?

