-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Feature/add no dag cre llo transmitter with changesets #20867
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Feature/add no dag cre llo transmitter with changesets #20867
Conversation
| # we want to write only to the home chain, so that we can test whether remote write-evm-2337 capability of the 'capabilities DON' is used | ||
| write-evm = ["1337"] | ||
| read-contract = ["1337"] | ||
| evm = ["1337"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why? we are splitting capabilities like that on purpose. one for 1337 will be executed using local caps, the one for 2337 will be using remote caps
|
|
||
| [[nodesets]] | ||
| nodes = 4 | ||
| nodes = 5 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why would it require 5 nodes?
| port = 13000 | ||
|
|
||
| [[nodesets.node_specs]] | ||
| roles = ["bootstrap"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nope, bootstrap needs to leave on a separate DON, please don't change the shape of this topology
| nodes = 4 | ||
| name = "capabilities" | ||
| don_types = ["capabilities"] | ||
| exposes_remote_capabilities = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
revert this change
|
|
||
| # we need to have chain 1337 configured (even if no capability uses it), because we use node addresses on chain 1337 | ||
| # to identify nodes in the gateway configuration (required by both web-api-target and vault capabilities) | ||
| supported_evm_chains = [1337, 2337] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
revert this change
| [[nodesets]] | ||
| nodes = 1 | ||
| name = "bootstrap-gateway" | ||
| don_types = ["bootstrap", "gateway"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
revert this change
|
|
||
| # even though this DON is not using any capability for chain with ID 2337 we still need it to be connected to it, | ||
| # because bootstrap job for capability DON will be created on the boostrap node from this DON | ||
| supported_evm_chains = [1337, 2337] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
revert this change
| if os.Getenv("FORCE_DOCKER_REBUILD") == "true" { | ||
| args = append(args, "--no-cache") | ||
| logger.Info().Msg("FORCE_DOCKER_REBUILD=true: building without cache") | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need that? what is the actual use case? Why wouldn't you want Docker to use cache if none of the layers changed?
| ) | ||
|
|
||
| func init() { | ||
| if os.Getenv("USE_LLO_ONCHAIN_SIGNER") == "true" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we set it via env var? is this something that has to change dynamically per test? or is it only used when the environment starts? I guess it the latter and if so we can leverage existing mechanisms for configuring capabilities, check what we do for custom compute: https://github.com/smartcontractkit/chainlink/blob/develop/system-tests/lib/cre/features/custom_compute/custom_compute.go#L111, where it applies defaults from this file https://github.com/smartcontractkit/chainlink/blob/develop/core/scripts/cre/environment/configs/capability_defaults.toml#L18 which can be overwritten by each DON in each topology like this: https://github.com/smartcontractkit/chainlink/blob/develop/core/scripts/cre/environment/configs/examples/workflow-don-overrides.toml#L54-L56
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so your streams feature could read that capability config and use one signing method or another
| # | ||
| # Usage: | ||
| # # First start mock EA | ||
| # docker run -d --name mock-ea -p 8080:8080 mock-ea:latest |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Idiomatic way to do it is by adding a struct that represents this mock to config:
type Config struct {
Blockchains []*blockchain.Input `toml:"blockchains" validate:"required"`
NodeSets []*cre.NodeSet `toml:"nodesets" validate:"required"`
JD *jd.Input `toml:"jd" validate:"required"`
Infra *infra.Provider `toml:"infra" validate:"required"`
Fake *fake.Input `toml:"fake" validate:"required"`
FakeHTTP *fake.Input `toml:"fake_http" validate:"required"`
MockEA. *mockEa.Input `toml:"mock_ea"`
S3ProviderInput *s3provider.Input `toml:"s3provider"`
CapabilityConfigs map[string]cre.CapabilityConfig `toml:"capability_configs"` // capability flag -> capability config
Addresses []string `toml:"addresses"`
mu sync.Mutex
loaded bool
}
and then in the topology TOML (e.g this file) add it like this:
[mock_ea]
port = 8088
and then in your test you read that config and if mock_ea is not nil then you start the service.
| # The streams-trigger feature registers it in the on-chain capability registry | ||
| # The mock capability allows us to create a mock implementation for testing | ||
| # NOTE: custom-compute removed due to known bug in local CRE framework | ||
| capabilities = ["ocr3", "streams-trigger", "mock"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a reason why these two capabilities "streams-trigger", "mock" cannot be added to existing topology, e..g. workflow-gateway-capabilities-don.toml?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They can be if there are no complaints.
|
|
||
| env_vars = { CL_EVM_CMD = "" } | ||
| # streams-trigger and ocr3 are required for LLO streams trigger E2E test | ||
| capabilities = ["ocr3", "streams-trigger"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here: is there a reason why these two capabilities "streams-trigger" cannot be added to existing topology, e..g. workflow-gateway-capabilities-don.toml?
No ill will result if a DON has capabilities that aren't actually used by tests or workflows.
| @@ -36,31 +36,62 @@ | |||
| # provider = "kind" # or "aws" | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
basically all the changes made to this file in the current form need to be reverted
| } | ||
|
|
||
| // Start a simple HTTP server to host channel definitions | ||
| channelDefsURL, channelDefsSHA, stopServer := startChannelDefsServer(channelDefsJSON) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could this server be expressed in the same way as mock ea? i.e. either a dockerised "fake" or in-process http server that is started by each test with the exact data each test needs?
first approach would be recommended if the data this server serves is static, second if needs to be based on some inputs or manipulated during runtime
| } | ||
|
|
||
| // For OCR config, we need the OCR config file with all key information | ||
| if ocrConfigFile != "" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
local CRE has structs like Node and NodeMetadata that already have all that information. Plus OCR3 changests contain logic that allows them to query that information from the job distributor, so you don't need to gather any of it manually.
| }) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to encode LLO onchain config: %w", err) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems like that llo on chain config is static and it could well go into PostEnvStartup hook. Or am I missing something?
| OffchainConfig: offchainConfig, | ||
| } | ||
|
|
||
| return contracts.SetProductionConfig(ctx, client, auth, ocrCfg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as mentioned in some other places, any reason for not configuring this contract using known and existing changesets? For example like this?
| sha = sha256.Sum256(channelDefs) | ||
|
|
||
| mux := http.NewServeMux() | ||
| mux.HandleFunc("/channel-definitions.json", func(w http.ResponseWriter, r *http.Request) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a reason to serve this via JSON if the content of this definitions file never changes?
| } | ||
|
|
||
| // Deploy Configurator | ||
| configuratorAddr, tx, configuratorContract, err := configurator.DeployConfigurator(auth, client) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ideally we would have deployment expressed as a changeset instead of directly calling gethwrappers. How is this contract deployed on production? Is there an existing changeset?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay, I see we have these changesets and that e2e tests use them, but they just aren't used here.
any reason for having this logic expressed here as a command and in helpers in system-tetsts/tests?
| fmt.Printf("Configurator deployed at: %s\n", configuratorAddr.Hex()) | ||
|
|
||
| // Deploy ChannelConfigStore | ||
| channelConfigStoreAddr, tx, channelConfigStore, err := channel_config_store.DeployChannelConfigStore(auth, client) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same: ideally we would have deployment expressed as a changeset instead of directly calling gethwrappers. How is this contract deployed on production? Is there an existing changeset?
|
|
||
| // Set config on configurator | ||
| // Signature: SetProductionConfig(configId [32]byte, signers [][]byte, offchainTransmitters [][32]byte, f uint8, onchainConfig []byte, offchainConfigVersion uint64, offchainConfig []byte) | ||
| tx, err := c.Configurator.SetProductionConfig( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same: configuring the contract should be expressed as a changeset. And then in the local CRE we would use the same changeset that is used on production, just with different data.
| sha [32]byte, // SHA256 hash of the channel definitions | ||
| ) error { | ||
| // Signature: SetChannelDefinitions(donId uint32, url string, sha [32]byte) | ||
| tx, err := c.ChannelConfigStore.SetChannelDefinitions( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same
| } | ||
|
|
||
| func RunLLOConsumerWorkflow(config WorkflowConfig, logger *slog.Logger, _ cre.SecretsProvider) (cre.Workflow[WorkflowConfig], error) { | ||
| logger.Info(fmt.Sprintf("LLO_CONSUMER_STARTING: streams=%v, expecting MAGIC_FORMAT5=%d, MAGIC_FORMAT7=%d", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doesn't the logger have Infof() function that would allow to format string?
| ChannelConfigStoreAddress: ccsAddr, | ||
| FromBlock: 0, // Set to actual block number if needed | ||
| }, nil | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could all of this happen in PostEnvStartup()? if not, why not?
| // Return the config digest (would need to be computed or retrieved from event) | ||
| // For now, return empty - the LLO plugin will compute it from the event | ||
| return [32]byte{}, nil | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this looks pretty static in the sense that for each DON this OCR configuration will be the same, why not move it to PostEnvStartup()?
| ccsAddr common.Address, | ||
| donID uint32, | ||
| s3URL string, | ||
| hash [32]byte, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what do these parameters represent?
ccsAddrss3URLhash
How many of them vary per test and how many never change once DON has been started and contracts deployed?
Why I ask? Ideally I'd would want that to land in PostEnvStartup().
| }}, | ||
| }, | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this also looks like something that could be done in PostEnvStartup... since once we have the DON and ccsAddr deployed it never changes.
| result -> multiply; | ||
| """ | ||
| `, name, streamID, externalJobID, hardcodedValue) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Could we use
text/templateto render the jobspec instead offmt.Sprintf? - Could we create jobspec for each known
streamIDin thePostEnvStartup()since the list of supportedstreamIDsis short and close-ended?
|
|
||
| // Wait for Anvil to be ready before accessing it | ||
| testLogger.Info().Msg("Waiting for Anvil to be ready...") | ||
| err := waitForAnvil(ctx, evmBlockchain.SethClient.Client, testLogger, 30*time.Second) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
was this ever the case that Anvil wasn't immediately ready?
| // Get CLD environment | ||
| if testEnv.CreEnvironment.CldfEnvironment == nil { | ||
| return nil, fmt.Errorf("CLDF environment is nil - cannot use changesets") | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
was this ever the case? I don't think that testEnv would be created from rehydrated state if cldf env was nil
| } | ||
|
|
||
| // deployLLOContractsWithChangesets deploys the Configurator contract using CLD changesets | ||
| func deployLLOContractsWithChangesets( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how does it differ from what happens in llo_deployment_helpers.go?
|
|
||
| testLogger.Info().Msg("✓ Production config set using CLD changeset") | ||
| return nil | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as said before: this looks like a perfect fit for something that could happen in PostEnvStartup()
| } | ||
| logger.Debug().Int("blocks", numBlocks).Msg("Mined additional blocks") | ||
| return nil | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this necessary? Anvil is started in mix-mining mode which means that it either mines a block once a tx is received or if none is received it mines a block every 500ms.
|
|
||
| testLogger.Info().Int("count", len(jobSpecs)).Msg("✓ Stream jobs deployed") | ||
| return nil | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same, since this job spec has always the same content it should be created in PostEnvStartup(). We could make it configurable via TOML and capability configs, if needed.
| } | ||
|
|
||
| // Create jobs via Job Distributor | ||
| if testEnv.CreEnvironment.CldfEnvironment == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
|
||
| // If FRESH_ENV=true and state exists, stop environment and delete state to force recreation | ||
| if freshEnv && stateExists { | ||
| framework.L.Info().Msg("FRESH_ENV=true - stopping existing environment and deleting state...") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need this?
| // | ||
| // REQUIREMENTS: | ||
| // This test requires the mock capability gRPC ports (15002-15005) to be exposed | ||
| // on the capabilities DON nodes. This requires topology configuration changes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we could just add these port to the existing topology, if they are not used by other tests/use cases there is no harm done
|
|
||
| // IMPORTANT: Start the fake price provider BEFORE environment setup | ||
| // The LLO feature's PostEnvStartup hook sets the channel definitions URL on the contract, | ||
| // and the Docker containers need to be able to reach this URL immediately. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what will happen if that URL is not immediately available? is there no retry? if that's a hard requirement we could modify the starting sequence to accommodate this requirement.
| // generateMockLLOReport creates a mock LLO report that matches the streams.Report structure | ||
| func generateMockLLOReport(seqNr uint64) (*streams.Report, error) { | ||
| timestamp := uint64(time.Now().UnixNano()) | ||
| configDigest, _ := hex.DecodeString("00091599c39d29821b4949b9ba237d2d1d9b7369087a71283c921034898320b0") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be great to see a comment explaining what that config digest contains and who such a value can be obtain/generated if it needs to be updated
|
|
||
| // First, explicitly wait for Format 5 (424242) | ||
| // Check for "Format=5" to ensure we get Format 5 reports specifically | ||
| expectedLogFormat5 := "Format=5" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
curious: how is this workflow triggered? I see we are not using mock capability to simulate streams trigger, so what triggers it?
NoDAG CRE LLO Transmitter with Changesets
Summary
This PR adds support for LLO (Low-Latency Oracle) reports to be consumed by CRE workflows via the
[email protected]capability. This enables workflows to receive real-time price data from Streams DON oracle networks without requiring a DAG (Directed Acyclic Graph) pipeline.Background
Previously, LLO reports required a DAG-based pipeline to process and forward reports to workflows. This PR introduces a "NoDAG" implementation that directly connects the LLO plugin to CRE workflows through a new transmitter implementation, simplifying the architecture and reducing latency.
Key Changes
Core Implementation
NoDAG Transmitter (
core/services/llo/cre/nodag_transmitter.go)[email protected]capabilityLLO Plugin → nodagTransmitter.Transmit() → processReport() → subscribersSignature Verification (
core/capabilities/remote/aggregation/signed_report.go)SignedReportRemoteAggregatorCapability Registry Integration (
core/capabilities/launcher.go)OnchainPublicKeyfor LLO signer extraction (via environment variableUSE_LLO_ONCHAIN_SIGNER)SignedReportRemoteAggregatorfor LLO triggersWorkflow Engine (
core/services/workflows/v2/engine.go)Deployment & Configuration
LLO Contract Deployment (
core/scripts/cre/environment/deploy/llo_contracts.go)Test Configuration (
core/scripts/cre/environment/configs/workflow-capabilities-llo-don.toml)["ocr3", "consensus", "don-time"], capabilities DON has["ocr3", "streams-trigger"]Keystone Registry (
system-tests/lib/cre/contracts/keystone_llo.go)OnchainPublicKeyUSE_LLO_ONCHAIN_SIGNERenvironment variableTesting
E2E Test (
system-tests/tests/smoke/cre/v2_llo_streams_trigger_test.go)Test Helpers (
system-tests/tests/smoke/cre/llo_e2e_setup.go)Technical Details
Signature Verification
LLO reports are cryptographically signed by LLO DON nodes as part of the OCR protocol. The signature verification:
OnchainPublicKey(20-byte Ethereum address) for signer identificationReport Formats
Both formats are supported and automatically detected based on event ID patterns.
Environment Variables
USE_LLO_ONCHAIN_SIGNER=true: Enables OnchainPublicKey-based signer extraction for LLO testsLLO_MOCK_EA_URL: URL for mock external adapter (for testing)LLO_CHANNEL_DEFS_URL: URL for channel definitions JSONTesting
Running the E2E Test
The test:
[email protected]LLO_E2E_VALUEmessages in workflow logsTest Configuration
The test uses a minimal configuration (
workflow-capabilities-llo-don.toml):["ocr3", "consensus", "don-time"]["ocr3", "streams-trigger"]Files Changed
nodag_transmitter.go,signed_report.go,launcher.gollo_contracts.go,deploy_cmd.gov2_llo_streams_trigger_test.go,llo_e2e_setup.go,keystone_llo.goworkflow-capabilities-llo-don.toml(new test-specific config)Breaking Changes
None. This is a new feature addition that doesn't affect existing functionality.
Related Issues