Execution Model Architecture
Transaction Processing Pipeline
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Mempool │───▶│ Execution │───▶│ State │
│ Manager │ │ Dispatcher │ │ Updates │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Transaction │ │ SIMD │ │ Storage │
│ Validation │ │ Optimization │ │ Persistence │
└─────────────────┘ └─────────────────┘ └─────────────────┘
Execution Dispatcher
Core Architecture
/// Central scheduler that scores, weights, and selects transactions for execution.
pub struct ExecutionDispatcher {
    pub score_cache: Arc<Mutex<ScoreCache>>, // Thread-safe score cache shared across workers
    pub adaptive_weights: AdaptiveWeights,   // Dynamic scheduling weights (fee vs. priority)
    pub simd_processor: SimdProcessor,       // Vectorized batch-scoring backend
    pub config: DispatcherConfig,            // Dispatcher configuration
}
Scheduling Algorithm
impl ExecutionDispatcher {
pub fn schedule_transactions(
&mut self,
mempool_txs: Vec<MempoolTx>,
signed_txs: Vec<SignedTx>,
) -> (Vec<MempoolTx>, Vec<SignedTx>) {
// 1. Score cache lookup for known transactions
let cached_scores = self.lookup_cached_scores(&mempool_txs);
// 2. SIMD processing for uncached transactions
let computed_scores = self.simd_processor.compute_scores_batch(&uncached_txs);
// 3. Combine cached and computed scores
let all_scores = self.combine_scores(cached_scores, computed_scores);
// 4. Apply adaptive weight scheduling
let weighted_scores = self.adaptive_weights.apply_weights(all_scores);
// 5. Sort and select transactions
self.select_optimal_transactions(mempool_txs, weighted_scores)
}
}
SIMD Optimization
Vectorized Transaction Scoring
/// Scores a batch of transactions with AVX2/FMA, 4 × f64 lanes per register.
///
/// Falls back to the scalar path for batches below `SIMD_THRESHOLD`, and
/// handles the ragged tail (len not a multiple of 4) scalar as well.
///
/// # Safety
/// The CPU must support AVX2 and FMA; callers are expected to runtime-detect
/// the features first (as `compute_score_batch` does).
#[cfg(target_arch = "x86_64")]
#[target_feature(enable = "avx2", enable = "fma")]
unsafe fn compute_score_simd_batch(
    fees: &[f64],
    classes: &[TxClass],
) -> Vec<f64> {
    assert_eq!(
        fees.len(),
        classes.len(),
        "SIMD input arrays must have same length: fees={}, classes={}",
        fees.len(),
        classes.len()
    );
    // Small batches: SIMD setup overhead outweighs the vector win.
    if fees.len() < SIMD_THRESHOLD {
        return compute_score_scalar_batch(fees, classes);
    }
    // One AVX2 register (__m256d) holds exactly 4 f64 values.
    // BUGFIX: the previous version advanced 8 elements per 4-lane chunk
    // (`base_idx = chunk_idx * 8`), skipping half the input and writing past
    // the loaded data; it also relied on nonexistent `len().simd_chunks()` /
    // `remainder_len_start()` APIs.
    const LANES: usize = 4;
    let mut results = vec![0.0; fees.len()];
    let full = fees.len() / LANES * LANES;
    for base_idx in (0..full).step_by(LANES) {
        // SAFETY: base_idx + LANES <= fees.len() == classes.len() == results.len(),
        // so all unaligned loads/stores stay in bounds.
        let fee_vec = _mm256_loadu_pd(fees.as_ptr().add(base_idx));
        // NOTE(review): reinterprets TxClass memory as f64, as the original
        // did — assumes TxClass is an f64-compatible repr; confirm.
        let class_vec = _mm256_loadu_pd(classes.as_ptr().add(base_idx) as *const f64);
        // Compute class priorities, then score = fee * class_priority.
        let priorities = compute_class_priorities_simd(class_vec);
        let scores = _mm256_mul_pd(fee_vec, priorities);
        _mm256_storeu_pd(results.as_mut_ptr().add(base_idx), scores);
    }
    // Scalar fallback for the tail that does not fill a full register.
    for i in full..fees.len() {
        results[i] = compute_score_scalar(fees[i], classes[i]);
    }
    results
}
Performance Characteristics
- SIMD Threshold: 32 transactions minimum
- Vector Width: 4 transactions per AVX2 register
- Speedup: 1.28x for large batches
- Fallback: Scalar processing for small batches
Cross-Platform Support
/// Scores a batch of transactions, picking the fastest backend available on
/// the running CPU.
///
/// Dispatch order: x86_64 AVX2+FMA SIMD path, then aarch64 NEON path, then a
/// portable scalar loop. Feature checks happen at *runtime*, so a binary
/// built for a generic target still uses SIMD on capable hardware.
pub fn compute_score_batch(
    &self,
    fees: &[f64],
    classes: &[TxClass],
) -> Vec<f64> {
    #[cfg(target_arch = "x86_64")]
    {
        // The unsafe SIMD path is only entered after both required CPU
        // features are confirmed present at runtime.
        if is_x86_feature_detected!("avx2") && is_x86_feature_detected!("fma") {
            return unsafe { compute_score_simd_batch(fees, classes) };
        }
    }
    #[cfg(target_arch = "aarch64")]
    {
        if is_aarch64_feature_detected!("neon") {
            return unsafe { compute_score_neon_batch(fees, classes) };
        }
    }
    // Scalar fallback for all other architectures, and for CPUs that lack
    // the features probed above.
    self.compute_score_scalar_batch(fees, classes)
}
Score Cache System
Cache Architecture
/// Per-(sender, class) score cache with TTL expiry and size-bounded eviction.
pub struct ScoreCache {
    cache: HashMap<(u64, TxClass), CacheEntry>, // (sender_id, class) → cached score entry
    max_size: usize,                            // Maximum cache entries before eviction
    ttl: Duration,                              // Time-to-live for each entry
    hits: AtomicU64,                            // Cache hit counter (updatable through &self)
    misses: AtomicU64,                          // Cache miss counter (updatable through &self)
}
/// A single cached score plus the metadata used to expire it.
pub struct CacheEntry {
    pub score: f64,         // Cached score value
    pub timestamp: Instant, // When the entry was cached (drives TTL expiry)
    pub block_height: u64,  // Chain height at caching time
}
Cache Operations
impl ScoreCache {
    /// Returns the cached score for `(sender_id, class)` if present and
    /// within its TTL; records a hit or miss on the atomic counters.
    ///
    /// BUGFIX: the original removed the expired entry via `self.cache.remove`
    /// through `&self`, which does not compile (`HashMap::remove` needs
    /// `&mut self`). Expired entries are now simply reported as misses and
    /// left in place to be reclaimed by overwrite or `evict_oldest`.
    pub fn get_cached_score(&self, sender_id: u64, class: TxClass) -> Option<f64> {
        let key = (sender_id, class);
        if let Some(entry) = self.cache.get(&key) {
            if entry.timestamp.elapsed() < self.ttl {
                self.hits.fetch_add(1, Ordering::Relaxed);
                return Some(entry.score);
            }
            // Entry expired: fall through and count it as a miss.
        }
        self.misses.fetch_add(1, Ordering::Relaxed);
        None
    }

