@cawthorne cawthorne commented Jan 22, 2026

NoDAG CRE LLO Transmitter with Changesets

Summary

This PR adds support for LLO (Low-Latency Oracle) reports to be consumed by CRE workflows via the streams-trigger capability. This enables workflows to receive real-time price data from Streams DON oracle networks without requiring a DAG (Directed Acyclic Graph) pipeline.

Background

Previously, LLO reports required a DAG-based pipeline to process and forward reports to workflows. This PR introduces a "NoDAG" implementation that directly connects the LLO plugin to CRE workflows through a new transmitter implementation, simplifying the architecture and reducing latency.

Key Changes

Core Implementation

  1. NoDAG Transmitter (core/services/llo/cre/nodag_transmitter.go)

    • New standalone transmitter that receives reports from the LLO plugin and emits them to workflow subscribers
    • Implements the streams-trigger capability
    • Architecture: LLO Plugin → nodagTransmitter.Transmit() → processReport() → subscribers
    • Supports both V1 (values.Map) and V2 (proto) output formats for backward compatibility
  2. Signature Verification (core/capabilities/remote/aggregation/signed_report.go)

    • Added cryptographic signature verification for LLO reports in SignedReportRemoteAggregator
    • Supports two report formats:
      • Format 5: Protobuf-encoded OCRTriggerReport
      • Format 7: ABI-encoded report with calculated streams (EVMABIEncodeUnpackedExpr)
    • Added legacy signature verification for Format 7 reports using v0.3 OCR report context (includes donID in ExtraHash)
    • Validates signatures against allowed signers from the capability registry
  3. Capability Registry Integration (core/capabilities/launcher.go)

    • Modified to use OnchainPublicKey for LLO signer extraction (via environment variable USE_LLO_ONCHAIN_SIGNER)
    • Added support for SignedReportRemoteAggregator for LLO triggers
    • Extracts signer addresses from the capability registry for signature verification
  4. Workflow Engine (core/services/workflows/v2/engine.go)

    • Enhanced to handle trigger subscription failures with proper error logging
    • Improved initialization error handling

Deployment & Configuration

  1. LLO Contract Deployment (core/scripts/cre/environment/deploy/llo_contracts.go)

    • Added support for deploying LLO Configurator contracts using CLD changesets
    • Integrated with the CRE environment setup
  2. Test Configuration (core/scripts/cre/environment/configs/workflow-capabilities-llo-don.toml)

    • New test-specific config file for LLO streams trigger E2E test
    • Removes gateway DON (not needed for LLO test)
    • Minimal capabilities: workflow DON has ["ocr3", "consensus", "don-time"], capabilities DON has ["ocr3", "streams-trigger"]
    • Uses only chain 1337 (no chain 2337 needed)
  3. Keystone Registry (system-tests/lib/cre/contracts/keystone_llo.go)

    • Test-specific override for signer extraction using OnchainPublicKey
    • Activated via USE_LLO_ONCHAIN_SIGNER environment variable
    • Ensures capability registry uses correct signer addresses for LLO signature verification

Testing

  1. E2E Test (system-tests/tests/smoke/cre/v2_llo_streams_trigger_test.go)

    • Full end-to-end test demonstrating complete data flow:
      • Mock EA → Stream Jobs → LLO Plugin → CRE Transmitter → Workflow
    • Tests both Format 5 and Format 7 report formats
    • Verifies signature validation and workflow execution
    • Includes test infrastructure setup helpers
  2. Test Helpers (system-tests/tests/smoke/cre/llo_e2e_setup.go)

    • Helper functions for LLO infrastructure setup
    • Deploys LLO contracts, stream jobs, and LLO jobs
    • Configures OCR settings with proper encryption keys
    • Includes Anvil readiness checks to prevent timing issues

Technical Details

Signature Verification

LLO reports are cryptographically signed by LLO DON nodes as part of the OCR protocol. The signature verification:

  • Uses OnchainPublicKey (20-byte Ethereum address) for signer identification
  • Validates against allowed signers stored in the capability registry
  • Requires F+1 valid signatures for BFT (Byzantine Fault Tolerance)
  • Supports legacy Format 7 reports with donID in ExtraHash
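The F+1 quorum rule above can be sketched as follows. `verifyQuorum` and the pre-recovered signer addresses are hypothetical stand-ins for illustration; the real aggregator recovers signer addresses from ECDSA signatures over the report and reads the allowed set from the capability registry:

```go
package main

import "fmt"

// verifyQuorum checks that at least f+1 distinct allowed signers produced
// valid signatures over a report. For brevity the "signatures" here are
// already-recovered 20-byte addresses; real code would recover each one
// from an ECDSA signature first.
func verifyQuorum(allowed map[string]bool, recovered []string, f int) bool {
	seen := map[string]bool{}
	for _, addr := range recovered {
		if allowed[addr] && !seen[addr] {
			seen[addr] = true
		}
	}
	// BFT threshold: at least f+1 distinct valid signatures.
	return len(seen) >= f+1
}

func main() {
	allowed := map[string]bool{"0xA": true, "0xB": true, "0xC": true, "0xD": true}
	// f = 1, so at least 2 distinct valid signatures are required.
	fmt.Println(verifyQuorum(allowed, []string{"0xA", "0xB"}, 1))        // true
	fmt.Println(verifyQuorum(allowed, []string{"0xA", "0xA", "0xZ"}, 1)) // false: duplicate + unknown signer
}
```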

Report Formats

  • Format 5 (ReportFormatCapabilityTrigger): Protobuf-encoded OCRTriggerReport
  • Format 7 (ReportFormatEVMABIEncodeUnpackedExpr): ABI-encoded report with calculated streams

Both formats are supported and automatically detected based on event ID patterns.
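A minimal dispatch over the two formats might look like the sketch below. The constant names mirror the formats listed above, but note this is a simplification: per the description, the shipped code detects the format from event ID patterns rather than an explicit parameter:

```go
package main

import "fmt"

// Format IDs as described above; the decode bodies are placeholders.
const (
	ReportFormatCapabilityTrigger        = 5 // protobuf-encoded OCRTriggerReport
	ReportFormatEVMABIEncodeUnpackedExpr = 7 // ABI-encoded, with calculated streams
)

// decodeReport routes a raw report to the right decoder for its format.
func decodeReport(format int, raw []byte) (string, error) {
	switch format {
	case ReportFormatCapabilityTrigger:
		return "decoded protobuf OCRTriggerReport", nil
	case ReportFormatEVMABIEncodeUnpackedExpr:
		return "ABI-unpacked report with calculated streams", nil
	default:
		return "", fmt.Errorf("unsupported LLO report format %d", format)
	}
}

func main() {
	msg, _ := decodeReport(ReportFormatCapabilityTrigger, nil)
	fmt.Println(msg) // decoded protobuf OCRTriggerReport
	_, err := decodeReport(3, nil)
	fmt.Println(err) // unsupported LLO report format 3
}
```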

Environment Variables

  • USE_LLO_ONCHAIN_SIGNER=true: Enables OnchainPublicKey-based signer extraction for LLO tests
  • LLO_MOCK_EA_URL: URL for mock external adapter (for testing)
  • LLO_CHANNEL_DEFS_URL: URL for channel definitions JSON
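A hypothetical invocation tying these variables together before an E2E run (the URLs and ports are placeholder values, not documented defaults):

```shell
# Placeholder values: point the nodes at a locally running mock EA and
# channel-definitions server, and enable onchain-signer extraction.
export USE_LLO_ONCHAIN_SIGNER=true
export LLO_MOCK_EA_URL=http://localhost:8080
export LLO_CHANNEL_DEFS_URL=http://localhost:8081/channel-definitions.json
echo "$USE_LLO_ONCHAIN_SIGNER"
# Then run the E2E test as shown in the Testing section:
# cd system-tests/tests
# go test -timeout 15m -run "Test_CRE_V2_LLO_Streams_Trigger_E2E" ./smoke/cre/...
```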

Testing

Running the E2E Test

cd system-tests/tests
go test -timeout 15m -run "Test_CRE_V2_LLO_Streams_Trigger_E2E" ./smoke/cre/...

The test:

  1. Sets up LLO infrastructure (contracts, OCR config)
  2. Deploys stream jobs and LLO jobs with CRE transmitter
  3. Deploys a consumer workflow that subscribes to the streams-trigger capability
  4. Verifies end-to-end data flow by checking for LLO_E2E_VALUE messages in workflow logs

Test Configuration

The test uses a minimal configuration (workflow-capabilities-llo-don.toml):

  • Workflow DON: 5 nodes with bootstrap, capabilities: ["ocr3", "consensus", "don-time"]
  • Capabilities DON: 4 nodes, capabilities: ["ocr3", "streams-trigger"]
  • No gateway DON: Removed to reduce resource usage
  • Single chain: Only chain 1337 (no chain 2337)
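Based on the values above and the TOML keys visible in the review diffs, the minimal topology might look roughly like this. This is a best-effort reconstruction for orientation only; the actual `workflow-capabilities-llo-don.toml` may differ in fields and layout:

```toml
# Sketch only -- field names taken from snippets elsewhere in this PR.
[[nodesets]]
nodes = 5
name = "workflow"
don_types = ["workflow"]
capabilities = ["ocr3", "consensus", "don-time"]
supported_evm_chains = [1337]

[[nodesets.node_specs]]
roles = ["bootstrap"]

[[nodesets]]
nodes = 4
name = "capabilities"
don_types = ["capabilities"]
capabilities = ["ocr3", "streams-trigger"]
supported_evm_chains = [1337]
```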

Files Changed

  • Core implementation: nodag_transmitter.go, signed_report.go, launcher.go
  • Deployment: llo_contracts.go, deploy_cmd.go
  • Test infrastructure: v2_llo_streams_trigger_test.go, llo_e2e_setup.go, keystone_llo.go
  • Configuration: workflow-capabilities-llo-don.toml (new test-specific config)

Breaking Changes

None. This is a new feature addition that doesn't affect existing functionality.

Related Issues

  • Enables LLO reports to be consumed by CRE workflows
  • Provides NoDAG alternative to DAG-based LLO pipelines
  • Supports both Format 5 and Format 7 report formats

# we want to write only to the home chain, so that we can test whether remote write-evm-2337 capability of the 'capabilities DON' is used
write-evm = ["1337"]
read-contract = ["1337"]
evm = ["1337"]

Why? We are splitting capabilities like that on purpose: the one for 1337 will be executed using local caps, the one for 2337 will be using remote caps.


[[nodesets]]
nodes = 4
nodes = 5

why would it require 5 nodes?

port = 13000

[[nodesets.node_specs]]
roles = ["bootstrap"]

Nope, bootstrap needs to live on a separate DON; please don't change the shape of this topology.

nodes = 4
name = "capabilities"
don_types = ["capabilities"]
exposes_remote_capabilities = true

revert this change


# we need to have chain 1337 configured (even if no capability uses it), because we use node addresses on chain 1337
# to identify nodes in the gateway configuration (required by both web-api-target and vault capabilities)
supported_evm_chains = [1337, 2337]

revert this change

[[nodesets]]
nodes = 1
name = "bootstrap-gateway"
don_types = ["bootstrap", "gateway"]

revert this change


# even though this DON is not using any capability for chain with ID 2337 we still need it to be connected to it,
# because the bootstrap job for the capability DON will be created on the bootstrap node from this DON
supported_evm_chains = [1337, 2337]

revert this change

if os.Getenv("FORCE_DOCKER_REBUILD") == "true" {
args = append(args, "--no-cache")
logger.Info().Msg("FORCE_DOCKER_REBUILD=true: building without cache")
}
@Tofel Tofel Jan 23, 2026

why do we need that? what is the actual use case? Why wouldn't you want Docker to use cache if none of the layers changed?

)

func init() {
if os.Getenv("USE_LLO_ONCHAIN_SIGNER") == "true" {
@Tofel Tofel Jan 23, 2026

why do we set it via env var? is this something that has to change dynamically per test? or is it only used when the environment starts? I guess it's the latter, and if so we can leverage existing mechanisms for configuring capabilities; check what we do for custom compute: https://github.com/smartcontractkit/chainlink/blob/develop/system-tests/lib/cre/features/custom_compute/custom_compute.go#L111, where it applies defaults from this file https://github.com/smartcontractkit/chainlink/blob/develop/core/scripts/cre/environment/configs/capability_defaults.toml#L18 which can be overwritten by each DON in each topology like this: https://github.com/smartcontractkit/chainlink/blob/develop/core/scripts/cre/environment/configs/examples/workflow-don-overrides.toml#L54-L56


so your streams feature could read that capability config and use one signing method or another

#
# Usage:
# # First start mock EA
# docker run -d --name mock-ea -p 8080:8080 mock-ea:latest

Idiomatic way to do it is by adding a struct that represents this mock to config:

type Config struct {
	Blockchains       []*blockchain.Input             `toml:"blockchains" validate:"required"`
	NodeSets          []*cre.NodeSet                  `toml:"nodesets" validate:"required"`
	JD                *jd.Input                       `toml:"jd" validate:"required"`
	Infra             *infra.Provider                 `toml:"infra" validate:"required"`
	Fake              *fake.Input                     `toml:"fake" validate:"required"`
	FakeHTTP          *fake.Input                     `toml:"fake_http" validate:"required"`
	MockEA            *mockEa.Input                   `toml:"mock_ea"`
	S3ProviderInput   *s3provider.Input               `toml:"s3provider"`
	CapabilityConfigs map[string]cre.CapabilityConfig `toml:"capability_configs"` // capability flag -> capability config
	Addresses         []string                        `toml:"addresses"`

	mu     sync.Mutex
	loaded bool
}

and then in the topology TOML (e.g. this file) add it like this:

[mock_ea]
port = 8088

and then in your test you read that config and if mock_ea is not nil then you start the service.

# The streams-trigger feature registers it in the on-chain capability registry
# The mock capability allows us to create a mock implementation for testing
# NOTE: custom-compute removed due to known bug in local CRE framework
capabilities = ["ocr3", "streams-trigger", "mock"]

is there a reason why these two capabilities, "streams-trigger" and "mock", cannot be added to an existing topology, e.g. workflow-gateway-capabilities-don.toml?

Contributor Author

They can be if there are no complaints.


env_vars = { CL_EVM_CMD = "" }
# streams-trigger and ocr3 are required for LLO streams trigger E2E test
capabilities = ["ocr3", "streams-trigger"]

same here: is there a reason why the "streams-trigger" capability cannot be added to an existing topology, e.g. workflow-gateway-capabilities-don.toml?

No harm will result if a DON has capabilities that aren't actually used by tests or workflows.

@@ -36,31 +36,62 @@
# provider = "kind" # or "aws"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

basically all the changes made to this file in the current form need to be reverted

}

// Start a simple HTTP server to host channel definitions
channelDefsURL, channelDefsSHA, stopServer := startChannelDefsServer(channelDefsJSON)

could this server be expressed in the same way as mock ea? i.e. either a dockerised "fake" or in-process http server that is started by each test with the exact data each test needs?

first approach would be recommended if the data this server serves is static, second if needs to be based on some inputs or manipulated during runtime

}

// For OCR config, we need the OCR config file with all key information
if ocrConfigFile != "" {

local CRE has structs like Node and NodeMetadata that already have all that information. Plus OCR3 changesets contain logic that allows them to query that information from the job distributor, so you don't need to gather any of it manually.

})
if err != nil {
return fmt.Errorf("failed to encode LLO onchain config: %w", err)
}

seems like that llo on chain config is static and it could well go into PostEnvStartup hook. Or am I missing something?

OffchainConfig: offchainConfig,
}

return contracts.SetProductionConfig(ctx, client, auth, ocrCfg)

as mentioned in some other places, any reason for not configuring this contract using known and existing changesets? For example like this?

sha = sha256.Sum256(channelDefs)

mux := http.NewServeMux()
mux.HandleFunc("/channel-definitions.json", func(w http.ResponseWriter, r *http.Request) {

is there a reason to serve this via JSON if the content of this definitions file never changes?

}

// Deploy Configurator
configuratorAddr, tx, configuratorContract, err := configurator.DeployConfigurator(auth, client)

ideally we would have deployment expressed as a changeset instead of directly calling gethwrappers. How is this contract deployed on production? Is there an existing changeset?


okay, I see we have these changesets and that e2e tests use them, but they just aren't used here.

any reason for having this logic expressed here as a command and in helpers in system-tests/tests?

fmt.Printf("Configurator deployed at: %s\n", configuratorAddr.Hex())

// Deploy ChannelConfigStore
channelConfigStoreAddr, tx, channelConfigStore, err := channel_config_store.DeployChannelConfigStore(auth, client)

same: ideally we would have deployment expressed as a changeset instead of directly calling gethwrappers. How is this contract deployed on production? Is there an existing changeset?


// Set config on configurator
// Signature: SetProductionConfig(configId [32]byte, signers [][]byte, offchainTransmitters [][32]byte, f uint8, onchainConfig []byte, offchainConfigVersion uint64, offchainConfig []byte)
tx, err := c.Configurator.SetProductionConfig(

same: configuring the contract should be expressed as a changeset. And then in the local CRE we would use the same changeset that is used on production, just with different data.

sha [32]byte, // SHA256 hash of the channel definitions
) error {
// Signature: SetChannelDefinitions(donId uint32, url string, sha [32]byte)
tx, err := c.ChannelConfigStore.SetChannelDefinitions(

same

}

func RunLLOConsumerWorkflow(config WorkflowConfig, logger *slog.Logger, _ cre.SecretsProvider) (cre.Workflow[WorkflowConfig], error) {
logger.Info(fmt.Sprintf("LLO_CONSUMER_STARTING: streams=%v, expecting MAGIC_FORMAT5=%d, MAGIC_FORMAT7=%d",

doesn't the logger have an Infof() function that would allow formatting the string?

ChannelConfigStoreAddress: ccsAddr,
FromBlock: 0, // Set to actual block number if needed
}, nil
}

could all of this happen in PostEnvStartup()? if not, why not?

// Return the config digest (would need to be computed or retrieved from event)
// For now, return empty - the LLO plugin will compute it from the event
return [32]byte{}, nil
}

this looks pretty static in the sense that for each DON this OCR configuration will be the same, why not move it to PostEnvStartup()?

ccsAddr common.Address,
donID uint32,
s3URL string,
hash [32]byte,
@Tofel Tofel Jan 26, 2026

what do these parameters represent?

  • ccsAddrs
  • s3URL
  • hash

How many of them vary per test and how many never change once DON has been started and contracts deployed?

Why do I ask? Ideally I'd want that to land in PostEnvStartup().

}},
},
}
}

this also looks like something that could be done in PostEnvStartup... since once we have the DON and ccsAddr deployed it never changes.

result -> multiply;
"""
`, name, streamID, externalJobID, hardcodedValue)
}
@Tofel Tofel Jan 26, 2026

  1. Could we use text/template to render the jobspec instead of fmt.Sprintf?
  2. Could we create jobspec for each known streamID in the PostEnvStartup() since the list of supported streamIDs is short and close-ended?


// Wait for Anvil to be ready before accessing it
testLogger.Info().Msg("Waiting for Anvil to be ready...")
err := waitForAnvil(ctx, evmBlockchain.SethClient.Client, testLogger, 30*time.Second)

was this ever the case that Anvil wasn't immediately ready?

// Get CLD environment
if testEnv.CreEnvironment.CldfEnvironment == nil {
return nil, fmt.Errorf("CLDF environment is nil - cannot use changesets")
}

was this ever the case? I don't think that testEnv would be created from rehydrated state if cldf env was nil

}

// deployLLOContractsWithChangesets deploys the Configurator contract using CLD changesets
func deployLLOContractsWithChangesets(

how does it differ from what happens in llo_deployment_helpers.go?


testLogger.Info().Msg("✓ Production config set using CLD changeset")
return nil
}

as said before: this looks like a perfect fit for something that could happen in PostEnvStartup()

}
logger.Debug().Int("blocks", numBlocks).Msg("Mined additional blocks")
return nil
}

why is this necessary? Anvil is started in mixed-mining mode, which means it either mines a block once a tx is received or, if none is received, mines a block every 500ms.


testLogger.Info().Int("count", len(jobSpecs)).Msg("✓ Stream jobs deployed")
return nil
}

same, since this job spec has always the same content it should be created in PostEnvStartup(). We could make it configurable via TOML and capability configs, if needed.

}

// Create jobs via Job Distributor
if testEnv.CreEnvironment.CldfEnvironment == nil {

again :D PostEnvStartup()!

Also... why not use existing changesets for OCR3 jobs to create the jobspecs?

Example for bootstrap job. Example for worker jobs.


// If FRESH_ENV=true and state exists, stop environment and delete state to force recreation
if freshEnv && stateExists {
framework.L.Info().Msg("FRESH_ENV=true - stopping existing environment and deleting state...")

why do we need this?

//
// REQUIREMENTS:
// This test requires the mock capability gRPC ports (15002-15005) to be exposed
// on the capabilities DON nodes. This requires topology configuration changes.

we could just add these ports to the existing topology; if they are not used by other tests/use cases there is no harm done


// IMPORTANT: Start the fake price provider BEFORE environment setup
// The LLO feature's PostEnvStartup hook sets the channel definitions URL on the contract,
// and the Docker containers need to be able to reach this URL immediately.

what will happen if that URL is not immediately available? is there no retry? if that's a hard requirement we could modify the starting sequence to accommodate this requirement.

// generateMockLLOReport creates a mock LLO report that matches the streams.Report structure
func generateMockLLOReport(seqNr uint64) (*streams.Report, error) {
timestamp := uint64(time.Now().UnixNano())
configDigest, _ := hex.DecodeString("00091599c39d29821b4949b9ba237d2d1d9b7369087a71283c921034898320b0")

it would be great to see a comment explaining what that config digest contains and how such a value can be obtained/generated if it needs to be updated


// First, explicitly wait for Format 5 (424242)
// Check for "Format=5" to ensure we get Format 5 reports specifically
expectedLogFormat5 := "Format=5"

curious: how is this workflow triggered? I see we are not using mock capability to simulate streams trigger, so what triggers it?
