IGNITE-27871 Improve deployment lookup to reduce deploy() contention …#12760
oleg-vlsk wants to merge 3 commits into apache:master
…for locally available tasks with peerClassLoadingEnabled=true
| P2PClassLoadingFailureHandlingTest.class, | ||
| P2PClassLoadingIssuesTest.class | ||
| P2PClassLoadingIssuesTest.class, | ||
| GridDeploymentLocalStoreReuseTest.class |
Please add a comma at the end of the line (to reduce merge conflicts).
| CompletableFuture<T2<UUID, Set<UUID>>> fut = client.compute() | ||
| .withTimeout(timeout). | ||
| <T2<UUID, Set<UUID>>, T2<UUID, Set<UUID>>>executeAsync2(TestTask.class.getName(), null) | ||
| .toCompletableFuture(); | ||
|
|
||
| try { | ||
| fut.get(); |
client.compute().execute(TestTask.class.getName(), null);
Used this snippet, thank you.
| List<IgniteInternalFuture<Void>> futs = new ArrayList<>(CLIENT_CNT); | ||
|
|
||
| for (IgniteClient client : clients) | ||
| futs.add(runAsync(() -> executeTasksOnClient(client, EXEC_CNT, 5_000L))); | ||
|
|
||
| waitForAllFutures(futs.toArray(new IgniteInternalFuture[0])); |
runMultiThreaded(i -> executeTasksOnClient(clients.get(i), EXEC_CNT), CLIENT_CNT, "worker");
I decided not to go for multi-threaded execution, as the purpose of the test is to verify certain behaviour during subsequent executions of the same task. So I ended up using a simple for-loop.
| ClusterNode[] allServerNodes = grid(0).cluster().forServers().nodes().toArray(new ClusterNode[0]); | ||
|
|
||
| for (int i = 0; i < CLIENT_CNT; i++) | ||
| clients.add(startClient(allServerNodes)); |
You can connect to any server node; it's not necessary to provide all nodes, one is enough, i.e. clients.add(startClient(0));
| /** */ | ||
| private static class DeploymentListeningLogger extends ListeningTestLogger { | ||
| /** */ | ||
| private final ConcurrentLinkedQueue<String> depNotFound = new ConcurrentLinkedQueue<>(); | ||
|
|
||
| /** */ | ||
| public DeploymentListeningLogger(IgniteLogger log) { | ||
| super(log); | ||
| } | ||
|
|
||
| /** {@inheritDoc} */ | ||
| @Override public void debug(String msg) { | ||
| if (msg.contains("Deployment was not found for class with specific class loader")) | ||
| depNotFound.add(msg); | ||
|
|
||
| super.debug(msg); | ||
| } | ||
|
|
||
| /** {@inheritDoc} */ | ||
| @Override public ListeningTestLogger getLogger(Object ctgr) { | ||
| return this; | ||
| } | ||
|
|
||
| /** */ | ||
| public List<String> depNotFound() { | ||
| return depNotFound.stream().collect(Collectors.toUnmodifiableList()); | ||
| } | ||
| } | ||
| } |
This is incorrect usage of the listening logger. All you need is to register a listener like:
LogListener lsnr = LogListener.matches(notFoundMsg).times(CLIENT_CNT).build();
listeningTestLog.registerListener(lsnr);
listeningTestLog should be created on top of the standard logger, for example:
setLoggerDebugLevel();
listeningTestLog = new ListeningTestLogger(log);
It is then passed to the Ignite configuration. There is no need for a separate logger for each node.
| meta.alias(rsrcName); | ||
| meta.className(clsName); | ||
| meta.senderNodeId(ctx.localNodeId()); | ||
| meta.classLoader(ldr); |
As far as I understand, setting the class loader disables the deployment SPI. See https://github.com/apache/ignite/blob/master/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java#L174
Moved the local app class loader check to GridDeploymentLocalStore#deployment so that in the initial call the meta does not contain a class loader.
| private final ConcurrentMap<String, Deque<GridDeployment>> cache = new ConcurrentHashMap<>(); | ||
|
|
||
| /** Deployment cache by classloader. */ | ||
| private final ConcurrentMap<ClassLoader, Deque<GridDeployment>> cacheByLdr = new ConcurrentHashMap<>(); |
cacheByLdr is always used under the mux lock, so the ConcurrentMap overhead is not required here.
Also, it may be worth using IdentityHashMap in case someone redefines the class loader's equals() in a wrong way.
Done, thank you for the hint.
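The IdentityHashMap suggestion above can be illustrated with a minimal, self-contained sketch. It does not use any Ignite classes: SloppyKey is a hypothetical stand-in for a class loader whose equals() was overridden badly, and the string values stand in for deployments.

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

/** Sketch: why an identity-keyed map is safer when keys may override equals(). */
class IdentityMapSketch {
    /** Hypothetical key type standing in for a class loader with a badly written equals(). */
    static final class SloppyKey {
        /** Every instance claims to be equal to every other instance. */
        @Override public boolean equals(Object o) {
            return o instanceof SloppyKey;
        }

        /** Constant hash code, consistent with the equals() above. */
        @Override public int hashCode() {
            return 1;
        }
    }

    /** Puts two distinct key instances and returns the resulting number of entries. */
    static int entriesAfterTwoPuts(Map<SloppyKey, String> map) {
        map.put(new SloppyKey(), "dep-1");
        map.put(new SloppyKey(), "dep-2");

        return map.size();
    }

    public static void main(String[] args) {
        // HashMap relies on equals()/hashCode(), so the second put overwrites the first.
        System.out.println("HashMap entries: " + entriesAfterTwoPuts(new HashMap<>()));

        // IdentityHashMap compares keys with ==, so both instances keep their own entry.
        System.out.println("IdentityHashMap entries: " + entriesAfterTwoPuts(new IdentityHashMap<>()));
    }
}
```

Like HashMap, IdentityHashMap is not thread-safe, so it only fits when every access happens under the same lock, which is what the review comment states about mux.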
|
|
||
| dep = d; | ||
| for (GridDeployment d : depsByLdr) { | ||
| if (!d.undeployed() && d.classLoader() == ldr) { |
If it's undeployed, it has been cleaned from the cache, so how can we find it?
Why do we need to check the class loader if we only put items with exactly this class loader into the cache?
Yes, those were 'extra safety' checks. Changed the lookup logic altogether (see below).
| dep = candidate; | ||
| } | ||
| } | ||
| else { |
Do we still need this check? If a deployment is not found by class loader in the class loader cache, it can't be found in the aliases cache either. We keep both caches synchronized and modify them only under the lock.
Removed this else block.
| if (d.classLoader() == ldr) { | ||
| // Cache class and alias. | ||
| fireEvt = d.addDeployedClass(cls, alias); | ||
| Deque<GridDeployment> depsByLdr = cacheByLdr.get(ldr); |
It looks like there is a one-to-one relation between deployment and class loader. Did I miss something?
I'm talking about cacheByLdr, not cache. For cacheByLdr it looks like only one deployment is possible for one classloader.
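A minimal sketch of the structure this thread converges on, under the reviewer's assumption that one class loader maps to at most one deployment. It is not the GridDeploymentLocalStore code: Dep, byAlias, byLdr, and every method name below are hypothetical, and only the idea of two indexes modified under a single lock is taken from the discussion.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

/** Sketch: an alias index and a class loader index kept in sync under one lock. */
class TwoIndexStoreSketch {
    /** Hypothetical stand-in for a deployment; one instance per class loader. */
    static final class Dep {
        final ClassLoader ldr;

        Dep(ClassLoader ldr) {
            this.ldr = ldr;
        }
    }

    /** Single lock guarding both indexes, so plain (non-concurrent) maps are enough. */
    private final Object mux = new Object();

    /** Alias index: one alias may point to several deployments. */
    private final Map<String, Deque<Dep>> byAlias = new HashMap<>();

    /** Class loader index: assumed one-to-one, keyed by reference identity. */
    private final Map<ClassLoader, Dep> byLdr = new IdentityHashMap<>();

    /** Registers an alias for the loader, reusing the loader's deployment if it exists. */
    Dep deploy(String alias, ClassLoader ldr) {
        synchronized (mux) {
            Dep dep = byLdr.computeIfAbsent(ldr, Dep::new);

            Deque<Dep> deps = byAlias.computeIfAbsent(alias, k -> new ArrayDeque<>());

            if (!deps.contains(dep))
                deps.addFirst(dep);

            return dep;
        }
    }

    /** Removes the loader's deployment from both indexes in one critical section. */
    void undeploy(ClassLoader ldr) {
        synchronized (mux) {
            Dep dep = byLdr.remove(ldr);

            if (dep == null)
                return;

            byAlias.values().forEach(deps -> deps.remove(dep));
            byAlias.values().removeIf(Deque::isEmpty);
        }
    }

    /** Direct lookup by class loader; no scan over the alias index is needed. */
    Dep find(ClassLoader ldr) {
        synchronized (mux) {
            return byLdr.get(ldr);
        }
    }

    /** Number of aliases that still have at least one deployment. */
    int aliasCount() {
        synchronized (mux) {
            return byAlias.size();
        }
    }

    public static void main(String[] args) {
        TwoIndexStoreSketch store = new TwoIndexStoreSketch();
        ClassLoader ldr = ClassLoader.getSystemClassLoader();

        Dep first = store.deploy("taskA", ldr);
        Dep second = store.deploy("taskB", ldr);

        System.out.println("Same deployment reused: " + (first == second));
    }
}
```

Because both maps change inside the same synchronized block, a miss in byLdr implies there is nothing to find through byAlias for that loader, which is the argument made above for dropping the fallback branch.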
…calStore#deployment, correct cache lookup mechanism in GridDeploymentLocalStore#deploy, simplify GridDeploymentLocalStoreReuseTest#testNoExcessiveLocalDeploymentCacheMisses
| ClassLoader ldr = Thread.currentThread().getContextClassLoader(); | ||
|
|
||
| if (ldr == null) | ||
| ldr = U.resolveClassLoader(ctx.config()); |
- Let's move ldr initialization outside the loop.
- Just add `|| dep.classLoader() == ldr` to the `if` condition.
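The two suggestions above amount to resolving the class loader once and then comparing by reference inside the loop. A minimal plain-Java sketch of that shape follows. The fallback parameter is a hypothetical stand-in for U.resolveClassLoader(ctx.config()), and the list of candidate loaders stands in for the deployments being scanned, so nothing here is Ignite API.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch: resolve the class loader once, then match candidates by identity. */
class LoaderResolveSketch {
    /** Returns the thread context class loader, or the fallback when none is set. */
    static ClassLoader resolve(ClassLoader fallback) {
        ClassLoader ldr = Thread.currentThread().getContextClassLoader();

        return ldr != null ? ldr : fallback;
    }

    /** Returns the index of the first candidate that is the resolved loader, or -1. */
    static int indexOfCurrentLoader(List<ClassLoader> candidates, ClassLoader fallback) {
        // Resolved once, outside the loop.
        ClassLoader ldr = resolve(fallback);

        for (int i = 0; i < candidates.size(); i++) {
            // Reference comparison, matching the == check in the review suggestion.
            if (candidates.get(i) == ldr)
                return i;
        }

        return -1;
    }

    public static void main(String[] args) {
        ClassLoader sys = ClassLoader.getSystemClassLoader();

        System.out.println("Index: " + indexOfCurrentLoader(Arrays.asList(sys), sys));
    }
}
```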
| assertTrue(lsnr0.check(5_000)); | ||
| assertTrue(lsnr1.check(5_000)); |
Why do we need to wait here? As far as I understand, there is a strict happens-before relationship between task completion and the log message.

