diff --git a/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/AwsEventEncoderFactory.java b/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/AwsEventEncoderFactory.java index 73af2b5da..de9ee4393 100644 --- a/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/AwsEventEncoderFactory.java +++ b/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/AwsEventEncoderFactory.java @@ -15,6 +15,7 @@ import software.amazon.smithy.java.core.serde.event.EventEncoderFactory; import software.amazon.smithy.java.core.serde.event.EventStreamingException; import software.amazon.smithy.java.core.serde.event.FrameEncoder; +import software.amazon.smithy.java.core.serde.event.FrameTransformer; /** * A {@link EventEncoderFactory} for AWS events. @@ -24,6 +25,7 @@ public final class AwsEventEncoderFactory implements EventEncoderFactory transformer; private final Function exceptionHandler; private AwsEventEncoderFactory( @@ -31,12 +33,14 @@ private AwsEventEncoderFactory( Schema schema, Codec codec, String payloadMediaType, + FrameTransformer transformer, Function exceptionHandler ) { this.initialEventType = Objects.requireNonNull(initialEventType, "initialEventType"); this.schema = Objects.requireNonNull(schema, "schema").isMember() ? schema.memberTarget() : schema; this.codec = Objects.requireNonNull(codec, "codec"); this.payloadMediaType = Objects.requireNonNull(payloadMediaType, "payloadMediaType"); + this.transformer = Objects.requireNonNull(transformer, "transformer"); this.exceptionHandler = Objects.requireNonNull(exceptionHandler, "exceptionHandler"); } @@ -53,12 +57,14 @@ public static AwsEventEncoderFactory forInputStream( InputEventStreamingApiOperation operation, Codec codec, String payloadMediaType, + FrameTransformer transformer, Function exceptionHandler ) { return new AwsEventEncoderFactory(InitialEventType.INITIAL_REQUEST, operation.inputStreamMember(), codec, payloadMediaType, + transformer, exceptionHandler); } @@ -75,18 +81,25 @@ public static AwsEventEncoderFactory forOutputStream( OutputEventStreamingApiOperation operation, Codec codec, String payloadMediaType, + FrameTransformer transformer, Function exceptionHandler ) { return new AwsEventEncoderFactory(InitialEventType.INITIAL_RESPONSE, operation.outputStreamMember(), codec, payloadMediaType, + transformer, exceptionHandler); } @Override public EventEncoder newEventEncoder() { - return new AwsEventShapeEncoder(initialEventType, schema, codec, payloadMediaType, exceptionHandler); + return new AwsEventShapeEncoder(initialEventType, + schema, + codec, + payloadMediaType, + transformer, + exceptionHandler); } @Override diff --git a/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/AwsEventShapeDecoder.java b/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/AwsEventShapeDecoder.java index e564cf51b..ca2a7a95b 100644 --- a/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/AwsEventShapeDecoder.java +++ b/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/AwsEventShapeDecoder.java @@ -9,7 +9,6 @@ import java.time.Instant; import java.util.Map; import java.util.Objects; -import java.util.concurrent.Flow; import java.util.function.Supplier; import software.amazon.eventstream.HeaderValue; import software.amazon.eventstream.Message; @@ -21,6 +20,7 @@ import software.amazon.smithy.java.core.serde.ShapeDeserializer; import software.amazon.smithy.java.core.serde.SpecificShapeDeserializer; import software.amazon.smithy.java.core.serde.event.EventDecoder; +import software.amazon.smithy.java.core.serde.event.EventStream; /** * A decoder for AWS events @@ -36,7 +36,6 @@ public final class AwsEventShapeDecoder> eventBuilder; private final Schema eventSchema; private final Codec codec; - private volatile Flow.Publisher publisher; AwsEventShapeDecoder( InitialEventType initialEventType, @@ -54,19 +53,9 @@ public final class AwsEventShapeDecoder publisher) { - this.publisher = publisher; - } - private E decodeEvent(AwsEventFrame frame) { var message = frame.unwrap(); var eventType = getEventType(message); @@ -85,12 +74,13 @@ private E decodeEvent(AwsEventFrame frame) { return builder.build(); } - private IR decodeInitialResponse(AwsEventFrame frame) { + @Override + public IR decodeInitialEvent(AwsEventFrame frame, EventStream eventStream) { var message = frame.unwrap(); var builder = initialEventBuilder.get(); - var publisherMember = getPublisherMember(builder.schema()); + var publisherMember = getEventStreamMember(builder.schema()); // Set the publisher member - var responseDeserializer = new InitialResponseDeserializer(publisherMember, publisher); + var responseDeserializer = new InitialResponseDeserializer(publisherMember, eventStream); builder.deserialize(responseDeserializer); // Deserialize the rest of the members if any var headers = message.getHeaders(); @@ -100,7 +90,7 @@ private IR decodeInitialResponse(AwsEventFrame frame) { return builder.build(); } - private Schema getPublisherMember(Schema schema) { + private Schema getEventStreamMember(Schema schema) { for (var member : schema.members()) { if (member.memberTarget().hasTrait(TraitKey.STREAMING_TRAIT)) { return member; @@ -115,16 +105,16 @@ private String getEventType(Message message) { static class InitialResponseDeserializer extends SpecificShapeDeserializer { private final Schema publisherMember; - private final Flow.Publisher publisher; + private final EventStream eventStream; - InitialResponseDeserializer(Schema publisherMember, Flow.Publisher publisher) { + InitialResponseDeserializer(Schema publisherMember, EventStream eventStream) { this.publisherMember = publisherMember; - this.publisher = publisher; + this.eventStream = eventStream; } @Override - public Flow.Publisher readEventStream(Schema schema) { - return publisher; + public EventStream readEventStream(Schema schema) { + return eventStream; } @Override diff --git a/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/AwsEventShapeEncoder.java b/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/AwsEventShapeEncoder.java index 7f1a8e042..eef6f7a3c 100644 --- a/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/AwsEventShapeEncoder.java +++ b/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/AwsEventShapeEncoder.java @@ -26,6 +26,7 @@ import software.amazon.smithy.java.core.serde.SpecificShapeSerializer; import software.amazon.smithy.java.core.serde.event.EventEncoder; import software.amazon.smithy.java.core.serde.event.EventStreamingException; +import software.amazon.smithy.java.core.serde.event.FrameTransformer; import software.amazon.smithy.model.shapes.ShapeId; public final class AwsEventShapeEncoder implements EventEncoder { @@ -35,6 +36,7 @@ public final class AwsEventShapeEncoder implements EventEncoder { private final String payloadMediaType; private final Map, ShapeSerializer>> possibleTypes; private final Map possibleExceptions; + private final FrameTransformer frameTransformer; private final Function exceptionHandler; public AwsEventShapeEncoder( @@ -42,6 +44,7 @@ public AwsEventShapeEncoder( Schema eventSchema, Codec codec, String payloadMediaType, + FrameTransformer frameTransformer, Function exceptionHandler ) { this.initialEventType = Objects.requireNonNull(initialEventType, "initialEventType"); @@ -51,6 +54,7 @@ public AwsEventShapeEncoder( codec, initialEventType.value()); this.possibleExceptions = possibleExceptions(Objects.requireNonNull(eventSchema, "eventSchema")); + this.frameTransformer = Objects.requireNonNull(frameTransformer, "frameTransformer"); this.exceptionHandler = Objects.requireNonNull(exceptionHandler, "exceptionHandler"); } @@ -62,7 +66,8 @@ public AwsEventFrame encode(SerializableStruct item) { headers.put(":message-type", HeaderValue.fromString("event")); headers.put(":event-type", HeaderValue.fromString(typeHolder.get())); headers.put(":content-type", HeaderValue.fromString(payloadMediaType)); - return new AwsEventFrame(new Message(headers, payload)); + var frame = new AwsEventFrame(new Message(headers, payload)); + return frameTransformer.apply(frame); } private byte[] encodeInput( diff --git a/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/AwsFrameDecoder.java b/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/AwsFrameDecoder.java index 8e4f1d371..5e0bf597d 100644 --- a/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/AwsFrameDecoder.java +++ b/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/AwsFrameDecoder.java @@ -24,7 +24,7 @@ public AwsFrameDecoder(FrameTransformer transformer) { public List decode(ByteBuffer buffer) { decoder.feed(buffer); var messages = decoder.getDecodedMessages(); - var result = new ArrayList(); + var result = new ArrayList(messages.size()); for (var message : messages) { var event = new AwsEventFrame(message); var transformed = transformer.apply(event); diff --git a/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/RpcEventStreamsUtil.java b/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/RpcEventStreamsUtil.java index f8a344dc8..e89aced71 100644 --- a/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/RpcEventStreamsUtil.java +++ b/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/RpcEventStreamsUtil.java @@ -5,16 +5,14 @@ package software.amazon.smithy.java.aws.events; -import java.nio.ByteBuffer; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Flow; import software.amazon.smithy.java.core.schema.Schema; import software.amazon.smithy.java.core.schema.SerializableStruct; import software.amazon.smithy.java.core.schema.TraitKey; import software.amazon.smithy.java.core.serde.event.EventDecoderFactory; import software.amazon.smithy.java.core.serde.event.EventEncoderFactory; -import software.amazon.smithy.java.core.serde.event.EventStreamFrameDecodingProcessor; -import software.amazon.smithy.java.core.serde.event.EventStreamFrameEncodingProcessor; +import software.amazon.smithy.java.core.serde.event.EventStream; +import software.amazon.smithy.java.core.serde.event.InternalEventStreamReader; +import software.amazon.smithy.java.core.serde.event.InternalEventStreamWriter; import software.amazon.smithy.java.io.datastream.DataStream; /** @@ -24,47 +22,36 @@ public final class RpcEventStreamsUtil { private RpcEventStreamsUtil() {} - public static Flow.Publisher bodyForEventStreaming( + @SuppressWarnings("unchecked") + public static DataStream bodyForEventStreaming( EventEncoderFactory eventStreamEncodingFactory, SerializableStruct input ) { - Flow.Publisher eventStream = input.getMemberValue(streamingMember(input.schema())); - return EventStreamFrameEncodingProcessor.create(eventStream, eventStreamEncodingFactory, input); - } - - // TODO: Make more synchronous - public static O deserializeResponse( - EventDecoderFactory eventDecoderFactory, - DataStream bodyDataStream - ) { - var result = new CompletableFuture(); - var processor = EventStreamFrameDecodingProcessor.create(bodyDataStream, eventDecoderFactory); - - // A subscriber to serialize the initial event. - processor.subscribe(new Flow.Subscriber<>() { - @Override - public void onSubscribe(Flow.Subscription subscription) { - subscription.request(1); - } - + EventStream eventStream = input.getMemberValue(streamingMember(input.schema())); + InternalEventStreamWriter writer = + InternalEventStreamWriter.toInternal(eventStream); + writer.bootstrap(new InternalEventStreamWriter.Bootstrap<>() { @Override - @SuppressWarnings("unchecked") - public void onNext(SerializableStruct item) { - result.complete((O) item); + public EventEncoderFactory encoder() { + return eventStreamEncodingFactory; } @Override - public void onError(Throwable throwable) { - result.completeExceptionally(throwable); - } - - @Override - public void onComplete() { - result.completeExceptionally(new RuntimeException("Unexpected event stream completion")); + public SerializableStruct initialEvent() { + return input; } }); + return writer.toDataStream(); + } - return result.join(); + public static O deserializeResponse( + EventDecoderFactory eventDecoderFactory, + DataStream bodyDataStream + ) { + var reader = InternalEventStreamReader.newReader(bodyDataStream, + eventDecoderFactory, + true); + return reader.readInitialEvent(); } private static Schema streamingMember(Schema schema) { @@ -75,5 +62,4 @@ private static Schema streamingMember(Schema schema) { } throw new IllegalArgumentException("No streaming member found"); } - } diff --git a/aws/aws-event-streams/src/test/java/software/amazon/smithy/java/aws/events/AwsEventShapeDecoderTest.java b/aws/aws-event-streams/src/test/java/software/amazon/smithy/java/aws/events/AwsEventShapeDecoderTest.java index befffa5a4..6a47fff86 100644 --- a/aws/aws-event-streams/src/test/java/software/amazon/smithy/java/aws/events/AwsEventShapeDecoderTest.java +++ b/aws/aws-event-streams/src/test/java/software/amazon/smithy/java/aws/events/AwsEventShapeDecoderTest.java @@ -35,7 +35,7 @@ public void testDecodeInitialResponse() { var frame = new AwsEventFrame(message); // Act - var struct = createDecoder().decode(frame); + var struct = createDecoder().decodeInitialEvent(frame, null); // Assert assertInstanceOf(TestOperationOutput.class, struct); diff --git a/aws/aws-event-streams/src/test/java/software/amazon/smithy/java/aws/events/AwsEventShapeEncoderTest.java b/aws/aws-event-streams/src/test/java/software/amazon/smithy/java/aws/events/AwsEventShapeEncoderTest.java index fb6ba54ea..8bb8a8962 100644 --- a/aws/aws-event-streams/src/test/java/software/amazon/smithy/java/aws/events/AwsEventShapeEncoderTest.java +++ b/aws/aws-event-streams/src/test/java/software/amazon/smithy/java/aws/events/AwsEventShapeEncoderTest.java @@ -20,6 +20,7 @@ import software.amazon.smithy.java.aws.events.model.TestOperationInput; import software.amazon.smithy.java.core.serde.Codec; import software.amazon.smithy.java.core.serde.event.EventStreamingException; +import software.amazon.smithy.java.core.serde.event.FrameTransformer; import software.amazon.smithy.java.json.JsonCodec; class AwsEventShapeEncoderTest { @@ -135,6 +136,7 @@ static AwsEventShapeEncoder createEncoder() { TestOperation.instance().inputStreamMember(), // event schema createJsonCodec(), // codec "text/json", + FrameTransformer.identity(), (e) -> new EventStreamingException("InternalServerException", "Internal Server Error")); } diff --git a/aws/aws-event-streams/src/test/java/software/amazon/smithy/java/aws/events/model/TestOperationInput.java b/aws/aws-event-streams/src/test/java/software/amazon/smithy/java/aws/events/model/TestOperationInput.java index 0c9569716..787f29eb9 100644 --- a/aws/aws-event-streams/src/test/java/software/amazon/smithy/java/aws/events/model/TestOperationInput.java +++ b/aws/aws-event-streams/src/test/java/software/amazon/smithy/java/aws/events/model/TestOperationInput.java @@ -6,7 +6,6 @@ package software.amazon.smithy.java.aws.events.model; import java.util.Objects; -import java.util.concurrent.Flow.Publisher; import software.amazon.smithy.java.core.schema.Schema; import software.amazon.smithy.java.core.schema.SchemaUtils; import software.amazon.smithy.java.core.schema.SerializableStruct; @@ -14,6 +13,7 @@ import software.amazon.smithy.java.core.serde.ShapeDeserializer; import software.amazon.smithy.java.core.serde.ShapeSerializer; import software.amazon.smithy.java.core.serde.ToStringSerializer; +import software.amazon.smithy.java.core.serde.event.EventStream; import software.amazon.smithy.model.shapes.ShapeId; import software.amazon.smithy.utils.SmithyGenerated; @@ -29,7 +29,7 @@ public final class TestOperationInput implements SerializableStruct { private final transient String headerString; private final transient String inputStringMember; - private final transient Publisher stream; + private final transient EventStream stream; private TestOperationInput(Builder builder) { this.headerString = builder.headerString; @@ -45,7 +45,7 @@ public String getInputStringMember() { return inputStringMember; } - public Publisher getStream() { + public EventStream getStream() { return stream; } @@ -130,7 +130,7 @@ public static Builder builder() { public static final class Builder implements ShapeBuilder { private String headerString; private String inputStringMember; - private Publisher stream; + private EventStream stream; private Builder() {} @@ -158,7 +158,7 @@ public Builder inputStringMember(String inputStringMember) { /** * @return this builder. */ - public Builder stream(Publisher stream) { + public Builder stream(EventStream stream) { this.stream = stream; return this; } @@ -176,7 +176,8 @@ public void setMemberValue(Schema member, Object value) { case 1 -> inputStringMember( (String) SchemaUtils.validateSameMember($SCHEMA_INPUT_STRING_MEMBER, member, value)); case 2 -> - stream((Publisher) SchemaUtils.validateSameMember($SCHEMA_STREAM, member, value)); + stream((EventStream) SchemaUtils + .validateSameMember($SCHEMA_STREAM, member, value)); default -> ShapeBuilder.super.setMemberValue(member, value); } } @@ -201,7 +202,7 @@ public void accept(Builder builder, Schema member, ShapeDeserializer de) { switch (member.memberIndex()) { case 0 -> builder.headerString(de.readString(member)); case 1 -> builder.inputStringMember(de.readString(member)); - case 2 -> builder.stream((Publisher) de.readEventStream(member)); + case 2 -> builder.stream((EventStream) de.readEventStream(member)); default -> throw new IllegalArgumentException("Unexpected member: " + member.memberName()); } } diff --git a/aws/aws-event-streams/src/test/java/software/amazon/smithy/java/aws/events/model/TestOperationOutput.java b/aws/aws-event-streams/src/test/java/software/amazon/smithy/java/aws/events/model/TestOperationOutput.java index aaeb88ae6..3ddd4f1d1 100644 --- a/aws/aws-event-streams/src/test/java/software/amazon/smithy/java/aws/events/model/TestOperationOutput.java +++ b/aws/aws-event-streams/src/test/java/software/amazon/smithy/java/aws/events/model/TestOperationOutput.java @@ -6,7 +6,6 @@ package software.amazon.smithy.java.aws.events.model; import java.util.Objects; -import java.util.concurrent.Flow.Publisher; import software.amazon.smithy.java.core.schema.Schema; import software.amazon.smithy.java.core.schema.SchemaUtils; import software.amazon.smithy.java.core.schema.SerializableStruct; @@ -14,6 +13,7 @@ import software.amazon.smithy.java.core.serde.ShapeDeserializer; import software.amazon.smithy.java.core.serde.ShapeSerializer; import software.amazon.smithy.java.core.serde.ToStringSerializer; +import software.amazon.smithy.java.core.serde.event.EventStream; import software.amazon.smithy.model.shapes.ShapeId; import software.amazon.smithy.utils.SmithyGenerated; @@ -29,7 +29,7 @@ public final class TestOperationOutput implements SerializableStruct { private final transient Integer intMemberHeader; private final transient String stringMember; - private final transient Publisher outputStream; + private final transient EventStream outputStream; private TestOperationOutput(Builder builder) { this.intMemberHeader = builder.intMemberHeader; @@ -45,7 +45,7 @@ public String getStringMember() { return stringMember; } - public Publisher getOutputStream() { + public EventStream getOutputStream() { return outputStream; } @@ -130,7 +130,7 @@ public static Builder builder() { public static final class Builder implements ShapeBuilder { private Integer intMemberHeader; private String stringMember; - private Publisher outputStream; + private EventStream outputStream; private Builder() {} @@ -158,7 +158,7 @@ public Builder stringMember(String stringMember) { /** * @return this builder. */ - public Builder outputStream(Publisher outputStream) { + public Builder outputStream(EventStream outputStream) { this.outputStream = outputStream; return this; } @@ -175,7 +175,7 @@ public void setMemberValue(Schema member, Object value) { case 0 -> intMemberHeader((Integer) SchemaUtils.validateSameMember($SCHEMA_INT_MEMBER_HEADER, member, value)); case 1 -> stringMember((String) SchemaUtils.validateSameMember($SCHEMA_STRING_MEMBER, member, value)); - case 2 -> outputStream((Publisher) SchemaUtils + case 2 -> outputStream((EventStream) SchemaUtils .validateSameMember($SCHEMA_OUTPUT_STREAM, member, value)); default -> ShapeBuilder.super.setMemberValue(member, value); } @@ -201,7 +201,7 @@ public void accept(Builder builder, Schema member, ShapeDeserializer de) { switch (member.memberIndex()) { case 0 -> builder.intMemberHeader(de.readInteger(member)); case 1 -> builder.stringMember(de.readString(member)); - case 2 -> builder.outputStream((Publisher) de.readEventStream(member)); + case 2 -> builder.outputStream((EventStream) de.readEventStream(member)); default -> throw new IllegalArgumentException("Unexpected member: " + member.memberName()); } } diff --git a/aws/client/aws-client-restjson/src/main/java/software/amazon/smithy/java/aws/client/restjson/RestJsonClientProtocol.java b/aws/client/aws-client-restjson/src/main/java/software/amazon/smithy/java/aws/client/restjson/RestJsonClientProtocol.java index f3123c398..18484625d 100644 --- a/aws/client/aws-client-restjson/src/main/java/software/amazon/smithy/java/aws/client/restjson/RestJsonClientProtocol.java +++ b/aws/client/aws-client-restjson/src/main/java/software/amazon/smithy/java/aws/client/restjson/RestJsonClientProtocol.java @@ -27,6 +27,7 @@ import software.amazon.smithy.java.core.serde.event.EventDecoderFactory; import software.amazon.smithy.java.core.serde.event.EventEncoderFactory; import software.amazon.smithy.java.core.serde.event.EventStreamingException; +import software.amazon.smithy.java.core.serde.event.FrameTransformer; import software.amazon.smithy.java.http.api.HttpRequest; import software.amazon.smithy.java.http.binding.RequestSerializer; import software.amazon.smithy.java.json.JsonCodec; @@ -114,6 +115,7 @@ protected EventEncoderFactory getEventEncoderFactory( inputOperation, payloadCodec(), payloadMediaType(), + FrameTransformer.identity(), (e) -> new EventStreamingException("InternalServerException", "Internal Server Error")); } diff --git a/aws/client/aws-client-restxml/src/main/java/software/amazon/smithy/java/aws/client/restxml/RestXmlClientProtocol.java b/aws/client/aws-client-restxml/src/main/java/software/amazon/smithy/java/aws/client/restxml/RestXmlClientProtocol.java index ca775ef98..f38e4d38e 100644 --- a/aws/client/aws-client-restxml/src/main/java/software/amazon/smithy/java/aws/client/restxml/RestXmlClientProtocol.java +++ b/aws/client/aws-client-restxml/src/main/java/software/amazon/smithy/java/aws/client/restxml/RestXmlClientProtocol.java @@ -29,6 +29,7 @@ import software.amazon.smithy.java.core.serde.event.EventDecoderFactory; import software.amazon.smithy.java.core.serde.event.EventEncoderFactory; import software.amazon.smithy.java.core.serde.event.EventStreamingException; +import software.amazon.smithy.java.core.serde.event.FrameTransformer; import software.amazon.smithy.java.http.api.HttpResponse; import software.amazon.smithy.java.xml.XmlCodec; import software.amazon.smithy.java.xml.XmlUtil; @@ -88,6 +89,7 @@ protected EventEncoderFactory getEventEncoderFactory( inputOperation, payloadCodec(), payloadMediaType(), + FrameTransformer.identity(), (e) -> new EventStreamingException("InternalServerException", "Internal Server Error")); } diff --git a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/JavaHttpClientTransport.java b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/JavaHttpClientTransport.java index 391c427c2..4713a7087 100644 --- a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/JavaHttpClientTransport.java +++ b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/JavaHttpClientTransport.java @@ -119,7 +119,7 @@ private java.net.http.HttpRequest createJavaRequest(Context context, HttpRequest bodyPublisher = BodyPublishers.ofByteArray(ByteBufferUtils.getBytes(request.body().asByteBuffer())); } } else { - bodyPublisher = BodyPublishers.fromPublisher(request.body()); + bodyPublisher = BodyPublishers.ofInputStream(() -> request.body().asInputStream()); } var httpRequestBuilder = java.net.http.HttpRequest.newBuilder() diff --git a/client/client-rpcv2-cbor/src/main/java/software/amazon/smithy/java/client/rpcv2/RpcV2CborProtocol.java b/client/client-rpcv2-cbor/src/main/java/software/amazon/smithy/java/client/rpcv2/RpcV2CborProtocol.java index 7b0e87940..5d19bfe21 100644 --- a/client/client-rpcv2-cbor/src/main/java/software/amazon/smithy/java/client/rpcv2/RpcV2CborProtocol.java +++ b/client/client-rpcv2-cbor/src/main/java/software/amazon/smithy/java/client/rpcv2/RpcV2CborProtocol.java @@ -33,6 +33,7 @@ import software.amazon.smithy.java.core.serde.event.EventDecoderFactory; import software.amazon.smithy.java.core.serde.event.EventEncoderFactory; import software.amazon.smithy.java.core.serde.event.EventStreamingException; +import software.amazon.smithy.java.core.serde.event.FrameTransformer; import software.amazon.smithy.java.http.api.HttpHeaders; import software.amazon.smithy.java.http.api.HttpRequest; import software.amazon.smithy.java.http.api.HttpResponse; @@ -161,6 +162,7 @@ private EventEncoderFactory getEventEncoderFactory( return AwsEventEncoderFactory.forInputStream(inputOperation, payloadCodec(), PAYLOAD_MEDIA_TYPE, + FrameTransformer.identity(), (e) -> new EventStreamingException("InternalServerException", "Internal Server Error")); } diff --git a/codegen/codegen-core/src/main/java/software/amazon/smithy/java/codegen/JavaSymbolProvider.java b/codegen/codegen-core/src/main/java/software/amazon/smithy/java/codegen/JavaSymbolProvider.java index 58cdf1aca..162b98445 100644 --- a/codegen/codegen-core/src/main/java/software/amazon/smithy/java/codegen/JavaSymbolProvider.java +++ b/codegen/codegen-core/src/main/java/software/amazon/smithy/java/codegen/JavaSymbolProvider.java @@ -15,12 +15,12 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Flow; import software.amazon.smithy.codegen.core.CodegenException; import software.amazon.smithy.codegen.core.Symbol; import software.amazon.smithy.codegen.core.SymbolProvider; import software.amazon.smithy.java.core.schema.Unit; import software.amazon.smithy.java.core.serde.document.Document; +import software.amazon.smithy.java.core.serde.event.EventStream; import software.amazon.smithy.java.io.datastream.DataStream; import software.amazon.smithy.java.logging.InternalLogger; import software.amazon.smithy.model.Model; @@ -194,7 +194,7 @@ public Symbol memberShape(MemberShape memberShape) { + memberShape)); if (CodegenUtils.isEventStream(target)) { - return CodegenUtils.fromClass(Flow.Publisher.class) + return CodegenUtils.fromClass(EventStream.class) .toBuilder() .addReference(toSymbol(target)) .build(); diff --git a/codegen/codegen-core/src/main/java/software/amazon/smithy/java/codegen/generators/DeserializerGenerator.java b/codegen/codegen-core/src/main/java/software/amazon/smithy/java/codegen/generators/DeserializerGenerator.java index ddcdf0b32..e1f2d1248 100644 --- a/codegen/codegen-core/src/main/java/software/amazon/smithy/java/codegen/generators/DeserializerGenerator.java +++ b/codegen/codegen-core/src/main/java/software/amazon/smithy/java/codegen/generators/DeserializerGenerator.java @@ -5,11 +5,11 @@ package software.amazon.smithy.java.codegen.generators; -import java.util.concurrent.Flow; import software.amazon.smithy.codegen.core.Symbol; import software.amazon.smithy.codegen.core.SymbolProvider; import software.amazon.smithy.java.codegen.CodegenUtils; import software.amazon.smithy.java.codegen.writer.JavaWriter; +import software.amazon.smithy.java.core.serde.event.EventStream; import software.amazon.smithy.model.Model; import software.amazon.smithy.model.shapes.BigDecimalShape; import software.amazon.smithy.model.shapes.BigIntegerShape; @@ -190,11 +190,11 @@ public Void structureShape(StructureShape structureShape) { @Override public Void unionShape(UnionShape unionShape) { if (unionShape.hasTrait(StreamingTrait.class)) { - Symbol flowPublisherType = CodegenUtils.fromClass(Flow.Publisher.class) + Symbol eventStreamType = CodegenUtils.fromClass(EventStream.class) .toBuilder() .addReference(symbolProvider.toSymbol(unionShape)) .build(); - writer.write("($T) ${deserializer:L}.readEventStream(${schemaName:L})", flowPublisherType); + writer.write("($T) ${deserializer:L}.readEventStream(${schemaName:L})", eventStreamType); } else { delegateDeser(); } diff --git a/codegen/codegen-core/src/main/java/software/amazon/smithy/java/codegen/generators/StructureDeserializerGenerator.java b/codegen/codegen-core/src/main/java/software/amazon/smithy/java/codegen/generators/StructureDeserializerGenerator.java index e5cbed6d4..416212a45 100644 --- a/codegen/codegen-core/src/main/java/software/amazon/smithy/java/codegen/generators/StructureDeserializerGenerator.java +++ b/codegen/codegen-core/src/main/java/software/amazon/smithy/java/codegen/generators/StructureDeserializerGenerator.java @@ -44,6 +44,7 @@ private static final class $$InnerDeserializer implements ${shapeDeserializer:T} private static final $$InnerDeserializer INSTANCE = new $$InnerDeserializer(); @Override + @SuppressWarnings("unchecked") public void accept(Builder builder, ${sdkSchema:T} member, ${shapeDeserializer:T} de) {${?hasMembers} switch (member.memberIndex()) { ${cases:C|} diff --git a/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/defaults/expected/DefaultStructure.java b/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/defaults/expected/DefaultStructure.java index 0855c90da..74b01695a 100644 --- a/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/defaults/expected/DefaultStructure.java +++ b/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/defaults/expected/DefaultStructure.java @@ -698,6 +698,7 @@ private static final class $InnerDeserializer implements ShapeDeserializer.Struc private static final $InnerDeserializer INSTANCE = new $InnerDeserializer(); @Override + @SuppressWarnings("unchecked") public void accept(Builder builder, Schema member, ShapeDeserializer de) { switch (member.memberIndex()) { case 0 -> builder.booleanMember(de.readBoolean(member)); diff --git a/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/BuilderShape.java b/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/BuilderShape.java index 107f11fa9..447f629e2 100644 --- a/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/BuilderShape.java +++ b/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/BuilderShape.java @@ -107,6 +107,7 @@ private static final class $InnerDeserializer implements ShapeDeserializer.Struc private static final $InnerDeserializer INSTANCE = new $InnerDeserializer(); @Override + @SuppressWarnings("unchecked") public void accept(Builder builder, Schema member, ShapeDeserializer de) {} } } diff --git a/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/List.java b/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/List.java index 5544cbcbb..819fc5b2d 100644 --- a/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/List.java +++ b/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/List.java @@ -107,6 +107,7 @@ private static final class $InnerDeserializer implements ShapeDeserializer.Struc private static final $InnerDeserializer INSTANCE = new $InnerDeserializer(); @Override + @SuppressWarnings("unchecked") public void accept(Builder builder, Schema member, ShapeDeserializer de) {} } } diff --git a/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/Map.java b/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/Map.java index a8c8defd7..54e538581 100644 --- a/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/Map.java +++ b/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/Map.java @@ -107,6 +107,7 @@ private static final class $InnerDeserializer implements ShapeDeserializer.Struc private static final $InnerDeserializer INSTANCE = new $InnerDeserializer(); @Override + @SuppressWarnings("unchecked") public void accept(Builder builder, Schema member, ShapeDeserializer de) {} } } diff --git a/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/NamingStruct.java b/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/NamingStruct.java index 2beeb0c77..01e58cfc0 100644 --- a/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/NamingStruct.java +++ b/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/NamingStruct.java @@ -343,6 +343,7 @@ private static final class $InnerDeserializer implements ShapeDeserializer.Struc private static final $InnerDeserializer INSTANCE = new $InnerDeserializer(); @Override + @SuppressWarnings("unchecked") public void accept(Builder builder, Schema member, ShapeDeserializer de) { switch (member.memberIndex()) { case 0 -> builder.other(de.readString(member)); diff --git a/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/ObjectShape.java b/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/ObjectShape.java index bf7c8ad95..c42a3fc33 100644 --- a/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/ObjectShape.java +++ b/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/ObjectShape.java @@ -328,6 +328,7 @@ private static final class $InnerDeserializer implements ShapeDeserializer.Struc private static final $InnerDeserializer INSTANCE = new $InnerDeserializer(); @Override + @SuppressWarnings("unchecked") public void accept(Builder builder, Schema member, ShapeDeserializer de) { switch (member.memberIndex()) { case 0 -> builder.classMember(de.readString(member)); diff --git a/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/Type.java b/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/Type.java index 2d3600eff..3e9c8d391 100644 --- a/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/Type.java +++ b/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/Type.java @@ -107,6 +107,7 @@ private static final class $InnerDeserializer implements ShapeDeserializer.Struc private static final $InnerDeserializer INSTANCE = new $InnerDeserializer(); @Override + @SuppressWarnings("unchecked") public void accept(Builder builder, Schema member, ShapeDeserializer de) {} } } diff --git a/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/UnionWithTypeMember.java b/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/UnionWithTypeMember.java index dd9b57067..2c0cdb534 100644 --- a/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/UnionWithTypeMember.java +++ b/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/naming-conflict/expected/UnionWithTypeMember.java @@ -151,6 +151,7 @@ private static final class $InnerDeserializer implements ShapeDeserializer.Struc private static final $InnerDeserializer INSTANCE = new $InnerDeserializer(); @Override + @SuppressWarnings("unchecked") public void accept(Builder builder, Schema member, ShapeDeserializer de) { switch (member.memberIndex()) { case 0 -> builder.type(Type.builder().deserializeMember(de, member).build()); diff --git a/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/primitive-types/expected/PrimitivesNotNullable.java b/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/primitive-types/expected/PrimitivesNotNullable.java index cfb1dfa15..9c15eaf38 100644 --- a/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/primitive-types/expected/PrimitivesNotNullable.java +++ b/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/primitive-types/expected/PrimitivesNotNullable.java @@ -317,6 +317,7 @@ private static final class $InnerDeserializer implements ShapeDeserializer.Struc private static final $InnerDeserializer INSTANCE = new $InnerDeserializer(); @Override + @SuppressWarnings("unchecked") public void accept(Builder builder, Schema member, ShapeDeserializer de) { switch (member.memberIndex()) { case 0 -> builder.byteMember(de.readByte(member)); diff --git a/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/primitive-types/expected/PrimitivesNullable.java b/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/primitive-types/expected/PrimitivesNullable.java index 88dad549d..6c611ed57 100644 --- a/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/primitive-types/expected/PrimitivesNullable.java +++ b/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/primitive-types/expected/PrimitivesNullable.java @@ -285,6 +285,7 @@ private static final class $InnerDeserializer implements ShapeDeserializer.Struc private static final $InnerDeserializer INSTANCE = new $InnerDeserializer(); @Override + @SuppressWarnings("unchecked") public void accept(Builder builder, Schema member, ShapeDeserializer de) { switch (member.memberIndex()) { case 0 -> builder.byteMember(de.readByte(member)); diff --git a/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/union-type/expected/UnionType.java b/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/union-type/expected/UnionType.java index 8c9f6ef07..5e92bbc21 100644 --- a/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/union-type/expected/UnionType.java +++ b/codegen/plugins/types-codegen/src/test/resources/software/amazon/smithy/java/codegen/types/test-cases/union-type/expected/UnionType.java @@ -657,6 +657,7 @@ private static final class $InnerDeserializer implements ShapeDeserializer.Struc private static final $InnerDeserializer INSTANCE = new $InnerDeserializer(); @Override + @SuppressWarnings("unchecked") public void accept(Builder builder, Schema member, ShapeDeserializer de) { switch (member.memberIndex()) { case 0 -> builder.blobValue(de.readBlob(member)); diff --git a/core/src/main/java/software/amazon/smithy/java/core/schema/ValidatorOfUniqueItems.java b/core/src/main/java/software/amazon/smithy/java/core/schema/ValidatorOfUniqueItems.java index aac12d7e6..42c23bf34 100644 --- a/core/src/main/java/software/amazon/smithy/java/core/schema/ValidatorOfUniqueItems.java +++ b/core/src/main/java/software/amazon/smithy/java/core/schema/ValidatorOfUniqueItems.java @@ -14,12 +14,12 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.concurrent.Flow; import java.util.function.BiConsumer; import software.amazon.smithy.java.core.serde.MapSerializer; import software.amazon.smithy.java.core.serde.ShapeSerializer; import software.amazon.smithy.java.core.serde.document.Document; import software.amazon.smithy.java.core.serde.document.DocumentParser; +import software.amazon.smithy.java.core.serde.event.EventStream; import software.amazon.smithy.java.io.datastream.DataStream; /** @@ -135,7 +135,7 @@ public void writeDataStream(Schema schema, DataStream value) { } @Override - public void writeEventStream(Schema schema, Flow.Publisher value) { + public void writeEventStream(Schema schema, EventStream value) { addError("Event stream found in unique items"); } diff --git a/core/src/main/java/software/amazon/smithy/java/core/serde/BufferingFlatMapProcessor.java b/core/src/main/java/software/amazon/smithy/java/core/serde/BufferingFlatMapProcessor.java deleted file mode 100644 index 2394a84a3..000000000 --- a/core/src/main/java/software/amazon/smithy/java/core/serde/BufferingFlatMapProcessor.java +++ /dev/null @@ -1,266 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0 - */ - -package software.amazon.smithy.java.core.serde; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Flow; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Stream; -import software.amazon.smithy.java.logging.InternalLogger; - -/** - * A processor abstraction that maps inputs of type I from an upstream publisher to 0-n items of type O - * that are buffered without limit and published to a subscriber. This prevents the subscriber from receiving - * more items than requested when one I maps to multiple Os. - *

- * Note that this does not perform event publication on a different thread; both receipt of items and requests - * for more items can trigger publication of items on the calling thread. - * - * @param the type published by the upstream publisher. - * @param the type published to the downstream subscriber. - */ -public abstract class BufferingFlatMapProcessor implements - Flow.Processor, - Flow.Subscription { - private static final InternalLogger LOG = InternalLogger.getLogger(BufferingFlatMapProcessor.class); - private static final Throwable COMPLETE_SENTINEL = new RuntimeException(); - - private final AtomicReference terminalEventHolder = new AtomicReference<>(); - private final AtomicLong pendingRequests = new AtomicLong(); - private final AtomicInteger pendingFlushes = new AtomicInteger(); - private final BlockingQueue queue = new LinkedBlockingQueue<>(); - - private volatile Flow.Subscription upstreamSubscription; - private volatile Flow.Subscriber downstream; - private boolean terminated = false; - - public BufferingFlatMapProcessor( - Flow.Publisher publisher - ) { - publisher.subscribe(this); - } - - protected abstract Stream map(I item); - - @Override - public final void onSubscribe(Flow.Subscription subscription) { - upstreamSubscription = subscription; - if (pendingRequests.get() > 0 && pendingFlushes.get() == 0) { - flush(); - } - } - - @Override - public final void subscribe(Flow.Subscriber subscriber) { - downstream = subscriber; - subscriber.onSubscribe(this); - } - - @Override - public final void onNext(I item) { - try { - map(item).forEach(this::addToQueue); - } catch (Exception e) { - LOG.warn("Malformed input", e); - onError(new SerializationException("Malformed input", e)); - return; - } - flush(); - } - - /** - * Used to add the initial message to the queue. This message won't be sent until - * a flush happens, either by a calling {@link #request(long)} or {@link #onNext(Object)}. - *

- * This method will re-throw any exception caught when calling {@link #map} without calling - * {@link #onError(Throwable)} since it's assumed that the processor is not yet fully setup - * when this method is called. - */ - protected final void enqueueItem(I item) { - try { - map(item).forEach(this::addToQueue); - } catch (RuntimeException e) { - LOG.warn("Malformed input", e); - throw e; - } - } - - private void addToQueue(O item) { - queue.add(item); - } - - @Override - public final void onError(Throwable t) { - upstreamSubscription.cancel(); - terminalEventHolder.compareAndSet(null, t); - if (upstreamSubscription != null && downstream != null) { - flush(); - } - } - - @Override - public final void onComplete() { - terminalEventHolder.compareAndSet(null, COMPLETE_SENTINEL); - if (upstreamSubscription != null && downstream != null) { - flush(); - } - } - - @Override - public final void request(long n) { - if (n <= 0) { - onError(new IllegalArgumentException("got a request for " + n + " items")); - return; - } - - accumulate(pendingRequests, n); - flush(); - } - - private void flush() { - if (upstreamSubscription == null || downstream == null) { - LOG.warn("flush() requested before upstream and downstream fully wired, " + - "upstreamSubscription is null: {}, downstream is null: {}", - upstreamSubscription == null, - downstream == null); - onError(new IllegalStateException("flush() requested before upstream and downstream fully wired.")); - return; - } - - if (pendingFlushes.getAndIncrement() > 0) { - return; - } - - if (terminated) { - return; - } - - int loop = 1; - while (loop > 0) { - long pending = pendingRequests.get(); - - Flow.Subscriber subscriber = downstream; - long delivered = sendMessages(subscriber, pending); - boolean empty = queue.isEmpty(); - Throwable terminalEvent = terminalEventHolder.get(); - if (terminalEvent != null && attemptTermination(subscriber, terminalEvent, empty)) { - terminated = true; - return; - } - - if (delivered > 0) { - /* - * We still need to re-read at the start of the loop because additions to pendingRequest happen-before - * additions to pendingFlushes. If we reused this value in the next loop, there is a race condition: - * 1. Thread A is in `flush()`. - * 2. Thread B enters `request`. - * 3. Thread A decrements pendingRequests here. - * 4. Thread B increments pendingFlushes and returns because A is still flushing. - * 5. Thread A decrements pendingFlushes and noticed that something requested a flush. It loops around - * but _doesn't_ read the new value of pendingRequests, so it does nothing. - * - * In short, reload _all_ state on _every_Loop. You are only guaranteed to see updates to shared state - * after reading a value from pendingFlushes. - */ - accumulate(pendingRequests, -delivered); - - /* - * We need to accumulate our local `pending` value separately from the atomic `pendingRequests`. - * Consider this scenario with two buffers available for flush and one outstanding request: - * 1. Thread A is in `flush()`. It observes 1 pending request and flushes 1 buffer. - * 2. Thread B calls `request` and increments `pendingRequests` from 1 to 2. - * 3. Thread A enters this if statement. It delivered one message, subtracts 1 from `pendingRequests`, - * and stores the new sum of 1 in `pending`. - * 4. Thread A enters the next if statement and requests one buffer from the upstream subscription, - * even though it fulfilled the 1 request that was present in this loop and we have 1 more buffer - * that can be flushed. - * - * To avoid over-requesting buffers, we must only consider how much demand we successfully fulfilled - * verses how much we were willing to fulfill on this loop. To do this, we must only read a value from - * `pendingRequests` a single time, at the top of each loop. The rest of the loop must only work with - * the value read at the start. - */ - pending = accumulate(pending, -delivered); - } - - if (pending > 0) { - // do this inside the flush loop so a recursive flush -> request -> onNext -> flush - // call will be aborted by the `pendingFlushes` check. - upstreamSubscription.request(1); - } - - loop = pendingFlushes.addAndGet(-loop); - } - } - - protected void handleError(Throwable error, Flow.Subscriber subscriber) { - subscriber.onError(error); - } - - /** - * @return true if this decoder is in a terminal state - */ - private boolean attemptTermination(Flow.Subscriber subscriber, Throwable terminalEvent, boolean done) { - if (done && subscriber != null) { - if (terminalEvent == COMPLETE_SENTINEL) { - subscriber.onComplete(); - } else { - handleError(terminalEvent, subscriber); - } - return true; - } - - return false; - } - - /** - * Tries to flush up to the given demand and signals if we need data from - * upstream if there is unfulfilled demand. - * - * @param outstanding outstanding message demand to fulfill - * @return number of fulfilled requests - */ - private long sendMessages(Flow.Subscriber subscriber, long outstanding) { - long served = 0; - - if (subscriber != null) { - while (served < outstanding) { - O m = queue.poll(); - if (m == null) { - break; - } - served++; - subscriber.onNext(m); - } - } - - return served; - } - - @Override - public final void cancel() { - upstreamSubscription.cancel(); - } - - private static long accumulate(long current, long n) { - if (current == Long.MAX_VALUE || n == Long.MAX_VALUE) { - return Long.MAX_VALUE; - } - - try { - return Math.addExact(current, n); - } catch (ArithmeticException e) { - return Long.MAX_VALUE; - } - } - - private static void accumulate(AtomicLong l, long n) { - l.accumulateAndGet(n, BufferingFlatMapProcessor::accumulate); - } -} diff --git a/core/src/main/java/software/amazon/smithy/java/core/serde/InterceptingSerializer.java b/core/src/main/java/software/amazon/smithy/java/core/serde/InterceptingSerializer.java index 1e96159a7..9cd87d48f 100644 --- a/core/src/main/java/software/amazon/smithy/java/core/serde/InterceptingSerializer.java +++ b/core/src/main/java/software/amazon/smithy/java/core/serde/InterceptingSerializer.java @@ -9,11 +9,11 @@ import java.math.BigInteger; import java.nio.ByteBuffer; import java.time.Instant; -import java.util.concurrent.Flow; import java.util.function.BiConsumer; import software.amazon.smithy.java.core.schema.Schema; import software.amazon.smithy.java.core.schema.SerializableStruct; import software.amazon.smithy.java.core.serde.document.Document; +import software.amazon.smithy.java.core.serde.event.EventStream; import software.amazon.smithy.java.io.datastream.DataStream; /** @@ -130,7 +130,7 @@ public void writeDataStream(Schema schema, DataStream value) { } @Override - public void writeEventStream(Schema schema, Flow.Publisher value) { + public void writeEventStream(Schema schema, EventStream value) { before(schema).writeEventStream(schema, value); after(schema); } diff --git a/core/src/main/java/software/amazon/smithy/java/core/serde/ShapeDeserializer.java b/core/src/main/java/software/amazon/smithy/java/core/serde/ShapeDeserializer.java index 8a854f8b7..d196ffc5c 100644 --- a/core/src/main/java/software/amazon/smithy/java/core/serde/ShapeDeserializer.java +++ b/core/src/main/java/software/amazon/smithy/java/core/serde/ShapeDeserializer.java @@ -9,10 +9,10 @@ import java.math.BigInteger; import java.nio.ByteBuffer; import java.time.Instant; -import java.util.concurrent.Flow; import software.amazon.smithy.java.core.schema.Schema; import software.amazon.smithy.java.core.schema.SerializableStruct; import software.amazon.smithy.java.core.serde.document.Document; +import software.amazon.smithy.java.core.serde.event.EventStream; import software.amazon.smithy.java.io.datastream.DataStream; /** @@ -240,7 +240,7 @@ default DataStream readDataStream(Schema schema) { * @param schema Schema of the event stream to read. * @return the event stream. */ - default Flow.Publisher readEventStream(Schema schema) { + default EventStream readEventStream(Schema schema) { throw new UnsupportedOperationException("Cannot read event stream from this deserializer"); } diff --git a/core/src/main/java/software/amazon/smithy/java/core/serde/ShapeSerializer.java b/core/src/main/java/software/amazon/smithy/java/core/serde/ShapeSerializer.java index fdd3a9f09..094719eea 100644 --- a/core/src/main/java/software/amazon/smithy/java/core/serde/ShapeSerializer.java +++ b/core/src/main/java/software/amazon/smithy/java/core/serde/ShapeSerializer.java @@ -10,12 +10,12 @@ import java.math.BigInteger; import java.nio.ByteBuffer; import java.time.Instant; -import java.util.concurrent.Flow; import java.util.function.BiConsumer; import software.amazon.smithy.java.core.schema.PreludeSchemas; import software.amazon.smithy.java.core.schema.Schema; import software.amazon.smithy.java.core.schema.SerializableStruct; import software.amazon.smithy.java.core.serde.document.Document; +import software.amazon.smithy.java.core.serde.event.EventStream; import software.amazon.smithy.java.io.datastream.DataStream; /** @@ -179,7 +179,7 @@ default void writeDataStream(Schema schema, DataStream value) { * @param schema Schema of the shape. * @param value Event Stream value. */ - default void writeEventStream(Schema schema, Flow.Publisher value) { + default void writeEventStream(Schema schema, EventStream value) { // by default, do nothing } diff --git a/core/src/main/java/software/amazon/smithy/java/core/serde/SpecificShapeDeserializer.java b/core/src/main/java/software/amazon/smithy/java/core/serde/SpecificShapeDeserializer.java index 828786d07..dddc3c2a1 100644 --- a/core/src/main/java/software/amazon/smithy/java/core/serde/SpecificShapeDeserializer.java +++ b/core/src/main/java/software/amazon/smithy/java/core/serde/SpecificShapeDeserializer.java @@ -9,11 +9,11 @@ import java.math.BigInteger; import java.nio.ByteBuffer; import java.time.Instant; -import java.util.concurrent.Flow; import software.amazon.smithy.java.core.schema.PreludeSchemas; import software.amazon.smithy.java.core.schema.Schema; import software.amazon.smithy.java.core.schema.SerializableStruct; import software.amazon.smithy.java.core.serde.document.Document; +import software.amazon.smithy.java.core.serde.event.EventStream; import software.amazon.smithy.java.io.datastream.DataStream; public abstract class SpecificShapeDeserializer implements ShapeDeserializer { @@ -115,7 +115,7 @@ public boolean isNull() { } @Override - public Flow.Publisher readEventStream(Schema schema) { + public EventStream readEventStream(Schema schema) { throw throwForInvalidState(schema); } diff --git a/core/src/main/java/software/amazon/smithy/java/core/serde/event/DefaultEventStreamReader.java b/core/src/main/java/software/amazon/smithy/java/core/serde/event/DefaultEventStreamReader.java new file mode 100644 index 000000000..314f4686d --- /dev/null +++ b/core/src/main/java/software/amazon/smithy/java/core/serde/event/DefaultEventStreamReader.java @@ -0,0 +1,274 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.core.serde.event; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import software.amazon.smithy.java.core.schema.SerializableStruct; +import software.amazon.smithy.java.io.datastream.DataStream; +import software.amazon.smithy.java.logging.InternalLogger; + +/** + * Implementation of {@link InternalEventStreamReader} that reads from + * an InputStream and decodes frames into events of type T. This class extends + * {@link EventStreamReader} which is be the user facing aspect of it, and + * adds an {@link #readInitialEvent} method for protocol implementations to be + * able to read the initial event. If the protocol does not send an initial + * event, then the class must be constructed passing {@code expectInitialEvent} + * as false. When {@code expectInitialEvent} is true, the {@link #read()} method + * will throw {@link IllegalStateException} unless {@link #readInitialEvent()} + * has been previously called. + * + *

Thread Safety: This class is NOT thread-safe. Only one thread should call + * read methods at a time. + * + *

Frame Handling: The underlying FrameDecoder must be stateful and handle + * partial frames that span multiple reads from the InputStream. The decoder + * returns an empty list when it doesn't have enough bytes to construct a + * complete frame. + * + * @param the initial event type + * @param the event type + * @param the frame type + */ +final class DefaultEventStreamReader> + implements InternalEventStreamReader { + + private static final InternalLogger LOGGER = InternalLogger.getLogger(DefaultEventStreamReader.class); + private static final int DEFAULT_BUFFER_SIZE = 8192; + + private final byte[] buffer = new byte[DEFAULT_BUFFER_SIZE]; + // We need a queue to keep extra events since the decoder might return more + // than one event at the time. + private final Deque queue; + private final InputStream inputStream; + private final EventDecoder eventDecoder; + private final FrameDecoder frameDecoder; + private static final AtomicBoolean inputClosed = new AtomicBoolean(false); + private volatile State state; + + /** + * Creates a new DefaultEventStreamReader. + * + * @param dataStream the input stream to read from + * @param eventDecoderFactory factory for creating decoders + * @param expectInitialEvent true if the protocol expects an initial event + * @throws NullPointerException if any parameter is null + */ + public DefaultEventStreamReader( + DataStream dataStream, + EventDecoderFactory eventDecoderFactory, + boolean expectInitialEvent + ) { + this.inputStream = Objects.requireNonNull(dataStream, "dataStream").asInputStream(); + Objects.requireNonNull(eventDecoderFactory, "eventDecoderFactory"); + + this.state = (expectInitialEvent ? State.READING_INITIAL_EVENT : State.READING_EVENTS); + this.queue = new ArrayDeque<>(); + this.eventDecoder = eventDecoderFactory.newEventDecoder(); + this.frameDecoder = eventDecoderFactory.newFrameDecoder(); + } + + /** + * Reads the next event from the stream. This method blocks until an event is available + * or the stream ends. + * + * @return the next event, or null if end of stream or error occurred + * @throws IllegalStateException if the reader is closed + * @throws IllegalStateException if an initial event is expected but not yet read + * @throws RuntimeException if a previous read error occurred + * @throws UncheckedIOException if an I/O error occurs while reading + */ + @Override + public T read() { + switch (state) { + case READING_INITIAL_EVENT, CLOSED: + throw new IllegalStateException("Reader is not in reading events state, current: " + state); + case READING_EVENTS: + break; + case READING_COMPLETED: + return null; + } + + if (queue.isEmpty()) { + if (!enqueueEvent()) { + LOGGER.debug("No more events available, end of stream reached"); + return null; + } + } + + return queue.pollFirst(); + } + + /** + * Reads the initial response from the event stream. + * + *

Some protocols encode the initial response as the first event in the stream. + * This method reads and decodes that first event as an initial response of type IE. + * Any additional events read during this process are queued for subsequent {@link #read} calls. + * + * @return the initial response + * @throws IllegalStateException if the reader is closed + * @throws RuntimeException if end of stream is reached before getting initial response + * @throws UncheckedIOException if an I/O error occurs + * @throws ClassCastException if the first event cannot be cast to type IR + */ + @SuppressWarnings("unchecked") + public IE readInitialEvent() { + if (state != State.READING_INITIAL_EVENT) { + throw new IllegalStateException("Reader is not in reading events state, current: " + state); + } + + IE result = null; + + while (result == null) { + try { + int read = inputStream.read(buffer); + + if (read == -1) { + state = State.READING_COMPLETED; + throw new RuntimeException("Unexpected end of stream while reading initial event"); + } + + LOGGER.debug("Read {} bytes for initial event", read); + if (read == 0) { + continue; + } + + // Decode frames from the bytes read + // Note: FrameDecoder is stateful and handles partial frames + var frames = frameDecoder.decode(ByteBuffer.wrap(buffer, 0, read)); + + if (frames.isEmpty()) { + // Partial frame, need more data + LOGGER.debug("No complete frames decoded, reading more data"); + continue; + } + + // First frame is the initial response + result = (IE) eventDecoder.decodeInitialEvent(frames.getFirst(), this); + + // Queue any additional frames as regular events + for (var idx = 1; idx < frames.size(); idx++) { + T event = (T) eventDecoder.decode(frames.get(idx)); + queue.add(event); + } + } catch (IOException e) { + closeWithError(e); + throw new UncheckedIOException("I/O error reading initial event", e); + } catch (ClassCastException e) { + closeWithError(e); + throw new RuntimeException("Initial event type mismatch", e); + } + } + state = State.READING_EVENTS; + return result; + } + + @Override + public EventStreamReader asReader() { + return this; + } + + @Override + public EventStreamWriter asWriter() { + throw new UnsupportedOperationException("This reader cannot be used as a writer"); + } + + /** + * Closes the underlying input stream and releases resources. + * + * @throws UncheckedIOException if an I/O error occurs while closing + */ + @Override + public void close() { + if (inputClosed.compareAndSet(false, true)) { + try { + inputStream.close(); + } catch (IOException e) { + throw new UncheckedIOException("Error closing input stream", e); + } finally { + queue.clear(); + } + state = State.CLOSED; + } + } + + /** + * Closes the reader due to an error condition. + * + * @param e the error that caused the closure + * @throws NullPointerException if e is null + */ + @Override + public void closeWithError(Exception e) { + Objects.requireNonNull(e, "exception must not be null"); + LOGGER.error("Closing reader due to error", e); + close(); + } + + /** + * Reads from the input stream and enqueues decoded events. + * + * @return true if events were enqueued, false if end of stream + * @throws UncheckedIOException if an I/O error occurs + */ + @SuppressWarnings("unchecked") + private boolean enqueueEvent() { + if (state == State.READING_COMPLETED) { + return false; + } + + try { + // Read until we have at least one event in the queue + while (queue.isEmpty()) { + LOGGER.debug("Reading from input stream to enqueue events"); + + int read = inputStream.read(buffer); + + if (read == -1) { + LOGGER.debug("End of stream reached"); + state = State.READING_COMPLETED; + return false; + } + + LOGGER.debug("Read {} bytes from input stream", read); + if (read == 0) { + continue; + } + + // Decode frames from the bytes read + // FrameDecoder is stateful, accumulates bytes until complete frames available + var frames = frameDecoder.decode(ByteBuffer.wrap(buffer, 0, read)); + + // Decode each frame into an event and add to queue + for (F frame : frames) { + T event = (T) eventDecoder.decode(frame); + queue.add(event); + } + } + + return true; + + } catch (IOException e) { + closeWithError(e); + throw new UncheckedIOException("I/O error reading events", e); + } catch (ClassCastException e) { + closeWithError(e); + throw new RuntimeException("Event type mismatch during decode", e); + } + } + + enum State { + READING_INITIAL_EVENT, READING_EVENTS, READING_COMPLETED, CLOSED + } +} diff --git a/core/src/main/java/software/amazon/smithy/java/core/serde/event/DefaultEventStreamWriter.java b/core/src/main/java/software/amazon/smithy/java/core/serde/event/DefaultEventStreamWriter.java new file mode 100644 index 000000000..7d5dad353 --- /dev/null +++ b/core/src/main/java/software/amazon/smithy/java/core/serde/event/DefaultEventStreamWriter.java @@ -0,0 +1,197 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.core.serde.event; + +import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import software.amazon.smithy.java.core.schema.SerializableStruct; +import software.amazon.smithy.java.io.datastream.DataStream; +import software.amazon.smithy.java.logging.InternalLogger; + +/** + * Default implementation of EventStreamWriter that encodes events and writes them + * to an internal EventPipeStream. This event bridges the user facing {@link EventStreamWriter} + * with the added functionality needed by the protocol implementation. + * + *

Thread Safety: This class is NOT thread-safe for concurrent writes. Only one + * thread should call write methods at a time. The underlying EventPipeStream is + * thread-safe for producer-consumer patterns. + * + * @param the initial event ype + * @param the event type + * @param tye frame type + */ +final class DefaultEventStreamWriter> + implements InternalEventStreamWriter { + private static final InternalLogger LOGGER = InternalLogger.getLogger(DefaultEventStreamWriter.class); + /** + * This latch is used to ensure that the protocol handler writes the initial event + * before any other event is written. Protocols that don't require the initial event still have + * to unlatch the writer by bootstrapping it with a null value. + */ + private final CountDownLatch readyLatch = new CountDownLatch(1); + /** + * Pipes bytes written by this writer to an input stream used + * to send them over the wire. + */ + private final EventPipeStream pipeStream; + private final AtomicBoolean closed = new AtomicBoolean(false); + private EventEncoder eventEncoder; + private FrameEncoder frameEncoder; + private volatile Throwable lastError; + + /** + * Creates a new DefaultEventStreamWriter. + */ + public DefaultEventStreamWriter() { + this.pipeStream = new EventPipeStream(); + } + + /** + * Writes an event to the stream. Blocks until the initial event has been written. + * + * @param event the event to write (must not be null) + * @throws NullPointerException if event or timeout is null + * @throws IllegalStateException if the stream is closed + */ + @Override + public void write(T event) { + Objects.requireNonNull(event, "event"); + checkState(); + + try { + LOGGER.debug("write event {}", event); + + // Wait for writer to be fully setup and the initial event to be written. + readyLatch.await(); + + doWrite(event); + } catch (InterruptedException e) { + lastError = e; + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while writing event", e); + } + } + + @Override + public void bootstrap(Bootstrap bootstrap) { + // Make sure that the protocol handler doesn't call bootstrap twice. + if (readyLatch.getCount() == 0) { + throw new IllegalStateException("bootstrap has been already called"); + } + setEventStreamEncodingFactory(bootstrap.encoder()); + writeInitialEvent(bootstrap.initialEvent()); + } + + /** + * Writes the initial event that must precede all other events. + */ + private void writeInitialEvent(SerializableStruct event) { + checkState(); + + LOGGER.debug("write initial event {}", event); + try { + // Some protocols, notably REST, encode their initial event in the + // http request. Callers pass a null value to allow the writing + // of regular events to start. + if (event != null) { + doWrite(event); + } + } finally { + // Always count down, even if write fails, to unblock waiting threads + readyLatch.countDown(); + } + } + + /** + * Sets the event encoder factory used to get the event and frame encoders used + * for encoding the events. + * + * @param factory the event encoder factory + */ + private void setEventStreamEncodingFactory(EventEncoderFactory factory) { + Objects.requireNonNull(factory, "eventEncoderFactory"); + this.eventEncoder = factory.newEventEncoder(); + this.frameEncoder = factory.newFrameEncoder(); + } + + /** + * Performs the actual encoding and writing of an event. + * + * @param event the event to write + * @throws RuntimeException if writing fails + */ + private void doWrite(SerializableStruct event) { + // Encode the event to a frame, then to bytes + var frame = eventEncoder.encode(event); + var encoded = frameEncoder.encode(frame); + + // Write to the stream (may block if queue is full) + pipeStream.write(encoded); + } + + /** + * Checks if the stream has been closed or if there has been a previous error. + * + * @throws IllegalStateException if closed + */ + private void checkState() { + if (lastError != null) { + throw new IllegalStateException("Producer failed", lastError); + } + if (closed.get()) { + throw new IllegalStateException("EventStreamWriter is closed"); + } + } + + @Override + public EventStreamReader asReader() { + throw new UnsupportedOperationException( + "This writer cannot be converted to a reader"); + } + + @Override + public EventStreamWriter asWriter() { + return this; + } + + /** + * Closes the stream with an error, signaling to the consumer that an error occurred. + * + * @param e the error that occurred (must not be null) + */ + @Override + public void closeWithError(Exception e) { + Objects.requireNonNull(e, "exception"); + if (closed.compareAndSet(false, true)) { + pipeStream.completeWithError(e); + lastError = e; + } + } + + /** + * Closes the stream normally, signaling to the consumer that no more events will be written. + */ + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + pipeStream.complete(); + } + } + + /** + * Returns the DataStream that is used to read the bytes from the + * encoded events written to this writer. + * + * @return the data stream to read encoded event from + */ + @Override + public DataStream toDataStream() { + return DataStream.ofInputStream(pipeStream); + } +} diff --git a/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventDecoder.java b/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventDecoder.java index 3952e0b23..2936fa316 100644 --- a/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventDecoder.java +++ b/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventDecoder.java @@ -5,7 +5,6 @@ package software.amazon.smithy.java.core.serde.event; -import java.util.concurrent.Flow; import software.amazon.smithy.java.core.schema.SerializableStruct; public interface EventDecoder> { @@ -13,12 +12,12 @@ public interface EventDecoder> { SerializableStruct decode(F frame); /** - * Called once after building the publisher to allow the decoder to do any one-time setup prior to start processing - * events. + * Decodes the frame into an initial event using the given even stream + * to set to the corresponding member. * - * @param publisher The events publisher. + * @param frame The frame to that contains the initial event + * @param stream the event stream value to be set in the member + * @return the initial event struct. */ - default void onPrepare(Flow.Publisher publisher) { - // does nothing by default. - } + SerializableStruct decodeInitialEvent(F frame, EventStream stream); } diff --git a/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventDecoderFactory.java b/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventDecoderFactory.java index a0219e64f..4d5768796 100644 --- a/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventDecoderFactory.java +++ b/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventDecoderFactory.java @@ -10,5 +10,4 @@ public interface EventDecoderFactory> { EventDecoder newEventDecoder(); FrameDecoder newFrameDecoder(); - } diff --git a/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventEncoderFactory.java b/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventEncoderFactory.java index 8fbf3794f..99f6a1dd6 100644 --- a/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventEncoderFactory.java +++ b/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventEncoderFactory.java @@ -5,6 +5,7 @@ package software.amazon.smithy.java.core.serde.event; +// TODO javadoc public interface EventEncoderFactory> { EventEncoder newEventEncoder(); diff --git a/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventPipeStream.java b/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventPipeStream.java new file mode 100644 index 000000000..46d7de09c --- /dev/null +++ b/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventPipeStream.java @@ -0,0 +1,226 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.core.serde.event; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Objects; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import software.amazon.smithy.java.logging.InternalLogger; + +/** + * Bridges an event stream producer to an InputStream consumer. + * + *

Thread Safety: The {@link #write(ByteBuffer)} and {@link #complete()} methods + * should be called by one VT producer thread. The {@link #read()} methods must only + * be called from a single consumer thread. + * + *

Backpressure: There is no backpressure built in this class, {@code write()} will + * block until the consumer reads data, and similarly {@link #read()} will block until + * the producers writes data. + * + *

Termination: The writer must call {@link #complete()} to signal the + * consumer that the writing has finished. Failing to do so will block the reader + * indefinitely + */ +final class EventPipeStream extends InputStream { + private static final InternalLogger LOGGER = InternalLogger.getLogger(EventPipeStream.class); + /** + * Poison pill used to signal the end of the stream. + */ + private static final ByteBuffer POISON_PILL = ByteBuffer.allocate(0); + + /** + * A bounded queue to connect the thread making the event writes to the thread + * consuming them. + */ + private final BlockingQueue queue; + private volatile ByteBuffer current = null; + private volatile boolean completed = false; + private volatile Throwable lastError = null; + private volatile boolean closed = false; + + /** + * Creates a new EventInputStream with the default queue size of 64. + */ + public EventPipeStream() { + this.queue = new ArrayBlockingQueue<>(1); + } + + /** + * Writes a message to the stream. This method blocks if the internal queue is full. + * + * @param message the ByteBuffer to write (must not be null or empty) + * @throws NullPointerException if message is null + * @throws IllegalArgumentException if message has no remaining bytes + * @throws IllegalStateException if stream is already completed or closed + * @throws RuntimeException wrapping InterruptedException if interrupted while waiting + */ + public void write(ByteBuffer message) { + Objects.requireNonNull(message, "message"); + if (!message.hasRemaining()) { + throw new IllegalArgumentException("message must have remaining bytes"); + } + if (completed || closed) { + throw new IllegalStateException("Stream is already completed or closed"); + } + + LOGGER.debug("Writing event to stream with {} bytes", message.remaining()); + + try { + queue.put(message); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while writing", e); + } + } + + /** + * Signals that the producer has finished successfully. Subsequent reads will + * return -1 (EOF) once all queued data has been consumed. + * + *

This method is idempotent, calling it multiple times has no additional effect. + * + * @throws RuntimeException wrapping InterruptedException if interrupted + */ + public void complete() { + if (completed) { + return; + } + completed = true; + + try { + queue.put(POISON_PILL); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while completing", e); + } + } + + /** + * Signals that the producer encountered an error. Subsequent reads will + * throw an IOException wrapping the provided error once all queued data + * has been consumed. + * + * @param error the error that occurred (must not be null) + * @throws NullPointerException if error is null + */ + public void completeWithError(Throwable error) { + Objects.requireNonNull(error, "error must not be null"); + if (completed) { + return; + } + + this.lastError = error; + complete(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + Objects.checkFromIndexSize(off, len, b.length); + + if (len == 0) { + return 0; + } + + LOGGER.debug("Read requested, polling"); + if (!ensureCurrent()) { + LOGGER.debug("Read requested, EOF reached"); + return -1; + } + + int available = Math.min(len, current.remaining()); + current.get(b, off, available); + LOGGER.debug("Read fulfilled, bytes {} (req: {})", available, (len - off)); + + if (!current.hasRemaining()) { + current = null; + } + + return available; + } + + @Override + public int read() throws IOException { + if (!ensureCurrent()) { + return -1; + } + LOGGER.debug("Read requested"); + byte b = current.get(); + + if (!current.hasRemaining()) { + current = null; + } + + return b & 0xFF; + } + + @Override + public int available() throws IOException { + checkError(); + + if (current != null && current != POISON_PILL) { + return current.remaining(); + } + + return 0; + } + + @Override + public void close() throws IOException { + if (closed) { + return; + } + closed = true; + + // Signal completion if not already done + if (!completed) { + complete(); + } + + // Drain the queue to unblock any waiting producers + queue.clear(); + } + + /** + * Ensures that {@code current} contains data to read, or returns false if EOF/error. + * + * @return true if data is available, false if EOF + * @throws IOException if an error was signaled via {@link #completeWithError(Throwable)} + */ + private boolean ensureCurrent() throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } + if (current == null) { + try { + current = queue.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while reading", e); + } + } + + // Check for poison pill (EOF) + if (current == POISON_PILL) { + checkError(); // Throw if error was set + return false; + } + + return true; + } + + /** + * Checks if an error was signaled and throws it if present. + */ + private void checkError() throws IOException { + if (lastError != null) { + throw new IOException("Producer failed", lastError); + } + } +} diff --git a/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventStream.java b/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventStream.java new file mode 100644 index 000000000..568c02336 --- /dev/null +++ b/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventStream.java @@ -0,0 +1,60 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.core.serde.event; + +import software.amazon.smithy.java.core.schema.SerializableStruct; + +/** + * An abstraction that represents an event stream. The event stream can be used for + * reading or writing events depending on the use case. + * + * @param The type of the event stream. + */ +public sealed interface EventStream extends AutoCloseable + permits EventStreamReader, EventStreamWriter { + /** + * Returns this event stream as a reader. + * + * @return this event stream as a reader. + * @throws IllegalStateException if the event stream is for writing. + */ + EventStreamReader asReader(); + + /** + * Returns this event stream as a writer. + * + * @return this event stream as a writer. + * @throws IllegalStateException if the event stream is for reading. + */ + EventStreamWriter asWriter(); + + /** + * Closes the event stream. + */ + @Override + void close(); + + /** + * Closes the event stream with the given error. + * + * @param e the cause to close the event stream. + */ + void closeWithError(Exception e); + + /** + * Creates a new writer to be used by the client to write events. This writer must + * be set to the event stream member of the structure and the user should keep a + * reference to it that can be used to write the events. The returned writer + * won't be fully setup until the containing structure is serialized by the protocol. + * Until then, attempts to use the write method will be blocked until ready. + * + * @param The type of the event. + * @return the new event writer. + */ + static EventStream newWriter() { + return new DefaultEventStreamWriter<>(); + } +} diff --git a/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventStreamFrameDecodingProcessor.java b/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventStreamFrameDecodingProcessor.java deleted file mode 100644 index dce78c72b..000000000 --- a/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventStreamFrameDecodingProcessor.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0 - */ - -package software.amazon.smithy.java.core.serde.event; - -import java.nio.ByteBuffer; -import java.util.concurrent.Flow; -import java.util.stream.Stream; -import software.amazon.smithy.java.core.schema.SerializableStruct; -import software.amazon.smithy.java.core.serde.BufferingFlatMapProcessor; - -/** - * Processor to pipe raw byte arrays to frame encoders and then to event encoders. - * - * @param The frame type. - */ -public final class EventStreamFrameDecodingProcessor> - extends BufferingFlatMapProcessor { - private final FrameDecoder frameDecoder; - private final EventDecoder eventDecoder; - - EventStreamFrameDecodingProcessor( - Flow.Publisher publisher, - FrameDecoder frameDecoder, - EventDecoder eventDecoder - ) { - super(publisher); - this.frameDecoder = frameDecoder; - this.eventDecoder = eventDecoder; - } - - /** - * Creates a new processor with the given publisher and decoder factory. This method calls prepare to setup the - * decoders prior to start processing events. - * - * @param publisher The publisher generating the events - * @param eventDecoderFactory The decoder factory to decode the raw bytes - * @param The type of the frame - * @return A new processor - */ - public static > EventStreamFrameDecodingProcessor create( - Flow.Publisher publisher, - EventDecoderFactory eventDecoderFactory - ) { - var result = new EventStreamFrameDecodingProcessor<>( - publisher, - eventDecoderFactory.newFrameDecoder(), - eventDecoderFactory.newEventDecoder()); - result.prepare(); - return result; - } - - /** - * Called once after building the frame processor to allow decoders to do any one-time setup prior to start - * processing events. - */ - void prepare() { - frameDecoder.onPrepare(this); - eventDecoder.onPrepare(this); - } - - @Override - protected Stream map(ByteBuffer item) { - return frameDecoder.decode(item).stream().map(eventDecoder::decode); - } -} diff --git a/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventStreamFrameEncodingProcessor.java b/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventStreamFrameEncodingProcessor.java deleted file mode 100644 index a764f87e2..000000000 --- a/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventStreamFrameEncodingProcessor.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0 - */ - -package software.amazon.smithy.java.core.serde.event; - -import java.nio.ByteBuffer; -import java.util.concurrent.Flow; -import java.util.stream.Stream; -import software.amazon.smithy.java.core.schema.SerializableStruct; -import software.amazon.smithy.java.core.serde.BufferingFlatMapProcessor; -import software.amazon.smithy.java.logging.InternalLogger; - -public final class EventStreamFrameEncodingProcessor, T extends SerializableStruct> - extends BufferingFlatMapProcessor { - private static final InternalLogger LOG = InternalLogger.getLogger(EventStreamFrameEncodingProcessor.class); - private final EventEncoder eventEncoder; - private final FrameEncoder encoder; - - private EventStreamFrameEncodingProcessor( - Flow.Publisher publisher, - EventEncoder eventEncoder, - FrameEncoder encoder - ) { - super(publisher); - this.eventEncoder = eventEncoder; - this.encoder = encoder; - } - - public static > EventStreamFrameEncodingProcessor create( - Flow.Publisher publisher, - EventEncoderFactory encoderFactory - ) { - var processor = new EventStreamFrameEncodingProcessor<>( - publisher, - encoderFactory.newEventEncoder(), - encoderFactory.newFrameEncoder()); - return processor; - } - - public static > EventStreamFrameEncodingProcessor create( - Flow.Publisher publisher, - EventEncoderFactory encoderFactory, - SerializableStruct firstItem - ) { - var processor = new EventStreamFrameEncodingProcessor<>( - publisher, - encoderFactory.newEventEncoder(), - encoderFactory.newFrameEncoder()); - processor.enqueueItem(firstItem); - return processor; - } - - @Override - protected Stream map(T item) { - return Stream.of(encoder.encode(eventEncoder.encode(item))); - } - - @Override - protected void handleError(Throwable error, Flow.Subscriber subscriber) { - subscriber.onNext(encoder.encode(eventEncoder.encodeFailure(error))); - LOG.warn("Unexpected error", error); - subscriber.onComplete(); - } -} diff --git a/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventStreamReader.java b/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventStreamReader.java new file mode 100644 index 000000000..f8c9028b4 --- /dev/null +++ b/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventStreamReader.java @@ -0,0 +1,55 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.core.serde.event; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import software.amazon.smithy.java.core.schema.SerializableStruct; + +/** + * Interface for reading events from an event stream. + * + * @param The type of the event stream. + */ +public sealed interface EventStreamReader + extends Iterable, AutoCloseable, EventStream permits InternalEventStreamReader { + + /** + * Reads an event from the event stream. + * + * @return the next event in the stream or null if the stream has reached its end. + */ + T read(); + + /** + * Returns a new iterator to iterate over the events in the stream. + * + * @return a new iterator to iterate over the events in the stream. + */ + @Override + default Iterator iterator() { + return new Iterator<>() { + private T next = null; + + @Override + public boolean hasNext() { + if (next != null) + return true; + next = read(); + return next != null; + } + + @Override + public T next() { + if (next == null) + throw new NoSuchElementException(); + T result = next; + next = null; + return result; + } + }; + } +} diff --git a/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventStreamWriter.java b/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventStreamWriter.java new file mode 100644 index 000000000..a658c9836 --- /dev/null +++ b/core/src/main/java/software/amazon/smithy/java/core/serde/event/EventStreamWriter.java @@ -0,0 +1,24 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.core.serde.event; + +import software.amazon.smithy.java.core.schema.SerializableStruct; + +/** + * Interface for writing events to an event stream. + * + * @param The type of the event stream. + */ +public sealed interface EventStreamWriter extends AutoCloseable, EventStream + permits InternalEventStreamWriter { + + /** + * Writes the given event to the event stream. This method will block until the write is possible. + * + * @param event The event to write. + */ + void write(T event); +} diff --git a/core/src/main/java/software/amazon/smithy/java/core/serde/event/FrameDecoder.java b/core/src/main/java/software/amazon/smithy/java/core/serde/event/FrameDecoder.java index 46a03807f..53d52bc24 100644 --- a/core/src/main/java/software/amazon/smithy/java/core/serde/event/FrameDecoder.java +++ b/core/src/main/java/software/amazon/smithy/java/core/serde/event/FrameDecoder.java @@ -7,8 +7,6 @@ import java.nio.ByteBuffer; import java.util.List; -import java.util.concurrent.Flow; -import software.amazon.smithy.java.core.schema.SerializableStruct; /** * Decodes frames from bytes. @@ -17,18 +15,9 @@ public interface FrameDecoder> { /** * Decode 0 or more frames from a buffer, holding on to excess data * that can be prepended to the next `decode` call. + * * @param buffer the buffer to attempt to read frames from * @return all the frames readable from this pass */ List decode(ByteBuffer buffer); - - /** - * Called once after building the publisher to allow the decoder to do any one-time setup prior to start processing - * events. - * - * @param publisher The events publisher. - */ - default void onPrepare(Flow.Publisher publisher) { - // does nothing by default. - } } diff --git a/core/src/main/java/software/amazon/smithy/java/core/serde/event/FrameTransformer.java b/core/src/main/java/software/amazon/smithy/java/core/serde/event/FrameTransformer.java index 9e1b09400..6cb1c15a9 100644 --- a/core/src/main/java/software/amazon/smithy/java/core/serde/event/FrameTransformer.java +++ b/core/src/main/java/software/amazon/smithy/java/core/serde/event/FrameTransformer.java @@ -16,4 +16,15 @@ * @param the frame type */ @FunctionalInterface -public interface FrameTransformer> extends Function {} +public interface FrameTransformer> extends Function { + + /** + * An identity transformer that returns the same frame as given. + * + * @param the frame type + * @return an identity frame transformer + */ + static > FrameTransformer identity() { + return (F frame) -> frame; + } +} diff --git a/core/src/main/java/software/amazon/smithy/java/core/serde/event/InternalEventStreamReader.java b/core/src/main/java/software/amazon/smithy/java/core/serde/event/InternalEventStreamReader.java new file mode 100644 index 000000000..686686c16 --- /dev/null +++ b/core/src/main/java/software/amazon/smithy/java/core/serde/event/InternalEventStreamReader.java @@ -0,0 +1,52 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.core.serde.event; + +import software.amazon.smithy.java.core.schema.SerializableStruct; +import software.amazon.smithy.java.io.datastream.DataStream; + +/** + * This class is used by the protocol deserializers to bind the user facing {@link EventStreamReader} + * to the downstream {@link DataStream} that reads the bytes over the wire. The deserializer is expected + * to create an instance of it using {@link #newReader(DataStream, EventDecoderFactory, boolean)} and + * set it to the event stream member of the class being deserialized. + * + * @param The type of the initial event + * @param the type of the event + */ +public sealed interface InternalEventStreamReader + extends EventStreamReader permits DefaultEventStreamReader { + /** + * Reads the initial response from the event stream. + * + *

Some protocols encode the initial event as the first event in the stream. + * This method reads and decodes that first event as an initial response of type IR. + * + * @return the initial event + */ + IE readInitialEvent(); + + /** + * Creates a new reader using the dataStream to read from, and the decoder factory to create + * decoders from bytes to frames and from frames to user facing events. + * + * @param dataStream the data stream to read from + * @param eventDecoderFactory the decoder factory to create the frame and event decoders. + * @param initialEventExpected true if the protocol encodes the initial event in the event stream, false if the protocol does not. + * @param The type of the initial event + * @param The type of the events + * @param The type of the frame + * @return The new reader + */ + static > InternalEventStreamReader< + IE, T> newReader( + DataStream dataStream, + EventDecoderFactory eventDecoderFactory, + boolean initialEventExpected + ) { + return new DefaultEventStreamReader<>(dataStream, eventDecoderFactory, initialEventExpected); + } +} diff --git a/core/src/main/java/software/amazon/smithy/java/core/serde/event/InternalEventStreamWriter.java b/core/src/main/java/software/amazon/smithy/java/core/serde/event/InternalEventStreamWriter.java new file mode 100644 index 000000000..ac7ea3819 --- /dev/null +++ b/core/src/main/java/software/amazon/smithy/java/core/serde/event/InternalEventStreamWriter.java @@ -0,0 +1,87 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.core.serde.event; + +import software.amazon.smithy.java.core.schema.SerializableStruct; +import software.amazon.smithy.java.io.datastream.DataStream; + +/** + * This class is used by the protocol serializers to bind the user facing {@link EventStreamWriter} + * to the downstream {@link DataStream} that sends the bytes over the wire. The serializer + * can safely cast the {@link EventStreamWriter} member to this class using {@link #toInternal(EventStream)} + * to access the internal methods for bootstrapping by providing the encoder and the initial event if needed. + * + * @param The event type + * @param The initial event type + */ +public sealed interface InternalEventStreamWriter> extends EventStreamWriter permits DefaultEventStreamWriter { + /** + * Converts to the writer to a DataStream. This method will be called + * by the serializer to send the stream of bytes representing the + * events on the wire. + * + * @return The data stream from the writer. + */ + DataStream toDataStream(); + + /** + * Bootstraps the writer providing the encoding factory to convert events + * to frames and frames to bytes, and the initial event that must be sent + * as the first event of the stream if the protocol requires one. + * + *

This method will be called using a proper data by the protocol serializer. + * Writes will be blocked until this method is called. + * + * @param bootstrap The sink to write events to + */ + void bootstrap(Bootstrap bootstrap); + + /** + * Contains the protocol dependant encoder factory and the initial event needed to bootstrap the writer. + * Protocols that do not require an initial event to be sent as part of the stream must return null + * in the {@link #initialEvent()} method. + * + * @param the initial event type + * @param the frame type + */ + interface Bootstrap> { + /** + * Returns the event encoder factory to serialize events and encode to frames. + * + * @return the event encoder factory to serialize events and encode to frames. + */ + EventEncoderFactory encoder(); + + /** + * Returns the initial event of the event stream. This method must return + * {@code null} if the implementing protocol does not require the initial + * event to be encoded as an event. + * + * @return the initial event of the event stream. + */ + IE initialEvent(); + } + + /** + * Utility method to convert a {@link EventStreamWriter} to a {@link InternalEventStreamWriter}. + * + * @param the type of the event + * @param the type of the internal event + * @param the type of the frame + * @return The writer converted to an internal writer. + */ + @SuppressWarnings("unchecked") + static > InternalEventStreamWriter toInternal( + EventStream writer + ) { + if (!(writer instanceof InternalEventStreamWriter)) { + throw new IllegalArgumentException("writer must be an instance of InternalEventStreamWriter"); + } + return (InternalEventStreamWriter) writer; + } +} diff --git a/core/src/test/java/software/amazon/smithy/java/core/serde/event/DefaultEventStreamReaderTest.java b/core/src/test/java/software/amazon/smithy/java/core/serde/event/DefaultEventStreamReaderTest.java new file mode 100644 index 000000000..ebaad4104 --- /dev/null +++ b/core/src/test/java/software/amazon/smithy/java/core/serde/event/DefaultEventStreamReaderTest.java @@ -0,0 +1,68 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.core.serde.event; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.Test; +import software.amazon.smithy.java.io.datastream.DataStream; + +class DefaultEventStreamReaderTest { + + @Test + @SuppressWarnings("resource") + public void testDecode() throws Exception { + var writer = createWriter(); + var reader = createReader(writer.toDataStream()); + var result = new ArrayList(); + + // Act + var writeWorker = Thread.ofVirtual().start(() -> { + for (String msg : EventPipeStreamTest.sources()) { + var event = new TestMessage.TestEvent(msg); + writer.write(event); + } + writer.close(); + }); + var readWorker = Thread.ofVirtual().start(() -> { + reader.forEach(result::add); + }); + writeWorker.join(); + readWorker.join(); + + // Assert + var expected = List.of(EventPipeStreamTest.sources()); + var actual = result.stream().map(Object::toString).toList(); + assertEquals(expected.size(), actual.size()); + assertEquals(expected, actual); + } + + static DefaultEventStreamReader createReader( + DataStream stream + ) { + return new DefaultEventStreamReader<>(stream, new TestMessage.TestEventDecoderFactory(), false); + } + + static DefaultEventStreamWriter createWriter() { + var writer = new DefaultEventStreamWriter(); + writer.bootstrap(new InternalEventStreamWriter.Bootstrap<>() { + @Override + public EventEncoderFactory encoder() { + return new TestMessage.TestEventEncoderFactory(); + } + + @Override + public TestMessage.TestEvent initialEvent() { + return null; + } + }); + return writer; + } +} diff --git a/core/src/test/java/software/amazon/smithy/java/core/serde/event/EventPipeStreamTest.java b/core/src/test/java/software/amazon/smithy/java/core/serde/event/EventPipeStreamTest.java new file mode 100644 index 000000000..84cd480d3 --- /dev/null +++ b/core/src/test/java/software/amazon/smithy/java/core/serde/event/EventPipeStreamTest.java @@ -0,0 +1,86 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.core.serde.event; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +class EventPipeStreamTest { + public static final String[] SOURCES = { + """ + O thou my lovely boy, who in thy pow'r + Dost hold time's fickle glass, his fickle hour, + Who hast by waning grown, and therein show'st, + Thy lover's with'ring, as thy sweet self grow'st, + If nature (sov'reign mistress over wrack) + As thou go'st onwards still will pluck thee back, + She keeps thee to this purpose, that her skill + May time disgrace, and wretched minute kill. + Yet fear her, O thou minion of her pleasure, + She may detain but not still keep her treasure! + Her audit (though delay'd) answer'd must be, + And her quietus is to render thee. + """, + """ + Es hielo abrasador, es fuego helado, + es herida que duele y no se siente, + es un soñado bien, un mal presente, + es un breve descanso muy cansado. + Es un descuido que nos da cuidado, + un cobarde, con nombre de valiente, + un andar solitario entre la gente, + un amar solamente ser amado. + Es una libertad encarcelada, + que dura hasta el postrero parasismo, + enfermedad que crece si es curada. + Este es el niño Amor, este es su abismo. + ¡Mirad cuál amistad tendrá con nada + el que en todo es contrario de sí mismo! + """, + """ + 太乙近天都, + 连山到海隅。 + 白云回望合, + 青霭入看无。 + 分野中峰变, + 阴晴众壑殊。 + 欲投人处宿, + 隔水问樵夫。 + """, + """ + من آن ملّای رومی‌ام که از نظمم شکر خیزد + """ + }; + + @ParameterizedTest + @MethodSource("sources") + void testEventInputStream(String source) throws IOException { + var eventInputStream = new EventPipeStream(); + Thread.ofVirtual().start(() -> { + for (var idx = 0; idx < source.length(); idx++) { + eventInputStream.write(ByteBuffer.wrap(charToUtf8Bytes(source.charAt(idx)))); + } + eventInputStream.complete(); + }); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + eventInputStream.transferTo(out); + assertEquals(source, out.toString()); + } + + static String[] sources() { + return SOURCES; + } + + static byte[] charToUtf8Bytes(char c) { + return Character.toString(c).getBytes(StandardCharsets.UTF_8); + } +} diff --git a/core/src/test/java/software/amazon/smithy/java/core/serde/event/TestMessage.java b/core/src/test/java/software/amazon/smithy/java/core/serde/event/TestMessage.java new file mode 100644 index 000000000..4dc2aad73 --- /dev/null +++ b/core/src/test/java/software/amazon/smithy/java/core/serde/event/TestMessage.java @@ -0,0 +1,164 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.core.serde.event; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import software.amazon.smithy.java.core.schema.Schema; +import software.amazon.smithy.java.core.schema.SerializableStruct; +import software.amazon.smithy.java.core.serde.ShapeSerializer; + +/** + * Simple test frame that wraps raw bytes with a 4-byte length prefix. + */ +public final class TestMessage { + + static class TestFrame implements Frame { + private final byte[] data; + + public TestFrame(byte[] data) { + this.data = data; + } + + @Override + public byte[] unwrap() { + return data; + } + } + + static class TestFrameEncoder implements FrameEncoder { + @Override + public ByteBuffer encode(TestFrame frame) { + byte[] payload = frame.unwrap(); + ByteBuffer buffer = ByteBuffer.allocate(4 + payload.length); + buffer.putInt(payload.length); + buffer.put(payload); + buffer.flip(); + return buffer; + } + } + + static class TestFrameDecoder implements FrameDecoder { + private ByteBuffer pending = ByteBuffer.allocate(0); + + @Override + public List decode(ByteBuffer buffer) { + List frames = new ArrayList<>(); + ByteBuffer combined = ByteBuffer.allocate(pending.remaining() + buffer.remaining()); + combined.put(pending); + combined.put(buffer); + combined.flip(); + + while (combined.remaining() >= 4) { + int length = combined.getInt(combined.position()); + if (combined.remaining() < 4 + length) { + break; + } + combined.getInt(); // consume length + byte[] payload = new byte[length]; + combined.get(payload); + frames.add(new TestFrame(payload)); + } + + pending = ByteBuffer.allocate(combined.remaining()); + pending.put(combined); + pending.flip(); + return frames; + } + } + + static class TestEventDecoder implements EventDecoder { + + @Override + public SerializableStruct decode(TestFrame frame) { + var bytes = frame.unwrap(); + var value = new String(bytes, StandardCharsets.UTF_8); + return new TestEvent(value); + } + + @Override + public SerializableStruct decodeInitialEvent(TestFrame frame, EventStream stream) { + throw new UnsupportedOperationException(); + } + } + + static class TestEventEncoder implements EventEncoder { + + @Override + public TestFrame encode(SerializableStruct item) { + var testStruct = (TestEvent) item; + var bytes = testStruct.value.getBytes(StandardCharsets.UTF_8); + return new TestFrame(bytes); + } + + @Override + public TestFrame encodeFailure(Throwable exception) { + return null; + } + } + + static class TestEvent implements SerializableStruct { + + private final String value; + + public TestEvent(String value) { + this.value = value; + } + + @Override + public String toString() { + return value; + } + + @Override + public Schema schema() { + throw new UnsupportedOperationException(); + } + + @Override + public void serializeMembers(ShapeSerializer serializer) { + throw new UnsupportedOperationException(); + } + + @Override + public T getMemberValue(Schema member) { + throw new UnsupportedOperationException(); + } + } + + static class TestEventDecoderFactory implements EventDecoderFactory { + + @Override + public EventDecoder newEventDecoder() { + return new TestEventDecoder(); + } + + @Override + public FrameDecoder newFrameDecoder() { + return new TestFrameDecoder(); + } + } + + static class TestEventEncoderFactory implements EventEncoderFactory { + + @Override + public EventEncoder newEventEncoder() { + return new TestEventEncoder(); + } + + @Override + public FrameEncoder newFrameEncoder() { + return new TestFrameEncoder(); + } + + @Override + public String contentType() { + return "text/plain"; + } + } +} diff --git a/examples/event-streaming-client/build.gradle.kts b/examples/event-streaming-client/build.gradle.kts index 406ff3b43..86b16113b 100644 --- a/examples/event-streaming-client/build.gradle.kts +++ b/examples/event-streaming-client/build.gradle.kts @@ -10,6 +10,8 @@ dependencies { smithyBuild("software.amazon.smithy.java:plugins:$smithyJavaVersion") implementation("software.amazon.smithy.java:aws-client-restjson:$smithyJavaVersion") implementation("software.amazon.smithy.java:client-core:$smithyJavaVersion") + implementation("software.amazon.smithy.java:client-rpcv2-cbor:${smithyJavaVersion}") + implementation(project(":framework-errors")) // Test dependencies testImplementation("org.junit.jupiter:junit-jupiter:6.0.2") diff --git a/examples/event-streaming-client/src/it/java/software/amazon/smithy/java/example/eventstreaming/EventStreamTest.java b/examples/event-streaming-client/src/it/java/software/amazon/smithy/java/example/eventstreaming/EventStreamTest.java index 61b23bcaa..659de791b 100644 --- a/examples/event-streaming-client/src/it/java/software/amazon/smithy/java/example/eventstreaming/EventStreamTest.java +++ b/examples/event-streaming-client/src/it/java/software/amazon/smithy/java/example/eventstreaming/EventStreamTest.java @@ -3,150 +3,106 @@ * SPDX-License-Identifier: Apache-2.0 */ -package softare.amazon.smithy.java.example.eventstreaming; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; +package software.amazon.smithy.java.example.eventstreaming; import java.util.HashSet; +import java.util.List; import java.util.Set; -import java.util.concurrent.Flow; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import software.amazon.smithy.java.client.core.ProtocolSettings; import software.amazon.smithy.java.client.core.endpoint.EndpointResolver; +import software.amazon.smithy.java.client.rpcv2.RpcV2CborProtocol; +import software.amazon.smithy.java.core.serde.event.EventStream; +import software.amazon.smithy.java.core.serde.event.EventStreamWriter; import software.amazon.smithy.java.example.eventstreaming.client.FizzBuzzServiceClient; -import software.amazon.smithy.java.example.eventstreaming.model.BuzzEvent; import software.amazon.smithy.java.example.eventstreaming.model.FizzBuzzInput; import software.amazon.smithy.java.example.eventstreaming.model.FizzBuzzOutput; import software.amazon.smithy.java.example.eventstreaming.model.FizzBuzzStream; -import software.amazon.smithy.java.example.eventstreaming.model.FizzEvent; import software.amazon.smithy.java.example.eventstreaming.model.Value; import software.amazon.smithy.java.example.eventstreaming.model.ValueStream; +import software.amazon.smithy.java.logging.InternalLogger; +import software.amazon.smithy.model.shapes.ShapeId; +import software.amazon.smithy.protocol.traits.Rpcv2CborTrait; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; // TODO: Update the test to create and run the server in setup before the test @Disabled("This test requires manually running a server locally and then verifies client behavior against it.") public class EventStreamTest { - @Test - public void fizzBuzz() throws InterruptedException { - var client = FizzBuzzServiceClient.builder() - .endpointResolver(EndpointResolver.staticHost("http://localhost:8080")) - .build(); - + private static final InternalLogger LOGGER = InternalLogger.getLogger(EventStreamTest.class); + @ParameterizedTest + @MethodSource("clients") + public void fizzBuzz(FizzBuzzServiceClient client) { int range = 100; - FizzBuzzInput input = FizzBuzzInput.builder() - .stream(new ValueStreamPublisher(range)) + .stream(EventStream.newWriter()) .build(); + Thread.ofVirtual().start(() -> { + try (EventStreamWriter writer = input.getStream().asWriter()) { + int count = 0; + while (count++ < range) { + ValueStream value = ValueStream.builder() + .value(Value.builder().value(count).build()) + .build(); + writer.write(value); + } + } + }); + LOGGER.info("sending request"); FizzBuzzOutput output = client.fizzBuzz(input); - - System.out.println("Initial messages done"); + LOGGER.info("Initial messages done"); AtomicLong receivedEvents = new AtomicLong(); Set unbuzzed = new HashSet<>(); - AtomicBoolean done = new AtomicBoolean(); - output.getStream().subscribe(new Flow.Subscriber<>() { - - private Flow.Subscription subscription; - - @Override - public void onSubscribe(Flow.Subscription subscription) { - this.subscription = subscription; - subscription.request(1L); - } - - @Override - public void onNext(FizzBuzzStream item) { - receivedEvents.incrementAndGet(); - long value; - switch (item) { - case FizzBuzzStream.FizzMember(var fizz): - value = fizz.getValue(); - System.out.println("received fizz: " + value); - assertEquals(0, value % 3); - if (value % 5 == 0) { - assertTrue(unbuzzed.add(value), "Fizz already received for " + value); - } - break; - case FizzBuzzStream.BuzzMember(var buzz): - value = buzz.getValue(); - System.out.println("received buzz: " + value); - assertEquals(0, value % 5); - if (value % 3 == 0) { - assertTrue(unbuzzed.remove(value), "No fizz for " + value); - } - break; - default: - fail("Unexpected event: " + item.getClass()); - break; - } - subscription.request(1L); - } - - @Override - public void onError(Throwable throwable) { } - - @Override - public void onComplete() { - done.set(true); - System.out.println("output stream completed"); + output.getStream().asReader().forEach(item -> { + receivedEvents.incrementAndGet(); + long value; + switch (item) { + case FizzBuzzStream.FizzMember(var fizz): + value = fizz.getValue(); + LOGGER.info("received fizz: {}", value); + assertEquals(0, value % 3); + if (value % 5 == 0) { + assertTrue(unbuzzed.add(value), "Fizz already received for " + value); + } + break; + case FizzBuzzStream.BuzzMember(var buzz): + value = buzz.getValue(); + LOGGER.info("received buzz: {}", value); + assertEquals(0, value % 5); + if (value % 3 == 0) { + assertTrue(unbuzzed.remove(value), "No fizz for " + value); + } + break; + default: + fail("Unexpected event: " + item.getClass()); + break; } }); - - // wait to receive events in the response stream - var waits = 10; - do { - Thread.sleep(100); - if (--waits <= 0) { - throw new RuntimeException("Timed out waiting for completion"); - } - } while (!done.get()); - assertTrue(unbuzzed.isEmpty(), unbuzzed.size() + " unbuzzed fizzes"); assertEquals((range / 3) + (range / 5), receivedEvents.get()); } - private static class ValueStreamPublisher implements Flow.Publisher { - private final int range; - - public ValueStreamPublisher(int range) { - this.range = range; - } - - @Override - public void subscribe(Flow.Subscriber subscriber) { - subscriber.onSubscribe(new Flow.Subscription() { - - int count = 0; - - @Override - public void request(long n) { - // sleeping between sending request events so there's chance to process response events in parallel - try { - Thread.sleep(10); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - if (count++ < range) { - ValueStream value = ValueStream.builder() - .value(Value.builder().value(count).build()) - .build(); - System.out.println("sent: " + value); - subscriber.onNext(value); - } else { - subscriber.onComplete(); - } - } - - @Override - public void cancel() { - subscriber.onComplete(); - } - }); - } + public static List clients() { + return List.of( + FizzBuzzServiceClient.builder() + .endpointResolver(EndpointResolver.staticHost("http://localhost:8080")) + .build(), + FizzBuzzServiceClient.builder() + .protocol(new RpcV2CborProtocol.Factory() + .createProtocol(ProtocolSettings.builder() + .service(ShapeId.from("smithy.example#TickService")) + .build(), Rpcv2CborTrait.builder().build())) + .endpointResolver(EndpointResolver.staticHost("http://localhost:8000")) + .build() + + ); } } diff --git a/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/HttpBindingDeserializer.java b/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/HttpBindingDeserializer.java index 8364d76d6..3faaa5f75 100644 --- a/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/HttpBindingDeserializer.java +++ b/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/HttpBindingDeserializer.java @@ -9,7 +9,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.Flow; import software.amazon.smithy.java.core.schema.Schema; import software.amazon.smithy.java.core.schema.SerializableStruct; import software.amazon.smithy.java.core.schema.TraitKey; @@ -18,7 +17,8 @@ import software.amazon.smithy.java.core.serde.ShapeDeserializer; import software.amazon.smithy.java.core.serde.SpecificShapeDeserializer; import software.amazon.smithy.java.core.serde.event.EventDecoderFactory; -import software.amazon.smithy.java.core.serde.event.EventStreamFrameDecodingProcessor; +import software.amazon.smithy.java.core.serde.event.EventStreamReader; +import software.amazon.smithy.java.core.serde.event.InternalEventStreamReader; import software.amazon.smithy.java.http.api.HttpHeaders; import software.amazon.smithy.java.io.datastream.DataStream; import software.amazon.smithy.java.io.uri.QueryStringParser; @@ -114,8 +114,8 @@ public void readStruct(Schema schema, T state, StructMemberConsumer struc if (isEventStream(member)) { structMemberConsumer.accept(state, member, new SpecificShapeDeserializer() { @Override - public Flow.Publisher readEventStream(Schema schema) { - return EventStreamFrameDecodingProcessor.create(body, eventDecoderFactory); + public EventStreamReader readEventStream(Schema schema) { + return InternalEventStreamReader.newReader(body, eventDecoderFactory, false); } }); } else if (member.hasTrait(TraitKey.STREAMING_TRAIT)) { @@ -179,11 +179,11 @@ private void validateMediaType() { * starts with the expected media type and is either an exact match or is followed by a semicolon * (parameter separator) or whitespace. * - * @param actual the actual media type received (e.g., from a Content-Type header), or null + * @param actual the actual media type received (e.g., from a Content-Type header), or null * @param expected the expected media type, or null to skip validation * @return 1 if the media types match or no validation is needed because `expected` is null, - * 0 if actual is null but expected is not (missing Content-Type), - * -1 if the media types do not match + * 0 if actual is null but expected is not (missing Content-Type), + * -1 if the media types do not match */ static int compareMediaType(String actual, String expected) { if (expected == null) { diff --git a/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/HttpBindingSerializer.java b/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/HttpBindingSerializer.java index 20ec67365..4f3201e37 100644 --- a/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/HttpBindingSerializer.java +++ b/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/HttpBindingSerializer.java @@ -14,7 +14,6 @@ import java.util.Set; import java.util.StringJoiner; import java.util.TreeMap; -import java.util.concurrent.Flow; import java.util.function.BiConsumer; import software.amazon.smithy.java.core.error.ModeledException; import software.amazon.smithy.java.core.schema.Schema; @@ -26,6 +25,7 @@ import software.amazon.smithy.java.core.serde.SerializationException; import software.amazon.smithy.java.core.serde.ShapeSerializer; import software.amazon.smithy.java.core.serde.SpecificShapeSerializer; +import software.amazon.smithy.java.core.serde.event.EventStream; import software.amazon.smithy.java.http.api.HttpHeaders; import software.amazon.smithy.java.io.datastream.DataStream; import software.amazon.smithy.java.io.uri.QueryStringBuilder; @@ -64,7 +64,7 @@ final class HttpBindingSerializer extends SpecificShapeSerializer implements Sha private ShapeSerializer shapeBodySerializer; private ByteArrayOutputStream shapeBodyOutput; private DataStream httpPayload; - private Flow.Publisher eventStream; + private EventStream eventStream; private int responseStatus; private boolean contentTypeHeaderInInput; @@ -225,11 +225,11 @@ int getResponseStatus() { return responseStatus; } - public Flow.Publisher getEventStream() { + public EventStream getEventStream() { return eventStream; } - void setEventStream(Flow.Publisher stream) { + void setEventStream(EventStream stream) { this.eventStream = stream; } diff --git a/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/PayloadSerializer.java b/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/PayloadSerializer.java index 8fc9c7f91..140d97a63 100644 --- a/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/PayloadSerializer.java +++ b/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/PayloadSerializer.java @@ -12,7 +12,6 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.time.Instant; -import java.util.concurrent.Flow; import java.util.function.BiConsumer; import software.amazon.smithy.java.core.schema.Schema; import software.amazon.smithy.java.core.schema.SerializableStruct; @@ -23,6 +22,7 @@ import software.amazon.smithy.java.core.serde.ShapeSerializer; import software.amazon.smithy.java.core.serde.TimestampFormatter; import software.amazon.smithy.java.core.serde.document.Document; +import software.amazon.smithy.java.core.serde.event.EventStream; import software.amazon.smithy.java.io.datastream.DataStream; final class PayloadSerializer implements ShapeSerializer { @@ -49,10 +49,10 @@ public void writeDataStream(Schema schema, DataStream value) { @Override public void writeEventStream( Schema schema, - Flow.Publisher value + EventStream value ) { payloadWritten = true; - serializer.setEventStream(value); + serializer.setEventStream(value.asWriter()); } private void write(byte[] bytes) { diff --git a/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/RequestSerializer.java b/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/RequestSerializer.java index c378a46c0..42847bfee 100644 --- a/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/RequestSerializer.java +++ b/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/RequestSerializer.java @@ -8,7 +8,6 @@ import java.net.URI; import java.util.Objects; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Flow; import software.amazon.smithy.java.core.schema.ApiOperation; import software.amazon.smithy.java.core.schema.InputEventStreamingApiOperation; import software.amazon.smithy.java.core.schema.Schema; @@ -17,8 +16,8 @@ import software.amazon.smithy.java.core.schema.TraitKey; import software.amazon.smithy.java.core.serde.Codec; import software.amazon.smithy.java.core.serde.event.EventEncoderFactory; -import software.amazon.smithy.java.core.serde.event.EventStreamFrameEncodingProcessor; import software.amazon.smithy.java.core.serde.event.Frame; +import software.amazon.smithy.java.core.serde.event.InternalEventStreamWriter; import software.amazon.smithy.java.http.api.HttpRequest; import software.amazon.smithy.java.io.uri.URIBuilder; @@ -130,6 +129,7 @@ public RequestSerializer allowEmptyStructPayload(boolean allowEmptyStructPayload * * @return Returns the created request. */ + @SuppressWarnings("unchecked") public HttpRequest serializeRequest() { Objects.requireNonNull(shapeValue, "shapeValue is not set"); Objects.requireNonNull(operation, "operation is not set"); @@ -165,9 +165,24 @@ public HttpRequest serializeRequest() { .method(httpTrait.getMethod()) .uri(targetEndpoint); - var eventStream = (Flow.Publisher) serializer.getEventStream(); + var eventStream = serializer.getEventStream(); if (eventStream != null && operation instanceof InputEventStreamingApiOperation) { - builder.body(EventStreamFrameEncodingProcessor.create(eventStream, eventStreamEncodingFactory)); + InternalEventStreamWriter> writer = + InternalEventStreamWriter.toInternal(eventStream); + + writer.bootstrap(new InternalEventStreamWriter.Bootstrap<>() { + @Override + @SuppressWarnings("unchecked") + public EventEncoderFactory> encoder() { + return (EventEncoderFactory) eventStreamEncodingFactory; + } + + @Override + public SerializableStruct initialEvent() { + return null; + } + }); + builder.body(writer.toDataStream()); serializer.setContentType(eventStreamEncodingFactory.contentType()); } else if (serializer.hasBody()) { builder.body(serializer.getBody()); @@ -175,4 +190,8 @@ public HttpRequest serializeRequest() { return builder.headers(serializer.getHeaders()).build(); } + + private static T doCast(Class xclass, Object value) { + return xclass.cast(value); + } } diff --git a/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/ResponseSerializer.java b/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/ResponseSerializer.java index 36ee35f80..12a0f50d7 100644 --- a/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/ResponseSerializer.java +++ b/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/ResponseSerializer.java @@ -7,7 +7,6 @@ import java.util.Objects; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Flow; import software.amazon.smithy.java.core.schema.ApiOperation; import software.amazon.smithy.java.core.schema.OutputEventStreamingApiOperation; import software.amazon.smithy.java.core.schema.Schema; @@ -16,7 +15,8 @@ import software.amazon.smithy.java.core.schema.TraitKey; import software.amazon.smithy.java.core.serde.Codec; import software.amazon.smithy.java.core.serde.event.EventEncoderFactory; -import software.amazon.smithy.java.core.serde.event.EventStreamFrameEncodingProcessor; +import software.amazon.smithy.java.core.serde.event.Frame; +import software.amazon.smithy.java.core.serde.event.InternalEventStreamWriter; import software.amazon.smithy.java.http.api.HttpResponse; /** @@ -28,7 +28,7 @@ public final class ResponseSerializer { private String payloadMediaType; private ApiOperation operation; private SerializableShape shapeValue; - private EventEncoderFactory eventEncoderFactory; + private EventEncoderFactory> eventEncoderFactory; private Schema errorSchema; private boolean omitEmptyPayload = false; private final ConcurrentMap bindingCache; @@ -88,7 +88,7 @@ public ResponseSerializer shapeValue(SerializableShape shapeValue) { * @return Returns the serializer. */ public ResponseSerializer eventEncoderFactory( - EventEncoderFactory encoderFactory + EventEncoderFactory> encoderFactory ) { this.eventEncoderFactory = encoderFactory; return this; @@ -151,9 +151,22 @@ public HttpResponse serializeResponse() { var builder = HttpResponse.builder() .statusCode(serializer.getResponseStatus()); - var eventStream = (Flow.Publisher) serializer.getEventStream(); + var eventStream = serializer.getEventStream(); if (eventStream != null && operation instanceof OutputEventStreamingApiOperation) { - builder.body(EventStreamFrameEncodingProcessor.create(eventStream, eventEncoderFactory)); + InternalEventStreamWriter> writer = + InternalEventStreamWriter.toInternal(eventStream); + writer.bootstrap(new InternalEventStreamWriter.Bootstrap<>() { + @Override + public EventEncoderFactory> encoder() { + return eventEncoderFactory; + } + + @Override + public SerializableStruct initialEvent() { + return null; + } + }); + builder.body(writer.toDataStream()); serializer.setContentType(eventEncoderFactory.contentType()); } else if (serializer.hasBody()) { builder.body(serializer.getBody());