diff --git a/test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/CbtTestProxy.java b/test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/CbtTestProxy.java index 122bc51439..759f1de867 100644 --- a/test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/CbtTestProxy.java +++ b/test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/CbtTestProxy.java @@ -59,6 +59,7 @@ import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; +import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; import java.io.ByteArrayInputStream; import java.io.Closeable; @@ -300,6 +301,11 @@ public void mutateRow( .build()); responseObserver.onCompleted(); return; + } catch (StatusRuntimeException e) { + responseObserver.onNext( + MutateRowResult.newBuilder().setStatus(StatusProto.fromThrowable(e)).build()); + responseObserver.onCompleted(); + return; } responseObserver.onNext( @@ -354,10 +360,16 @@ public void bulkMutateRows( .build()); responseObserver.onCompleted(); return; + } catch (StatusRuntimeException e) { + responseObserver.onNext( + MutateRowsResult.newBuilder().setStatus(StatusProto.fromThrowable(e)).build()); + responseObserver.onCompleted(); + return; } responseObserver.onNext( MutateRowsResult.newBuilder() + // Note that the default instance == OK .setStatus(com.google.rpc.Status.getDefaultInstance()) .build()); responseObserver.onCompleted(); @@ -388,6 +400,14 @@ public void readRow(ReadRowRequest request, StreamObserver responseOb client .dataClient() .readRow(tableId, request.getRowKey(), FILTERS.fromProto(request.getFilter())); + if (row != null) { + RowResult.Builder resultBuilder = convertRowResult(row); + responseObserver.onNext( + // Note that the default instance == OK + resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build()); + } else { + logger.info(String.format("readRow() did not find row: %s", request.getRowKey())); + } } catch (ApiException e) { responseObserver.onNext( RowResult.newBuilder() @@ -399,30 +419,23 @@ public void readRow(ReadRowRequest request, StreamObserver responseOb .build()); responseObserver.onCompleted(); return; - } - - if (row != null) { - try { - RowResult.Builder resultBuilder = convertRowResult(row); - responseObserver.onNext( - resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build()); - } catch (RuntimeException e) { - // If client encounters problem, don't return any row result. - responseObserver.onNext( - RowResult.newBuilder() - .setStatus( - com.google.rpc.Status.newBuilder() - .setCode(Code.INTERNAL.getNumber()) - .setMessage(e.getMessage()) - .build()) - .build()); - responseObserver.onCompleted(); - return; - } - } else { - logger.info(String.format("readRow() did not find row: %s", request.getRowKey())); + } catch (StatusRuntimeException e) { responseObserver.onNext( - RowResult.newBuilder().setStatus(com.google.rpc.Status.getDefaultInstance()).build()); + RowResult.newBuilder().setStatus(StatusProto.fromThrowable(e)).build()); + responseObserver.onCompleted(); + return; + } catch (RuntimeException e) { + // If client encounters problem, don't return any row result. + responseObserver.onNext( + RowResult.newBuilder() + .setStatus( + com.google.rpc.Status.newBuilder() + .setCode(Code.INTERNAL.getNumber()) + .setMessage(e.getMessage()) + .build()) + .build()); + responseObserver.onCompleted(); + return; } responseObserver.onCompleted(); } @@ -441,6 +454,11 @@ public void readRows(ReadRowsRequest request, StreamObserver respons Query query = Query.fromProto(request.getRequest()); try { rows = client.dataClient().readRows(query); + int cancelAfterRows = request.getCancelAfterRows(); + RowsResult.Builder resultBuilder = convertRowsResult(rows, cancelAfterRows); + responseObserver.onNext( + // Note that the default instance == OK + resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build()); } catch (ApiException e) { responseObserver.onNext( RowsResult.newBuilder() @@ -452,13 +470,11 @@ public void readRows(ReadRowsRequest request, StreamObserver respons .build()); responseObserver.onCompleted(); return; - } - - int cancelAfterRows = request.getCancelAfterRows(); - try { - RowsResult.Builder resultBuilder = convertRowsResult(rows, cancelAfterRows); + } catch (StatusRuntimeException e) { responseObserver.onNext( - resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build()); + RowsResult.newBuilder().setStatus(StatusProto.fromThrowable(e)).build()); + responseObserver.onCompleted(); + return; } catch (RuntimeException e) { // If client encounters problem, don't return any row result. responseObserver.onNext( @@ -578,6 +594,11 @@ public void sampleRowKeys( .build()); responseObserver.onCompleted(); return; + } catch (StatusRuntimeException e) { + responseObserver.onNext( + SampleRowKeysResult.newBuilder().setStatus(StatusProto.fromThrowable(e)).build()); + responseObserver.onCompleted(); + return; } SampleRowKeysResult.Builder resultBuilder = SampleRowKeysResult.newBuilder(); @@ -588,6 +609,7 @@ public void sampleRowKeys( .setOffsetBytes(keyOffset.getOffsetBytes()); } responseObserver.onNext( + // Note that the default instance == OK resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build()); responseObserver.onCompleted(); } @@ -618,11 +640,17 @@ public void checkAndMutateRow( .build()); responseObserver.onCompleted(); return; + } catch (StatusRuntimeException e) { + responseObserver.onNext( + CheckAndMutateRowResult.newBuilder().setStatus(StatusProto.fromThrowable(e)).build()); + responseObserver.onCompleted(); + return; } CheckAndMutateRowResult.Builder resultBuilder = CheckAndMutateRowResult.newBuilder(); resultBuilder.getResultBuilder().setPredicateMatched(matched); responseObserver.onNext( + // Note that the default instance == OK resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build()); responseObserver.onCompleted(); } @@ -642,6 +670,16 @@ public void readModifyWriteRow( ReadModifyWriteRow mutation = ReadModifyWriteRow.fromProto(request.getRequest()); try { row = client.dataClient().readModifyWriteRow(mutation); + if (row != null) { + RowResult.Builder resultBuilder = convertRowResult(row); + responseObserver.onNext( + // Note that the default instance == OK + resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build()); + } else { + logger.info( + String.format( + "readModifyWriteRow() did not find row: %s", request.getRequest().getRowKey())); + } } catch (ApiException e) { responseObserver.onNext( RowResult.newBuilder() @@ -653,32 +691,23 @@ public void readModifyWriteRow( .build()); responseObserver.onCompleted(); return; - } - - if (row != null) { - try { - RowResult.Builder resultBuilder = convertRowResult(row); - responseObserver.onNext( - resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build()); - } catch (RuntimeException e) { - // If client encounters problem, fail the whole operation. - responseObserver.onNext( - RowResult.newBuilder() - .setStatus( - com.google.rpc.Status.newBuilder() - .setCode(Code.INTERNAL.getNumber()) - .setMessage(e.getMessage()) - .build()) - .build()); - responseObserver.onCompleted(); - return; - } - } else { - logger.info( - String.format( - "readModifyWriteRow() did not find row: %s", request.getRequest().getRowKey())); + } catch (StatusRuntimeException e) { responseObserver.onNext( - RowResult.newBuilder().setStatus(com.google.rpc.Status.getDefaultInstance()).build()); + RowResult.newBuilder().setStatus(StatusProto.fromThrowable(e)).build()); + responseObserver.onCompleted(); + return; + } catch (RuntimeException e) { + // If client encounters problem, fail the whole operation. + responseObserver.onNext( + RowResult.newBuilder() + .setStatus( + com.google.rpc.Status.newBuilder() + .setCode(Code.INTERNAL.getNumber()) + .setMessage(e.getMessage()) + .build()) + .build()); + responseObserver.onCompleted(); + return; } responseObserver.onCompleted(); } @@ -727,13 +756,7 @@ public void executeQuery( return; } catch (StatusRuntimeException e) { responseObserver.onNext( - ExecuteQueryResult.newBuilder() - .setStatus( - com.google.rpc.Status.newBuilder() - .setCode(e.getStatus().getCode().value()) - .setMessage(e.getStatus().getDescription()) - .build()) - .build()); + ExecuteQueryResult.newBuilder().setStatus(StatusProto.fromThrowable(e)).build()); responseObserver.onCompleted(); return; } catch (RuntimeException e) { diff --git a/test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/CbtTestProxyMain.java b/test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/CbtTestProxyMain.java index f817197d14..77a764f551 100644 --- a/test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/CbtTestProxyMain.java +++ b/test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/CbtTestProxyMain.java @@ -34,6 +34,12 @@ public static void main(String[] args) throws InterruptedException, IOException CbtTestProxy cbtTestProxy = CbtTestProxy.create(); logger.info(String.format("Test proxy starting on %d", port)); - ServerBuilder.forPort(port).addService(cbtTestProxy).build().start().awaitTermination(); + ServerBuilder.forPort(port) + .addService(cbtTestProxy) + .maxInboundMessageSize(Integer.MAX_VALUE) + .maxInboundMetadataSize(Integer.MAX_VALUE) + .build() + .start() + .awaitTermination(); } }