Conversation
|
I see you updated files related to
|
|
✅ No conflicts with other open PRs targeting |
| } | ||
|
|
||
| // Add appends an event to the buffer. Called by OCRQueue.Put. | ||
| func (b *ObservationBuffer) Add(event EnqueuedTriggerEvent) { |
There was a problem hiding this comment.
need to persist events here possibly into the 3.1 KV store
| } | ||
| for _, ev := range events { | ||
| enqueued := v2.NewEnqueuedTriggerEvent(ev.triggerCapID, ev.triggerIndex, ev.timestamp, ev.event) | ||
| if err := t.queue.Put(ctx, enqueued); err != nil { |
There was a problem hiding this comment.
use an observation/execution method/callback from the engine. Engine should guarantee that this event has been accepted/that it is executing.
|
| // Transmit decodes the report after consensus is reached and | ||
| // enqueues each event into the internal queue. Engine Wait() will return the head | ||
| // of these events. | ||
| func (t *Transmitter) Transmit(ctx context.Context, cd types.ConfigDigest, seqNr uint64, rwi ocr3types.ReportWithInfo[[]byte], sigs []types.AttributedOnchainSignature) error { |
There was a problem hiding this comment.
what's the mechanism for handling failures from Transmit calls? can we atomically commit to executing or rollback. For research: what happens if not all nodes successfully execute Transmit (i.e., start execution on every consensus event)
Subject/Observer situation.
| method string | ||
| } | ||
|
|
||
| // EnqueuedTriggerEvent is the type queued for workflow trigger execution. |
There was a problem hiding this comment.
note: remove or move execution concurrency limit
There was a problem hiding this comment.
question: should we even enforce max execution concurrency at DON level?
There was a problem hiding this comment.
ensure that the queue design is generic enough to put in front of either capabilities or trigger events
| // startExecution initiates a new workflow execution, blocking until completed | ||
| func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent enqueuedTriggerEvent) { | ||
| fullExecutionID, err := events.GenerateExecutionIDWithTriggerIndex(e.cfg.WorkflowID, wrappedTriggerEvent.event.Event.ID, wrappedTriggerEvent.triggerIndex) | ||
| func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent EnqueuedTriggerEvent) { |
There was a problem hiding this comment.
managing trigger execution under congestion.
Primary guarantee: every trigger event in consensus starts a workflow execution. at the end of transmit, the engine has started execution on all trigger events.
possible to allow F failures to start a workflow execution




Requires
Supports