Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
a978c2c
feat(structlog): add call frame tracking to identify EVM call contexts
mattevans Jan 14, 2026
3b880bc
test: add comprehensive unit tests for extractCallAddress function
mattevans Jan 14, 2026
fe336bf
test: add unit tests for CREATE/CREATE2 address extraction
mattevans Jan 14, 2026
2e59fcf
style(transaction_processing.go): move comment to line above log to m…
mattevans Jan 14, 2026
a43dac7
refactor(processor): replace receipt-based CREATE address lookup with…
mattevans Jan 14, 2026
b74db36
fix(call_tracker): align root frame depth with EVM traces (depth 1)
mattevans Jan 14, 2026
cbc6942
feat(structlog): add GasSelf field to isolate CALL/CREATE overhead fr…
mattevans Jan 15, 2026
618519b
test(structlog): fix and expand address extraction tests for CALL opc…
mattevans Jan 19, 2026
0308dbc
WIP: current changes
mattevans Jan 21, 2026
54994f3
fix(structlog): correct EOA call detection to prevent phantom synthet…
mattevans Jan 21, 2026
4c633d2
chore: accidental commit
mattevans Jan 21, 2026
33528d0
refactor: remove unused ParityTrace types and helper from execution p…
mattevans Jan 21, 2026
14d2d34
feat: add Node interface and EmbeddedNode for library embedding
Savid Jan 22, 2026
b796d44
refactor: abstract execution types to remove CGO dependency
Savid Jan 22, 2026
d78f1e1
refactor(structlog): add embedded-mode support to eliminate 99% of st…
mattevans Jan 22, 2026
f535991
perf(embedded_node): force DisableStack=true in DebugTraceTransaction…
mattevans Jan 22, 2026
4d3f78b
feat: add support for pre-computed GasUsed values from embedded tracer
mattevans Jan 22, 2026
237eba7
style(transaction_processing.go): add blank line after batch slice cr…
mattevans Jan 22, 2026
43e1b54
Merge branch 'master' into feat/call-frames
mattevans Jan 22, 2026
1ced807
Merge pull request #46 from ethpandaops/feat/embedded-node-interface-…
mattevans Jan 22, 2026
aa1b063
Merge branch 'master' into feat/embedded-node-interface
mattevans Jan 22, 2026
63f256b
refactor: change ChainID return type from int32 to int64
mattevans Jan 22, 2026
759d6eb
test(embedded_node_test.go): remove obsolete TestEmbeddedNode_Delegat…
mattevans Jan 23, 2026
c583e0a
Merge branch 'feat/call-frames' into feat/embedded-node-interface
mattevans Jan 23, 2026
0f7509b
fix(pool): register OnReady callbacks before spawning goroutines
mattevans Jan 23, 2026
23bb5a5
refactor(embedded_node): add debug logging for OnReady callback execu…
mattevans Jan 23, 2026
88b9ea4
fix(transaction_processing.go): extract pre-computed GasUsed values f…
mattevans Jan 23, 2026
1b73163
test(gas_cost_test.go): add unit tests for hasPrecomputedGasUsed func…
mattevans Jan 23, 2026
baa62fe
merge latest master
Savid Jan 23, 2026
e8d972b
lint
Savid Jan 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 91 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ A distributed system for processing Ethereum execution layer data with support f
### Core Components

- **Ethereum Nodes**: Configure execution node endpoints
- **Redis**: Task queue and leader election coordination
- **Redis**: Task queue and leader election coordination
- **State Manager**: Track processing progress in ClickHouse
- **Processors**: Configure structlog extraction settings

Expand All @@ -57,6 +57,93 @@ A distributed system for processing Ethereum execution layer data with support f
└─────────────────────────────────────────┘
```

## Embedded Mode (Library Usage)

The execution-processor can be embedded as a library within an execution client, providing direct data access without JSON-RPC overhead.

### Implementing DataSource

```go
import (
"context"
"math/big"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethpandaops/execution-processor/pkg/ethereum/execution"
)

type MyDataSource struct {
client *MyExecutionClient
}

func (ds *MyDataSource) BlockNumber(ctx context.Context) (*uint64, error) {
num := ds.client.CurrentBlock()
return &num, nil
}

func (ds *MyDataSource) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) {
return ds.client.GetBlock(number), nil
}

func (ds *MyDataSource) BlockReceipts(ctx context.Context, number *big.Int) ([]*types.Receipt, error) {
return ds.client.GetBlockReceipts(number), nil
}

