Stability-First
Execution Engine
Build resilient workflows, fault-tolerant API integrations, and intelligent batch processing with deterministic error handling, comprehensive observability, and full type safety.
Get Started
Install the framework and start building resilient applications in minutes.
npm i @emmvish/stable-infra
yarn add @emmvish/stable-infra
pnpm add @emmvish/stable-infra
Stable Runner (Docker)
Run config-driven jobs using the built-in runner. Create a config file, build the image, and execute jobs with automatic re-runs on config changes.
// examples/runner-config.mjs
import { RunnerJobs } from '@emmvish/stable-infra';
export default {
jobId: 'demo-001',
job: {
kind: RunnerJobs.STABLE_REQUEST,
options: {
reqData: {
hostname: 'jsonplaceholder.typicode.com',
path: '/todos/1'
},
resReq: true
}
}
};
docker build -t stable-infra-runner .
docker run --rm \
-e CONFIG_PATH=/app/examples/runner-config.mjs \
-e OUTPUT_PATH=/app/output/result.json \
-e MAX_RUNS=1 \
-v "$(pwd)/examples:/app/examples" \
-v "$(pwd)/output:/app/output" \
stable-infra-runner
Eight Core APIs
From single HTTP requests to complex DAG-based workflow orchestration, scheduled execution, and distributed coordination — every execution mode shares the same powerful resilience stack.
stableRequest
A robust HTTP request client with automatic retries, caching, circuit breaking, and rate limiting. Execute any HTTP method with full type safety for request and response data.
Problem Statement
You need to fetch user profile data from an external API that occasionally returns 500 errors, has rate limits, and sometimes returns stale data. You want to cache successful responses, implement smart retries with exponential backoff, and validate that the response contains the expected user schema before accepting it.
import {
stableRequest,
REQUEST_METHODS,
VALID_REQUEST_PROTOCOLS,
RETRY_STRATEGIES,
CircuitBreaker
} from '@emmvish/stable-infra';
(async () => {
// Mock authentication token
const token = 'your-auth-token-here';
// Define strongly-typed interfaces for request/response
interface UserProfileRequest {
userId: string;
includeActivity?: boolean;
}
interface UserProfile {
id: string;
name: string;
email: string;
subscription: 'free' | 'pro' | 'enterprise';
lastActive: string;
}
// Create a shared circuit breaker for user service
const userServiceBreaker = new CircuitBreaker({
failureThresholdPercentage: 30,
minimumRequests: 5,
recoveryTimeoutMs: 30000,
halfOpenMaxRequests: 2
});
// Execute request with full resilience stack
const result = await stableRequest<UserProfileRequest, UserProfile>({
// HTTP configuration with type-safe body
reqData: {
method: REQUEST_METHODS.POST,
protocol: VALID_REQUEST_PROTOCOLS.HTTPS,
hostname: 'api.userservice.com',
path: '/v2/users/profile',
headers: { 'Authorization': `Bearer ${token}` },
body: { userId: 'usr_12345', includeActivity: true },
timeout: 5000
},
// Return actual response data (not just success boolean)
resReq: true,
// Retry configuration with exponential backoff
attempts: 4,
wait: 1000,
maxAllowedWait: 15000,
retryStrategy: RETRY_STRATEGIES.EXPONENTIAL,
jitter: 200, // Prevent thundering herd
// Shared circuit breaker instance
circuitBreaker: userServiceBreaker,
// Cache configuration for idempotent reads
cache: {
enabled: true,
ttl: 60000, // 1 minute cache
keyGenerator: (config) => `user:${config.data?.userId}`
},
// Response validation - retry if schema doesn't match
responseAnalyzer: ({ data, commonBuffer }) => {
const isValid = !!(data?.id && data?.email && data?.subscription);
if (isValid && commonBuffer) commonBuffer.lastValidUser = data.id;
return isValid;
},
// Observability hooks for monitoring
logAllErrors: true,
handleErrors: ({ errorLog }) => {
console.error('User fetch failed', errorLog);
},
handleSuccessfulAttemptData: ({ successfulAttemptData }) => {
console.log(`Fetched user in ${successfulAttemptData.executionTime}ms`);
},
// Shared state across attempts
commonBuffer: { requestStart: Date.now() },
// Metrics guardrails for anomaly detection
metricsGuardrails: {
request: { totalExecutionTime: { max: 10000 } },
infrastructure: { circuitBreaker: { failureRate: { max: 0.5 } } }
}
});
// Type-safe result handling
if (result.success) {
const user = result.data as UserProfile;
console.log(`Welcome, ${user.name} (${user.subscription})`);
console.log(`Metrics: ${result.metrics?.totalAttempts} attempts`);
console.log(`Cache: ${result.metrics?.infrastructureMetrics?.cache?.hits} hits`);
} else {
console.error(`Failed: ${result.error}`);
console.log(`Error logs: ${result.errorLogs?.length}`);
}
})();
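The `wait`, `maxAllowedWait`, and `jitter` options above drive the exponential backoff schedule. As a rough illustration of how such a delay schedule behaves (the classic formula, not stable-infra's actual internals), the delays grow as powers of two, are capped, and get a random spread added:

```typescript
// Sketch of exponential backoff with jitter (illustrative only;
// stable-infra's internal formula may differ).
function backoffDelay(
  attempt: number,    // 1-based retry attempt
  baseWaitMs: number, // initial wait (the "wait" option)
  maxWaitMs: number,  // cap (the "maxAllowedWait" option)
  jitterMs: number    // random spread (the "jitter" option)
): number {
  const exponential = baseWaitMs * 2 ** (attempt - 1); // 1000, 2000, 4000, ...
  const capped = Math.min(exponential, maxWaitMs);
  const jitter = Math.random() * jitterMs;             // de-synchronizes clients
  return capped + jitter;
}
```

With `wait: 1000` and `maxAllowedWait: 15000`, delays grow 1s, 2s, 4s, 8s, then cap at 15s; the jitter term keeps a fleet of retrying clients from hammering the API in lockstep.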
StableBuffer
Transactional shared state for workflows, gateways, and schedulers. Serialize updates, capture snapshots, and enforce guardrails on buffer activity to keep shared state deterministic and safe. Ships with the ability to replay and restore state from transaction logs.
Problem Statement
You need a reliable shared buffer across concurrent attempts and phases. Updates must be serialized, snapshots should be persisted, and you want guardrails on mutation volume and latency.
import { StableBuffer } from '@emmvish/stable-infra';
type SharedState = {
workflowId: string;
processed: number;
lastCheckpoint?: string;
};
const initialState: SharedState = {
workflowId: 'sync-001',
processed: 0
};
const buffer = new StableBuffer({
initialState: initialState,
metricsGuardrails: {
totalTransactions: { max: 1000 },
averageQueueWaitMs: { max: 250 }
}
});
// Serialize concurrent updates with a transactional queue
await Promise.all([
buffer.transaction(async (draft) => {
const state = draft as SharedState;
state.processed += 1;
state.lastCheckpoint = new Date().toISOString();
}),
buffer.transaction(async (draft) => {
const state = draft as SharedState;
state.processed += 1;
state.lastCheckpoint = new Date().toISOString();
}),
buffer.transaction(async (draft) => {
const state = draft as SharedState;
state.processed += 1;
state.lastCheckpoint = new Date().toISOString();
})
]);
// Capture a snapshot for persistence or diagnostics
const snapshot = buffer.read() as SharedState;
console.log('Snapshot', snapshot);
// Metrics with guardrail validation
const metrics = buffer.getMetrics();
console.log('Buffer metrics', metrics);
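The serialization guarantee behind `buffer.transaction` can be pictured as a promise chain: each update awaits the previous one, so concurrent callers never interleave mid-mutation. A minimal sketch of that idea (illustrative only; StableBuffer's real implementation also records transaction logs, snapshots, and guardrail metrics):

```typescript
// Minimal transactional queue: updates chain onto a tail promise,
// so they run strictly one at a time even when submitted concurrently.
class TinyBuffer<T extends object> {
  private state: T;
  private tail: Promise<void> = Promise.resolve();

  constructor(initialState: T) {
    this.state = initialState;
  }

  transaction(update: (draft: T) => void | Promise<void>): Promise<void> {
    // Chain onto the tail; the next update starts only after this one settles.
    this.tail = this.tail.then(() => update(this.state));
    return this.tail;
  }

  read(): T {
    return structuredClone(this.state); // snapshot, not a live reference
  }
}
```

Returning a clone from `read()` mirrors the snapshot semantics above: callers get a stable view for persistence or diagnostics, not a reference that later transactions can mutate underneath them.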
stableFunction
Execute any synchronous or asynchronous function with the full resilience stack — retries, caching, circuit breaking, rate limiting, and comprehensive observability. Perfect for wrapping database operations, external SDK calls, or complex computations.
Problem Statement
You have an expensive ML inference function that occasionally times out, sometimes returns low-confidence results, and has memory constraints requiring concurrency limits. You want to cache results based on input hash, retry on timeouts, and validate confidence scores before accepting predictions.
import {
stableFunction,
RETRY_STRATEGIES,
CircuitBreaker
} from '@emmvish/stable-infra';
(async () => {
// Mock dependencies for demonstration
const imageBuffer = Buffer.from('mock-image-data');
const batchId = 'batch-001';
const hashBuffer = (buf: Buffer) => buf.toString('base64').slice(0, 16);
const metrics = { increment: (key: string) => {}, histogram: (key: string, val: number) => {} };
const logger = { warn: console.warn, info: console.info };
const featureFlags = { get: async (key: string) => ({ inferenceTimeout: 30000 }) };
// Define typed arguments and return type
type InferenceArgs = [
imageBuffer: Buffer,
modelConfig: ModelConfig
];
interface ModelConfig {
modelId: string;
threshold: number;
maxTokens?: number;
}
interface PredictionResult {
labels: Array<{ name: string; confidence: number }>;
processingTime: number;
modelVersion: string;
}
// The expensive ML inference function
async function runInference(
imageBuffer: Buffer,
config: ModelConfig
): Promise<PredictionResult> {
// Simulate ML inference with mock response
await new Promise(r => setTimeout(r, 100));
return {
labels: [{ name: 'cat', confidence: 0.92 }, { name: 'dog', confidence: 0.08 }],
processingTime: 95,
modelVersion: config.modelId
};
}
// Shared circuit breaker for ML service
const mlBreaker = new CircuitBreaker({
failureThresholdPercentage: 50,
minimumRequests: 3,
recoveryTimeoutMs: 60000,
halfOpenMaxRequests: 1
});
// Execute with full resilience stack
const result = await stableFunction<InferenceArgs, PredictionResult>({
// Function and typed arguments
fn: runInference,
args: [imageBuffer, { modelId: 'resnet-50', threshold: 0.85 }],
// Return the actual prediction result
returnResult: true,
// Retry configuration for transient failures
attempts: 3,
wait: 2000,
retryStrategy: RETRY_STRATEGIES.LINEAR,
jitter: 500,
// Execution timeout to prevent hanging
executionTimeout: 30000,
// Circuit breaker for fail-fast protection
circuitBreaker: mlBreaker,
// Concurrency limit (ML models are memory-intensive)
maxConcurrentRequests: 4,
// Rate limiting for GPU resource management
rateLimit: { maxRequests: 20, windowMs: 10000 },
// Cache predictions by input hash
cache: {
enabled: true,
ttl: 300000, // 5 minute cache
keyGenerator: (fn, args) => {
const [buffer, config] = args;
return `inference:${config.modelId}:${hashBuffer(buffer)}`;
}
},
// Validate confidence scores - retry if too low
responseAnalyzer: ({ data, args, commonBuffer }) => {
const [, config] = args;
const highConfidence = data.labels.some(
l => l.confidence >= config.threshold
);
if (highConfidence && commonBuffer) {
commonBuffer.topLabel = data.labels[0].name;
}
return highConfidence;
},
// Observability hooks
logAllErrors: true,
handleErrors: ({ errorLog }) => {
metrics.increment('ml.inference.errors');
logger.warn('Inference failed', {
attempt: errorLog.attempt,
error: errorLog.error
});
},
logAllSuccessfulAttempts: true,
handleSuccessfulAttemptData: ({ successfulAttemptData }) => {
metrics.histogram('ml.inference.latency',
successfulAttemptData.executionTime
);
},
// Dynamic configuration via pre-execution hook
preExecution: {
preExecutionHook: async ({ stableFunctionOptions }) => {
// Load latest model config from feature flags
const flags = await featureFlags.get('ml-config');
return {
executionTimeout: flags.inferenceTimeout || 30000
};
},
applyPreExecutionConfigOverride: true
},
// Shared state and context
commonBuffer: { batchId: batchId },
executionContext: { workflowId: 'image-classification' },
// Metrics guardrails
metricsGuardrails: {
common: { executionTime: { max: 60000 } }
}
});
// Type-safe result handling
if (result.success) {
const prediction: PredictionResult = result.data!;
console.log(`Top label: ${prediction.labels[0].name}`);
console.log(`Confidence: ${(prediction.labels[0].confidence * 100).toFixed(1)}%`);
console.log('Cache status:', result.metrics?.infrastructureMetrics?.cache);
} else {
console.error(`Inference failed: ${result.error}`);
}
})();
stableApiGateway
Batch orchestration engine for executing multiple HTTP requests and functions together. Features request grouping, configuration cascading (Global → Group → Individual), concurrent/sequential execution, shared state buffers, and racing mode for redundant calls.
Problem Statement
Your e-commerce checkout flow needs to: (1) validate inventory across multiple warehouses in parallel, (2) calculate shipping rates from 3 providers and race for the fastest response, (3) process payment, and (4) send confirmation. Each step has different retry/timeout requirements, and you need aggregate metrics and shared state to pass data between steps.
import {
stableApiGateway,
REQUEST_METHODS,
RETRY_STRATEGIES,
RequestOrFunction,
CircuitBreaker,
type API_GATEWAY_ITEM
} from '@emmvish/stable-infra';
(async () => {
// Mock tax rates by state
const stateTaxRates: Record<string, number> = {
'CA': 0.0725, 'NY': 0.08, 'TX': 0.0625, 'WA': 0.065
};
// Define request body types
interface InventoryRequest { sku: string; quantity: number; }
interface PaymentRequest { amount: number; currency: string; }
// Define response types
interface InventoryResult { available: boolean; warehouse: string; stock: number; }
interface PaymentResult { transactionId: string; status: string; }
// Define function types
type TaxCalcArgs = [subtotal: number, state: string];
type TaxResult = { tax: number; total: number; };
// Union types for mixed gateway items
type RequestBodyUnion = InventoryRequest | PaymentRequest;
type ResponseUnion = InventoryResult | PaymentResult;
// Create circuit breaker for gateway
const gatewayBreaker = new CircuitBreaker({
failureThresholdPercentage: 50,
minimumRequests: 5,
recoveryTimeoutMs: 30000
});
// Build mixed items array with requests and functions (using union types)
const checkoutItems: API_GATEWAY_ITEM<RequestBodyUnion, ResponseUnion, TaxCalcArgs, TaxResult>[] = [
// Inventory checks (grouped for shared config)
{
type: RequestOrFunction.REQUEST,
request: {
id: 'inventory-west',
groupId: 'inventory-checks',
requestOptions: {
reqData: {
hostname: 'warehouse-west.api.com',
path: '/inventory/check',
method: REQUEST_METHODS.POST,
body: { sku: 'PROD-123', quantity: 2 }
},
resReq: true
}
}
},
{
type: RequestOrFunction.REQUEST,
request: {
id: 'inventory-east',
groupId: 'inventory-checks',
requestOptions: {
reqData: {
hostname: 'warehouse-east.api.com',
path: '/inventory/check',
method: REQUEST_METHODS.POST,
body: { sku: 'PROD-123', quantity: 2 }
},
resReq: true
}
}
},
// Tax calculation function
{
type: RequestOrFunction.FUNCTION,
function: {
id: 'calculate-tax',
functionOptions: {
fn: (subtotal: number, state: string): TaxResult => {
const rate = stateTaxRates[state] || 0.08;
return { tax: subtotal * rate, total: subtotal * (1 + rate) };
},
args: [149.99, 'CA'] as TaxCalcArgs,
returnResult: true,
cache: { enabled: true, ttl: 3600000 } // Cache tax rates
}
}
},
// Payment processing (critical - more retries)
{
type: RequestOrFunction.REQUEST,
request: {
id: 'process-payment',
groupId: 'payment',
requestOptions: {
reqData: {
hostname: 'payments.stripe.com',
path: '/v1/charges',
method: REQUEST_METHODS.POST,
body: { amount: 16199, currency: 'usd' }
},
resReq: true,
attempts: 5, // Override group config
responseAnalyzer: ({ data }) => (data as PaymentResult)?.status === 'succeeded'
}
}
}
];
// Execute with gateway options (the second argument is a separate functions array; empty here since all functions are inlined in checkoutItems)
const result = await stableApiGateway<RequestBodyUnion, ResponseUnion, TaxCalcArgs, TaxResult>(checkoutItems, [], {
// Parallel execution for independent operations
concurrentExecution: true,
maxConcurrentRequests: 6,
// Request groups with cascading config
requestGroups: [
{
id: 'inventory-checks',
commonConfig: {
commonAttempts: 2,
commonWait: 500,
commonRetryStrategy: RETRY_STRATEGIES.FIXED
}
},
{
id: 'payment',
commonConfig: {
commonAttempts: 3,
commonWait: 2000,
commonRetryStrategy: RETRY_STRATEGIES.EXPONENTIAL,
commonJitter: 300
}
}
],
// Global defaults (lowest priority)
commonAttempts: 2,
commonWait: 1000,
commonLogAllErrors: true,
// Shared state accessible in all items
sharedBuffer: {
orderId: 'ORD-98765',
customerId: 'CUST-12345',
startTime: Date.now()
},
// Global rate limiting
rateLimit: { maxRequests: 50, windowMs: 10000 },
// Circuit breaker for entire gateway
circuitBreaker: gatewayBreaker,
// Gateway-level timeout
maxTimeout: 60000,
// Execution context for tracing
executionContext: { workflowId: 'checkout-flow' },
// Metrics guardrails
metricsGuardrails: {
apiGateway: { successRate: { min: 0.9 } }
}
});
// Access individual results by ID (type-safe with union result type)
const inventoryWest = result.find((r) => r.requestId === 'inventory-west');
const taxResult = result.find((r) => r.requestId === 'calculate-tax');
const payment = result.find((r) => r.requestId === 'process-payment');
// Type-safe result access
if (inventoryWest?.success) {
const inventory = inventoryWest.data as InventoryResult;
console.log(`Warehouse: ${inventory.warehouse}, Stock: ${inventory.stock}`);
}
if (taxResult?.success) {
const tax = taxResult.data as TaxResult;
console.log(`Tax: $${tax.tax.toFixed(2)}, Total: $${tax.total.toFixed(2)}`);
}
if (payment?.success) {
const paymentData = payment.data as PaymentResult;
console.log(`Transaction: ${paymentData.transactionId}, Status: ${paymentData.status}`);
}
// Aggregate metrics
if (result.metrics) {
console.log(`Success rate: ${result.metrics.successRate.toFixed(1)}%`);
console.log(`Total time: ${result.metrics.executionTime}ms`);
console.log(`Throughput: ${result.metrics.throughput.toFixed(2)} items/s`);
console.log(`Request groups:`, result.metrics.requestGroups);
}
})();
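The Global → Group → Individual cascade used above (global `commonAttempts: 2`, the payment group's `commonAttempts: 3`, and the `process-payment` item's `attempts: 5`) behaves like layered object merging, where the most specific layer wins. A simplified sketch of that resolution order (illustrative; stable-infra's actual merge logic may handle more fields):

```typescript
// Sketch of Global -> Group -> Individual config cascading.
interface RetryConfig {
  attempts?: number;
  wait?: number;
  jitter?: number;
}

function resolveConfig(
  globalConfig: RetryConfig,
  groupConfig: RetryConfig | undefined,
  itemConfig: RetryConfig
): RetryConfig {
  // Later spreads win: individual overrides group, group overrides global.
  return { ...globalConfig, ...groupConfig, ...itemConfig };
}
```

For the payment item above, this resolves to `attempts: 5` (individual), `wait: 2000` (group), and `jitter: 300` (group), with the global defaults filling any remaining gaps.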
stableWorkflow
Array-based multi-phase workflow orchestration with 5 execution modes: Sequential, Concurrent, Mixed (hybrid), Non-Linear (with CONTINUE / SKIP / REPLAY / JUMP / TERMINATE decisions), and Branched (parallel execution paths). Supports phase replay, early termination, and state persistence for workflow recovery.
Problem Statement
Your data pipeline has 4 phases: (1) fetch raw data, (2) validate & clean in parallel with (3) enrichment, then (4) persist. If validation fails, you need to retry with relaxed rules. If enrichment data is stale, skip it. You need shared state between phases, phase-level decision hooks, and the ability to branch for A/B testing different processing strategies.
import {
stableWorkflow,
PHASE_DECISION_ACTIONS,
REQUEST_METHODS,
RequestOrFunction,
RETRY_STRATEGIES,
PersistenceStage,
type STABLE_WORKFLOW_PHASE
} from '@emmvish/stable-infra';
(async () => {
// Mock dependencies for demonstration
const logger = { info: console.info, debug: console.debug };
const kvStore = new Map<string, string>(); // in-memory stand-in for Redis
const redis = {
set: async (key: string, val: any) => { kvStore.set(key, JSON.stringify(val)); },
get: async (key: string) => JSON.parse(kvStore.get(key) ?? 'null')
};
const validateRecord = (record: any, strict: boolean) => record != null;
// Define request body types (for HTTP requests)
interface FetchRequest { /* empty - GET request */ }
interface PersistRequest { data: any[]; metadata: Record<string, any>; }
// Define response types
interface RawData { records: any[]; timestamp: string; }
interface EnrichedData { records: any[]; enrichedAt: string; }
interface PersistResult { success: boolean; }
// Define function types
interface ValidatedData { valid: any[]; errors: string[]; }
type ValidateFnArgs = [data: RawData, strict: boolean];
type ValidateFnReturn = ValidatedData;
// Union types for workflow with mixed request/response types
type WorkflowRequestBody = FetchRequest | PersistRequest;
type WorkflowResponseData = RawData | EnrichedData | PersistResult;
const phases: STABLE_WORKFLOW_PHASE<WorkflowRequestBody, WorkflowResponseData, ValidateFnArgs, ValidateFnReturn>[] = [
// Phase 1: Fetch raw data (sequential)
{
id: 'fetch-data',
requests: [{
id: 'get-raw-data',
requestOptions: {
reqData: {
hostname: 'data-lake.internal',
path: '/exports/latest'
},
resReq: true,
attempts: 3,
cache: { enabled: true, ttl: 300000 }
}
}],
// Store results in shared buffer for later phases
phaseDecisionHook: async ({ phaseResult, sharedBuffer }) => {
if (phaseResult.success && sharedBuffer) {
sharedBuffer.rawData = phaseResult.responses[0].data;
sharedBuffer.recordCount = (phaseResult.responses[0].data as RawData).records.length;
return { action: PHASE_DECISION_ACTIONS.CONTINUE };
}
return { action: PHASE_DECISION_ACTIONS.TERMINATE };
}
},
// Phase 2: Validate (marks start of concurrent group)
{
id: 'validate',
markConcurrentPhase: true, // Runs concurrently with phase 3
items: [{
type: RequestOrFunction.FUNCTION,
function: {
id: 'validate-records',
functionOptions: {
fn: async (data: RawData, strict: boolean): Promise<ValidatedData> => {
const valid = data.records.filter(r => validateRecord(r, strict));
return { valid, errors: [] };
},
args: [{ records: [], timestamp: '' }, true],
returnResult: true
}
}
}],
maxReplayCount: 2,
allowReplay: true,
phaseDecisionHook: async ({ phaseResult, sharedBuffer, executionHistory }) => {
const validData = phaseResult.responses[0].data as ValidatedData;
const replayCount = executionHistory.filter(e => e.phaseId === 'validate').length;
// If too many invalid records, retry with relaxed validation
if (sharedBuffer && validData.valid.length < sharedBuffer.recordCount * 0.8) {
if (replayCount < 2) {
sharedBuffer.strictValidation = false;
return { action: PHASE_DECISION_ACTIONS.REPLAY };
}
}
if (sharedBuffer) sharedBuffer.validatedData = validData.valid;
return { action: PHASE_DECISION_ACTIONS.CONTINUE };
}
},
// Phase 3: Enrich (concurrent with validation)
{
id: 'enrich',
markConcurrentPhase: true, // Same concurrent group
requests: [{
id: 'fetch-enrichment',
requestOptions: {
reqData: {
hostname: 'enrichment-service.internal',
path: '/metadata'
},
resReq: true
}
}],
allowSkip: true,
phaseDecisionHook: async ({ phaseResult, sharedBuffer }) => {
if (!phaseResult.success) {
// Enrichment is optional - continue without it
if (sharedBuffer) sharedBuffer.enrichmentSkipped = true;
return { action: PHASE_DECISION_ACTIONS.CONTINUE };
}
// Check data freshness
const enrichment = phaseResult.responses[0].data as EnrichedData;
const age = Date.now() - new Date(enrichment.enrichedAt).getTime();
if (age > 3600000) { // > 1 hour old
if (sharedBuffer) sharedBuffer.enrichmentSkipped = true;
return { action: PHASE_DECISION_ACTIONS.CONTINUE };
}
if (sharedBuffer) sharedBuffer.enrichmentData = enrichment;
return { action: PHASE_DECISION_ACTIONS.CONTINUE };
}
},
// Phase 4: Persist (sequential, after concurrent group)
{
id: 'persist',
requests: [{
id: 'save-to-warehouse',
requestOptions: {
reqData: {
hostname: 'data-warehouse.internal',
path: '/ingest',
method: REQUEST_METHODS.POST
},
resReq: false,
attempts: 5,
retryStrategy: RETRY_STRATEGIES.EXPONENTIAL
}
}]
}
];
// Execute with mixed mode enabled (explicit generics for full type safety)
const result = await stableWorkflow<WorkflowRequestBody, WorkflowResponseData, ValidateFnArgs, ValidateFnReturn>(phases, {
workflowId: 'data-pipeline-001',
// Enable mixed execution (sequential + concurrent groups)
enableMixedExecution: true,
enableNonLinearExecution: true, // Enable REPLAY/SKIP/JUMP
// Global configuration
commonAttempts: 2,
commonWait: 1000,
maxWorkflowIterations: 100, // Safety limit
// Shared state across all phases
sharedBuffer: {
pipelineStart: Date.now(),
strictValidation: true
},
// Workflow-level circuit breaker
circuitBreaker: { failureThresholdPercentage: 50, minimumRequests: 3, recoveryTimeoutMs: 60000 },
// Observability hooks
handlePhaseCompletion: ({ phaseResult, workflowId }) => {
logger.info(`Phase ${phaseResult.phaseId} completed`, {
workflowId,
success: phaseResult.success,
duration: phaseResult.metrics?.executionTime
});
},
handlePhaseDecision: ({ decision, phaseResult }) => {
logger.debug(`Decision for ${phaseResult.phaseId}: ${decision.action}`);
},
// State persistence for recovery
statePersistence: {
persistenceFunction: async ({ executionContext, buffer, persistenceStage }) => {
const key = `workflow:${executionContext.workflowId}`;
if (persistenceStage === PersistenceStage.BEFORE_HOOK) {
return (await redis.get(key)) || buffer;
}
await redis.set(key, buffer);
return buffer;
},
loadBeforeHooks: true,
storeAfterHooks: true
},
// Metrics guardrails
metricsGuardrails: {
workflow: { totalPhases: { max: 10 }, executionTime: { max: 300000 } }
}
});
// Access workflow results
console.log(`Workflow success: ${result.success}`);
console.log(`Phases executed: ${result.totalPhases}`);
console.log(`Replays: ${result.metrics?.totalPhaseReplays || 0}`);
console.log(`Phase results:`, result.phases.map((p) => p.phaseId));
})();
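Under the hood, the CONTINUE / SKIP / REPLAY / JUMP / TERMINATE decisions drive a phase cursor through the phases array. A simplified sketch of such a decision loop (an illustration of the control flow, not stableWorkflow's actual engine, which also enforces `maxWorkflowIterations` and per-phase replay caps):

```typescript
// Simplified non-linear phase cursor driven by decision actions.
type Decision =
  | { action: 'CONTINUE' }
  | { action: 'REPLAY' }
  | { action: 'JUMP'; targetPhaseId: string }
  | { action: 'TERMINATE' };

function runPhases(
  phaseIds: string[],
  decide: (phaseId: string, visitCount: number) => Decision
): string[] {
  const executed: string[] = [];
  const visits = new Map<string, number>();
  let i = 0;
  while (i >= 0 && i < phaseIds.length) {
    const id = phaseIds[i];
    const count = (visits.get(id) ?? 0) + 1;
    visits.set(id, count);
    executed.push(id);
    const decision = decide(id, count);
    if (decision.action === 'TERMINATE') break;
    if (decision.action === 'REPLAY') continue;     // rerun the same phase
    if (decision.action === 'JUMP') {
      i = phaseIds.indexOf(decision.targetPhaseId); // may move backward or forward
      continue;
    }
    i += 1;                                         // CONTINUE
  }
  return executed;
}
```

With this model, the validate phase above replaying once produces the execution order `fetch-data, validate, validate, enrich, persist`, which is why a safety limit like `maxWorkflowIterations` matters: a decision hook that always replays would otherwise loop forever.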
stableWorkflowGraph
DAG-based workflow orchestration with explicit node dependencies, conditional routing, parallel groups, and merge points. Offers higher parallelism than array-based workflows and enables complex topologies like diamond patterns, fan-out/fan-in, and conditional branching based on runtime results.
Problem Statement
Your ML pipeline requires: (1) data preprocessing, then parallel (2) feature extraction and (3) model inference, which merge before (4) post-processing. If inference confidence is low, route to (5) human review instead. You need automatic DAG validation, conditional edge traversal, and the ability to execute independent nodes in true parallel.
import {
stableWorkflowGraph,
WorkflowGraphBuilder,
WorkflowEdgeConditionTypes,
RequestOrFunction,
REQUEST_METHODS,
type ConditionalEvaluationContext
} from '@emmvish/stable-infra';
(async () => {
// Mock dependencies for demonstration
const inputBuffer = Buffer.from('mock-input-data');
const logger = { info: console.info };
const preprocessPipeline = {
execute: async (data: Buffer) => ({ normalized: true, data: data.toString() })
};
const featureExtractor = {
extract: async (input: any) => ({ features: [0.1, 0.5, 0.3] })
};
// Define request body types
interface MLRequest { features?: number[]; data?: any; }
interface ReviewRequest { itemId: string; data: any; }
interface StoreRequest { results: any; metadata: Record<string, any>; }
// Define response types
interface PreprocessResult { normalized: boolean; data: string; }
interface InferenceResult { confidence: number; predictions: any[]; }
interface FeatureResult { features: number[]; }
interface ReviewResult { queued: boolean; reviewId: string; }
interface StoreResult { stored: boolean; }
interface FormatResult { features: any; predictions: any; timestamp: string; }
// Union types for graph workflow
type GraphRequestBody = MLRequest | ReviewRequest | StoreRequest;
type GraphResponseData = PreprocessResult | InferenceResult | FeatureResult | ReviewResult | StoreResult | FormatResult;
// Function types (use any[] for heterogeneous function signatures)
type GraphFunctionArgs = any[];
type GraphFunctionReturn = PreprocessResult | FeatureResult | FormatResult;
// Build the workflow graph with fluent API (using all 4 generics)
const graph = new WorkflowGraphBuilder<GraphRequestBody, GraphResponseData, GraphFunctionArgs, GraphFunctionReturn>()
// Entry point: Data preprocessing
.addPhase('preprocess', {
items: [{
type: RequestOrFunction.FUNCTION,
function: {
id: 'normalize-data',
functionOptions: {
fn: async (rawData: Buffer) => {
// Normalize, resize, convert formats
return preprocessPipeline.execute(rawData);
},
args: [inputBuffer],
returnResult: true,
executionTimeout: 30000
}
}
}]
})
// Parallel group: Feature extraction + Model inference
.addPhase('extract-features', {
items: [{
type: RequestOrFunction.FUNCTION,
function: {
id: 'feature-extractor',
functionOptions: {
fn: featureExtractor.extract,
args: [null], // Will be populated from sharedBuffer
returnResult: true,
cache: { enabled: true, ttl: 600000 }
}
}
}]
})
.addPhase('run-inference', {
requests: [{
id: 'ml-inference',
requestOptions: {
reqData: {
hostname: 'ml-service.internal',
path: '/predict',
method: REQUEST_METHODS.POST
},
resReq: true,
attempts: 3,
responseAnalyzer: ({ data }) => (data as InferenceResult)?.confidence !== undefined
}
}]
})
// Create parallel group node
.addParallelGroup('parallel-processing', ['extract-features', 'run-inference'])
// Merge point: Wait for parallel group to complete
.addMergePoint('sync-results', ['parallel-processing'])
// Conditional routing based on inference confidence
.addConditional('confidence-check', ({ results, sharedBuffer }: ConditionalEvaluationContext<GraphRequestBody, GraphResponseData, GraphFunctionArgs, GraphFunctionReturn>) => {
const inferenceResult = results.get('run-inference');
const confidence = (inferenceResult?.responses?.[0]?.data as InferenceResult)?.confidence || 0;
// Route to human review if confidence is low
if (confidence < 0.85) {
if (sharedBuffer) sharedBuffer.needsReview = true;
return 'human-review';
}
return 'postprocess';
})
// Human review path
.addPhase('human-review', {
requests: [{
id: 'queue-review',
requestOptions: {
reqData: {
hostname: 'review-queue.internal',
path: '/submit',
method: REQUEST_METHODS.POST
},
resReq: true
}
}]
})
// Post-processing path
.addPhase('postprocess', {
items: [{
type: RequestOrFunction.FUNCTION,
function: {
id: 'format-output',
functionOptions: {
fn: async (features: any, predictions: any) => {
return {
features,
predictions,
timestamp: new Date().toISOString()
};
},
args: [{}, {}],
returnResult: true
}
}
}]
})
// Final merge: Both paths converge
.addMergePoint('final-merge', ['human-review', 'postprocess'])
// Final output phase
.addPhase('output', {
requests: [{
id: 'store-results',
requestOptions: {
reqData: {
hostname: 'results-store.internal',
path: '/save',
method: REQUEST_METHODS.POST
},
resReq: false,
attempts: 5
}
}]
})
// Connect nodes with edge conditions
.connect('preprocess', 'parallel-processing', {
condition: { type: WorkflowEdgeConditionTypes.SUCCESS }
})
.connect('parallel-processing', 'sync-results')
.connect('sync-results', 'confidence-check')
// Conditional routing edges (handled by conditional node)
.connect('human-review', 'final-merge')
.connect('postprocess', 'final-merge')
.connect('final-merge', 'output')
// Set entry and build
.setEntryPoint('preprocess')
.build();
// Execute the graph workflow (with all 4 explicit generics)
const result = await stableWorkflowGraph<GraphRequestBody, GraphResponseData, GraphFunctionArgs, GraphFunctionReturn>(graph, {
workflowId: 'ml-pipeline-001',
// Shared state across all nodes
sharedBuffer: {
inputId: 'sample-12345',
modelVersion: 'v2.3.1'
},
// Global configuration
commonAttempts: 2,
commonWait: 1000,
maxConcurrentRequests: 10,
// Circuit breaker for entire graph
circuitBreaker: { failureThresholdPercentage: 50, minimumRequests: 5, recoveryTimeoutMs: 60000 },
// Graph-level timeout
maxTimeout: 120000,
// Observability
handlePhaseCompletion: ({ phaseResult }) => {
logger.info(`Node ${phaseResult.phaseId} completed`);
},
// Execution context for tracing
executionContext: { workflowId: 'ml-pipeline-001' }
});
// Access results
console.log(`Graph success: ${result.success}`);
console.log(`Nodes executed: ${result.executionHistory?.length}`);
console.log(`Path taken: ${result.executionHistory?.map((n) => n.phaseId).join(' → ')}`);
// Note: sharedBuffer is passed in options but not returned in result
// Check phases for specific results instead
const inferencePhase = result.phases.find((p) => p.phaseId === 'run-inference');
const needsReview = ((inferencePhase?.responses?.[0]?.data as InferenceResult | undefined)?.confidence ?? 1) < 0.85;
console.log(`Human review needed: ${needsReview}`);
})();
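The "automatic DAG validation" performed by `build()` boils down to proving the connected graph has no cycles. A minimal sketch of that check using Kahn's algorithm (illustrative; stableWorkflowGraph's own validation also covers entry points, merge-point references, and more):

```typescript
// Cycle check via Kahn's algorithm: repeatedly remove nodes with no
// remaining incoming edges; anything left over implies a cycle.
function isAcyclic(edges: Array<[from: string, to: string]>): boolean {
  const inDegree = new Map<string, number>();
  const adjacency = new Map<string, string[]>();
  for (const [from, to] of edges) {
    if (!inDegree.has(from)) inDegree.set(from, 0);
    inDegree.set(to, (inDegree.get(to) ?? 0) + 1);
    const neighbors = adjacency.get(from) ?? [];
    neighbors.push(to);
    adjacency.set(from, neighbors);
  }
  const queue = [...inDegree.entries()].filter(([, d]) => d === 0).map(([n]) => n);
  let removed = 0;
  while (queue.length) {
    const node = queue.shift()!;
    removed += 1;
    for (const next of adjacency.get(node) ?? []) {
      const d = inDegree.get(next)! - 1;
      inDegree.set(next, d);
      if (d === 0) queue.push(next);
    }
  }
  return removed === inDegree.size;
}
```

A miswired `.connect()` call that accidentally routes a downstream node back to `preprocess` would fail this check at build time rather than hanging the pipeline at runtime.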
StableScheduler
Schedule any stableRequest, stableFunction, or workflow to run on cron, interval, or timestamp triggers. Includes queue management, safe start/stop controls, and optional state persistence via custom save/load handlers.
Problem Statement
You need to run a data sync every 15 minutes, trigger a backfill job at a specific timestamp, and keep scheduler state recoverable across restarts. You want a unified scheduling layer that uses the same resilience stack and observability hooks as the rest of the framework.
import {
StableScheduler,
ScheduleTypes,
type SchedulerSchedule,
type SchedulerState
} from '@emmvish/stable-infra';
(async () => {
type SchedulerJob = {
id?: string;
schedule?: SchedulerSchedule;
kind: 'sync' | 'backfill';
payload?: string;
};
// In-memory persistence for demos/tests
let persistedState: SchedulerState<SchedulerJob> | null = null;
// Create a scheduler with a typed handler
const scheduler = new StableScheduler<SchedulerJob>(
{
maxParallel: 2,
persistence: {
enabled: true,
saveState: (state) => {
persistedState = state;
},
loadState: () => persistedState
}
},
async (job, context) => {
if (job.kind === 'sync') {
console.log(`Sync job ran (${context.jobId})`, job.payload);
return;
}
console.log(`Backfill executed at ${context.scheduledAt}`);
}
);
const restored = await scheduler.restoreState();
if (!restored) {
scheduler.addJobs([
{
id: 'sync-job',
kind: 'sync',
payload: 'users',
schedule: { type: ScheduleTypes.INTERVAL, everyMs: 15 * 60 * 1000 }
},
{
id: 'backfill-job',
kind: 'backfill',
schedule: { type: ScheduleTypes.TIMESTAMP, at: Date.now() + 60000 }
}
]);
}
scheduler.start();
// Stop after 2 seconds in demo environments
setTimeout(() => scheduler.stop(), 2000);
})();
DistributedCoordinator
Deploy stable-infra components across multiple nodes with distributed locking (with fencing tokens), compare-and-swap state management, quorum-based leader election, pub/sub messaging with configurable delivery guarantees, two-phase commit transactions, and distributed buffers & schedulers.
Problem Statement
You're running multiple instances of your service and need to coordinate access to shared resources without conflicts. You want distributed locks with fencing tokens to prevent stale holders, leader election with quorum support for split-brain prevention, and atomic multi-key transactions across distributed state — all with the same stability guarantees as the rest of the framework.
import {
stableWorkflow,
StableScheduler,
DistributedCoordinator,
InMemoryDistributedAdapter,
createDistributedStableBuffer,
createDistributedSchedulerConfig,
withDistributedBufferLock,
DistributedTransactionOperationType,
DistributedConflictResolution,
DistributedLockStatus,
DistributedLockRenewalMode,
DistributedTransactionStatus,
DistributedMessageDelivery,
ScheduleTypes,
REQUEST_METHODS
} from '@emmvish/stable-infra';
import type {
SchedulerSchedule,
SchedulerRunContext,
STABLE_WORKFLOW_PHASE,
DistributedStableBuffer
} from '@emmvish/stable-infra';
(async () => {
// ==========================================================
// 1. Distributed Coordinator
// ==========================================================
const adapter = new InMemoryDistributedAdapter();
const coordinator = new DistributedCoordinator({
adapter,
namespace: 'order-service',
enableLeaderElection: true
});
await coordinator.connect();
// ==========================================================
// 2. Distributed Locking with Fencing Tokens
// ==========================================================
const lock = await coordinator.acquireLock({
resource: 'order:123',
ttlMs: 30000,
renewalMode: DistributedLockRenewalMode.AUTO,
renewalIntervalMs: 10000
});
if (lock.status === DistributedLockStatus.ACQUIRED) {
console.log(`Lock acquired, fencing token: ${lock.fencingToken}`);
await coordinator.releaseLock(lock.handle!);
}
// ==========================================================
// 3. Compare-and-Swap & 2PC Transaction
// ==========================================================
await coordinator.setState('account:balance', { balance: 100 });
const casResult = await coordinator.compareAndSwap({
key: 'account:balance',
expectedVersion: 1,
newValue: { balance: 150 }
});
console.log(`CAS swapped: ${casResult.swapped}`);
const txResult = await coordinator.executeTransaction([
{ type: DistributedTransactionOperationType.SET,
key: 'account:A', value: { balance: 50 } },
{ type: DistributedTransactionOperationType.SET,
key: 'account:B', value: { balance: 150 } },
{ type: DistributedTransactionOperationType.INCREMENT,
key: 'transfers:count', delta: 1 }
]);
console.log(`TX status: ${txResult.status}, success: ${txResult.success}`);
// ==========================================================
// 4. Pub/Sub Messaging
// ==========================================================
const subscription = await coordinator.subscribe(
'order-events',
async (message) => {
console.log(`Received: ${JSON.stringify(message.payload)}`);
},
{ deliveryMode: DistributedMessageDelivery.AT_LEAST_ONCE }
);
await coordinator.publish('order-events', {
type: 'ORDER_COMPLETED',
orderId: '123'
});
// ==========================================================
// 5. Distributed Buffer (Cross-Node Shared State)
// ==========================================================
const distributedBuffer = await createDistributedStableBuffer({
distributed: { adapter, namespace: 'order-service' },
initialState: {
orderCount: 0,
totalRevenue: 0,
failedOrders: [] as string[]
},
conflictResolution: DistributedConflictResolution.CUSTOM,
mergeStrategy: (local, remote) => ({
orderCount: Math.max(local.orderCount, remote.orderCount),
totalRevenue: Math.max(local.totalRevenue, remote.totalRevenue),
failedOrders: [...new Set([...local.failedOrders, ...remote.failedOrders])]
}),
syncOnTransaction: true
});
const { buffer: sharedBuffer } = distributedBuffer;
// Atomic update with distributed lock
await withDistributedBufferLock(distributedBuffer, async () => {
await sharedBuffer.run(state => {
state.orderCount += 1;
state.totalRevenue += 99.99;
});
});
// ==========================================================
// 6. Distributed Scheduler + Workflow Execution
// ==========================================================
interface OrderJob {
id: string;
orderId: string;
schedule?: SchedulerSchedule;
}
const setup = await createDistributedSchedulerConfig<OrderJob>({
distributed: { adapter, namespace: 'order-service' },
scheduler: { maxParallel: 5, tickIntervalMs: 500 },
enableLeaderElection: true,
circuitBreaker: {
failureThresholdPercentage: 50,
minimumRequests: 5,
recoveryTimeoutMs: 30000
},
onBecomeLeader: () => console.log('Became leader'),
onLoseLeadership: () => console.log('Lost leadership')
});
// Each job runs a workflow with distributed shared state
const handler = async (
job: OrderJob,
ctx: SchedulerRunContext
) => {
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'validate',
requests: [{
id: 'check-inventory',
requestOptions: {
reqData: {
hostname: 'api.example.com',
method: REQUEST_METHODS.POST,
path: '/inventory/check',
body: { orderId: job.orderId }
}
}
}]
},
{
id: 'fulfill',
requests: [{
id: 'create-shipment',
requestOptions: {
reqData: {
hostname: 'api.example.com',
method: REQUEST_METHODS.POST,
path: '/shipping/create',
body: { orderId: job.orderId }
}
}
}]
}
];
const result = await stableWorkflow(phases, {
workflowId: `order-${job.orderId}`,
sharedBuffer,
stopOnFirstPhaseError: true
});
if (result.success) {
await withDistributedBufferLock(distributedBuffer, async () => {
await sharedBuffer.run(s => { s.orderCount += 1; });
});
}
return result;
};
const scheduler = new StableScheduler<OrderJob>(
{ ...setup.config, sharedBuffer } as any,
handler
);
// Subscribe to incoming orders and schedule them
await coordinator.subscribe<{ orderId: string }>(
'incoming-orders',
async (msg) => {
scheduler.addJob({
id: `order-${msg.payload.orderId}`,
orderId: msg.payload.orderId,
schedule: {
type: ScheduleTypes.TIMESTAMP,
at: Date.now()
}
});
}
);
const isLeader = await setup.waitForLeadership(10000);
if (isLeader) {
scheduler.start();
console.log('Distributed scheduler started');
}
// ==========================================================
// 7. Monitoring & Graceful Shutdown
// ==========================================================
const metrics = coordinator.getMetrics();
console.log('Metrics:', {
nodeId: metrics.nodeId,
isLeader: metrics.isLeader,
locks: metrics.lockAcquisitions,
state: sharedBuffer.read()
});
process.on('SIGTERM', async () => {
scheduler.stop();
await subscription.unsubscribe();
await setup.disconnect();
await distributedBuffer.disconnect();
await coordinator.disconnect();
process.exit(0);
});
})();
Stability-First Resilience Stack
Every execution mode inherits a comprehensive suite of production-ready resilience mechanisms — built-in, not bolted-on.
Intelligence Hooks
Response analyzers and error analyzers provide decision hooks for intelligent retry logic based on actual response content, not just HTTP status codes.
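The essence of such a hook can be sketched as a pure decision function over the parsed body. The interface and function names below (`ApiResponse`, `analyzeResponse`, the `PENDING` status) are illustrative assumptions, not the library's actual analyzer signature:

```typescript
// Illustrative sketch only — not the library's actual analyzer API.
// A response analyzer inspects the parsed body, not just the status code,
// and decides whether the attempt should be accepted or retried.
interface ApiResponse {
  statusCode: number;
  data: { status?: string; items?: unknown[] };
}

type AnalyzerVerdict = { accept: boolean; reason: string };

function analyzeResponse(res: ApiResponse): AnalyzerVerdict {
  // A 200 whose body signals a transient upstream condition should still retry.
  if (res.statusCode === 200 && res.data.status === 'PENDING') {
    return { accept: false, reason: 'upstream still processing' };
  }
  // An empty result set may indicate stale replication rather than success.
  if (res.statusCode === 200 && res.data.items?.length === 0) {
    return { accept: false, reason: 'empty payload, possibly stale' };
  }
  return { accept: res.statusCode < 400, reason: 'ok' };
}
```

The key point is that the verdict is driven by content, so a syntactically successful response can still trigger a retry.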
Observability Hooks
Comprehensive logging via handleErrors and handleSuccessfulAttemptData hooks. Track every attempt with timestamps, execution times, and custom metadata.
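The kind of attempt record such hooks work with can be sketched as follows; the field names (`attempt`, `executionTimeMs`, `outcome`) are assumptions for illustration, not the library's actual log schema:

```typescript
// Illustrative attempt-log record — field names are assumed, not the
// library's actual schema.
interface AttemptLog {
  attempt: number;
  timestamp: string;
  executionTimeMs: number;
  outcome: 'success' | 'error';
  error?: string;
}

// Append one record per attempt, capturing timing and failure details.
function recordAttempt(
  log: AttemptLog[],
  attempt: number,
  startedAt: number,
  endedAt: number,
  error?: Error
): void {
  log.push({
    attempt,
    timestamp: new Date(startedAt).toISOString(),
    executionTimeMs: endedAt - startedAt,
    outcome: error ? 'error' : 'success',
    ...(error ? { error: error.message } : {})
  });
}
```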
Retry Strategies
FIXED, LINEAR, and EXPONENTIAL backoff with configurable jitter to prevent thundering herd. Set max attempts, wait times, and caps.
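The three backoff families compute the next wait roughly as sketched below. Parameter names (`baseMs`, `capMs`, `jitterRatio`) are assumptions for illustration, not the library's actual option names:

```typescript
// Illustrative backoff math — option names are assumed, not the library's.
type Strategy = 'FIXED' | 'LINEAR' | 'EXPONENTIAL';

function backoffMs(
  strategy: Strategy,
  attempt: number, // 1-based attempt counter
  baseMs: number,
  capMs: number,
  jitterRatio = 0 // 0 disables jitter for deterministic behavior
): number {
  let delay: number;
  switch (strategy) {
    case 'LINEAR':
      delay = baseMs * attempt;
      break;
    case 'EXPONENTIAL':
      delay = baseMs * 2 ** (attempt - 1);
      break;
    default: // FIXED
      delay = baseMs;
  }
  // Jitter spreads simultaneous retries apart to avoid a thundering herd.
  const jitter = delay * jitterRatio * Math.random();
  return Math.min(delay + jitter, capMs);
}
```

The cap matters most for EXPONENTIAL: without it, a handful of attempts can push waits into minutes.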
Circuit Breaking
Three-state fail-fast protection (CLOSED → OPEN → HALF_OPEN). Configurable failure thresholds, reset timeouts, and half-open request limits.
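The state machine behind this pattern can be sketched in a few lines. This is the general circuit-breaker idea, not the `CircuitBreaker` class's actual implementation; threshold and method names here are invented:

```typescript
// Minimal three-state circuit breaker sketch (CLOSED → OPEN → HALF_OPEN).
// Names and thresholds are illustrative, not the library's API.
type CircuitState = 'CLOSED' | 'OPEN' | 'HALF_OPEN';

class MiniBreaker {
  private state: CircuitState = 'CLOSED';
  private failures = 0;
  private openedAt = 0;

  constructor(
    private failureThreshold: number,
    private resetTimeoutMs: number,
    private now: () => number = Date.now // injectable clock for testing
  ) {}

  canRequest(): boolean {
    if (this.state === 'OPEN' && this.now() - this.openedAt >= this.resetTimeoutMs) {
      this.state = 'HALF_OPEN'; // reset timeout elapsed: allow a probe request
    }
    return this.state !== 'OPEN';
  }

  onSuccess(): void {
    this.failures = 0;
    this.state = 'CLOSED';
  }

  onFailure(): void {
    this.failures += 1;
    // A failed half-open probe reopens immediately; otherwise open at threshold.
    if (this.state === 'HALF_OPEN' || this.failures >= this.failureThreshold) {
      this.state = 'OPEN';
      this.openedAt = this.now();
      this.failures = 0;
    }
  }

  current(): CircuitState {
    return this.state;
  }
}
```

Failing fast while OPEN is what protects a struggling downstream from being hammered by retries.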
Caching
HTTP response caching and function memoization with TTL, LRU eviction, and custom key generators. Respects cache-control headers.
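The TTL + LRU combination can be sketched with a plain `Map`, whose insertion-order iteration makes LRU eviction cheap. This is the caching idea only, under assumed names, not the library's cache internals:

```typescript
// Sketch of a TTL + LRU cache. Map iteration order gives LRU for free:
// re-inserting a key on read moves it to the back, so the first key is
// always the least recently used.
class TtlLruCache<V> {
  private store = new Map<string, { value: V; expiresAt: number }>();

  constructor(
    private maxEntries: number,
    private ttlMs: number,
    private now: () => number = Date.now // injectable clock for testing
  ) {}

  get(key: string): V | undefined {
    const entry = this.store.get(key);
    if (!entry) return undefined;
    if (this.now() >= entry.expiresAt) {
      this.store.delete(key); // expired: evict lazily on access
      return undefined;
    }
    this.store.delete(key); // refresh recency
    this.store.set(key, entry);
    return entry.value;
  }

  set(key: string, value: V): void {
    this.store.delete(key);
    if (this.store.size >= this.maxEntries) {
      // evict the least-recently-used entry (first in iteration order)
      const oldest = this.store.keys().next().value as string;
      this.store.delete(oldest);
    }
    this.store.set(key, { value, expiresAt: this.now() + this.ttlMs });
  }
}
```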
Rate Limiting
Sliding-window token bucket rate limiter. Configure max requests per window to respect external API quotas and prevent throttling.
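The sliding-window idea can be sketched by tracking request timestamps and discarding those that fall outside the window. This illustrates the concept only; the library's limiter may use a different algorithm internally:

```typescript
// Sketch of a sliding-window rate limiter ("max N requests per window").
// Illustration of the idea, not the library's implementation.
class SlidingWindowLimiter {
  private timestamps: number[] = [];

  constructor(
    private maxRequests: number,
    private windowMs: number,
    private now: () => number = Date.now // injectable clock for testing
  ) {}

  tryAcquire(): boolean {
    const cutoff = this.now() - this.windowMs;
    // drop timestamps that have slid out of the window
    this.timestamps = this.timestamps.filter((ts) => ts > cutoff);
    if (this.timestamps.length >= this.maxRequests) return false;
    this.timestamps.push(this.now());
    return true;
  }
}
```

Unlike fixed windows, the sliding window never admits a double burst at a window boundary.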
Concurrency Limiting
Semaphore-based concurrency control. Limit parallel executions to prevent resource exhaustion and respect downstream capacity.
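A counting semaphore of the kind described can be sketched as below; this shows the mechanism only, not the framework's internal limiter:

```typescript
// Minimal counting semaphore sketch for concurrency limiting.
// Illustrative only — the framework's internal limiter may differ.
class Semaphore {
  private waiters: (() => void)[] = [];

  constructor(private permits: number) {}

  private acquire(): Promise<void> {
    if (this.permits > 0) {
      this.permits -= 1;
      return Promise.resolve();
    }
    // no permit free: park until one is released
    return new Promise((resolve) => this.waiters.push(resolve));
  }

  private release(): void {
    const next = this.waiters.shift();
    if (next) next(); // hand the permit straight to the oldest waiter
    else this.permits += 1;
  }

  // Run a task under the semaphore, always releasing on completion.
  async run<T>(task: () => Promise<T>): Promise<T> {
    await this.acquire();
    try {
      return await task();
    } finally {
      this.release();
    }
  }
}
```

With `new Semaphore(2)`, no more than two tasks ever execute concurrently regardless of how many are submitted.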
Config Cascading
Global → Phase → Branch → Group → Item (Request / Function) configuration hierarchy. Lower levels override higher levels, preventing repetition while maintaining expressiveness.
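The resolution rule can be sketched as a left-to-right merge of partial configs, with later (more specific) levels winning. The field names below are invented for illustration; the library's actual option names may differ:

```typescript
// Sketch of the cascade: each level is a partial config, ordered from
// global (least specific) to item (most specific). Field names are
// illustrative assumptions.
interface ResilienceConfig {
  maxAttempts: number;
  waitMs: number;
  trialMode: boolean;
}

function resolveConfig(...levels: Partial<ResilienceConfig>[]): ResilienceConfig {
  const defaults: ResilienceConfig = { maxAttempts: 1, waitMs: 0, trialMode: false };
  // later levels override earlier ones key by key
  return Object.assign({}, defaults, ...levels);
}
```

This is why a workflow can set `maxAttempts` once globally while a single request overrides only its own `waitMs`.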
Shared Buffers & Persistence
Mutable shared state accessible across attempts, phases, branches, and items. Pass computed data without global state pollution. Buffers can be persisted using State Persistence Hooks.
Pre-Execution Hooks
Dynamic configuration injection before execution. Load tokens, modify arguments, or override config based on runtime conditions.
Trial Mode
Dry-run execution without actual HTTP calls or side effects. Test workflows, validate configurations, and debug decision logic safely.
Metrics & Guardrails
Comprehensive metrics extraction with validation guardrails. Set min / max thresholds for execution times, success rates, and infrastructure stats.
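A guardrail check of this kind boils down to comparing extracted metrics against declared bounds. The shapes below (`Guardrail`, `checkGuardrails`) are illustrative assumptions, not the library's API:

```typescript
// Sketch of a min/max guardrail check over extracted metrics.
// Type and function names are assumed for illustration.
interface Guardrail {
  metric: string;
  min?: number;
  max?: number;
}

function checkGuardrails(
  metrics: Record<string, number>,
  guardrails: Guardrail[]
): string[] {
  const violations: string[] = [];
  for (const g of guardrails) {
    const value = metrics[g.metric];
    if (value === undefined) continue; // metric not collected: skip
    if (g.min !== undefined && value < g.min) {
      violations.push(`${g.metric} (${value}) below minimum ${g.min}`);
    }
    if (g.max !== undefined && value > g.max) {
      violations.push(`${g.metric} (${value}) above maximum ${g.max}`);
    }
  }
  return violations;
}
```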
Advanced Workflow Capabilities
From simple batch processing to complex DAG orchestration — choose the execution model that fits your topology.
The framework provides two workflow engines: stableWorkflow for array-based phase execution with rich control flow, and stableWorkflowGraph for DAG-based execution with explicit dependencies. Both support mixed items (requests + functions) and share the same resilience stack.
Sequential Workflows
Phases execute one after another in array order. Each phase starts only after the previous one completes.
- Predictable execution order
- Ideal for dependent phases
- Data pipelines (fetch → transform → save)
Parallel Phases
All phases start simultaneously via Promise.all(). Faster execution for independent operations.
- Maximum concurrency
- No guaranteed order
- Bulk operations without dependencies
Mixed (Hybrid) Execution
Sequential flow with concurrent groups. Mark phases with markConcurrentPhase to form parallel groups.
- Best of both worlds
- Partial parallelization
- Optimize specific sections
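The grouping behavior can be sketched as follows: adjacent flagged phases form a wave that is awaited together, while everything else runs one at a time. The `concurrent` flag here stands in for `markConcurrentPhase`; the rest is illustrative, not the engine's implementation:

```typescript
// Sketch of mixed execution: adjacent concurrent-flagged phases are
// grouped into waves and awaited together; other phases run sequentially.
interface Phase {
  id: string;
  concurrent?: boolean;
  run: () => Promise<void>;
}

async function runMixed(phases: Phase[]): Promise<string[]> {
  const order: string[] = [];
  // group adjacent concurrent phases; sequential phases are waves of one
  const waves: Phase[][] = [];
  for (const p of phases) {
    const last = waves[waves.length - 1];
    if (p.concurrent && last && last[0]?.concurrent) last.push(p);
    else waves.push([p]);
  }
  for (const wave of waves) {
    await Promise.all(
      wave.map(async (p) => {
        await p.run();
        order.push(p.id);
      })
    );
  }
  return order;
}
```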
Non-Linear Control Flow
Dynamic control via phase decision hooks: CONTINUE, SKIP, REPLAY, JUMP, TERMINATE.
- Conditional execution
- Phase replay with limits
- Early termination
- State machine patterns
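The decision names come from the list above; the hook signature and `PhaseOutcome` shape below are illustrative assumptions about how such a hook might be written:

```typescript
// Sketch of a phase decision hook driving non-linear control flow.
// Decision names match the list above; everything else is assumed.
type PhaseDecision =
  | { action: 'CONTINUE' }
  | { action: 'SKIP' }
  | { action: 'REPLAY' }
  | { action: 'JUMP'; targetPhaseId: string }
  | { action: 'TERMINATE' };

interface PhaseOutcome {
  phaseId: string;
  success: boolean;
  replayCount: number;
}

function decideNext(outcome: PhaseOutcome, maxReplays = 2): PhaseDecision {
  if (!outcome.success) {
    // replay the failed phase a bounded number of times, then bail out
    return outcome.replayCount < maxReplays
      ? { action: 'REPLAY' }
      : { action: 'TERMINATE' };
  }
  if (outcome.phaseId === 'validate') {
    // successful validation skips ahead to fulfillment
    return { action: 'JUMP', targetPhaseId: 'fulfill' };
  }
  return { action: 'CONTINUE' };
}
```

Bounding replays is what keeps a flaky phase from looping the workflow forever.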
Branched Workflows
Multiple independent execution paths with their own phase sequences. Supports branch racing for first-wins semantics.
- Parallel branches
- A/B testing patterns
- Branch-level decisions
- Racing with cancellation
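The first-wins semantics can be sketched with `Promise.race` plus an `AbortSignal` to tell losing branches to stop. This illustrates the pattern only; `enableBranchRacing` handles this internally:

```typescript
// Sketch of branch racing with cancellation: the first branch to resolve
// wins, and the rest are signalled to abort. Illustration only.
async function raceBranches<T>(
  branches: ((signal: AbortSignal) => Promise<T>)[]
): Promise<T> {
  const controller = new AbortController();
  try {
    // first branch to fulfill wins the race
    return await Promise.race(branches.map((b) => b(controller.signal)));
  } finally {
    controller.abort(); // tell the losing branches to stop work
  }
}
```

Cooperative cancellation matters here: a losing branch only stops early if it actually checks the signal.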
DAG Workflows
Graph-based execution with explicit node dependencies, conditional routing, and merge points.
- Diamond patterns
- Fan-out / fan-in
- Conditional edge traversal
- Higher parallelism
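How parallelism falls out of explicit dependencies can be sketched with a wave-based topological scheduler: every node whose prerequisites are all complete runs in the same wave. This shows the scheduling idea only, not `stableWorkflowGraph`'s API:

```typescript
// Sketch of dependency-driven parallelism: nodes whose prerequisites are
// all satisfied execute in the same wave. Illustration only.
function executionWaves(deps: Record<string, string[]>): string[][] {
  const done = new Set<string>();
  const waves: string[][] = [];
  const nodes = Object.keys(deps);
  while (done.size < nodes.length) {
    // every not-yet-run node with all prerequisites done is ready now
    const wave = nodes.filter(
      (n) => !done.has(n) && (deps[n] ?? []).every((d) => done.has(d))
    );
    if (wave.length === 0) throw new Error('cycle detected');
    wave.forEach((n) => done.add(n));
    waves.push(wave);
  }
  return waves;
}
```

For the diamond pattern (`fetch` fans out to two enrichers that merge), this yields three waves with the two enrichers running in parallel.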
Array-Based vs Graph-Based Workflows
| Feature | stableWorkflow (Array) | stableWorkflowGraph (DAG) |
|---|---|---|
| Topology Definition | Phase array with flags | Explicit nodes and edges |
| Parallelism Model | markConcurrentPhase groups | Automatic from dependencies |
| Conditional Routing | JUMP / SKIP decisions | Conditional nodes & edge conditions |
| Merge / Sync Points | End of concurrent group | Explicit MERGE_POINT nodes |
| Phase Replay | Built-in | Via custom logic |
| Branching | STABLE_WORKFLOW_BRANCH | BRANCH nodes |
| Branch Racing | enableBranchRacing | Parallel group racing |
| Best For | Linear / branched flows with control | Complex DAGs with high parallelism |