    /// Inserts or overwrites a score entry, evicting the oldest entry first
    /// when the cache is at capacity so the map never exceeds `max_size`.
    pub fn cache_score(&mut self, sender_id: u64, class: TxClass, score: f64, block_height: u64) {
        let key = (sender_id, class);
        let entry = CacheEntry {
            score,
            timestamp: Instant::now(),
            block_height,
        };
        // Evict before insert; an overwrite of an existing key would not
        // grow the map, but the conservative check keeps the invariant simple.
        if self.cache.len() >= self.max_size {
            self.evict_oldest();
        }
        self.cache.insert(key, entry);
    }
}
Cache Performance Metrics
/// Point-in-time snapshot of cache counters, built by `ScoreCache::get_stats`.
pub struct CacheStats {
    pub hits: u64,       // Cache hits since start
    pub misses: u64,     // Cache misses since start
    pub hit_rate: f64,   // hits / (hits + misses); 0.0 when no lookups yet
    pub size: usize,     // Current number of cached entries
    pub max_size: usize, // Configured maximum cache size
}
impl ScoreCache {
    /// Builds a monitoring snapshot of hit/miss counters and occupancy.
    pub fn get_stats(&self) -> CacheStats {
        let hit_count = self.hits.load(Ordering::Relaxed);
        let miss_count = self.misses.load(Ordering::Relaxed);
        let lookups = hit_count + miss_count;
        // Guard the division: with zero lookups the hit rate is defined as 0.
        let rate = match lookups {
            0 => 0.0,
            _ => hit_count as f64 / lookups as f64,
        };
        CacheStats {
            hits: hit_count,
            misses: miss_count,
            hit_rate: rate,
            size: self.cache.len(),
            max_size: self.max_size,
        }
    }
}
Adaptive Weight Scheduling
Weight Calculation Algorithm
/// Scheduling weights that shift between fee and priority emphasis
/// depending on mempool conditions.
pub struct AdaptiveWeights {
    pub fee_weight: f64,      // Fee importance; clamped to [0.2, 0.8] by updates
    pub priority_weight: f64, // Priority importance; clamped to [0.2, 0.8] by updates
    pub update_interval: u64, // Update frequency, in blocks
    pub last_update: u64,     // Block height of the last weight update
}
impl AdaptiveWeights {
    /// Re-balances the fee vs. priority weights from current mempool state.
    ///
    /// High fee pressure shifts weight toward fees; otherwise high congestion
    /// shifts weight toward priority. Each step moves by 0.1 and keeps both
    /// weights inside [0.2, 0.8] while summing to 1.0.
    pub fn calculate_adaptive_weights(&mut self, mempool_state: &MempoolState) {
        // Sample current mempool conditions.
        let fee_pressure = self.calculate_fee_pressure(mempool_state);
        let congestion_level = self.calculate_congestion(mempool_state);
        // Fee pressure takes precedence when both thresholds are exceeded.
        if fee_pressure > HIGH_FEE_THRESHOLD {
            self.fee_weight = (self.fee_weight + 0.1).min(0.8);
            self.priority_weight = (1.0 - self.fee_weight).max(0.2);
        } else if congestion_level > HIGH_CONGESTION_THRESHOLD {
            self.priority_weight = (self.priority_weight + 0.1).min(0.8);
            self.fee_weight = (1.0 - self.priority_weight).max(0.2);
        }
    }

