diff --git a/impl/grpc/pom.xml b/impl/grpc/pom.xml new file mode 100644 index 00000000..f6583ac9 --- /dev/null +++ b/impl/grpc/pom.xml @@ -0,0 +1,37 @@ + + 4.0.0 + + io.serverlessworkflow + serverlessworkflow-impl + 8.0.0-SNAPSHOT + + serverlessworkflow-impl-grpc + Serverless Workflow :: Impl :: gRPC + + + + io.serverlessworkflow + serverlessworkflow-impl-core + + + io.grpc + grpc-stub + + + com.google.protobuf + protobuf-java-util + + + io.serverlessworkflow + serverlessworkflow-impl-json + + + com.github.os72 + protoc-jar + + + io.grpc + grpc-protobuf + + + \ No newline at end of file diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/CollectionStreamObserver.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/CollectionStreamObserver.java new file mode 100644 index 00000000..cd1c51ef --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/CollectionStreamObserver.java @@ -0,0 +1,54 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import com.google.protobuf.Message; +import io.grpc.stub.StreamObserver; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowModelCollection; +import io.serverlessworkflow.impl.WorkflowModelFactory; +import java.util.concurrent.CompletableFuture; + +class CollectionStreamObserver implements StreamObserver { + private final WorkflowModelCollection modelCollection; + private final WorkflowModelFactory modelFactory; + private final CompletableFuture future; + + public CollectionStreamObserver(WorkflowModelFactory modelFactory) { + this.modelCollection = modelFactory.createCollection(); + this.modelFactory = modelFactory; + this.future = new CompletableFuture<>(); + } + + @Override + public void onNext(Message value) { + modelCollection.add(ProtobufMessageUtils.convert(value, modelFactory)); + } + + @Override + public void onError(Throwable t) { + future.completeExceptionally(t); + } + + @Override + public void onCompleted() { + future.complete(modelCollection); + } + + public CompletableFuture future() { + return future; + } +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContext.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContext.java new file mode 100644 index 00000000..158fab51 --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContext.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import com.google.protobuf.DescriptorProtos; + +record FileDescriptorContext( + DescriptorProtos.FileDescriptorSet fileDescriptorSet, String inputProto) {} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java new file mode 100644 index 00000000..f2123e0e --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java @@ -0,0 +1,157 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import com.github.os72.protocjar.Protoc; +import com.google.protobuf.DescriptorProtos; +import io.serverlessworkflow.impl.resources.ExternalResourceHandler; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class FileDescriptorReader { + + private static final Logger logger = LoggerFactory.getLogger(FileDescriptorReader.class); + + static FileDescriptorContext readDescriptor(ExternalResourceHandler externalResourceHandler) { + Path grpcDir = + tryCreateTempGrpcDir() + .orElseThrow( + () -> new IllegalStateException("Could not create temporary gRPC directory")); + + try (InputStream inputStream = externalResourceHandler.open()) { + + Path protoFile = grpcDir.resolve(externalResourceHandler.name()); + if (!Files.exists(protoFile)) { + Files.createDirectories(protoFile); + } + + Files.copy(inputStream, protoFile, StandardCopyOption.REPLACE_EXISTING); + + Path descriptorOutput = grpcDir.resolve("descriptor.protobin"); + + try { + + generateFileDescriptor(grpcDir, protoFile, descriptorOutput); + + DescriptorProtos.FileDescriptorSet fileDescriptorSet = + DescriptorProtos.FileDescriptorSet.newBuilder() + .mergeFrom(Files.readAllBytes(descriptorOutput)) + .build(); + + return new FileDescriptorContext(fileDescriptorSet, externalResourceHandler.name()); + + } catch (IOException e) { + throw new UncheckedIOException( + "Unable to read external resource handler: " + externalResourceHandler.name(), e); + } + } catch (IOException e) { + throw new UncheckedIOException("Unable to read descriptor file", e); + } + } + + private static Optional tryCreateTempGrpcDir() { + try { + return Optional.of(Files.createTempDirectory("serverless-workflow-")); + } catch (IOException e) { + throw new UncheckedIOException("Error while creating temporary gRPC directory", e); + } + } + + /** + * Calls protoc binary with --descriptor_set_out= option set. First attempts to use + * the embedded protoc from protoc-jar library. If that fails with FileNotFoundException + * (unsupported architecture), falls back to using the system's installed protoc via + * ProcessBuilder. + * + * @param grpcDir a temporary directory + * @param protoFile the .proto file used by protoc to generate the file descriptor + * @param descriptorOutput the output directory where the descriptor file will be generated + */ + private static void generateFileDescriptor(Path grpcDir, Path protoFile, Path descriptorOutput) { + String[] protocArgs = + new String[] { + "--include_imports", + "--descriptor_set_out=" + descriptorOutput.toAbsolutePath(), + "-I", + grpcDir.toAbsolutePath().toString(), + protoFile.toAbsolutePath().toString() + }; + + try { + // First attempt: use protoc-jar library + int status = Protoc.runProtoc(protocArgs); + + if (status != 0) { + throw new RuntimeException( + "Unable to generate file descriptor, 'protoc' execution failed with status " + status); + } + } catch (FileNotFoundException e) { + // Fallback: try using system's installed protoc + logger.warn( + "Protoc binary not available for this architecture via protoc-jar. " + + "Attempting to use system-installed protoc..."); + generateFileDescriptorWithProcessBuilder(protocArgs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Unable to generate file descriptor", e); + } catch (IOException e) { + throw new UncheckedIOException("Unable to generate file descriptor", e); + } + } + + /** + * Fallback method to generate file descriptor using system's installed protoc via ProcessBuilder. + * + * @param protocArgs the arguments to pass to protoc command + */ + private static void generateFileDescriptorWithProcessBuilder(String[] protocArgs) { + try { + String[] command = new String[protocArgs.length + 1]; + command[0] = "protoc"; + System.arraycopy(protocArgs, 0, command, 1, protocArgs.length); + + ProcessBuilder processBuilder = new ProcessBuilder(command); + + processBuilder.redirectErrorStream(true); + + Process process = processBuilder.start(); + + int exitCode = process.waitFor(); + + if (exitCode != 0) { + throw new IllegalStateException( + "Unable to generate file descriptor using system protoc. Exit code: " + exitCode); + } + } catch (IOException e) { + throw new UncheckedIOException( + "Unable to execute system protoc. Please ensure 'protoc' is installed and available in your system PATH.", + e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Protoc execution was interrupted", e); + } + } + + private FileDescriptorReader() {} +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcChannelResolver.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcChannelResolver.java new file mode 100644 index 00000000..dabde19b --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcChannelResolver.java @@ -0,0 +1,42 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import io.grpc.Channel; +import io.grpc.ManagedChannelBuilder; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; + +class GrpcChannelResolver { + + static final String GRPC_CHANNEL_PROVIDER = "grpcChannelProvider"; + + static Channel channel( + WorkflowContext workflowContext, + TaskContext taskContext, + GrpcRequestContext grpcRequestContext) { + return workflowContext + .definition() + .application() + .additionalObject(GRPC_CHANNEL_PROVIDER, workflowContext, taskContext) + .orElseGet( + () -> + ManagedChannelBuilder.forAddress( + grpcRequestContext.address(), grpcRequestContext.port()) + .usePlaintext() + .build()); + } +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java new file mode 100644 index 00000000..44a10127 --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java @@ -0,0 +1,201 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import com.google.protobuf.DescriptorProtos; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.MethodDescriptor; +import io.grpc.protobuf.ProtoUtils; +import io.grpc.stub.ClientCalls; +import io.grpc.stub.StreamObserver; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowError; +import io.serverlessworkflow.impl.WorkflowException; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import io.serverlessworkflow.impl.executors.CallableTask; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; + +public class GrpcExecutor implements CallableTask { + + private final GrpcRequestContext requestContext; + private final WorkflowValueResolver> arguments; + private final FileDescriptorContext fileDescriptorContext; + + public GrpcExecutor( + GrpcRequestContext requestContext, + WorkflowValueResolver> arguments, + FileDescriptorContext fileDescriptorContext) { + this.requestContext = requestContext; + this.arguments = arguments; + this.fileDescriptorContext = fileDescriptorContext; + } + + @Override + public CompletableFuture apply( + WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { + return buildGrpcCallExecutor( + workflowContext, taskContext, this.arguments.apply(workflowContext, taskContext, input)); + } + + private CompletableFuture buildGrpcCallExecutor( + WorkflowContext workflowContext, TaskContext taskContext, Map arguments) { + + Channel channel = GrpcChannelResolver.channel(workflowContext, taskContext, requestContext); + + DescriptorProtos.FileDescriptorProto fileDescriptorProto = + fileDescriptorContext.fileDescriptorSet().getFileList().stream() + .filter(file -> file.getName().equals(fileDescriptorContext.inputProto())) + .findFirst() + .orElseThrow(() -> new IllegalStateException("Proto file not found in descriptor set")); + + try { + Descriptors.FileDescriptor fileDescriptor = + Descriptors.FileDescriptor.buildFrom( + fileDescriptorProto, new Descriptors.FileDescriptor[] {}); + + Descriptors.ServiceDescriptor serviceDescriptor = + Objects.requireNonNull( + fileDescriptor.findServiceByName(requestContext.service()), + "Service not found: " + requestContext.service()); + + Descriptors.MethodDescriptor methodDescriptor = + Objects.requireNonNull( + serviceDescriptor.findMethodByName(requestContext.method()), + "Method not found: " + requestContext.method()); + + MethodDescriptor.MethodType methodType = ProtobufMessageUtils.getMethodType(methodDescriptor); + + ClientCall call = + buildClientCall(channel, methodType, serviceDescriptor, methodDescriptor); + + return switch (methodType) { + case CLIENT_STREAMING -> + handleClientStreaming(workflowContext, arguments, methodDescriptor, call); + case BIDI_STREAMING -> + handleBidiStreaming(workflowContext, arguments, methodDescriptor, call); + case SERVER_STREAMING -> + handleServerStreaming(workflowContext, methodDescriptor, arguments, call); + case UNARY, UNKNOWN -> handleAsyncUnary(workflowContext, methodDescriptor, arguments, call); + }; + + } catch (Descriptors.DescriptorValidationException | InvalidProtocolBufferException e) { + throw new WorkflowException(WorkflowError.runtime(taskContext, e).build()); + } + } + + private static ClientCall buildClientCall( + Channel channel, + MethodDescriptor.MethodType methodType, + Descriptors.ServiceDescriptor serviceDescriptor, + Descriptors.MethodDescriptor methodDescriptor) { + return channel.newCall( + MethodDescriptor.newBuilder() + .setType(methodType) + .setFullMethodName( + MethodDescriptor.generateFullMethodName( + serviceDescriptor.getFullName(), methodDescriptor.getName())) + .setRequestMarshaller( + ProtoUtils.marshaller( + DynamicMessage.newBuilder(methodDescriptor.getInputType()).buildPartial())) + .setResponseMarshaller( + ProtoUtils.marshaller( + DynamicMessage.newBuilder(methodDescriptor.getOutputType()).buildPartial())) + .build(), + CallOptions.DEFAULT.withWaitForReady()); + } + + private static CompletableFuture handleClientStreaming( + WorkflowContext workflowContext, + Map parameters, + Descriptors.MethodDescriptor methodDescriptor, + ClientCall call) { + return ProtobufMessageUtils.asyncStreamingCall( + parameters, + methodDescriptor, + responseObserver -> ClientCalls.asyncClientStreamingCall(call, responseObserver), + workflowContext.definition().application().modelFactory()); + } + + private static CompletableFuture handleBidiStreaming( + WorkflowContext workflowContext, + Map parameters, + Descriptors.MethodDescriptor methodDescriptor, + ClientCall call) { + + return ProtobufMessageUtils.asyncStreamingCall( + parameters, + methodDescriptor, + responseObserver -> ClientCalls.asyncBidiStreamingCall(call, responseObserver), + workflowContext.definition().application().modelFactory()); + } + + private static CompletableFuture handleServerStreaming( + WorkflowContext workflowContext, + Descriptors.MethodDescriptor methodDescriptor, + Map parameters, + ClientCall call) + throws InvalidProtocolBufferException { + CollectionStreamObserver observer = + new CollectionStreamObserver(workflowContext.definition().application().modelFactory()); + ClientCalls.asyncServerStreamingCall( + call, ProtobufMessageUtils.buildMessage(methodDescriptor, parameters).build(), observer); + return observer.future(); + } + + private static CompletableFuture handleAsyncUnary( + WorkflowContext workflowContext, + Descriptors.MethodDescriptor methodDescriptor, + Map parameters, + ClientCall call) + throws InvalidProtocolBufferException { + + CompletableFuture future = new CompletableFuture<>(); + ClientCalls.asyncUnaryCall( + call, + ProtobufMessageUtils.buildMessage(methodDescriptor, parameters).build(), + new StreamObserver<>() { + private WorkflowModel model; + + @Override + public void onNext(Message value) { + model = + ProtobufMessageUtils.convert( + value, workflowContext.definition().application().modelFactory()); + } + + @Override + public void onError(Throwable t) { + future.completeExceptionally(t); + } + + @Override + public void onCompleted() { + future.complete(model); + } + }); + return future; + } +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutorBuilder.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutorBuilder.java new file mode 100644 index 00000000..0e4f39c6 --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutorBuilder.java @@ -0,0 +1,66 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import io.serverlessworkflow.api.types.CallGRPC; +import io.serverlessworkflow.api.types.GRPCArguments; +import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.api.types.WithGRPCService; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowMutablePosition; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import io.serverlessworkflow.impl.executors.CallableTask; +import io.serverlessworkflow.impl.executors.CallableTaskBuilder; +import java.util.Map; + +public class GrpcExecutorBuilder implements CallableTaskBuilder { + + private GrpcRequestContext grpcRequestContext; + private FileDescriptorContext fileDescriptorContext; + private WorkflowValueResolver> arguments; + + @Override + public boolean accept(Class clazz) { + return clazz.equals(CallGRPC.class); + } + + @Override + public void init(CallGRPC task, WorkflowDefinition definition, WorkflowMutablePosition position) { + + GRPCArguments with = task.getWith(); + WithGRPCService service = with.getService(); + + this.arguments = + WorkflowUtils.buildMapResolver( + definition.application(), + with.getArguments() != null ? with.getArguments().getAdditionalProperties() : Map.of()); + + this.grpcRequestContext = + new GrpcRequestContext( + service.getHost(), service.getPort(), with.getMethod(), service.getName()); + + this.fileDescriptorContext = + definition + .resourceLoader() + .loadStatic(with.getProto().getEndpoint(), FileDescriptorReader::readDescriptor); + } + + @Override + public CallableTask build() { + return new GrpcExecutor(this.grpcRequestContext, this.arguments, this.fileDescriptorContext); + } +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcRequestContext.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcRequestContext.java new file mode 100644 index 00000000..cbdb5716 --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcRequestContext.java @@ -0,0 +1,18 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +record GrpcRequestContext(String address, int port, String method, String service) {} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java new file mode 100644 index 00000000..0c2eec48 --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java @@ -0,0 +1,103 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.protobuf.DescriptorProtos; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.util.JsonFormat; +import io.grpc.MethodDescriptor; +import io.grpc.stub.StreamObserver; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowModelFactory; +import io.serverlessworkflow.impl.jackson.JsonUtils; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.UnaryOperator; + +class ProtobufMessageUtils { + + static WorkflowModel convert(Message message, WorkflowModelFactory modelFactory) { + StringBuilder str = new StringBuilder(); + try { + JsonFormat.printer().appendTo(message, str); + return modelFactory.from(str.toString()); + } catch (IOException e) { + throw new UncheckedIOException("Error converting protobuf message to JSON", e); + } + } + + static MethodDescriptor.MethodType getMethodType( + com.google.protobuf.Descriptors.MethodDescriptor methodDesc) { + DescriptorProtos.MethodDescriptorProto methodDescProto = methodDesc.toProto(); + if (methodDescProto.getClientStreaming()) { + if (methodDescProto.getServerStreaming()) { + return MethodDescriptor.MethodType.BIDI_STREAMING; + } + return MethodDescriptor.MethodType.CLIENT_STREAMING; + } else if (methodDescProto.getServerStreaming()) { + return MethodDescriptor.MethodType.SERVER_STREAMING; + } else { + return MethodDescriptor.MethodType.UNARY; + } + } + + static CompletableFuture asyncStreamingCall( + Map parameters, + com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, + UnaryOperator> streamObserverFunction, + WorkflowModelFactory modelFactory) { + CollectionStreamObserver responseObserver = new CollectionStreamObserver(modelFactory); + StreamObserver requestObserver = streamObserverFunction.apply(responseObserver); + for (Object entry : parameters.entrySet()) { + try { + requestObserver.onNext( + buildMessage(entry, DynamicMessage.newBuilder(methodDescriptor.getInputType())) + .build()); + } catch (InvalidProtocolBufferException e) { + requestObserver.onError(e); + } + } + requestObserver.onCompleted(); + return responseObserver.future(); + } + + static Message.Builder buildMessage(Object object, Message.Builder builder) + throws InvalidProtocolBufferException { + try { + // let's use Jackson to serialize the object to string for now, although we probably need to + // revisit this. + JsonFormat.parser().merge(JsonUtils.mapper().writeValueAsString(object), builder); + return builder; + } catch (JsonProcessingException e) { + throw new InvalidProtocolBufferException(e); + } + } + + static Message.Builder buildMessage( + Descriptors.MethodDescriptor methodDescriptor, Map parameters) + throws InvalidProtocolBufferException { + DynamicMessage.Builder builder = DynamicMessage.newBuilder(methodDescriptor.getInputType()); + return buildMessage(parameters, builder); + } + + private ProtobufMessageUtils() {} +} diff --git a/impl/grpc/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder b/impl/grpc/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder new file mode 100644 index 00000000..6acd9ba6 --- /dev/null +++ b/impl/grpc/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder @@ -0,0 +1 @@ +io.serverlessworkflow.impl.executors.grpc.GrpcExecutorBuilder \ No newline at end of file diff --git a/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/JsonUtils.java b/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/JsonUtils.java index 7554f6d7..8ca1724e 100644 --- a/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/JsonUtils.java +++ b/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/JsonUtils.java @@ -64,7 +64,7 @@ public static Collector arrayNodeCollector() { return new Collector() { @Override public BiConsumer accumulator() { - return (arrayNode, item) -> arrayNode.add(item); + return ArrayNode::add; } @Override diff --git a/impl/pom.xml b/impl/pom.xml index 06fa06ac..5f9fe4a8 100644 --- a/impl/pom.xml +++ b/impl/pom.xml @@ -17,6 +17,7 @@ 9.2.1 3.7.0 25.0.2 + 25.0.1 @@ -111,6 +112,11 @@ serverlessworkflow-impl-container ${project.version} + + io.serverlessworkflow + serverlessworkflow-impl-grpc + ${project.version} + net.thisptr jackson-jq @@ -199,5 +205,6 @@ test javascript python + grpc diff --git a/impl/test/pom.xml b/impl/test/pom.xml index fd47aae9..f040989b 100644 --- a/impl/test/pom.xml +++ b/impl/test/pom.xml @@ -53,6 +53,10 @@ io.serverlessworkflow serverlessworkflow-impl-container + + io.serverlessworkflow + serverlessworkflow-impl-grpc + org.glassfish.jersey.core jersey-client @@ -85,6 +89,11 @@ org.awaitility awaitility + + io.grpc + grpc-netty + test + @@ -95,7 +104,33 @@ + + + kr.motd.maven + os-maven-plugin + 1.7.1 + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + com.google.protobuf:protoc:3.25.8:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:${version.io.grpc.java}:exe:${os.detected.classifier} + ${project.basedir}/src/test/resources/workflows-samples/grpc/proto + + + + + test-compile + test-compile-custom + + + + maven-jar-plugin diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcBiDirectionalStreamingTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcBiDirectionalStreamingTest.java new file mode 100644 index 00000000..aa34497a --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcBiDirectionalStreamingTest.java @@ -0,0 +1,78 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.serverlessworkflow.api.WorkflowReader; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.test.grpc.handlers.ContributorBiDiStreamingHandler; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class GrpcBiDirectionalStreamingTest { + + private static final int PORT_FOR_EXAMPLES = 5011; + private static WorkflowApplication app; + private static Server server; + + @BeforeAll + static void setUpApp() throws IOException { + server = + ServerBuilder.forPort(PORT_FOR_EXAMPLES) + .addService(new ContributorBiDiStreamingHandler()) + .build(); + server.start(); + + app = WorkflowApplication.builder().build(); + } + + @AfterEach + void cleanup() throws InterruptedException { + server.shutdown().awaitTermination(); + } + + @Test + void grpcContributors() throws IOException { + + Workflow workflow = + WorkflowReader.readWorkflowFromClasspath( + "workflows-samples/grpc/contributors-bidi-stream-call.yaml"); + + WorkflowDefinition workflowDefinition = app.workflowDefinition(workflow); + + String filename = + getClass() + .getClassLoader() + .getResource("workflows-samples/grpc/proto/contributors.proto") + .getFile(); + + WorkflowModel model = + workflowDefinition.instance(Map.of("protoFilePath", "file://" + filename)).start().join(); + + Collection collection = model.asCollection(); + + Assertions.assertThat(collection).hasSize(5); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java new file mode 100644 index 00000000..aca9615d --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java @@ -0,0 +1,71 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.serverlessworkflow.api.WorkflowReader; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.test.grpc.handlers.ContributorClientStreamingHandler; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class GrpcClientStreamingTest { + + private static final int PORT_FOR_EXAMPLES = 5011; + private static WorkflowApplication app; + private static Server server; + + @BeforeAll + static void setUpApp() throws IOException { + server = + ServerBuilder.forPort(PORT_FOR_EXAMPLES) + .addService(new ContributorClientStreamingHandler()) + .build(); + server.start(); + + app = WorkflowApplication.builder().build(); + } + + @AfterEach + void cleanup() throws InterruptedException { + server.shutdown().awaitTermination(); + } + + @Test + void grpcPerson() throws IOException { + + Workflow workflow = + WorkflowReader.readWorkflowFromClasspath( + "workflows-samples/grpc/contributors-client-stream-call.yaml"); + + WorkflowDefinition workflowDefinition = app.workflowDefinition(workflow); + + List> list = + workflowDefinition.instance(Map.of()).start().join().asCollection().stream() + .map(m -> m.asMap().orElseThrow()) + .toList(); + + Assertions.assertThat(list).isNotEmpty(); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcServerStreamingTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcServerStreamingTest.java new file mode 100644 index 00000000..78ac67ca --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcServerStreamingTest.java @@ -0,0 +1,78 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.serverlessworkflow.api.WorkflowReader; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.test.grpc.handlers.ContributorServerStreamingHandler; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class GrpcServerStreamingTest { + + private static final int PORT_FOR_EXAMPLES = 5011; + private static WorkflowApplication app; + private static Server server; + + @BeforeAll + static void setUpApp() throws IOException { + server = + ServerBuilder.forPort(PORT_FOR_EXAMPLES) + .addService(new ContributorServerStreamingHandler()) + .build(); + server.start(); + + app = WorkflowApplication.builder().build(); + } + + @AfterEach + void cleanup() throws InterruptedException { + server.shutdown().awaitTermination(); + } + + @Test + void grpcContributors() throws IOException { + + Workflow workflow = + WorkflowReader.readWorkflowFromClasspath( + "workflows-samples/grpc/contributors-server-stream-call.yaml"); + + WorkflowDefinition workflowDefinition = app.workflowDefinition(workflow); + + String filename = + getClass() + .getClassLoader() + .getResource("workflows-samples/grpc/proto/contributors.proto") + .getFile(); + + WorkflowModel model = + workflowDefinition.instance(Map.of("protoFilePath", "file://" + filename)).start().join(); + + Collection collection = model.asCollection(); + + Assertions.assertThat(collection).hasSize(5); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryArgsExprTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryArgsExprTest.java new file mode 100644 index 00000000..59777fb0 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryArgsExprTest.java @@ -0,0 +1,73 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.serverlessworkflow.api.WorkflowReader; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.test.grpc.handlers.ContributorUnaryArgsExprHandler; +import java.io.IOException; +import java.util.Map; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class GrpcUnaryArgsExprTest { + + private static final int PORT_FOR_EXAMPLES = 5011; + private static WorkflowApplication app; + private static Server server; + + @BeforeAll + static void setUpApp() throws IOException { + server = + ServerBuilder.forPort(PORT_FOR_EXAMPLES) + .addService(new ContributorUnaryArgsExprHandler()) + .build(); + server.start(); + + app = WorkflowApplication.builder().build(); + } + + @AfterEach + void cleanup() throws InterruptedException { + server.shutdown().awaitTermination(); + } + + @Test + void grpcPerson() throws IOException { + + Workflow workflow = + WorkflowReader.readWorkflowFromClasspath( + "workflows-samples/grpc/contributors-unary-args-expr-call.yaml"); + + WorkflowDefinition workflowDefinition = app.workflowDefinition(workflow); + + Map output = + workflowDefinition + .instance(Map.of("github", "bootable[origin]")) + .start() + .join() + .asMap() + .orElseThrow(); + + Assertions.assertThat(output).contains(Map.entry("message", "Success with bootable[origin]")); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryTest.java new file mode 100644 index 00000000..6205ff27 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryTest.java @@ -0,0 +1,75 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.serverlessworkflow.api.WorkflowReader; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.test.grpc.handlers.PersonUnaryHandler; +import java.io.IOException; +import java.util.Map; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class GrpcUnaryTest { + + private static final int PORT_FOR_EXAMPLES = 5011; + private static WorkflowApplication app; + private static Server server; + + @BeforeAll + static void setUpApp() throws IOException { + server = ServerBuilder.forPort(PORT_FOR_EXAMPLES).addService(new PersonUnaryHandler()).build(); + server.start(); + + app = WorkflowApplication.builder().build(); + } + + @AfterEach + void cleanup() throws InterruptedException { + server.shutdown().awaitTermination(); + } + + @Test + void grpcPerson() throws IOException { + + Workflow workflow = + WorkflowReader.readWorkflowFromClasspath("workflows-samples/grpc/get-person-call.yaml"); + + WorkflowDefinition workflowDefinition = app.workflowDefinition(workflow); + + String filename = + getClass() + .getClassLoader() + .getResource("workflows-samples/grpc/proto/person.proto") + .getFile(); + + Map output = + workflowDefinition + .instance(Map.of("protoFilePath", "file://" + filename)) + .start() + .join() + .asMap() + .orElseThrow(); + + Assertions.assertThat(output).contains(Map.entry("name", "John Doe"), Map.entry("id", 891182)); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorBiDiStreamingHandler.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorBiDiStreamingHandler.java new file mode 100644 index 00000000..7ee446fe --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorBiDiStreamingHandler.java @@ -0,0 +1,51 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc.handlers; + +import io.grpc.stub.StreamObserver; +import io.serverlessworkflow.impl.executors.grpc.contributors.BiDirectionalStreamingGrpc.BiDirectionalStreamingImplBase; +import io.serverlessworkflow.impl.executors.grpc.contributors.Contributors; + +public class ContributorBiDiStreamingHandler extends BiDirectionalStreamingImplBase { + + @Override + public StreamObserver createContributor( + StreamObserver responseObserver) { + + return new StreamObserver() { + @Override + public void onNext(Contributors.AddContributionRequest value) { + for (int i = 0; i < 5; i++) { + Contributors.AddContributionResponse addContributionResponse = + Contributors.AddContributionResponse.newBuilder() + .setMessage("Contribution " + i + 1 + "added successfully") + .build(); + responseObserver.onNext(addContributionResponse); + } + } + + @Override + public void onError(Throwable t) { + // no-op + } + + @Override + public void onCompleted() { + responseObserver.onCompleted(); + } + }; + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorClientStreamingHandler.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorClientStreamingHandler.java new file mode 100644 index 00000000..cdf5900f --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorClientStreamingHandler.java @@ -0,0 +1,73 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc.handlers; + +import io.grpc.stub.StreamObserver; +import io.serverlessworkflow.impl.executors.grpc.contributors.ClientStreamingGrpc.ClientStreamingImplBase; +import io.serverlessworkflow.impl.executors.grpc.contributors.Contributors; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.LongAdder; + +public class ContributorClientStreamingHandler extends ClientStreamingImplBase { + + private static final Map GITHUBS = new ConcurrentHashMap<>(); + + @Override + public StreamObserver createContributor( + StreamObserver responseObserver) { + + return new StreamObserver<>() { + @Override + public void onNext(Contributors.AddContributionRequest value) { + String github = value.getGithub(); + GITHUBS.compute( + github, + (key, counter) -> { + if (counter == null) { + LongAdder longAdder = new LongAdder(); + longAdder.increment(); + return longAdder; + } + counter.increment(); + return counter; + }); + } + + @Override + public void onError(Throwable t) {} + + @Override + public void onCompleted() { + StringBuilder stringBuilder = new StringBuilder(); + Set> entries = GITHUBS.entrySet(); + for (Map.Entry entry : entries) { + stringBuilder + .append(entry.getKey()) + .append(" has ") + .append(entry.getValue()) + .append(" contributions"); + } + responseObserver.onNext( + Contributors.AddContributionResponse.newBuilder() + .setMessage(stringBuilder.toString()) + .build()); + responseObserver.onCompleted(); + } + }; + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorServerStreamingHandler.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorServerStreamingHandler.java new file mode 100644 index 00000000..53979739 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorServerStreamingHandler.java @@ -0,0 +1,38 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc.handlers; + +import io.grpc.stub.StreamObserver; +import io.serverlessworkflow.impl.executors.grpc.contributors.Contributors; +import io.serverlessworkflow.impl.executors.grpc.contributors.ServerStreamingGrpc.ServerStreamingImplBase; + +public class ContributorServerStreamingHandler extends ServerStreamingImplBase { + + @Override + public void createContributor( + Contributors.AddContributionRequest request, + StreamObserver responseObserver) { + + for (int i = 0; i < 5; i++) { + Contributors.AddContributionResponse addContributionResponse = + Contributors.AddContributionResponse.newBuilder() + .setMessage("Success: (" + (i + 1) + ")") + .build(); + responseObserver.onNext(addContributionResponse); + } + responseObserver.onCompleted(); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorUnaryArgsExprHandler.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorUnaryArgsExprHandler.java new file mode 100644 index 00000000..1ac2f2eb --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorUnaryArgsExprHandler.java @@ -0,0 +1,35 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc.handlers; + +import static io.serverlessworkflow.impl.executors.grpc.contributors.UnaryArgsExprGrpc.UnaryArgsExprImplBase; + +import io.grpc.stub.StreamObserver; +import io.serverlessworkflow.impl.executors.grpc.contributors.Contributors; + +public class ContributorUnaryArgsExprHandler extends UnaryArgsExprImplBase { + + @Override + public void createContributor( + Contributors.AddContributionRequest request, + StreamObserver responseObserver) { + responseObserver.onNext( + Contributors.AddContributionResponse.newBuilder() + .setMessage("Success with " + request.getGithub()) + .build()); + responseObserver.onCompleted(); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/PersonUnaryHandler.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/PersonUnaryHandler.java new file mode 100644 index 00000000..e72f4613 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/PersonUnaryHandler.java @@ -0,0 +1,32 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc.handlers; + +import io.grpc.stub.StreamObserver; +import person.PersonGrpc.PersonImplBase; +import person.PersonOuterClass; + +public class PersonUnaryHandler extends PersonImplBase { + + @Override + public void getPerson( + PersonOuterClass.GetPersonRequest request, + StreamObserver responseObserver) { + responseObserver.onNext( + PersonOuterClass.GetPersonResponse.newBuilder().setId(891182).setName("John Doe").build()); + responseObserver.onCompleted(); + } +} diff --git a/impl/test/src/test/resources/workflows-samples/grpc/contributors-bidi-stream-call.yaml b/impl/test/src/test/resources/workflows-samples/grpc/contributors-bidi-stream-call.yaml new file mode 100644 index 00000000..e5ab493a --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/grpc/contributors-bidi-stream-call.yaml @@ -0,0 +1,18 @@ +document: + dsl: '1.0.2' + namespace: test + name: grpc-example + version: '0.1.0' +do: + - greet: + call: grpc + with: + proto: + endpoint: workflows-samples/grpc/proto/contributors.proto + service: + name: BiDirectionalStreaming + host: localhost + port: 5011 + method: CreateContributor + arguments: + github: dependabot[bot] \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/grpc/contributors-client-stream-call.yaml b/impl/test/src/test/resources/workflows-samples/grpc/contributors-client-stream-call.yaml new file mode 100644 index 00000000..e7d089fc --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/grpc/contributors-client-stream-call.yaml @@ -0,0 +1,18 @@ +document: + dsl: '1.0.2' + namespace: test + name: grpc-example + version: '0.1.0' +do: + - greet: + call: grpc + with: + proto: + endpoint: workflows-samples/grpc/proto/contributors.proto + service: + name: ClientStreaming + host: localhost + port: 5011 + method: CreateContributor + arguments: + github: dependabot[bot] \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/grpc/contributors-server-stream-call.yaml b/impl/test/src/test/resources/workflows-samples/grpc/contributors-server-stream-call.yaml new file mode 100644 index 00000000..c74c68b4 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/grpc/contributors-server-stream-call.yaml @@ -0,0 +1,18 @@ +document: + dsl: '1.0.2' + namespace: test + name: grpc-example + version: '0.1.0' +do: + - greet: + call: grpc + with: + proto: + endpoint: workflows-samples/grpc/proto/contributors.proto + service: + name: ServerStreaming + host: localhost + port: 5011 + method: CreateContributor + arguments: + github: dependabot[bot] \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/grpc/contributors-unary-args-expr-call.yaml b/impl/test/src/test/resources/workflows-samples/grpc/contributors-unary-args-expr-call.yaml new file mode 100644 index 00000000..5d4ec2c9 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/grpc/contributors-unary-args-expr-call.yaml @@ -0,0 +1,18 @@ +document: + dsl: '1.0.2' + namespace: test + name: grpc-example + version: '0.1.0' +do: + - greet: + call: grpc + with: + proto: + endpoint: workflows-samples/grpc/proto/contributors.proto + service: + name: UnaryArgsExpr + host: localhost + port: 5011 + method: CreateContributor + arguments: + github: ${ .github } \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/grpc/get-person-call.yaml b/impl/test/src/test/resources/workflows-samples/grpc/get-person-call.yaml new file mode 100644 index 00000000..221c373e --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/grpc/get-person-call.yaml @@ -0,0 +1,16 @@ +document: + dsl: '1.0.2' + namespace: test + name: grpc-example + version: '0.1.0' +do: + - greet: + call: grpc + with: + proto: + endpoint: workflows-samples/grpc/proto/person.proto + service: + name: Person + host: localhost + port: 5011 + method: GetPerson \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/grpc/proto/contributors.proto b/impl/test/src/test/resources/workflows-samples/grpc/proto/contributors.proto new file mode 100644 index 00000000..a7cb71e0 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/grpc/proto/contributors.proto @@ -0,0 +1,27 @@ +syntax = "proto2"; + +package io.serverlessworkflow.impl.executors.grpc.contributors; + +message AddContributionRequest { + required string github = 1; +} + +message AddContributionResponse { + required string message = 1; +} + +service ClientStreaming { + rpc CreateContributor(stream AddContributionRequest) returns (AddContributionResponse) {} +} + +service ServerStreaming { + rpc CreateContributor(AddContributionRequest) returns (stream AddContributionResponse) {} +} + +service BiDirectionalStreaming { + rpc CreateContributor(stream AddContributionRequest) returns (stream AddContributionResponse) {} +} + +service UnaryArgsExpr { + rpc CreateContributor(AddContributionRequest) returns (AddContributionResponse) {} +} \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/grpc/proto/person.proto b/impl/test/src/test/resources/workflows-samples/grpc/proto/person.proto new file mode 100644 index 00000000..b86dd98f --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/grpc/proto/person.proto @@ -0,0 +1,25 @@ +syntax = "proto2"; + +package person; + +message GetPersonRequest {} + +message GetPersonResponse { + required string name = 1; + required int32 id = 2; +} + +message GetCarRequest {} + +message GetCarResponse { + required string name = 1; + required string brand = 2; +} + +service Person { + rpc GetPerson(GetPersonRequest) returns (GetPersonResponse); +} + +service Car { + rpc GetCar(GetCarRequest) returns (GetCarResponse); +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 1c1e694c..368d8ea2 100644 --- a/pom.xml +++ b/pom.xml @@ -59,12 +59,16 @@ 3.3.0 3.6.0 + 3.2.1 3.15.0 + 3.11.4 + 3.25.8 3.1.4 3.6.2 3.5.4 3.2.8 + 1.78.0 3.5.0 ${java.version} 1.3.3 @@ -80,6 +84,7 @@ 4.3.0 + 4.32.1 1.5.28 2.21.0 2.21 @@ -203,7 +208,32 @@ jakarta.validation-api ${version.jakarta.validation} - + + io.grpc + grpc-netty + ${version.io.grpc.java} + test + + + io.grpc + grpc-stub + ${version.io.grpc.java} + + + io.grpc + grpc-protobuf + ${version.io.grpc.java} + + + com.google.protobuf + protobuf-java-util + ${version.com.google.protobuf.protobuf.java.util} + + + com.github.os72 + protoc-jar + ${version.com.github.os72.protoc.jar} + org.junit.jupiter