From d51f5a6d54869dbef89d6b20b1c0e9835ec75b88 Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Tue, 3 Feb 2026 17:26:45 -0300 Subject: [PATCH 01/14] Add support to gRPC Signed-off-by: Matheus Cruz --- impl/grpc/pom.xml | 41 +++ .../executors/grpc/FileDescriptorContext.java | 21 ++ .../executors/grpc/FileDescriptorReader.java | 108 ++++++++ .../executors/grpc/GrpcChannelResolver.java | 41 +++ .../impl/executors/grpc/GrpcExecutor.java | 239 ++++++++++++++++++ .../executors/grpc/GrpcExecutorBuilder.java | 71 ++++++ .../executors/grpc/GrpcRequestContext.java | 18 ++ .../executors/grpc/ProtobufMessageUtils.java | 104 ++++++++ .../executors/grpc/WaitingStreamObserver.java | 73 ++++++ ...orkflow.impl.executors.CallableTaskBuilder | 1 + .../executors/http/HttpClientResolver.java | 10 + .../impl/executors/http/HttpExecutor.java | 8 +- impl/pom.xml | 7 + impl/test/pom.xml | 34 +++ .../grpc/GrpcBiDirectionalStreamingTest.java | 78 ++++++ .../test/grpc/GrpcClientStreamingTest.java | 69 +++++ .../test/grpc/GrpcServerStreamingTest.java | 78 ++++++ .../impl/test/grpc/GrpcUnaryArgsExprTest.java | 73 ++++++ .../impl/test/grpc/GrpcUnaryTest.java | 75 ++++++ .../ContributorBiDiStreamingHandler.java | 51 ++++ .../ContributorClientStreamingHandler.java | 73 ++++++ .../ContributorServerStreamingHandler.java | 38 +++ .../ContributorUnaryArgsExprHandler.java | 35 +++ .../grpc/handlers/PersonUnaryHandler.java | 32 +++ impl/test/src/test/proto/contributors.proto | 27 ++ impl/test/src/test/proto/person.proto | 25 ++ .../grpc/contributors-bidi-stream-call.yaml | 18 ++ .../grpc/contributors-client-stream-call.yaml | 18 ++ .../grpc/contributors-server-stream-call.yaml | 18 ++ .../contributors-unary-args-expr-call.yaml | 18 ++ .../grpc/get-person-call.yaml | 16 ++ .../grpc/proto/contributors.proto | 27 ++ .../workflows-samples/grpc/proto/person.proto | 25 ++ pom.xml | 33 ++- 34 files changed, 1601 insertions(+), 2 deletions(-) create mode 100644 impl/grpc/pom.xml create mode 100644 impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContext.java create mode 100644 impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java create mode 100644 impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcChannelResolver.java create mode 100644 impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java create mode 100644 impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutorBuilder.java create mode 100644 impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcRequestContext.java create mode 100644 impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java create mode 100644 impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/WaitingStreamObserver.java create mode 100644 impl/grpc/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder create mode 100644 impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcBiDirectionalStreamingTest.java create mode 100644 impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java create mode 100644 impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcServerStreamingTest.java create mode 100644 impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryArgsExprTest.java create mode 100644 impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryTest.java create mode 100644 impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorBiDiStreamingHandler.java create mode 100644 impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorClientStreamingHandler.java create mode 100644 impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorServerStreamingHandler.java create mode 100644 impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorUnaryArgsExprHandler.java create mode 100644 impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/PersonUnaryHandler.java create mode 100644 impl/test/src/test/proto/contributors.proto create mode 100644 impl/test/src/test/proto/person.proto create mode 100644 impl/test/src/test/resources/workflows-samples/grpc/contributors-bidi-stream-call.yaml create mode 100644 impl/test/src/test/resources/workflows-samples/grpc/contributors-client-stream-call.yaml create mode 100644 impl/test/src/test/resources/workflows-samples/grpc/contributors-server-stream-call.yaml create mode 100644 impl/test/src/test/resources/workflows-samples/grpc/contributors-unary-args-expr-call.yaml create mode 100644 impl/test/src/test/resources/workflows-samples/grpc/get-person-call.yaml create mode 100644 impl/test/src/test/resources/workflows-samples/grpc/proto/contributors.proto create mode 100644 impl/test/src/test/resources/workflows-samples/grpc/proto/person.proto diff --git a/impl/grpc/pom.xml b/impl/grpc/pom.xml new file mode 100644 index 00000000..a3fa8cde --- /dev/null +++ b/impl/grpc/pom.xml @@ -0,0 +1,41 @@ + + 4.0.0 + + io.serverlessworkflow + serverlessworkflow-impl + 8.0.0-SNAPSHOT + + serverlessworkflow-impl-grpc + Serverless Workflow :: Impl :: gRPC + + + + io.serverlessworkflow + serverlessworkflow-impl-core + + + io.serverlessworkflow + serverlessworkflow-api + + + io.serverlessworkflow + serverlessworkflow-impl-jackson + + + io.grpc + grpc-stub + + + com.google.protobuf + protobuf-java-util + + + com.github.os72 + protoc-jar + + + io.grpc + grpc-protobuf + + + \ No newline at end of file diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContext.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContext.java new file mode 100644 index 00000000..a111e9dd --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContext.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import com.google.protobuf.DescriptorProtos; + +public record FileDescriptorContext( + DescriptorProtos.FileDescriptorSet fileDescriptorSet, String inputProto) {} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java new file mode 100644 index 00000000..36d53b4b --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java @@ -0,0 +1,108 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import com.github.os72.protocjar.Protoc; +import com.google.protobuf.DescriptorProtos; +import io.serverlessworkflow.impl.resources.ExternalResourceHandler; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.Optional; + +public class FileDescriptorReader { + + public FileDescriptorContext readDescriptor(ExternalResourceHandler externalResourceHandler) { + Path grpcDir = + tryCreateTempGrpcDir() + .orElseThrow( + () -> new IllegalStateException("Could not create temporary gRPC directory")); + + try (InputStream inputStream = externalResourceHandler.open()) { + + Path protoFile = grpcDir.resolve(externalResourceHandler.name()); + if (!Files.exists(protoFile)) { + Files.createDirectories(protoFile); + } + + Files.copy(inputStream, protoFile, StandardCopyOption.REPLACE_EXISTING); + + Path descriptorOutput = grpcDir.resolve("descriptor.protobin"); + + try { + + generateFileDescriptor(grpcDir, protoFile, descriptorOutput); + + DescriptorProtos.FileDescriptorSet fileDescriptorSet = + DescriptorProtos.FileDescriptorSet.newBuilder() + .mergeFrom(Files.readAllBytes(descriptorOutput)) + .build(); + + return new FileDescriptorContext(fileDescriptorSet, externalResourceHandler.name()); + + } catch (IOException e) { + throw new UncheckedIOException( + "Unable to read external resource handler: " + externalResourceHandler.name(), e); + } + } catch (IOException e) { + throw new UncheckedIOException("Unable to read descriptor file", e); + } + } + + private Optional tryCreateTempGrpcDir() { + try { + return Optional.of(Files.createTempDirectory("serverless-workflow-")); + } catch (IOException e) { + throw new UncheckedIOException("Error while creating temporary gRPC directory", e); + } + } + + /** + * Calls protoc binary with --descriptor_set_out= option set. + * + * @param grpcDir a temporary directory + * @param protoFile the .proto file used by protoc to generate the file descriptor + * @param descriptorOutput the output directory where the descriptor file will be generated + */ + private static void generateFileDescriptor(Path grpcDir, Path protoFile, Path descriptorOutput) { + String[] protocArgs = + new String[] { + "--include_imports", + "--descriptor_set_out=" + descriptorOutput.toAbsolutePath(), + "-I", + grpcDir.toAbsolutePath().toString(), + protoFile.toAbsolutePath().toString() + }; + + try { + + int status = Protoc.runProtoc(protocArgs); + + if (status != 0) { + throw new RuntimeException( + "Unable to generate file descriptor, 'protoc' execution failed with status " + status); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Unable to generate file descriptor", e); + } catch (IOException e) { + throw new UncheckedIOException("Unable to generate file descriptor", e); + } + } +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcChannelResolver.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcChannelResolver.java new file mode 100644 index 00000000..c13f5de5 --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcChannelResolver.java @@ -0,0 +1,41 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import io.grpc.Channel; +import io.grpc.ManagedChannelBuilder; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowContext; + +public class GrpcChannelResolver { + + public static final String GRPC_CHANNEL_PROVIDER = "grpcChannelProvider"; + + public static Channel channel( + WorkflowContext workflowContext, + TaskContext taskContext, + GrpcRequestContext grpcRequestContext) { + WorkflowApplication appl = workflowContext.definition().application(); + return appl.additionalObject(GRPC_CHANNEL_PROVIDER, workflowContext, taskContext) + .orElseGet( + () -> + ManagedChannelBuilder.forAddress( + grpcRequestContext.address(), grpcRequestContext.port()) + .usePlaintext() + .build()); + } +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java new file mode 100644 index 00000000..78d32dae --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java @@ -0,0 +1,239 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.NullNode; +import com.google.protobuf.DescriptorProtos; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.MethodDescriptor; +import io.grpc.protobuf.ProtoUtils; +import io.grpc.stub.ClientCalls; +import io.grpc.stub.StreamObserver; +import io.serverlessworkflow.api.types.ExternalResource; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowError; +import io.serverlessworkflow.impl.WorkflowException; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import io.serverlessworkflow.impl.executors.CallableTask; +import io.serverlessworkflow.impl.jackson.JsonUtils; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; + +public class GrpcExecutor implements CallableTask { + + private final GrpcRequestContext requestContext; + private final WorkflowValueResolver> arguments; + private final FileDescriptorContext fileDescriptorContext; + private final ExternalResource proto; + + public GrpcExecutor( + GrpcRequestContext builder, + WorkflowValueResolver> arguments, + FileDescriptorContext fileDescriptorContext, + ExternalResource proto) { + this.requestContext = builder; + this.arguments = arguments; + this.fileDescriptorContext = fileDescriptorContext; + this.proto = proto; + } + + @Override + public CompletableFuture apply( + WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { + + Map arguments = this.arguments.apply(workflowContext, taskContext, input); + + return buildGrpcCallExecutor(workflowContext, taskContext, arguments); + } + + private CompletableFuture buildGrpcCallExecutor( + WorkflowContext workflowContext, TaskContext taskContext, Map arguments) { + + Channel channel = GrpcChannelResolver.channel(workflowContext, taskContext, requestContext); + + String protoName = fileDescriptorContext.inputProto(); + + DescriptorProtos.FileDescriptorProto fileDescriptorProto = + fileDescriptorContext.fileDescriptorSet().getFileList().stream() + .filter( + file -> + file.getName() + .equals(this.proto.getName() != null ? this.proto.getName() : protoName)) + .findFirst() + .orElseThrow(() -> new IllegalStateException("Proto file not found in descriptor set")); + + try { + Descriptors.FileDescriptor fileDescriptor = + Descriptors.FileDescriptor.buildFrom( + fileDescriptorProto, new Descriptors.FileDescriptor[] {}); + + Descriptors.ServiceDescriptor serviceDescriptor = + fileDescriptor.findServiceByName(requestContext.service()); + + Objects.requireNonNull(serviceDescriptor, "Service not found: " + requestContext.service()); + + Descriptors.MethodDescriptor methodDescriptor = + serviceDescriptor.findMethodByName(requestContext.method()); + + Objects.requireNonNull(methodDescriptor, "Method not found: " + requestContext.method()); + + MethodDescriptor.MethodType methodType = ProtobufMessageUtils.getMethodType(methodDescriptor); + + ClientCall call = + buildClientCall(channel, methodType, serviceDescriptor, methodDescriptor); + + return switch (methodType) { + case CLIENT_STREAMING -> + CompletableFuture.completedFuture( + handleClientStreaming(workflowContext, arguments, methodDescriptor, call)); + case BIDI_STREAMING -> + CompletableFuture.completedFuture( + handleBidiStreaming(workflowContext, arguments, methodDescriptor, call)); + case SERVER_STREAMING -> + CompletableFuture.completedFuture( + handleServerStreaming(workflowContext, methodDescriptor, arguments, call)); + case UNARY, UNKNOWN -> handleAsyncUnary(workflowContext, methodDescriptor, arguments, call); + }; + + } catch (Descriptors.DescriptorValidationException + | InvalidProtocolBufferException + | JsonProcessingException e) { + throw new WorkflowException(WorkflowError.runtime(taskContext, e).build()); + } + } + + private static ClientCall buildClientCall( + Channel channel, + MethodDescriptor.MethodType methodType, + Descriptors.ServiceDescriptor serviceDescriptor, + Descriptors.MethodDescriptor methodDescriptor) { + return channel.newCall( + MethodDescriptor.newBuilder() + .setType(methodType) + .setFullMethodName( + MethodDescriptor.generateFullMethodName( + serviceDescriptor.getFullName(), methodDescriptor.getName())) + .setRequestMarshaller( + ProtoUtils.marshaller( + DynamicMessage.newBuilder(methodDescriptor.getInputType()).buildPartial())) + .setResponseMarshaller( + ProtoUtils.marshaller( + DynamicMessage.newBuilder(methodDescriptor.getOutputType()).buildPartial())) + .build(), + CallOptions.DEFAULT.withWaitForReady()); + } + + private static WorkflowModel handleClientStreaming( + WorkflowContext workflowContext, + Map parameters, + Descriptors.MethodDescriptor methodDescriptor, + ClientCall call) { + JsonNode jsonNode = + ProtobufMessageUtils.asyncStreamingCall( + parameters, + methodDescriptor, + responseObserver -> ClientCalls.asyncClientStreamingCall(call, responseObserver), + nodes -> nodes.isEmpty() ? NullNode.instance : nodes.get(0)); + return workflowContext.definition().application().modelFactory().fromAny(jsonNode); + } + + private static WorkflowModel handleBidiStreaming( + WorkflowContext workflowContext, + Map parameters, + Descriptors.MethodDescriptor methodDescriptor, + ClientCall call) { + return workflowContext + .definition() + .application() + .modelFactory() + .fromAny( + ProtobufMessageUtils.asyncStreamingCall( + parameters, + methodDescriptor, + responseObserver -> ClientCalls.asyncBidiStreamingCall(call, responseObserver), + v -> { + Collection nodes = v; + List list = new ArrayList<>(nodes); + return JsonUtils.fromValue(list); + })); + } + + private static WorkflowModel handleServerStreaming( + WorkflowContext workflowContext, + Descriptors.MethodDescriptor methodDescriptor, + Map parameters, + ClientCall call) + throws InvalidProtocolBufferException, JsonProcessingException { + Message.Builder builder = ProtobufMessageUtils.buildMessage(methodDescriptor, parameters); + List nodes = new ArrayList<>(); + ClientCalls.blockingServerStreamingCall(call, builder.build()) + .forEachRemaining(message -> nodes.add(ProtobufMessageUtils.convert(message))); + return workflowContext.definition().application().modelFactory().fromAny(nodes); + } + + private static CompletableFuture handleAsyncUnary( + WorkflowContext workflowContext, + Descriptors.MethodDescriptor methodDescriptor, + Map parameters, + ClientCall call) + throws InvalidProtocolBufferException, JsonProcessingException { + + CompletableFuture future = new CompletableFuture<>(); + + Message.Builder builder = ProtobufMessageUtils.buildMessage(methodDescriptor, parameters); + + ClientCalls.asyncUnaryCall( + call, + builder.build(), + new StreamObserver() { + @Override + public void onNext(Message value) { + WorkflowModel model = + workflowContext + .definition() + .application() + .modelFactory() + .fromAny(ProtobufMessageUtils.convert(value)); + future.complete(model); + } + + @Override + public void onError(Throwable t) { + future.completeExceptionally(t); + } + + @Override + public void onCompleted() { + // no-op + } + }); + return future; + } +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutorBuilder.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutorBuilder.java new file mode 100644 index 00000000..49ac240d --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutorBuilder.java @@ -0,0 +1,71 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import io.serverlessworkflow.api.types.CallGRPC; +import io.serverlessworkflow.api.types.ExternalResource; +import io.serverlessworkflow.api.types.GRPCArguments; +import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.api.types.WithGRPCService; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowMutablePosition; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import io.serverlessworkflow.impl.executors.CallableTask; +import io.serverlessworkflow.impl.executors.CallableTaskBuilder; +import java.util.Map; + +public class GrpcExecutorBuilder implements CallableTaskBuilder { + + private ExternalResource proto; + private GrpcRequestContext grpcRequestContext; + private FileDescriptorContext fileDescriptorContext; + private WorkflowValueResolver> arguments; + + @Override + public boolean accept(Class clazz) { + return clazz.equals(CallGRPC.class); + } + + @Override + public void init(CallGRPC task, WorkflowDefinition definition, WorkflowMutablePosition position) { + + GRPCArguments with = task.getWith(); + WithGRPCService service = with.getService(); + this.proto = with.getProto(); + + this.arguments = + WorkflowUtils.buildMapResolver( + definition.application(), + with.getArguments() != null ? with.getArguments().getAdditionalProperties() : Map.of()); + + this.grpcRequestContext = + new GrpcRequestContext( + service.getHost(), service.getPort(), with.getMethod(), service.getName()); + + FileDescriptorReader fileDescriptorReader = new FileDescriptorReader(); + this.fileDescriptorContext = + definition + .resourceLoader() + .loadStatic(with.getProto().getEndpoint(), fileDescriptorReader::readDescriptor); + } + + @Override + public CallableTask build() { + return new GrpcExecutor( + this.grpcRequestContext, this.arguments, this.fileDescriptorContext, this.proto); + } +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcRequestContext.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcRequestContext.java new file mode 100644 index 00000000..2abab646 --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcRequestContext.java @@ -0,0 +1,18 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +public record GrpcRequestContext(String address, int port, String method, String service) {} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java new file mode 100644 index 00000000..384b449c --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java @@ -0,0 +1,104 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.protobuf.DescriptorProtos; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.util.JsonFormat; +import io.grpc.MethodDescriptor; +import io.grpc.stub.StreamObserver; +import io.serverlessworkflow.api.WorkflowFormat; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.UnaryOperator; +import java.util.stream.Collectors; + +public interface ProtobufMessageUtils { + + static JsonNode convert(Message message) { + StringBuilder str = new StringBuilder(); + try { + JsonFormat.printer().appendTo(message, str); + return WorkflowFormat.JSON.mapper().readTree(str.toString()); + } catch (IOException e) { + throw new UncheckedIOException("Error converting protobuf message to JSON", e); + } + } + + static MethodDescriptor.MethodType getMethodType( + com.google.protobuf.Descriptors.MethodDescriptor methodDesc) { + DescriptorProtos.MethodDescriptorProto methodDescProto = methodDesc.toProto(); + if (methodDescProto.getClientStreaming()) { + if (methodDescProto.getServerStreaming()) { + return MethodDescriptor.MethodType.BIDI_STREAMING; + } + return MethodDescriptor.MethodType.CLIENT_STREAMING; + } else if (methodDescProto.getServerStreaming()) { + return MethodDescriptor.MethodType.SERVER_STREAMING; + } else { + return MethodDescriptor.MethodType.UNARY; + } + } + + static JsonNode asyncStreamingCall( + Map parameters, + com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, + UnaryOperator> streamObserverFunction, + Function, JsonNode> nodesFunction) { + WaitingStreamObserver responseObserver = new WaitingStreamObserver(); + StreamObserver requestObserver = streamObserverFunction.apply(responseObserver); + + for (var entry : parameters.entrySet()) { + try { + Message message = + buildMessage(entry, DynamicMessage.newBuilder(methodDescriptor.getInputType())).build(); + requestObserver.onNext(message); + } catch (Exception e) { + requestObserver.onError(e); + throw new RuntimeException(e); + } + responseObserver.checkForServerStreamErrors(); + } + requestObserver.onCompleted(); + + return nodesFunction.apply( + responseObserver.get().stream() + .map(ProtobufMessageUtils::convert) + .collect(Collectors.toList())); + } + + static Message.Builder buildMessage(Object object, Message.Builder builder) + throws InvalidProtocolBufferException, JsonProcessingException { + JsonFormat.parser().merge(WorkflowFormat.JSON.mapper().writeValueAsString(object), builder); + return builder; + } + + static Message.Builder buildMessage( + Descriptors.MethodDescriptor methodDescriptor, Map parameters) + throws InvalidProtocolBufferException, JsonProcessingException { + DynamicMessage.Builder builder = DynamicMessage.newBuilder(methodDescriptor.getInputType()); + JsonFormat.parser().merge(WorkflowFormat.JSON.mapper().writeValueAsString(parameters), builder); + return builder; + } +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/WaitingStreamObserver.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/WaitingStreamObserver.java new file mode 100644 index 00000000..971951d3 --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/WaitingStreamObserver.java @@ -0,0 +1,73 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import com.google.protobuf.Message; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; + +public class WaitingStreamObserver implements StreamObserver { + + List responses = new ArrayList<>(); + CompletableFuture> responsesFuture = new CompletableFuture<>(); + + @Override + public void onNext(Message messageReply) { + responses.add(messageReply); + } + + @Override + public void onError(Throwable throwable) { + responsesFuture.completeExceptionally(throwable); + } + + @Override + public void onCompleted() { + responsesFuture.complete(responses); + } + + public List get() { + try { + return responsesFuture.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } catch (ExecutionException e) { + throw new IllegalStateException(getServerStreamErrorMessage(e.getCause()), e.getCause()); + } + } + + public void checkForServerStreamErrors() { + if (responsesFuture.isCompletedExceptionally()) { + try { + responsesFuture.join(); + } catch (CompletionException e) { + throw new IllegalStateException(getServerStreamErrorMessage(e.getCause()), e.getCause()); + } + } + } + + private String getServerStreamErrorMessage(Throwable throwable) { + return String.format( + "Received an error through gRPC server stream with status: %s", + Status.fromThrowable(throwable)); + } +} diff --git a/impl/grpc/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder b/impl/grpc/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder new file mode 100644 index 00000000..6acd9ba6 --- /dev/null +++ b/impl/grpc/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder @@ -0,0 +1 @@ +io.serverlessworkflow.impl.executors.grpc.GrpcExecutorBuilder \ No newline at end of file diff --git a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpClientResolver.java b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpClientResolver.java index fc40b257..c26ea3ee 100644 --- a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpClientResolver.java +++ b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpClientResolver.java @@ -20,10 +20,13 @@ import io.serverlessworkflow.impl.WorkflowContext; import jakarta.ws.rs.client.Client; import jakarta.ws.rs.client.ClientBuilder; +import jakarta.ws.rs.client.ClientRequestFilter; +import java.util.Optional; public class HttpClientResolver { public static final String HTTP_CLIENT_PROVIDER = "httpClientProvider"; + public static final String HTTP_CLIENT_FILTER_PROVIDER = "httpClientFilterProvider"; private static class DefaultHolder { private static final Client client = ClientBuilder.newClient(); @@ -35,5 +38,12 @@ public static Client client(WorkflowContext workflowContext, TaskContext taskCon .orElseGet(() -> DefaultHolder.client); } + public static Optional clientRequestFilter( + WorkflowContext workflowContext, TaskContext taskContext) { + WorkflowApplication application = workflowContext.definition().application(); + return application.additionalObject( + HTTP_CLIENT_FILTER_PROVIDER, workflowContext, taskContext); + } + private HttpClientResolver() {} } diff --git a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpExecutor.java b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpExecutor.java index c787cd2b..1413e582 100644 --- a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpExecutor.java +++ b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpExecutor.java @@ -21,6 +21,7 @@ import io.serverlessworkflow.impl.WorkflowUtils; import io.serverlessworkflow.impl.WorkflowValueResolver; import io.serverlessworkflow.impl.executors.CallableTask; +import jakarta.ws.rs.client.Client; import jakarta.ws.rs.client.Invocation.Builder; import jakarta.ws.rs.client.WebTarget; import java.net.URI; @@ -69,7 +70,12 @@ public CompletableFuture apply( p.apply(workflow, taskContext, input))) .orElse(uriSupplier.apply(workflow, taskContext, input)); - WebTarget target = HttpClientResolver.client(workflow, taskContext).target(uri); + Client client = HttpClientResolver.client(workflow, taskContext); + + HttpClientResolver.clientRequestFilter(workflow, taskContext).ifPresent(client::register); + + WebTarget target = client.target(uri); + for (Entry entry : queryMap.map(q -> q.apply(workflow, taskContext, input)).orElse(Map.of()).entrySet()) { target = target.queryParam(entry.getKey(), entry.getValue()); diff --git a/impl/pom.xml b/impl/pom.xml index 06fa06ac..5f9fe4a8 100644 --- a/impl/pom.xml +++ b/impl/pom.xml @@ -17,6 +17,7 @@ 9.2.1 3.7.0 25.0.2 + 25.0.1 @@ -111,6 +112,11 @@ serverlessworkflow-impl-container ${project.version} + + io.serverlessworkflow + serverlessworkflow-impl-grpc + ${project.version} + net.thisptr jackson-jq @@ -199,5 +205,6 @@ test javascript python + grpc diff --git a/impl/test/pom.xml b/impl/test/pom.xml index fd47aae9..bad15d72 100644 --- a/impl/test/pom.xml +++ b/impl/test/pom.xml @@ -53,6 +53,10 @@ io.serverlessworkflow serverlessworkflow-impl-container + + io.serverlessworkflow + serverlessworkflow-impl-grpc + org.glassfish.jersey.core jersey-client @@ -85,6 +89,11 @@ org.awaitility awaitility + + io.grpc + grpc-netty + test + @@ -95,7 +104,32 @@ + + + kr.motd.maven + os-maven-plugin + 1.7.1 + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + com.google.protobuf:protoc:3.25.8:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:${version.io.grpc.java}:exe:${os.detected.classifier} + + + + + test-compile + test-compile-custom + + + + maven-jar-plugin diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcBiDirectionalStreamingTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcBiDirectionalStreamingTest.java new file mode 100644 index 00000000..aa34497a --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcBiDirectionalStreamingTest.java @@ -0,0 +1,78 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.serverlessworkflow.api.WorkflowReader; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.test.grpc.handlers.ContributorBiDiStreamingHandler; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class GrpcBiDirectionalStreamingTest { + + private static final int PORT_FOR_EXAMPLES = 5011; + private static WorkflowApplication app; + private static Server server; + + @BeforeAll + static void setUpApp() throws IOException { + server = + ServerBuilder.forPort(PORT_FOR_EXAMPLES) + .addService(new ContributorBiDiStreamingHandler()) + .build(); + server.start(); + + app = WorkflowApplication.builder().build(); + } + + @AfterEach + void cleanup() throws InterruptedException { + server.shutdown().awaitTermination(); + } + + @Test + void grpcContributors() throws IOException { + + Workflow workflow = + WorkflowReader.readWorkflowFromClasspath( + "workflows-samples/grpc/contributors-bidi-stream-call.yaml"); + + WorkflowDefinition workflowDefinition = app.workflowDefinition(workflow); + + String filename = + getClass() + .getClassLoader() + .getResource("workflows-samples/grpc/proto/contributors.proto") + .getFile(); + + WorkflowModel model = + workflowDefinition.instance(Map.of("protoFilePath", "file://" + filename)).start().join(); + + Collection collection = model.asCollection(); + + Assertions.assertThat(collection).hasSize(5); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java new file mode 100644 index 00000000..52ffc88a --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java @@ -0,0 +1,69 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.serverlessworkflow.api.WorkflowReader; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.test.grpc.handlers.ContributorClientStreamingHandler; +import java.io.IOException; +import java.util.Map; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class GrpcClientStreamingTest { + + private static final int PORT_FOR_EXAMPLES = 5011; + private static WorkflowApplication app; + private static Server server; + + @BeforeAll + static void setUpApp() throws IOException { + server = + ServerBuilder.forPort(PORT_FOR_EXAMPLES) + .addService(new ContributorClientStreamingHandler()) + .build(); + server.start(); + + app = WorkflowApplication.builder().build(); + } + + @AfterEach + void cleanup() throws InterruptedException { + server.shutdown().awaitTermination(); + } + + @Test + void grpcPerson() throws IOException { + + Workflow workflow = + WorkflowReader.readWorkflowFromClasspath( + "workflows-samples/grpc/contributors-client-stream-call.yaml"); + + WorkflowDefinition workflowDefinition = app.workflowDefinition(workflow); + + Map output = + workflowDefinition.instance(Map.of()).start().join().asMap().orElseThrow(); + + Assertions.assertThat(output) + .contains(Map.entry("message", "dependabot[bot] has 1 contributions")); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcServerStreamingTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcServerStreamingTest.java new file mode 100644 index 00000000..78ac67ca --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcServerStreamingTest.java @@ -0,0 +1,78 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.serverlessworkflow.api.WorkflowReader; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.test.grpc.handlers.ContributorServerStreamingHandler; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class GrpcServerStreamingTest { + + private static final int PORT_FOR_EXAMPLES = 5011; + private static WorkflowApplication app; + private static Server server; + + @BeforeAll + static void setUpApp() throws IOException { + server = + ServerBuilder.forPort(PORT_FOR_EXAMPLES) + .addService(new ContributorServerStreamingHandler()) + .build(); + server.start(); + + app = WorkflowApplication.builder().build(); + } + + @AfterEach + void cleanup() throws InterruptedException { + server.shutdown().awaitTermination(); + } + + @Test + void grpcContributors() throws IOException { + + Workflow workflow = + WorkflowReader.readWorkflowFromClasspath( + "workflows-samples/grpc/contributors-server-stream-call.yaml"); + + WorkflowDefinition workflowDefinition = app.workflowDefinition(workflow); + + String filename = + getClass() + .getClassLoader() + .getResource("workflows-samples/grpc/proto/contributors.proto") + .getFile(); + + WorkflowModel model = + workflowDefinition.instance(Map.of("protoFilePath", "file://" + filename)).start().join(); + + Collection collection = model.asCollection(); + + Assertions.assertThat(collection).hasSize(5); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryArgsExprTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryArgsExprTest.java new file mode 100644 index 00000000..59777fb0 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryArgsExprTest.java @@ -0,0 +1,73 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.serverlessworkflow.api.WorkflowReader; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.test.grpc.handlers.ContributorUnaryArgsExprHandler; +import java.io.IOException; +import java.util.Map; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class GrpcUnaryArgsExprTest { + + private static final int PORT_FOR_EXAMPLES = 5011; + private static WorkflowApplication app; + private static Server server; + + @BeforeAll + static void setUpApp() throws IOException { + server = + ServerBuilder.forPort(PORT_FOR_EXAMPLES) + .addService(new ContributorUnaryArgsExprHandler()) + .build(); + server.start(); + + app = WorkflowApplication.builder().build(); + } + + @AfterEach + void cleanup() throws InterruptedException { + server.shutdown().awaitTermination(); + } + + @Test + void grpcPerson() throws IOException { + + Workflow workflow = + WorkflowReader.readWorkflowFromClasspath( + "workflows-samples/grpc/contributors-unary-args-expr-call.yaml"); + + WorkflowDefinition workflowDefinition = app.workflowDefinition(workflow); + + Map output = + workflowDefinition + .instance(Map.of("github", "bootable[origin]")) + .start() + .join() + .asMap() + .orElseThrow(); + + Assertions.assertThat(output).contains(Map.entry("message", "Success with bootable[origin]")); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryTest.java new file mode 100644 index 00000000..6205ff27 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryTest.java @@ -0,0 +1,75 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.serverlessworkflow.api.WorkflowReader; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.test.grpc.handlers.PersonUnaryHandler; +import java.io.IOException; +import java.util.Map; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class GrpcUnaryTest { + + private static final int PORT_FOR_EXAMPLES = 5011; + private static WorkflowApplication app; + private static Server server; + + @BeforeAll + static void setUpApp() throws IOException { + server = ServerBuilder.forPort(PORT_FOR_EXAMPLES).addService(new PersonUnaryHandler()).build(); + server.start(); + + app = WorkflowApplication.builder().build(); + } + + @AfterEach + void cleanup() throws InterruptedException { + server.shutdown().awaitTermination(); + } + + @Test + void grpcPerson() throws IOException { + + Workflow workflow = + WorkflowReader.readWorkflowFromClasspath("workflows-samples/grpc/get-person-call.yaml"); + + WorkflowDefinition workflowDefinition = app.workflowDefinition(workflow); + + String filename = + getClass() + .getClassLoader() + .getResource("workflows-samples/grpc/proto/person.proto") + .getFile(); + + Map output = + workflowDefinition + .instance(Map.of("protoFilePath", "file://" + filename)) + .start() + .join() + .asMap() + .orElseThrow(); + + Assertions.assertThat(output).contains(Map.entry("name", "John Doe"), Map.entry("id", 891182)); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorBiDiStreamingHandler.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorBiDiStreamingHandler.java new file mode 100644 index 00000000..7ee446fe --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorBiDiStreamingHandler.java @@ -0,0 +1,51 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc.handlers; + +import io.grpc.stub.StreamObserver; +import io.serverlessworkflow.impl.executors.grpc.contributors.BiDirectionalStreamingGrpc.BiDirectionalStreamingImplBase; +import io.serverlessworkflow.impl.executors.grpc.contributors.Contributors; + +public class ContributorBiDiStreamingHandler extends BiDirectionalStreamingImplBase { + + @Override + public StreamObserver createContributor( + StreamObserver responseObserver) { + + return new StreamObserver() { + @Override + public void onNext(Contributors.AddContributionRequest value) { + for (int i = 0; i < 5; i++) { + Contributors.AddContributionResponse addContributionResponse = + Contributors.AddContributionResponse.newBuilder() + .setMessage("Contribution " + i + 1 + "added successfully") + .build(); + responseObserver.onNext(addContributionResponse); + } + } + + @Override + public void onError(Throwable t) { + // no-op + } + + @Override + public void onCompleted() { + responseObserver.onCompleted(); + } + }; + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorClientStreamingHandler.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorClientStreamingHandler.java new file mode 100644 index 00000000..cdf5900f --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorClientStreamingHandler.java @@ -0,0 +1,73 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc.handlers; + +import io.grpc.stub.StreamObserver; +import io.serverlessworkflow.impl.executors.grpc.contributors.ClientStreamingGrpc.ClientStreamingImplBase; +import io.serverlessworkflow.impl.executors.grpc.contributors.Contributors; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.LongAdder; + +public class ContributorClientStreamingHandler extends ClientStreamingImplBase { + + private static final Map GITHUBS = new ConcurrentHashMap<>(); + + @Override + public StreamObserver createContributor( + StreamObserver responseObserver) { + + return new StreamObserver<>() { + @Override + public void onNext(Contributors.AddContributionRequest value) { + String github = value.getGithub(); + GITHUBS.compute( + github, + (key, counter) -> { + if (counter == null) { + LongAdder longAdder = new LongAdder(); + longAdder.increment(); + return longAdder; + } + counter.increment(); + return counter; + }); + } + + @Override + public void onError(Throwable t) {} + + @Override + public void onCompleted() { + StringBuilder stringBuilder = new StringBuilder(); + Set> entries = GITHUBS.entrySet(); + for (Map.Entry entry : entries) { + stringBuilder + .append(entry.getKey()) + .append(" has ") + .append(entry.getValue()) + .append(" contributions"); + } + responseObserver.onNext( + Contributors.AddContributionResponse.newBuilder() + .setMessage(stringBuilder.toString()) + .build()); + responseObserver.onCompleted(); + } + }; + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorServerStreamingHandler.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorServerStreamingHandler.java new file mode 100644 index 00000000..53979739 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorServerStreamingHandler.java @@ -0,0 +1,38 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc.handlers; + +import io.grpc.stub.StreamObserver; +import io.serverlessworkflow.impl.executors.grpc.contributors.Contributors; +import io.serverlessworkflow.impl.executors.grpc.contributors.ServerStreamingGrpc.ServerStreamingImplBase; + +public class ContributorServerStreamingHandler extends ServerStreamingImplBase { + + @Override + public void createContributor( + Contributors.AddContributionRequest request, + StreamObserver responseObserver) { + + for (int i = 0; i < 5; i++) { + Contributors.AddContributionResponse addContributionResponse = + Contributors.AddContributionResponse.newBuilder() + .setMessage("Success: (" + (i + 1) + ")") + .build(); + responseObserver.onNext(addContributionResponse); + } + responseObserver.onCompleted(); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorUnaryArgsExprHandler.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorUnaryArgsExprHandler.java new file mode 100644 index 00000000..1ac2f2eb --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorUnaryArgsExprHandler.java @@ -0,0 +1,35 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc.handlers; + +import static io.serverlessworkflow.impl.executors.grpc.contributors.UnaryArgsExprGrpc.UnaryArgsExprImplBase; + +import io.grpc.stub.StreamObserver; +import io.serverlessworkflow.impl.executors.grpc.contributors.Contributors; + +public class ContributorUnaryArgsExprHandler extends UnaryArgsExprImplBase { + + @Override + public void createContributor( + Contributors.AddContributionRequest request, + StreamObserver responseObserver) { + responseObserver.onNext( + Contributors.AddContributionResponse.newBuilder() + .setMessage("Success with " + request.getGithub()) + .build()); + responseObserver.onCompleted(); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/PersonUnaryHandler.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/PersonUnaryHandler.java new file mode 100644 index 00000000..e72f4613 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/PersonUnaryHandler.java @@ -0,0 +1,32 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc.handlers; + +import io.grpc.stub.StreamObserver; +import person.PersonGrpc.PersonImplBase; +import person.PersonOuterClass; + +public class PersonUnaryHandler extends PersonImplBase { + + @Override + public void getPerson( + PersonOuterClass.GetPersonRequest request, + StreamObserver responseObserver) { + responseObserver.onNext( + PersonOuterClass.GetPersonResponse.newBuilder().setId(891182).setName("John Doe").build()); + responseObserver.onCompleted(); + } +} diff --git a/impl/test/src/test/proto/contributors.proto b/impl/test/src/test/proto/contributors.proto new file mode 100644 index 00000000..a7cb71e0 --- /dev/null +++ b/impl/test/src/test/proto/contributors.proto @@ -0,0 +1,27 @@ +syntax = "proto2"; + +package io.serverlessworkflow.impl.executors.grpc.contributors; + +message AddContributionRequest { + required string github = 1; +} + +message AddContributionResponse { + required string message = 1; +} + +service ClientStreaming { + rpc CreateContributor(stream AddContributionRequest) returns (AddContributionResponse) {} +} + +service ServerStreaming { + rpc CreateContributor(AddContributionRequest) returns (stream AddContributionResponse) {} +} + +service BiDirectionalStreaming { + rpc CreateContributor(stream AddContributionRequest) returns (stream AddContributionResponse) {} +} + +service UnaryArgsExpr { + rpc CreateContributor(AddContributionRequest) returns (AddContributionResponse) {} +} \ No newline at end of file diff --git a/impl/test/src/test/proto/person.proto b/impl/test/src/test/proto/person.proto new file mode 100644 index 00000000..b86dd98f --- /dev/null +++ b/impl/test/src/test/proto/person.proto @@ -0,0 +1,25 @@ +syntax = "proto2"; + +package person; + +message GetPersonRequest {} + +message GetPersonResponse { + required string name = 1; + required int32 id = 2; +} + +message GetCarRequest {} + +message GetCarResponse { + required string name = 1; + required string brand = 2; +} + +service Person { + rpc GetPerson(GetPersonRequest) returns (GetPersonResponse); +} + +service Car { + rpc GetCar(GetCarRequest) returns (GetCarResponse); +} \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/grpc/contributors-bidi-stream-call.yaml b/impl/test/src/test/resources/workflows-samples/grpc/contributors-bidi-stream-call.yaml new file mode 100644 index 00000000..e5ab493a --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/grpc/contributors-bidi-stream-call.yaml @@ -0,0 +1,18 @@ +document: + dsl: '1.0.2' + namespace: test + name: grpc-example + version: '0.1.0' +do: + - greet: + call: grpc + with: + proto: + endpoint: workflows-samples/grpc/proto/contributors.proto + service: + name: BiDirectionalStreaming + host: localhost + port: 5011 + method: CreateContributor + arguments: + github: dependabot[bot] \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/grpc/contributors-client-stream-call.yaml b/impl/test/src/test/resources/workflows-samples/grpc/contributors-client-stream-call.yaml new file mode 100644 index 00000000..e7d089fc --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/grpc/contributors-client-stream-call.yaml @@ -0,0 +1,18 @@ +document: + dsl: '1.0.2' + namespace: test + name: grpc-example + version: '0.1.0' +do: + - greet: + call: grpc + with: + proto: + endpoint: workflows-samples/grpc/proto/contributors.proto + service: + name: ClientStreaming + host: localhost + port: 5011 + method: CreateContributor + arguments: + github: dependabot[bot] \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/grpc/contributors-server-stream-call.yaml b/impl/test/src/test/resources/workflows-samples/grpc/contributors-server-stream-call.yaml new file mode 100644 index 00000000..c74c68b4 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/grpc/contributors-server-stream-call.yaml @@ -0,0 +1,18 @@ +document: + dsl: '1.0.2' + namespace: test + name: grpc-example + version: '0.1.0' +do: + - greet: + call: grpc + with: + proto: + endpoint: workflows-samples/grpc/proto/contributors.proto + service: + name: ServerStreaming + host: localhost + port: 5011 + method: CreateContributor + arguments: + github: dependabot[bot] \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/grpc/contributors-unary-args-expr-call.yaml b/impl/test/src/test/resources/workflows-samples/grpc/contributors-unary-args-expr-call.yaml new file mode 100644 index 00000000..5d4ec2c9 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/grpc/contributors-unary-args-expr-call.yaml @@ -0,0 +1,18 @@ +document: + dsl: '1.0.2' + namespace: test + name: grpc-example + version: '0.1.0' +do: + - greet: + call: grpc + with: + proto: + endpoint: workflows-samples/grpc/proto/contributors.proto + service: + name: UnaryArgsExpr + host: localhost + port: 5011 + method: CreateContributor + arguments: + github: ${ .github } \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/grpc/get-person-call.yaml b/impl/test/src/test/resources/workflows-samples/grpc/get-person-call.yaml new file mode 100644 index 00000000..221c373e --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/grpc/get-person-call.yaml @@ -0,0 +1,16 @@ +document: + dsl: '1.0.2' + namespace: test + name: grpc-example + version: '0.1.0' +do: + - greet: + call: grpc + with: + proto: + endpoint: workflows-samples/grpc/proto/person.proto + service: + name: Person + host: localhost + port: 5011 + method: GetPerson \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/grpc/proto/contributors.proto b/impl/test/src/test/resources/workflows-samples/grpc/proto/contributors.proto new file mode 100644 index 00000000..a7cb71e0 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/grpc/proto/contributors.proto @@ -0,0 +1,27 @@ +syntax = "proto2"; + +package io.serverlessworkflow.impl.executors.grpc.contributors; + +message AddContributionRequest { + required string github = 1; +} + +message AddContributionResponse { + required string message = 1; +} + +service ClientStreaming { + rpc CreateContributor(stream AddContributionRequest) returns (AddContributionResponse) {} +} + +service ServerStreaming { + rpc CreateContributor(AddContributionRequest) returns (stream AddContributionResponse) {} +} + +service BiDirectionalStreaming { + rpc CreateContributor(stream AddContributionRequest) returns (stream AddContributionResponse) {} +} + +service UnaryArgsExpr { + rpc CreateContributor(AddContributionRequest) returns (AddContributionResponse) {} +} \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/grpc/proto/person.proto b/impl/test/src/test/resources/workflows-samples/grpc/proto/person.proto new file mode 100644 index 00000000..b86dd98f --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/grpc/proto/person.proto @@ -0,0 +1,25 @@ +syntax = "proto2"; + +package person; + +message GetPersonRequest {} + +message GetPersonResponse { + required string name = 1; + required int32 id = 2; +} + +message GetCarRequest {} + +message GetCarResponse { + required string name = 1; + required string brand = 2; +} + +service Person { + rpc GetPerson(GetPersonRequest) returns (GetPersonResponse); +} + +service Car { + rpc GetCar(GetCarRequest) returns (GetCarResponse); +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 5c984d05..94f6ef74 100644 --- a/pom.xml +++ b/pom.xml @@ -59,12 +59,16 @@ 3.3.0 3.6.0 + 3.2.1 3.15.0 + 3.11.4 + 3.25.8 3.1.4 3.6.2 3.5.4 3.2.8 + 1.78.0 3.5.0 ${java.version} 1.3.1 @@ -80,7 +84,9 @@ 4.3.0 + 4.32.1 1.5.27 + 2.21.0 2.21 2.0.0 @@ -203,7 +209,32 @@ jakarta.validation-api ${version.jakarta.validation} - + + io.grpc + grpc-netty + ${version.io.grpc.java} + test + + + io.grpc + grpc-stub + ${version.io.grpc.java} + + + io.grpc + grpc-protobuf + ${version.io.grpc.java} + + + com.google.protobuf + protobuf-java-util + ${version.com.google.protobuf.protobuf.java.util} + + + com.github.os72 + protoc-jar + ${version.com.github.os72.protoc.jar} + org.junit.jupiter From a42b6f2b1f5b97553b6834c590e439b34b6c364a Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Tue, 10 Feb 2026 11:05:31 -0300 Subject: [PATCH 02/14] Apply pull request suggestions Signed-off-by: Matheus Cruz --- .../impl/executors/http/HttpClientResolver.java | 10 ---------- .../impl/executors/http/HttpExecutor.java | 2 -- 2 files changed, 12 deletions(-) diff --git a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpClientResolver.java b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpClientResolver.java index c26ea3ee..fc40b257 100644 --- a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpClientResolver.java +++ b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpClientResolver.java @@ -20,13 +20,10 @@ import io.serverlessworkflow.impl.WorkflowContext; import jakarta.ws.rs.client.Client; import jakarta.ws.rs.client.ClientBuilder; -import jakarta.ws.rs.client.ClientRequestFilter; -import java.util.Optional; public class HttpClientResolver { public static final String HTTP_CLIENT_PROVIDER = "httpClientProvider"; - public static final String HTTP_CLIENT_FILTER_PROVIDER = "httpClientFilterProvider"; private static class DefaultHolder { private static final Client client = ClientBuilder.newClient(); @@ -38,12 +35,5 @@ public static Client client(WorkflowContext workflowContext, TaskContext taskCon .orElseGet(() -> DefaultHolder.client); } - public static Optional clientRequestFilter( - WorkflowContext workflowContext, TaskContext taskContext) { - WorkflowApplication application = workflowContext.definition().application(); - return application.additionalObject( - HTTP_CLIENT_FILTER_PROVIDER, workflowContext, taskContext); - } - private HttpClientResolver() {} } diff --git a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpExecutor.java b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpExecutor.java index 1413e582..127f669d 100644 --- a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpExecutor.java +++ b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpExecutor.java @@ -72,8 +72,6 @@ public CompletableFuture apply( Client client = HttpClientResolver.client(workflow, taskContext); - HttpClientResolver.clientRequestFilter(workflow, taskContext).ifPresent(client::register); - WebTarget target = client.target(uri); for (Entry entry : From d60e93155939b1401ffb3adc4a01dab47d6a5487 Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Tue, 10 Feb 2026 15:49:32 -0300 Subject: [PATCH 03/14] Point to proto in resources dir Signed-off-by: Matheus Cruz --- impl/test/pom.xml | 1 + impl/test/src/test/proto/contributors.proto | 27 --------------------- impl/test/src/test/proto/person.proto | 25 ------------------- 3 files changed, 1 insertion(+), 52 deletions(-) delete mode 100644 impl/test/src/test/proto/contributors.proto delete mode 100644 impl/test/src/test/proto/person.proto diff --git a/impl/test/pom.xml b/impl/test/pom.xml index bad15d72..f040989b 100644 --- a/impl/test/pom.xml +++ b/impl/test/pom.xml @@ -120,6 +120,7 @@ com.google.protobuf:protoc:3.25.8:exe:${os.detected.classifier} grpc-java io.grpc:protoc-gen-grpc-java:${version.io.grpc.java}:exe:${os.detected.classifier} + ${project.basedir}/src/test/resources/workflows-samples/grpc/proto diff --git a/impl/test/src/test/proto/contributors.proto b/impl/test/src/test/proto/contributors.proto deleted file mode 100644 index a7cb71e0..00000000 --- a/impl/test/src/test/proto/contributors.proto +++ /dev/null @@ -1,27 +0,0 @@ -syntax = "proto2"; - -package io.serverlessworkflow.impl.executors.grpc.contributors; - -message AddContributionRequest { - required string github = 1; -} - -message AddContributionResponse { - required string message = 1; -} - -service ClientStreaming { - rpc CreateContributor(stream AddContributionRequest) returns (AddContributionResponse) {} -} - -service ServerStreaming { - rpc CreateContributor(AddContributionRequest) returns (stream AddContributionResponse) {} -} - -service BiDirectionalStreaming { - rpc CreateContributor(stream AddContributionRequest) returns (stream AddContributionResponse) {} -} - -service UnaryArgsExpr { - rpc CreateContributor(AddContributionRequest) returns (AddContributionResponse) {} -} \ No newline at end of file diff --git a/impl/test/src/test/proto/person.proto b/impl/test/src/test/proto/person.proto deleted file mode 100644 index b86dd98f..00000000 --- a/impl/test/src/test/proto/person.proto +++ /dev/null @@ -1,25 +0,0 @@ -syntax = "proto2"; - -package person; - -message GetPersonRequest {} - -message GetPersonResponse { - required string name = 1; - required int32 id = 2; -} - -message GetCarRequest {} - -message GetCarResponse { - required string name = 1; - required string brand = 2; -} - -service Person { - rpc GetPerson(GetPersonRequest) returns (GetPersonResponse); -} - -service Car { - rpc GetCar(GetCarRequest) returns (GetCarResponse); -} \ No newline at end of file From 6b233535859de1ce9537b6faaddd404a5730d853 Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Tue, 10 Feb 2026 16:49:51 -0300 Subject: [PATCH 04/14] Apply @fjtirado suggestions Signed-off-by: Matheus Cruz --- .../impl/executors/grpc/GrpcExecutor.java | 65 +++++++++---------- .../executors/grpc/ProtobufMessageUtils.java | 29 +++++---- .../impl/jackson/JsonUtils.java | 2 +- .../test/grpc/GrpcClientStreamingTest.java | 10 +-- 4 files changed, 52 insertions(+), 54 deletions(-) diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java index 78d32dae..51ebb151 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java @@ -16,8 +16,6 @@ package io.serverlessworkflow.impl.executors.grpc; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.NullNode; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; @@ -36,12 +34,9 @@ import io.serverlessworkflow.impl.WorkflowError; import io.serverlessworkflow.impl.WorkflowException; import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowModelCollection; import io.serverlessworkflow.impl.WorkflowValueResolver; import io.serverlessworkflow.impl.executors.CallableTask; -import io.serverlessworkflow.impl.jackson.JsonUtils; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -155,13 +150,16 @@ private static WorkflowModel handleClientStreaming( Map parameters, Descriptors.MethodDescriptor methodDescriptor, ClientCall call) { - JsonNode jsonNode = - ProtobufMessageUtils.asyncStreamingCall( - parameters, - methodDescriptor, - responseObserver -> ClientCalls.asyncClientStreamingCall(call, responseObserver), - nodes -> nodes.isEmpty() ? NullNode.instance : nodes.get(0)); - return workflowContext.definition().application().modelFactory().fromAny(jsonNode); + + return ProtobufMessageUtils.asyncStreamingCall( + parameters, + methodDescriptor, + responseObserver -> ClientCalls.asyncClientStreamingCall(call, responseObserver), + workflowContext.definition().application().modelFactory(), + nodes -> + nodes.isEmpty() + ? workflowContext.definition().application().modelFactory().fromNull() + : nodes); } private static WorkflowModel handleBidiStreaming( @@ -169,20 +167,13 @@ private static WorkflowModel handleBidiStreaming( Map parameters, Descriptors.MethodDescriptor methodDescriptor, ClientCall call) { - return workflowContext - .definition() - .application() - .modelFactory() - .fromAny( - ProtobufMessageUtils.asyncStreamingCall( - parameters, - methodDescriptor, - responseObserver -> ClientCalls.asyncBidiStreamingCall(call, responseObserver), - v -> { - Collection nodes = v; - List list = new ArrayList<>(nodes); - return JsonUtils.fromValue(list); - })); + + return ProtobufMessageUtils.asyncStreamingCall( + parameters, + methodDescriptor, + responseObserver -> ClientCalls.asyncBidiStreamingCall(call, responseObserver), + workflowContext.definition().application().modelFactory(), + v -> v); } private static WorkflowModel handleServerStreaming( @@ -192,10 +183,15 @@ private static WorkflowModel handleServerStreaming( ClientCall call) throws InvalidProtocolBufferException, JsonProcessingException { Message.Builder builder = ProtobufMessageUtils.buildMessage(methodDescriptor, parameters); - List nodes = new ArrayList<>(); + WorkflowModelCollection modelCollection = + workflowContext.definition().application().modelFactory().createCollection(); ClientCalls.blockingServerStreamingCall(call, builder.build()) - .forEachRemaining(message -> nodes.add(ProtobufMessageUtils.convert(message))); - return workflowContext.definition().application().modelFactory().fromAny(nodes); + .forEachRemaining( + message -> + modelCollection.add( + ProtobufMessageUtils.convert( + message, workflowContext.definition().application().modelFactory()))); + return modelCollection; } private static CompletableFuture handleAsyncUnary( @@ -212,15 +208,12 @@ private static CompletableFuture handleAsyncUnary( ClientCalls.asyncUnaryCall( call, builder.build(), - new StreamObserver() { + new StreamObserver<>() { @Override public void onNext(Message value) { WorkflowModel model = - workflowContext - .definition() - .application() - .modelFactory() - .fromAny(ProtobufMessageUtils.convert(value)); + ProtobufMessageUtils.convert( + value, workflowContext.definition().application().modelFactory()); future.complete(model); } diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java index 384b449c..92e7a9f2 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java @@ -16,7 +16,6 @@ package io.serverlessworkflow.impl.executors.grpc; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; @@ -26,21 +25,22 @@ import io.grpc.MethodDescriptor; import io.grpc.stub.StreamObserver; import io.serverlessworkflow.api.WorkflowFormat; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowModelCollection; +import io.serverlessworkflow.impl.WorkflowModelFactory; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.List; import java.util.Map; import java.util.function.Function; import java.util.function.UnaryOperator; -import java.util.stream.Collectors; public interface ProtobufMessageUtils { - static JsonNode convert(Message message) { + static WorkflowModel convert(Message message, WorkflowModelFactory modelFactory) { StringBuilder str = new StringBuilder(); try { JsonFormat.printer().appendTo(message, str); - return WorkflowFormat.JSON.mapper().readTree(str.toString()); + return modelFactory.from(str.toString()); } catch (IOException e) { throw new UncheckedIOException("Error converting protobuf message to JSON", e); } @@ -61,11 +61,12 @@ static MethodDescriptor.MethodType getMethodType( } } - static JsonNode asyncStreamingCall( + static WorkflowModel asyncStreamingCall( Map parameters, com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, UnaryOperator> streamObserverFunction, - Function, JsonNode> nodesFunction) { + WorkflowModelFactory modelFactory, + Function nodesFunction) { WaitingStreamObserver responseObserver = new WaitingStreamObserver(); StreamObserver requestObserver = streamObserverFunction.apply(responseObserver); @@ -82,10 +83,13 @@ static JsonNode asyncStreamingCall( } requestObserver.onCompleted(); - return nodesFunction.apply( - responseObserver.get().stream() - .map(ProtobufMessageUtils::convert) - .collect(Collectors.toList())); + WorkflowModelCollection collection = modelFactory.createCollection(); + + responseObserver.get().stream() + .map(m -> ProtobufMessageUtils.convert(m, modelFactory)) + .forEach(collection::add); + + return nodesFunction.apply(collection); } static Message.Builder buildMessage(Object object, Message.Builder builder) @@ -98,7 +102,6 @@ static Message.Builder buildMessage( Descriptors.MethodDescriptor methodDescriptor, Map parameters) throws InvalidProtocolBufferException, JsonProcessingException { DynamicMessage.Builder builder = DynamicMessage.newBuilder(methodDescriptor.getInputType()); - JsonFormat.parser().merge(WorkflowFormat.JSON.mapper().writeValueAsString(parameters), builder); - return builder; + return buildMessage(parameters, builder); } } diff --git a/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/JsonUtils.java b/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/JsonUtils.java index 7554f6d7..8ca1724e 100644 --- a/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/JsonUtils.java +++ b/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/JsonUtils.java @@ -64,7 +64,7 @@ public static Collector arrayNodeCollector() { return new Collector() { @Override public BiConsumer accumulator() { - return (arrayNode, item) -> arrayNode.add(item); + return ArrayNode::add; } @Override diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java index 52ffc88a..aca9615d 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java @@ -23,6 +23,7 @@ import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.test.grpc.handlers.ContributorClientStreamingHandler; import java.io.IOException; +import java.util.List; import java.util.Map; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; @@ -60,10 +61,11 @@ void grpcPerson() throws IOException { WorkflowDefinition workflowDefinition = app.workflowDefinition(workflow); - Map output = - workflowDefinition.instance(Map.of()).start().join().asMap().orElseThrow(); + List> list = + workflowDefinition.instance(Map.of()).start().join().asCollection().stream() + .map(m -> m.asMap().orElseThrow()) + .toList(); - Assertions.assertThat(output) - .contains(Map.entry("message", "dependabot[bot] has 1 contributions")); + Assertions.assertThat(list).isNotEmpty(); } } From 5110ecf64e7b9319a5cb780141464b998bb299b0 Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Tue, 10 Feb 2026 16:54:59 -0300 Subject: [PATCH 05/14] Refactor asyncStreamingCall method Signed-off-by: Matheus Cruz --- .../impl/executors/grpc/GrpcExecutor.java | 21 +++++++++---------- .../executors/grpc/ProtobufMessageUtils.java | 5 ++--- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java index 51ebb151..05b51673 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java @@ -151,15 +151,15 @@ private static WorkflowModel handleClientStreaming( Descriptors.MethodDescriptor methodDescriptor, ClientCall call) { - return ProtobufMessageUtils.asyncStreamingCall( - parameters, - methodDescriptor, - responseObserver -> ClientCalls.asyncClientStreamingCall(call, responseObserver), - workflowContext.definition().application().modelFactory(), - nodes -> - nodes.isEmpty() - ? workflowContext.definition().application().modelFactory().fromNull() - : nodes); + WorkflowModel workflowModel = ProtobufMessageUtils.asyncStreamingCall( + parameters, + methodDescriptor, + responseObserver -> ClientCalls.asyncClientStreamingCall(call, responseObserver), + workflowContext.definition().application().modelFactory()); + + return workflowModel.asCollection().isEmpty() + ? workflowContext.definition().application().modelFactory().fromNull() + : (WorkflowModelCollection) workflowModel.asCollection(); } private static WorkflowModel handleBidiStreaming( @@ -172,8 +172,7 @@ private static WorkflowModel handleBidiStreaming( parameters, methodDescriptor, responseObserver -> ClientCalls.asyncBidiStreamingCall(call, responseObserver), - workflowContext.definition().application().modelFactory(), - v -> v); + workflowContext.definition().application().modelFactory()); } private static WorkflowModel handleServerStreaming( diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java index 92e7a9f2..d6c9c90e 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java @@ -65,8 +65,7 @@ static WorkflowModel asyncStreamingCall( Map parameters, com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, UnaryOperator> streamObserverFunction, - WorkflowModelFactory modelFactory, - Function nodesFunction) { + WorkflowModelFactory modelFactory) { WaitingStreamObserver responseObserver = new WaitingStreamObserver(); StreamObserver requestObserver = streamObserverFunction.apply(responseObserver); @@ -89,7 +88,7 @@ static WorkflowModel asyncStreamingCall( .map(m -> ProtobufMessageUtils.convert(m, modelFactory)) .forEach(collection::add); - return nodesFunction.apply(collection); + return collection; } static Message.Builder buildMessage(Object object, Message.Builder builder) From 299e39a4ec98ffce9960b23e692ea951cf5878a1 Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Thu, 12 Feb 2026 10:49:40 -0300 Subject: [PATCH 06/14] Apply spotless Signed-off-by: Matheus Cruz --- .../impl/executors/grpc/GrpcExecutor.java | 7 ++++--- .../impl/executors/grpc/ProtobufMessageUtils.java | 1 - 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java index 05b51673..b859aadf 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java @@ -151,15 +151,16 @@ private static WorkflowModel handleClientStreaming( Descriptors.MethodDescriptor methodDescriptor, ClientCall call) { - WorkflowModel workflowModel = ProtobufMessageUtils.asyncStreamingCall( + WorkflowModel workflowModel = + ProtobufMessageUtils.asyncStreamingCall( parameters, methodDescriptor, responseObserver -> ClientCalls.asyncClientStreamingCall(call, responseObserver), workflowContext.definition().application().modelFactory()); return workflowModel.asCollection().isEmpty() - ? workflowContext.definition().application().modelFactory().fromNull() - : (WorkflowModelCollection) workflowModel.asCollection(); + ? workflowContext.definition().application().modelFactory().fromNull() + : (WorkflowModelCollection) workflowModel.asCollection(); } private static WorkflowModel handleBidiStreaming( diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java index d6c9c90e..9a74be5d 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java @@ -31,7 +31,6 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.Map; -import java.util.function.Function; import java.util.function.UnaryOperator; public interface ProtobufMessageUtils { From 35251a4a8e4c7a2f6f26a1858feb384cd88a772f Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Thu, 12 Feb 2026 11:28:09 -0300 Subject: [PATCH 07/14] Remove jackson Signed-off-by: Matheus Cruz --- impl/grpc/pom.xml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/impl/grpc/pom.xml b/impl/grpc/pom.xml index a3fa8cde..e577f354 100644 --- a/impl/grpc/pom.xml +++ b/impl/grpc/pom.xml @@ -17,10 +17,6 @@ io.serverlessworkflow serverlessworkflow-api - - io.serverlessworkflow - serverlessworkflow-impl-jackson - io.grpc grpc-stub From 99b23ca75994c873e30e33beba149b8aa17c1296 Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Thu, 12 Feb 2026 14:17:53 -0300 Subject: [PATCH 08/14] Add support to gRPC Signed-off-by: Matheus Cruz --- .../serverlessworkflow/impl/executors/grpc/GrpcExecutor.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java index b859aadf..705d2569 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java @@ -62,10 +62,7 @@ public GrpcExecutor( @Override public CompletableFuture apply( WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { - - Map arguments = this.arguments.apply(workflowContext, taskContext, input); - - return buildGrpcCallExecutor(workflowContext, taskContext, arguments); + return buildGrpcCallExecutor(workflowContext, taskContext, this.arguments.apply(workflowContext, taskContext, input)); } private CompletableFuture buildGrpcCallExecutor( From c3f610899089be0f84646abd017abac0c27e719e Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Thu, 12 Feb 2026 14:20:03 -0300 Subject: [PATCH 09/14] Remove local var Signed-off-by: Matheus Cruz --- .../serverlessworkflow/impl/executors/grpc/GrpcExecutor.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java index 705d2569..f4538509 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java @@ -62,7 +62,8 @@ public GrpcExecutor( @Override public CompletableFuture apply( WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { - return buildGrpcCallExecutor(workflowContext, taskContext, this.arguments.apply(workflowContext, taskContext, input)); + return buildGrpcCallExecutor( + workflowContext, taskContext, this.arguments.apply(workflowContext, taskContext, input)); } private CompletableFuture buildGrpcCallExecutor( From f4d9950a16871f423be2b0b0dc274e1742806350 Mon Sep 17 00:00:00 2001 From: fjtirado Date: Thu, 12 Feb 2026 21:01:11 +0100 Subject: [PATCH 10/14] Review suggestions Signed-off-by: fjtirado --- impl/grpc/pom.xml | 8 +- .../grpc/CollectionStreamObserver.java | 54 +++++++++++ .../executors/grpc/FileDescriptorReader.java | 7 +- .../executors/grpc/GrpcChannelResolver.java | 7 +- .../impl/executors/grpc/GrpcExecutor.java | 93 +++++++------------ .../executors/grpc/GrpcExecutorBuilder.java | 9 +- .../executors/grpc/ProtobufMessageUtils.java | 44 ++++----- .../executors/grpc/WaitingStreamObserver.java | 73 --------------- 8 files changed, 121 insertions(+), 174 deletions(-) create mode 100644 impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/CollectionStreamObserver.java delete mode 100644 impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/WaitingStreamObserver.java diff --git a/impl/grpc/pom.xml b/impl/grpc/pom.xml index e577f354..f6583ac9 100644 --- a/impl/grpc/pom.xml +++ b/impl/grpc/pom.xml @@ -13,10 +13,6 @@ io.serverlessworkflow serverlessworkflow-impl-core - - io.serverlessworkflow - serverlessworkflow-api - io.grpc grpc-stub @@ -25,6 +21,10 @@ com.google.protobuf protobuf-java-util + + io.serverlessworkflow + serverlessworkflow-impl-json + com.github.os72 protoc-jar diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/CollectionStreamObserver.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/CollectionStreamObserver.java new file mode 100644 index 00000000..cd1c51ef --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/CollectionStreamObserver.java @@ -0,0 +1,54 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import com.google.protobuf.Message; +import io.grpc.stub.StreamObserver; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowModelCollection; +import io.serverlessworkflow.impl.WorkflowModelFactory; +import java.util.concurrent.CompletableFuture; + +class CollectionStreamObserver implements StreamObserver { + private final WorkflowModelCollection modelCollection; + private final WorkflowModelFactory modelFactory; + private final CompletableFuture future; + + public CollectionStreamObserver(WorkflowModelFactory modelFactory) { + this.modelCollection = modelFactory.createCollection(); + this.modelFactory = modelFactory; + this.future = new CompletableFuture<>(); + } + + @Override + public void onNext(Message value) { + modelCollection.add(ProtobufMessageUtils.convert(value, modelFactory)); + } + + @Override + public void onError(Throwable t) { + future.completeExceptionally(t); + } + + @Override + public void onCompleted() { + future.complete(modelCollection); + } + + public CompletableFuture future() { + return future; + } +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java index 36d53b4b..26bb821d 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java @@ -28,7 +28,8 @@ public class FileDescriptorReader { - public FileDescriptorContext readDescriptor(ExternalResourceHandler externalResourceHandler) { + public static FileDescriptorContext readDescriptor( + ExternalResourceHandler externalResourceHandler) { Path grpcDir = tryCreateTempGrpcDir() .orElseThrow( @@ -65,7 +66,7 @@ public FileDescriptorContext readDescriptor(ExternalResourceHandler externalReso } } - private Optional tryCreateTempGrpcDir() { + private static Optional tryCreateTempGrpcDir() { try { return Optional.of(Files.createTempDirectory("serverless-workflow-")); } catch (IOException e) { @@ -105,4 +106,6 @@ private static void generateFileDescriptor(Path grpcDir, Path protoFile, Path de throw new UncheckedIOException("Unable to generate file descriptor", e); } } + + private FileDescriptorReader() {} } diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcChannelResolver.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcChannelResolver.java index c13f5de5..b68d7d9d 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcChannelResolver.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcChannelResolver.java @@ -18,7 +18,6 @@ import io.grpc.Channel; import io.grpc.ManagedChannelBuilder; import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; public class GrpcChannelResolver { @@ -29,8 +28,10 @@ public static Channel channel( WorkflowContext workflowContext, TaskContext taskContext, GrpcRequestContext grpcRequestContext) { - WorkflowApplication appl = workflowContext.definition().application(); - return appl.additionalObject(GRPC_CHANNEL_PROVIDER, workflowContext, taskContext) + return workflowContext + .definition() + .application() + .additionalObject(GRPC_CHANNEL_PROVIDER, workflowContext, taskContext) .orElseGet( () -> ManagedChannelBuilder.forAddress( diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java index f4538509..fed38068 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java @@ -15,7 +15,6 @@ */ package io.serverlessworkflow.impl.executors.grpc; -import com.fasterxml.jackson.core.JsonProcessingException; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; @@ -28,13 +27,11 @@ import io.grpc.protobuf.ProtoUtils; import io.grpc.stub.ClientCalls; import io.grpc.stub.StreamObserver; -import io.serverlessworkflow.api.types.ExternalResource; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowError; import io.serverlessworkflow.impl.WorkflowException; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowModelCollection; import io.serverlessworkflow.impl.WorkflowValueResolver; import io.serverlessworkflow.impl.executors.CallableTask; import java.util.Map; @@ -46,17 +43,14 @@ public class GrpcExecutor implements CallableTask { private final GrpcRequestContext requestContext; private final WorkflowValueResolver> arguments; private final FileDescriptorContext fileDescriptorContext; - private final ExternalResource proto; public GrpcExecutor( GrpcRequestContext builder, WorkflowValueResolver> arguments, - FileDescriptorContext fileDescriptorContext, - ExternalResource proto) { + FileDescriptorContext fileDescriptorContext) { this.requestContext = builder; this.arguments = arguments; this.fileDescriptorContext = fileDescriptorContext; - this.proto = proto; } @Override @@ -71,14 +65,9 @@ private CompletableFuture buildGrpcCallExecutor( Channel channel = GrpcChannelResolver.channel(workflowContext, taskContext, requestContext); - String protoName = fileDescriptorContext.inputProto(); - DescriptorProtos.FileDescriptorProto fileDescriptorProto = fileDescriptorContext.fileDescriptorSet().getFileList().stream() - .filter( - file -> - file.getName() - .equals(this.proto.getName() != null ? this.proto.getName() : protoName)) + .filter(file -> file.getName().equals(fileDescriptorContext.inputProto())) .findFirst() .orElseThrow(() -> new IllegalStateException("Proto file not found in descriptor set")); @@ -88,14 +77,14 @@ private CompletableFuture buildGrpcCallExecutor( fileDescriptorProto, new Descriptors.FileDescriptor[] {}); Descriptors.ServiceDescriptor serviceDescriptor = - fileDescriptor.findServiceByName(requestContext.service()); - - Objects.requireNonNull(serviceDescriptor, "Service not found: " + requestContext.service()); + Objects.requireNonNull( + fileDescriptor.findServiceByName(requestContext.service()), + "Service not found: " + requestContext.service()); Descriptors.MethodDescriptor methodDescriptor = - serviceDescriptor.findMethodByName(requestContext.method()); - - Objects.requireNonNull(methodDescriptor, "Method not found: " + requestContext.method()); + Objects.requireNonNull( + serviceDescriptor.findMethodByName(requestContext.method()), + "Method not found: " + requestContext.method()); MethodDescriptor.MethodType methodType = ProtobufMessageUtils.getMethodType(methodDescriptor); @@ -104,20 +93,15 @@ private CompletableFuture buildGrpcCallExecutor( return switch (methodType) { case CLIENT_STREAMING -> - CompletableFuture.completedFuture( - handleClientStreaming(workflowContext, arguments, methodDescriptor, call)); + handleClientStreaming(workflowContext, arguments, methodDescriptor, call); case BIDI_STREAMING -> - CompletableFuture.completedFuture( - handleBidiStreaming(workflowContext, arguments, methodDescriptor, call)); + handleBidiStreaming(workflowContext, arguments, methodDescriptor, call); case SERVER_STREAMING -> - CompletableFuture.completedFuture( - handleServerStreaming(workflowContext, methodDescriptor, arguments, call)); + handleServerStreaming(workflowContext, methodDescriptor, arguments, call); case UNARY, UNKNOWN -> handleAsyncUnary(workflowContext, methodDescriptor, arguments, call); }; - } catch (Descriptors.DescriptorValidationException - | InvalidProtocolBufferException - | JsonProcessingException e) { + } catch (Descriptors.DescriptorValidationException | InvalidProtocolBufferException e) { throw new WorkflowException(WorkflowError.runtime(taskContext, e).build()); } } @@ -143,25 +127,19 @@ private static ClientCall buildClientCall( CallOptions.DEFAULT.withWaitForReady()); } - private static WorkflowModel handleClientStreaming( + private static CompletableFuture handleClientStreaming( WorkflowContext workflowContext, Map parameters, Descriptors.MethodDescriptor methodDescriptor, ClientCall call) { - - WorkflowModel workflowModel = - ProtobufMessageUtils.asyncStreamingCall( - parameters, - methodDescriptor, - responseObserver -> ClientCalls.asyncClientStreamingCall(call, responseObserver), - workflowContext.definition().application().modelFactory()); - - return workflowModel.asCollection().isEmpty() - ? workflowContext.definition().application().modelFactory().fromNull() - : (WorkflowModelCollection) workflowModel.asCollection(); + return ProtobufMessageUtils.asyncStreamingCall( + parameters, + methodDescriptor, + responseObserver -> ClientCalls.asyncClientStreamingCall(call, responseObserver), + workflowContext.definition().application().modelFactory()); } - private static WorkflowModel handleBidiStreaming( + private static CompletableFuture handleBidiStreaming( WorkflowContext workflowContext, Map parameters, Descriptors.MethodDescriptor methodDescriptor, @@ -174,22 +152,17 @@ private static WorkflowModel handleBidiStreaming( workflowContext.definition().application().modelFactory()); } - private static WorkflowModel handleServerStreaming( + private static CompletableFuture handleServerStreaming( WorkflowContext workflowContext, Descriptors.MethodDescriptor methodDescriptor, Map parameters, ClientCall call) - throws InvalidProtocolBufferException, JsonProcessingException { - Message.Builder builder = ProtobufMessageUtils.buildMessage(methodDescriptor, parameters); - WorkflowModelCollection modelCollection = - workflowContext.definition().application().modelFactory().createCollection(); - ClientCalls.blockingServerStreamingCall(call, builder.build()) - .forEachRemaining( - message -> - modelCollection.add( - ProtobufMessageUtils.convert( - message, workflowContext.definition().application().modelFactory()))); - return modelCollection; + throws InvalidProtocolBufferException { + CollectionStreamObserver observer = + new CollectionStreamObserver(workflowContext.definition().application().modelFactory()); + ClientCalls.asyncServerStreamingCall( + call, ProtobufMessageUtils.buildMessage(methodDescriptor, parameters).build(), observer); + return observer.future(); } private static CompletableFuture handleAsyncUnary( @@ -197,22 +170,20 @@ private static CompletableFuture handleAsyncUnary( Descriptors.MethodDescriptor methodDescriptor, Map parameters, ClientCall call) - throws InvalidProtocolBufferException, JsonProcessingException { + throws InvalidProtocolBufferException { CompletableFuture future = new CompletableFuture<>(); - - Message.Builder builder = ProtobufMessageUtils.buildMessage(methodDescriptor, parameters); - ClientCalls.asyncUnaryCall( call, - builder.build(), + ProtobufMessageUtils.buildMessage(methodDescriptor, parameters).build(), new StreamObserver<>() { + private WorkflowModel model; + @Override public void onNext(Message value) { - WorkflowModel model = + model = ProtobufMessageUtils.convert( value, workflowContext.definition().application().modelFactory()); - future.complete(model); } @Override @@ -222,7 +193,7 @@ public void onError(Throwable t) { @Override public void onCompleted() { - // no-op + future.complete(model); } }); return future; diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutorBuilder.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutorBuilder.java index 49ac240d..0e4f39c6 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutorBuilder.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutorBuilder.java @@ -16,7 +16,6 @@ package io.serverlessworkflow.impl.executors.grpc; import io.serverlessworkflow.api.types.CallGRPC; -import io.serverlessworkflow.api.types.ExternalResource; import io.serverlessworkflow.api.types.GRPCArguments; import io.serverlessworkflow.api.types.TaskBase; import io.serverlessworkflow.api.types.WithGRPCService; @@ -30,7 +29,6 @@ public class GrpcExecutorBuilder implements CallableTaskBuilder { - private ExternalResource proto; private GrpcRequestContext grpcRequestContext; private FileDescriptorContext fileDescriptorContext; private WorkflowValueResolver> arguments; @@ -45,7 +43,6 @@ public void init(CallGRPC task, WorkflowDefinition definition, WorkflowMutablePo GRPCArguments with = task.getWith(); WithGRPCService service = with.getService(); - this.proto = with.getProto(); this.arguments = WorkflowUtils.buildMapResolver( @@ -56,16 +53,14 @@ public void init(CallGRPC task, WorkflowDefinition definition, WorkflowMutablePo new GrpcRequestContext( service.getHost(), service.getPort(), with.getMethod(), service.getName()); - FileDescriptorReader fileDescriptorReader = new FileDescriptorReader(); this.fileDescriptorContext = definition .resourceLoader() - .loadStatic(with.getProto().getEndpoint(), fileDescriptorReader::readDescriptor); + .loadStatic(with.getProto().getEndpoint(), FileDescriptorReader::readDescriptor); } @Override public CallableTask build() { - return new GrpcExecutor( - this.grpcRequestContext, this.arguments, this.fileDescriptorContext, this.proto); + return new GrpcExecutor(this.grpcRequestContext, this.arguments, this.fileDescriptorContext); } } diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java index 9a74be5d..2a14dbae 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java @@ -24,13 +24,13 @@ import com.google.protobuf.util.JsonFormat; import io.grpc.MethodDescriptor; import io.grpc.stub.StreamObserver; -import io.serverlessworkflow.api.WorkflowFormat; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowModelCollection; import io.serverlessworkflow.impl.WorkflowModelFactory; +import io.serverlessworkflow.impl.jackson.JsonUtils; import java.io.IOException; import java.io.UncheckedIOException; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.function.UnaryOperator; public interface ProtobufMessageUtils { @@ -60,45 +60,41 @@ static MethodDescriptor.MethodType getMethodType( } } - static WorkflowModel asyncStreamingCall( + static CompletableFuture asyncStreamingCall( Map parameters, com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, UnaryOperator> streamObserverFunction, WorkflowModelFactory modelFactory) { - WaitingStreamObserver responseObserver = new WaitingStreamObserver(); + CollectionStreamObserver responseObserver = new CollectionStreamObserver(modelFactory); StreamObserver requestObserver = streamObserverFunction.apply(responseObserver); - - for (var entry : parameters.entrySet()) { + for (Object entry : parameters.entrySet()) { try { - Message message = - buildMessage(entry, DynamicMessage.newBuilder(methodDescriptor.getInputType())).build(); - requestObserver.onNext(message); - } catch (Exception e) { + requestObserver.onNext( + buildMessage(entry, DynamicMessage.newBuilder(methodDescriptor.getInputType())) + .build()); + } catch (InvalidProtocolBufferException e) { requestObserver.onError(e); - throw new RuntimeException(e); } - responseObserver.checkForServerStreamErrors(); } requestObserver.onCompleted(); - - WorkflowModelCollection collection = modelFactory.createCollection(); - - responseObserver.get().stream() - .map(m -> ProtobufMessageUtils.convert(m, modelFactory)) - .forEach(collection::add); - - return collection; + return responseObserver.future(); } static Message.Builder buildMessage(Object object, Message.Builder builder) - throws InvalidProtocolBufferException, JsonProcessingException { - JsonFormat.parser().merge(WorkflowFormat.JSON.mapper().writeValueAsString(object), builder); - return builder; + throws InvalidProtocolBufferException { + try { + // lets use Jackson to serialize the object to string for now, although we probably need to + // revisit this. + JsonFormat.parser().merge(JsonUtils.mapper().writeValueAsString(object), builder); + return builder; + } catch (JsonProcessingException e) { + throw new InvalidProtocolBufferException(e); + } } static Message.Builder buildMessage( Descriptors.MethodDescriptor methodDescriptor, Map parameters) - throws InvalidProtocolBufferException, JsonProcessingException { + throws InvalidProtocolBufferException { DynamicMessage.Builder builder = DynamicMessage.newBuilder(methodDescriptor.getInputType()); return buildMessage(parameters, builder); } diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/WaitingStreamObserver.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/WaitingStreamObserver.java deleted file mode 100644 index 971951d3..00000000 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/WaitingStreamObserver.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.executors.grpc; - -import com.google.protobuf.Message; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutionException; - -public class WaitingStreamObserver implements StreamObserver { - - List responses = new ArrayList<>(); - CompletableFuture> responsesFuture = new CompletableFuture<>(); - - @Override - public void onNext(Message messageReply) { - responses.add(messageReply); - } - - @Override - public void onError(Throwable throwable) { - responsesFuture.completeExceptionally(throwable); - } - - @Override - public void onCompleted() { - responsesFuture.complete(responses); - } - - public List get() { - try { - return responsesFuture.get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException(e); - } catch (ExecutionException e) { - throw new IllegalStateException(getServerStreamErrorMessage(e.getCause()), e.getCause()); - } - } - - public void checkForServerStreamErrors() { - if (responsesFuture.isCompletedExceptionally()) { - try { - responsesFuture.join(); - } catch (CompletionException e) { - throw new IllegalStateException(getServerStreamErrorMessage(e.getCause()), e.getCause()); - } - } - } - - private String getServerStreamErrorMessage(Throwable throwable) { - return String.format( - "Received an error through gRPC server stream with status: %s", - Status.fromThrowable(throwable)); - } -} From a3fbbfa831b77d553eccc748e4187139b91ff803 Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Thu, 12 Feb 2026 20:18:37 -0300 Subject: [PATCH 11/14] Change FileDescriptorReader to be a interface Signed-off-by: Matheus Cruz --- .../impl/executors/grpc/FileDescriptorReader.java | 7 ++----- .../impl/executors/grpc/ProtobufMessageUtils.java | 2 +- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java index 26bb821d..d2ccc49d 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java @@ -26,10 +26,9 @@ import java.nio.file.StandardCopyOption; import java.util.Optional; -public class FileDescriptorReader { +public interface FileDescriptorReader { - public static FileDescriptorContext readDescriptor( - ExternalResourceHandler externalResourceHandler) { + static FileDescriptorContext readDescriptor(ExternalResourceHandler externalResourceHandler) { Path grpcDir = tryCreateTempGrpcDir() .orElseThrow( @@ -106,6 +105,4 @@ private static void generateFileDescriptor(Path grpcDir, Path protoFile, Path de throw new UncheckedIOException("Unable to generate file descriptor", e); } } - - private FileDescriptorReader() {} } diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java index 2a14dbae..7b9bcd42 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java @@ -83,7 +83,7 @@ static CompletableFuture asyncStreamingCall( static Message.Builder buildMessage(Object object, Message.Builder builder) throws InvalidProtocolBufferException { try { - // lets use Jackson to serialize the object to string for now, although we probably need to + // let's use Jackson to serialize the object to string for now, although we probably need to // revisit this. JsonFormat.parser().merge(JsonUtils.mapper().writeValueAsString(object), builder); return builder; From 61a53fc1969a0f5a86d02b2f775712480b107a03 Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Thu, 12 Feb 2026 21:27:50 -0300 Subject: [PATCH 12/14] Add support to local system protoc binary Signed-off-by: Matheus Cruz --- .../executors/grpc/FileDescriptorReader.java | 51 ++++++++++++++++++- 1 file changed, 49 insertions(+), 2 deletions(-) diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java index d2ccc49d..7967c22c 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java @@ -18,6 +18,7 @@ import com.github.os72.protocjar.Protoc; import com.google.protobuf.DescriptorProtos; import io.serverlessworkflow.impl.resources.ExternalResourceHandler; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.UncheckedIOException; @@ -25,9 +26,13 @@ import java.nio.file.Path; import java.nio.file.StandardCopyOption; import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public interface FileDescriptorReader { + Logger logger = LoggerFactory.getLogger(FileDescriptorReader.class); + static FileDescriptorContext readDescriptor(ExternalResourceHandler externalResourceHandler) { Path grpcDir = tryCreateTempGrpcDir() @@ -74,7 +79,10 @@ private static Optional tryCreateTempGrpcDir() { } /** - * Calls protoc binary with --descriptor_set_out= option set. + * Calls protoc binary with --descriptor_set_out= option set. First attempts to use + * the embedded protoc from protoc-jar library. If that fails with FileNotFoundException + * (unsupported architecture), falls back to using the system's installed protoc via + * ProcessBuilder. * * @param grpcDir a temporary directory * @param protoFile the .proto file used by protoc to generate the file descriptor @@ -91,13 +99,19 @@ private static void generateFileDescriptor(Path grpcDir, Path protoFile, Path de }; try { - + // First attempt: use protoc-jar library int status = Protoc.runProtoc(protocArgs); if (status != 0) { throw new RuntimeException( "Unable to generate file descriptor, 'protoc' execution failed with status " + status); } + } catch (FileNotFoundException e) { + // Fallback: try using system's installed protoc + logger.warn( + "Protoc binary not available for this architecture via protoc-jar. " + + "Attempting to use system-installed protoc..."); + generateFileDescriptorWithProcessBuilder(protocArgs); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Unable to generate file descriptor", e); @@ -105,4 +119,37 @@ private static void generateFileDescriptor(Path grpcDir, Path protoFile, Path de throw new UncheckedIOException("Unable to generate file descriptor", e); } } + + /** + * Fallback method to generate file descriptor using system's installed protoc via ProcessBuilder. + * + * @param protocArgs the arguments to pass to protoc command + */ + private static void generateFileDescriptorWithProcessBuilder(String[] protocArgs) { + try { + String[] command = new String[protocArgs.length + 1]; + command[0] = "protoc"; + System.arraycopy(protocArgs, 0, command, 1, protocArgs.length); + + ProcessBuilder processBuilder = new ProcessBuilder(command); + + processBuilder.redirectErrorStream(true); + + Process process = processBuilder.start(); + + int exitCode = process.waitFor(); + + if (exitCode != 0) { + throw new RuntimeException("Unable to generate file descriptor using system protoc."); + } + + } catch (IOException e) { + throw new UncheckedIOException( + "Unable to execute system protoc. Please ensure 'protoc' is installed and available in your system PATH.", + e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Protoc execution was interrupted", e); + } + } } From 75dd7eeea7ab3a8ee729177f64d8505f72337ca8 Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti <65240126+fjtirado@users.noreply.github.com> Date: Fri, 13 Feb 2026 11:12:48 +0100 Subject: [PATCH 13/14] Undo changes on HttpExecutor Signed-off-by: fjtirado --- .../impl/executors/http/HttpExecutor.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpExecutor.java b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpExecutor.java index 127f669d..c787cd2b 100644 --- a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpExecutor.java +++ b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpExecutor.java @@ -21,7 +21,6 @@ import io.serverlessworkflow.impl.WorkflowUtils; import io.serverlessworkflow.impl.WorkflowValueResolver; import io.serverlessworkflow.impl.executors.CallableTask; -import jakarta.ws.rs.client.Client; import jakarta.ws.rs.client.Invocation.Builder; import jakarta.ws.rs.client.WebTarget; import java.net.URI; @@ -70,10 +69,7 @@ public CompletableFuture apply( p.apply(workflow, taskContext, input))) .orElse(uriSupplier.apply(workflow, taskContext, input)); - Client client = HttpClientResolver.client(workflow, taskContext); - - WebTarget target = client.target(uri); - + WebTarget target = HttpClientResolver.client(workflow, taskContext).target(uri); for (Entry entry : queryMap.map(q -> q.apply(workflow, taskContext, input)).orElse(Map.of()).entrySet()) { target = target.queryParam(entry.getKey(), entry.getValue()); From 3915ae376b74297b4139e46b53c6a098927662b2 Mon Sep 17 00:00:00 2001 From: fjtirado Date: Fri, 13 Feb 2026 11:24:04 +0100 Subject: [PATCH 14/14] FileDescriptorReader is a static classs, not an interface. Also reducing the number of public classes Signed-off-by: fjtirado --- .../impl/executors/grpc/FileDescriptorContext.java | 2 +- .../impl/executors/grpc/FileDescriptorReader.java | 12 +++++++----- .../impl/executors/grpc/GrpcChannelResolver.java | 6 +++--- .../impl/executors/grpc/GrpcExecutor.java | 4 ++-- .../impl/executors/grpc/GrpcRequestContext.java | 2 +- .../impl/executors/grpc/ProtobufMessageUtils.java | 4 +++- 6 files changed, 17 insertions(+), 13 deletions(-) diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContext.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContext.java index a111e9dd..158fab51 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContext.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContext.java @@ -17,5 +17,5 @@ import com.google.protobuf.DescriptorProtos; -public record FileDescriptorContext( +record FileDescriptorContext( DescriptorProtos.FileDescriptorSet fileDescriptorSet, String inputProto) {} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java index 7967c22c..f2123e0e 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java @@ -29,9 +29,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public interface FileDescriptorReader { +class FileDescriptorReader { - Logger logger = LoggerFactory.getLogger(FileDescriptorReader.class); + private static final Logger logger = LoggerFactory.getLogger(FileDescriptorReader.class); static FileDescriptorContext readDescriptor(ExternalResourceHandler externalResourceHandler) { Path grpcDir = @@ -140,16 +140,18 @@ private static void generateFileDescriptorWithProcessBuilder(String[] protocArgs int exitCode = process.waitFor(); if (exitCode != 0) { - throw new RuntimeException("Unable to generate file descriptor using system protoc."); + throw new IllegalStateException( + "Unable to generate file descriptor using system protoc. Exit code: " + exitCode); } - } catch (IOException e) { throw new UncheckedIOException( "Unable to execute system protoc. Please ensure 'protoc' is installed and available in your system PATH.", e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new RuntimeException("Protoc execution was interrupted", e); + throw new IllegalStateException("Protoc execution was interrupted", e); } } + + private FileDescriptorReader() {} } diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcChannelResolver.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcChannelResolver.java index b68d7d9d..dabde19b 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcChannelResolver.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcChannelResolver.java @@ -20,11 +20,11 @@ import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; -public class GrpcChannelResolver { +class GrpcChannelResolver { - public static final String GRPC_CHANNEL_PROVIDER = "grpcChannelProvider"; + static final String GRPC_CHANNEL_PROVIDER = "grpcChannelProvider"; - public static Channel channel( + static Channel channel( WorkflowContext workflowContext, TaskContext taskContext, GrpcRequestContext grpcRequestContext) { diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java index fed38068..44a10127 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java @@ -45,10 +45,10 @@ public class GrpcExecutor implements CallableTask { private final FileDescriptorContext fileDescriptorContext; public GrpcExecutor( - GrpcRequestContext builder, + GrpcRequestContext requestContext, WorkflowValueResolver> arguments, FileDescriptorContext fileDescriptorContext) { - this.requestContext = builder; + this.requestContext = requestContext; this.arguments = arguments; this.fileDescriptorContext = fileDescriptorContext; } diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcRequestContext.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcRequestContext.java index 2abab646..cbdb5716 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcRequestContext.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcRequestContext.java @@ -15,4 +15,4 @@ */ package io.serverlessworkflow.impl.executors.grpc; -public record GrpcRequestContext(String address, int port, String method, String service) {} +record GrpcRequestContext(String address, int port, String method, String service) {} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java index 7b9bcd42..0c2eec48 100644 --- a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java @@ -33,7 +33,7 @@ import java.util.concurrent.CompletableFuture; import java.util.function.UnaryOperator; -public interface ProtobufMessageUtils { +class ProtobufMessageUtils { static WorkflowModel convert(Message message, WorkflowModelFactory modelFactory) { StringBuilder str = new StringBuilder(); @@ -98,4 +98,6 @@ static Message.Builder buildMessage( DynamicMessage.Builder builder = DynamicMessage.newBuilder(methodDescriptor.getInputType()); return buildMessage(parameters, builder); } + + private ProtobufMessageUtils() {} }