    /// Applies the current weights to a vector of raw scores.
    pub fn apply_weights(&self, scores: Vec<f64>) -> Vec<f64> {
        let mut weighted = Vec::with_capacity(scores.len());
        for raw in scores {
            // Same expression as before, kept term-by-term so float results
            // are bit-identical to the original implementation.
            weighted.push(raw * self.fee_weight + raw * self.priority_weight);
        }
        weighted
    }
}
Mempool State Analysis
/// Aggregate view of the mempool used to drive adaptive weight updates.
pub struct MempoolState {
    pub total_txs: usize,                             // Total pending transactions
    pub avg_fee_rate: f64,                            // Average fee rate across the pool
    pub size_bytes: usize,                            // Mempool size in bytes
    pub oldest_tx_age: Duration,                      // Age of the oldest pending transaction
    pub class_distribution: BTreeMap<TxClass, usize>, // Pending-count per transaction class
}
Transaction Execution
Execution Pipeline
/// Executes individual transactions: runtime dispatch, gas metering,
/// and state application.
pub struct TransactionExecutor {
    pub runtime: Runtime,            // Contract runtime used for execution
    pub gas_meter: GasMeter,         // Tracks remaining/consumed gas
    pub state_manager: StateManager, // Applies resulting state changes
}
impl TransactionExecutor {
    /// Runs a single signed transaction through the full execution pipeline.
    ///
    /// Steps, in order: validation → context setup → gas-limited runtime
    /// execution → state application → gas accounting. Any failing step
    /// propagates its error via `?` and aborts the remaining steps.
    ///
    /// NOTE(review): gas is consumed *after* state changes are applied, so a
    /// failure in `apply_state_changes` leaves `gas_used` uncharged — confirm
    /// this ordering is intended.
    pub fn execute_transaction(
        &mut self,
        tx: &SignedTx,
        current_state: &StateDB,
    ) -> Result<ExecutionResult, ExecutionError> {
        // 1. Validate transaction against current state before doing any work.
        self.validate_transaction(tx, current_state)?;
        // 2. Setup execution context for the runtime.
        let context = ExecutionContext::new(tx, current_state);
        // 3. Execute with the gas remaining on the meter as the limit.
        let result = self.runtime.execute_with_gas_limit(
            &context,
            tx.call_data.clone(),
            self.gas_meter.remaining(),
        )?;
        // 4. Apply the state changes produced by execution.
        self.state_manager.apply_state_changes(result.state_changes)?;
        // 5. Charge the gas actually used.
        self.gas_meter.consume_gas(result.gas_used);
        Ok(result)
    }
}
Contract Execution Model
/// Contract execution runtime: call stack, overlay state, and event handling.
pub struct Runtime {
    pub call_stack: Vec<CallFrame>,                 // Active call frames, innermost last
    pub overlay_state: BTreeMap<[u8; 32], Account>, // Pending account changes, keyed by 32-byte address
    pub event_system: EventSystem,                  // Event emission/handling
    pub depth: u8,                                  // Current call depth (bounded by MAX_CALL_DEPTH)
}
impl Runtime {
pub fn execute_contract(
&mut self,
contract_address: [u8; 32],
caller: [u8; 32],
value: u128,
calldata: Vec<u8],
) -> Result<Vec<u8>, ContractError> {
// Check call depth limit
if self.depth >= MAX_CALL_DEPTH {
return Err(ContractError::ExecutionError(
"Maximum call depth exceeded".to_string()
));
}
// Get contract info
let contract = self.get_contract(contract_address)?;
// Create call frame
let frame = CallFrame {
contract_address,
caller,
value,
calldata,
return_data: Vec::new(),
gas_remaining: self.gas_meter.remaining(),
depth: self.depth,
storage_snapshot: self.compute_storage_root(),
};
// Push frame and execute
self.call_stack.push(frame);
self.depth += 1;
let result = self.execute_contract_code(&contract.code);
// Pop frame
self.call_stack.pop();
self.depth -= 1;
result
}
}
Performance Optimization
Batch Processing
/// Processes transaction batches, optionally in parallel across workers.
pub struct BatchProcessor {
    pub batch_size: usize,                           // Transactions per batch
    pub parallel_workers: usize,                     // Number of parallel workers
    pub processing_queue: VecDeque<TransactionBatch>, // Batches awaiting processing (FIFO)
}
impl BatchProcessor {
    /// Executes every transaction in `batch` across the rayon worker pool,
    /// returning results in the same order as the input transactions
    /// (`par_iter().map(..).collect()` preserves ordering).
    ///
    /// BUGFIX: the receiver is now `&self` — `par_iter().map(...)` requires a
    /// `Fn` closure over `Sync` captures, which an exclusive `&mut self`
    /// borrow cannot provide. Relaxing `&mut self` → `&self` is
    /// backward-compatible for callers. (Assumes
    /// `execute_single_transaction` takes `&self` — confirm.)
    pub fn process_batch_parallel(
        &self,
        batch: TransactionBatch,
    ) -> Vec<ExecutionResult> {
        use rayon::prelude::*;
        batch.transactions
            .par_iter()
            .map(|tx| self.execute_single_transaction(tx))
            .collect()
    }
}
Memory Management
/// Reusable buffers for transaction processing, bounded by a memory budget.
pub struct MemoryPool {
    pub transaction_pool: Vec<Transaction>,    // Pooled transaction objects for reuse
    pub state_buffer: Vec<u8>,                 // Scratch buffer for state data
    pub result_buffer: Vec<ExecutionResult>,   // Buffer for execution results
    pub max_memory: usize,                     // Memory budget in bytes (caps the transaction pool)
}
impl MemoryPool {
    /// Hands out a pooled `Transaction`, or `None` when the pool has reached
    /// the capacity implied by the memory budget (or is empty).
    pub fn allocate_transaction(&mut self) -> Option<Transaction> {
        // Capacity = how many Transaction objects fit in the byte budget.
        let capacity = self.max_memory / std::mem::size_of::<Transaction>();
        if self.transaction_pool.len() >= capacity {
            return None;
        }
        self.transaction_pool.pop()
    }

