-
Notifications
You must be signed in to change notification settings - Fork 478
Adds AssistantManagerThriftRPCs #6284
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
30883b0
43b0db5
d497917
9c016f1
8c3d0cd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,150 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * https://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.accumulo.core.rpc.clients; | ||
|
|
||
| import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; | ||
| import static java.util.concurrent.TimeUnit.MILLISECONDS; | ||
|
|
||
| import java.util.ConcurrentModificationException; | ||
|
|
||
| import org.apache.accumulo.core.client.AccumuloException; | ||
| import org.apache.accumulo.core.client.AccumuloSecurityException; | ||
| import org.apache.accumulo.core.client.NamespaceNotFoundException; | ||
| import org.apache.accumulo.core.client.TableNotFoundException; | ||
| import org.apache.accumulo.core.clientImpl.ClientContext; | ||
| import org.apache.accumulo.core.clientImpl.thrift.ThriftConcurrentModificationException; | ||
| import org.apache.accumulo.core.clientImpl.thrift.ThriftNotActiveServiceException; | ||
| import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; | ||
| import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException; | ||
| import org.apache.accumulo.core.manager.thrift.AssistantManagerClientService.Client; | ||
| import org.apache.accumulo.core.rpc.ThriftUtil; | ||
| import org.apache.thrift.transport.TTransportException; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| public class AssistantManagerThriftClient extends ThriftClientTypes<Client> | ||
| implements ManagerClient<Client> { | ||
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(AssistantManagerThriftClient.class); | ||
|
|
||
| AssistantManagerThriftClient(String serviceName) { | ||
| super(serviceName, new Client.Factory()); | ||
| } | ||
|
|
||
| @Override | ||
| public Client getConnection(ClientContext context) { | ||
| return getManagerConnection(LOG, this, context); | ||
| } | ||
|
|
||
| public <R> R executeTableCommand(ClientContext context, Exec<R,Client> exec) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Was this code copied from MangerThriftClient? If so would be good to push to the super type and refactor for any specialization needed.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. May need to create a new super type that the primary and assistant manager share, not sure. See that a fate client also extends the base type and it may not need some of these methods. |
||
| throws AccumuloException, AccumuloSecurityException, TableNotFoundException { | ||
| Client client = null; | ||
| while (true) { | ||
| try { | ||
| client = getConnectionWithRetry(context); | ||
| return exec.execute(client); | ||
| } catch (TTransportException tte) { | ||
| LOG.debug("ManagerClient request failed, retrying ... ", tte); | ||
| sleepUninterruptibly(100, MILLISECONDS); | ||
| } catch (ThriftSecurityException e) { | ||
| throw new AccumuloSecurityException(e.user, e.code, e); | ||
| } catch (ThriftTableOperationException e) { | ||
| switch (e.getType()) { | ||
| case NAMESPACE_NOTFOUND: | ||
| throw new TableNotFoundException(e.getTableName(), new NamespaceNotFoundException(e)); | ||
| case NOTFOUND: | ||
| throw new TableNotFoundException(e); | ||
| default: | ||
| throw new AccumuloException(e); | ||
| } | ||
| } catch (ThriftNotActiveServiceException e) { | ||
| // Let it loop, fetching a new location | ||
| LOG.debug("Contacted a Manager which is no longer active, retrying"); | ||
| sleepUninterruptibly(100, MILLISECONDS); | ||
| } catch (ThriftConcurrentModificationException e) { | ||
| throw new ConcurrentModificationException(e.getMessage(), e); | ||
| } catch (Exception e) { | ||
| throw new AccumuloException(e); | ||
| } finally { | ||
| if (client != null) { | ||
| ThriftUtil.close(client, context); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public <R> R execute(ClientContext context, Exec<R,Client> exec) | ||
| throws AccumuloException, AccumuloSecurityException { | ||
| try { | ||
| return executeTableCommand(context, exec); | ||
| } catch (TableNotFoundException e) { | ||
| throw new AssertionError(e); | ||
| } | ||
| } | ||
|
|
||
| public void executeVoidTableCommand(ClientContext context, ExecVoid<Client> exec) | ||
| throws AccumuloException, AccumuloSecurityException, TableNotFoundException { | ||
| Client client = null; | ||
| while (true) { | ||
| try { | ||
| client = getConnectionWithRetry(context); | ||
| exec.execute(client); | ||
| return; | ||
| } catch (TTransportException tte) { | ||
| LOG.debug("ManagerClient request failed, retrying ... ", tte); | ||
| sleepUninterruptibly(100, MILLISECONDS); | ||
| } catch (ThriftSecurityException e) { | ||
| throw new AccumuloSecurityException(e.user, e.code, e); | ||
| } catch (ThriftTableOperationException e) { | ||
| switch (e.getType()) { | ||
| case NAMESPACE_NOTFOUND: | ||
| throw new TableNotFoundException(e.getTableName(), new NamespaceNotFoundException(e)); | ||
| case NOTFOUND: | ||
| throw new TableNotFoundException(e); | ||
| default: | ||
| throw new AccumuloException(e); | ||
| } | ||
| } catch (ThriftNotActiveServiceException e) { | ||
| // Let it loop, fetching a new location | ||
| LOG.debug("Contacted a Manager which is no longer active, retrying"); | ||
| sleepUninterruptibly(100, MILLISECONDS); | ||
| } catch (ThriftConcurrentModificationException e) { | ||
| throw new ConcurrentModificationException(e.getMessage(), e); | ||
| } catch (Exception e) { | ||
| throw new AccumuloException(e); | ||
| } finally { | ||
| if (client != null) { | ||
| ThriftUtil.close(client, context); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void executeVoid(ClientContext context, ExecVoid<Client> exec) | ||
| throws AccumuloException, AccumuloSecurityException { | ||
| try { | ||
| executeVoidTableCommand(context, exec); | ||
| } catch (TableNotFoundException e) { | ||
| throw new AccumuloException(e); | ||
| } | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,7 +32,7 @@ | |
| import org.apache.accumulo.core.clientImpl.thrift.ThriftNotActiveServiceException; | ||
| import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; | ||
| import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException; | ||
| import org.apache.accumulo.core.manager.thrift.ManagerClientService.Client; | ||
| import org.apache.accumulo.core.manager.thrift.PrimaryManagerClientService.Client; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would be good to rename this class from ManagerThriftClient to PrimaryManagerThriftClient |
||
| import org.apache.accumulo.core.rpc.ThriftUtil; | ||
| import org.apache.thrift.transport.TTransportException; | ||
| import org.slf4j.Logger; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like this will always connect to the primary manager. Need to call [this code] (
accumulo/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java
Line 420 in 97b5c81
For picking one I have been considering using hashing for #6279 to better spread the connections. If we randomly pick one and we C clients then every assistant manager can have up to C connections or more. If we hash clients to AM assistant managers then each one will have on the order of C/AM connections. This could also help w/ debugging where a client goes to the same manager consistently. However not sure what to hash on in the client. For #6279 was thinking of doing hash mod using the compactor address. To start with it would probably be best to just randomly pick one though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice if we could tie in TServerClient.getThriftServerConnection somehow. Then the debug property would apply to Manager connections as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@keith-turner I saw the code in FateManager, but the FateThriftClient still used the default getConnection method from ManagerClient.
Should
FateThriftClientalso be updated to select from the assistant Managers?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can not do that now, fate meta client is only available on the primary manager. After #6224,#6227, and #6232 are merged then both fate clients (meta and user) would be available on all managers. After those are merged could refactor the client code that initiates fate table ops to use any manger.
The code in FateThriftClient is for clients that interact with creating and waiting on fate operations. The code in FateManager is distributing execution of fate operations across managers. So what we currently have is fate operations can execute on all managers, but accumulo clients must still work w/ the primary manager to initiate and wait on fate operations. After #6232 is merged then accumulo clients could interact w/ any manager.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay! I opened #6291 for simplifying the thrift client code a bit.