func (ds *MyDataSource) TransactionReceipt(ctx context.Context, hash string) (*types.Receipt, error) {
return ds.client.GetReceipt(hash), nil
}

func (ds *MyDataSource) DebugTraceTransaction(
ctx context.Context,
hash string,
blockNumber *big.Int,
opts execution.TraceOptions,
) (*execution.TraceTransaction, error) {
return ds.client.TraceTransaction(hash, opts), nil
}

func (ds *MyDataSource) ChainID() int64 {
return ds.client.ChainID()
}

func (ds *MyDataSource) ClientType() string {
return "my-client/1.0.0"
}

func (ds *MyDataSource) IsSynced() bool {
return ds.client.IsSynced()
}
```

### Creating an Embedded Pool

```go
import (
"github.com/ethpandaops/execution-processor/pkg/ethereum"
"github.com/ethpandaops/execution-processor/pkg/ethereum/execution"
)

// Create embedded node with your data source
dataSource := &MyDataSource{client: myClient}
node := execution.NewEmbeddedNode(log, "embedded", dataSource)

// Create pool with the embedded node
pool := ethereum.NewPoolWithNodes(log, "processor", []execution.Node{node}, nil)
pool.Start(ctx)

// Mark ready when your client is synced and ready to serve data
node.MarkReady(ctx)
```

### Embedded vs RPC Mode

| Aspect | RPC Mode | Embedded Mode |
|--------|----------|---------------|
| Data Access | JSON-RPC over HTTP | Direct function calls |
| Readiness | Auto-detected via RPC health checks | Host calls MarkReady() |
| Performance | Network + serialization overhead | Zero serialization overhead |
| Use Case | External execution clients | Library integration |

## Manual Block Queue API

The execution processor provides an HTTP API for manually queuing blocks for reprocessing. This is useful for fixing data issues or reprocessing specific blocks.
Expand All @@ -80,7 +167,7 @@ curl -X POST http://localhost:8080/api/v1/queue/block/transaction_structlog/1234
"status": "queued",
"block_number": 12345,
"processor": "transaction_structlog",
"queue": "process:forwards",
"queue": "process:forwards",
"transaction_count": 150,
"tasks_created": 150
}
Expand Down Expand Up @@ -158,7 +245,7 @@ curl -X POST http://localhost:8080/api/v1/queue/blocks/transaction_structlog \
# Run tests
go test ./...

# Run with race detector
# Run with race detector
go test ./... --race

# Build
Expand All @@ -167,4 +254,4 @@ go build .

## License

See LICENSE file.
See LICENSE file.
84 changes: 84 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Package config provides configuration types for execution-processor.
// This package is designed to be imported without pulling in go-ethereum dependencies,
// making it suitable for embedded mode integrations.
package config

import (
"fmt"
"time"

"github.com/ethpandaops/execution-processor/pkg/ethereum/execution"
"github.com/ethpandaops/execution-processor/pkg/processor"
"github.com/ethpandaops/execution-processor/pkg/redis"
"github.com/ethpandaops/execution-processor/pkg/state"
)

// EthereumConfig is the ethereum network configuration.
// This is a copy of ethereum.Config to avoid importing pkg/ethereum
// which would pull in go-ethereum dependencies.
type EthereumConfig struct {
// Execution configuration
Execution []*execution.Config `yaml:"execution"`
// Override network name for custom networks (bypasses networkMap)
OverrideNetworkName *string `yaml:"overrideNetworkName"`
}

// Validate validates the ethereum configuration.
func (c *EthereumConfig) Validate() error {
for i, exec := range c.Execution {
if err := exec.Validate(); err != nil {
return fmt.Errorf("invalid execution configuration at index %d: %w", i, err)
}
}

return nil
}

// Config is the main configuration for execution-processor.
type Config struct {
// MetricsAddr is the address to listen on for metrics.
MetricsAddr string `yaml:"metricsAddr" default:":9090"`
// HealthCheckAddr is the address to listen on for healthcheck.
HealthCheckAddr *string `yaml:"healthCheckAddr"`
// PProfAddr is the address to listen on for pprof.
PProfAddr *string `yaml:"pprofAddr"`
// APIAddr is the address to listen on for the API server.
APIAddr *string `yaml:"apiAddr"`
// LoggingLevel is the logging level to use.
LoggingLevel string `yaml:"logging" default:"info"`
// Ethereum is the ethereum network configuration.
Ethereum EthereumConfig `yaml:"ethereum"`
// Redis is the redis configuration.
Redis *redis.Config `yaml:"redis"`
// StateManager is the state manager configuration.
StateManager state.Config `yaml:"stateManager"`
// Processors is the processor configuration.
Processors processor.Config `yaml:"processors"`
// ShutdownTimeout is the timeout for shutting down the server.
ShutdownTimeout time.Duration `yaml:"shutdownTimeout" default:"10s"`
}

// Validate validates the configuration.
func (c *Config) Validate() error {
if c.Redis == nil {
return fmt.Errorf("redis configuration is required")
}

if err := c.Redis.Validate(); err != nil {
return fmt.Errorf("invalid redis configuration: %w", err)
}

if err := c.Ethereum.Validate(); err != nil {
return fmt.Errorf("invalid ethereum configuration: %w", err)
}

if err := c.StateManager.Validate(); err != nil {
return fmt.Errorf("invalid state manager configuration: %w", err)
}

if err := c.Processors.Validate(); err != nil {
return fmt.Errorf("invalid processor configuration: %w", err)
}

return nil
}
136 changes: 136 additions & 0 deletions pkg/ethereum/execution/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package execution

import "math/big"

// Hash represents a 32-byte hash.
type Hash [32]byte

// Hex returns the hex string representation of the hash.
func (h Hash) Hex() string {
return "0x" + encodeHex(h[:])
}

// String returns the hex string representation of the hash.
func (h Hash) String() string {
return h.Hex()
}

// Address represents a 20-byte Ethereum address.
type Address [20]byte

// Hex returns the hex string representation of the address with checksum.
func (a Address) Hex() string {
return "0x" + encodeHex(a[:])
}

// String returns the hex string representation of the address.
func (a Address) String() string {
return a.Hex()
}

// encodeHex encodes bytes as hex string without 0x prefix.
func encodeHex(b []byte) string {
const hexChars = "0123456789abcdef"

result := make([]byte, len(b)*2)

for i, v := range b {
result[i*2] = hexChars[v>>4]
result[i*2+1] = hexChars[v&0x0f]
}

return string(result)
}

// Transaction type constants matching go-ethereum values.
const (
LegacyTxType = 0
AccessListTxType = 1
DynamicFeeTxType = 2
BlobTxType = 3
)

// Block interface defines methods for accessing block data.
// Implementations are provided by data sources (RPC, embedded clients).
type Block interface {
// Number returns the block number.
Number() *big.Int

// Hash returns the block hash.
Hash() Hash

// ParentHash returns the parent block hash.
ParentHash() Hash

// BaseFee returns the base fee per gas (EIP-1559), or nil for pre-London blocks.
BaseFee() *big.Int

// Transactions returns all transactions in the block.
Transactions() []Transaction
}

// Transaction interface defines methods for accessing transaction data.
// The From() method returns the sender address, computed by the data source
// using its own crypto implementation (avoiding go-ethereum crypto imports).
type Transaction interface {
// Hash returns the transaction hash.
Hash() Hash

// Type returns the transaction type (0=legacy, 1=access list, 2=dynamic fee, 3=blob).
Type() uint8

// To returns the recipient address, or nil for contract creation.
To() *Address

// From returns the sender address.
// This is computed by the data source using types.Sender() or equivalent.
From() Address

// Nonce returns the sender account nonce.
Nonce() uint64

// Gas returns the gas limit.
Gas() uint64

// GasPrice returns the gas price (for legacy transactions).
GasPrice() *big.Int

// GasTipCap returns the max priority fee per gas (EIP-1559).
GasTipCap() *big.Int

// GasFeeCap returns the max fee per gas (EIP-1559).
GasFeeCap() *big.Int

// Value returns the value transferred in wei.
Value() *big.Int

// Data returns the input data (calldata).
Data() []byte

// Size returns the encoded transaction size in bytes.
Size() uint64

// ChainId returns the chain ID, or nil for legacy transactions.
ChainId() *big.Int

// BlobGas returns the blob gas used (for blob transactions).
BlobGas() uint64

// BlobGasFeeCap returns the max blob fee per gas (for blob transactions).
BlobGasFeeCap() *big.Int

// BlobHashes returns the versioned hashes (for blob transactions).
BlobHashes() []Hash
}

// Receipt interface defines methods for accessing transaction receipt data.
type Receipt interface {
// Status returns the transaction status (1=success, 0=failure).
Status() uint64

// TxHash returns the transaction hash.
TxHash() Hash

// GasUsed returns the gas used by the transaction.
GasUsed() uint64
}
Loading