Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
Expand Down Expand Up @@ -300,6 +301,11 @@ public void mutateRow(
.build());
responseObserver.onCompleted();
return;
} catch (StatusRuntimeException e) {
responseObserver.onNext(
MutateRowResult.newBuilder().setStatus(StatusProto.fromThrowable(e)).build());
responseObserver.onCompleted();
return;
}

responseObserver.onNext(
Expand Down Expand Up @@ -354,10 +360,16 @@ public void bulkMutateRows(
.build());
responseObserver.onCompleted();
return;
} catch (StatusRuntimeException e) {
responseObserver.onNext(
MutateRowsResult.newBuilder().setStatus(StatusProto.fromThrowable(e)).build());
responseObserver.onCompleted();
return;
}

responseObserver.onNext(
MutateRowsResult.newBuilder()
// Note that the default instance == OK
.setStatus(com.google.rpc.Status.getDefaultInstance())
.build());
responseObserver.onCompleted();
Expand Down Expand Up @@ -388,6 +400,14 @@ public void readRow(ReadRowRequest request, StreamObserver<RowResult> responseOb
client
.dataClient()
.readRow(tableId, request.getRowKey(), FILTERS.fromProto(request.getFilter()));
if (row != null) {
RowResult.Builder resultBuilder = convertRowResult(row);
responseObserver.onNext(
// Note that the default instance == OK
resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build());
} else {
logger.info(String.format("readRow() did not find row: %s", request.getRowKey()));
}
} catch (ApiException e) {
responseObserver.onNext(
RowResult.newBuilder()
Expand All @@ -399,30 +419,23 @@ public void readRow(ReadRowRequest request, StreamObserver<RowResult> responseOb
.build());
responseObserver.onCompleted();
return;
}

if (row != null) {
try {
RowResult.Builder resultBuilder = convertRowResult(row);
responseObserver.onNext(
resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build());
} catch (RuntimeException e) {
// If client encounters problem, don't return any row result.
responseObserver.onNext(
RowResult.newBuilder()
.setStatus(
com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(e.getMessage())
.build())
.build());
responseObserver.onCompleted();
return;
}
} else {
logger.info(String.format("readRow() did not find row: %s", request.getRowKey()));
} catch (StatusRuntimeException e) {
responseObserver.onNext(
RowResult.newBuilder().setStatus(com.google.rpc.Status.getDefaultInstance()).build());
RowResult.newBuilder().setStatus(StatusProto.fromThrowable(e)).build());
responseObserver.onCompleted();
return;
} catch (RuntimeException e) {
// If client encounters problem, don't return any row result.
responseObserver.onNext(
RowResult.newBuilder()
.setStatus(
com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(e.getMessage())
.build())
.build());
responseObserver.onCompleted();
return;
}
responseObserver.onCompleted();
}
Expand All @@ -441,6 +454,11 @@ public void readRows(ReadRowsRequest request, StreamObserver<RowsResult> respons
Query query = Query.fromProto(request.getRequest());
try {
rows = client.dataClient().readRows(query);
int cancelAfterRows = request.getCancelAfterRows();
RowsResult.Builder resultBuilder = convertRowsResult(rows, cancelAfterRows);
responseObserver.onNext(
// Note that the default instance == OK
resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build());
} catch (ApiException e) {
responseObserver.onNext(
RowsResult.newBuilder()
Expand All @@ -452,13 +470,11 @@ public void readRows(ReadRowsRequest request, StreamObserver<RowsResult> respons
.build());
responseObserver.onCompleted();
return;
}

int cancelAfterRows = request.getCancelAfterRows();
try {
RowsResult.Builder resultBuilder = convertRowsResult(rows, cancelAfterRows);
} catch (StatusRuntimeException e) {
responseObserver.onNext(
resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build());
RowsResult.newBuilder().setStatus(StatusProto.fromThrowable(e)).build());
responseObserver.onCompleted();
return;
} catch (RuntimeException e) {
// If client encounters problem, don't return any row result.
responseObserver.onNext(
Expand Down Expand Up @@ -578,6 +594,11 @@ public void sampleRowKeys(
.build());
responseObserver.onCompleted();
return;
} catch (StatusRuntimeException e) {
responseObserver.onNext(
SampleRowKeysResult.newBuilder().setStatus(StatusProto.fromThrowable(e)).build());
responseObserver.onCompleted();
return;
}

SampleRowKeysResult.Builder resultBuilder = SampleRowKeysResult.newBuilder();
Expand All @@ -588,6 +609,7 @@ public void sampleRowKeys(
.setOffsetBytes(keyOffset.getOffsetBytes());
}
responseObserver.onNext(
// Note that the default instance == OK
resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build());
responseObserver.onCompleted();
}
Expand Down Expand Up @@ -618,11 +640,17 @@ public void checkAndMutateRow(
.build());
responseObserver.onCompleted();
return;
} catch (StatusRuntimeException e) {
responseObserver.onNext(
CheckAndMutateRowResult.newBuilder().setStatus(StatusProto.fromThrowable(e)).build());
responseObserver.onCompleted();
return;
}

