Skip to content
Draft
1 change: 1 addition & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,7 @@ func NewApplication(ctx context.Context, opts ApplicationOpts) (Application, err
if ocr2Delegate == nil {
return nil, errors.New("ocr2.NewDelegate() returned nil")
}
creServices.SetOCRTriggerQueueFactory(ocr2Delegate)
delegates[job.OffchainReporting2] = ocr2Delegate
delegates[job.Bootstrap] = ocrbootstrap.NewDelegateBootstrap(
opts.DS,
Expand Down
67 changes: 64 additions & 3 deletions core/services/cre/cre.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,14 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncerlimiter"
wftypes "github.com/smartcontractkit/chainlink/v2/core/services/workflows/types"
v2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/v2"

"github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings"
)

// ocrTriggerEventQueueEnabled gates use of the OCR-backed trigger queue.
// TODO: replace with cresettings.OCRTriggerEventQueueEnabled when added to chainlink-common.
const ocrTriggerEventQueueEnabled = false

// Keystore is the minimal interface needed from keystore for CRE
type Keystore interface {
CSA() keystore.CSA
Expand Down Expand Up @@ -116,6 +122,16 @@ type Services struct {

// callback to wire Delegates into CRE services (e.g. Launcher) when ready
SetDelegatesDeps func(*standardcapabilities.Delegate) (commonsrv.Service, error)

// triggerQueueFactory builds the trigger queue constructor, which is used
// to manage the queue of trigger events in the engine.
triggerQueueFactory v2.TriggerQueueFactory
}

// SetOCRTriggerQueueFactory sets the creator for the OCR-backed trigger queue.
// Called by application.go after creating the OCR delegate.
func (s *Services) SetOCRTriggerQueueFactory(c v2.TriggerQueueFactory) {
s.triggerQueueFactory = c
}

func (s *Services) close() error {
Expand Down Expand Up @@ -222,6 +238,7 @@ func (s *Services) newSubservices(
cfg,
relayerChainInterops,
opts,
s.triggerQueueFactory,
lggr,
ds,
opts.DonTimeStore,
Expand Down Expand Up @@ -311,7 +328,7 @@ func newRegistrySyncerV1(
relayer loop.Relayer,
registryAddress string,
ds sqlutil.DataSource,
externalPeerWrapper p2ptypes.PeerWrapper,
_ p2ptypes.PeerWrapper,
ocrConfigService capregconfig.OCRConfigService,
wfLauncher registrysyncerV1.Listener,
) ([]commonsrv.Service, error) {
Expand Down Expand Up @@ -830,6 +847,7 @@ func newWorkflowRegistrySyncerV2(
relayerChainInterops RelayerChainInterops,
billingClient metering.BillingClient,
opts Opts,
triggerQueueFactory v2.TriggerQueueFactory,
lggr logger.Logger,
ds sqlutil.DataSource,
dontimeStore *dontime.Store,
Expand Down Expand Up @@ -874,11 +892,52 @@ func newWorkflowRegistrySyncerV2(

engineRegistry := syncerV2.NewEngineRegistry()

engineLimiters, err := v2.NewLimiters(lf, nil)
// Enable override of default queue behavior
// TODO: use cre settings package for feature flag
var triggerQueue limits.QueueLimiter[v2.EnqueuedTriggerEvent]
var consensusEventDispatcher *syncerV2.ConsensusEventDispatcher
if ocrTriggerEventQueueEnabled && triggerQueueFactory != nil {
// OCR mode: create dispatcher first (queue=nil, no Wait loop); Transmitter delivers via OnConsensusEvent.
consensusEventDispatcher, err = syncerV2.NewConsensusEventDispatcher(
lggr,
engineRegistry,
nil, // queue=nil: events come from Transmitter callback
nil, // engineLimiters not needed for routing
)
if err != nil {
return nil, nil, fmt.Errorf("could not create consensus event dispatcher: %w", err)
}
cfg := cresettings.Default.PerWorkflow
deps := v2.TriggerQueueDeps{
Lf: lf,
Cfg: &cfg,
DonSubscriber: workflowDonNotifier,
ConsensusEventReceiver: consensusEventDispatcher,
}
var tqErr error
triggerQueue, tqErr = triggerQueueFactory.NewOCRTriggerQueue(context.Background(), deps)
if tqErr != nil {
return nil, nil, fmt.Errorf("could not create OCR trigger queue: %w", tqErr)
}
}
engineLimiters, err := v2.NewLimitersWithTriggerQueue(lf, nil, triggerQueue)
if err != nil {
return nil, nil, fmt.Errorf("could not instantiate engine limiters: %w", err)
}

if consensusEventDispatcher == nil {
// Standard mode: dispatcher runs Wait loop on the queue
consensusEventDispatcher, err = syncerV2.NewConsensusEventDispatcher(
lggr,
engineRegistry,
engineLimiters.TriggerEventQueue,
engineLimiters,
)
if err != nil {
return nil, nil, fmt.Errorf("could not create consensus event dispatcher: %w", err)
}
}

featureFlags, err := v2.NewFeatureFlags(lf, nil)
if err != nil {
return nil, nil, fmt.Errorf("could not instantiate engine feature flags: %w", err)
Expand Down Expand Up @@ -974,7 +1033,7 @@ func newWorkflowRegistrySyncerV2(
return nil, nil, fmt.Errorf("unable to create workflow registry syncer: %w", err)
}

srvcs = append(srvcs, workflowRegistrySyncerV2)
srvcs = append(srvcs, consensusEventDispatcher, workflowRegistrySyncerV2)
lggr.Debugw("Created WorkflowRegistrySyncer V2")
return workflowRegistrySyncerV2, srvcs, nil
}
Expand All @@ -984,6 +1043,7 @@ func newWorkflowRegistrySyncer(
cfg Config,
relayerChainInterops RelayerChainInterops,
opts Opts,
triggerQueueFactory v2.TriggerQueueFactory,
lggr logger.Logger,
ds sqlutil.DataSource,
dontimeStore *dontime.Store,
Expand Down Expand Up @@ -1032,6 +1092,7 @@ func newWorkflowRegistrySyncer(
relayerChainInterops,
billingClient,
opts,
triggerQueueFactory,
lggr,
ds,
dontimeStore,
Expand Down
49 changes: 44 additions & 5 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/capabilities/vault/vaulttypes"
"github.com/smartcontractkit/chainlink/v2/core/config/env"
syncerV2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer/v2"
v2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/v2"

"github.com/smartcontractkit/smdkg/dkgocr/oracleargs"

Expand Down Expand Up @@ -89,6 +90,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/autotelemetry21"
ocr2keeper21core "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core"
ringconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ring/config"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/triggerqueue"
vaultocrplugin "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/vault"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/validate"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr3_1/beholderwrapper"
Expand Down Expand Up @@ -336,6 +338,48 @@ func (d *Delegate) JobType() job.Type {
return job.OffchainReporting2
}

// triggerQueueContractID is a synthetic ID for monitoring; trigger queue has no on-chain contract.
const triggerQueueContractID = "trigger_queue"

// NewOCRTriggerQueue creates the trigger queue for the workflow syncer.
// Delegate owns the oracle; builds OCR3_1OracleArgs with TODOs for unwired fields.
// Returns OCRQueue; transmitter delivers consensus events via receiver.OnConsensusEvent.
func (d *Delegate) NewOCRTriggerQueue(ctx context.Context, deps v2.TriggerQueueDeps) (limits.QueueLimiter[v2.EnqueuedTriggerEvent], error) {
if deps.ConsensusEventReceiver == nil {
return nil, errors.New("OCR trigger queue requires ConsensusEventReceiver")
}
inner, err := v2.NewStandardTriggerQueue(deps.Lf, deps.Cfg)
if err != nil {
return nil, err
}
lamport := &v2.LamportCounter{}
buffer := v2.NewObservationBuffer[v2.EnqueuedTriggerEvent](lamport)

// Build OCR3_1OracleArgs for the trigger queue. TODO: wire all fields and call libocr2.NewOracle.
ocrLogger := ocrcommon.NewOCRWrapper(d.lggr, d.cfg.OCR2().TraceLogging(), func(ctx context.Context, msg string) {
d.lggr.Warnw("OCR trigger queue", "msg", msg)
})
oracleArgs := libocr2.OCR3_1OracleArgs[[]byte]{
BinaryNetworkEndpointFactory: d.peerWrapper.Peer3_1,
V2Bootstrappers: nil, // TODO: wire bootstrap peers from workflow DON (deps.DonSubscriber.WaitForDon or config)
ContractConfigTracker: nil, // TODO: in-process config; need static/dynamic config from DON (no on-chain contract)
ContractTransmitter: triggerqueue.NewTransmitter(deps.ConsensusEventReceiver, d.lggr),
Database: nil, // TODO: wire OCR DB (e.g. NewDB(d.ds, triggerQueuePluginID, 0, d.lggr)); need unique plugin ID
KeyValueDatabaseFactory: nil, // TODO: wire KV factory (e.g. kvdb.NewPebbleKeyValueDatabaseFactory(path)); path = KeyValueStoreRootDir/trigger_queue
LocalConfig: ocrtypes.LocalConfig{}, // TODO: wire LocalConfig (BlockDelta, etc.)
Logger: ocrLogger,
MetricsRegisterer: prometheus.WrapRegistererWith(map[string]string{"job_name": triggerQueueContractID}, prometheus.DefaultRegisterer),
MonitoringEndpoint: nil, // TODO: wire d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, triggerQueueContractID, ...); need rid for trigger queue
OffchainConfigDigester: nil, // TODO: in-process digester; derive config digest from DON (members, F, offchain config)
OffchainKeyring: nil, // TODO: wire OCR key bundle (d.ks.Get or workflow DON key)
OnchainKeyring: nil, // TODO: wire onchain keyring adapter (trigger queue may use same as vault/dontime for in-process)
ReportingPluginFactory: beholderwrapper.NewReportingPluginFactory(triggerqueue.NewFactory(d.lggr, buffer), d.lggr, "triggerqueue"),
}
_ = oracleArgs // TODO: pass to libocr2.NewOracle(oracleArgs) when all fields wired

return v2.NewOCRQueue[v2.EnqueuedTriggerEvent](v2.OCRQueueDeps[v2.EnqueuedTriggerEvent]{Inner: inner, Buffer: buffer})
}

func (d *Delegate) BeforeJobCreated(_ job.Job) {
// This is only called first time the job is created
d.isNewlyCreatedJob = true
Expand Down Expand Up @@ -1046,9 +1090,6 @@ func (d *Delegate) newDonTimePlugin(
return nil, err
}

if err != nil {
return nil, err
}
srvs = append(srvs, job.NewServiceAdapter(oracle))
return srvs, nil
}
Expand Down Expand Up @@ -2445,7 +2486,6 @@ func (d *Delegate) newServicesCCIPExecution(ctx context.Context, lggr logger.Sug
return nil, fmt.Errorf("chain not supported for CCIP execution: %s", spec.Relay)
}
dstRid, err := spec.RelayID()

if err != nil {
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: string(spec.PluginType)}
}
Expand Down Expand Up @@ -2506,7 +2546,6 @@ func (d *Delegate) ccipExecGetDstProvider(ctx context.Context, jb job.Job, plugi
return nil, fmt.Errorf("chain not supported for CCIP execution: %s", spec.Relay)
}
dstRid, err := spec.RelayID()

if err != nil {
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: string(spec.PluginType)}
}
Expand Down
44 changes: 44 additions & 0 deletions core/services/ocr2/plugins/triggerqueue/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package triggerqueue

import (
"context"

"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"

v2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/v2"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

var _ ocr3_1types.ReportingPluginFactory[[]byte] = (*Factory)(nil)

// Factory creates ReportingPlugin instances for the trigger queue.
type Factory struct {
lggr logger.Logger
buffer *v2.ObservationBuffer[v2.EnqueuedTriggerEvent]
}

// NewFactory creates a new trigger queue plugin factory.
func NewFactory(lggr logger.Logger, buffer *v2.ObservationBuffer[v2.EnqueuedTriggerEvent]) *Factory {
return &Factory{lggr: lggr.Named("TriggerQueueFactory"), buffer: buffer}
}

func (f *Factory) NewReportingPlugin(ctx context.Context, config ocr3types.ReportingPluginConfig, fetcher ocr3_1types.BlobBroadcastFetcher) (ocr3_1types.ReportingPlugin[[]byte], ocr3_1types.ReportingPluginInfo, error) {
plugin := NewReportingPlugin(f.lggr, f.buffer)
info := ocr3_1types.ReportingPluginInfo1{
Name: "triggerqueue",
Limits: ocr3_1types.ReportingPluginLimits{
MaxQueryBytes: 100,
MaxObservationBytes: 500 * 1024, // 500KB per design doc
MaxReportsPlusPrecursorBytes: 500 * 1024,
MaxReportBytes: 500 * 1024,
MaxReportCount: 1,
MaxKeyValueModifiedKeys: 500,
MaxKeyValueModifiedKeysPlusValuesBytes: 1024 * 1024, // 1MB
MaxBlobPayloadBytes: 25 * 1024, // 25KB per design doc
MaxPerOracleUnexpiredBlobCumulativePayloadBytes: 30 * 1024 * 1024,
MaxPerOracleUnexpiredBlobCount: 1000,
},
}
return plugin, info, nil
}
86 changes: 86 additions & 0 deletions core/services/ocr2/plugins/triggerqueue/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package triggerqueue

import (
"context"
"encoding/json"
"errors"

"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"

v2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/v2"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

// errNotImplemented is returned by all plugin methods in this draft.
var errNotImplemented = errors.New("triggerqueue plugin: draft implementation, not yet implemented")

var _ ocr3_1types.ReportingPlugin[[]byte] = (*ReportingPlugin)(nil)

// ReportingPlugin implements OCR 3.1 ReportingPlugin for the trigger queue.
type ReportingPlugin struct {
lggr logger.Logger
buffer *v2.ObservationBuffer[v2.EnqueuedTriggerEvent]
}

// NewReportingPlugin creates a new ReportingPlugin.
func NewReportingPlugin(lggr logger.Logger, buffer *v2.ObservationBuffer[v2.EnqueuedTriggerEvent]) *ReportingPlugin {
return &ReportingPlugin{lggr: lggr.Named("TriggerQueuePlugin"), buffer: buffer}
}

func (p *ReportingPlugin) Query(ctx context.Context, seqNr uint64, keyValueReader ocr3_1types.KeyValueStateReader, blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher) (types.Query, error) {
return nil, errNotImplemented
}

// Observation reads from the buffer (filled by OCRQueue.Put) and produces an observation.
// Draft: returns minimal observation (event IDs and Lamport as JSON); full impl would BroadcastBlob for payloads.
func (p *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq types.AttributedQuery, keyValueReader ocr3_1types.KeyValueStateReader, blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher) (types.Observation, error) {
events := p.buffer.TakeForObservation()
if len(events) == 0 {
return []byte("[]"), nil
}
obs := make([]struct {
ID string `json:"id"`
Lamport uint64 `json:"lamport"`
}, len(events))
for i, be := range events {
obs[i] = struct {
ID string `json:"id"`
Lamport uint64 `json:"lamport"`
}{ID: be.ID(), Lamport: be.Lamport()}
}
return json.Marshal(obs)
}

func (p *ReportingPlugin) ValidateObservation(ctx context.Context, seqNr uint64, aq types.AttributedQuery, ao types.AttributedObservation, keyValueReader ocr3_1types.KeyValueStateReader, blobFetcher ocr3_1types.BlobFetcher) error {
return errNotImplemented
}

func (p *ReportingPlugin) ObservationQuorum(ctx context.Context, seqNr uint64, aq types.AttributedQuery, aos []types.AttributedObservation, keyValueReader ocr3_1types.KeyValueStateReader, blobFetcher ocr3_1types.BlobFetcher) (bool, error) {
return false, errNotImplemented
}

func (p *ReportingPlugin) StateTransition(ctx context.Context, seqNr uint64, aq types.AttributedQuery, aos []types.AttributedObservation, keyValueReadWriter ocr3_1types.KeyValueStateReadWriter, blobFetcher ocr3_1types.BlobFetcher) (ocr3_1types.ReportsPlusPrecursor, error) {
return ocr3_1types.ReportsPlusPrecursor{}, errNotImplemented
}

func (p *ReportingPlugin) Reports(ctx context.Context, seqNr uint64, reportsPlusPrecursor ocr3_1types.ReportsPlusPrecursor) ([]ocr3types.ReportPlus[[]byte], error) {
return nil, errNotImplemented
}

func (p *ReportingPlugin) Committed(ctx context.Context, seqNr uint64, keyValueReader ocr3_1types.KeyValueStateReader) error {
return errNotImplemented
}

func (p *ReportingPlugin) ShouldAcceptAttestedReport(ctx context.Context, seqNr uint64, reportWithInfo ocr3types.ReportWithInfo[[]byte]) (bool, error) {
return false, errNotImplemented
}

func (p *ReportingPlugin) ShouldTransmitAcceptedReport(ctx context.Context, seqNr uint64, reportWithInfo ocr3types.ReportWithInfo[[]byte]) (bool, error) {
return false, errNotImplemented
}

func (p *ReportingPlugin) Close() error {
return nil
}
Loading
Loading