Skip to content

draft: ocr trigger queue#21461

Draft
MStreet3 wants to merge 9 commits intodevelopfrom
tmp-ocr-queue-draft
Draft

draft: ocr trigger queue#21461
MStreet3 wants to merge 9 commits intodevelopfrom
tmp-ocr-queue-draft

Conversation

@MStreet3
Copy link
Contributor

@MStreet3 MStreet3 commented Mar 9, 2026

image

Requires

Supports

@github-actions
Copy link
Contributor

github-actions bot commented Mar 9, 2026

I see you updated files related to core. Please run make gocs in the root directory to add a changeset as well as in the text include at least one of the following tags:

  • #added For any new functionality added.
  • #breaking_change For any functionality that requires manual action for the node to boot.
  • #bugfix For bug fixes.
  • #changed For any change to the existing functionality.
  • #db_update For any feature that introduces updates to database schema.
  • #deprecation_notice For any upcoming deprecation functionality.
  • #internal For changesets that need to be excluded from the final changelog.
  • #nops For any feature that is NOP facing and needs to be in the official Release Notes for the release.
  • #removed For any functionality/config that is removed.
  • #updated For any functionality that is updated.
  • #wip For any change that is not ready yet and external communication about it should be held off till it is feature complete.

@github-actions
Copy link
Contributor

github-actions bot commented Mar 9, 2026

✅ No conflicts with other open PRs targeting develop

@trunk-io
Copy link

trunk-io bot commented Mar 9, 2026

Static BadgeStatic BadgeStatic BadgeStatic Badge

View Full Report ↗︎Docs

}

// Add appends an event to the buffer. Called by OCRQueue.Put.
func (b *ObservationBuffer) Add(event EnqueuedTriggerEvent) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to persist events here possibly into the 3.1 KV store

}
for _, ev := range events {
enqueued := v2.NewEnqueuedTriggerEvent(ev.triggerCapID, ev.triggerIndex, ev.timestamp, ev.event)
if err := t.queue.Put(ctx, enqueued); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use an observation/execution method/callback from the engine. Engine should guarantee that this event has been accepted/that it is executing.

@cl-sonarqube-production
Copy link

Quality Gate failed Quality Gate failed

Failed conditions
C Reliability Rating on New Code (required ≥ A)

See analysis details on SonarQube

Catch issues before they fail your Quality Gate with our IDE extension SonarQube IDE SonarQube IDE

// Transmit decodes the report after consensus is reached and
// enqueues each event into the internal queue. Engine Wait() will return the head
// of these events.
func (t *Transmitter) Transmit(ctx context.Context, cd types.ConfigDigest, seqNr uint64, rwi ocr3types.ReportWithInfo[[]byte], sigs []types.AttributedOnchainSignature) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the mechanism for handling failures from Transmit calls? can we atomically commit to executing or rollback. For research: what happens if not all nodes successfully execute Transmit (i.e., start execution on every consensus event)

Subject/Observer situation.

method string
}

// EnqueuedTriggerEvent is the type queued for workflow trigger execution.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: remove or move execution concurrency limit

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: should we even enforce max execution concurrency at DON level?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ensure that the queue design is generic enough to put in front of either capabilities or trigger events

// startExecution initiates a new workflow execution, blocking until completed
func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent enqueuedTriggerEvent) {
fullExecutionID, err := events.GenerateExecutionIDWithTriggerIndex(e.cfg.WorkflowID, wrappedTriggerEvent.event.Event.ID, wrappedTriggerEvent.triggerIndex)
func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent EnqueuedTriggerEvent) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

managing trigger execution under congestion.

Primary guarantee: every trigger event in consensus starts a workflow execution. at the end of transmit, the engine has started execution on all trigger events.

possible to allow F failures to start a workflow execution

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant