Skip to main content

Execution Model Architecture

Transaction Processing Pipeline

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│ Mempool │───▶│ Execution │───▶│ State │
│ Manager │ │ Dispatcher │ │ Updates │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Transaction │ │ SIMD │ │ Storage │
│ Validation │ │ Optimization │ │ Persistence │
└─────────────────┘ └─────────────────┘ └─────────────────┘

Execution Dispatcher

Core Architecture

pub struct ExecutionDispatcher {
pub score_cache: Arc<Mutex<ScoreCache>>, // Thread-safe score cache
pub adaptive_weights: AdaptiveWeights, // Dynamic scheduling
pub simd_processor: SimdProcessor, // SIMD optimization
pub config: DispatcherConfig, // Configuration
}

Scheduling Algorithm

impl ExecutionDispatcher {
pub fn schedule_transactions(
&mut self,
mempool_txs: Vec<MempoolTx>,
signed_txs: Vec<SignedTx>,
) -> (Vec<MempoolTx>, Vec<SignedTx>) {
// 1. Score cache lookup for known transactions
let cached_scores = self.lookup_cached_scores(&mempool_txs);

// 2. SIMD processing for uncached transactions
let computed_scores = self.simd_processor.compute_scores_batch(&uncached_txs);

// 3. Combine cached and computed scores
let all_scores = self.combine_scores(cached_scores, computed_scores);

// 4. Apply adaptive weight scheduling
let weighted_scores = self.adaptive_weights.apply_weights(all_scores);

// 5. Sort and select transactions
self.select_optimal_transactions(mempool_txs, weighted_scores)
}
}

SIMD Optimization

Vectorized Transaction Scoring

#[cfg(target_arch = "x86_64")]
#[target_feature(enable = "avx2", enable = "fma")]
unsafe fn compute_score_simd_batch(
fees: &[f64],
classes: &[TxClass],
) -> Vec<f64> {
assert_eq!(fees.len(), classes.len(),
"SIMD input arrays must have same length: fees={}, classes={}",
fees.len(), classes.len());

if fees.len() < SIMD_THRESHOLD {
return compute_score_scalar_batch(fees, classes);
}

let mut results = vec![0.0; fees.len()];
let chunks = fees.len().simd_chunks();

for (chunk_idx, (fee_chunk, class_chunk)) in chunks.enumerate() {
let base_idx = chunk_idx * 8;

// Load 8 transactions into SIMD registers
let fee_vec = _mm256_loadu_pd(fee_chunk.as_ptr() as *const f64);
let class_vec = _mm256_loadu_pd(class_chunk.as_ptr() as *const f64);

// Compute class priorities
let priorities = compute_class_priorities_simd(class_vec);

// Calculate scores: fee * class_priority
let scores = _mm256_mul_pd(fee_vec, priorities);

// Store results
_mm256_storeu_pd(results.as_mut_ptr().add(base_idx) as *mut f64, scores);
}

// Handle remaining elements with scalar fallback
let remaining_start = chunks.remainder_len_start();
for i in remaining_start..fees.len() {
results[i] = compute_score_scalar(fees[i], classes[i]);
}

results
}

Performance Characteristics

  • SIMD Threshold: 32 transactions minimum
  • Vector Width: 4 transactions per AVX2 register
  • Speedup: 1.28x for large batches
  • Fallback: Scalar processing for small batches

Cross-Platform Support

pub fn compute_score_batch(
&self,
fees: &[f64],
classes: &[TxClass],
) -> Vec<f64> {
#[cfg(target_arch = "x86_64")]
{
if is_x86_feature_detected!("avx2") && is_x86_feature_detected!("fma") {
return unsafe { compute_score_simd_batch(fees, classes) };
}
}

#[cfg(target_arch = "aarch64")]
{
if is_aarch64_feature_detected!("neon") {
return unsafe { compute_score_neon_batch(fees, classes) };
}
}

// Scalar fallback for all other architectures
self.compute_score_scalar_batch(fees, classes)
}

Score Cache System

Cache Architecture

pub struct ScoreCache {
cache: HashMap<(u64, TxClass), CacheEntry>, // (sender_id, class) → score
max_size: usize, // Maximum cache entries
ttl: Duration, // Time-to-live
hits: AtomicU64, // Cache hit counter
misses: AtomicU64, // Cache miss counter
}

pub struct CacheEntry {
pub score: f64, // Cached score
pub timestamp: Instant, // Cache timestamp
pub block_height: u64, // Height when cached
}

Cache Operations

impl ScoreCache {
pub fn get_cached_score(&self, sender_id: u64, class: TxClass) -> Option<f64> {
let key = (sender_id, class);

if let Some(entry) = self.cache.get(&key) {
// Check TTL expiration
if entry.timestamp.elapsed() < self.ttl {
self.hits.fetch_add(1, Ordering::Relaxed);
return Some(entry.score);
} else {
// Remove expired entry
self.cache.remove(&key);
}
}

self.misses.fetch_add(1, Ordering::Relaxed);
None
}

pub fn cache_score(&mut self, sender_id: u64, class: TxClass, score: f64, block_height: u64) {
let key = (sender_id, class);
let entry = CacheEntry {
score,
timestamp: Instant::now(),
block_height,
};

// LRU eviction if cache is full
if self.cache.len() >= self.max_size {
self.evict_oldest();
}

self.cache.insert(key, entry);
}
}

Cache Performance Metrics

pub struct CacheStats {
pub hits: u64, // Cache hits
pub misses: u64, // Cache misses
pub hit_rate: f64, // Hit rate (hits / total)
pub size: usize, // Current cache size
pub max_size: usize, // Maximum cache size
}

impl ScoreCache {
pub fn get_stats(&self) -> CacheStats {
let hits = self.hits.load(Ordering::Relaxed);
let misses = self.misses.load(Ordering::Relaxed);
let total = hits + misses;

CacheStats {
hits,
misses,
hit_rate: if total > 0 { hits as f64 / total as f64 } else { 0.0 },
size: self.cache.len(),
max_size: self.max_size,
}
}
}

Adaptive Weight Scheduling

Weight Calculation Algorithm

pub struct AdaptiveWeights {
pub fee_weight: f64, // Fee importance (0.0-1.0)
pub priority_weight: f64, // Priority importance (0.0-1.0)
pub update_interval: u64, // Update frequency (blocks)
pub last_update: u64, // Last update height
}

impl AdaptiveWeights {
pub fn calculate_adaptive_weights(&mut self, mempool_state: &MempoolState) {
// Analyze mempool conditions
let fee_pressure = self.calculate_fee_pressure(mempool_state);
let congestion_level = self.calculate_congestion(mempool_state);

// Adjust weights based on conditions
if fee_pressure > HIGH_FEE_THRESHOLD {
self.fee_weight = (self.fee_weight + 0.1).min(0.8);
self.priority_weight = (1.0 - self.fee_weight).max(0.2);
} else if congestion_level > HIGH_CONGESTION_THRESHOLD {
self.priority_weight = (self.priority_weight + 0.1).min(0.8);
self.fee_weight = (1.0 - self.priority_weight).max(0.2);
}
}

pub fn apply_weights(&self, scores: Vec<f64>) -> Vec<f64> {
scores.iter()
.map(|&score| score * self.fee_weight + score * self.priority_weight)
.collect()
}
}

Mempool State Analysis

pub struct MempoolState {
pub total_txs: usize, // Total transactions
pub avg_fee_rate: f64, // Average fee rate
pub size_bytes: usize, // Mempool size in bytes
pub oldest_tx_age: Duration, // Oldest transaction age
pub class_distribution: BTreeMap<TxClass, usize>, // Class distribution
}

Transaction Execution

Execution Pipeline

pub struct TransactionExecutor {
pub runtime: Runtime, // Contract runtime
pub gas_meter: GasMeter, // Gas measurement
pub state_manager: StateManager, // State management
}

impl TransactionExecutor {
pub fn execute_transaction(
&mut self,
tx: &SignedTx,
current_state: &StateDB,
) -> Result<ExecutionResult, ExecutionError> {
// 1. Validate transaction
self.validate_transaction(tx, current_state)?;

// 2. Setup execution context
let context = ExecutionContext::new(tx, current_state);

// 3. Execute with gas metering
let result = self.runtime.execute_with_gas_limit(
&context,
tx.call_data.clone(),
self.gas_meter.remaining(),
)?;

// 4. Update state
self.state_manager.apply_state_changes(result.state_changes)?;

// 5. Consume gas
self.gas_meter.consume_gas(result.gas_used);

Ok(result)
}
}

Contract Execution Model

pub struct Runtime {
pub call_stack: Vec<CallFrame>, // Call stack
pub overlay_state: BTreeMap<[u8; 32], Account>, // Overlay state
pub event_system: EventSystem, // Event handling
pub depth: u8, // Current call depth
}

impl Runtime {
pub fn execute_contract(
&mut self,
contract_address: [u8; 32],
caller: [u8; 32],
value: u128,
calldata: Vec<u8],
) -> Result<Vec<u8>, ContractError> {
// Check call depth limit
if self.depth >= MAX_CALL_DEPTH {
return Err(ContractError::ExecutionError(
"Maximum call depth exceeded".to_string()
));
}

// Get contract info
let contract = self.get_contract(contract_address)?;

// Create call frame
let frame = CallFrame {
contract_address,
caller,
value,
calldata,
return_data: Vec::new(),
gas_remaining: self.gas_meter.remaining(),
depth: self.depth,
storage_snapshot: self.compute_storage_root(),
};

// Push frame and execute
self.call_stack.push(frame);
self.depth += 1;

let result = self.execute_contract_code(&contract.code);

// Pop frame
self.call_stack.pop();
self.depth -= 1;

result
}
}

Performance Optimization

Batch Processing

pub struct BatchProcessor {
pub batch_size: usize, // Batch size
pub parallel_workers: usize, // Parallel workers
pub processing_queue: VecDeque<TransactionBatch>, // Processing queue
}

impl BatchProcessor {
pub fn process_batch_parallel(
&mut self,
batch: TransactionBatch,
) -> Vec<ExecutionResult> {
use rayon::prelude::*;

batch.transactions
.par_iter()
.map(|tx| self.execute_single_transaction(tx))
.collect()
}
}

Memory Management

pub struct MemoryPool {
pub transaction_pool: Vec<Transaction>, // Transaction pool
pub state_buffer: Vec<u8>, // State buffer
pub result_buffer: Vec<ExecutionResult>, // Result buffer
pub max_memory: usize, // Maximum memory usage
}

impl MemoryPool {
pub fn allocate_transaction(&mut self) -> Option<Transaction> {
if self.transaction_pool.len() < self.max_memory / std::mem::size_of::<Transaction>() {
self.transaction_pool.pop()
} else {
None
}
}

pub fn reset(&mut self) {
self.transaction_pool.clear();
self.state_buffer.clear();
self.result_buffer.clear();
}
}

Error Handling

Execution Errors

pub enum ExecutionError {
InsufficientGas { required: u64, available: u64 },
InvalidNonce { expected: u64, actual: u64 },
InsufficientBalance { required: u128, available: u128 },
ContractNotFound { address: [u8; 32] },
ContractExecutionFailed(String),
StackOverflow { depth: u8, max_depth: u8 },
OutOfMemory { requested: usize, available: usize },
}

Recovery Strategies

impl ExecutionDispatcher {
pub fn handle_execution_error(&mut self, error: ExecutionError) -> RecoveryAction {
match error {
ExecutionError::InsufficientGas { .. } => RecoveryAction::SkipTransaction,
ExecutionError::InvalidNonce { .. } => RecoveryAction::ReorderMempool,
ExecutionError::InsufficientBalance { .. } => RecoveryAction::RemoveTransaction,
ExecutionError::ContractNotFound { .. } => RecoveryAction::SkipTransaction,
ExecutionError::ContractExecutionFailed(_) => RecoveryAction::RetryLater,
ExecutionError::StackOverflow { .. } => RecoveryAction::AbortBatch,
ExecutionError::OutOfMemory { .. } => RecoveryAction::ReduceBatchSize,
}
}
}

Performance Metrics

Execution Metrics

pub struct ExecutionMetrics {
pub transactions_processed: u64, // Total processed
pub average_execution_time: Duration, // Average execution time
pub cache_hit_rate: f64, // Cache hit rate
pub simd_utilization: f64, // SIMD utilization rate
pub gas_consumed: u64, // Total gas consumed
pub errors_encountered: u64, // Error count
}

Performance Targets

  • Transaction Throughput: 10,000+ TPS
  • Average Execution Time: < 100μs per transaction
  • Cache Hit Rate: ≥80%
  • SIMD Utilization: ≥70% for large batches
  • Error Rate: <0.1%

This execution model provides a highly optimized, scalable, and reliable transaction processing system for Savitri Network.