Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void setProperty(final String property, final String value)
log.warn("{} was deprecated and will be removed in a future release;"
+ " setting its replacement {} instead", property, replacement);
});
ThriftClientTypes.MANAGER.executeVoid(context, client -> client
ThriftClientTypes.ASSISTANT_MANAGER.executeVoid(context, client -> client
.setSystemProperty(TraceUtil.traceInfo(), context.rpcCreds(), property, value));
checkLocalityGroups(property);
}
Expand Down Expand Up @@ -138,7 +138,7 @@ private Map<String,String> tryToModifyProperties(final Consumer<Map<String,Strin
}

// Send to server
ThriftClientTypes.MANAGER.executeVoid(context, client -> client
ThriftClientTypes.ASSISTANT_MANAGER.executeVoid(context, client -> client
.modifySystemProperties(TraceUtil.traceInfo(), context.rpcCreds(), vProperties));

return vProperties.getProperties();
Expand Down Expand Up @@ -184,7 +184,7 @@ public void removeProperty(final String property)
log.warn("{} was deprecated and will be removed in a future release; assuming user meant"
+ " its replacement {} and will remove that instead", property, replacement);
});
ThriftClientTypes.MANAGER.executeVoid(context,
ThriftClientTypes.ASSISTANT_MANAGER.executeVoid(context,
client -> client.removeSystemProperty(TraceUtil.traceInfo(), context.rpcCreds(), property));
checkLocalityGroups(property);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public void setProperty(final String namespace, final String property, final Str
checkArgument(value != null, "value is null");

try {
ThriftClientTypes.MANAGER.executeVoidTableCommand(context,
ThriftClientTypes.ASSISTANT_MANAGER.executeVoidTableCommand(context,
client -> client.setNamespaceProperty(TraceUtil.traceInfo(), context.rpcCreds(),
namespace, property, value));
} catch (TableNotFoundException e) {
Expand Down Expand Up @@ -218,7 +218,7 @@ private Map<String,String> tryToModifyProperties(final String namespace,

try {
// Send to server
ThriftClientTypes.MANAGER.executeVoidTableCommand(context,
ThriftClientTypes.ASSISTANT_MANAGER.executeVoidTableCommand(context,
client -> client.modifyNamespaceProperties(TraceUtil.traceInfo(), context.rpcCreds(),
namespace, vProperties));

Expand Down Expand Up @@ -273,7 +273,7 @@ public void removeProperty(final String namespace, final String property)
EXISTING_NAMESPACE_NAME.validate(namespace);
checkArgument(property != null, "property is null");
try {
ThriftClientTypes.MANAGER.executeVoidTableCommand(context, client -> client
ThriftClientTypes.ASSISTANT_MANAGER.executeVoidTableCommand(context, client -> client
.removeNamespaceProperty(TraceUtil.traceInfo(), context.rpcCreds(), namespace, property));
} catch (TableNotFoundException e) {
if (e.getCause() instanceof NamespaceNotFoundException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public Set<ResourceGroupId> list() {
@Override
public void create(ResourceGroupId group) throws AccumuloException, AccumuloSecurityException {
checkArgument(group != null, "group argument must be supplied");
ThriftClientTypes.MANAGER.executeVoid(context, client -> client
ThriftClientTypes.ASSISTANT_MANAGER.executeVoid(context, client -> client
.createResourceGroupNode(TraceUtil.traceInfo(), context.rpcCreds(), group.canonical()));
}

Expand Down Expand Up @@ -116,7 +116,7 @@ public void setProperty(ResourceGroupId group, String property, String value)
+ " setting its replacement {} instead", property, replacement);
});
try {
ThriftClientTypes.MANAGER.executeVoid(context,
ThriftClientTypes.ASSISTANT_MANAGER.executeVoid(context,
client -> client.setResourceGroupProperty(TraceUtil.traceInfo(), context.rpcCreds(),
group.canonical(), property, value));
} catch (AccumuloException | AccumuloSecurityException e) {
Expand Down Expand Up @@ -168,7 +168,7 @@ private Map<String,String> tryToModifyProperties(final ResourceGroupId group,

// Send to server
try {
ThriftClientTypes.MANAGER.executeVoid(context,
ThriftClientTypes.ASSISTANT_MANAGER.executeVoid(context,
client -> client.modifyResourceGroupProperties(TraceUtil.traceInfo(), context.rpcCreds(),
group.canonical(), vProperties));
} catch (AccumuloException | AccumuloSecurityException e) {
Expand Down Expand Up @@ -228,7 +228,7 @@ public void removeProperty(ResourceGroupId group, String property)
+ " its replacement {} and will remove that instead", property, replacement);
});
try {
ThriftClientTypes.MANAGER.executeVoid(context,
ThriftClientTypes.ASSISTANT_MANAGER.executeVoid(context,
client -> client.removeResourceGroupProperty(TraceUtil.traceInfo(), context.rpcCreds(),
group.canonical(), property));
} catch (AccumuloException | AccumuloSecurityException e) {
Expand All @@ -246,7 +246,7 @@ public void remove(ResourceGroupId group)
throws AccumuloException, AccumuloSecurityException, ResourceGroupNotFoundException {
checkArgument(group != null, "group argument must be supplied");
try {
ThriftClientTypes.MANAGER.executeVoid(context, client -> client
ThriftClientTypes.ASSISTANT_MANAGER.executeVoid(context, client -> client
.removeResourceGroupNode(TraceUtil.traceInfo(), context.rpcCreds(), group.canonical()));
} catch (AccumuloException | AccumuloSecurityException e) {
Throwable t = e.getCause();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.manager.thrift.AssistantManagerClientService;
import org.apache.accumulo.core.manager.thrift.FateService;
import org.apache.accumulo.core.manager.thrift.ManagerClientService;
import org.apache.accumulo.core.manager.thrift.TFateId;
import org.apache.accumulo.core.manager.thrift.TFateInstanceType;
import org.apache.accumulo.core.manager.thrift.TFateOperation;
Expand Down Expand Up @@ -975,9 +975,9 @@ private void _flush(TableId tableId, Text start, Text end, boolean wait)
// so pass the tableid to both calls

while (true) {
ManagerClientService.Client client = null;
AssistantManagerClientService.Client client = null;
try {
client = ThriftClientTypes.MANAGER.getConnectionWithRetry(context);
client = ThriftClientTypes.ASSISTANT_MANAGER.getConnectionWithRetry(context);
flushID =
client.initiateFlush(TraceUtil.traceInfo(), context.rpcCreds(), tableId.canonical());
break;
Expand All @@ -994,9 +994,9 @@ private void _flush(TableId tableId, Text start, Text end, boolean wait)
}

while (true) {
ManagerClientService.Client client = null;
AssistantManagerClientService.Client client = null;
try {
client = ThriftClientTypes.MANAGER.getConnectionWithRetry(context);
client = ThriftClientTypes.ASSISTANT_MANAGER.getConnectionWithRetry(context);
client.waitForFlush(TraceUtil.traceInfo(), context.rpcCreds(), tableId.canonical(),
TextUtil.getByteBuffer(start), TextUtil.getByteBuffer(end), flushID,
wait ? Long.MAX_VALUE : 1);
Expand Down Expand Up @@ -1059,7 +1059,7 @@ private Map<String,String> tryToModifyProperties(String tableName,

try {
// Send to server
ThriftClientTypes.MANAGER.executeVoid(context,
ThriftClientTypes.ASSISTANT_MANAGER.executeVoid(context,
client -> client.modifyTableProperties(TraceUtil.traceInfo(), context.rpcCreds(),
tableName, vProperties));
for (String property : vProperties.getProperties().keySet()) {
Expand Down Expand Up @@ -1126,7 +1126,7 @@ private Map<String,String> modifyPropertiesUnwrapped(String tableName,
private void setPropertyNoChecks(final String tableName, final String property,
final String value)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
ThriftClientTypes.MANAGER.executeVoid(context, client -> client
ThriftClientTypes.ASSISTANT_MANAGER.executeVoid(context, client -> client
.setTableProperty(TraceUtil.traceInfo(), context.rpcCreds(), tableName, property, value));
}

Expand All @@ -1147,7 +1147,7 @@ public void removeProperty(final String tableName, final String property)

private void removePropertyNoChecks(final String tableName, final String property)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
ThriftClientTypes.MANAGER.executeVoid(context, client -> client
ThriftClientTypes.ASSISTANT_MANAGER.executeVoid(context, client -> client
.removeTableProperty(TraceUtil.traceInfo(), context.rpcCreds(), tableName, property));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.rpc.clients;

import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.util.ConcurrentModificationException;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.NamespaceNotFoundException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.thrift.ThriftConcurrentModificationException;
import org.apache.accumulo.core.clientImpl.thrift.ThriftNotActiveServiceException;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
import org.apache.accumulo.core.manager.thrift.AssistantManagerClientService.Client;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AssistantManagerThriftClient extends ThriftClientTypes<Client>
implements ManagerClient<Client> {

private static final Logger LOG = LoggerFactory.getLogger(AssistantManagerThriftClient.class);

AssistantManagerThriftClient(String serviceName) {
super(serviceName, new Client.Factory());
}

@Override
public Client getConnection(ClientContext context) {
return getManagerConnection(LOG, this, context);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this will always connect to the primary manager. Need to call [this code] (

var workers = context.getServerPaths().getAssistantManagers(AddressSelector.all(), true);
) to get a list of assistant managers and pick one.

For picking one I have been considering using hashing for #6279 to better spread the connections. If we randomly pick one and we C clients then every assistant manager can have up to C connections or more. If we hash clients to AM assistant managers then each one will have on the order of C/AM connections. This could also help w/ debugging where a client goes to the same manager consistently. However not sure what to hash on in the client. For #6279 was thinking of doing hash mod using the compactor address. To start with it would probably be best to just randomly pick one though.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if we could tie in TServerClient.getThriftServerConnection somehow. Then the debug property would apply to Manager connections as well.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@keith-turner I saw the code in FateManager, but the FateThriftClient still used the default getConnection method from ManagerClient.

Should FateThriftClient also be updated to select from the assistant Managers?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should FateThriftClient also be updated to select from the assistant Managers?

Can not do that now, fate meta client is only available on the primary manager. After #6224,#6227, and #6232 are merged then both fate clients (meta and user) would be available on all managers. After those are merged could refactor the client code that initiates fate table ops to use any manger.

@keith-turner I saw the code in FateManager, but the FateThriftClient still used the default getConnection method from ManagerClient.

The code in FateThriftClient is for clients that interact with creating and waiting on fate operations. The code in FateManager is distributing execution of fate operations across managers. So what we currently have is fate operations can execute on all managers, but accumulo clients must still work w/ the primary manager to initiate and wait on fate operations. After #6232 is merged then accumulo clients could interact w/ any manager.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay! I opened #6291 for simplifying the thrift client code a bit.

}

public <R> R executeTableCommand(ClientContext context, Exec<R,Client> exec)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this code copied from MangerThriftClient? If so would be good to push to the super type and refactor for any specialization needed.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May need to create a new super type that the primary and assistant manager share, not sure. See that a fate client also extends the base type and it may not need some of these methods.

throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
Client client = null;
while (true) {
try {
client = getConnectionWithRetry(context);
return exec.execute(client);
} catch (TTransportException tte) {
LOG.debug("ManagerClient request failed, retrying ... ", tte);
sleepUninterruptibly(100, MILLISECONDS);
} catch (ThriftSecurityException e) {
throw new AccumuloSecurityException(e.user, e.code, e);
} catch (ThriftTableOperationException e) {
switch (e.getType()) {
case NAMESPACE_NOTFOUND:
throw new TableNotFoundException(e.getTableName(), new NamespaceNotFoundException(e));
case NOTFOUND:
throw new TableNotFoundException(e);
default:
throw new AccumuloException(e);
}
} catch (ThriftNotActiveServiceException e) {
// Let it loop, fetching a new location
LOG.debug("Contacted a Manager which is no longer active, retrying");
sleepUninterruptibly(100, MILLISECONDS);
} catch (ThriftConcurrentModificationException e) {
throw new ConcurrentModificationException(e.getMessage(), e);
} catch (Exception e) {
throw new AccumuloException(e);
} finally {
if (client != null) {
ThriftUtil.close(client, context);
}
}
}
}

@Override
public <R> R execute(ClientContext context, Exec<R,Client> exec)
throws AccumuloException, AccumuloSecurityException {
try {
return executeTableCommand(context, exec);
} catch (TableNotFoundException e) {
throw new AssertionError(e);
}
}

public void executeVoidTableCommand(ClientContext context, ExecVoid<Client> exec)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
Client client = null;
while (true) {
try {
client = getConnectionWithRetry(context);
exec.execute(client);
return;
} catch (TTransportException tte) {
LOG.debug("ManagerClient request failed, retrying ... ", tte);
sleepUninterruptibly(100, MILLISECONDS);
} catch (ThriftSecurityException e) {
throw new AccumuloSecurityException(e.user, e.code, e);
} catch (ThriftTableOperationException e) {
switch (e.getType()) {
case NAMESPACE_NOTFOUND:
throw new TableNotFoundException(e.getTableName(), new NamespaceNotFoundException(e));
case NOTFOUND:
throw new TableNotFoundException(e);
default:
throw new AccumuloException(e);
}
} catch (ThriftNotActiveServiceException e) {
// Let it loop, fetching a new location
LOG.debug("Contacted a Manager which is no longer active, retrying");
sleepUninterruptibly(100, MILLISECONDS);
} catch (ThriftConcurrentModificationException e) {
throw new ConcurrentModificationException(e.getMessage(), e);
} catch (Exception e) {
throw new AccumuloException(e);
} finally {
if (client != null) {
ThriftUtil.close(client, context);
}
}
}
}

@Override
public void executeVoid(ClientContext context, ExecVoid<Client> exec)
throws AccumuloException, AccumuloSecurityException {
try {
executeVoidTableCommand(context, exec);
} catch (TableNotFoundException e) {
throw new AccumuloException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.accumulo.core.clientImpl.thrift.ThriftNotActiveServiceException;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
import org.apache.accumulo.core.manager.thrift.ManagerClientService.Client;
import org.apache.accumulo.core.manager.thrift.PrimaryManagerClientService.Client;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be good to rename this class from ManagerThriftClient to PrimaryManagerThriftClient

import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ public abstract class ThriftClientTypes<C extends TServiceClient> {

public static final ManagerThriftClient MANAGER = new ManagerThriftClient("mgr");

public static final AssistantManagerThriftClient ASSISTANT_MANAGER =
new AssistantManagerThriftClient("asst_mgr");

public static final TabletServerThriftClient TABLET_SERVER =
new TabletServerThriftClient("tserver");

Expand Down
Loading