From 2c47912642bad182e476ffbf0bca5e558a4fc069 Mon Sep 17 00:00:00 2001 From: Renato Maia <1887792+renatomaia@users.noreply.github.com> Date: Wed, 27 May 2026 19:07:08 -0300 Subject: [PATCH 1/4] refactor(evmreader): poll periodically for new blocks instead of websocket notifications --- Makefile | 2 +- cmd/cartesi-rollups-evm-reader/root/root.go | 28 +- cmd/cartesi-rollups-node/root/root.go | 36 +- compose.individual-services.yaml | 5 +- compose.yaml | 5 +- internal/config/generate/Config.toml | 34 +- internal/config/generated.go | 268 ++++++--------- internal/evmreader/dave_consensus_test.go | 76 +++++ internal/evmreader/edge_cases_test.go | 80 +---- internal/evmreader/evmreader.go | 169 ++-------- internal/evmreader/evmreader_test.go | 349 +++++++++++++------- internal/evmreader/fixtures_test.go | 8 - internal/evmreader/input_test.go | 158 ++------- internal/evmreader/mocks_test.go | 80 +---- internal/evmreader/output_test.go | 162 +++------ internal/evmreader/sealedepochs.go | 21 ++ internal/evmreader/service.go | 73 +--- internal/evmreader/service_config_test.go | 58 +++- internal/node/node.go | 17 +- pkg/ethutil/client.go | 8 +- pkg/ethutil/client_test.go | 28 ++ pkg/service/service.go | 7 +- pkg/service/service_test.go | 16 + test/compose/compose.integration.yaml | 2 +- test/compose/compose.test.yaml | 2 +- test/secrets/blockchain_ws_endpoint.txt | 1 - 26 files changed, 732 insertions(+), 961 deletions(-) delete mode 100644 test/secrets/blockchain_ws_endpoint.txt diff --git a/Makefile b/Makefile index a00b130a5..5a0a51281 100644 --- a/Makefile +++ b/Makefile @@ -121,8 +121,8 @@ env: @echo export CARTESI_LOG_LEVEL="info" @echo export CARTESI_BLOCKCHAIN_DEFAULT_BLOCK="latest" @echo export CARTESI_BLOCKCHAIN_HTTP_ENDPOINT="http://localhost:8545" - @echo export CARTESI_BLOCKCHAIN_WS_ENDPOINT="ws://localhost:8545" @echo export CARTESI_BLOCKCHAIN_ID="31337" + @echo export CARTESI_EVM_READER_POLLING_INTERVAL="1" @echo export CARTESI_CONTRACTS_INPUT_BOX_ADDRESS="0x346B3df038FE9f8380071eC6514D5a83aD143939" @echo export CARTESI_CONTRACTS_AUTHORITY_FACTORY_ADDRESS="0x3C1FE01c542a88A523FF6847eD1E26176c8C4ED0" @echo export CARTESI_CONTRACTS_QUORUM_FACTORY_ADDRESS="0x1f94009389F408B8D0ADfFcF8BBDCe5552BaCa5F" diff --git a/cmd/cartesi-rollups-evm-reader/root/root.go b/cmd/cartesi-rollups-evm-reader/root/root.go index cae77132f..08faec1ed 100644 --- a/cmd/cartesi-rollups-evm-reader/root/root.go +++ b/cmd/cartesi-rollups-evm-reader/root/root.go @@ -13,7 +13,6 @@ import ( "github.com/cartesi/rollups-node/internal/version" "github.com/cartesi/rollups-node/pkg/ethutil" "github.com/cartesi/rollups-node/pkg/service" - "github.com/ethereum/go-ethereum/ethclient" "github.com/spf13/cobra" ) @@ -22,8 +21,8 @@ var ( logLevel string logColor bool defaultBlockString string - blockchainHttpEndpoint string - blockchainWsEndpoint string + blockchainHTTPEndpoint string + pollInterval string databaseConnection string maxStartupTime string enableInputReader bool @@ -55,10 +54,10 @@ func init() { "Tint the logs (colored output)") cli.AddFlagStrVar(flags, &databaseConnection, "database-connection", config.DATABASE_CONNECTION, "Database connection string in the URL format\n(eg.: 'postgres://user:password@hostname:port/database') ") - cli.AddFlagStrVar(flags, &blockchainHttpEndpoint, "blockchain-http-endpoint", config.BLOCKCHAIN_HTTP_ENDPOINT, + cli.AddFlagStrVar(flags, &blockchainHTTPEndpoint, "blockchain-http-endpoint", config.BLOCKCHAIN_HTTP_ENDPOINT, "Blockchain http endpoint") - cli.AddFlagStrVar(flags, &blockchainWsEndpoint, "blockchain-ws-endpoint", config.BLOCKCHAIN_WS_ENDPOINT, - "Blockchain WS Endpoint") + cli.AddFlagStrVar(flags, &pollInterval, "poll-interval", config.EVM_READER_POLLING_INTERVAL, + "Poll interval") cli.AddFlagStrVar(flags, &maxStartupTime, "max-startup-time", config.MAX_STARTUP_TIME, "Maximum startup time in seconds") cli.AddFlagBoolVar(flags, &enableInputReader, "input-reader", config.FEATURE_INPUT_READER_ENABLED, @@ -89,6 +88,7 @@ func run(cmd *cobra.Command, args []string) { EnableSignalHandling: true, TelemetryCreate: true, TelemetryAddress: cfg.EvmReaderTelemetryAddress, + PollInterval: cfg.EvmReaderPollingInterval, }, Config: *cfg, } @@ -101,19 +101,17 @@ func run(cmd *cobra.Command, args []string) { createInfo.EthClient, err = ethutil.NewEthClient( ctx, cfg.BlockchainHttpEndpoint.Raw(), logger, ethutil.RetryConfig{ - MaxRetries: cfg.BlockchainHttpMaxRetries, - RetryMinWait: cfg.BlockchainHttpRetryMinWait, - RetryMaxWait: cfg.BlockchainHttpRetryMaxWait, + MaxRetries: cfg.BlockchainHttpMaxRetries, + RetryMinWait: cfg.BlockchainHttpRetryMinWait, + RetryMaxWait: cfg.BlockchainHttpRetryMaxWait, + RequestTimeout: cfg.BlockchainHttpRequestTimeout, }, authOpt) cli.CheckErr(logger, err) - wsEndpoint := cfg.BlockchainWsEndpoint.Raw() - createInfo.EthWsClient, err = ethclient.DialContext(ctx, wsEndpoint) - cli.CheckErr(logger, ethutil.RedactEndpointFromError(err, wsEndpoint)) - - createInfo.Repository, err = factory.NewRepositoryFromConnectionString(ctx, cfg.DatabaseConnection.Raw()) + repo, err := factory.NewRepositoryFromConnectionString(ctx, cfg.DatabaseConnection.Raw()) cli.CheckErr(logger, err) - defer createInfo.Repository.Close() + defer repo.Close() + createInfo.Repository = repo readerService, err := evmreader.Create(ctx, &createInfo) cli.CheckErr(logger, err) diff --git a/cmd/cartesi-rollups-node/root/root.go b/cmd/cartesi-rollups-node/root/root.go index f534f1d83..c17bbefd3 100644 --- a/cmd/cartesi-rollups-node/root/root.go +++ b/cmd/cartesi-rollups-node/root/root.go @@ -5,6 +5,7 @@ package root import ( "context" + "time" "github.com/cartesi/rollups-node/internal/cli" "github.com/cartesi/rollups-node/internal/config" @@ -28,9 +29,9 @@ var ( logLevelPrt string logLevelValidator string defaultBlockString string - blockchainHttpEndpoint string - blockchainWsEndpoint string + blockchainHTTPEndpoint string databaseConnection string + evmReaderPollInterval string advancerPollInterval string validatorPollInterval string claimerPollInterval string @@ -41,7 +42,7 @@ var ( enableJsonrpc bool enableSubmission bool enableMachineHashCheck bool - jsonrpcApiAddress string + jsonrpcAPIAddress string inspectAddress string telemetryAddress string machinelogLevel string @@ -64,7 +65,7 @@ func init() { cli.AddFlagStrVarP(flags, &defaultBlockString, "default-block", "d", config.BLOCKCHAIN_DEFAULT_BLOCK, "Default block to be used when fetching new blocks.\nOne of 'latest', 'safe', 'pending', 'finalized'") - cli.AddFlagStrVar(flags, &jsonrpcApiAddress, "jsonrpc-address", config.JSONRPC_API_ADDRESS, + cli.AddFlagStrVar(flags, &jsonrpcAPIAddress, "jsonrpc-address", config.JSONRPC_API_ADDRESS, "Jsonrpc API service address and port") cli.AddFlagStrVar(flags, &inspectAddress, "inspect-address", config.INSPECT_ADDRESS, "Inspect service address and port") @@ -88,10 +89,10 @@ func init() { "Override log level for the validator service (default: inherit --log-level)") cli.AddFlagStrVar(flags, &databaseConnection, "database-connection", config.DATABASE_CONNECTION, "Database connection string in the URL format\n(eg.: 'postgres://user:password@hostname:port/database') ") - cli.AddFlagStrVar(flags, &blockchainHttpEndpoint, "blockchain-http-endpoint", config.BLOCKCHAIN_HTTP_ENDPOINT, + cli.AddFlagStrVar(flags, &blockchainHTTPEndpoint, "blockchain-http-endpoint", config.BLOCKCHAIN_HTTP_ENDPOINT, "Blockchain HTTP endpoint") - cli.AddFlagStrVar(flags, &blockchainWsEndpoint, "blockchain-ws-endpoint", config.BLOCKCHAIN_WS_ENDPOINT, - "Blockchain WS Endpoint") + cli.AddFlagStrVar(flags, &evmReaderPollInterval, "evm-reader-poll-interval", config.EVM_READER_POLLING_INTERVAL, + "EVM reader poll interval") cli.AddFlagStrVar(flags, &advancerPollInterval, "advancer-poll-interval", config.ADVANCER_POLLING_INTERVAL, "Advancer poll interval") cli.AddFlagStrVar(flags, &validatorPollInterval, "validator-poll-interval", config.VALIDATOR_POLLING_INTERVAL, @@ -99,7 +100,7 @@ func init() { cli.AddFlagStrVar(flags, &claimerPollInterval, "claimer-poll-interval", config.CLAIMER_POLLING_INTERVAL, "Claimer poll interval") cli.AddFlagStrVar(flags, &prtPollInterval, "prt-poll-interval", config.PRT_POLLING_INTERVAL, - "Claimer poll interval") + "PRT poll interval") cli.AddFlagStrVar(flags, &maxStartupTime, "max-startup-time", config.MAX_STARTUP_TIME, "Maximum startup time in seconds") cli.AddFlagBoolVar(flags, &enableInputReader, "input-reader", config.FEATURE_INPUT_READER_ENABLED, @@ -128,7 +129,7 @@ func init() { } } -func newEthClient(ctx context.Context, svcName string) (*ethclient.Client, error) { +func newEthClient(ctx context.Context, svcName string, requestTimeout time.Duration) (*ethclient.Client, error) { level := config.ResolveServiceLogLevel(svcName, cfg.LogLevel) logger := service.NewLogger(level, cfg.LogColor).With("service", svcName) @@ -139,9 +140,10 @@ func newEthClient(ctx context.Context, svcName string) (*ethclient.Client, error return ethutil.NewEthClient(ctx, cfg.BlockchainHttpEndpoint.Raw(), logger, ethutil.RetryConfig{ - MaxRetries: cfg.BlockchainHttpMaxRetries, - RetryMinWait: cfg.BlockchainHttpRetryMinWait, - RetryMaxWait: cfg.BlockchainHttpRetryMaxWait, + MaxRetries: cfg.BlockchainHttpMaxRetries, + RetryMinWait: cfg.BlockchainHttpRetryMinWait, + RetryMaxWait: cfg.BlockchainHttpRetryMaxWait, + RequestTimeout: requestTimeout, }, authOpt) } @@ -164,17 +166,13 @@ func run(cmd *cobra.Command, args []string) { createInfo.CreateInfo.Logger = logger var err error - createInfo.ReaderClient, err = newEthClient(ctx, config.ServiceEvmReader) + createInfo.ReaderClient, err = newEthClient(ctx, config.ServiceEvmReader, cfg.BlockchainHttpRequestTimeout) cli.CheckErr(logger, err) - wsEndpoint := cfg.BlockchainWsEndpoint.Raw() - createInfo.ReaderWSClient, err = ethclient.DialContext(ctx, wsEndpoint) - cli.CheckErr(logger, ethutil.RedactEndpointFromError(err, wsEndpoint)) - - createInfo.ClaimerClient, err = newEthClient(ctx, config.ServiceClaimer) + createInfo.ClaimerClient, err = newEthClient(ctx, config.ServiceClaimer, 0) cli.CheckErr(logger, err) - createInfo.PrtClient, err = newEthClient(ctx, config.ServicePrt) + createInfo.PrtClient, err = newEthClient(ctx, config.ServicePrt, 0) cli.CheckErr(logger, err) createInfo.Repository, err = factory.NewRepositoryFromConnectionString(ctx, cfg.DatabaseConnection.Raw()) diff --git a/compose.individual-services.yaml b/compose.individual-services.yaml index f65c54995..daa8f2a35 100644 --- a/compose.individual-services.yaml +++ b/compose.individual-services.yaml @@ -1,8 +1,8 @@ x-env: &env CARTESI_LOG_LEVEL: info CARTESI_BLOCKCHAIN_HTTP_ENDPOINT_FILE: /run/secrets/blockchain_http_endpoint - CARTESI_BLOCKCHAIN_WS_ENDPOINT_FILE: /run/secrets/blockchain_http_endpoint CARTESI_BLOCKCHAIN_ID: 31337 + CARTESI_EVM_READER_POLLING_INTERVAL: 1 CARTESI_CONTRACTS_INPUT_BOX_ADDRESS: 0x346B3df038FE9f8380071eC6514D5a83aD143939 CARTESI_CONTRACTS_AUTHORITY_FACTORY_ADDRESS: 0x3C1FE01c542a88A523FF6847eD1E26176c8C4ED0 CARTESI_CONTRACTS_QUORUM_FACTORY_ADDRESS: 0x1f94009389F408B8D0ADfFcF8BBDCe5552BaCa5F @@ -66,7 +66,6 @@ services: secrets: - auth_mnemonic - blockchain_http_endpoint - - blockchain_ws_endpoint - database_connection environment: <<: *env @@ -155,7 +154,5 @@ secrets: file: test/secrets/auth_mnemonic.txt blockchain_http_endpoint: file: test/secrets/blockchain_http_endpoint.txt - blockchain_ws_endpoint: - file: test/secrets/blockchain_ws_endpoint.txt database_connection: file: test/secrets/database_connection.txt diff --git a/compose.yaml b/compose.yaml index afa0dfdd8..5ebdfe4db 100644 --- a/compose.yaml +++ b/compose.yaml @@ -1,8 +1,8 @@ x-env: &env CARTESI_LOG_LEVEL: info CARTESI_BLOCKCHAIN_HTTP_ENDPOINT_FILE: /run/secrets/blockchain_http_endpoint - CARTESI_BLOCKCHAIN_WS_ENDPOINT_FILE: /run/secrets/blockchain_http_endpoint CARTESI_BLOCKCHAIN_ID: 31337 + CARTESI_EVM_READER_POLLING_INTERVAL: 1 CARTESI_CONTRACTS_INPUT_BOX_ADDRESS: 0x346B3df038FE9f8380071eC6514D5a83aD143939 CARTESI_CONTRACTS_AUTHORITY_FACTORY_ADDRESS: 0x3C1FE01c542a88A523FF6847eD1E26176c8C4ED0 CARTESI_CONTRACTS_QUORUM_FACTORY_ADDRESS: 0x1f94009389F408B8D0ADfFcF8BBDCe5552BaCa5F @@ -71,7 +71,6 @@ services: secrets: - auth_mnemonic - blockchain_http_endpoint - - blockchain_ws_endpoint - database_connection environment: <<: *env @@ -87,7 +86,5 @@ secrets: file: test/secrets/auth_mnemonic.txt blockchain_http_endpoint: file: test/secrets/blockchain_http_endpoint.txt - blockchain_ws_endpoint: - file: test/secrets/blockchain_ws_endpoint.txt database_connection: file: test/secrets/database_connection.txt diff --git a/internal/config/generate/Config.toml b/internal/config/generate/Config.toml index 006eec24a..b94b9d8f6 100644 --- a/internal/config/generate/Config.toml +++ b/internal/config/generate/Config.toml @@ -117,6 +117,13 @@ Maximum number of inputs fetched per database query during advance processing an synchronization. Bounds memory usage for applications with large backlogs of unprocessed inputs.""" used-by = ["advancer", "node"] +[rollups.CARTESI_EVM_READER_POLLING_INTERVAL] +default = "12" +go-type = "Duration" +description = """ +Time in seconds to wait before checking for a new block header. The default (12s) is tuned for mainnet. Reduce for faster chains or devnets.""" +used-by = ["evmreader", "node"] + [rollups.CARTESI_ADVANCER_POLLING_INTERVAL] default = "3" go-type = "Duration" @@ -196,13 +203,6 @@ Examples: omit = true used-by = ["evmreader", "claimer", "node"] -[blockchain.CARTESI_BLOCKCHAIN_WS_ENDPOINT] -file = true -go-type = "URL" -description = """ -WebSocket endpoint for the blockchain RPC provider.""" -used-by = ["evmreader", "node"] - [blockchain.CARTESI_BLOCKCHAIN_LEGACY_ENABLED] default = "false" go-type = "bool" @@ -240,26 +240,12 @@ description = """ Maximum wait time in seconds for the exponential backoff retry policy. The delay between retries for HTTP blockchain requests will never exceed this value, regardless of the backoff calculation.""" used-by = ["evmreader", "claimer", "node", "prt"] -[rollups.CARTESI_BLOCKCHAIN_WS_LIVENESS_TIMEOUT] +[rollups.CARTESI_BLOCKCHAIN_HTTP_REQUEST_TIMEOUT] default = "120" go-type = "Duration" description = """ -Maximum time in seconds to wait for a new block header on the WebSocket subscription before treating the connection as stalled and reconnecting. Handles silent connection drops where no error is delivered. The default (120s) is tuned for mainnet (~12s block time). Reduce for faster chains or devnets.""" -used-by = ["evmreader", "node"] - -[rollups.CARTESI_BLOCKCHAIN_WS_MAX_RETRIES] -default = "4" -go-type = "uint64" -description = """ -Maximum number of consecutive WebSocket subscription failures before the service gives up and exits. A failure is counted only when a subscription attempt produces zero headers before disconnecting. Successful header processing resets the counter.""" -used-by = ["evmreader", "node"] - -[rollups.CARTESI_BLOCKCHAIN_WS_RECONNECT_INTERVAL] -default = "1" -go-type = "Duration" -description = """ -Wait time in seconds between WebSocket subscription reconnection attempts after a connection failure.""" -used-by = ["evmreader", "node"] +Maximum time in seconds allowed for each HTTP request to the blockchain provider. This prevents a single slow or stuck provider request from blocking a service indefinitely.""" +used-by = ["evmreader", "claimer", "node", "prt"] [rollups.CARTESI_BLOCKCHAIN_MAX_BLOCK_RANGE] default = "0" diff --git a/internal/config/generated.go b/internal/config/generated.go index 2037cb541..9576ec7a8 100644 --- a/internal/config/generated.go +++ b/internal/config/generated.go @@ -33,7 +33,6 @@ const ( BLOCKCHAIN_HTTP_ENDPOINT = "CARTESI_BLOCKCHAIN_HTTP_ENDPOINT" BLOCKCHAIN_ID = "CARTESI_BLOCKCHAIN_ID" BLOCKCHAIN_LEGACY_ENABLED = "CARTESI_BLOCKCHAIN_LEGACY_ENABLED" - BLOCKCHAIN_WS_ENDPOINT = "CARTESI_BLOCKCHAIN_WS_ENDPOINT" CONTRACTS_APPLICATION_FACTORY_ADDRESS = "CARTESI_CONTRACTS_APPLICATION_FACTORY_ADDRESS" CONTRACTS_AUTHORITY_FACTORY_ADDRESS = "CARTESI_CONTRACTS_AUTHORITY_FACTORY_ADDRESS" CONTRACTS_DAVE_APP_FACTORY_ADDRESS = "CARTESI_CONTRACTS_DAVE_APP_FACTORY_ADDRESS" @@ -73,14 +72,13 @@ const ( ADVANCER_INPUT_BATCH_SIZE = "CARTESI_ADVANCER_INPUT_BATCH_SIZE" ADVANCER_POLLING_INTERVAL = "CARTESI_ADVANCER_POLLING_INTERVAL" BLOCKCHAIN_HTTP_MAX_RETRIES = "CARTESI_BLOCKCHAIN_HTTP_MAX_RETRIES" + BLOCKCHAIN_HTTP_REQUEST_TIMEOUT = "CARTESI_BLOCKCHAIN_HTTP_REQUEST_TIMEOUT" BLOCKCHAIN_HTTP_RETRY_MAX_WAIT = "CARTESI_BLOCKCHAIN_HTTP_RETRY_MAX_WAIT" BLOCKCHAIN_HTTP_RETRY_MIN_WAIT = "CARTESI_BLOCKCHAIN_HTTP_RETRY_MIN_WAIT" BLOCKCHAIN_MAX_BLOCK_RANGE = "CARTESI_BLOCKCHAIN_MAX_BLOCK_RANGE" - BLOCKCHAIN_WS_LIVENESS_TIMEOUT = "CARTESI_BLOCKCHAIN_WS_LIVENESS_TIMEOUT" - BLOCKCHAIN_WS_MAX_RETRIES = "CARTESI_BLOCKCHAIN_WS_MAX_RETRIES" - BLOCKCHAIN_WS_RECONNECT_INTERVAL = "CARTESI_BLOCKCHAIN_WS_RECONNECT_INTERVAL" CLAIMER_MAX_ACCEPT_ATTEMPTS = "CARTESI_CLAIMER_MAX_ACCEPT_ATTEMPTS" CLAIMER_POLLING_INTERVAL = "CARTESI_CLAIMER_POLLING_INTERVAL" + EVM_READER_POLLING_INTERVAL = "CARTESI_EVM_READER_POLLING_INTERVAL" MAX_STARTUP_TIME = "CARTESI_MAX_STARTUP_TIME" PRT_POLLING_INTERVAL = "CARTESI_PRT_POLLING_INTERVAL" VALIDATOR_POLLING_INTERVAL = "CARTESI_VALIDATOR_POLLING_INTERVAL" @@ -95,8 +93,6 @@ const ( BLOCKCHAIN_HTTP_AUTHORIZATION_FILE = "CARTESI_BLOCKCHAIN_HTTP_AUTHORIZATION_FILE" BLOCKCHAIN_HTTP_ENDPOINT_FILE = "CARTESI_BLOCKCHAIN_HTTP_ENDPOINT_FILE" - BLOCKCHAIN_WS_ENDPOINT_FILE = "CARTESI_BLOCKCHAIN_WS_ENDPOINT_FILE" - DATABASE_CONNECTION_FILE = "CARTESI_DATABASE_CONNECTION_FILE" ) @@ -125,8 +121,6 @@ func SetDefaults() { viper.SetDefault(BLOCKCHAIN_LEGACY_ENABLED, "false") - // no default for CARTESI_BLOCKCHAIN_WS_ENDPOINT - // no default for CARTESI_CONTRACTS_APPLICATION_FACTORY_ADDRESS // no default for CARTESI_CONTRACTS_AUTHORITY_FACTORY_ADDRESS @@ -205,22 +199,20 @@ func SetDefaults() { viper.SetDefault(BLOCKCHAIN_HTTP_MAX_RETRIES, "4") + viper.SetDefault(BLOCKCHAIN_HTTP_REQUEST_TIMEOUT, "120") + viper.SetDefault(BLOCKCHAIN_HTTP_RETRY_MAX_WAIT, "60") viper.SetDefault(BLOCKCHAIN_HTTP_RETRY_MIN_WAIT, "1") viper.SetDefault(BLOCKCHAIN_MAX_BLOCK_RANGE, "0") - viper.SetDefault(BLOCKCHAIN_WS_LIVENESS_TIMEOUT, "120") - - viper.SetDefault(BLOCKCHAIN_WS_MAX_RETRIES, "4") - - viper.SetDefault(BLOCKCHAIN_WS_RECONNECT_INTERVAL, "1") - viper.SetDefault(CLAIMER_MAX_ACCEPT_ATTEMPTS, "5") viper.SetDefault(CLAIMER_POLLING_INTERVAL, "3") + viper.SetDefault(EVM_READER_POLLING_INTERVAL, "12") + viper.SetDefault(MAX_STARTUP_TIME, "15") viper.SetDefault(PRT_POLLING_INTERVAL, "3") @@ -450,6 +442,9 @@ type ClaimerConfig struct { // Maximum number of retry attempts for HTTP blockchain requests after encountering an error. BlockchainHttpMaxRetries uint64 `mapstructure:"CARTESI_BLOCKCHAIN_HTTP_MAX_RETRIES"` + // Maximum time in seconds allowed for each HTTP request to the blockchain provider. This prevents a single slow or stuck provider request from blocking a service indefinitely. + BlockchainHttpRequestTimeout Duration `mapstructure:"CARTESI_BLOCKCHAIN_HTTP_REQUEST_TIMEOUT"` + // Maximum wait time in seconds for the exponential backoff retry policy. The delay between retries for HTTP blockchain requests will never exceed this value, regardless of the backoff calculation. BlockchainHttpRetryMaxWait Duration `mapstructure:"CARTESI_BLOCKCHAIN_HTTP_RETRY_MAX_WAIT"` @@ -559,6 +554,13 @@ func LoadClaimerConfig() (*ClaimerConfig, error) { return nil, fmt.Errorf("CARTESI_BLOCKCHAIN_HTTP_MAX_RETRIES is required for the claimer service: %w", err) } + cfg.BlockchainHttpRequestTimeout, err = GetBlockchainHttpRequestTimeout() + if err != nil && err != ErrNotDefined { + return nil, fmt.Errorf("failed to get CARTESI_BLOCKCHAIN_HTTP_REQUEST_TIMEOUT: %w", err) + } else if err == ErrNotDefined { + return nil, fmt.Errorf("CARTESI_BLOCKCHAIN_HTTP_REQUEST_TIMEOUT is required for the claimer service: %w", err) + } + cfg.BlockchainHttpRetryMaxWait, err = GetBlockchainHttpRetryMaxWait() if err != nil && err != ErrNotDefined { return nil, fmt.Errorf("failed to get CARTESI_BLOCKCHAIN_HTTP_RETRY_MAX_WAIT: %w", err) @@ -617,9 +619,6 @@ type EvmreaderConfig struct { // An unique identifier representing a blockchain network. BlockchainId uint64 `mapstructure:"CARTESI_BLOCKCHAIN_ID"` - // WebSocket endpoint for the blockchain RPC provider. - BlockchainWsEndpoint URL `mapstructure:"CARTESI_BLOCKCHAIN_WS_ENDPOINT"` - // Postgres endpoint in the 'postgres://user:password@hostname:port/database' format (URL). // // If not set, or set to empty string, will defer the behaviour to the PG driver. @@ -645,6 +644,9 @@ type EvmreaderConfig struct { // Maximum number of retry attempts for HTTP blockchain requests after encountering an error. BlockchainHttpMaxRetries uint64 `mapstructure:"CARTESI_BLOCKCHAIN_HTTP_MAX_RETRIES"` + // Maximum time in seconds allowed for each HTTP request to the blockchain provider. This prevents a single slow or stuck provider request from blocking a service indefinitely. + BlockchainHttpRequestTimeout Duration `mapstructure:"CARTESI_BLOCKCHAIN_HTTP_REQUEST_TIMEOUT"` + // Maximum wait time in seconds for the exponential backoff retry policy. The delay between retries for HTTP blockchain requests will never exceed this value, regardless of the backoff calculation. BlockchainHttpRetryMaxWait Duration `mapstructure:"CARTESI_BLOCKCHAIN_HTTP_RETRY_MAX_WAIT"` @@ -654,14 +656,8 @@ type EvmreaderConfig struct { // Maximum number of blocks in a single query to the provider. Queries with larger ranges will be broken into multiple smaller queries. Zero for unlimited. BlockchainMaxBlockRange uint64 `mapstructure:"CARTESI_BLOCKCHAIN_MAX_BLOCK_RANGE"` - // Maximum time in seconds to wait for a new block header on the WebSocket subscription before treating the connection as stalled and reconnecting. Handles silent connection drops where no error is delivered. The default (120s) is tuned for mainnet (~12s block time). Reduce for faster chains or devnets. - BlockchainWsLivenessTimeout Duration `mapstructure:"CARTESI_BLOCKCHAIN_WS_LIVENESS_TIMEOUT"` - - // Maximum number of consecutive WebSocket subscription failures before the service gives up and exits. A failure is counted only when a subscription attempt produces zero headers before disconnecting. Successful header processing resets the counter. - BlockchainWsMaxRetries uint64 `mapstructure:"CARTESI_BLOCKCHAIN_WS_MAX_RETRIES"` - - // Wait time in seconds between WebSocket subscription reconnection attempts after a connection failure. - BlockchainWsReconnectInterval Duration `mapstructure:"CARTESI_BLOCKCHAIN_WS_RECONNECT_INTERVAL"` + // Time in seconds to wait before checking for a new block header. The default (12s) is tuned for mainnet. Reduce for faster chains or devnets. + EvmReaderPollingInterval Duration `mapstructure:"CARTESI_EVM_READER_POLLING_INTERVAL"` // How many seconds the node expects services take initializing before aborting. MaxStartupTime Duration `mapstructure:"CARTESI_MAX_STARTUP_TIME"` @@ -702,13 +698,6 @@ func LoadEvmreaderConfig() (*EvmreaderConfig, error) { return nil, fmt.Errorf("CARTESI_BLOCKCHAIN_ID is required for the evmreader service: %w", err) } - cfg.BlockchainWsEndpoint, err = GetBlockchainWsEndpoint() - if err != nil && err != ErrNotDefined { - return nil, fmt.Errorf("failed to get CARTESI_BLOCKCHAIN_WS_ENDPOINT: %w", err) - } else if err == ErrNotDefined { - return nil, fmt.Errorf("CARTESI_BLOCKCHAIN_WS_ENDPOINT is required for the evmreader service: %w", err) - } - cfg.DatabaseConnection, err = GetDatabaseConnection() if err != nil && err != ErrNotDefined { return nil, fmt.Errorf("failed to get CARTESI_DATABASE_CONNECTION: %w", err) @@ -751,6 +740,13 @@ func LoadEvmreaderConfig() (*EvmreaderConfig, error) { return nil, fmt.Errorf("CARTESI_BLOCKCHAIN_HTTP_MAX_RETRIES is required for the evmreader service: %w", err) } + cfg.BlockchainHttpRequestTimeout, err = GetBlockchainHttpRequestTimeout() + if err != nil && err != ErrNotDefined { + return nil, fmt.Errorf("failed to get CARTESI_BLOCKCHAIN_HTTP_REQUEST_TIMEOUT: %w", err) + } else if err == ErrNotDefined { + return nil, fmt.Errorf("CARTESI_BLOCKCHAIN_HTTP_REQUEST_TIMEOUT is required for the evmreader service: %w", err) + } + cfg.BlockchainHttpRetryMaxWait, err = GetBlockchainHttpRetryMaxWait() if err != nil && err != ErrNotDefined { return nil, fmt.Errorf("failed to get CARTESI_BLOCKCHAIN_HTTP_RETRY_MAX_WAIT: %w", err) @@ -772,25 +768,11 @@ func LoadEvmreaderConfig() (*EvmreaderConfig, error) { return nil, fmt.Errorf("CARTESI_BLOCKCHAIN_MAX_BLOCK_RANGE is required for the evmreader service: %w", err) } - cfg.BlockchainWsLivenessTimeout, err = GetBlockchainWsLivenessTimeout() - if err != nil && err != ErrNotDefined { - return nil, fmt.Errorf("failed to get CARTESI_BLOCKCHAIN_WS_LIVENESS_TIMEOUT: %w", err) - } else if err == ErrNotDefined { - return nil, fmt.Errorf("CARTESI_BLOCKCHAIN_WS_LIVENESS_TIMEOUT is required for the evmreader service: %w", err) - } - - cfg.BlockchainWsMaxRetries, err = GetBlockchainWsMaxRetries() - if err != nil && err != ErrNotDefined { - return nil, fmt.Errorf("failed to get CARTESI_BLOCKCHAIN_WS_MAX_RETRIES: %w", err) - } else if err == ErrNotDefined { - return nil, fmt.Errorf("CARTESI_BLOCKCHAIN_WS_MAX_RETRIES is required for the evmreader service: %w", err) - } - - cfg.BlockchainWsReconnectInterval, err = GetBlockchainWsReconnectInterval() + cfg.EvmReaderPollingInterval, err = GetEvmReaderPollingInterval() if err != nil && err != ErrNotDefined { - return nil, fmt.Errorf("failed to get CARTESI_BLOCKCHAIN_WS_RECONNECT_INTERVAL: %w", err) + return nil, fmt.Errorf("failed to get CARTESI_EVM_READER_POLLING_INTERVAL: %w", err) } else if err == ErrNotDefined { - return nil, fmt.Errorf("CARTESI_BLOCKCHAIN_WS_RECONNECT_INTERVAL is required for the evmreader service: %w", err) + return nil, fmt.Errorf("CARTESI_EVM_READER_POLLING_INTERVAL is required for the evmreader service: %w", err) } cfg.MaxStartupTime, err = GetMaxStartupTime() @@ -933,9 +915,6 @@ type NodeConfig struct { // (instead of EIP-1559). BlockchainLegacyEnabled bool `mapstructure:"CARTESI_BLOCKCHAIN_LEGACY_ENABLED"` - // WebSocket endpoint for the blockchain RPC provider. - BlockchainWsEndpoint URL `mapstructure:"CARTESI_BLOCKCHAIN_WS_ENDPOINT"` - // Postgres endpoint in the 'postgres://user:password@hostname:port/database' format (URL). // // If not set, or set to empty string, will defer the behaviour to the PG driver. @@ -1014,6 +993,9 @@ type NodeConfig struct { // Maximum number of retry attempts for HTTP blockchain requests after encountering an error. BlockchainHttpMaxRetries uint64 `mapstructure:"CARTESI_BLOCKCHAIN_HTTP_MAX_RETRIES"` + // Maximum time in seconds allowed for each HTTP request to the blockchain provider. This prevents a single slow or stuck provider request from blocking a service indefinitely. + BlockchainHttpRequestTimeout Duration `mapstructure:"CARTESI_BLOCKCHAIN_HTTP_REQUEST_TIMEOUT"` + // Maximum wait time in seconds for the exponential backoff retry policy. The delay between retries for HTTP blockchain requests will never exceed this value, regardless of the backoff calculation. BlockchainHttpRetryMaxWait Duration `mapstructure:"CARTESI_BLOCKCHAIN_HTTP_RETRY_MAX_WAIT"` @@ -1023,15 +1005,6 @@ type NodeConfig struct { // Maximum number of blocks in a single query to the provider. Queries with larger ranges will be broken into multiple smaller queries. Zero for unlimited. BlockchainMaxBlockRange uint64 `mapstructure:"CARTESI_BLOCKCHAIN_MAX_BLOCK_RANGE"` - // Maximum time in seconds to wait for a new block header on the WebSocket subscription before treating the connection as stalled and reconnecting. Handles silent connection drops where no error is delivered. The default (120s) is tuned for mainnet (~12s block time). Reduce for faster chains or devnets. - BlockchainWsLivenessTimeout Duration `mapstructure:"CARTESI_BLOCKCHAIN_WS_LIVENESS_TIMEOUT"` - - // Maximum number of consecutive WebSocket subscription failures before the service gives up and exits. A failure is counted only when a subscription attempt produces zero headers before disconnecting. Successful header processing resets the counter. - BlockchainWsMaxRetries uint64 `mapstructure:"CARTESI_BLOCKCHAIN_WS_MAX_RETRIES"` - - // Wait time in seconds between WebSocket subscription reconnection attempts after a connection failure. - BlockchainWsReconnectInterval Duration `mapstructure:"CARTESI_BLOCKCHAIN_WS_RECONNECT_INTERVAL"` - // Maximum number of consecutive acceptClaim attempts per (application, epoch) before // the application is marked FAILED. Bounds wasted gas on a persistently-reverting chain // (gas misconfig, nonce gap, signer not authorised, fork inconsistency). The default @@ -1044,6 +1017,9 @@ type NodeConfig struct { // How many seconds the node will wait before querying the database for new claims. ClaimerPollingInterval Duration `mapstructure:"CARTESI_CLAIMER_POLLING_INTERVAL"` + // Time in seconds to wait before checking for a new block header. The default (12s) is tuned for mainnet. Reduce for faster chains or devnets. + EvmReaderPollingInterval Duration `mapstructure:"CARTESI_EVM_READER_POLLING_INTERVAL"` + // How many seconds the node expects services take initializing before aborting. MaxStartupTime Duration `mapstructure:"CARTESI_MAX_STARTUP_TIME"` @@ -1099,13 +1075,6 @@ func LoadNodeConfig() (*NodeConfig, error) { return nil, fmt.Errorf("CARTESI_BLOCKCHAIN_LEGACY_ENABLED is required for the node service: %w", err) } - cfg.BlockchainWsEndpoint, err = GetBlockchainWsEndpoint() - if err != nil && err != ErrNotDefined { - return nil, fmt.Errorf("failed to get CARTESI_BLOCKCHAIN_WS_ENDPOINT: %w", err) - } else if err == ErrNotDefined { - return nil, fmt.Errorf("CARTESI_BLOCKCHAIN_WS_ENDPOINT is required for the node service: %w", err) - } - cfg.DatabaseConnection, err = GetDatabaseConnection() if err != nil && err != ErrNotDefined { return nil, fmt.Errorf("failed to get CARTESI_DATABASE_CONNECTION: %w", err) @@ -1239,6 +1208,13 @@ func LoadNodeConfig() (*NodeConfig, error) { return nil, fmt.Errorf("CARTESI_BLOCKCHAIN_HTTP_MAX_RETRIES is required for the node service: %w", err) } + cfg.BlockchainHttpRequestTimeout, err = GetBlockchainHttpRequestTimeout() + if err != nil && err != ErrNotDefined { + return nil, fmt.Errorf("failed to get CARTESI_BLOCKCHAIN_HTTP_REQUEST_TIMEOUT: %w", err) + } else if err == ErrNotDefined { + return nil, fmt.Errorf("CARTESI_BLOCKCHAIN_HTTP_REQUEST_TIMEOUT is required for the node service: %w", err) + } + cfg.BlockchainHttpRetryMaxWait, err = GetBlockchainHttpRetryMaxWait() if err != nil && err != ErrNotDefined { return nil, fmt.Errorf("failed to get CARTESI_BLOCKCHAIN_HTTP_RETRY_MAX_WAIT: %w", err) @@ -1260,27 +1236,6 @@ func LoadNodeConfig() (*NodeConfig, error) { return nil, fmt.Errorf("CARTESI_BLOCKCHAIN_MAX_BLOCK_RANGE is required for the node service: %w", err) } - cfg.BlockchainWsLivenessTimeout, err = GetBlockchainWsLivenessTimeout() - if err != nil && err != ErrNotDefined { - return nil, fmt.Errorf("failed to get CARTESI_BLOCKCHAIN_WS_LIVENESS_TIMEOUT: %w", err) - } else if err == ErrNotDefined { - return nil, fmt.Errorf("CARTESI_BLOCKCHAIN_WS_LIVENESS_TIMEOUT is required for the node service: %w", err) - } - - cfg.BlockchainWsMaxRetries, err = GetBlockchainWsMaxRetries() - if err != nil && err != ErrNotDefined { - return nil, fmt.Errorf("failed to get CARTESI_BLOCKCHAIN_WS_MAX_RETRIES: %w", err) - } else if err == ErrNotDefined { - return nil, fmt.Errorf("CARTESI_BLOCKCHAIN_WS_MAX_RETRIES is required for the node service: %w", err) - } - - cfg.BlockchainWsReconnectInterval, err = GetBlockchainWsReconnectInterval() - if err != nil && err != ErrNotDefined { - return nil, fmt.Errorf("failed to get CARTESI_BLOCKCHAIN_WS_RECONNECT_INTERVAL: %w", err) - } else if err == ErrNotDefined { - return nil, fmt.Errorf("CARTESI_BLOCKCHAIN_WS_RECONNECT_INTERVAL is required for the node service: %w", err) - } - cfg.ClaimerMaxAcceptAttempts, err = GetClaimerMaxAcceptAttempts() if err != nil && err != ErrNotDefined { return nil, fmt.Errorf("failed to get CARTESI_CLAIMER_MAX_ACCEPT_ATTEMPTS: %w", err) @@ -1295,6 +1250,13 @@ func LoadNodeConfig() (*NodeConfig, error) { return nil, fmt.Errorf("CARTESI_CLAIMER_POLLING_INTERVAL is required for the node service: %w", err) } + cfg.EvmReaderPollingInterval, err = GetEvmReaderPollingInterval() + if err != nil && err != ErrNotDefined { + return nil, fmt.Errorf("failed to get CARTESI_EVM_READER_POLLING_INTERVAL: %w", err) + } else if err == ErrNotDefined { + return nil, fmt.Errorf("CARTESI_EVM_READER_POLLING_INTERVAL is required for the node service: %w", err) + } + cfg.MaxStartupTime, err = GetMaxStartupTime() if err != nil && err != ErrNotDefined { return nil, fmt.Errorf("failed to get CARTESI_MAX_STARTUP_TIME: %w", err) @@ -1368,6 +1330,9 @@ type PrtConfig struct { // Maximum number of retry attempts for HTTP blockchain requests after encountering an error. BlockchainHttpMaxRetries uint64 `mapstructure:"CARTESI_BLOCKCHAIN_HTTP_MAX_RETRIES"` + // Maximum time in seconds allowed for each HTTP request to the blockchain provider. This prevents a single slow or stuck provider request from blocking a service indefinitely. + BlockchainHttpRequestTimeout Duration `mapstructure:"CARTESI_BLOCKCHAIN_HTTP_REQUEST_TIMEOUT"` + // Maximum wait time in seconds for the exponential backoff retry policy. The delay between retries for HTTP blockchain requests will never exceed this value, regardless of the backoff calculation. BlockchainHttpRetryMaxWait Duration `mapstructure:"CARTESI_BLOCKCHAIN_HTTP_RETRY_MAX_WAIT"` @@ -1471,6 +1436,13 @@ func LoadPrtConfig() (*PrtConfig, error) { return nil, fmt.Errorf("CARTESI_BLOCKCHAIN_HTTP_MAX_RETRIES is required for the prt service: %w", err) } + cfg.BlockchainHttpRequestTimeout, err = GetBlockchainHttpRequestTimeout() + if err != nil && err != ErrNotDefined { + return nil, fmt.Errorf("failed to get CARTESI_BLOCKCHAIN_HTTP_REQUEST_TIMEOUT: %w", err) + } else if err == ErrNotDefined { + return nil, fmt.Errorf("CARTESI_BLOCKCHAIN_HTTP_REQUEST_TIMEOUT is required for the prt service: %w", err) + } + cfg.BlockchainHttpRetryMaxWait, err = GetBlockchainHttpRetryMaxWait() if err != nil && err != ErrNotDefined { return nil, fmt.Errorf("failed to get CARTESI_BLOCKCHAIN_HTTP_RETRY_MAX_WAIT: %w", err) @@ -1635,6 +1607,7 @@ func (c *NodeConfig) ToClaimerConfig() *ClaimerConfig { LogColor: c.LogColor, LogLevel: c.LogLevel, BlockchainHttpMaxRetries: c.BlockchainHttpMaxRetries, + BlockchainHttpRequestTimeout: c.BlockchainHttpRequestTimeout, BlockchainHttpRetryMaxWait: c.BlockchainHttpRetryMaxWait, BlockchainHttpRetryMinWait: c.BlockchainHttpRetryMinWait, BlockchainMaxBlockRange: c.BlockchainMaxBlockRange, @@ -1647,22 +1620,20 @@ func (c *NodeConfig) ToClaimerConfig() *ClaimerConfig { // ToEvmreaderConfig converts a NodeConfig to a EvmreaderConfig. func (c *NodeConfig) ToEvmreaderConfig() *EvmreaderConfig { return &EvmreaderConfig{ - BlockchainDefaultBlock: c.BlockchainDefaultBlock, - BlockchainHttpEndpoint: c.BlockchainHttpEndpoint, - BlockchainId: c.BlockchainId, - BlockchainWsEndpoint: c.BlockchainWsEndpoint, - DatabaseConnection: c.DatabaseConnection, - FeatureInputReaderEnabled: c.FeatureInputReaderEnabled, - LogColor: c.LogColor, - LogLevel: c.LogLevel, - BlockchainHttpMaxRetries: c.BlockchainHttpMaxRetries, - BlockchainHttpRetryMaxWait: c.BlockchainHttpRetryMaxWait, - BlockchainHttpRetryMinWait: c.BlockchainHttpRetryMinWait, - BlockchainMaxBlockRange: c.BlockchainMaxBlockRange, - BlockchainWsLivenessTimeout: c.BlockchainWsLivenessTimeout, - BlockchainWsMaxRetries: c.BlockchainWsMaxRetries, - BlockchainWsReconnectInterval: c.BlockchainWsReconnectInterval, - MaxStartupTime: c.MaxStartupTime, + BlockchainDefaultBlock: c.BlockchainDefaultBlock, + BlockchainHttpEndpoint: c.BlockchainHttpEndpoint, + BlockchainId: c.BlockchainId, + DatabaseConnection: c.DatabaseConnection, + FeatureInputReaderEnabled: c.FeatureInputReaderEnabled, + LogColor: c.LogColor, + LogLevel: c.LogLevel, + BlockchainHttpMaxRetries: c.BlockchainHttpMaxRetries, + BlockchainHttpRequestTimeout: c.BlockchainHttpRequestTimeout, + BlockchainHttpRetryMaxWait: c.BlockchainHttpRetryMaxWait, + BlockchainHttpRetryMinWait: c.BlockchainHttpRetryMinWait, + BlockchainMaxBlockRange: c.BlockchainMaxBlockRange, + EvmReaderPollingInterval: c.EvmReaderPollingInterval, + MaxStartupTime: c.MaxStartupTime, } } @@ -1691,6 +1662,7 @@ func (c *NodeConfig) ToPrtConfig() *PrtConfig { LogColor: c.LogColor, LogLevel: c.LogLevel, BlockchainHttpMaxRetries: c.BlockchainHttpMaxRetries, + BlockchainHttpRequestTimeout: c.BlockchainHttpRequestTimeout, BlockchainHttpRetryMaxWait: c.BlockchainHttpRetryMaxWait, BlockchainHttpRetryMinWait: c.BlockchainHttpRetryMinWait, BlockchainMaxBlockRange: c.BlockchainMaxBlockRange, @@ -1886,27 +1858,6 @@ func GetBlockchainLegacyEnabled() (bool, error) { return notDefinedbool(), fmt.Errorf("%s: %w", BLOCKCHAIN_LEGACY_ENABLED, ErrNotDefined) } -// GetBlockchainWsEndpoint returns the value for the environment variable CARTESI_BLOCKCHAIN_WS_ENDPOINT. -func GetBlockchainWsEndpoint() (URL, error) { - s := viper.GetString(BLOCKCHAIN_WS_ENDPOINT) - if s == "" { - filename := viper.GetString(BLOCKCHAIN_WS_ENDPOINT_FILE) - contents, err := os.ReadFile(filename) - if err != nil { - return notDefinedURL(), fmt.Errorf("failed to parse %s: %w", BLOCKCHAIN_WS_ENDPOINT_FILE, err) - } - s = strings.TrimSpace(string(contents)) - } - if s != "" { - v, err := toURL(s) - if err != nil { - return v, fmt.Errorf("failed to parse %s: %w", BLOCKCHAIN_WS_ENDPOINT, err) - } - return v, nil - } - return notDefinedURL(), fmt.Errorf("%s: %w", BLOCKCHAIN_WS_ENDPOINT, ErrNotDefined) -} - // GetContractsApplicationFactoryAddress returns the value for the environment variable CARTESI_CONTRACTS_APPLICATION_FACTORY_ADDRESS. func GetContractsApplicationFactoryAddress() (Address, error) { s := viper.GetString(CONTRACTS_APPLICATION_FACTORY_ADDRESS) @@ -2422,6 +2373,19 @@ func GetBlockchainHttpMaxRetries() (uint64, error) { return notDefineduint64(), fmt.Errorf("%s: %w", BLOCKCHAIN_HTTP_MAX_RETRIES, ErrNotDefined) } +// GetBlockchainHttpRequestTimeout returns the value for the environment variable CARTESI_BLOCKCHAIN_HTTP_REQUEST_TIMEOUT. +func GetBlockchainHttpRequestTimeout() (Duration, error) { + s := viper.GetString(BLOCKCHAIN_HTTP_REQUEST_TIMEOUT) + if s != "" { + v, err := toDuration(s) + if err != nil { + return v, fmt.Errorf("failed to parse %s: %w", BLOCKCHAIN_HTTP_REQUEST_TIMEOUT, err) + } + return v, nil + } + return notDefinedDuration(), fmt.Errorf("%s: %w", BLOCKCHAIN_HTTP_REQUEST_TIMEOUT, ErrNotDefined) +} + // GetBlockchainHttpRetryMaxWait returns the value for the environment variable CARTESI_BLOCKCHAIN_HTTP_RETRY_MAX_WAIT. func GetBlockchainHttpRetryMaxWait() (Duration, error) { s := viper.GetString(BLOCKCHAIN_HTTP_RETRY_MAX_WAIT) @@ -2461,45 +2425,6 @@ func GetBlockchainMaxBlockRange() (uint64, error) { return notDefineduint64(), fmt.Errorf("%s: %w", BLOCKCHAIN_MAX_BLOCK_RANGE, ErrNotDefined) } -// GetBlockchainWsLivenessTimeout returns the value for the environment variable CARTESI_BLOCKCHAIN_WS_LIVENESS_TIMEOUT. -func GetBlockchainWsLivenessTimeout() (Duration, error) { - s := viper.GetString(BLOCKCHAIN_WS_LIVENESS_TIMEOUT) - if s != "" { - v, err := toDuration(s) - if err != nil { - return v, fmt.Errorf("failed to parse %s: %w", BLOCKCHAIN_WS_LIVENESS_TIMEOUT, err) - } - return v, nil - } - return notDefinedDuration(), fmt.Errorf("%s: %w", BLOCKCHAIN_WS_LIVENESS_TIMEOUT, ErrNotDefined) -} - -// GetBlockchainWsMaxRetries returns the value for the environment variable CARTESI_BLOCKCHAIN_WS_MAX_RETRIES. -func GetBlockchainWsMaxRetries() (uint64, error) { - s := viper.GetString(BLOCKCHAIN_WS_MAX_RETRIES) - if s != "" { - v, err := toUint64(s) - if err != nil { - return v, fmt.Errorf("failed to parse %s: %w", BLOCKCHAIN_WS_MAX_RETRIES, err) - } - return v, nil - } - return notDefineduint64(), fmt.Errorf("%s: %w", BLOCKCHAIN_WS_MAX_RETRIES, ErrNotDefined) -} - -// GetBlockchainWsReconnectInterval returns the value for the environment variable CARTESI_BLOCKCHAIN_WS_RECONNECT_INTERVAL. -func GetBlockchainWsReconnectInterval() (Duration, error) { - s := viper.GetString(BLOCKCHAIN_WS_RECONNECT_INTERVAL) - if s != "" { - v, err := toDuration(s) - if err != nil { - return v, fmt.Errorf("failed to parse %s: %w", BLOCKCHAIN_WS_RECONNECT_INTERVAL, err) - } - return v, nil - } - return notDefinedDuration(), fmt.Errorf("%s: %w", BLOCKCHAIN_WS_RECONNECT_INTERVAL, ErrNotDefined) -} - // GetClaimerMaxAcceptAttempts returns the value for the environment variable CARTESI_CLAIMER_MAX_ACCEPT_ATTEMPTS. func GetClaimerMaxAcceptAttempts() (uint64, error) { s := viper.GetString(CLAIMER_MAX_ACCEPT_ATTEMPTS) @@ -2526,6 +2451,19 @@ func GetClaimerPollingInterval() (Duration, error) { return notDefinedDuration(), fmt.Errorf("%s: %w", CLAIMER_POLLING_INTERVAL, ErrNotDefined) } +// GetEvmReaderPollingInterval returns the value for the environment variable CARTESI_EVM_READER_POLLING_INTERVAL. +func GetEvmReaderPollingInterval() (Duration, error) { + s := viper.GetString(EVM_READER_POLLING_INTERVAL) + if s != "" { + v, err := toDuration(s) + if err != nil { + return v, fmt.Errorf("failed to parse %s: %w", EVM_READER_POLLING_INTERVAL, err) + } + return v, nil + } + return notDefinedDuration(), fmt.Errorf("%s: %w", EVM_READER_POLLING_INTERVAL, ErrNotDefined) +} + // GetMaxStartupTime returns the value for the environment variable CARTESI_MAX_STARTUP_TIME. func GetMaxStartupTime() (Duration, error) { s := viper.GetString(MAX_STARTUP_TIME) diff --git a/internal/evmreader/dave_consensus_test.go b/internal/evmreader/dave_consensus_test.go index e154b6fb4..e15de64b6 100644 --- a/internal/evmreader/dave_consensus_test.go +++ b/internal/evmreader/dave_consensus_test.go @@ -361,6 +361,82 @@ func (s *SealedEpochsSuite) TestOpenEpochExistingEpochAccumulatesInputs() { s.Require().Equal(uint64(4), storedInputs[0].Index) } +func (s *SealedEpochsSuite) TestOpenEpochScansBoundaryBlockAfterIntraTickCursorAdvance() { + const ( + previousInputCursor uint64 = 100 + mostRecentBlock uint64 = 200 + ) + + app := appContracts{ + application: &Application{ + ID: 1, + Name: "test-app", + IApplicationAddress: app1Addr, + IConsensusAddress: consensusAddr, + IInputBoxAddress: inputBoxAddr, + IInputBoxBlock: 10, + LastInputCheckBlock: previousInputCursor, + }, + inputSource: s.inputBox, + daveConsensus: s.dave, + } + + s.repository.On("GetLastNonOpenEpoch", mock.Anything, mock.Anything). + Return(&Epoch{ + Index: 0, + FirstBlock: 10, + LastBlock: mostRecentBlock, + InputIndexLowerBound: 0, + InputIndexUpperBound: 4, + }, nil) + + s.repository.On("GetEpoch", mock.Anything, mock.Anything, uint64(1)). + Return(nil, nil) + + s.repository.On("GetEventLastCheckBlock", + mock.Anything, int64(1), MonitoredEvent_InputAdded, + ).Return(mostRecentBlock, nil) + + s.repository.On("GetNumberOfInputs", mock.Anything, mock.Anything). + Return(uint64(4), nil) + + s.inputBox.On("GetNumberOfInputs", + mock.MatchedBy(func(opts *bind.CallOpts) bool { + return opts.BlockNumber.Uint64() == mostRecentBlock + }), + mock.Anything, + ).Return(big.NewInt(5), nil) + + s.inputBox.On("RetrieveInputs", + mock.MatchedBy(func(opts *bind.FilterOpts) bool { return opts.Start == mostRecentBlock }), + mock.Anything, mock.Anything, + ).Return([]iinputbox.IInputBoxInputAdded{makeInputEvent(app1Addr, 4, mostRecentBlock)}, nil) + + var storedEpoch *Epoch + var storedInputs []*Input + s.repository.On("CreateEpochsAndInputs", + mock.Anything, mock.Anything, mock.Anything, mock.Anything, + ).Run(func(args mock.Arguments) { + epochInputMap := args.Get(2).(map[*Epoch][]*Input) + for epoch, inputs := range epochInputMap { + storedEpoch = epoch + storedInputs = inputs + } + }).Return(nil) + + err := s.evmReader.processApplicationOpenEpoch(s.ctx, app, mostRecentBlock) + s.Require().NoError(err) + + s.Require().NotNil(storedEpoch) + s.Require().Equal(uint64(1), storedEpoch.Index) + s.Require().Equal(mostRecentBlock, storedEpoch.FirstBlock) + s.Require().Equal(mostRecentBlock, storedEpoch.LastBlock) + s.Require().Equal(uint64(5), storedEpoch.InputIndexUpperBound) + s.Require().Len(storedInputs, 1) + s.Require().Equal(uint64(4), storedInputs[0].Index) + s.Require().Equal(mostRecentBlock, storedInputs[0].BlockNumber) +} + // --- Test 4: Multiple EpochSealed events in one block --- // When two epochs are sealed in the same block, FindTransitions sees a single // transition, but RetrieveSealedEpochs returns both events and each is processed. diff --git a/internal/evmreader/edge_cases_test.go b/internal/evmreader/edge_cases_test.go index 4edbbe116..cf30f8a4c 100644 --- a/internal/evmreader/edge_cases_test.go +++ b/internal/evmreader/edge_cases_test.go @@ -4,11 +4,9 @@ package evmreader import ( - "context" "errors" "math" "math/big" - "time" . "github.com/cartesi/rollups-node/internal/model" "github.com/cartesi/rollups-node/pkg/contracts/idaveconsensus" @@ -313,8 +311,11 @@ func (s *SealedEpochsSuite) TestFetchSealedEpochInputsRetrieveFailure() { // When an application's consensus address changes between block headers, // the adapter cache must be invalidated and adapters recreated. func (s *EvmReaderSuite) TestAdapterCacheInvalidationOnConfigChange() { - ws := &FakeWSEthClient{} - s.evmReader.wsClient = ws + // Fire 3 headers (block numbers below 999 so output check skips) + s.client.EnqueueNewHead(100).Once() + s.client.EnqueueNewHead(101).Once() + called := newCallNotification(s.client.EnqueueNewHead(102)) + s.evmReader.inputReaderEnabled = false s.evmReader.defaultBlock = DefaultBlock_Latest @@ -365,24 +366,18 @@ func (s *EvmReaderSuite) TestAdapterCacheInvalidationOnConfigChange() { factory.On("CreateAdapters", mock.Anything). Return(newMockApplicationContract(), newMockInputBox(), nil, nil) s.evmReader.adapterFactory = factory + s.evmReader.resolver = newApplicationAdapterResolver(s.evmReader.Logger, factory) - ctx, cancel := context.WithCancel(s.ctx) - ready := make(chan struct{}, 1) - errCh := make(chan error, 1) + done := make(chan struct{}) go func() { - _, err := s.evmReader.watchForNewBlocks(ctx, ready) - errCh <- err + err := s.evmReader.Serve() + s.Require().NoError(err) + close(done) }() - <-ready - // Fire 3 headers (block numbers below 999 so output check skips) - ws.fireNewHead(&types.Header{Number: big.NewInt(100)}) - ws.fireNewHead(&types.Header{Number: big.NewInt(101)}) - ws.fireNewHead(&types.Header{Number: big.NewInt(102)}) - ws.flushHeaders() - - cancel() - <-errCh + <-called + s.cancel() + <-done // CreateAdapters called twice: // Header 1: cache miss → create @@ -390,52 +385,3 @@ func (s *EvmReaderSuite) TestAdapterCacheInvalidationOnConfigChange() { // Header 3: cache hit → skip factory.AssertNumberOfCalls(s.T(), "CreateAdapters", 2) } - -// --- #20: Liveness timer fires correctly after headers stop --- -// After processing headers, if no new header arrives within the liveness -// timeout, watchForNewBlocks returns a SubscriptionError. This also exercises -// the double-select fix: headers that arrive simultaneously with the timer -// are picked up by the inner non-blocking receive. -func (s *EvmReaderSuite) TestLivenessTimerFiresAfterHeadersStop() { - ws := &FakeWSEthClient{} - s.evmReader.wsClient = ws - s.evmReader.wsLivenessTimeout = 50 * time.Millisecond - s.evmReader.inputReaderEnabled = false - s.evmReader.defaultBlock = DefaultBlock_Latest - - repo := newMockRepository() - repo.On("ListApplications", mock.Anything, mock.Anything, mock.Anything, false). - Return([]*Application{}, uint64(0), nil) - s.evmReader.repository = repo - - ctx, cancel := context.WithCancel(s.ctx) - defer cancel() - ready := make(chan struct{}, 1) - - type watchResult struct { - headersProcessed uint64 - err error - } - resultCh := make(chan watchResult, 1) - go func() { - hp, err := s.evmReader.watchForNewBlocks(ctx, ready) - resultCh <- watchResult{hp, err} - }() - <-ready - - // Fire 3 headers, then stop sending - ws.fireNewHead(&types.Header{Number: big.NewInt(100)}) - ws.fireNewHead(&types.Header{Number: big.NewInt(101)}) - ws.fireNewHead(&types.Header{Number: big.NewInt(102)}) - - // Liveness timer should fire ~50ms after last header - select { - case r := <-resultCh: - s.Require().Equal(uint64(3), r.headersProcessed) - var subErr *SubscriptionError - s.Require().ErrorAs(r.err, &subErr) - s.Require().ErrorContains(r.err, "no new block header received") - case <-time.After(5 * time.Second): - s.FailNow("watchForNewBlocks didn't return after liveness timeout") - } -} diff --git a/internal/evmreader/evmreader.go b/internal/evmreader/evmreader.go index 1c1b749e5..14622fe8a 100644 --- a/internal/evmreader/evmreader.go +++ b/internal/evmreader/evmreader.go @@ -10,7 +10,6 @@ import ( "math/big" "time" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" @@ -79,22 +78,9 @@ type EvmReaderRepository interface { // EthClientInterface defines the methods we need from ethclient.Client type EthClientInterface interface { HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) - SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) ChainID(ctx context.Context) (*big.Int, error) } -type SubscriptionError struct { - Cause error -} - -func (e *SubscriptionError) Error() string { - return fmt.Sprintf("Subscription error : %v", e.Cause) -} - -func (e *SubscriptionError) Unwrap() error { - return e.Cause -} - // Internal struct to hold application and it's contracts together type appContracts struct { application *Application @@ -103,46 +89,6 @@ type appContracts struct { daveConsensus DaveConsensusAdapter } -func (r *Service) Run(ctx context.Context, ready chan struct{}) error { - var consecutiveFailures uint64 - for { - headersProcessed, err := r.watchForNewBlocks(ctx, ready) - if ctx.Err() != nil { - return ctx.Err() - } - r.Logger.Error("watchForNewBlocks exited", - "error", err, "headers_processed", headersProcessed) - - // Only reset the retry counter if the connection actually processed - // at least one block header. This prevents infinite retries when the - // subscription connects but immediately fails before doing useful work. - if headersProcessed > 0 { - consecutiveFailures = 0 - } else { - consecutiveFailures++ - } - - if consecutiveFailures > r.blockchainMaxRetries { - r.Logger.Error("Max consecutive failures reached. Exiting", - "consecutive_failures", consecutiveFailures, - "max_retries", r.blockchainMaxRetries, - ) - return err - } - - r.Logger.Info("Restarting subscription", - "consecutive_failures", consecutiveFailures, - "max_retries", r.blockchainMaxRetries, - ) - - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(r.blockchainSubscriptionRetryInterval): - } - } -} - func listEnabledApplications(ctx context.Context, er EvmReaderRepository) ([]*Application, uint64, error) { return er.ListApplications(ctx, repository.ApplicationFilter{Enabled: new(true)}, repository.Pagination{}, false) } @@ -155,93 +101,30 @@ func (r *Service) setApplicationCorrupted(ctx context.Context, app *Application, return appstatus.SetCorruptedf(ctx, r.Logger, r.repository, app, reasonFmt, args...) } -// watchForNewBlocks subscribes to new block headers and processes them. -// Returns the number of headers processed and any error that caused it to stop. -func (r *Service) watchForNewBlocks(ctx context.Context, ready chan<- struct{}) (uint64, error) { - headers := make(chan *types.Header) - sub, err := r.wsClient.SubscribeNewHead(ctx, headers) +func (r *Service) Tick() []error { + blockNumber, err := r.fetchMostRecentHeader(r.Context, r.defaultBlock) if err != nil { - return 0, fmt.Errorf("could not start subscription: %w", err) - } - r.Logger.Info("Subscribed to new block events") - select { - case ready <- struct{}{}: - default: - } - defer sub.Unsubscribe() - - liveness := time.NewTimer(r.wsLivenessTimeout) - defer liveness.Stop() - - resolver := newApplicationAdapterResolver(r.Logger, r.adapterFactory) - var headersProcessed uint64 - for { - var header *types.Header - select { - case <-ctx.Done(): - return headersProcessed, ctx.Err() - case err := <-sub.Err(): - if err == nil { - err = errors.New("subscription closed unexpectedly") - } - return headersProcessed, &SubscriptionError{Cause: err} - case <-liveness.C: - // Before declaring stalled, check if a header arrived simultaneously. - // Go's select picks randomly when multiple cases are ready, so the - // liveness timer may win even though a header is available. - select { - case header = <-headers: - default: - return headersProcessed, &SubscriptionError{ - Cause: fmt.Errorf( - "no new block header received for %s, assuming stalled connection", - r.wsLivenessTimeout, - ), - } - } - case header = <-headers: + if errors.Is(err, context.Canceled) { + return nil } - - if header == nil { - continue - } - headersProcessed++ - liveness.Reset(r.wsLivenessTimeout) - - // Every time a new block arrives - r.Logger.Debug("New block header received", - "blockNumber", header.Number, "blockHash", header.Hash()) - - r.processBlockCandidate(ctx, header.Number.Uint64(), resolver) + return []error{err} } -} -func (r *Service) resolveScanBlock( - ctx context.Context, - observedBlock uint64, -) (uint64, error) { - if r.defaultBlock == DefaultBlock_Latest { - return observedBlock, nil + if blockNumber != r.lastBlockNumber.Load() { + r.lastBlockNumber.Store(blockNumber) + r.Logger.Info("Got new block header", "block", blockNumber, "policy", r.defaultBlock) } - mostRecentHeader, err := r.fetchMostRecentHeader(ctx, r.defaultBlock) - if err != nil { - return 0, fmt.Errorf("fetch most recent block for default block %s: %w", r.defaultBlock, err) - } - blockNumber := mostRecentHeader.Number.Uint64() - - r.Logger.Debug(fmt.Sprintf( - "Using block %d and not %d because of commitment policy: %s", - blockNumber, - observedBlock, - r.defaultBlock, - )) - return blockNumber, nil + // Scans run under the service context: cancellable on shutdown, free to take + // as long as catch-up needs. Per-request bounds live on the HTTP transport. + r.processBlockHead(r.Context, blockNumber, r.resolver) + + return nil } -func (r *Service) processBlockCandidate( +func (r *Service) processBlockHead( ctx context.Context, - blockCandidate uint64, + blockNumber uint64, resolver *applicationAdapterResolver, ) { r.Logger.Debug("Retrieving enabled applications") @@ -263,21 +146,12 @@ func (r *Service) processBlockCandidate( } r.hasEnabledApps = true - if resolver == nil { - resolver = newApplicationAdapterResolver(r.Logger, r.adapterFactory) - } apps := resolver.buildAppContracts(observableApps) if len(apps) == 0 { r.Logger.Info("No correctly configured applications running") return } - blockNumber, err := r.resolveScanBlock(ctx, blockCandidate) - if err != nil { - r.Logger.Error("Error resolving EVMReader scan block", "error", err) - return - } - r.runBlockScanners(ctx, apps, blockNumber) } @@ -305,7 +179,7 @@ func (r *Service) runBlockScanners( func (r *Service) fetchMostRecentHeader( ctx context.Context, defaultBlock DefaultBlock, -) (*types.Header, error) { +) (uint64, error) { var defaultBlockNumber int64 switch defaultBlock { @@ -318,7 +192,7 @@ func (r *Service) fetchMostRecentHeader( case DefaultBlock_Safe: defaultBlockNumber = rpc.SafeBlockNumber.Int64() default: - return nil, fmt.Errorf("default block '%v' not supported", defaultBlock) + return 0, fmt.Errorf("default block '%v' not supported", defaultBlock) } header, err := @@ -326,13 +200,16 @@ func (r *Service) fetchMostRecentHeader( ctx, new(big.Int).SetInt64(defaultBlockNumber)) if err != nil { - return nil, fmt.Errorf("failed to retrieve header: %w", err) + return 0, fmt.Errorf("failed to retrieve header: %w", err) } if header == nil { - return nil, fmt.Errorf("returned header is nil") + return 0, fmt.Errorf("returned header is nil") + } + if header.Number == nil { + return 0, fmt.Errorf("returned header number is nil") } - return header, nil + return header.Number.Uint64(), nil } type AdapterFactory interface { diff --git a/internal/evmreader/evmreader_test.go b/internal/evmreader/evmreader_test.go index 8413878b0..1ccb2344d 100644 --- a/internal/evmreader/evmreader_test.go +++ b/internal/evmreader/evmreader_test.go @@ -4,14 +4,17 @@ package evmreader import ( + "bytes" "context" - "fmt" + "errors" + "log/slog" "testing" "time" "github.com/cartesi/rollups-node/internal/config" . "github.com/cartesi/rollups-node/internal/model" "github.com/cartesi/rollups-node/pkg/service" + "github.com/ethereum/go-ethereum/core/types" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -26,7 +29,6 @@ type EvmReaderSuite struct { ctx context.Context cancel context.CancelFunc client *MockEthClient - wsClient *MockEthClient repository *MockRepository evmReader *Service contractFactory *MockAdapterFactory @@ -40,17 +42,12 @@ func TestEvmReaderSuite(t *testing.T) { } func (s *EvmReaderSuite) SetupSuite() { - s.ctx, s.cancel = context.WithTimeout(context.Background(), suiteTimeout) config.SetDefaults() } -func (s *EvmReaderSuite) TearDownSuite() { - s.cancel() -} - func (s *EvmReaderSuite) SetupTest() { + s.ctx, s.cancel = context.WithTimeout(context.Background(), suiteTimeout) s.client = newMockEthClient().SetupDefaultBehavior() - s.wsClient = newMockEthClient().SetupDefaultWsBehavior() s.repository = newMockRepository().SetupDefaultBehavior() s.applicationContract1 = newMockApplicationContract().SetupDefaultBehavior() s.applicationContract2 = newMockApplicationContract().SetupDefaultBehavior() @@ -58,157 +55,265 @@ func (s *EvmReaderSuite) SetupTest() { s.contractFactory = newMockAdapterFactory().SetupDefaultBehavior(s.applicationContract1, s.applicationContract2, s.inputBox) s.evmReader = &Service{ - client: s.client, - wsClient: s.wsClient, - repository: s.repository, - defaultBlock: DefaultBlock_Latest, - adapterFactory: s.contractFactory, - hasEnabledApps: true, - inputReaderEnabled: true, - blockchainMaxRetries: 0, - blockchainSubscriptionRetryInterval: time.Second, - wsLivenessTimeout: 120 * time.Second, + client: s.client, + repository: s.repository, + defaultBlock: DefaultBlock_Latest, + inputReaderEnabled: true, + hasEnabledApps: true, + adapterFactory: s.contractFactory, } logLevel, err := config.GetLogLevel() s.Require().NoError(err) - serviceArgs := &service.CreateInfo{Name: "evm-reader", Impl: s.evmReader, LogLevel: logLevel} + serviceArgs := &service.CreateInfo{ + Name: "evm-reader", + Impl: s.evmReader, + LogLevel: logLevel, + Context: s.ctx, + Cancel: s.cancel, + PollInterval: 100 * time.Millisecond, + } err = service.Create(context.Background(), serviceArgs, &s.evmReader.Service) s.Require().NoError(err) + + s.evmReader.resolver = newApplicationAdapterResolver(s.evmReader.Logger, s.contractFactory) +} + +func (s *EvmReaderSuite) TearDownTest() { + s.cancel() +} + +func newCallNotification(c *mock.Call) <-chan struct{} { + ch := make(chan struct{}) + c.Run(func(mock.Arguments) { ch <- struct{}{} }) + return ch +} + +func newBlockedCallNotification(c *mock.Call) (<-chan struct{}, chan struct{}) { + called := make(chan struct{}) + blocked := make(chan struct{}) + c.Run(func(mock.Arguments) { + called <- struct{}{} // notify function was called + <-blocked // block function until notified + }) + return called, blocked +} + +func waitNotification(ch <-chan struct{}) bool { + select { + case <-ch: + return true + case <-time.After(2 * time.Second): + return false + } +} + +func wasntNotified(ch <-chan struct{}) bool { + select { + case <-ch: + return false + default: + return true + } } // Service tests -func (s *EvmReaderSuite) TestItStopsWhenContextIsCanceled() { - ctx, cancel := context.WithCancel(s.ctx) - ready := make(chan struct{}, 1) - errChannel := make(chan error, 1) +func (s *EvmReaderSuite) TestItStopsWhenContextIsAlreadyCanceled() { + done := make(chan struct{}) go func() { - errChannel <- s.evmReader.Run(ctx, ready) + s.cancel() + err := s.evmReader.Serve() + s.Require().NoError(err) + close(done) }() - cancel() - err := <-errChannel - s.Require().Equal(context.Canceled, err, "stopped for the wrong reason") + s.Require().True(waitNotification(done), "evmreader did not stop after context cancelation") } -func (s *EvmReaderSuite) TestItEventuallyBecomesReady() { - ready := make(chan struct{}, 1) - errChannel := make(chan error, 1) +func (s *EvmReaderSuite) TestItStopsWhenContextIsCanceledAfterFirstHeader() { + called := newCallNotification(s.client.EnqueueNewHead(100)) + + done := make(chan struct{}) go func() { - errChannel <- s.evmReader.Run(s.ctx, ready) + err := s.evmReader.Serve() + s.Require().NoError(err) + close(done) }() - select { - case <-ready: - case err := <-errChannel: - s.FailNow("unexpected failure", err) - } + s.Require().True(waitNotification(called), "evmreader did not read new header") + + s.cancel() + + s.Require().True(waitNotification(done), "evmreader did not stop after context cancelation") } -func (s *EvmReaderSuite) TestItReturnsErrorWhenWebSocketStalls() { - s.evmReader.wsLivenessTimeout = 50 * time.Millisecond - ready := make(chan struct{}, 1) - headersProcessed, err := s.evmReader.watchForNewBlocks(s.ctx, ready) - s.Require().Equal(uint64(0), headersProcessed) - var subErr *SubscriptionError - s.Require().ErrorAs(err, &subErr) - s.Require().ErrorContains(err, "no new block header received") +func (s *EvmReaderSuite) TestReadyReflectsServeLifecycle() { + called := newCallNotification(s.client.EnqueueNewHead(100)) + + s.Require().False(s.evmReader.Ready()) + + done := make(chan struct{}) + go func() { + err := s.evmReader.Serve() + s.Require().NoError(err) + close(done) + }() + + s.Require().True(waitNotification(called)) + s.Require().True(s.evmReader.Ready()) + s.Require().True(wasntNotified(done)) + + s.cancel() + s.Require().True(waitNotification(done)) + s.Require().False(s.evmReader.Ready()) } -func (s *EvmReaderSuite) TestRunExhaustsRetriesOnConsecutiveConnectionFailures() { - s.evmReader.blockchainMaxRetries = 2 - s.evmReader.blockchainSubscriptionRetryInterval = time.Millisecond +func (s *EvmReaderSuite) TestReadyDoesNotDependOnPollingSuccess() { + var hdr *types.Header + called := newCallNotification(s.client.On("HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(hdr, errors.New("transient connection error")).Once()) + s.Require().False(s.evmReader.Ready()) + + done := make(chan struct{}) + go func() { + err := s.evmReader.Serve() + s.Require().NoError(err) + close(done) + }() - s.wsClient.Unset("SubscribeNewHead") - sub := &MockSubscription{} - s.wsClient.On("SubscribeNewHead", mock.Anything, mock.Anything). - Return(sub, fmt.Errorf("connection refused")) + s.Require().True(waitNotification(called)) + s.Require().True(s.evmReader.Ready()) + s.Require().True(wasntNotified(done)) - err := s.evmReader.Run(s.ctx, make(chan struct{}, 1)) - s.Require().ErrorContains(err, "connection refused") - // 1 initial + 2 retries = 3 calls - s.wsClient.AssertNumberOfCalls(s.T(), "SubscribeNewHead", 3) + s.cancel() + s.Require().True(waitNotification(done)) + s.Require().False(s.evmReader.Ready()) } -func (s *EvmReaderSuite) TestRunResetsRetriesAfterProcessingHeaders() { - s.evmReader.blockchainMaxRetries = 1 - s.evmReader.blockchainSubscriptionRetryInterval = time.Millisecond - s.evmReader.wsLivenessTimeout = 100 * time.Millisecond - - // First call: subscribe succeeds, deliver a header, then subscription error fires. - // -> headersProcessed > 0, so consecutiveFailures resets to 0 - // Second call: subscribe fails (connection error) -> consecutiveFailures=1 - // Third call: subscribe fails -> consecutiveFailures=2 > maxRetries(1) -> exit - subWithError := &MockSubscription{} - errCh := make(chan error, 1) - subWithError.On("Unsubscribe").Return() - subWithError.On("Err").Return((<-chan error)(errCh)) - - s.wsClient.Unset("SubscribeNewHead") - s.wsClient.On("SubscribeNewHead", mock.Anything, mock.Anything). - Run(func(args mock.Arguments) { - ch := args.Get(1).(chan<- *types.Header) - // Deliver a header then trigger subscription error - go func() { - ch <- &header0 - errCh <- fmt.Errorf("connection lost") - }() - }). - Return(subWithError, nil).Once() - - emptySub := &MockSubscription{} - s.wsClient.On("SubscribeNewHead", mock.Anything, mock.Anything). - Return(emptySub, fmt.Errorf("connection refused")) - - err := s.evmReader.Run(s.ctx, make(chan struct{}, 1)) - s.Require().ErrorContains(err, "connection refused") - // 1 successful + 1 retry + 1 exhausted = 3 calls - s.wsClient.AssertNumberOfCalls(s.T(), "SubscribeNewHead", 3) +func (s *EvmReaderSuite) TestTickScansWithServiceContext() { + s.client.EnqueueNewHead(100).Once() + + // reset mock calls from 'MockRepository.SetupDefaultBehavior' + s.repository.On( + "UpdateEventLastCheckBlock", + mock.Anything, + mock.Anything, + MonitoredEvent_InputAdded, + mock.Anything, + ).Unset() + s.repository.On( + "UpdateEventLastCheckBlock", + mock.Anything, + mock.Anything, + MonitoredEvent_OutputExecuted, + mock.Anything, + ).Unset() + + assertValidContext := func(args mock.Arguments) { + ctx := args.Get(0).(context.Context) + s.Require().Equal(s.evmReader.Context, ctx) + s.Require().Nil(ctx.Err()) + } + + // setup mock calls expected to receive a valid context + s.repository.On( + "UpdateEventLastCheckBlock", + mock.Anything, + mock.Anything, + MonitoredEvent_InputAdded, + mock.Anything, + ).Return(nil).Times(1).Run(assertValidContext) + s.repository.On( + "UpdateEventLastCheckBlock", + mock.Anything, + mock.Anything, + MonitoredEvent_OutputExecuted, + mock.Anything, + ).Return(nil).Times(4).Run(assertValidContext) + + s.Require().False(s.evmReader.Ready()) + + errs := s.evmReader.Tick() + s.Require().Empty(errs) + + s.client.AssertCalled(s.T(), "HeaderByNumber", mock.Anything, mock.Anything) + s.repository.AssertNumberOfCalls(s.T(), "UpdateEventLastCheckBlock", 5) } -func (s *EvmReaderSuite) TestRunDoesNotResetRetriesWithoutProcessingHeaders() { - s.evmReader.blockchainMaxRetries = 1 - s.evmReader.blockchainSubscriptionRetryInterval = time.Millisecond - s.evmReader.wsLivenessTimeout = time.Millisecond - - // Subscribe succeeds but no headers arrive before liveness timeout. - // headersProcessed=0, so consecutiveFailures increments (not reset). - // With maxRetries=1: first timeout -> failures=1, second timeout -> failures=2 > 1 -> exit - err := s.evmReader.Run(s.ctx, make(chan struct{}, 1)) - s.Require().ErrorContains(err, "no new block header received") - s.wsClient.AssertNumberOfCalls(s.T(), "SubscribeNewHead", 2) +func (s *EvmReaderSuite) TestFetchMostRecentHeaderReturnsErrorWhenHeaderNumberIsNil() { + s.client.On("HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&types.Header{}, nil).Once() + + blockNumber, err := s.evmReader.fetchMostRecentHeader(s.ctx, DefaultBlock_Latest) + + s.Require().Error(err) + s.Require().ErrorContains(err, "returned header number is nil") + s.Require().Zero(blockNumber) +} + +func (s *EvmReaderSuite) TestTickReturnsHeaderFetchErrorWithoutLocalErrorLog() { + var logBuffer bytes.Buffer + s.evmReader.Logger = slog.New(slog.NewTextHandler(&logBuffer, nil)) + + headerErr := errors.New("transient connection error") + var hdr *types.Header + s.client.On("HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(hdr, headerErr).Once() + + errs := s.evmReader.Tick() + + s.Require().Len(errs, 1) + s.Require().ErrorIs(errs[0], headerErr) + s.Require().NotContains(logBuffer.String(), "Error fetching most recent block") + s.repository.AssertNumberOfCalls(s.T(), "ListApplications", 0) } -func (s *EvmReaderSuite) TestRunStopsDuringRetryWhenContextCanceled() { - s.evmReader.blockchainMaxRetries = 100 - s.evmReader.blockchainSubscriptionRetryInterval = time.Second +func (s *EvmReaderSuite) TestItRunsWhenConnectionFails() { + var hdr *types.Header + called := newCallNotification(s.client.On("HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(hdr, errors.New("transient connection error"))) - s.wsClient.Unset("SubscribeNewHead") - sub := &MockSubscription{} - ctx, cancel := context.WithCancel(s.ctx) - s.wsClient.On("SubscribeNewHead", mock.Anything, mock.Anything). - Run(func(_ mock.Arguments) { cancel() }). - Return(sub, fmt.Errorf("connection refused")) + done := make(chan struct{}) + go func() { + err := s.evmReader.Serve() + s.Require().NoError(err) + close(done) + }() - err := s.evmReader.Run(ctx, make(chan struct{}, 1)) - s.Require().ErrorIs(err, context.Canceled) + s.Require().True(waitNotification(called)) + s.Require().True(waitNotification(called)) + s.Require().True(wasntNotified(done)) } -func (s *EvmReaderSuite) TestItFailsToSubscribeForNewInputsOnStart() { - s.wsClient.Unset("ChainID") - s.wsClient.Unset("SubscribeNewHead") - emptySubscription := &MockSubscription{} - s.wsClient.On( - "SubscribeNewHead", +func (s *EvmReaderSuite) TestRunResetsRetriesAfterProcessingHeaders() { + s.client.EnqueueNewHead(100).Once() + var hdr *types.Header + s.client.On("HeaderByNumber", mock.Anything, mock.Anything, - ).Return(emptySubscription, fmt.Errorf("expected failure")) + ).Return(hdr, errors.New("transient connection error")).Once() + called := newCallNotification(s.client.EnqueueNewHead(101)) + + done := make(chan struct{}) + go func() { + err := s.evmReader.Serve() + s.Require().NoError(err) + close(done) + }() + + s.Require().True(waitNotification(called)) + s.Require().True(wasntNotified(done)) - err := s.evmReader.Run(s.ctx, make(chan struct{}, 1)) - s.Require().ErrorContains(err, "expected failure") - s.wsClient.AssertNumberOfCalls(s.T(), "SubscribeNewHead", 1) - s.wsClient.AssertExpectations(s.T()) + s.client.AssertCalled(s.T(), "HeaderByNumber", mock.Anything, mock.Anything) } // indexApps indexes applications given a key extractor function. diff --git a/internal/evmreader/fixtures_test.go b/internal/evmreader/fixtures_test.go index a864e248d..5cb498d60 100644 --- a/internal/evmreader/fixtures_test.go +++ b/internal/evmreader/fixtures_test.go @@ -36,14 +36,6 @@ var ( inputAddedEvent3 = makeInputEvent(app1Addr, 3, 0x13) ) -// sentinelHeader is sent after the real headers to flush the processing pipeline. -// Because the subscription channel is unbuffered, fireNewHead blocks until the -// evmreader receives the header. Sending a sentinel after the last real header -// guarantees all real headers have been fully processed when fireNewHead returns. -var sentinelHeader = makeHeader(0x01) - -var subscription0 = newMockSubscription() - // applications defines the two-app setup used by most tests. // app1: InputBox DA (inputs are read), app2: non-InputBox DA (inputs filtered out). var applications = []*Application{{ diff --git a/internal/evmreader/input_test.go b/internal/evmreader/input_test.go index 5c1846f41..740a7801c 100644 --- a/internal/evmreader/input_test.go +++ b/internal/evmreader/input_test.go @@ -16,28 +16,14 @@ import ( ) func (s *EvmReaderSuite) TestItReadsInputsFromNewBlocksFilteredByDA() { - wsClient := FakeWSEthClient{} - s.evmReader.wsClient = &wsClient + s.client.EnqueueNewHead(0x11).Once() + s.client.EnqueueNewHead(0x12).Once() + s.client.EnqueueNewHead(0x13).Once() + called, blocked := newBlockedCallNotification(s.client.EnqueueNewHead(0x13)) - // Start service - ready := make(chan struct{}, 1) - errChannel := make(chan error, 1) - - go func() { - errChannel <- s.evmReader.Run(s.ctx, ready) - }() - - select { - case <-ready: - break - case err := <-errChannel: - s.FailNow("unexpected error signal", err) - } + go s.evmReader.Serve() //nolint: errcheck - wsClient.fireNewHead(&header0) - wsClient.fireNewHead(&header1) - wsClient.fireNewHead(&header2) - wsClient.flushHeaders() + s.Require().True(waitNotification(called), "evmreader did not read new header") s.repository.AssertNumberOfCalls(s.T(), "CreateEpochsAndInputs", 3) s.repository.AssertNumberOfCalls(s.T(), "UpdateEventLastCheckBlock", 9) @@ -49,62 +35,11 @@ func (s *EvmReaderSuite) TestItReadsInputsFromNewBlocksFilteredByDA() { s.applicationContract2.AssertExpectations(s.T()) s.contractFactory.AssertExpectations(s.T()) s.client.AssertExpectations(s.T()) -} - -func (s *EvmReaderSuite) TestItReadsInputsFromNewFinalizedBlocks() { - wsClient := FakeWSEthClient{} - s.evmReader.wsClient = &wsClient - s.evmReader.defaultBlock = DefaultBlock_Finalized - - s.client.On("HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header0, nil).Once() - s.client.On("HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header1, nil).Once() - s.client.On("HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header2, nil).Once() - - // Start service - ready := make(chan struct{}, 1) - errChannel := make(chan error, 1) - - go func() { - errChannel <- s.evmReader.Run(s.ctx, ready) - }() - - select { - case <-ready: - break - case err := <-errChannel: - s.FailNow("unexpected error signal", err) - } - - wsClient.fireNewHead(&header3) - wsClient.fireNewHead(&header3) - wsClient.fireNewHead(&header3) - wsClient.flushHeaders() - - s.repository.AssertNumberOfCalls(s.T(), "CreateEpochsAndInputs", 3) - s.repository.AssertNumberOfCalls(s.T(), "UpdateEventLastCheckBlock", 9) - s.repository.AssertNumberOfCalls(s.T(), "UpdateOutputsExecution", 0) - s.repository.AssertExpectations(s.T()) - s.inputBox.AssertExpectations(s.T()) - s.applicationContract1.AssertExpectations(s.T()) - s.applicationContract2.AssertExpectations(s.T()) - s.contractFactory.AssertExpectations(s.T()) - s.client.AssertExpectations(s.T()) + close(blocked) // release blocked calls } func (s *EvmReaderSuite) TestItUpdatesLastInputCheckBlockWhenThereIsNoInputs() { - wsClient := FakeWSEthClient{} - s.evmReader.wsClient = &wsClient - // Prepare repository s.repository.Unset("UpdateEventLastCheckBlock") s.repository.On("UpdateEventLastCheckBlock", @@ -160,25 +95,15 @@ func (s *EvmReaderSuite) TestItUpdatesLastInputCheckBlockWhenThereIsNoInputs() { mock.Anything, ).Return(new(big.Int).SetUint64(0), nil) + s.client.EnqueueNewHead(0x11).Once() + s.client.EnqueueNewHead(0x12).Once() + s.client.EnqueueNewHead(0x13).Once() + called, blocked := newBlockedCallNotification(s.client.EnqueueNewHead(0x13)) + // Start service - ready := make(chan struct{}, 1) - errChannel := make(chan error, 1) - - go func() { - errChannel <- s.evmReader.Run(s.ctx, ready) - }() - - select { - case <-ready: - break - case err := <-errChannel: - s.FailNow("unexpected error signal", err) - } + go s.evmReader.Serve() //nolint: errcheck - wsClient.fireNewHead(&header0) - wsClient.fireNewHead(&header1) - wsClient.fireNewHead(&header2) - wsClient.flushHeaders() + s.Require().True(waitNotification(called), "evmreader did not read new header") s.repository.AssertNumberOfCalls(s.T(), "CreateEpochsAndInputs", 0) s.repository.AssertExpectations(s.T()) @@ -190,13 +115,11 @@ func (s *EvmReaderSuite) TestItUpdatesLastInputCheckBlockWhenThereIsNoInputs() { s.applicationContract2.AssertExpectations(s.T()) s.contractFactory.AssertExpectations(s.T()) s.client.AssertExpectations(s.T()) + + close(blocked) // release blocked connection } func (s *EvmReaderSuite) TestItReadsMultipleInputsFromSingleNewBlock() { - - wsClient := FakeWSEthClient{} - s.evmReader.wsClient = &wsClient - s.applicationContract1.Unset("GetDeploymentBlockNumber") s.applicationContract1.Unset("GetNumberOfExecutedOutputs") s.applicationContract1.On("GetNumberOfExecutedOutputs", @@ -219,6 +142,7 @@ func (s *EvmReaderSuite) TestItReadsMultipleInputsFromSingleNewBlock() { s.contractFactory = newMockAdapterFactory().SetupDefaultBehaviorSingleApp(s.applicationContract1, s.inputBox) s.evmReader.adapterFactory = s.contractFactory + s.evmReader.resolver = newApplicationAdapterResolver(s.evmReader.Logger, s.contractFactory) // Prepare Repo s.repository.Unset("ListApplications") @@ -289,24 +213,13 @@ func (s *EvmReaderSuite) TestItReadsMultipleInputsFromSingleNewBlock() { mock.Anything, ).Return(uint64(0), nil).Once() + s.client.EnqueueNewHead(0x13).Once() + called, blocked := newBlockedCallNotification(s.client.EnqueueNewHead(0x13)) + // Start service - ready := make(chan struct{}, 1) - errChannel := make(chan error, 1) - - go func() { - errChannel <- s.evmReader.Run(s.ctx, ready) - }() - - select { - case <-ready: - break - case err := <-errChannel: - s.FailNow("unexpected error signal", err) - } + go s.evmReader.Serve() //nolint: errcheck - wsClient.fireNewHead(&header2) - // Give a time for - wsClient.flushHeaders() + s.Require().True(waitNotification(called), "evmreader did not read new header") s.repository.AssertNumberOfCalls(s.T(), "CreateEpochsAndInputs", 1) s.repository.AssertExpectations(s.T()) @@ -317,12 +230,11 @@ func (s *EvmReaderSuite) TestItReadsMultipleInputsFromSingleNewBlock() { s.applicationContract1.AssertExpectations(s.T()) s.contractFactory.AssertExpectations(s.T()) s.client.AssertExpectations(s.T()) + + close(blocked) // release blocked connection } func (s *EvmReaderSuite) TestItStartsWhenLastProcessedBlockIsTheMostRecentBlock() { - wsClient := FakeWSEthClient{} - s.evmReader.wsClient = &wsClient - // Prepare Repo s.repository.Unset("ListApplications") s.repository.On("ListApplications", @@ -366,29 +278,21 @@ func (s *EvmReaderSuite) TestItStartsWhenLastProcessedBlockIsTheMostRecentBlock( mock.Anything, ).Return(s.applicationContract1, s.inputBox, nil, nil).Once() + s.client.EnqueueNewHead(0x13).Once() + called, blocked := newBlockedCallNotification(s.client.EnqueueNewHead(0x13)) + // Start service - ready := make(chan struct{}, 1) - errChannel := make(chan error, 1) - - go func() { - errChannel <- s.evmReader.Run(s.ctx, ready) - }() - - select { - case <-ready: - break - case err := <-errChannel: - s.FailNow("unexpected error signal", err) - } + go s.evmReader.Serve() //nolint: errcheck - wsClient.fireNewHead(&header2) - wsClient.flushHeaders() + s.Require().True(waitNotification(called), "evmreader did not read new header") s.repository.AssertExpectations(s.T()) s.inputBox.AssertExpectations(s.T()) s.applicationContract1.AssertExpectations(s.T()) s.contractFactory.AssertExpectations(s.T()) s.client.AssertExpectations(s.T()) + + close(blocked) // release blocked connection } func (s *EvmReaderSuite) TestCatchUpForeclosedInputsScansThroughForecloseBlock() { diff --git a/internal/evmreader/mocks_test.go b/internal/evmreader/mocks_test.go index c2bc9185e..28f19db2b 100644 --- a/internal/evmreader/mocks_test.go +++ b/internal/evmreader/mocks_test.go @@ -13,7 +13,6 @@ import ( "github.com/cartesi/rollups-node/pkg/contracts/iapplication" "github.com/cartesi/rollups-node/pkg/contracts/idaveconsensus" "github.com/cartesi/rollups-node/pkg/contracts/iinputbox" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -80,13 +79,11 @@ func (m *MockEthClient) SetupDefaultBehavior() *MockEthClient { return m } -func (m *MockEthClient) SetupDefaultWsBehavior() *MockEthClient { - m.On("ChainID", mock.Anything).Return(big.NewInt(1), nil) - m.On("SubscribeNewHead", +func (m *MockEthClient) EnqueueNewHead(blknum int64) *mock.Call { + return m.On("HeaderByNumber", mock.Anything, mock.Anything, - ).Return(subscription0, nil) - return m + ).Return(&types.Header{Number: big.NewInt(blknum)}, nil) } func (m *MockEthClient) Unset(methodName string) { @@ -101,14 +98,6 @@ func (m *MockEthClient) HeaderByNumber( return args.Get(0).(*types.Header), args.Error(1) } -func (m *MockEthClient) SubscribeNewHead( - ctx context.Context, - ch chan<- *types.Header, -) (ethereum.Subscription, error) { - args := m.Called(ctx, ch) - return args.Get(0).(ethereum.Subscription), args.Error(1) -} - func (m *MockEthClient) ChainID(ctx context.Context) (*big.Int, error) { args := m.Called(ctx) if args.Get(0) == nil { @@ -117,67 +106,6 @@ func (m *MockEthClient) ChainID(ctx context.Context) (*big.Int, error) { return args.Get(0).(*big.Int), args.Error(1) } -// --------------------------------------------------------------------------- -// MockSubscription -// --------------------------------------------------------------------------- - -type MockSubscription struct { - mock.Mock -} - -func newMockSubscription() *MockSubscription { - sub := &MockSubscription{} - sub.On("Unsubscribe").Return() - sub.On("Err").Return(make(<-chan error)) - return sub -} - -func (m *MockSubscription) Unsubscribe() { - m.Called() -} - -func (m *MockSubscription) Err() <-chan error { - args := m.Called() - return args.Get(0).(<-chan error) -} - -// --------------------------------------------------------------------------- -// FakeWSEthClient -// --------------------------------------------------------------------------- - -type FakeWSEthClient struct { - ch chan<- *types.Header -} - -func (f *FakeWSEthClient) SubscribeNewHead( - _ context.Context, - ch chan<- *types.Header, -) (ethereum.Subscription, error) { - f.ch = ch - return newMockSubscription(), nil -} - -func (f *FakeWSEthClient) HeaderByNumber( - _ context.Context, - _ *big.Int, -) (*types.Header, error) { - return &header0, nil -} - -func (f *FakeWSEthClient) ChainID(_ context.Context) (*big.Int, error) { - return big.NewInt(1), nil -} - -func (f *FakeWSEthClient) fireNewHead(header *types.Header) { - f.ch <- header -} - -// flushHeaders sends a sentinel header to guarantee that all previously sent -// headers have been fully processed. Works because the channel is unbuffered. -func (f *FakeWSEthClient) flushHeaders() { - f.ch <- &sentinelHeader -} - // --------------------------------------------------------------------------- // MockInputBox // --------------------------------------------------------------------------- @@ -350,7 +278,7 @@ func (m *MockRepository) SetupDefaultBehavior() *MockRepository { ClaimTransactionHash: nil, }, nil).Twice() - // Catch-all: returns empty list for sentinel / extra headers (flushHeaders). + // Catch-all: returns empty list for extra headers. m.On("ListApplications", mock.Anything, mock.Anything, mock.Anything, false). Return([]*Application{}, uint64(0), nil) diff --git a/internal/evmreader/output_test.go b/internal/evmreader/output_test.go index a50bf3fe9..15c175a2d 100644 --- a/internal/evmreader/output_test.go +++ b/internal/evmreader/output_test.go @@ -141,30 +141,17 @@ func (s *EvmReaderSuite) setupOutputExecution() { } func (s *EvmReaderSuite) TestOutputExecution() { - wsClient := FakeWSEthClient{} - s.evmReader.wsClient = &wsClient + s.client.EnqueueNewHead(0x11).Once() + s.client.EnqueueNewHead(0x12).Once() + s.client.EnqueueNewHead(0x13).Once() + called, blocked := newBlockedCallNotification(s.client.EnqueueNewHead(0x13)) s.setupOutputExecution() // Start service - ready := make(chan struct{}, 1) - errChannel := make(chan error, 1) - - go func() { - errChannel <- s.evmReader.Run(s.ctx, ready) - }() - - select { - case <-ready: - break - case err := <-errChannel: - s.FailNow("unexpected error signal", err) - } + go s.evmReader.Serve() //nolint: errcheck - wsClient.fireNewHead(&header0) - wsClient.fireNewHead(&header1) - wsClient.fireNewHead(&header2) - wsClient.flushHeaders() + s.Require().True(waitNotification(called), "evmreader did not read new header") s.repository.AssertNumberOfCalls(s.T(), "UpdateOutputsExecution", 2) s.repository.AssertExpectations(s.T()) @@ -175,12 +162,10 @@ func (s *EvmReaderSuite) TestOutputExecution() { s.contractFactory.AssertExpectations(s.T()) s.client.AssertExpectations(s.T()) + close(blocked) // release blocked connection } func (s *EvmReaderSuite) TestOutputExecutionOnFinalizedBlocks() { - wsClient := FakeWSEthClient{} - s.evmReader.wsClient = &wsClient - s.evmReader.defaultBlock = DefaultBlock_Finalized s.client.On("HeaderByNumber", @@ -198,25 +183,14 @@ func (s *EvmReaderSuite) TestOutputExecutionOnFinalizedBlocks() { s.setupOutputExecution() + s.client.EnqueueNewHead(0x11).Once() + s.client.EnqueueNewHead(0x12).Once() + called, blocked := newBlockedCallNotification(s.client.EnqueueNewHead(0x13)) + // Start service - ready := make(chan struct{}, 1) - errChannel := make(chan error, 1) - - go func() { - errChannel <- s.evmReader.Run(s.ctx, ready) - }() - - select { - case <-ready: - break - case err := <-errChannel: - s.FailNow("unexpected error signal", err) - } + go s.evmReader.Serve() //nolint: errcheck - wsClient.fireNewHead(&header3) - wsClient.fireNewHead(&header3) - wsClient.fireNewHead(&header3) - wsClient.flushHeaders() + s.Require().True(waitNotification(called), "evmreader did not read new header") s.repository.AssertNumberOfCalls(s.T(), "UpdateOutputsExecution", 2) s.repository.AssertExpectations(s.T()) @@ -227,6 +201,7 @@ func (s *EvmReaderSuite) TestOutputExecutionOnFinalizedBlocks() { s.contractFactory.AssertExpectations(s.T()) s.client.AssertExpectations(s.T()) + close(blocked) // release blocked connection } // TestOutputExecutionContinuesForForeclosedApps verifies foreclosure does not @@ -417,9 +392,6 @@ func (s *EvmReaderSuite) TestOutputExecutionMismatchMarksApplicationDiverged() { } func (s *EvmReaderSuite) TestCheckOutputFailsWhenRetrieveOutputsFails() { - wsClient := FakeWSEthClient{} - s.evmReader.wsClient = &wsClient - s.setupOutputExecution() s.applicationContract1.Unset("RetrieveOutputExecutionEvents") @@ -493,25 +465,14 @@ func (s *EvmReaderSuite) TestCheckOutputFailsWhenRetrieveOutputsFails() { mock.Anything, ).Return(nil).Times(5) + s.client.EnqueueNewHead(0x11).Once() + s.client.EnqueueNewHead(0x12).Once() + s.client.EnqueueNewHead(0x13).Once() + called, blocked := newBlockedCallNotification(s.client.EnqueueNewHead(0x13)) // Start service - ready := make(chan struct{}, 1) - errChannel := make(chan error, 1) - - go func() { - errChannel <- s.evmReader.Run(s.ctx, ready) - }() - - select { - case <-ready: - break - case err := <-errChannel: - s.FailNow("unexpected error signal", err) - } + go s.evmReader.Serve() //nolint: errcheck - wsClient.fireNewHead(&header0) - wsClient.fireNewHead(&header1) - wsClient.fireNewHead(&header2) - wsClient.flushHeaders() + s.Require().True(waitNotification(called), "evmreader did not read new header") s.repository.AssertNumberOfCalls(s.T(), "UpdateOutputsExecution", 0) s.repository.AssertExpectations(s.T()) @@ -522,12 +483,10 @@ func (s *EvmReaderSuite) TestCheckOutputFailsWhenRetrieveOutputsFails() { s.contractFactory.AssertExpectations(s.T()) s.client.AssertExpectations(s.T()) + close(blocked) // release blocked calls } func (s *EvmReaderSuite) TestCheckOutputFailsWhenGetOutputsFails() { - wsClient := FakeWSEthClient{} - s.evmReader.wsClient = &wsClient - s.setupOutputExecution() s.repository.Unset("GetOutput") @@ -609,25 +568,15 @@ func (s *EvmReaderSuite) TestCheckOutputFailsWhenGetOutputsFails() { mock.Anything, ).Return(nil).Times(5) + s.client.EnqueueNewHead(0x11).Once() + s.client.EnqueueNewHead(0x12).Once() + s.client.EnqueueNewHead(0x13).Once() + called, blocked := newBlockedCallNotification(s.client.EnqueueNewHead(0x13)) + // Start service - ready := make(chan struct{}, 1) - errChannel := make(chan error, 1) - - go func() { - errChannel <- s.evmReader.Run(s.ctx, ready) - }() - - select { - case <-ready: - break - case err := <-errChannel: - s.FailNow("unexpected error signal", err) - } + go s.evmReader.Serve() //nolint: errcheck - wsClient.fireNewHead(&header0) - wsClient.fireNewHead(&header1) - wsClient.fireNewHead(&header2) - wsClient.flushHeaders() + s.Require().True(waitNotification(called), "evmreader did not read new header") s.repository.AssertNumberOfCalls(s.T(), "UpdateOutputsExecution", 0) s.repository.AssertExpectations(s.T()) @@ -637,6 +586,8 @@ func (s *EvmReaderSuite) TestCheckOutputFailsWhenGetOutputsFails() { s.applicationContract2.AssertExpectations(s.T()) s.contractFactory.AssertExpectations(s.T()) s.client.AssertExpectations(s.T()) + + close(blocked) // release blocked calls } func (s *EvmReaderSuite) setupOutputMismatchTest() { @@ -647,25 +598,30 @@ func (s *EvmReaderSuite) setupOutputMismatchTest() { s.contractFactory = newMockAdapterFactory() s.evmReader = &Service{ - client: s.client, - wsClient: s.wsClient, - repository: s.repository, - defaultBlock: DefaultBlock_Latest, - adapterFactory: s.contractFactory, - hasEnabledApps: true, - inputReaderEnabled: true, - blockchainMaxRetries: 0, - blockchainSubscriptionRetryInterval: time.Second, - wsLivenessTimeout: 120 * time.Second, + client: s.client, + repository: s.repository, + defaultBlock: DefaultBlock_Latest, + adapterFactory: s.contractFactory, + hasEnabledApps: true, + inputReaderEnabled: true, } logLevel, err := config.GetLogLevel() s.Require().NoError(err) - serviceArgs := &service.CreateInfo{Name: "evm-reader", Impl: s.evmReader, LogLevel: logLevel} + serviceArgs := &service.CreateInfo{ + Name: "evm-reader", + Impl: s.evmReader, + LogLevel: logLevel, + Context: s.ctx, + Cancel: s.cancel, + PollInterval: 100 * time.Millisecond, + } err = service.Create(context.Background(), serviceArgs, &s.evmReader.Service) s.Require().NoError(err) + s.evmReader.resolver = newApplicationAdapterResolver(s.evmReader.Logger, s.contractFactory) + apps := copyApplications(applications) for _, app := range apps { app.LastForecloseCheckBlock = 0x100 @@ -797,28 +753,15 @@ func (s *EvmReaderSuite) setupOutputMismatchTest() { func (s *EvmReaderSuite) TestCheckOutputFailsWhenOutputMismatches() { s.setupOutputMismatchTest() - wsClient := FakeWSEthClient{} - s.evmReader.wsClient = &wsClient + s.client.EnqueueNewHead(0x11).Once() + s.client.EnqueueNewHead(0x12).Once() + s.client.EnqueueNewHead(0x13).Once() + called, blocked := newBlockedCallNotification(s.client.EnqueueNewHead(0x13)) // Start service - ready := make(chan struct{}, 1) - errChannel := make(chan error, 1) - - go func() { - errChannel <- s.evmReader.Run(s.ctx, ready) - }() - - select { - case <-ready: - break - case err := <-errChannel: - s.FailNow("unexpected error signal", err) - } + go s.evmReader.Serve() //nolint: errcheck - wsClient.fireNewHead(&header0) - wsClient.fireNewHead(&header1) - wsClient.fireNewHead(&header2) - wsClient.flushHeaders() + s.Require().True(waitNotification(called), "evmreader did not read new header") s.repository.AssertNumberOfCalls(s.T(), "UpdateOutputsExecution", 0) s.repository.AssertExpectations(s.T()) @@ -829,4 +772,5 @@ func (s *EvmReaderSuite) TestCheckOutputFailsWhenOutputMismatches() { s.contractFactory.AssertExpectations(s.T()) s.client.AssertExpectations(s.T()) + close(blocked) // release blocked calls } diff --git a/internal/evmreader/sealedepochs.go b/internal/evmreader/sealedepochs.go index 46b69edcd..1a8b57618 100644 --- a/internal/evmreader/sealedepochs.go +++ b/internal/evmreader/sealedepochs.go @@ -409,6 +409,27 @@ func (r *Service) processApplicationOpenEpoch( app appContracts, mostRecentBlockNumber uint64, ) error { + // This guard uses the tick-start application snapshot. Sealed-epoch + // processing earlier in this same tick can advance the DB input cursor to + // mostRecentBlockNumber, but the open epoch may still need to scan that + // boundary block. The fresh DB cursor is read below and scanned inclusively. + if mostRecentBlockNumber < app.application.LastInputCheckBlock { + r.Logger.Warn( + "Not checking for inputs on current open epoch: most recent block is lower than the last processed one", + "application", app.application.Name, "address", app.application.IApplicationAddress, + "last_input_check_block", app.application.LastInputCheckBlock, + "most_recent_block", mostRecentBlockNumber, + ) + return nil + } else if mostRecentBlockNumber == app.application.LastInputCheckBlock { + r.Logger.Debug("Not checking for inputs on current open epoch: already checked the most recent blocks", + "application", app.application.Name, "address", app.application.IApplicationAddress, + "last_input_check_block", app.application.LastInputCheckBlock, + "most_recent_block", mostRecentBlockNumber, + ) + return nil + } + r.Logger.Debug("Checking for inputs on current open epoch", "application", app.application.Name, "most_recent_block", mostRecentBlockNumber, diff --git a/internal/evmreader/service.go b/internal/evmreader/service.go index 5c00ff7e9..29d2429c7 100644 --- a/internal/evmreader/service.go +++ b/internal/evmreader/service.go @@ -9,7 +9,6 @@ import ( "fmt" "math/big" "sync/atomic" - "time" "github.com/cartesi/rollups-node/internal/config" . "github.com/cartesi/rollups-node/internal/model" @@ -24,28 +23,25 @@ type CreateInfo struct { Config config.EvmreaderConfig - Repository repository.Repository + Repository EvmReaderRepository - EthClient *ethclient.Client - EthWsClient EthClientInterface + EthClient *ethclient.Client } type Service struct { service.Service - client EthClientInterface - wsClient EthClientInterface - adapterFactory AdapterFactory - repository EvmReaderRepository - chainId uint64 - defaultBlock DefaultBlock - hasEnabledApps bool - inputReaderEnabled bool - blockchainMaxRetries uint64 - blockchainSubscriptionRetryInterval time.Duration - wsLivenessTimeout time.Duration - alive atomic.Bool - ready atomic.Bool + client EthClientInterface + adapterFactory AdapterFactory + resolver *applicationAdapterResolver + repository EvmReaderRepository + chainID uint64 + defaultBlock DefaultBlock + hasEnabledApps bool + inputReaderEnabled bool + lastBlockNumber atomic.Uint64 + alive atomic.Bool + ready atomic.Bool } const EvmReaderConfigKey = "evm-reader" @@ -82,18 +78,6 @@ func Create(ctx context.Context, c *CreateInfo) (*Service, error) { chainId.Uint64(), c.Config.BlockchainId) } - if c.EthWsClient == nil { - return nil, fmt.Errorf("EthWsClient on evmreader service Create is nil") - } - chainId, err = c.EthWsClient.ChainID(ctx) - if err != nil { - return nil, err - } - if chainId.Uint64() != c.Config.BlockchainId { - return nil, fmt.Errorf("EthWsClient chainId mismatch: network %d != provided %d", - chainId.Uint64(), c.Config.BlockchainId) - } - s.repository = c.Repository if s.repository == nil { return nil, fmt.Errorf("repository on evmreader service Create is nil") @@ -107,14 +91,10 @@ func Create(ctx context.Context, c *CreateInfo) (*Service, error) { return nil, fmt.Errorf("NodeConfig chainId mismatch: network %d != config %d", chainId.Uint64(), nodeConfig.ChainID) } - s.blockchainMaxRetries = c.Config.BlockchainWsMaxRetries - s.blockchainSubscriptionRetryInterval = c.Config.BlockchainWsReconnectInterval - s.wsLivenessTimeout = c.Config.BlockchainWsLivenessTimeout s.client = c.EthClient - s.wsClient = c.EthWsClient - s.chainId = nodeConfig.ChainID + s.chainID = nodeConfig.ChainID s.defaultBlock = nodeConfig.DefaultBlock s.inputReaderEnabled = nodeConfig.InputReaderEnabled s.hasEnabledApps = true @@ -126,6 +106,7 @@ func Create(ctx context.Context, c *CreateInfo) (*Service, error) { Logger: s.Logger, }, } + s.resolver = newApplicationAdapterResolver(s.Logger, s.adapterFactory) return s, nil } @@ -147,29 +128,11 @@ func (s *Service) Stop(bool) []error { return nil } -func (s *Service) Tick() []error { - return []error{} -} - func (s *Service) Serve() error { s.alive.Store(true) - ready := make(chan struct{}, 1) - go func() { - defer s.alive.Store(false) - defer s.ready.Store(false) - err := s.Run(s.Context, ready) - if err != nil && s.Context.Err() == nil { - s.Logger.Error("Run exited with error", "error", err) - } - s.Cancel() - }() - go func() { - select { - case <-ready: - s.ready.Store(true) - case <-s.Context.Done(): - } - }() + s.ready.Store(true) + defer s.alive.Store(false) + defer s.ready.Store(false) return s.Service.Serve() } diff --git a/internal/evmreader/service_config_test.go b/internal/evmreader/service_config_test.go index e1ec7d249..f99e1a601 100644 --- a/internal/evmreader/service_config_test.go +++ b/internal/evmreader/service_config_test.go @@ -8,6 +8,8 @@ import ( "encoding/json" "errors" "math/big" + "net/http" + "net/http/httptest" "testing" "time" @@ -16,6 +18,7 @@ import ( "github.com/cartesi/rollups-node/internal/repository" "github.com/cartesi/rollups-node/pkg/service" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) @@ -40,6 +43,59 @@ func TestCreateWithNilEthClient(t *testing.T) { require.ErrorContains(t, err, "EthClient on evmreader service Create is nil") } +func TestCreateAcceptsRequestTimeoutBelowPollingInterval(t *testing.T) { + config.SetDefaults() + logLevel, err := config.GetLogLevel() + require.NoError(t, err) + + const chainID = uint64(1) + pollInterval := 12 * time.Second + requestTimeout := 5 * time.Second + + rpcServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"jsonrpc":"2.0","id":1,"result":"0x1"}`)) + })) + defer rpcServer.Close() + + client, err := ethclient.Dial(rpcServer.URL) + require.NoError(t, err) + defer client.Close() + + rawConfig, err := json.Marshal(PersistentConfig{ + DefaultBlock: DefaultBlock_Finalized, + InputReaderEnabled: true, + ChainID: chainID, + }) + require.NoError(t, err) + + repo := newMockRepository() + repo.On("LoadNodeConfigRaw", mock.Anything, EvmReaderConfigKey). + Return(rawConfig, time.Now(), time.Now(), nil).Once() + + svc, err := Create(context.Background(), &CreateInfo{ + CreateInfo: service.CreateInfo{ + Name: "evm-reader", + LogLevel: logLevel, + PollInterval: pollInterval, + }, + Config: config.EvmreaderConfig{ + BlockchainDefaultBlock: DefaultBlock_Finalized, + BlockchainHttpRequestTimeout: requestTimeout, + BlockchainId: chainID, + EvmReaderPollingInterval: pollInterval, + FeatureInputReaderEnabled: true, + }, + EthClient: client, + Repository: repo, + }) + require.NoError(t, err) + defer svc.Ticker.Stop() + defer svc.Cancel() + + repo.AssertExpectations(t) +} + // --- fetchMostRecentHeader tests --- func (s *EvmReaderSuite) TestFetchMostRecentHeaderRPCError() { @@ -74,7 +130,7 @@ func (s *EvmReaderSuite) TestFetchMostRecentHeaderSuccess() { header, err := s.evmReader.fetchMostRecentHeader(s.ctx, DefaultBlock_Finalized) s.Require().NoError(err) - s.Require().Equal(expected, header) + s.Require().Equal(expected.Number.Uint64(), header) } // --- inputReaderEnabled feature flag tests --- diff --git a/internal/node/node.go b/internal/node/node.go index a351ae224..872ca8c26 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -33,11 +33,10 @@ type CreateInfo struct { Config config.NodeConfig - PrtClient *ethclient.Client - ClaimerClient *ethclient.Client - ReaderClient *ethclient.Client - ReaderWSClient *ethclient.Client - Repository repository.Repository + PrtClient *ethclient.Client + ClaimerClient *ethclient.Client + ReaderClient *ethclient.Client + Repository repository.Repository } type Service struct { @@ -169,12 +168,12 @@ func newEVMReader(ctx context.Context, c *CreateInfo, s *Service) (service.IServ LogColor: c.Config.LogColor, EnableSignalHandling: false, TelemetryCreate: false, + PollInterval: c.Config.EvmReaderPollingInterval, ServeMux: s.ServeMux, }, - EthClient: c.ReaderClient, - EthWsClient: c.ReaderWSClient, - Repository: c.Repository, - Config: *c.Config.ToEvmreaderConfig(), + EthClient: c.ReaderClient, + Repository: c.Repository, + Config: *c.Config.ToEvmreaderConfig(), } readerService, err := evmreader.Create(ctx, &readerArgs) diff --git a/pkg/ethutil/client.go b/pkg/ethutil/client.go index fefe29e0f..55d281419 100644 --- a/pkg/ethutil/client.go +++ b/pkg/ethutil/client.go @@ -19,9 +19,10 @@ import ( // RetryConfig holds configuration for the retryable HTTP client. type RetryConfig struct { - MaxRetries uint64 - RetryMinWait time.Duration - RetryMaxWait time.Duration + MaxRetries uint64 + RetryMinWait time.Duration + RetryMaxWait time.Duration + RequestTimeout time.Duration } // NewEthClient creates an Ethereum JSON-RPC client with retryable HTTP transport. @@ -39,6 +40,7 @@ func NewEthClient( rclient.RetryMax = int(min(retryConfig.MaxRetries, uint64(math.MaxInt))) rclient.RetryWaitMin = retryConfig.RetryMinWait rclient.RetryWaitMax = retryConfig.RetryMaxWait + rclient.HTTPClient.Timeout = retryConfig.RequestTimeout opts := []rpc.ClientOption{ rpc.WithHTTPClient(rclient.StandardClient()), diff --git a/pkg/ethutil/client_test.go b/pkg/ethutil/client_test.go index be2d204e2..e2b23b3bf 100644 --- a/pkg/ethutil/client_test.go +++ b/pkg/ethutil/client_test.go @@ -5,11 +5,16 @@ package ethutil import ( "bytes" + "context" "errors" "fmt" + "io" "log/slog" + "net/http" + "net/http/httptest" "net/url" "testing" + "time" "github.com/stretchr/testify/require" ) @@ -226,3 +231,26 @@ func TestRedactEndpointFromError(t *testing.T) { require.Contains(t, redacted.Error(), "https://alchemy.com") }) } + +func TestNewEthClientRequestTimeout(t *testing.T) { + const requestTimeout = 25 * time.Millisecond + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + time.Sleep(10 * requestTimeout) + _, _ = w.Write([]byte(`{"jsonrpc":"2.0","id":1,"result":"0x1"}`)) + })) + defer server.Close() + + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + client, err := NewEthClient(context.Background(), server.URL, logger, RetryConfig{ + RequestTimeout: requestTimeout, + }) + require.NoError(t, err) + + start := time.Now() + _, err = client.ChainID(context.Background()) + elapsed := time.Since(start) + + require.Error(t, err) + require.Less(t, elapsed, 5*requestTimeout) +} diff --git a/pkg/service/service.go b/pkg/service/service.go index 90af6fa1f..4ed56f653 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -144,7 +144,6 @@ type Service struct { Impl ServiceImpl Logger *slog.Logger Ticker *time.Ticker - PollInterval time.Duration Context context.Context Cancel context.CancelFunc Sighup chan os.Signal // SIGHUP to reload @@ -174,6 +173,9 @@ func Create(ctx context.Context, c *CreateInfo, s *Service) error { if err := ctx.Err(); err != nil { return err // This returns context.Canceled or context.DeadlineExceeded. } + if s.Ticker == nil && c.PollInterval < 0 { + return fmt.Errorf("PollInterval must be non-negative, got %v", c.PollInterval) + } s.Running.Store(false) s.Name = c.Name @@ -204,8 +206,7 @@ func Create(ctx context.Context, c *CreateInfo, s *Service) error { if c.PollInterval == 0 { c.PollInterval = time.Minute } - s.PollInterval = c.PollInterval - s.Ticker = time.NewTicker(s.PollInterval) + s.Ticker = time.NewTicker(c.PollInterval) } // self-rescheduling diff --git a/pkg/service/service_test.go b/pkg/service/service_test.go index f123763a3..6949c3171 100644 --- a/pkg/service/service_test.go +++ b/pkg/service/service_test.go @@ -64,6 +64,22 @@ func TestServe(t *testing.T) { suite.Run(t, new(ServeSuite)) } +func TestCreateRejectsNegativePollInterval(t *testing.T) { + impl := &mockImpl{} + svc := &Service{} + + require.NotPanics(t, func() { + err := Create(context.Background(), &CreateInfo{ + Name: "test-negative-poll", + LogLevel: slog.LevelError, + Impl: impl, + PollInterval: -time.Second, + }, svc) + require.ErrorContains(t, err, "PollInterval must be non-negative") + }) + require.Nil(t, svc.Ticker) +} + func (s *ServeSuite) TestDisabledReschedulePreservesExistingBehavior() { // With rescheduling disabled and a short poll interval, // Serve() should tick only on timer fires. diff --git a/test/compose/compose.integration.yaml b/test/compose/compose.integration.yaml index c4ba13e2f..0e21a3ccf 100644 --- a/test/compose/compose.integration.yaml +++ b/test/compose/compose.integration.yaml @@ -1,8 +1,8 @@ x-env: &env CARTESI_LOG_LEVEL: info CARTESI_BLOCKCHAIN_HTTP_ENDPOINT: http://ethereum_provider:8545 - CARTESI_BLOCKCHAIN_WS_ENDPOINT: ws://ethereum_provider:8545 CARTESI_BLOCKCHAIN_ID: 31337 + CARTESI_EVM_READER_POLLING_INTERVAL: 1 CARTESI_CONTRACTS_INPUT_BOX_ADDRESS: 0x346B3df038FE9f8380071eC6514D5a83aD143939 CARTESI_CONTRACTS_AUTHORITY_FACTORY_ADDRESS: 0x3C1FE01c542a88A523FF6847eD1E26176c8C4ED0 CARTESI_CONTRACTS_QUORUM_FACTORY_ADDRESS: 0x1f94009389F408B8D0ADfFcF8BBDCe5552BaCa5F diff --git a/test/compose/compose.test.yaml b/test/compose/compose.test.yaml index b5ee6a22e..63046f00b 100644 --- a/test/compose/compose.test.yaml +++ b/test/compose/compose.test.yaml @@ -1,8 +1,8 @@ x-env: &env CARTESI_LOG_LEVEL: info CARTESI_BLOCKCHAIN_HTTP_ENDPOINT: http://ethereum_provider:8545 - CARTESI_BLOCKCHAIN_WS_ENDPOINT: ws://ethereum_provider:8545 CARTESI_BLOCKCHAIN_ID: 31337 + CARTESI_EVM_READER_POLLING_INTERVAL: 1 CARTESI_CONTRACTS_INPUT_BOX_ADDRESS: 0x346B3df038FE9f8380071eC6514D5a83aD143939 CARTESI_CONTRACTS_AUTHORITY_FACTORY_ADDRESS: 0x3C1FE01c542a88A523FF6847eD1E26176c8C4ED0 CARTESI_CONTRACTS_QUORUM_FACTORY_ADDRESS: 0x1f94009389F408B8D0ADfFcF8BBDCe5552BaCa5F diff --git a/test/secrets/blockchain_ws_endpoint.txt b/test/secrets/blockchain_ws_endpoint.txt deleted file mode 100644 index 97cba982e..000000000 --- a/test/secrets/blockchain_ws_endpoint.txt +++ /dev/null @@ -1 +0,0 @@ -ws://ethereum_provider:8545 From 09c00d8eb31fc490cee532faadc74ee75f246dbe Mon Sep 17 00:00:00 2001 From: Renato Maia <1887792+renatomaia@users.noreply.github.com> Date: Thu, 11 Jun 2026 10:55:03 -0300 Subject: [PATCH 2/4] fix(ethclient): impose upper limit for retry delays on HTTP Ethereum API --- pkg/ethutil/client.go | 48 +++++++++++++ pkg/ethutil/client_test.go | 141 +++++++++++++++++++++++++++++++++++++ 2 files changed, 189 insertions(+) diff --git a/pkg/ethutil/client.go b/pkg/ethutil/client.go index 55d281419..62d45db5c 100644 --- a/pkg/ethutil/client.go +++ b/pkg/ethutil/client.go @@ -8,7 +8,9 @@ import ( "fmt" "log/slog" "math" + "net/http" "net/url" + "strconv" "strings" "time" @@ -17,6 +19,37 @@ import ( "github.com/hashicorp/go-retryablehttp" ) +// parseRetryAfterHeader parses the Retry-After header and returns the +// delay duration according to the spec: https://httpwg.org/specs/rfc7231.html#header.retry-after +// The bool returned will be true if the header was successfully parsed. +// Otherwise, the header was either not present, or was not parseable according to the spec. +func parseRetryAfterHeader(headers []string) (time.Duration, bool) { + if len(headers) == 0 || headers[0] == "" { + return 0, false + } + header := headers[0] + // 'Retry-After' is provided in seconds. + if sleep, err := strconv.ParseInt(header, 10, 64); err == nil { + if sleep < 0 { // a negative sleep doesn't make sense + return 0, false + } + if sleep > int64(time.Duration(math.MaxInt64)/time.Second) { + return time.Duration(math.MaxInt64), true + } + return time.Second * time.Duration(sleep), true + } + + // 'Retry-After' is provided as a date. + retryTime, err := time.Parse(time.RFC1123, header) + if err != nil { + return 0, false + } + if duration := time.Until(retryTime); duration > 0 { + return duration, true + } + return 0, true // past date +} + // RetryConfig holds configuration for the retryable HTTP client. type RetryConfig struct { MaxRetries uint64 @@ -41,6 +74,7 @@ func NewEthClient( rclient.RetryWaitMin = retryConfig.RetryMinWait rclient.RetryWaitMax = retryConfig.RetryMaxWait rclient.HTTPClient.Timeout = retryConfig.RequestTimeout + rclient.Backoff = retryBackoff opts := []rpc.ClientOption{ rpc.WithHTTPClient(rclient.StandardClient()), @@ -60,6 +94,20 @@ func NewEthClient( return ethclient.NewClient(rpcClient), nil } +// retryBackoff caps server-directed Retry-After delays while preserving the +// library's default exponential backoff for every other case. +func retryBackoff(minDuration, maxDuration time.Duration, attemptNum int, resp *http.Response) time.Duration { + if resp != nil { + if resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable { + if sleep, ok := parseRetryAfterHeader(resp.Header["Retry-After"]); ok { + return min(maxDuration, sleep) + } + } + } + + return retryablehttp.DefaultBackoff(minDuration, maxDuration, attemptNum, resp) +} + // Compile-time assertion that redactedLeveledLogger implements the LeveledLogger // interface from hashicorp/go-retryablehttp. Without this, a change to the library's // interface would only be caught at runtime when the logger is first used, because diff --git a/pkg/ethutil/client_test.go b/pkg/ethutil/client_test.go index e2b23b3bf..0a4ab07bb 100644 --- a/pkg/ethutil/client_test.go +++ b/pkg/ethutil/client_test.go @@ -10,9 +10,11 @@ import ( "fmt" "io" "log/slog" + "math" "net/http" "net/http/httptest" "net/url" + "strconv" "testing" "time" @@ -232,6 +234,145 @@ func TestRedactEndpointFromError(t *testing.T) { }) } +func TestParseRetryAfterHeader(t *testing.T) { + maxDuration := time.Duration(math.MaxInt64) + overflowSeconds := strconv.FormatInt(int64(maxDuration/time.Second)+1, 10) + pastDate := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC).Format(time.RFC1123) + + tests := []struct { + name string + headers []string + want time.Duration + wantOK bool + }{ + { + name: "missing header", + wantOK: false, + }, + { + name: "empty header", + headers: []string{""}, + wantOK: false, + }, + { + name: "invalid header", + headers: []string{"not-a-date-or-number"}, + wantOK: false, + }, + { + name: "negative seconds", + headers: []string{"-1"}, + wantOK: false, + }, + { + name: "positive seconds", + headers: []string{"3"}, + want: 3 * time.Second, + wantOK: true, + }, + { + name: "seconds overflow saturates", + headers: []string{overflowSeconds}, + want: maxDuration, + wantOK: true, + }, + { + name: "past date", + headers: []string{pastDate}, + want: 0, + wantOK: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, ok := parseRetryAfterHeader(tt.headers) + require.Equal(t, tt.wantOK, ok) + require.Equal(t, tt.want, got) + }) + } +} + +func TestRetryBackoff(t *testing.T) { + response := func(status int, retryAfter string) *http.Response { + resp := &http.Response{ + StatusCode: status, + Header: http.Header{}, + } + if retryAfter != "" { + resp.Header.Set("Retry-After", retryAfter) + } + return resp + } + + maxDuration := time.Duration(math.MaxInt64) + overflowSeconds := strconv.FormatInt(int64(maxDuration/time.Second)+1, 10) + + tests := []struct { + name string + minDuration time.Duration + maxDuration time.Duration + attemptNum int + resp *http.Response + want time.Duration + }{ + { + name: "429 retry after is capped by max duration", + minDuration: 100 * time.Millisecond, + maxDuration: 3 * time.Second, + attemptNum: 1, + resp: response(http.StatusTooManyRequests, "10"), + want: 3 * time.Second, + }, + { + name: "503 retry after below max is honored", + minDuration: 100 * time.Millisecond, + maxDuration: 3 * time.Second, + attemptNum: 1, + resp: response(http.StatusServiceUnavailable, "2"), + want: 2 * time.Second, + }, + { + name: "overflowing retry after is capped by max duration", + minDuration: 100 * time.Millisecond, + maxDuration: 3 * time.Second, + attemptNum: 1, + resp: response(http.StatusTooManyRequests, overflowSeconds), + want: 3 * time.Second, + }, + { + name: "invalid retry after falls back to exponential backoff", + minDuration: 100 * time.Millisecond, + maxDuration: 3 * time.Second, + attemptNum: 1, + resp: response(http.StatusTooManyRequests, "-1"), + want: 200 * time.Millisecond, + }, + { + name: "retry after is ignored for non retry-after status", + minDuration: 100 * time.Millisecond, + maxDuration: 3 * time.Second, + attemptNum: 1, + resp: response(http.StatusInternalServerError, "2"), + want: 200 * time.Millisecond, + }, + { + name: "exponential overflow is capped by max duration", + minDuration: time.Second, + maxDuration: 3 * time.Second, + attemptNum: 100, + want: 3 * time.Second, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := retryBackoff(tt.minDuration, tt.maxDuration, tt.attemptNum, tt.resp) + require.Equal(t, tt.want, got) + }) + } +} + func TestNewEthClientRequestTimeout(t *testing.T) { const requestTimeout = 25 * time.Millisecond From c6759b802b3cfef7abf25a10e3209b79d7368bbd Mon Sep 17 00:00:00 2001 From: Victor Fusco <1221933+vfusco@users.noreply.github.com> Date: Tue, 16 Jun 2026 15:59:34 -0300 Subject: [PATCH 3/4] fix(claimer): use blockchain HTTP request timeout --- cmd/cartesi-rollups-claimer/root/root.go | 11 ++++++----- cmd/cartesi-rollups-node/root/root.go | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/cmd/cartesi-rollups-claimer/root/root.go b/cmd/cartesi-rollups-claimer/root/root.go index d0a98b39a..0aafe79f0 100644 --- a/cmd/cartesi-rollups-claimer/root/root.go +++ b/cmd/cartesi-rollups-claimer/root/root.go @@ -21,7 +21,7 @@ var ( logLevel string logColor bool defaultBlockString string - blockchainHttpEndpoint string + blockchainHTTPEndpoint string databaseConnection string pollInterval string maxStartupTime string @@ -54,7 +54,7 @@ func init() { "tint the logs (colored output)") cli.AddFlagStrVar(flags, &databaseConnection, "database-connection", config.DATABASE_CONNECTION, "Database connection string in the URL format\n(eg.: 'postgres://user:password@hostname:port/database') ") - cli.AddFlagStrVar(flags, &blockchainHttpEndpoint, "blockchain-http-endpoint", config.BLOCKCHAIN_HTTP_ENDPOINT, + cli.AddFlagStrVar(flags, &blockchainHTTPEndpoint, "blockchain-http-endpoint", config.BLOCKCHAIN_HTTP_ENDPOINT, "Blockchain http endpoint") cli.AddFlagStrVar(flags, &pollInterval, "poll-interval", config.CLAIMER_POLLING_INTERVAL, "Poll interval") @@ -100,9 +100,10 @@ func run(cmd *cobra.Command, args []string) { createInfo.EthConn, err = ethutil.NewEthClient( ctx, cfg.BlockchainHttpEndpoint.Raw(), logger, ethutil.RetryConfig{ - MaxRetries: cfg.BlockchainHttpMaxRetries, - RetryMinWait: cfg.BlockchainHttpRetryMinWait, - RetryMaxWait: cfg.BlockchainHttpRetryMaxWait, + MaxRetries: cfg.BlockchainHttpMaxRetries, + RetryMinWait: cfg.BlockchainHttpRetryMinWait, + RetryMaxWait: cfg.BlockchainHttpRetryMaxWait, + RequestTimeout: cfg.BlockchainHttpRequestTimeout, }, authOpt) cli.CheckErr(logger, err) diff --git a/cmd/cartesi-rollups-node/root/root.go b/cmd/cartesi-rollups-node/root/root.go index c17bbefd3..55f022c32 100644 --- a/cmd/cartesi-rollups-node/root/root.go +++ b/cmd/cartesi-rollups-node/root/root.go @@ -169,7 +169,7 @@ func run(cmd *cobra.Command, args []string) { createInfo.ReaderClient, err = newEthClient(ctx, config.ServiceEvmReader, cfg.BlockchainHttpRequestTimeout) cli.CheckErr(logger, err) - createInfo.ClaimerClient, err = newEthClient(ctx, config.ServiceClaimer, 0) + createInfo.ClaimerClient, err = newEthClient(ctx, config.ServiceClaimer, cfg.BlockchainHttpRequestTimeout) cli.CheckErr(logger, err) createInfo.PrtClient, err = newEthClient(ctx, config.ServicePrt, 0) From b9408d22865f11b12c150837898a5dd833c8976b Mon Sep 17 00:00:00 2001 From: Victor Fusco <1221933+vfusco@users.noreply.github.com> Date: Tue, 16 Jun 2026 16:00:17 -0300 Subject: [PATCH 4/4] fix(prt): use blockchain HTTP request timeout --- cmd/cartesi-rollups-node/root/root.go | 2 +- cmd/cartesi-rollups-prt/root/root.go | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/cmd/cartesi-rollups-node/root/root.go b/cmd/cartesi-rollups-node/root/root.go index 55f022c32..d6422501d 100644 --- a/cmd/cartesi-rollups-node/root/root.go +++ b/cmd/cartesi-rollups-node/root/root.go @@ -172,7 +172,7 @@ func run(cmd *cobra.Command, args []string) { createInfo.ClaimerClient, err = newEthClient(ctx, config.ServiceClaimer, cfg.BlockchainHttpRequestTimeout) cli.CheckErr(logger, err) - createInfo.PrtClient, err = newEthClient(ctx, config.ServicePrt, 0) + createInfo.PrtClient, err = newEthClient(ctx, config.ServicePrt, cfg.BlockchainHttpRequestTimeout) cli.CheckErr(logger, err) createInfo.Repository, err = factory.NewRepositoryFromConnectionString(ctx, cfg.DatabaseConnection.Raw()) diff --git a/cmd/cartesi-rollups-prt/root/root.go b/cmd/cartesi-rollups-prt/root/root.go index e7f13673d..85a2a78e6 100644 --- a/cmd/cartesi-rollups-prt/root/root.go +++ b/cmd/cartesi-rollups-prt/root/root.go @@ -89,9 +89,10 @@ func run(cmd *cobra.Command, args []string) { createInfo.EthClient, err = ethutil.NewEthClient( ctx, cfg.BlockchainHttpEndpoint.Raw(), logger, ethutil.RetryConfig{ - MaxRetries: cfg.BlockchainHttpMaxRetries, - RetryMinWait: cfg.BlockchainHttpRetryMinWait, - RetryMaxWait: cfg.BlockchainHttpRetryMaxWait, + MaxRetries: cfg.BlockchainHttpMaxRetries, + RetryMinWait: cfg.BlockchainHttpRetryMinWait, + RetryMaxWait: cfg.BlockchainHttpRetryMaxWait, + RequestTimeout: cfg.BlockchainHttpRequestTimeout, }, authOpt) cli.CheckErr(logger, err)