    /// Empties all pooled buffers; allocated capacity is kept for reuse.
    pub fn reset(&mut self) {
        self.transaction_pool.clear();
        self.state_buffer.clear();
        self.result_buffer.clear();
    }
}
Error Handling
Execution Errors
/// Failure modes of transaction execution.
///
/// Improvement: public error types should be inspectable and comparable —
/// added `Debug, Clone, PartialEq, Eq` derives (all payload types support
/// them). No `#[non_exhaustive]`, so existing exhaustive matches (e.g. in
/// `handle_execution_error`) keep compiling.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecutionError {
    /// Transaction's gas limit is below what execution requires.
    InsufficientGas { required: u64, available: u64 },
    /// Sender nonce does not match the account's expected nonce.
    InvalidNonce { expected: u64, actual: u64 },
    /// Sender balance cannot cover the transfer value plus fees.
    InsufficientBalance { required: u128, available: u128 },
    /// No contract deployed at the given 32-byte address.
    ContractNotFound { address: [u8; 32] },
    /// Contract code ran but failed; payload is the failure message.
    ContractExecutionFailed(String),
    /// Call depth exceeded the runtime's maximum.
    StackOverflow { depth: u8, max_depth: u8 },
    /// Memory budget exhausted during execution.
    OutOfMemory { requested: usize, available: usize },
}
Recovery Strategies
impl ExecutionDispatcher {
pub fn handle_execution_error(&mut self, error: ExecutionError) -> RecoveryAction {
match error {
ExecutionError::InsufficientGas { .. } => RecoveryAction::SkipTransaction,
ExecutionError::InvalidNonce { .. } => RecoveryAction::ReorderMempool,
ExecutionError::InsufficientBalance { .. } => RecoveryAction::RemoveTransaction,
ExecutionError::ContractNotFound { .. } => RecoveryAction::SkipTransaction,
ExecutionError::ContractExecutionFailed(_) => RecoveryAction::RetryLater,
ExecutionError::StackOverflow { .. } => RecoveryAction::AbortBatch,
ExecutionError::OutOfMemory { .. } => RecoveryAction::ReduceBatchSize,
}
}
}
Performance Metrics
Execution Metrics
/// Cumulative execution metrics for monitoring and performance targets.
pub struct ExecutionMetrics {
    pub transactions_processed: u64,      // Total transactions processed
    pub average_execution_time: Duration, // Mean per-transaction execution time
    pub cache_hit_rate: f64,              // Score-cache hit rate (target ≥ 0.8)
    pub simd_utilization: f64,            // Fraction of scoring done on the SIMD path
    pub gas_consumed: u64,                // Total gas consumed
    pub errors_encountered: u64,          // Count of execution errors seen
}
Performance Targets
- Transaction Throughput: 10,000+ TPS
- Average Execution Time: < 100μs per transaction
- Cache Hit Rate: ≥80%
- SIMD Utilization: ≥70% for large batches
- Error Rate: <0.1%
This execution model provides a highly optimized, scalable, and reliable transaction processing system for Savitri Network.