Production-Grade TypeScript Framework

Stability-First Execution Engine

Build resilient workflows, fault-tolerant API integrations, and intelligent batch processing with deterministic error handling, comprehensive observability, and full type safety.

Get Started

Install the framework and start building resilient applications in minutes.

npm
npm i @emmvish/stable-infra
yarn
yarn add @emmvish/stable-infra
pnpm
pnpm add @emmvish/stable-infra

Stable Runner (Docker)

Run config-driven jobs using the built-in runner. Create a config file, build the image, and execute jobs with automatic re-runs on config changes.

CONFIG
// examples/runner-config.mjs
              
import { RunnerJobs } from '@emmvish/stable-infra';

export default {
  jobId: 'demo-001',
  job: {
    kind: RunnerJobs.STABLE_REQUEST,
    options: {
      reqData: { 
        hostname: 'jsonplaceholder.typicode.com',
        path: '/todos/1'
      },
      resReq: true
    }
  }
};
BUILD
docker build -t stable-infra-runner .
RUN
docker run --rm \
  -e CONFIG_PATH=/app/examples/runner-config.mjs \
  -e OUTPUT_PATH=/app/output/result.json \
  -e MAX_RUNS=1 \
  -v "$(pwd)/examples:/app/examples" \
  -v "$(pwd)/output:/app/output" \
  stable-infra-runner

Eight Core APIs

From single HTTP requests to complex DAG-based workflow orchestration, scheduled execution, and distributed coordination — every execution mode shares the same powerful resilience stack.

stableRequest

A robust HTTP request client with automatic retries, caching, circuit breaking, and rate limiting. Execute any HTTP method with full type safety for request and response data.

Problem Statement

You need to fetch user profile data from an external API that occasionally returns 500 errors, has rate limits, and sometimes returns stale data. You want to cache successful responses, implement smart retries with exponential backoff, and validate that the response contains the expected user schema before accepting it.

TypeScript
import { 
  stableRequest, 
  REQUEST_METHODS, 
  VALID_REQUEST_PROTOCOLS,
  RETRY_STRATEGIES,
  CircuitBreaker
} from '@emmvish/stable-infra';

(async () => {

// Mock authentication token
const token = 'your-auth-token-here';

// Define strongly-typed interfaces for request/response
interface UserProfileRequest {
  userId: string;
  includeActivity?: boolean;
}

interface UserProfile {
  id: string;
  name: string;
  email: string;
  subscription: 'free' | 'pro' | 'enterprise';
  lastActive: string;
}

// Create a shared circuit breaker for user service
const userServiceBreaker = new CircuitBreaker({
  failureThresholdPercentage: 30,
  minimumRequests: 5,
  recoveryTimeoutMs: 30000,
  halfOpenMaxRequests: 2
});

// Execute request with full resilience stack
const result = await stableRequest<UserProfileRequest, UserProfile>({
  // HTTP configuration with type-safe body
  reqData: {
    method: REQUEST_METHODS.POST,
    protocol: VALID_REQUEST_PROTOCOLS.HTTPS,
    hostname: 'api.userservice.com',
    path: '/v2/users/profile',
    headers: { 'Authorization': `Bearer ${token}` },
    body: { userId: 'usr_12345', includeActivity: true },
    timeout: 5000
  },
  
  // Return actual response data (not just success boolean)
  resReq: true,
  
  // Retry configuration with exponential backoff
  attempts: 4,
  wait: 1000,
  maxAllowedWait: 15000,
  retryStrategy: RETRY_STRATEGIES.EXPONENTIAL,
  jitter: 200, // Prevent thundering herd
  
  // Shared circuit breaker instance
  circuitBreaker: userServiceBreaker,
  
  // Cache configuration for idempotent reads
  cache: {
    enabled: true,
    ttl: 60000, // 1 minute cache
    keyGenerator: (config) => `user:${config.data?.userId}`
  },
  
  // Response validation - retry if schema doesn't match
  responseAnalyzer: ({ data, commonBuffer }) => {
    const isValid = !!(data?.id && data?.email && data?.subscription);
    if (isValid && commonBuffer) commonBuffer.lastValidUser = data.id;
    return isValid;
  },
  
  // Observability hooks for monitoring
  logAllErrors: true,
  handleErrors: ({ errorLog }) => {
    console.error('User fetch failed', errorLog);
  },
  handleSuccessfulAttemptData: ({ successfulAttemptData }) => {
    console.log(`Fetched user in ${successfulAttemptData.executionTime}ms`);
  },
  
  // Shared state across attempts
  commonBuffer: { requestStart: Date.now() },
  
  // Metrics guardrails for anomaly detection
  metricsGuardrails: {
    request: { totalExecutionTime: { max: 10000 } },
    infrastructure: { circuitBreaker: { failureRate: { max: 0.5 } } }
  }
});

// Type-safe result handling
if (result.success) {
  const user = result.data as UserProfile;
  console.log(`Welcome, ${user.name} (${user.subscription})`);
  console.log(`Metrics: ${result.metrics?.totalAttempts} attempts`);
  console.log(`Cache: ${result.metrics?.infrastructureMetrics?.cache?.hits} hits`);
} else {
  console.error(`Failed: ${result.error}`);
  console.log(`Error logs: ${result.errorLogs?.length}`);
}

})();
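
The retry settings in the example above (`wait`, `maxAllowedWait`, `retryStrategy`, `jitter`) describe a delay schedule between attempts. The sketch below shows how such a schedule is commonly computed. It illustrates the general technique only and is not taken from the library's source: `computeDelay`, `BackoffOptions`, and the lowercase strategy names are hypothetical, and the library's exact formula may differ.

```typescript
// Hypothetical illustration of capped backoff with jitter (not the library's code).
type Strategy = 'fixed' | 'linear' | 'exponential';

interface BackoffOptions {
  wait: number;           // base delay in ms
  maxAllowedWait: number; // upper bound for any single delay in ms
  strategy: Strategy;
  jitter: number;         // maximum random offset in ms (0 disables jitter)
}

// Base delay per strategy; attempt is 1-based (1 = delay before the first retry).
const baseDelay: Record<Strategy, (wait: number, attempt: number) => number> = {
  fixed: (wait) => wait,
  linear: (wait, attempt) => wait * attempt,
  exponential: (wait, attempt) => wait * 2 ** (attempt - 1),
};

function computeDelay(
  attempt: number,
  opts: BackoffOptions,
  random: () => number = Math.random
): number {
  const base = baseDelay[opts.strategy](opts.wait, attempt);
  // Random offset spreads out clients that failed at the same moment.
  const jittered = base + random() * opts.jitter;
  return Math.min(jittered, opts.maxAllowedWait);
}

const backoff: BackoffOptions = {
  wait: 1000,
  maxAllowedWait: 15000,
  strategy: 'exponential',
  jitter: 200,
};

// With the random source pinned to 0, the schedule is deterministic.
const delays = [1, 2, 3, 4, 5].map((n) => computeDelay(n, backoff, () => 0));
console.log(delays); // [1000, 2000, 4000, 8000, 15000]
```

Passing the random source as a parameter keeps the function easy to test; in real use the default `Math.random` applies.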

StableBuffer

Transactional shared state for workflows, gateways, and schedulers. Serialize updates, capture snapshots, and enforce guardrails on buffer activity to keep shared state deterministic and safe. Ships with the ability to replay and restore state from transaction logs.

Problem Statement

You need a reliable shared buffer across concurrent attempts and phases. Updates must be serialized, snapshots should be persisted, and you want guardrails on mutation volume and latency.

TypeScript
import { StableBuffer } from '@emmvish/stable-infra';

type SharedState = {
  workflowId: string;
  processed: number;
  lastCheckpoint?: string;
};

const initialState: SharedState = {
  workflowId: 'sync-001',
  processed: 0
};

const buffer = new StableBuffer({
  initialState: initialState,
  metricsGuardrails: {
    totalTransactions: { max: 1000 },
    averageQueueWaitMs: { max: 250 }
  }
});

// Serialize concurrent updates with a transactional queue
await Promise.all([
  buffer.transaction(async (draft) => {
    const state = draft as SharedState;
    state.processed += 1;
    state.lastCheckpoint = new Date().toISOString();
  }),
  buffer.transaction(async (draft) => {
    const state = draft as SharedState;
    state.processed += 1;
    state.lastCheckpoint = new Date().toISOString();
  }),
  buffer.transaction(async (draft) => {
    const state = draft as SharedState;
    state.processed += 1;
    state.lastCheckpoint = new Date().toISOString();
  })
]);

// Capture a snapshot for persistence or diagnostics
const snapshot = buffer.read() as SharedState;
console.log('Snapshot', snapshot);

// Metrics with guardrail validation
const metrics = buffer.getMetrics();
console.log('Buffer metrics', metrics);
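
To make the idea of serialized updates concrete, here is a minimal sketch of one way to queue asynchronous mutations so that they never interleave: each update is chained onto the promise of the previous one. `SerialState`, `sleep`, and `runDemo` are hypothetical names for illustration. This is not how StableBuffer is implemented, and it omits snapshots, guardrails, and transaction logs.

```typescript
// Hypothetical illustration of a promise-chain queue (not the library's code).
class SerialState<T extends object> {
  private tail: Promise<void> = Promise.resolve();
  private state: T;

  constructor(initial: T) {
    this.state = initial;
  }

  // Queue an update; it starts only after every earlier update has settled.
  transaction(update: (draft: T) => void | Promise<void>): Promise<void> {
    const run = this.tail.then(async (): Promise<void> => {
      await update(this.state);
    });
    // Keep the chain usable even if this update rejects.
    this.tail = run.catch(() => undefined);
    return run;
  }

  // Shallow copy, so callers cannot reassign top-level fields of the live state.
  read(): T {
    return { ...this.state };
  }
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function runDemo(): Promise<{ processed: number; order: number[] }> {
  const shared = new SerialState({ processed: 0, order: [] as number[] });
  await Promise.all(
    [30, 10, 20].map((delay, i) =>
      shared.transaction(async (draft) => {
        const seen = draft.processed; // read
        await sleep(delay);           // async gap between read and write
        draft.processed = seen + 1;   // write; safe because updates run one at a time
        draft.order.push(i);
      })
    )
  );
  return shared.read();
}

runDemo().then((state) => console.log(state)); // { processed: 3, order: [ 0, 1, 2 ] }
```

Without the queue, the three read-then-write updates would all read `0` and the final count would be `1`; with it, updates complete in the order they were submitted regardless of how long each one takes.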

stableFunction

Execute any synchronous or asynchronous function with the full resilience stack — retries, caching, circuit breaking, rate limiting, and comprehensive observability. Perfect for wrapping database operations, external SDK calls, or complex computations.

Problem Statement

You have an expensive ML inference function that occasionally times out, sometimes returns low-confidence results, and has memory constraints requiring concurrency limits. You want to cache results based on input hash, retry on timeouts, and validate confidence scores before accepting predictions.

TypeScript
import { 
  stableFunction, 
  RETRY_STRATEGIES,
  CircuitBreaker 
} from '@emmvish/stable-infra';

(async () => {

// Mock dependencies for demonstration
const imageBuffer = Buffer.from('mock-image-data');
const batchId = 'batch-001';
const hashBuffer = (buf: Buffer) => buf.toString('base64').slice(0, 16);
const metrics = { increment: (key: string) => {}, histogram: (key: string, val: number) => {} };
const logger = { warn: console.warn, info: console.info };
const featureFlags = { get: async (key: string) => ({ inferenceTimeout: 30000 }) };

// Define typed arguments and return type
type InferenceArgs = [
  imageBuffer: Buffer, 
  modelConfig: ModelConfig
];

interface ModelConfig {
  modelId: string;
  threshold: number;
  maxTokens?: number;
}

interface PredictionResult {
  labels: Array<{ name: string; confidence: number }>;
  processingTime: number;
  modelVersion: string;
}

// The expensive ML inference function
async function runInference(
  imageBuffer: Buffer, 
  config: ModelConfig
): Promise<PredictionResult> {
  // Simulate ML inference with mock response
  await new Promise(r => setTimeout(r, 100));
  return {
    labels: [{ name: 'cat', confidence: 0.92 }, { name: 'dog', confidence: 0.08 }],
    processingTime: 95,
    modelVersion: config.modelId
  };
}

// Shared circuit breaker for ML service
const mlBreaker = new CircuitBreaker({
  failureThresholdPercentage: 50,
  minimumRequests: 3,
  recoveryTimeoutMs: 60000,
  halfOpenMaxRequests: 1
});

// Execute with full resilience stack
const result = await stableFunction<InferenceArgs, PredictionResult>({
  // Function and typed arguments
  fn: runInference,
  args: [imageBuffer, { modelId: 'resnet-50', threshold: 0.85 }],
  
  // Return the actual prediction result
  returnResult: true,
  
  // Retry configuration for transient failures
  attempts: 3,
  wait: 2000,
  retryStrategy: RETRY_STRATEGIES.LINEAR,
  jitter: 500,
  
  // Execution timeout to prevent hanging
  executionTimeout: 30000,
  
  // Circuit breaker for fail-fast protection
  circuitBreaker: mlBreaker,
  
  // Concurrency limit (ML models are memory-intensive)
  maxConcurrentRequests: 4,
  
  // Rate limiting for GPU resource management
  rateLimit: { maxRequests: 20, windowMs: 10000 },
  
  // Cache predictions by input hash
  cache: {
    enabled: true,
    ttl: 300000, // 5 minute cache
    keyGenerator: (fn, args) => {
      const [buffer, config] = args;
      return `inference:${config.modelId}:${hashBuffer(buffer)}`;
    }
  },
  
  // Validate confidence scores - retry if too low
  responseAnalyzer: ({ data, args, commonBuffer }) => {
    const [, config] = args;
    const highConfidence = data.labels.some(
      l => l.confidence >= config.threshold
    );
    if (highConfidence && commonBuffer) {
      commonBuffer.topLabel = data.labels[0].name;
    }
    return highConfidence;
  },
  
  // Observability hooks
  logAllErrors: true,
  handleErrors: ({ errorLog }) => {
    metrics.increment('ml.inference.errors');
    logger.warn('Inference failed', { 
      attempt: errorLog.attempt,
      error: errorLog.error 
    });
  },
  logAllSuccessfulAttempts: true,
  handleSuccessfulAttemptData: ({ successfulAttemptData }) => {
    metrics.histogram('ml.inference.latency', 
      successfulAttemptData.executionTime
    );
  },
  
  // Dynamic configuration via pre-execution hook
  preExecution: {
    preExecutionHook: async ({ stableFunctionOptions }) => {
      // Load latest model config from feature flags
      const flags = await featureFlags.get('ml-config');
      return {
        executionTimeout: flags.inferenceTimeout || 30000
      };
    },
    applyPreExecutionConfigOverride: true
  },
  
  // Shared state and context
  commonBuffer: { batchId: batchId },
  executionContext: { workflowId: 'image-classification' },
  
  // Metrics guardrails
  metricsGuardrails: {
    common: { executionTime: { max: 60000 } }
  }
});

// Type-safe result handling
if (result.success) {
  const prediction: PredictionResult = result.data!;
  console.log(`Top label: ${prediction.labels[0].name}`);
  console.log(`Confidence: ${(prediction.labels[0].confidence * 100).toFixed(1)}%`);
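  // Serialize the cache metrics object; interpolating it directly prints "[object Object]"
  console.log(`Cache status: ${JSON.stringify(result.metrics?.infrastructureMetrics?.cache)}`);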
} else {
  console.error(`Inference failed: ${result.error}`);
}

})();
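
The `CircuitBreaker` options used in the examples above (`failureThresholdPercentage`, `minimumRequests`, `recoveryTimeoutMs`) follow the familiar closed / open / half-open pattern. The sketch below is a generic illustration of that pattern under simplifying assumptions: it counts failures over the breaker's whole lifetime and allows a single trial call when half-open, so it does not model `halfOpenMaxRequests`. `SketchBreaker` is a hypothetical name, and the library's own breaker may behave differently.

```typescript
// Hypothetical illustration of a circuit breaker state machine (not the library's code).
type BreakerState = 'CLOSED' | 'OPEN' | 'HALF_OPEN';

interface BreakerOptions {
  failureThresholdPercentage: number; // trip when the failure rate reaches this value
  minimumRequests: number;            // do not trip before this many calls are recorded
  recoveryTimeoutMs: number;          // how long to stay open before allowing a trial
}

class SketchBreaker {
  private state: BreakerState = 'CLOSED';
  private total = 0;
  private failures = 0;
  private openedAt = 0;
  private readonly opts: BreakerOptions;
  private readonly now: () => number;

  constructor(opts: BreakerOptions, now: () => number = Date.now) {
    this.opts = opts;
    this.now = now;
  }

  // Returns true when a call may proceed.
  canExecute(): boolean {
    if (this.state === 'OPEN' && this.now() - this.openedAt >= this.opts.recoveryTimeoutMs) {
      this.state = 'HALF_OPEN'; // recovery timeout elapsed: allow a trial call
    }
    return this.state !== 'OPEN';
  }

  record(success: boolean): void {
    if (this.state === 'HALF_OPEN') {
      // The trial call decides: success closes the breaker, failure reopens it.
      if (success) this.reset(); else this.trip();
      return;
    }
    this.total += 1;
    if (!success) this.failures += 1;
    const failureRate = (this.failures / this.total) * 100;
    if (this.total >= this.opts.minimumRequests && failureRate >= this.opts.failureThresholdPercentage) {
      this.trip();
    }
  }

  getState(): BreakerState {
    return this.state;
  }

  private trip(): void {
    this.state = 'OPEN';
    this.openedAt = this.now();
  }

  private reset(): void {
    this.state = 'CLOSED';
    this.total = 0;
    this.failures = 0;
  }
}

// A fake clock makes the recovery timeout observable without waiting.
let clock = 0;
const breaker = new SketchBreaker(
  { failureThresholdPercentage: 50, minimumRequests: 3, recoveryTimeoutMs: 60000 },
  () => clock
);
breaker.record(true);
breaker.record(false);
breaker.record(false);             // 2 of 3 calls failed, which exceeds 50%
console.log(breaker.getState());   // OPEN
console.log(breaker.canExecute()); // false
clock = 60000;
console.log(breaker.canExecute()); // true (half-open trial)
breaker.record(true);
console.log(breaker.getState());   // CLOSED
```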

stableApiGateway

Batch orchestration engine for executing multiple HTTP requests and functions together. Features request grouping, configuration cascading (Global → Group → Individual), concurrent/sequential execution, shared state buffers, and racing mode for redundant calls.

Problem Statement

Your e-commerce checkout flow needs to: (1) validate inventory across multiple warehouses in parallel, (2) calculate shipping rates from 3 providers and race for the fastest response, (3) process payment, and (4) send confirmation. Each step has different retry/timeout requirements, and you need aggregate metrics and shared state to pass data between steps.

TypeScript
import { 
  stableApiGateway, 
  REQUEST_METHODS,
  RETRY_STRATEGIES,
  RequestOrFunction,
  CircuitBreaker,
  type API_GATEWAY_ITEM
} from '@emmvish/stable-infra';

(async () => {

// Mock tax rates by state
const stateTaxRates: Record<string, number> = {
  'CA': 0.0725, 'NY': 0.08, 'TX': 0.0625, 'WA': 0.065
};

// Define request body types
interface InventoryRequest { sku: string; quantity: number; }
interface PaymentRequest { amount: number; currency: string; }

// Define response types
interface InventoryResult { available: boolean; warehouse: string; stock: number; }
interface PaymentResult { transactionId: string; status: string; }

// Define function types
type TaxCalcArgs = [subtotal: number, state: string];
type TaxResult = { tax: number; total: number; };

// Union types for mixed gateway items
type RequestBodyUnion = InventoryRequest | PaymentRequest;
type ResponseUnion = InventoryResult | PaymentResult;

// Create circuit breaker for gateway
const gatewayBreaker = new CircuitBreaker({
  failureThresholdPercentage: 50,
  minimumRequests: 5,
  recoveryTimeoutMs: 30000
});

// Build mixed items array with requests and functions (using union types)
const checkoutItems: API_GATEWAY_ITEM<RequestBodyUnion, ResponseUnion, TaxCalcArgs, TaxResult>[] = [
  // Inventory checks (grouped for shared config)
  {
    type: RequestOrFunction.REQUEST,
    request: {
      id: 'inventory-west',
      groupId: 'inventory-checks',
      requestOptions: {
        reqData: {
          hostname: 'warehouse-west.api.com',
          path: '/inventory/check',
          method: REQUEST_METHODS.POST,
          body: { sku: 'PROD-123', quantity: 2 }
        },
        resReq: true
      }
    }
  },
  {
    type: RequestOrFunction.REQUEST,
    request: {
      id: 'inventory-east',
      groupId: 'inventory-checks',
      requestOptions: {
        reqData: {
          hostname: 'warehouse-east.api.com',
          path: '/inventory/check',
          method: REQUEST_METHODS.POST,
          body: { sku: 'PROD-123', quantity: 2 }
        },
        resReq: true
      }
    }
  },
  // Tax calculation function
  {
    type: RequestOrFunction.FUNCTION,
    function: {
      id: 'calculate-tax',
      functionOptions: {
        fn: (subtotal: number, state: string): TaxResult => {
          const rate = stateTaxRates[state] ?? 0.08; // ?? keeps a legitimate 0% rate; 8% is the fallback
          return { tax: subtotal * rate, total: subtotal * (1 + rate) };
        },
        args: [149.99, 'CA'] as TaxCalcArgs,
        returnResult: true,
        cache: { enabled: true, ttl: 3600000 } // Cache tax rates
      }
    }
  },
  // Payment processing (critical - more retries)
  {
    type: RequestOrFunction.REQUEST,
    request: {
      id: 'process-payment',
      groupId: 'payment',
      requestOptions: {
        reqData: {
          hostname: 'payments.stripe.com',
          path: '/v1/charges',
          method: REQUEST_METHODS.POST,
          body: { amount: 16086, currency: 'usd' } // 149.99 subtotal plus 7.25% CA tax, in cents
        },
        resReq: true,
        attempts: 5, // Override group config
        responseAnalyzer: ({ data }) => (data as PaymentResult)?.status === 'succeeded'
      }
    }
  }
];

// Execute with gateway options (pass empty array for separate functions param to use all generics)
const result = await stableApiGateway<RequestBodyUnion, ResponseUnion, TaxCalcArgs, TaxResult>(checkoutItems, [], {
  // Parallel execution for independent operations
  concurrentExecution: true,
  maxConcurrentRequests: 6,
  
  // Request groups with cascading config
  requestGroups: [
    {
      id: 'inventory-checks',
      commonConfig: {
        commonAttempts: 2,
        commonWait: 500,
        commonRetryStrategy: RETRY_STRATEGIES.FIXED
      }
    },
    {
      id: 'payment',
      commonConfig: {
        commonAttempts: 3,
        commonWait: 2000,
        commonRetryStrategy: RETRY_STRATEGIES.EXPONENTIAL,
        commonJitter: 300
      }
    }
  ],
  
  // Global defaults (lowest priority)
  commonAttempts: 2,
  commonWait: 1000,
  commonLogAllErrors: true,
  
  // Shared state accessible in all items
  sharedBuffer: {
    orderId: 'ORD-98765',
    customerId: 'CUST-12345',
    startTime: Date.now()
  },
  
  // Global rate limiting
  rateLimit: { maxRequests: 50, windowMs: 10000 },
  
  // Circuit breaker for entire gateway
  circuitBreaker: gatewayBreaker,
  
  // Gateway-level timeout
  maxTimeout: 60000,
  
  // Execution context for tracing
  executionContext: { workflowId: 'checkout-flow' },
  
  // Metrics guardrails
  metricsGuardrails: {
    apiGateway: { successRate: { min: 0.9 } }
  }
});

// Access individual results by ID (type-safe with union result type)
const inventoryWest = result.find((r) => r.requestId === 'inventory-west');
const taxResult = result.find((r) => r.requestId === 'calculate-tax');
const payment = result.find((r) => r.requestId === 'process-payment');

// Type-safe result access
if (inventoryWest?.success) {
  const inventory = inventoryWest.data as InventoryResult;
  console.log(`Warehouse: ${inventory.warehouse}, Stock: ${inventory.stock}`);
}

if (taxResult?.success) {
  const tax = taxResult.data as TaxResult;
  console.log(`Tax: $${tax.tax.toFixed(2)}, Total: $${tax.total.toFixed(2)}`);
}

if (payment?.success) {
  const paymentData = payment.data as PaymentResult;
  console.log(`Transaction: ${paymentData.transactionId}, Status: ${paymentData.status}`);
}

// Aggregate metrics
if (result.metrics) {
  console.log(`Success rate: ${result.metrics.successRate.toFixed(1)}%`);
  console.log(`Total time: ${result.metrics.executionTime}ms`);
  console.log(`Throughput: ${result.metrics.throughput.toFixed(2)} items/s`);
  console.log(`Request groups:`, result.metrics.requestGroups);
}

})();
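
Configuration cascading (Global → Group → Individual) can be pictured as a layered merge in which the most specific layer that sets a value wins. The sketch below is a simplified illustration using the numbers from the checkout example. `RetryConfig` and `resolveConfig` are hypothetical names, the global `jitter` of 0 is an assumed default, and the library's real option names (`commonAttempts`, `commonWait`, and so on) and merge rules are not reproduced here.

```typescript
// Hypothetical illustration of Global -> Group -> Individual cascading (not the library's code).
interface RetryConfig {
  attempts: number;
  wait: number;
  jitter: number;
}

// Later layers override earlier ones; undefined values never overwrite a set value.
function resolveConfig(
  globalConfig: RetryConfig,
  groupConfig?: Partial<RetryConfig>,
  itemConfig?: Partial<RetryConfig>
): RetryConfig {
  const resolved: RetryConfig = { ...globalConfig };
  for (const layer of [groupConfig, itemConfig]) {
    if (!layer) continue;
    for (const key of Object.keys(layer) as Array<keyof RetryConfig>) {
      const value = layer[key];
      if (value !== undefined) resolved[key] = value;
    }
  }
  return resolved;
}

const globalDefaults: RetryConfig = { attempts: 2, wait: 1000, jitter: 0 };
const paymentGroup: Partial<RetryConfig> = { attempts: 3, wait: 2000, jitter: 300 };
const processPayment: Partial<RetryConfig> = { attempts: 5 };

// The payment request takes attempts from itself and wait/jitter from its group.
console.log(resolveConfig(globalDefaults, paymentGroup, processPayment));
// { attempts: 5, wait: 2000, jitter: 300 }

// An item with no group and no overrides falls back to the global defaults.
console.log(resolveConfig(globalDefaults));
// { attempts: 2, wait: 1000, jitter: 0 }
```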

stableWorkflow

Array-based multi-phase workflow orchestration with 5 execution modes: Sequential, Concurrent, Mixed (hybrid), Non-Linear (with CONTINUE / SKIP / REPLAY / JUMP / TERMINATE decisions), and Branched (parallel execution paths). Supports phase replay, early termination, and state persistence for workflow recovery.

Problem Statement

Your data pipeline has 4 phases: (1) fetch raw data, (2) validate & clean in parallel with (3) enrichment, then (4) persist. If validation fails, you need to retry with relaxed rules. If enrichment data is stale, skip it. You need shared state between phases, phase-level decision hooks, and the ability to branch for A/B testing different processing strategies.

TypeScript
import { 
  stableWorkflow, 
  PHASE_DECISION_ACTIONS,
  REQUEST_METHODS,
  RequestOrFunction,
  RETRY_STRATEGIES,
  PersistenceStage,
  type STABLE_WORKFLOW_PHASE
} from '@emmvish/stable-infra';

(async () => {

// Mock dependencies for demonstration
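const logger = { info: console.info, debug: console.debug };
// In-memory stand-in for Redis, so the example does not depend on a browser localStorage global
const memoryStore = new Map<string, string>();
const redis = {
  set: async (key: string, val: any) => { memoryStore.set(key, JSON.stringify(val)); },
  get: async (key: string) => JSON.parse(memoryStore.get(key) ?? 'null')
};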
const validateRecord = (record: any, strict: boolean) => record != null;

// Define request body types (for HTTP requests)
interface FetchRequest { /* empty - GET request */ }
interface PersistRequest { data: any[]; metadata: Record<string, any>; }

// Define response types
interface RawData { records: any[]; timestamp: string; }
interface EnrichedData { records: any[]; enrichedAt: string; }
interface PersistResult { success: boolean; }

// Define function types
interface ValidatedData { valid: any[]; errors: string[]; }
type ValidateFnArgs = [data: RawData, strict: boolean];
type ValidateFnReturn = ValidatedData;

// Union types for workflow with mixed request/response types
type WorkflowRequestBody = FetchRequest | PersistRequest;
type WorkflowResponseData = RawData | EnrichedData | PersistResult;

const phases: STABLE_WORKFLOW_PHASE<WorkflowRequestBody, WorkflowResponseData, ValidateFnArgs, ValidateFnReturn>[] = [
  // Phase 1: Fetch raw data (sequential)
  {
    id: 'fetch-data',
    requests: [{
      id: 'get-raw-data',
      requestOptions: {
        reqData: {
          hostname: 'data-lake.internal',
          path: '/exports/latest'
        },
        resReq: true,
        attempts: 3,
        cache: { enabled: true, ttl: 300000 }
      }
    }],
    // Store results in shared buffer for later phases
    phaseDecisionHook: async ({ phaseResult, sharedBuffer }) => {
      if (phaseResult.success && sharedBuffer) {
        sharedBuffer.rawData = phaseResult.responses[0].data;
        sharedBuffer.recordCount = (phaseResult.responses[0].data as RawData).records.length;
        return { action: PHASE_DECISION_ACTIONS.CONTINUE };
      }
      return { action: PHASE_DECISION_ACTIONS.TERMINATE };
    }
  },
  
  // Phase 2: Validate (marks start of concurrent group)
  {
    id: 'validate',
    markConcurrentPhase: true, // Runs concurrently with phase 3
    items: [{
      type: RequestOrFunction.FUNCTION,
      function: {
        id: 'validate-records',
        functionOptions: {
          fn: async (data: RawData, strict: boolean): Promise<ValidatedData> => {
            const valid = data.records.filter(r => validateRecord(r, strict));
            return { valid, errors: [] };
          },
          args: [{ records: [], timestamp: '' }, true],
          returnResult: true
        }
      }
    }],
    maxReplayCount: 2,
    allowReplay: true,
    phaseDecisionHook: async ({ phaseResult, sharedBuffer, executionHistory }) => {
      const validData = phaseResult.responses[0].data as ValidatedData;
      const replayCount = executionHistory.filter(e => e.phaseId === 'validate').length;
      
      // If too many invalid records, retry with relaxed validation
      if (sharedBuffer && validData.valid.length < sharedBuffer.recordCount * 0.8) {
        if (replayCount < 2) {
          sharedBuffer.strictValidation = false;
          return { action: PHASE_DECISION_ACTIONS.REPLAY };
        }
      }
      
      if (sharedBuffer) sharedBuffer.validatedData = validData.valid;
      return { action: PHASE_DECISION_ACTIONS.CONTINUE };
    }
  },
  
  // Phase 3: Enrich (concurrent with validation)
  {
    id: 'enrich',
    markConcurrentPhase: true, // Same concurrent group
    requests: [{
      id: 'fetch-enrichment',
      requestOptions: {
        reqData: {
          hostname: 'enrichment-service.internal',
          path: '/metadata'
        },
        resReq: true
      }
    }],
    allowSkip: true,
    phaseDecisionHook: async ({ phaseResult, sharedBuffer }) => {
      if (!phaseResult.success) {
        // Enrichment is optional - continue without it
        if (sharedBuffer) sharedBuffer.enrichmentSkipped = true;
        return { action: PHASE_DECISION_ACTIONS.CONTINUE };
      }
      
      // Check data freshness
      const enrichment = phaseResult.responses[0].data as EnrichedData;
      const age = Date.now() - new Date(enrichment.enrichedAt).getTime();
      if (age > 3600000) { // > 1 hour old
        if (sharedBuffer) sharedBuffer.enrichmentSkipped = true;
        return { action: PHASE_DECISION_ACTIONS.CONTINUE };
      }
      
      if (sharedBuffer) sharedBuffer.enrichmentData = enrichment;
      return { action: PHASE_DECISION_ACTIONS.CONTINUE };
    }
  },
  
  // Phase 4: Persist (sequential, after concurrent group)
  {
    id: 'persist',
    requests: [{
      id: 'save-to-warehouse',
      requestOptions: {
        reqData: {
          hostname: 'data-warehouse.internal',
          path: '/ingest',
          method: REQUEST_METHODS.POST
        },
        resReq: false,
        attempts: 5,
        retryStrategy: RETRY_STRATEGIES.EXPONENTIAL
      }
    }]
  }
];

// Execute with mixed mode enabled (explicit generics for full type safety)
const result = await stableWorkflow<WorkflowRequestBody, WorkflowResponseData, ValidateFnArgs, ValidateFnReturn>(phases, {
  workflowId: 'data-pipeline-001',
  
  // Enable mixed execution (sequential + concurrent groups)
  enableMixedExecution: true,
  enableNonLinearExecution: true, // Enable REPLAY/SKIP/JUMP
  
  // Global configuration
  commonAttempts: 2,
  commonWait: 1000,
  maxWorkflowIterations: 100, // Safety limit
  
  // Shared state across all phases
  sharedBuffer: {
    pipelineStart: Date.now(),
    strictValidation: true
  },
  
  // Workflow-level circuit breaker
  circuitBreaker: { failureThresholdPercentage: 50, minimumRequests: 3, recoveryTimeoutMs: 60000 },
  
  // Observability hooks
  handlePhaseCompletion: ({ phaseResult, workflowId }) => {
    logger.info(`Phase ${phaseResult.phaseId} completed`, {
      workflowId,
      success: phaseResult.success,
      duration: phaseResult.metrics?.executionTime
    });
  },
  handlePhaseDecision: ({ decision, phaseResult }) => {
    logger.debug(`Decision for ${phaseResult.phaseId}: ${decision.action}`);
  },
  
  // State persistence for recovery
  statePersistence: {
    persistenceFunction: async ({ executionContext, buffer, persistenceStage }) => {
      const key = `workflow:${executionContext.workflowId}`;
      if (persistenceStage === PersistenceStage.BEFORE_HOOK) {
        return (await redis.get(key)) || buffer;
      }
      await redis.set(key, buffer);
      return buffer;
    },
    loadBeforeHooks: true,
    storeAfterHooks: true
  },
  
  // Metrics guardrails
  metricsGuardrails: {
    workflow: { totalPhases: { max: 10 }, executionTime: { max: 300000 } }
  }
});

// Access workflow results
console.log(`Workflow success: ${result.success}`);
console.log(`Phases executed: ${result.totalPhases}`);
console.log(`Replays: ${result.metrics?.totalPhaseReplays || 0}`);
console.log(`Phase results:`, result.phases.map((p) => p.phaseId));

})();
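
The non-linear mode described above lets each phase return a decision that steers what runs next. The sketch below is a toy, synchronous interpreter for such decisions, meant only to show the control flow. `runPhases`, `SketchPhase`, and `SketchBuffer` are hypothetical names, and the semantics assumed here (SKIP bypasses the next phase, REPLAY re-runs the current one, and an iteration cap guards against endless loops) are illustrative assumptions rather than the library's documented behavior.

```typescript
// Hypothetical illustration of decision-driven phase execution (not the library's code).
type Decision =
  | { action: 'CONTINUE' }
  | { action: 'REPLAY' }
  | { action: 'SKIP' } // assumed here: skip the phase that follows
  | { action: 'JUMP'; targetPhaseId: string }
  | { action: 'TERMINATE' };

interface SketchBuffer {
  strict?: boolean;
}

interface SketchPhase {
  id: string;
  run: (buffer: SketchBuffer) => Decision;
}

// Returns the ids of the phases in the order they actually ran.
function runPhases(phases: SketchPhase[], buffer: SketchBuffer, maxIterations = 100): string[] {
  const history: string[] = [];
  let index = 0;
  while (index < phases.length && history.length < maxIterations) {
    const phase = phases[index];
    if (!phase) break;
    history.push(phase.id);
    const decision = phase.run(buffer);
    switch (decision.action) {
      case 'CONTINUE':
        index += 1;
        break;
      case 'REPLAY':
        break; // same index runs again on the next loop iteration
      case 'SKIP':
        index += 2;
        break;
      case 'JUMP': {
        const target = phases.findIndex((p) => p.id === decision.targetPhaseId);
        if (target === -1) throw new Error(`Unknown phase: ${decision.targetPhaseId}`);
        index = target;
        break;
      }
      case 'TERMINATE':
        return history;
    }
  }
  return history;
}

const phaseHistory = runPhases(
  [
    { id: 'fetch', run: (b) => { b.strict = true; return { action: 'CONTINUE' }; } },
    {
      id: 'validate',
      run: (b) => {
        // First pass is strict and "fails"; relax the rules and replay once.
        if (b.strict) { b.strict = false; return { action: 'REPLAY' }; }
        return { action: 'CONTINUE' };
      },
    },
    { id: 'persist', run: () => ({ action: 'CONTINUE' }) },
  ],
  {}
);
console.log(phaseHistory.join(' -> ')); // fetch -> validate -> validate -> persist
```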

stableWorkflowGraph

DAG-based workflow orchestration with explicit node dependencies, conditional routing, parallel groups, and merge points. Offers higher parallelism than array-based workflows and enables complex topologies like diamond patterns, fan-out/fan-in, and conditional branching based on runtime results.

Problem Statement

Your ML pipeline requires: (1) data preprocessing, then parallel (2) feature extraction and (3) model inference, which merge before (4) post-processing. If inference confidence is low, route to (5) human review instead. You need automatic DAG validation, conditional edge traversal, and the ability to execute independent nodes in true parallel.

TypeScript
import { 
  stableWorkflowGraph, 
  WorkflowGraphBuilder,
  WorkflowEdgeConditionTypes,
  RequestOrFunction,
  REQUEST_METHODS,
  type ConditionalEvaluationContext
} from '@emmvish/stable-infra';

(async () => {

// Mock dependencies for demonstration
const inputBuffer = Buffer.from('mock-input-data');
const logger = { info: console.info };
const preprocessPipeline = {
  execute: async (data: Buffer) => ({ normalized: true, data: data.toString() })
};
const featureExtractor = {
  extract: async (input: any) => ({ features: [0.1, 0.5, 0.3] })
};

// Define request body types
interface MLRequest { features?: number[]; data?: any; }
interface ReviewRequest { itemId: string; data: any; }
interface StoreRequest { results: any; metadata: Record<string, any>; }

// Define response types
interface PreprocessResult { normalized: boolean; data: string; }
interface InferenceResult { confidence: number; predictions: any[]; }
interface FeatureResult { features: number[]; }
interface ReviewResult { queued: boolean; reviewId: string; }
interface StoreResult { stored: boolean; }
interface FormatResult { features: any; predictions: any; timestamp: string; }

// Union types for graph workflow
type GraphRequestBody = MLRequest | ReviewRequest | StoreRequest;
type GraphResponseData = PreprocessResult | InferenceResult | FeatureResult | ReviewResult | StoreResult | FormatResult;

// Function types (use any[] for heterogeneous function signatures)
type GraphFunctionArgs = any[];
type GraphFunctionReturn = PreprocessResult | FeatureResult | FormatResult;

// Build the workflow graph with fluent API (using all 4 generics)
const graph = new WorkflowGraphBuilder<GraphRequestBody, GraphResponseData, GraphFunctionArgs, GraphFunctionReturn>()
  
  // Entry point: Data preprocessing
  .addPhase('preprocess', {
    items: [{
      type: RequestOrFunction.FUNCTION,
      function: {
        id: 'normalize-data',
        functionOptions: {
          fn: async (rawData: Buffer) => {
            // Normalize, resize, convert formats
            return preprocessPipeline.execute(rawData);
          },
          args: [inputBuffer],
          returnResult: true,
          executionTimeout: 30000
        }
      }
    }]
  })
  
  // Parallel group: Feature extraction + Model inference
  .addPhase('extract-features', {
    items: [{
      type: RequestOrFunction.FUNCTION,
      function: {
        id: 'feature-extractor',
        functionOptions: {
          fn: featureExtractor.extract,
          args: [null], // Will be populated from sharedBuffer
          returnResult: true,
          cache: { enabled: true, ttl: 600000 }
        }
      }
    }]
  })
  
  .addPhase('run-inference', {
    requests: [{
      id: 'ml-inference',
      requestOptions: {
        reqData: {
          hostname: 'ml-service.internal',
          path: '/predict',
          method: REQUEST_METHODS.POST
        },
        resReq: true,
        attempts: 3,
        responseAnalyzer: ({ data }) => (data as InferenceResult)?.confidence !== undefined
      }
    }]
  })
  
  // Create parallel group node
  .addParallelGroup('parallel-processing', ['extract-features', 'run-inference'])
  
  // Merge point: Wait for parallel group to complete
  .addMergePoint('sync-results', ['parallel-processing'])
  
  // Conditional routing based on inference confidence
  .addConditional('confidence-check', ({ results, sharedBuffer }: ConditionalEvaluationContext<GraphRequestBody, GraphResponseData, GraphFunctionArgs, GraphFunctionReturn>) => {
    const inferenceResult = results.get('run-inference');
    const confidence = (inferenceResult?.responses?.[0]?.data as InferenceResult)?.confidence || 0;
    
    // Route to human review if confidence is low
    if (confidence < 0.85) {
      if (sharedBuffer) sharedBuffer.needsReview = true;
      return 'human-review';
    }
    return 'postprocess';
  })
  
  // Human review path
  .addPhase('human-review', {
    requests: [{
      id: 'queue-review',
      requestOptions: {
        reqData: {
          hostname: 'review-queue.internal',
          path: '/submit',
          method: REQUEST_METHODS.POST
        },
        resReq: true
      }
    }]
  })
  
  // Post-processing path
  .addPhase('postprocess', {
    items: [{
      type: RequestOrFunction.FUNCTION,
      function: {
        id: 'format-output',
        functionOptions: {
          fn: async (features: any, predictions: any) => {
            return {
              features,
              predictions,
              timestamp: new Date().toISOString()
            };
          },
          args: [{}, {}],
          returnResult: true
        }
      }
    }]
  })
  
  // Final merge: Both paths converge
  .addMergePoint('final-merge', ['human-review', 'postprocess'])
  
  // Final output phase
  .addPhase('output', {
    requests: [{
      id: 'store-results',
      requestOptions: {
        reqData: {
          hostname: 'results-store.internal',
          path: '/save',
          method: REQUEST_METHODS.POST
        },
        resReq: false,
        attempts: 5
      }
    }]
  })
  
  // Connect nodes with edge conditions
  .connect('preprocess', 'parallel-processing', { 
    condition: { type: WorkflowEdgeConditionTypes.SUCCESS }
  })
  .connect('parallel-processing', 'sync-results')
  .connect('sync-results', 'confidence-check')
  // Conditional routing edges (handled by conditional node)
  .connect('human-review', 'final-merge')
  .connect('postprocess', 'final-merge')
  .connect('final-merge', 'output')
  
  // Set entry and build
  .setEntryPoint('preprocess')
  .build();

// Execute the graph workflow (with all 4 explicit generics)
const result = await stableWorkflowGraph<GraphRequestBody, GraphResponseData, GraphFunctionArgs, GraphFunctionReturn>(graph, {
  workflowId: 'ml-pipeline-001',
  
  // Shared state across all nodes
  sharedBuffer: {
    inputId: 'sample-12345',
    modelVersion: 'v2.3.1'
  },
  
  // Global configuration
  commonAttempts: 2,
  commonWait: 1000,
  maxConcurrentRequests: 10,
  
  // Circuit breaker for entire graph
  circuitBreaker: { failureThresholdPercentage: 50, minimumRequests: 5, recoveryTimeoutMs: 60000 },
  
  // Graph-level timeout
  maxTimeout: 120000,
  
  // Observability
  handlePhaseCompletion: ({ phaseResult }) => {
    logger.info(`Node ${phaseResult.phaseId} completed`);
  },
  
  // Execution context for tracing
  executionContext: { workflowId: 'ml-pipeline-001' }
});

// Access results
console.log(`Graph success: ${result.success}`);
console.log(`Nodes executed: ${result.executionHistory?.length}`);
console.log(`Path taken: ${result.executionHistory?.map((n) => n.phaseId).join(' → ')}`);
// Note: sharedBuffer is passed in options but not returned in result
// Check phases for specific results instead
const inferencePhase = result.phases.find((p) => p.phaseId === 'run-inference');
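// Default a missing confidence to 0 (same fallback as the conditional node) so the comparison type-checks under strict mode
const needsReview = ((inferencePhase?.responses?.[0]?.data as InferenceResult)?.confidence ?? 0) < 0.85;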
console.log(`Human review needed: ${needsReview}`);

})();

StableScheduler

Schedule any stableRequest, stableFunction, or workflow to run on cron, interval, or timestamp triggers. Includes queue management, safe start/stop controls, and optional state persistence via custom save/load handlers.

Problem Statement

You need to run a data sync every 15 minutes, trigger a backfill job at a specific timestamp, and keep scheduler state recoverable across restarts. You want a unified scheduling layer that uses the same resilience stack and observability hooks as the rest of the framework.

TypeScript
import { 
  StableScheduler,
  ScheduleTypes,
  type SchedulerSchedule,
  type SchedulerState
} from '@emmvish/stable-infra';

(async () => {

type SchedulerJob = {
  id?: string;
  schedule?: SchedulerSchedule;
  kind: 'sync' | 'backfill';
  payload?: string;
};

// In-memory persistence for demos/tests
let persistedState: SchedulerState<SchedulerJob> | null = null;

// Create a scheduler with a typed handler
const scheduler = new StableScheduler<SchedulerJob>(
  {
    maxParallel: 2,
    persistence: {
      enabled: true,
      saveState: (state) => {
        persistedState = state;
      },
      loadState: () => persistedState
    }
  },
  async (job, context) => {
    if (job.kind === 'sync') {
      console.log(`Sync job ran (${context.jobId})`, job.payload);
      return;
    }
    console.log(`Backfill executed at ${context.scheduledAt}`);
  }
);

const restored = await scheduler.restoreState();
if (!restored) {
  scheduler.addJobs([
    {
      id: 'sync-job',
      kind: 'sync',
      payload: 'users',
      schedule: { type: ScheduleTypes.INTERVAL, everyMs: 15 * 60 * 1000 }
    },
    {
      id: 'backfill-job',
      kind: 'backfill',
      schedule: { type: ScheduleTypes.TIMESTAMP, at: Date.now() + 60000 }
    }
  ]);
}

scheduler.start();

// Stop after 2 seconds in demo environments
setTimeout(() => scheduler.stop(), 2000);

})();

DistributedCoordinator

Deploy stable-infra components across multiple nodes with distributed locking (with fencing tokens), compare-and-swap state management, quorum-based leader election, pub/sub messaging with configurable delivery guarantees, two-phase commit transactions, and distributed buffers & schedulers.

Problem Statement

You're running multiple instances of your service and need to coordinate access to shared resources without conflicts. You want distributed locks with fencing tokens to prevent stale holders, leader election with quorum support for split-brain prevention, and atomic multi-key transactions across distributed state — all with the same stability guarantees as the rest of the framework.

TypeScript
import {
  stableWorkflow,
  StableScheduler,
  DistributedCoordinator,
  InMemoryDistributedAdapter,
  createDistributedStableBuffer,
  createDistributedSchedulerConfig,
  withDistributedBufferLock,
  DistributedTransactionOperationType,
  DistributedConflictResolution,
  DistributedLockStatus,
  DistributedLockRenewalMode,
  DistributedTransactionStatus,
  DistributedMessageDelivery,
  ScheduleTypes,
  REQUEST_METHODS
} from '@emmvish/stable-infra';

import type {
  SchedulerSchedule,
  SchedulerRunContext,
  STABLE_WORKFLOW_PHASE,
  DistributedStableBuffer
} from '@emmvish/stable-infra';

(async () => {

// ==========================================================
// 1. Distributed Coordinator
// ==========================================================

const adapter = new InMemoryDistributedAdapter();
const coordinator = new DistributedCoordinator({
  adapter,
  namespace: 'order-service',
  enableLeaderElection: true
});
await coordinator.connect();

// ==========================================================
// 2. Distributed Locking with Fencing Tokens
// ==========================================================

const lock = await coordinator.acquireLock({
  resource: 'order:123',
  ttlMs: 30000,
  renewalMode: DistributedLockRenewalMode.AUTO,
  renewalIntervalMs: 10000
});

if (lock.status === DistributedLockStatus.ACQUIRED) {
  console.log(`Lock acquired, fencing token: ${lock.fencingToken}`);
  await coordinator.releaseLock(lock.handle!);
}

// ==========================================================
// 3. Compare-and-Swap & 2PC Transaction
// ==========================================================

await coordinator.setState('account:balance', { balance: 100 });
const casResult = await coordinator.compareAndSwap({
  key: 'account:balance',
  expectedVersion: 1,
  newValue: { balance: 150 }
});
console.log(`CAS swapped: ${casResult.swapped}`);

const txResult = await coordinator.executeTransaction([
  { type: DistributedTransactionOperationType.SET,
    key: 'account:A', value: { balance: 50 } },
  { type: DistributedTransactionOperationType.SET,
    key: 'account:B', value: { balance: 150 } },
  { type: DistributedTransactionOperationType.INCREMENT,
    key: 'transfers:count', delta: 1 }
]);
console.log(`TX status: ${txResult.status}, success: ${txResult.success}`);

// ==========================================================
// 4. Pub/Sub Messaging
// ==========================================================

const subscription = await coordinator.subscribe(
  'order-events',
  async (message) => {
    console.log(`Received: ${JSON.stringify(message.payload)}`);
  },
  { deliveryMode: DistributedMessageDelivery.AT_LEAST_ONCE }
);

await coordinator.publish('order-events', {
  type: 'ORDER_COMPLETED',
  orderId: '123'
});

// ==========================================================
// 5. Distributed Buffer (Cross-Node Shared State)
// ==========================================================

const distributedBuffer = await createDistributedStableBuffer({
  distributed: { adapter, namespace: 'order-service' },
  initialState: {
    orderCount: 0,
    totalRevenue: 0,
    failedOrders: [] as string[]
  },
  conflictResolution: DistributedConflictResolution.CUSTOM,
  mergeStrategy: (local, remote) => ({
    orderCount: Math.max(local.orderCount, remote.orderCount),
    totalRevenue: Math.max(local.totalRevenue, remote.totalRevenue),
    failedOrders: [...new Set([...local.failedOrders, ...remote.failedOrders])]
  }),
  syncOnTransaction: true
});
const { buffer: sharedBuffer } = distributedBuffer;

// Atomic update with distributed lock
await withDistributedBufferLock(distributedBuffer, async () => {
  await sharedBuffer.run(state => {
    state.orderCount += 1;
    state.totalRevenue += 99.99;
  });
});

// ==========================================================
// 6. Distributed Scheduler + Workflow Execution
// ==========================================================

interface OrderJob {
  id: string;
  orderId: string;
  schedule?: SchedulerSchedule;
}

const setup = await createDistributedSchedulerConfig<OrderJob>({
  distributed: { adapter, namespace: 'order-service' },
  scheduler: { maxParallel: 5, tickIntervalMs: 500 },
  enableLeaderElection: true,
  circuitBreaker: {
    failureThresholdPercentage: 50,
    minimumRequests: 5,
    recoveryTimeoutMs: 30000
  },
  onBecomeLeader: () => console.log('Became leader'),
  onLoseLeadership: () => console.log('Lost leadership')
});

// Each job runs a workflow with distributed shared state
const handler = async (
  job: OrderJob,
  ctx: SchedulerRunContext
) => {
  const phases: STABLE_WORKFLOW_PHASE[] = [
    {
      id: 'validate',
      requests: [{
        id: 'check-inventory',
        requestOptions: {
          reqData: {
            hostname: 'api.example.com',
            method: REQUEST_METHODS.POST,
            path: '/inventory/check',
            body: { orderId: job.orderId }
          }
        }
      }]
    },
    {
      id: 'fulfill',
      requests: [{
        id: 'create-shipment',
        requestOptions: {
          reqData: {
            hostname: 'api.example.com',
            method: REQUEST_METHODS.POST,
            path: '/shipping/create',
            body: { orderId: job.orderId }
          }
        }
      }]
    }
  ];

  const result = await stableWorkflow(phases, {
    workflowId: `order-${job.orderId}`,
    sharedBuffer,
    stopOnFirstPhaseError: true
  });

  if (result.success) {
    await withDistributedBufferLock(distributedBuffer, async () => {
      await sharedBuffer.run(s => { s.orderCount += 1; });
    });
  }
  return result;
};

const scheduler = new StableScheduler<OrderJob>(
  { ...setup.config, sharedBuffer } as any,
  handler
);

// Subscribe to incoming orders and schedule them
await coordinator.subscribe<{ orderId: string }>(
  'incoming-orders',
  async (msg) => {
    scheduler.addJob({
      id: `order-${msg.payload.orderId}`,
      orderId: msg.payload.orderId,
      schedule: {
        type: ScheduleTypes.TIMESTAMP,
        at: Date.now()
      }
    });
  }
);

const isLeader = await setup.waitForLeadership(10000);
if (isLeader) {
  scheduler.start();
  console.log('Distributed scheduler started');
}

// ==========================================================
// 7. Monitoring & Graceful Shutdown
// ==========================================================

const metrics = coordinator.getMetrics();
console.log('Metrics:', {
  nodeId: metrics.nodeId,
  isLeader: metrics.isLeader,
  locks: metrics.lockAcquisitions,
  state: sharedBuffer.read()
});

process.on('SIGTERM', async () => {
  scheduler.stop();
  await subscription.unsubscribe();
  await setup.disconnect();
  await distributedBuffer.disconnect();
  await coordinator.disconnect();
  process.exit(0);
});

})();

Stability-First Resilience Stack

Every execution mode inherits a comprehensive suite of production-ready resilience mechanisms — built-in, not bolted-on.

Intelligence Hooks

Response analyzers and error analyzers provide decision hooks for intelligent retry logic based on actual response content, not just HTTP status codes.
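
To make the idea concrete, here is a minimal sketch of a retry loop that accepts or rejects a result based on its content rather than on a status code. The names `callWithAnalyzer` and `isAcceptable` are hypothetical and exist only for this illustration; they are not the framework's hook signatures, and the loop is synchronous to keep the example short.

```typescript
// Hypothetical sketch; not part of @emmvish/stable-infra.
type Outcome<T> =
  | { success: true; data: T; attempts: number }
  | { success: false; attempts: number; lastError: string };

// Calls `call` up to maxAttempts times and lets `isAcceptable` inspect the
// returned content. A rejected response is retried just like a thrown error.
function callWithAnalyzer<T>(
  call: () => T,
  isAcceptable: (data: T) => boolean,
  maxAttempts: number
): Outcome<T> {
  let lastError = 'no attempts made';
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      const data = call();
      if (isAcceptable(data)) {
        return { success: true, data, attempts: attempt };
      }
      lastError = 'response rejected by analyzer';
    } catch (err) {
      lastError = err instanceof Error ? err.message : String(err);
    }
  }
  return { success: false, attempts: maxAttempts, lastError };
}
```

A typical analyzer would check that a payload is complete (for example, that a job status is no longer "pending") before the result is accepted.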

Observability Hooks

Comprehensive logging via handleErrors and handleSuccessfulAttemptData hooks. Track every attempt with timestamps, execution times, and custom metadata.

Retry Strategies

FIXED, LINEAR, and EXPONENTIAL backoff with configurable jitter to prevent thundering herd. Set max attempts, wait times, and caps.
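
As a rough illustration of how the three strategies differ, the sketch below computes the wait before each retry. The helper `computeDelay`, its option names, and the jitter formula are assumptions made for this example; the framework's own option names and formulas may differ.

```typescript
// Hypothetical helper; not part of @emmvish/stable-infra.
type Strategy = 'FIXED' | 'LINEAR' | 'EXPONENTIAL';

interface BackoffOptions {
  strategy: Strategy;
  baseMs: number;        // wait before the first retry
  maxMs: number;         // cap applied to every wait
  jitterRatio?: number;  // 0..1, fraction of the wait that may be shaved off
  random?: () => number; // injectable so the result can be made deterministic
}

function rawDelay(strategy: Strategy, baseMs: number, attempt: number): number {
  if (strategy === 'FIXED') return baseMs;
  if (strategy === 'LINEAR') return baseMs * attempt;
  return baseMs * 2 ** (attempt - 1);
}

// `attempt` is 1-based: attempt 1 is the wait before the first retry.
function computeDelay(attempt: number, opts: BackoffOptions): number {
  const { strategy, baseMs, maxMs, jitterRatio = 0 } = opts;
  const random = opts.random ?? Math.random;
  const capped = Math.min(rawDelay(strategy, baseMs, attempt), maxMs);
  // Spread waits over [capped * (1 - jitterRatio), capped] so that many
  // clients retrying at once do not all fire at the same instant.
  return Math.round(capped - capped * jitterRatio * random());
}
```

With `baseMs: 100`, the third retry waits 100 ms under FIXED, 300 ms under LINEAR, and 400 ms under EXPONENTIAL, before the cap and jitter are applied.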

Circuit Breaking

Three-state fail-fast protection (CLOSED → OPEN → HALF_OPEN). Configurable failure thresholds, reset timeouts, and half-open request limits.
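
The state transitions can be sketched as a small class. This is a simplified illustration, not the framework's implementation: it opens on a count of consecutive failures rather than a failure percentage, and the class name, constructor parameters, and injectable clock are all assumptions made for the example.

```typescript
// Hypothetical sketch; not the framework's circuit breaker.
type BreakerState = 'CLOSED' | 'OPEN' | 'HALF_OPEN';

class SimpleCircuitBreaker {
  private state: BreakerState = 'CLOSED';
  private failures = 0;
  private openedAt = 0;
  private halfOpenInFlight = 0;
  private failureThreshold: number;
  private resetTimeoutMs: number;
  private halfOpenMaxRequests: number;
  private now: () => number;

  constructor(
    failureThreshold: number,    // consecutive failures before opening
    resetTimeoutMs: number,      // time spent OPEN before probing again
    halfOpenMaxRequests: number, // probe calls allowed while HALF_OPEN
    now: () => number = () => Date.now()
  ) {
    this.failureThreshold = failureThreshold;
    this.resetTimeoutMs = resetTimeoutMs;
    this.halfOpenMaxRequests = halfOpenMaxRequests;
    this.now = now;
  }

  // OPEN becomes HALF_OPEN lazily, once the reset timeout has elapsed.
  getState(): BreakerState {
    if (this.state === 'OPEN' && this.now() - this.openedAt >= this.resetTimeoutMs) {
      this.state = 'HALF_OPEN';
      this.halfOpenInFlight = 0;
    }
    return this.state;
  }

  // Returns true if a call may proceed right now.
  allowRequest(): boolean {
    const state = this.getState();
    if (state === 'CLOSED') return true;
    if (state === 'OPEN') return false;
    if (this.halfOpenInFlight < this.halfOpenMaxRequests) {
      this.halfOpenInFlight++;
      return true;
    }
    return false;
  }

  recordSuccess(): void {
    this.failures = 0;
    this.state = 'CLOSED';
  }

  // Any failure while HALF_OPEN reopens the circuit immediately.
  recordFailure(): void {
    const halfOpen = this.getState() === 'HALF_OPEN';
    this.failures++;
    if (halfOpen || this.failures >= this.failureThreshold) {
      this.state = 'OPEN';
      this.openedAt = this.now();
      this.failures = 0;
    }
  }
}
```

Callers check `allowRequest()` before each call and report the result with `recordSuccess()` or `recordFailure()`; while the breaker is OPEN, calls fail fast without touching the downstream service.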

Caching

HTTP response caching and function memoization with TTL, LRU eviction, and custom key generators. Respects cache-control headers.
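
For readers unfamiliar with how TTL and LRU eviction combine, here is a minimal sketch built on a `Map`, which iterates keys in insertion order. The class `TtlLruCache` and the key generator `requestKey` are hypothetical names for this example and do not reflect the framework's cache configuration; cache-control handling is left out.

```typescript
// Hypothetical sketch; not the framework's cache.
class TtlLruCache<V> {
  private entries = new Map<string, { value: V; expiresAt: number }>();
  private maxEntries: number;
  private ttlMs: number;
  private now: () => number;

  constructor(maxEntries: number, ttlMs: number, now: () => number = () => Date.now()) {
    this.maxEntries = maxEntries;
    this.ttlMs = ttlMs;
    this.now = now;
  }

  get(key: string): V | undefined {
    const entry = this.entries.get(key);
    if (!entry) return undefined;
    if (entry.expiresAt <= this.now()) {
      this.entries.delete(key); // expired: treat as a miss
      return undefined;
    }
    // Re-insert so the key becomes the most recently used one.
    this.entries.delete(key);
    this.entries.set(key, entry);
    return entry.value;
  }

  set(key: string, value: V): void {
    this.entries.delete(key);
    this.entries.set(key, { value, expiresAt: this.now() + this.ttlMs });
    if (this.entries.size > this.maxEntries) {
      // The first key in iteration order is the least recently used.
      const oldest = this.entries.keys().next().value;
      if (oldest !== undefined) this.entries.delete(oldest);
    }
  }
}

// Example of a custom key generator: one cache entry per method + URL.
function requestKey(method: string, url: string): string {
  return `${method.toUpperCase()} ${url}`;
}
```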

Rate Limiting

Sliding-window token bucket rate limiter. Configure max requests per window to respect external API quotas and prevent throttling.
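
One way to read "max requests per window" is a sliding-window log, sketched below. This is an assumption about the general technique, not a description of the framework's internals; `SlidingWindowLimiter` and its constructor parameters are hypothetical.

```typescript
// Hypothetical sketch of a sliding-window limiter; not the framework's code.
class SlidingWindowLimiter {
  private timestamps: number[] = [];
  private maxRequests: number;
  private windowMs: number;
  private now: () => number;

  constructor(maxRequests: number, windowMs: number, now: () => number = () => Date.now()) {
    this.maxRequests = maxRequests;
    this.windowMs = windowMs;
    this.now = now;
  }

  // Returns true and records the call if it fits in the current window.
  tryAcquire(): boolean {
    const current = this.now();
    const cutoff = current - this.windowMs;
    // Keep only the calls that are still inside the window.
    this.timestamps = this.timestamps.filter((t) => t > cutoff);
    if (this.timestamps.length >= this.maxRequests) return false;
    this.timestamps.push(current);
    return true;
  }
}
```

A caller that receives `false` can wait and retry, or queue the request until the oldest recorded call leaves the window.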

Concurrency Limiting

Semaphore-based concurrency control. Limit parallel executions to prevent resource exhaustion and respect downstream capacity.
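
The semaphore pattern itself is small enough to sketch. The `Semaphore` class below is a hypothetical, minimal version for illustration and is not the framework's limiter.

```typescript
// Hypothetical sketch; not the framework's concurrency limiter.
class Semaphore {
  private active = 0;
  private waiters: Array<() => void> = [];
  private limit: number;

  constructor(limit: number) {
    this.limit = limit;
  }

  get activeCount(): number {
    return this.active;
  }

  get waitingCount(): number {
    return this.waiters.length;
  }

  // Resolves immediately if a slot is free, otherwise queues the caller.
  acquire(): Promise<void> {
    if (this.active < this.limit) {
      this.active++;
      return Promise.resolve();
    }
    return new Promise<void>((resolve) => {
      this.waiters.push(() => resolve());
    });
  }

  // Hands the slot to the oldest waiter, or frees it if nobody is waiting.
  release(): void {
    const next = this.waiters.shift();
    if (next) next();
    else if (this.active > 0) this.active--;
  }

  // Runs a task inside a slot and always releases the slot afterwards.
  async run<T>(task: () => Promise<T>): Promise<T> {
    await this.acquire();
    try {
      return await task();
    } finally {
      this.release();
    }
  }
}
```

Wrapping each call as `semaphore.run(() => doWork())` keeps at most `limit` tasks in flight, however many are submitted at once.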

Config Cascading

Global → Phase → Branch → Group → Item (Request / Function) configuration hierarchy. Lower levels override higher levels, preventing repetition while maintaining expressiveness.
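
The override rule can be pictured as a left-to-right merge in which more specific levels win. The sketch below is illustrative only: `resolveConfig` and the option keys in the usage note are assumptions, not the framework's resolution code.

```typescript
// Hypothetical sketch of cascading resolution; not the framework's code.
type Config = Record<string, unknown>;

// Levels are passed from most general to most specific
// (for example: global, phase, branch, group, item).
// A key set at a later level replaces the same key from an earlier level;
// keys left undefined fall through to the earlier value.
function resolveConfig(...levels: Array<Config | undefined>): Config {
  const resolved: Config = {};
  for (const level of levels) {
    for (const [key, value] of Object.entries(level ?? {})) {
      if (value !== undefined) resolved[key] = value;
    }
  }
  return resolved;
}
```

For instance, `resolveConfig({ attempts: 3, wait: 1000 }, { attempts: 5 })` yields `{ attempts: 5, wait: 1000 }`: the item-level `attempts` wins while `wait` is inherited.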

Shared Buffers & Persistence

Mutable shared state accessible across attempts, phases, branches, and items. Pass computed data without global state pollution. Buffers can be persisted using State Persistence Hooks.

Pre-Execution Hooks

Dynamic configuration injection before execution. Load tokens, modify arguments, or override config based on runtime conditions.

Trial Mode

Dry-run execution without actual HTTP calls or side effects. Test workflows, validate configurations, and debug decision logic safely.

Metrics & Guardrails

Comprehensive metrics extraction with validation guardrails. Set min / max thresholds for execution times, success rates, and infrastructure stats.
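
A guardrail check of this kind amounts to comparing each extracted metric against optional bounds. The following sketch assumes a flat metrics object; `checkGuardrails`, the `Guardrail` shape, and the metric names in the usage note are hypothetical.

```typescript
// Hypothetical sketch; not the framework's guardrail validator.
type Guardrail = { min?: number; max?: number };

// Returns one message per violated bound. Metrics without a guardrail,
// and guardrails without a matching metric, are ignored.
function checkGuardrails(
  metrics: Record<string, number>,
  guardrails: Record<string, Guardrail>
): string[] {
  const violations: string[] = [];
  for (const [name, limits] of Object.entries(guardrails)) {
    const value = metrics[name];
    if (value === undefined) continue;
    if (limits.min !== undefined && value < limits.min) {
      violations.push(`${name}=${value} is below min ${limits.min}`);
    }
    if (limits.max !== undefined && value > limits.max) {
      violations.push(`${name}=${value} is above max ${limits.max}`);
    }
  }
  return violations;
}
```

Given metrics `{ successRate: 92, avgExecutionTimeMs: 450 }` and guardrails `{ successRate: { min: 95 }, avgExecutionTimeMs: { max: 500 } }`, only the success-rate bound is reported as violated.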

Advanced Workflow Capabilities

From simple batch processing to complex DAG orchestration — choose the execution model that fits your topology.

The framework provides two workflow engines: stableWorkflow for array-based phase execution with rich control flow, and stableWorkflowGraph for DAG-based execution with explicit dependencies. Both support mixed items (requests + functions) and share the same resilience stack.

Sequential Workflows

Phases execute one after another in array order. The next phase starts only after the previous one completes.

  • Predictable execution order
  • Ideal for dependent phases
  • Data pipelines (fetch → transform → save)

Parallel Phases

All phases start simultaneously via Promise.all(). Faster execution for independent operations.

  • Maximum concurrency
  • No guaranteed order
  • Bulk operations without dependencies

Mixed (Hybrid) Execution

Sequential flow with concurrent groups. Mark phases with markConcurrentPhase to form parallel groups.

  • Best of both worlds
  • Partial parallelization
  • Optimize specific sections
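
The grouping behind hybrid execution can be sketched as follows, assuming that consecutive phases flagged with markConcurrentPhase form one parallel group and every unflagged phase runs on its own. The `PhaseLike` type and the helpers `groupPhases` and `runMixed` are hypothetical and only illustrate the scheduling idea.

```typescript
// Hypothetical sketch; the grouping rule is an assumption for illustration.
interface PhaseLike {
  id: string;
  markConcurrentPhase?: boolean;
}

// Consecutive flagged phases are collected into one group;
// each unflagged phase becomes a group of one.
function groupPhases<P extends PhaseLike>(phases: P[]): P[][] {
  const groups: P[][] = [];
  let current: P[] = [];
  for (const phase of phases) {
    if (phase.markConcurrentPhase) {
      current.push(phase);
      continue;
    }
    if (current.length > 0) {
      groups.push(current);
      current = [];
    }
    groups.push([phase]);
  }
  if (current.length > 0) groups.push(current);
  return groups;
}

// Groups run one after another; phases inside a group run together.
async function runMixed<P extends PhaseLike>(
  phases: P[],
  runPhase: (phase: P) => Promise<void>
): Promise<void> {
  for (const group of groupPhases(phases)) {
    await Promise.all(group.map((phase) => runPhase(phase)));
  }
}
```

With phases `a`, `b` (flagged), `c` (flagged), `d`, the groups are `[a]`, `[b, c]`, `[d]`: `b` and `c` overlap, while `a` and `d` keep their sequential position.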

Non-Linear Control Flow

Dynamic control via phase decision hooks: CONTINUE, SKIP, REPLAY, JUMP, TERMINATE.

  • Conditional execution
  • Phase replay with limits
  • Early termination
  • State machine patterns
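
To show how the five decisions can drive a phase list, here is a small interpreter. It is a sketch under stated assumptions, not the framework's engine: SKIP is assumed to skip the next phase, REPLAY re-runs the current phase up to `maxReplays` times, and the `Decision` and `Phase` shapes, `runPhases`, and the `maxSteps` guard are all hypothetical. Phases are synchronous to keep the example short.

```typescript
// Hypothetical sketch; decision semantics are assumptions for illustration.
type Decision =
  | { action: 'CONTINUE' }
  | { action: 'SKIP' }      // assumed here to skip the next phase
  | { action: 'REPLAY' }
  | { action: 'JUMP'; targetPhaseId: string }
  | { action: 'TERMINATE' };

interface Phase {
  id: string;
  run: () => void;
  // Receives how many times this phase has executed so far.
  decide?: (executions: number) => Decision;
}

// Returns the ids of the phases in the order they actually ran.
function runPhases(phases: Phase[], maxReplays = 2, maxSteps = 100): string[] {
  const history: string[] = [];
  const runs = new Map<string, number>();
  let index = 0;
  // maxSteps guards against endless JUMP loops.
  while (index < phases.length && history.length < maxSteps) {
    const phase = phases[index];
    phase.run();
    history.push(phase.id);
    const count = (runs.get(phase.id) ?? 0) + 1;
    runs.set(phase.id, count);
    const decision: Decision = phase.decide ? phase.decide(count) : { action: 'CONTINUE' };
    if (decision.action === 'TERMINATE') break;
    // Stay on the same index while replays remain.
    if (decision.action === 'REPLAY' && count <= maxReplays) continue;
    if (decision.action === 'JUMP') {
      const targetId = decision.targetPhaseId;
      const target = phases.findIndex((p) => p.id === targetId);
      if (target === -1) throw new Error(`Unknown phase: ${targetId}`);
      index = target;
      continue;
    }
    // An exhausted REPLAY falls through and advances like CONTINUE.
    index += decision.action === 'SKIP' ? 2 : 1;
  }
  return history;
}
```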

Branched Workflows

Multiple independent execution paths with their own phase sequences. Supports branch racing for first-wins semantics.

  • Parallel branches
  • A/B testing patterns
  • Branch-level decisions
  • Racing with cancellation

DAG Workflows

Graph-based execution with explicit node dependencies, conditional routing, and merge points.

  • Diamond patterns
  • Fan-out / fan-in
  • Conditional edge traversal
  • Higher parallelism
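
Deriving parallelism from dependencies can be illustrated with Kahn's algorithm, grouping nodes into levels whose members have no unmet dependencies. This is a generic sketch, not the framework's scheduler: `executionLevels` is a hypothetical helper, and a real engine may start a node as soon as its own dependencies finish rather than waiting for a whole level.

```typescript
// Hypothetical sketch using Kahn's algorithm; not the framework's scheduler.
function executionLevels(nodes: string[], edges: Array<[string, string]>): string[][] {
  const indegree = new Map<string, number>();
  const children = new Map<string, string[]>();
  for (const node of nodes) {
    indegree.set(node, 0);
    children.set(node, []);
  }
  for (const [from, to] of edges) {
    const fromChildren = children.get(from);
    const toIndegree = indegree.get(to);
    if (fromChildren === undefined || toIndegree === undefined) {
      throw new Error(`Edge references unknown node: ${from} -> ${to}`);
    }
    fromChildren.push(to);
    indegree.set(to, toIndegree + 1);
  }

  const levels: string[][] = [];
  let ready = nodes.filter((node) => indegree.get(node) === 0);
  let visited = 0;
  while (ready.length > 0) {
    levels.push(ready); // every node in a level can run in parallel
    visited += ready.length;
    const next: string[] = [];
    for (const node of ready) {
      for (const child of children.get(node) ?? []) {
        const remaining = (indegree.get(child) ?? 0) - 1;
        indegree.set(child, remaining);
        if (remaining === 0) next.push(child);
      }
    }
    ready = next;
  }
  // Nodes that never became ready are part of a cycle.
  if (visited !== nodes.length) throw new Error('Cycle detected in graph');
  return levels;
}
```

For a diamond (`fetch` fans out to `featuresA` and `featuresB`, which both feed `merge`), the levels are `[fetch]`, `[featuresA, featuresB]`, `[merge]`, which is the fan-out / fan-in shape listed above.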

Array-Based vs Graph-Based Workflows

Feature               stableWorkflow (Array)                  stableWorkflowGraph (DAG)
Topology Definition   Phase array with flags                  Explicit nodes and edges
Parallelism Model     markConcurrentPhase groups              Automatic from dependencies
Conditional Routing   JUMP / SKIP decisions                   Conditional nodes & edge conditions
Merge / Sync Points   End of concurrent group                 Explicit MERGE_POINT nodes
Phase Replay          Built-in                                Via custom logic
Branching             STABLE_WORKFLOW_BRANCH                  BRANCH nodes
Branch Racing         enableBranchRacing                      Parallel group racing
Best For              Linear / branched flows with control    Complex DAGs with high parallelism