CheckAndMutateRowResult.Builder resultBuilder = CheckAndMutateRowResult.newBuilder();
resultBuilder.getResultBuilder().setPredicateMatched(matched);
responseObserver.onNext(
// Note that the default instance == OK
resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build());
responseObserver.onCompleted();
}
Expand All @@ -642,6 +670,16 @@ public void readModifyWriteRow(
ReadModifyWriteRow mutation = ReadModifyWriteRow.fromProto(request.getRequest());
try {
row = client.dataClient().readModifyWriteRow(mutation);
if (row != null) {
RowResult.Builder resultBuilder = convertRowResult(row);
responseObserver.onNext(
// Note that the default instance == OK
resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build());
} else {
logger.info(
String.format(
"readModifyWriteRow() did not find row: %s", request.getRequest().getRowKey()));
}
} catch (ApiException e) {
responseObserver.onNext(
RowResult.newBuilder()
Expand All @@ -653,32 +691,23 @@ public void readModifyWriteRow(
.build());
responseObserver.onCompleted();
return;
}

if (row != null) {
try {
RowResult.Builder resultBuilder = convertRowResult(row);
responseObserver.onNext(
resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build());
} catch (RuntimeException e) {
// If client encounters problem, fail the whole operation.
responseObserver.onNext(
RowResult.newBuilder()
.setStatus(
com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(e.getMessage())
.build())
.build());
responseObserver.onCompleted();
return;
}
} else {
logger.info(
String.format(
"readModifyWriteRow() did not find row: %s", request.getRequest().getRowKey()));
} catch (StatusRuntimeException e) {
responseObserver.onNext(
RowResult.newBuilder().setStatus(com.google.rpc.Status.getDefaultInstance()).build());
RowResult.newBuilder().setStatus(StatusProto.fromThrowable(e)).build());
responseObserver.onCompleted();
return;
} catch (RuntimeException e) {
// If client encounters problem, fail the whole operation.
responseObserver.onNext(
RowResult.newBuilder()
.setStatus(
com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(e.getMessage())
.build())
.build());
responseObserver.onCompleted();
return;
}
responseObserver.onCompleted();
}
Expand Down Expand Up @@ -727,13 +756,7 @@ public void executeQuery(
return;
} catch (StatusRuntimeException e) {
responseObserver.onNext(
ExecuteQueryResult.newBuilder()
.setStatus(
com.google.rpc.Status.newBuilder()
.setCode(e.getStatus().getCode().value())
.setMessage(e.getStatus().getDescription())
.build())
.build());
ExecuteQueryResult.newBuilder().setStatus(StatusProto.fromThrowable(e)).build());
responseObserver.onCompleted();
return;
} catch (RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ public static void main(String[] args) throws InterruptedException, IOException

CbtTestProxy cbtTestProxy = CbtTestProxy.create();
logger.info(String.format("Test proxy starting on %d", port));
ServerBuilder.forPort(port).addService(cbtTestProxy).build().start().awaitTermination();
ServerBuilder.forPort(port)
.addService(cbtTestProxy)
.maxInboundMessageSize(Integer.MAX_VALUE)
.maxInboundMetadataSize(Integer.MAX_VALUE)
.build()
.start()
.awaitTermination();
}
}
Loading