
Implement new event streams API#1035

Open
sugmanue wants to merge 4 commits into smithy-lang:main from sugmanue:sugmanue/event-stream-overhaul

Conversation

@sugmanue
Contributor

Issue #, if available:

Description of changes:

Implements a new event streams API that is virtual-thread (VT) friendly: the caller blocks when reading and writing events.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Comment on lines 56 to 58
Contributor

  • can we collapse these into a single int state field so we don't need to do three volatile reads on each read() call?
  • under our new threading model, do these even need to be volatile? it's expected that only one VT will interact with the reading edge of the event stream, and the VT subsystem itself must provide visibility guarantees for past actions that occurred if the VT previously ran on a different platform thread.
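The single-field idea from the first bullet can be sketched as follows. This is illustrative only (class and flag names are not from the PR): the separate flags collapse into one `int`, so each `read()` pays at most one volatile load, and under a single-writer assumption the write side can stay a plain read-modify-write.

```java
// Sketch: pack several boolean flags into one volatile int so a read()
// needs a single volatile load. Names are hypothetical, not the PR's code.
final class StreamState {
    static final int OPEN = 1;        // stream accepting events
    static final int CLOSED = 1 << 1; // reader has closed the stream
    static final int FAILED = 1 << 2; // terminal failure observed

    private volatile int state = OPEN;

    // One volatile read; callers test flags against the returned snapshot.
    int snapshot() {
        return state;
    }

    static boolean isSet(int snapshot, int flag) {
        return (snapshot & flag) != 0;
    }

    // Single-writer assumption (one VT drives this edge), so a plain
    // read-modify-write is sufficient; no CAS loop needed.
    void set(int flag) {
        state = state | flag;
    }
}
```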

Contributor Author

Yes, I will collapse these into a single field. As for volatility: the user will get a copy of this object, and I don't think we can assume they will use a VT even if we recommend it, so I feel this way is safer. Let me know if you think otherwise or if I'm missing something.

Contributor

getCount incurs a volatile read of readyLatch's backing state; please only call it when debug logging is enabled.
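The guard being asked for looks like this. The sketch uses `java.util.logging` for illustration; the actual project may use a different logging facade, and the class name is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch: skip the volatile read inside getCount() entirely unless
// debug-level (FINE) output is actually enabled.
final class LatchLogging {
    private static final Logger LOG = Logger.getLogger(LatchLogging.class.getName());
    private final CountDownLatch readyLatch = new CountDownLatch(1);

    // Returns true only if the debug line was actually emitted.
    boolean logState() {
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine("readyLatch count: " + readyLatch.getCount());
            return true;
        }
        return false; // level check is a plain field read; no latch access
    }
}
```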

Contributor

Can we type this queue around ByteBuffers instead? If we use ByteBuffer as a (byte[], offset, length) triple, we can allocate larger buffers less frequently in the writer, reducing our overall allocation pressure.
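One possible shape for that ByteBuffer-typed queue, sketched with hypothetical names: the writer fills one large shared buffer and enqueues zero-copy slices, so each element carries its (array, offset, length) triple implicitly and large arrays are allocated less often.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch: a queue of ByteBuffer views over a shared backing buffer.
final class FrameQueue {
    private final BlockingQueue<ByteBuffer> frames = new ArrayBlockingQueue<>(16);

    // Enqueue a (buffer, offset, length) view without copying bytes.
    void offerFrame(ByteBuffer shared, int offset, int length) throws InterruptedException {
        ByteBuffer view = shared.duplicate(); // independent position/limit, same bytes
        view.position(offset);
        view.limit(offset + length);
        frames.put(view.slice()); // slice captures exactly [offset, offset+length)
    }

    ByteBuffer takeFrame() throws InterruptedException {
        return frames.take();
    }
}
```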

Contributor Author

Yes, we can, I will change it.

Contributor

  • Duplicate of ByteBufferUtils#getBytes
  • We already have a ByteBuffer, can we just push it down into pipeStream directly? Why unwrap to byte[]s here?

Contributor Author

Yes, let me change the code to just pass ByteBuffer.

Comment on lines 209 to 210
Contributor

nonatomic get-and-set of a volatile variable is a code smell. either use a compare-and-set primitive via AtomicIntegerFieldUpdater or AtomicBoolean, or make the field non-volatile.
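The compare-and-set shape the comment recommends can be sketched like this (illustrative class name, not the PR's code): the CAS closes the window between the read and the write, so only one caller ever wins the transition.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: replace "read volatile, then assign volatile" with one atomic
// compare-and-set, so the check and the mutation cannot interleave.
final class CloseOnce {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    // Atomically flips false -> true; returns true only for the winner.
    boolean close() {
        return closed.compareAndSet(false, true);
    }
}
```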

Contributor Author

Indeed, I will fix it.

Comment on lines +35 to +36
* before any other event is written. Protocols that don't require the initial event still have
* to unlatch the writer by bootstrapping it with a null value.
Contributor

I dislike this. We shouldn't be exposing complexities of a subset of protocols across all protocols.

Contributor Author

I dislike it too 😆. Do you have a suggestion to avoid it?

Contributor Author

TL;DR, the user has to have a reference to the writer before the writer is bootstrapped using the protocol specifics, which include whether to send the initial event in the stream.

readyLatch.getCount());

// Wait for writer to be fully setup and the initial event to be written.
readyLatch.await();
Contributor

this should only be possible if we expect a different thread to call bootstrap in parallel with a thread attempting to call write, but in our new VT threading model we should expect a single writer thread to drive all interactions. additionally, we should trivially be able to enforce that the first write call contains the initial event because our runtime already doesn't expose this object to users until after bootstrap is called.

was this added because the current runtime behavior requires it or because you anticipated it would be necessary?

Contributor Author

An instance of this class can be created by the user and set in the request before it's fully bootstrapped, so yes, it can start writing on its own thread before it's ready.
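The interaction being described can be sketched as follows (hypothetical names, heavily simplified): the user may hold the writer and call `write()` before the runtime has called `bootstrap()`, so `write()` parks on the latch until bootstrap counts it down.

```java
import java.util.concurrent.CountDownLatch;

// Sketch: a writer handed to the user before bootstrap; early write()
// calls block on the latch until the runtime finishes setup.
final class LatchedWriter {
    private final CountDownLatch readyLatch = new CountDownLatch(1);
    private volatile boolean bootstrapped;

    // Called by the runtime once protocol setup (and any initial event) is done.
    void bootstrap() {
        bootstrapped = true;
        readyLatch.countDown(); // releases any writer parked in write()
    }

    void write(String event) throws InterruptedException {
        readyLatch.await(); // no-op after bootstrap; blocks early callers
        // ... actual encoding and writing would happen here
    }

    boolean isBootstrapped() {
        return bootstrapped;
    }
}
```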

public void bootstrap(Bootstrap<IE, F> bootstrap) {
if (readyLatch.getCount() == 0) {
throw new IllegalStateException("bootstrap has been already called");
}
Contributor

If we truly expect parallelism, this is insufficient. You'd need an atomic mutation to guarantee readyLatch's state doesn't change between your state check, your future mutations of the latch, and other threads which may be executing the same check-and-mutate code.
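The atomic check-and-mutate the comment describes could look like this sketch (hypothetical names, not the PR's code): a CAS on a dedicated flag guarantees exactly one `bootstrap()` call succeeds even with concurrent callers, instead of racing on the latch's count.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: guard bootstrap() with a CAS so the state check and the
// subsequent mutations cannot race with another caller's same check.
final class BootstrapGuard {
    private final AtomicBoolean bootstrapCalled = new AtomicBoolean(false);

    void bootstrap() {
        if (!bootstrapCalled.compareAndSet(false, true)) {
            throw new IllegalStateException("bootstrap has already been called");
        }
        // ... latch mutations and protocol setup run here, exactly once
    }
}
```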

Contributor Author

This is there just to catch bugs where the calling code calls it twice. We don't expect any concurrent callers for this code.

Contributor

for this small a limit, we might as well make an ArrayBlockingQueue. better spatial locality and avoids per-node allocations.
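The suggested swap is small; a sketch (capacity value is illustrative): `ArrayBlockingQueue` stores its elements in one preallocated array, whereas `LinkedBlockingQueue` allocates a node per element.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch: for a small fixed limit, prefer an array-backed bounded queue.
final class SmallQueues {
    static BlockingQueue<ByteBuffer> newEventQueue() {
        // One contiguous array, no per-node allocation on offer/poll.
        return new ArrayBlockingQueue<>(8);
    }
}
```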

Contributor Author

Yes, I will change it.
