Real-world use cases demonstrating the power of stable-request
A comprehensive data synchronization workflow that fetches data from multiple API endpoints, validates and transforms it, then uploads the results to an internal system with full observability.
The example below implements enterprise data synchronization from external APIs, with validation, transformation, and batch uploading.
import {
stableWorkflow,
RETRY_STRATEGIES,
PHASE_DECISION_ACTIONS,
REQUEST_METHODS,
VALID_REQUEST_PROTOCOLS,
type STABLE_WORKFLOW_PHASE
} from '@emmvish/stable-request';
// Shared state for workflow
interface SyncState {
syncId: string;
startTime: number;
users: any[];
posts: any[];
comments: any[];
enrichedData: any[];
validationErrors: string[];
uploadedRecords: number;
failedRecords: number;
retryCount: number;
}
const syncState: SyncState = {
syncId: `sync-${Date.now()}`,
startTime: Date.now(),
users: [],
posts: [],
comments: [],
enrichedData: [],
validationErrors: [],
uploadedRecords: 0,
failedRecords: 0,
retryCount: 0
};
// Circuit breaker configuration for source API
const sourceApiBreaker = {
failureThresholdPercentage: 50,
minimumRequests: 3,
recoveryTimeoutMs: 30000,
successThresholdPercentage: 60,
halfOpenMaxRequests: 2,
trackIndividualAttempts: false
};
// Define the multi-phase workflow
const syncPhases: STABLE_WORKFLOW_PHASE[] = [
// Phase 1: Fetch data from multiple sources concurrently
{
id: 'fetch-source-data',
concurrentExecution: true,
requests: [
{
id: 'fetch-users',
requestOptions: {
reqData: {
path: '/users?_limit=5',
method: REQUEST_METHODS.GET
},
resReq: true,
handleSuccessfulAttemptData: async ({ successfulAttemptData, commonBuffer }) => {
const buffer = commonBuffer as SyncState;
buffer.users = successfulAttemptData.data;
console.log(`✅ Fetched ${buffer.users.length} users`);
}
}
},
{
id: 'fetch-posts',
requestOptions: {
reqData: {
path: '/posts?_limit=10',
method: REQUEST_METHODS.GET
},
resReq: true,
handleSuccessfulAttemptData: async ({ successfulAttemptData, commonBuffer }) => {
const buffer = commonBuffer as SyncState;
buffer.posts = successfulAttemptData.data;
console.log(`✅ Fetched ${buffer.posts.length} posts`);
}
}
},
{
id: 'fetch-comments',
requestOptions: {
reqData: {
path: '/comments?_limit=20',
method: REQUEST_METHODS.GET
},
resReq: true,
handleSuccessfulAttemptData: async ({ successfulAttemptData, commonBuffer }) => {
const buffer = commonBuffer as SyncState;
buffer.comments = successfulAttemptData.data;
console.log(`✅ Fetched ${buffer.comments.length} comments`);
}
}
}
]
},
// Phase 2: Data enrichment and transformation
{
id: 'enrich-data',
requests: [
{
id: 'enrich-posts-with-users',
requestOptions: {
reqData: { path: '/posts/1', method: REQUEST_METHODS.GET },
resReq: false,
preExecution: {
preExecutionHook: ({ commonBuffer }) => {
const buffer = commonBuffer as SyncState;
// Enrich posts with user information
buffer.enrichedData = buffer.posts.map(post => {
const user = buffer.users.find(u => u.id === post.userId);
const postComments = buffer.comments.filter(c => c.postId === post.id);
return {
postId: post.id,
title: post.title,
body: post.body,
author: user ? {
id: user.id,
name: user.name,
email: user.email,
company: user.company?.name
} : null,
commentCount: postComments.length,
enrichedAt: new Date().toISOString(),
syncId: buffer.syncId
};
});
console.log(`✅ Enriched ${buffer.enrichedData.length} posts`);
return {};
},
applyPreExecutionConfigOverride: false
}
}
}
]
},
// Phase 3: Data validation with conditional retry
{
id: 'validate-data',
allowReplay: true,
maxReplayCount: 2,
requests: [
{
id: 'validate-enriched-data',
requestOptions: {
reqData: { path: '/posts/1', method: REQUEST_METHODS.GET },
resReq: false,
preExecution: {
preExecutionHook: ({ commonBuffer }) => {
const buffer = commonBuffer as SyncState;
buffer.validationErrors = [];
// Validate enriched data
buffer.enrichedData.forEach((item, index) => {
if (!item.author) {
buffer.validationErrors.push(`Record ${index}: Missing author`);
}
if (!item.title || item.title.length < 5) {
buffer.validationErrors.push(`Record ${index}: Invalid title`);
}
});
console.log(`✅ Validation complete: ${buffer.validationErrors.length} errors`);
return {};
},
applyPreExecutionConfigOverride: false
}
}
}
],
phaseDecisionHook: async ({ phaseResult, sharedBuffer }) => {
const buffer = sharedBuffer as SyncState;
if (buffer.validationErrors.length > 0 && buffer.retryCount < 2) {
buffer.retryCount++;
console.log(`⚠️ Validation errors found, retrying...`);
return { action: PHASE_DECISION_ACTIONS.REPLAY };
}
console.log(`✅ Validation passed, proceeding...`);
return { action: PHASE_DECISION_ACTIONS.CONTINUE };
}
},
// Phase 4: Batch upload
{
id: 'upload-data',
concurrentExecution: true,
requests: [] // Populated dynamically before execution; see the sketch after this example
}
];
// Execute workflow
const result = await stableWorkflow(syncPhases, {
workflowId: 'data-sync-pipeline',
commonRequestData: {
hostname: 'jsonplaceholder.typicode.com',
protocol: VALID_REQUEST_PROTOCOLS.HTTPS
},
commonAttempts: 3,
commonWait: 1000,
commonRetryStrategy: RETRY_STRATEGIES.EXPONENTIAL,
enableNonLinearExecution: true,
circuitBreaker: sourceApiBreaker,
commonCache: { ttl: 300000, enabled: true },
rateLimit: { maxRequests: 50, windowMs: 10000 },
sharedBuffer: syncState,
handlePhaseCompletion: async ({ phaseResult }) => {
console.log(`✅ Phase ${phaseResult.phaseId} completed in ${phaseResult.executionTime}ms`);
}
});
console.log(`\n📊 Workflow Results:`);
console.log(`Total Phases: ${result.totalPhases}`);
console.log(`Success: ${result.success}`);
console.log(`Duration: ${result.executionTime}ms`);
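Phase 4 above declares an empty requests array. Before calling stableWorkflow, it can be populated from the enriched records. A minimal sketch follows; note that the /posts path, REQUEST_METHODS.POST, and the body payload field are assumptions for illustration, not confirmed parts of the stable-request API:
// Hypothetical helper: turn enriched records into upload requests.
// Run this before the stableWorkflow call above.
function buildUploadRequests(state: SyncState) {
  return state.enrichedData.map((record, index) => ({
    id: `upload-record-${index}`,
    requestOptions: {
      reqData: {
        path: '/posts',               // assumed upload endpoint
        method: REQUEST_METHODS.POST, // assumes POST exists on REQUEST_METHODS
        body: record                  // assumed payload field; check the reqData type
      },
      resReq: true,
      handleSuccessfulAttemptData: async ({ commonBuffer }) => {
        (commonBuffer as SyncState).uploadedRecords++;
      },
      finalErrorAnalyzer: async () => {
        syncState.failedRecords++;
        return true; // suppress the error so remaining uploads continue
      }
    }
  }));
}
syncPhases[3].requests = buildUploadRequests(syncState);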
A sophisticated microservice orchestration pattern coordinating user validation, inventory management, payment processing, and notifications, with failures handled in isolation per service.
The example below processes an e-commerce order across multiple microservices, isolating failures per service and applying stricter retry budgets to SLA-critical calls.
import {
stableWorkflow,
RETRY_STRATEGIES,
PHASE_DECISION_ACTIONS,
CircuitBreaker,
REQUEST_METHODS,
VALID_REQUEST_PROTOCOLS,
type STABLE_WORKFLOW_BRANCH
} from '@emmvish/stable-request';
// Order processing state
interface OrderContext {
orderId: string;
customerId: number;
items: Array<{ productId: number; quantity: number; price: number }>;
totalAmount: number;
customerData?: any;
inventoryCheck?: { available: boolean; reservationId?: string };
paymentResult?: { success: boolean; transactionId?: string };
notificationsSent?: string[];
startTime: number;
retryAttempts: {
payment: number;
inventory: number;
notification: number;
};
serviceFailures: {
userService: number;
inventoryService: number;
paymentService: number;
notificationService: number;
};
fallbacksUsed: string[];
}
const orderContext: OrderContext = {
orderId: `ORD-${Date.now()}`,
customerId: 1,
items: [
{ productId: 1, quantity: 2, price: 29.99 },
{ productId: 2, quantity: 1, price: 49.99 }
],
totalAmount: 109.97,
startTime: Date.now(),
retryAttempts: { payment: 0, inventory: 0, notification: 0 },
serviceFailures: {
userService: 0,
inventoryService: 0,
paymentService: 0,
notificationService: 0
},
fallbacksUsed: [],
notificationsSent: []
};
// Circuit breakers for each microservice
const circuitBreakers = {
userService: new CircuitBreaker({
failureThresholdPercentage: 60,
minimumRequests: 2,
recoveryTimeoutMs: 20000,
successThresholdPercentage: 50,
halfOpenMaxRequests: 1
}),
inventoryService: new CircuitBreaker({
failureThresholdPercentage: 50,
minimumRequests: 2,
recoveryTimeoutMs: 30000,
successThresholdPercentage: 60,
halfOpenMaxRequests: 2
}),
paymentService: new CircuitBreaker({
failureThresholdPercentage: 40,
minimumRequests: 2,
recoveryTimeoutMs: 45000,
successThresholdPercentage: 70,
halfOpenMaxRequests: 1
}),
notificationService: new CircuitBreaker({
failureThresholdPercentage: 70,
minimumRequests: 3,
recoveryTimeoutMs: 15000,
successThresholdPercentage: 50,
halfOpenMaxRequests: 3
})
};
// Define the branched workflow
const orderBranches: STABLE_WORKFLOW_BRANCH[] = [
// Branch 1: User Validation (Critical)
{
id: 'user-validation',
markConcurrentBranch: false,
phases: [
{
id: 'fetch-user-details',
requests: [
{
id: 'get-user',
groupId: 'critical',
requestOptions: {
reqData: {
path: `/users/${orderContext.customerId}`,
method: REQUEST_METHODS.GET
},
resReq: true,
circuitBreaker: circuitBreakers.userService,
handleSuccessfulAttemptData: async ({ successfulAttemptData, commonBuffer }) => {
const buffer = commonBuffer as OrderContext;
buffer.customerData = successfulAttemptData.data;
console.log(`✅ Customer validated: ${buffer.customerData.name}`);
},
handleErrors: async ({ errorLog, commonBuffer }) => {
const buffer = commonBuffer as OrderContext;
buffer.serviceFailures.userService++;
console.log(`⚠️ User Service attempt failed`);
}
}
}
]
},
{
id: 'verify-user-status',
requests: [
{
id: 'check-user-status',
groupId: 'critical',
requestOptions: {
reqData: {
path: `/users/${orderContext.customerId}/todos?_limit=1`,
method: REQUEST_METHODS.GET
},
resReq: false,
preExecution: {
preExecutionHook: ({ commonBuffer }) => {
const buffer = commonBuffer as OrderContext;
console.log(`✅ Account status verified for ${buffer.customerData?.name}`);
return {};
},
applyPreExecutionConfigOverride: false,
continueOnPreExecutionHookFailure: false
}
}
}
]
}
]
},
// Branch 2: Inventory Management (Critical with fallback)
{
id: 'inventory-management',
markConcurrentBranch: true,
allowReplay: true,
maxReplayCount: 2,
phases: [
{
id: 'check-inventory',
requests: [
{
id: 'verify-stock',
groupId: 'critical',
requestOptions: {
reqData: {
path: '/posts?_limit=5',
method: REQUEST_METHODS.GET
},
resReq: true,
circuitBreaker: circuitBreakers.inventoryService,
handleSuccessfulAttemptData: async ({ successfulAttemptData, commonBuffer }) => {
const buffer = commonBuffer as OrderContext;
buffer.inventoryCheck = {
available: true,
reservationId: `RES-${Date.now()}`
};
console.log(`✅ Inventory available, reserved: ${buffer.inventoryCheck.reservationId}`);
}
}
}
]
}
],
branchDecisionHook: async ({ branchResult, sharedBuffer }) => {
const buffer = sharedBuffer as OrderContext;
if (!branchResult.success && buffer.retryAttempts.inventory < 2) {
buffer.retryAttempts.inventory++;
console.log(`🔄 Retrying inventory check (attempt ${buffer.retryAttempts.inventory})...`);
return { action: PHASE_DECISION_ACTIONS.REPLAY };
}
if (!branchResult.success) {
console.log(`❌ Inventory check failed after retries`);
return { action: PHASE_DECISION_ACTIONS.TERMINATE };
}
return { action: PHASE_DECISION_ACTIONS.CONTINUE };
}
},
// Branch 3: Payment Processing (Critical)
{
id: 'payment-processing',
markConcurrentBranch: true,
phases: [
{
id: 'authorize-payment',
requests: [
{
id: 'process-payment',
groupId: 'critical',
requestOptions: {
reqData: {
path: '/posts/1',
method: REQUEST_METHODS.GET
},
resReq: true,
circuitBreaker: circuitBreakers.paymentService,
handleSuccessfulAttemptData: async ({ successfulAttemptData, commonBuffer }) => {
const buffer = commonBuffer as OrderContext;
buffer.paymentResult = {
success: true,
transactionId: `TXN-${Date.now()}`
};
console.log(`✅ Payment processed: ${buffer.paymentResult.transactionId}`);
}
}
}
]
}
]
},
// Branch 4: Notification Service (Optional)
{
id: 'notification-service',
markConcurrentBranch: true,
phases: [
{
id: 'send-notifications',
requests: [
{
id: 'email-notification',
groupId: 'optional',
requestOptions: {
reqData: {
path: '/comments/1',
method: REQUEST_METHODS.GET
},
resReq: false,
circuitBreaker: circuitBreakers.notificationService,
preExecution: {
preExecutionHook: ({ commonBuffer }) => {
const buffer = commonBuffer as OrderContext;
buffer.notificationsSent = ['email', 'sms'];
console.log(`✅ Notifications sent to customer`);
return {};
},
applyPreExecutionConfigOverride: false
}
}
}
]
}
]
}
];
// Execute branched workflow
const result = await stableWorkflow([], {
workflowId: 'order-processing',
commonRequestData: {
hostname: 'jsonplaceholder.typicode.com',
protocol: VALID_REQUEST_PROTOCOLS.HTTPS
},
enableBranchExecution: true,
branches: orderBranches,
concurrentBranchExecution: true,
requestGroups: [
{
id: 'critical',
commonConfig: {
commonAttempts: 5,
commonWait: 1000,
commonRetryStrategy: RETRY_STRATEGIES.EXPONENTIAL
}
},
{
id: 'optional',
commonConfig: {
commonAttempts: 1,
commonWait: 500
}
}
],
sharedBuffer: orderContext,
handleBranchCompletion: async ({ branchId, success }) => {
if (!success && branchId !== 'notification-service') {
console.error(`❌ Critical branch ${branchId} failed`);
}
}
});
console.log(`\n📊 Order Processing Results:`);
console.log(`Success: ${result.success}`);
console.log(`Order ID: ${orderContext.orderId}`);
console.log(`Duration: ${result.executionTime}ms`);
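When payment fails after inventory has already been reserved, a compensating release of the reservation is a natural follow-up. A sketch using stableRequest (add it to the import list above); the real release endpoint is stubbed with a jsonplaceholder GET, since neither the path nor the verb of an actual inventory service is part of this example:
// Hypothetical compensation step: release a held reservation when the
// payment did not go through. Endpoint and verb are stand-ins.
async function releaseReservation(context: OrderContext) {
  if (context.paymentResult?.success || !context.inventoryCheck?.reservationId) return;
  await stableRequest({
    reqData: {
      hostname: 'jsonplaceholder.typicode.com',
      protocol: VALID_REQUEST_PROTOCOLS.HTTPS,
      path: '/posts/1', // stand-in for a DELETE /reservations/{id} call
      method: REQUEST_METHODS.GET
    },
    resReq: false,
    attempts: 3,
    wait: 500
  });
  context.fallbacksUsed.push('inventory-release');
  console.log(`↩️ Released reservation ${context.inventoryCheck.reservationId}`);
}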
A comprehensive health monitoring system that tracks service availability, response times, and SLA compliance with real-time alerting.
The example below monitors production API health and raises alerts on SLA breaches or consecutive failures.
import {
stableRequest,
RETRY_STRATEGIES,
CircuitBreaker,
VALID_REQUEST_PROTOCOLS,
REQUEST_METHODS
} from '@emmvish/stable-request';
// Service endpoint configuration
interface ServiceEndpoint {
name: string;
url: string;
critical: boolean;
slaThresholdMs: number;
healthCheckPath: string;
}
interface HealthCheckResult {
service: string;
status: 'healthy' | 'degraded' | 'down';
responseTime: number;
timestamp: string;
consecutiveFailures: number;
circuitBreakerState: string;
slaCompliant: boolean;
}
// Define monitored services
const services: ServiceEndpoint[] = [
{
name: 'User Authentication API',
url: 'https://jsonplaceholder.typicode.com',
critical: true,
slaThresholdMs: 200,
healthCheckPath: '/users/1'
},
{
name: 'Payment Gateway',
url: 'https://jsonplaceholder.typicode.com',
critical: true,
slaThresholdMs: 500,
healthCheckPath: '/posts/1'
},
{
name: 'Notification Service',
url: 'https://jsonplaceholder.typicode.com',
critical: false,
slaThresholdMs: 1000,
healthCheckPath: '/comments/1'
}
];
// Initialize circuit breakers for each service
const circuitBreakers = new Map<string, CircuitBreaker>();
services.forEach(service => {
circuitBreakers.set(service.name, new CircuitBreaker({
failureThresholdPercentage: service.critical ? 50 : 70,
minimumRequests: 3,
recoveryTimeoutMs: service.critical ? 10000 : 20000,
successThresholdPercentage: 60,
halfOpenMaxRequests: 2
}));
});
// Track consecutive failures per service
const failureTracker = new Map<string, number>();
// Perform health check using stableRequest
async function checkServiceHealth(service: ServiceEndpoint): Promise<HealthCheckResult> {
const startTime = Date.now();
const circuitBreaker = circuitBreakers.get(service.name)!;
console.log(`🔍 Checking ${service.name}...`);
try {
await stableRequest({
reqData: {
hostname: service.url.replace(/^https?:\/\//, ''),
protocol: VALID_REQUEST_PROTOCOLS.HTTPS,
path: service.healthCheckPath,
method: REQUEST_METHODS.GET,
headers: {
'User-Agent': 'HealthMonitor/1.0',
'Accept': 'application/json'
},
timeout: service.slaThresholdMs * 2
},
// Retry configuration based on criticality
attempts: service.critical ? 3 : 2,
wait: 1000,
retryStrategy: RETRY_STRATEGIES.EXPONENTIAL,
maxAllowedWait: 5000,
// Circuit breaker to prevent cascade failures
circuitBreaker: circuitBreaker,
// Cache successful health checks (5 second TTL)
cache: { ttl: 5000, enabled: true },
// Response validation
resReq: true,
responseAnalyzer: ({ data }) => {
return data && typeof data === 'object' && Object.keys(data).length > 0;
},
// Performance tracking
handleSuccessfulAttemptData: async ({ successfulAttemptData }) => {
const responseTime = successfulAttemptData.executionTime || 0;
const slaCompliant = responseTime <= service.slaThresholdMs;
if (!slaCompliant) {
console.log(` ⚠️ SLA Warning: ${responseTime}ms (threshold: ${service.slaThresholdMs}ms)`);
} else {
console.log(` ✅ Healthy - ${responseTime}ms`);
}
},
// Error handling
finalErrorAnalyzer: async ({ error }) => {
console.log(` ❌ Health check failed: ${error}`);
const currentFailures = failureTracker.get(service.name) || 0;
failureTracker.set(service.name, currentFailures + 1);
if (service.critical && currentFailures >= 2) {
console.log(` 🚨 ALERT: Critical service has ${currentFailures + 1} consecutive failures!`);
}
return false; // Re-throw the error
}
});
// Success - reset failure counter
failureTracker.set(service.name, 0);
const responseTime = Date.now() - startTime;
const slaCompliant = responseTime <= service.slaThresholdMs;
return {
service: service.name,
status: slaCompliant ? 'healthy' : 'degraded',
responseTime,
timestamp: new Date().toISOString(),
consecutiveFailures: 0,
circuitBreakerState: 'CLOSED',
slaCompliant
};
} catch (error) {
const consecutiveFailures = failureTracker.get(service.name) || 0;
return {
service: service.name,
status: 'down',
responseTime: Date.now() - startTime,
timestamp: new Date().toISOString(),
consecutiveFailures,
circuitBreakerState: 'OPEN',
slaCompliant: false
};
}
}
// Monitor all services
async function monitorServices() {
console.log('🏥 Starting Health Monitoring\n');
const results = await Promise.all(
services.map(service => checkServiceHealth(service))
);
console.log('\n📊 Health Monitoring Summary:');
results.forEach(health => {
const icon = health.status === 'healthy' ? '✅' :
health.status === 'degraded' ? '⚠️' : '❌';
console.log(`${icon} ${health.service}: ${health.status} (${health.responseTime}ms)`);
});
}
// Run health checks periodically
setInterval(() => monitorServices(), 10000);
monitorServices(); // Run immediately
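One caveat with setInterval: if a sweep takes longer than the interval (slow services plus retries), consecutive runs can overlap. A sequential loop avoids that; plain TypeScript, no library assumptions:
// Wait for each sweep to finish before scheduling the next one.
async function monitorLoop(intervalMs: number): Promise<never> {
  for (;;) {
    await monitorServices();
    await new Promise(resolve => setTimeout(resolve, intervalMs));
  }
}
// monitorLoop(10000); // use instead of the setInterval above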
Process large batches of images through an external API with concurrency control, rate limiting, and progress tracking.
import {
stableApiGateway,
RETRY_STRATEGIES,
RateLimiter,
ConcurrencyLimiter,
REQUEST_METHODS,
VALID_REQUEST_PROTOCOLS,
type API_GATEWAY_REQUEST
} from '@emmvish/stable-request';
// Image processing job types
interface ImageJob {
id: string;
imageUrl: string;
operations: string[];
priority: 'high' | 'normal' | 'low';
size: 'small' | 'medium' | 'large';
}
// Simulated image processing jobs
const imageBatch: ImageJob[] = [
{ id: 'img-001', imageUrl: '/photos/1', operations: ['resize', 'thumbnail'], priority: 'high', size: 'large' },
{ id: 'img-002', imageUrl: '/photos/2', operations: ['watermark'], priority: 'normal', size: 'medium' },
{ id: 'img-003', imageUrl: '/photos/3', operations: ['compress'], priority: 'low', size: 'small' },
{ id: 'img-004', imageUrl: '/photos/4', operations: ['resize', 'compress'], priority: 'high', size: 'large' },
{ id: 'img-005', imageUrl: '/photos/5', operations: ['thumbnail'], priority: 'normal', size: 'small' },
{ id: 'img-006', imageUrl: '/photos/6', operations: ['watermark', 'compress'], priority: 'low', size: 'medium' },
{ id: 'img-007', imageUrl: '/photos/7', operations: ['resize'], priority: 'high', size: 'large' },
{ id: 'img-008', imageUrl: '/photos/8', operations: ['thumbnail', 'watermark'], priority: 'normal', size: 'medium' },
{ id: 'img-009', imageUrl: '/photos/9', operations: ['compress'], priority: 'low', size: 'small' },
{ id: 'img-010', imageUrl: '/photos/10', operations: ['resize', 'watermark'], priority: 'high', size: 'large' }
];
// Initialize rate limiter (20 requests per second)
const rateLimiter = new RateLimiter(1000, 20);
// Initialize concurrency limiter (max 5 concurrent requests)
const concurrencyLimiter = new ConcurrencyLimiter(5);
// Track processing results
const processingResults = {
successful: [] as string[],
failed: [] as string[],
processingTimes: [] as number[]
};
// Process batch of images
async function processBatchImages() {
const startTime = Date.now();
console.log('🖼️ Starting Batch Image Processing');
console.log('═'.repeat(70));
console.log(`Total Images: ${imageBatch.length}`);
console.log(`Rate Limit: 20 req/sec | Concurrency Limit: 5 concurrent`);
console.log('');
// Convert jobs to API requests
const requests: API_GATEWAY_REQUEST[] = imageBatch.map(job => ({
id: job.id,
groupId: job.priority,
requestOptions: {
reqData: {
path: job.imageUrl,
method: REQUEST_METHODS.GET
},
resReq: true,
// Retry configuration based on priority
attempts: job.priority === 'high' ? 5 : job.priority === 'normal' ? 3 : 2,
wait: job.priority === 'high' ? 2000 : 1000,
retryStrategy: RETRY_STRATEGIES.EXPONENTIAL,
maxAllowedWait: 10000,
// Apply rate and concurrency limiting
rateLimiter: rateLimiter,
concurrencyLimiter: concurrencyLimiter,
// Success tracking
handleSuccessfulAttemptData: async ({ successfulAttemptData }) => {
processingResults.successful.push(job.id);
processingResults.processingTimes.push(successfulAttemptData.executionTime || 0);
const priorityIcon = job.priority === 'high' ? '🔴' :
job.priority === 'normal' ? '🟡' : '🟢';
console.log(` ${priorityIcon} ${job.id}: Processed ${job.operations.join(', ')} (${successfulAttemptData.executionTime}ms)`);
},
// Error handling
finalErrorAnalyzer: async ({ error }) => {
processingResults.failed.push(job.id);
console.log(` ❌ ${job.id}: Processing failed - ${error}`);
return true; // Suppress error to continue with other images
}
}
}));
console.log('🔄 Processing images...\n');
// Execute batch processing with stableApiGateway
const results = await stableApiGateway(requests, {
concurrentExecution: true,
// Common configuration
commonRequestData: {
hostname: 'jsonplaceholder.typicode.com',
protocol: VALID_REQUEST_PROTOCOLS.HTTPS,
headers: {
'Content-Type': 'application/json',
'X-Batch-ID': `BATCH-${Date.now()}`,
'User-Agent': 'ImageProcessor/2.0'
},
timeout: 10000
},
// Request groups with different retry strategies
requestGroups: [
{
id: 'high',
commonConfig: {
commonAttempts: 5,
commonWait: 2000,
commonRetryStrategy: RETRY_STRATEGIES.EXPONENTIAL
}
},
{
id: 'normal',
commonConfig: {
commonAttempts: 3,
commonWait: 1000,
commonRetryStrategy: RETRY_STRATEGIES.LINEAR
}
},
{
id: 'low',
commonConfig: {
commonAttempts: 2,
commonWait: 500,
commonRetryStrategy: RETRY_STRATEGIES.FIXED
}
}
],
stopOnFirstError: false,
commonAttempts: 3,
commonWait: 1000
});
const duration = Date.now() - startTime;
// Generate processing report
console.log('\n' + '═'.repeat(70));
console.log('📊 BATCH PROCESSING SUMMARY');
console.log('═'.repeat(70));
const successfulResults = results.filter(r => r.success);
const failedResults = results.filter(r => !r.success);
console.log(`\nProcessing Results:`);
console.log(` ✅ Successful: ${successfulResults.length}/${imageBatch.length}`);
console.log(` ❌ Failed: ${failedResults.length}/${imageBatch.length}`);
console.log(` ⏱️ Total Duration: ${duration}ms`);
if (processingResults.processingTimes.length > 0) {
const avgTime = processingResults.processingTimes.reduce((a, b) => a + b, 0) /
processingResults.processingTimes.length;
console.log(` 📈 Average Time per Image: ${avgTime.toFixed(0)}ms`);
}
console.log('\n✨ Batch processing complete!');
return {
totalJobs: imageBatch.length,
successful: successfulResults.length,
failed: failedResults.length,
duration,
averageTime: processingResults.processingTimes.length > 0
? processingResults.processingTimes.reduce((a, b) => a + b, 0) / processingResults.processingTimes.length
: 0 // avoid NaN when every job failed
};
}
// Run the batch processing
processBatchImages();
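The summary reports only an average, which hides tail latency. A small nearest-rank percentile helper over the collected processing times (plain TypeScript, no library APIs):
// Nearest-rank percentile over a sample of durations.
function percentile(samples: number[], p: number): number {
  if (samples.length === 0) return 0;
  const sorted = [...samples].sort((a, b) => a - b);
  const rank = Math.ceil((p / 100) * sorted.length) - 1;
  return sorted[Math.max(0, rank)];
}
// console.log(`p95: ${percentile(processingResults.processingTimes, 95)}ms`);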
Use trial mode with configurable failure probabilities to test feature flags and new implementations without making real API calls.
import {
stableRequest,
RETRY_STRATEGIES,
REQUEST_METHODS,
VALID_REQUEST_PROTOCOLS
} from '@emmvish/stable-request';
// Test scenarios
interface ChaosScenario {
name: string;
description: string;
failureProbability: number;
retryFailureProbability?: number;
expectedBehavior: string;
}
// Define chaos engineering scenarios
const chaosScenarios: ChaosScenario[] = [
{
name: 'Healthy System',
description: 'No failures - baseline test',
failureProbability: 0,
expectedBehavior: 'All requests succeed immediately'
},
{
name: 'Intermittent Failures',
description: '30% initial failure rate with recovery',
failureProbability: 0.3,
retryFailureProbability: 0.1,
expectedBehavior: 'Some requests fail initially but recover on retry'
},
{
name: 'High Failure Rate',
description: '70% failure rate with some recovery',
failureProbability: 0.7,
retryFailureProbability: 0.4,
expectedBehavior: 'Most requests require multiple retries'
},
{
name: 'Persistent Failures',
description: '50% failure rate that persists',
failureProbability: 0.5,
retryFailureProbability: 0.9,
expectedBehavior: 'Significant failures even after retries'
},
{
name: 'Complete Outage',
description: '100% failure rate - total unavailability',
failureProbability: 1.0,
retryFailureProbability: 1.0,
expectedBehavior: 'All requests fail after exhausting retries'
}
];
// Track test results
interface TestResult {
scenario: string;
success: boolean;
attempts: number;
duration: number;
error?: string;
}
const testResults: TestResult[] = [];
// Run chaos test for a single scenario
async function runChaosTest(scenario: ChaosScenario): Promise<TestResult> {
console.log(`\n🧪 Testing: ${scenario.name}`);
console.log(` ${scenario.description}`);
console.log(` Failure Rate: ${(scenario.failureProbability * 100).toFixed(0)}% initial`);
const startTime = Date.now();
let attemptCount = 0;
try {
await stableRequest({
reqData: {
hostname: 'api.example.com',
protocol: VALID_REQUEST_PROTOCOLS.HTTPS,
path: '/chaos-test',
method: REQUEST_METHODS.GET,
headers: {
'X-Test-Scenario': scenario.name
},
timeout: 5000
},
// Configure retry behavior
attempts: 5,
wait: 500,
retryStrategy: RETRY_STRATEGIES.EXPONENTIAL,
maxAllowedWait: 3000,
// Enable trial mode for failure simulation
trialMode: {
enabled: true,
reqFailureProbability: scenario.failureProbability,
retryFailureProbability: scenario.retryFailureProbability
},
// Track all attempts
handleSuccessfulAttemptData: async ({ successfulAttemptData }) => {
attemptCount = parseInt(successfulAttemptData.attempt?.split('/')[0] || '1');
if (attemptCount > 1) {
console.log(` ✅ Recovered on attempt ${attemptCount} (${successfulAttemptData.executionTime}ms)`);
} else {
console.log(` ✅ Succeeded immediately (${successfulAttemptData.executionTime}ms)`);
}
},
// Error analysis
finalErrorAnalyzer: async ({ error }) => {
console.log(` ❌ Failed: ${error}`);
return false; // Re-throw
},
resReq: true
});
const duration = Date.now() - startTime;
return {
scenario: scenario.name,
success: true,
attempts: attemptCount,
duration
};
} catch (error) {
const duration = Date.now() - startTime;
return {
scenario: scenario.name,
success: false,
attempts: attemptCount,
duration,
error: error instanceof Error ? error.message : String(error)
};
}
}
// Run all chaos engineering tests
async function runChaosEngineeringTests() {
console.log('🌪️ Starting Chaos Engineering Tests');
console.log('═'.repeat(70));
console.log('Testing system resilience under various failure scenarios');
console.log(`Total Scenarios: ${chaosScenarios.length}`);
console.log('═'.repeat(70));
// Run each scenario
for (const scenario of chaosScenarios) {
const result = await runChaosTest(scenario);
testResults.push(result);
await new Promise(resolve => setTimeout(resolve, 200));
}
// Generate comprehensive resilience report
console.log('\n' + '═'.repeat(70));
console.log('📊 CHAOS ENGINEERING RESULTS');
console.log('═'.repeat(70));
const successfulTests = testResults.filter(r => r.success).length;
console.log(`\nOverall Statistics:`);
console.log(` Tests Passed: ${successfulTests}/${testResults.length}`);
console.log(` Tests Failed: ${testResults.length - successfulTests}/${testResults.length}`);
console.log(`\nResilience Analysis:`);
testResults.forEach((result, index) => {
const scenario = chaosScenarios[index];
const icon = result.success ? '✅' : '❌';
console.log(`\n${icon} ${result.scenario}:`);
console.log(` Expected: ${scenario.expectedBehavior}`);
console.log(` Result: ${result.success ? 'SUCCESS' : 'FAILED'}`);
console.log(` Attempts Used: ${result.attempts}`);
console.log(` Duration: ${result.duration}ms`);
if (result.error) {
console.log(` Error: ${result.error}`);
}
});
console.log('\n' + '═'.repeat(70));
console.log('✅ Chaos engineering tests complete!');
return {
totalTests: testResults.length,
passed: successfulTests,
failed: testResults.length - successfulTests,
results: testResults
};
}
// Run chaos engineering tests
runChaosEngineeringTests();
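Because trial mode is stochastic, individual runs vary; aggregating outcomes gives a steadier picture. A small summary helper over the collected results (plain TypeScript):
// Bucket outcomes: immediate success, success after retries, retries exhausted.
function summarizeRecovery(results: TestResult[]) {
  return {
    immediate: results.filter(r => r.success && r.attempts <= 1).length,
    recovered: results.filter(r => r.success && r.attempts > 1).length,
    exhausted: results.filter(r => !r.success).length
  };
}
// console.log('Recovery profile:', summarizeRecovery(testResults));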
A production-grade distributed data processing workflow with Redis-based state persistence, enabling workflow recovery, resumption, and distributed execution across multiple instances.
The example below builds a large-scale data migration pipeline that can be resumed from any checkpoint, run across multiple server instances, and report progress in real time. It suits long-running workflows that must survive failures.
import {
stableWorkflow,
PHASE_DECISION_ACTIONS,
REQUEST_METHODS,
VALID_REQUEST_PROTOCOLS,
type STABLE_WORKFLOW_PHASE,
type StatePersistenceOptions
} from '@emmvish/stable-request';
// ============================================================================
// STATE PERSISTENCE LAYER
// ============================================================================
/**
* Simulated Redis-like storage
* In production, use actual Redis client (ioredis or node-redis)
*/
class StateStorage {
private storage: Map<string, { value: string; expiresAt: number }> = new Map();
async setex(key: string, ttl: number, value: string): Promise<void> {
const expiresAt = Date.now() + (ttl * 1000);
this.storage.set(key, { value, expiresAt });
console.log(` 💾 Stored state: ${key}`);
}
async get(key: string): Promise<string | null> {
const entry = this.storage.get(key);
if (!entry || Date.now() > entry.expiresAt) return null;
console.log(` 📥 Loaded state: ${key}`);
return entry.value;
}
async del(key: string): Promise<void> {
this.storage.delete(key);
}
}
const stateStore = new StateStorage();
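In production, the same three-method surface maps directly onto a real Redis client. A sketch backed by ioredis, assuming a Redis instance reachable at the default localhost:6379:
import Redis from 'ioredis';
// Drop-in alternative to StateStorage above, backed by real Redis.
class RedisStateStorage {
  private redis = new Redis(); // connects to localhost:6379 by default
  async setex(key: string, ttl: number, value: string): Promise<void> {
    await this.redis.setex(key, ttl, value);
  }
  async get(key: string): Promise<string | null> {
    return this.redis.get(key);
  }
  async del(key: string): Promise<void> {
    await this.redis.del(key);
  }
}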
/**
* Redis persistence function with distributed locking
*/
async function persistToRedis({ executionContext, params, buffer }: StatePersistenceOptions): Promise<Record<string, any>> {
const { workflowId, phaseId, branchId } = executionContext;
const { ttl = 86400, enableLocking = false, namespace = 'workflow' } = params || {};
// Generate hierarchical state key
const stateKey = `${namespace}:${workflowId}:${branchId || 'main'}:${phaseId || 'global'}`;
const lockKey = `lock:${stateKey}`;
const isStoring = buffer && Object.keys(buffer).length > 0;
if (enableLocking) {
await stateStore.setex(lockKey, 5, `${Date.now()}-${Math.random()}`);
}
try {
if (isStoring) {
// STORE MODE
const stateWithMeta = {
...buffer,
_meta: {
workflowId,
phaseId,
timestamp: new Date().toISOString(),
version: (buffer._meta?.version || 0) + 1
}
};
await stateStore.setex(stateKey, ttl, JSON.stringify(stateWithMeta));
// Audit log
const auditKey = `${namespace}:audit:${workflowId}:${Date.now()}`;
await stateStore.setex(auditKey, ttl * 2, JSON.stringify({
action: 'state_saved',
phaseId,
timestamp: new Date().toISOString()
}));
} else {
// LOAD MODE
const data = await stateStore.get(stateKey);
return data ? JSON.parse(data) : {};
}
} finally {
if (enableLocking) {
await stateStore.del(lockKey);
}
}
return {};
}
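Note that the setex-based lock above is illustrative only: it overwrites any existing lock instead of failing when one is already held. Against real Redis, an atomic acquire uses SET with the NX flag; a sketch using the ioredis client from the previous snippet:
// SET key token EX ttl NX returns 'OK' only when the key did not exist,
// so exactly one caller wins the lock.
async function acquireLock(redis: Redis, lockKey: string, ttlSeconds: number): Promise<string | null> {
  const token = `${Date.now()}-${Math.random()}`;
  const result = await redis.set(lockKey, token, 'EX', ttlSeconds, 'NX');
  return result === 'OK' ? token : null;
}
// A safe release must compare the stored token before deleting (usually via a
// small Lua script); a bare DEL can drop a lock another instance now holds.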
/**
* Checkpoint persistence for phase completion tracking
*/
async function createCheckpoint({ executionContext, params, buffer }: StatePersistenceOptions): Promise<Record<string, any>> {
const { workflowId, phaseId } = executionContext;
const { ttl = 86400 } = params || {};
const checkpointKey = `checkpoint:${workflowId || 'default'}`;
if (buffer && Object.keys(buffer).length > 0) {
const existingData = await stateStore.get(checkpointKey);
const existing = existingData ? JSON.parse(existingData) : {};
const checkpointData = {
...existing,
completedPhases: [...new Set([...(existing.completedPhases || []), ...(buffer.completedPhases || [])])],
lastPhase: phaseId || existing.lastPhase,
lastUpdated: new Date().toISOString(),
progress: buffer.progress || existing.progress || 0,
processedRecords: buffer.recordsProcessed || existing.processedRecords || 0
};
await stateStore.setex(checkpointKey, ttl, JSON.stringify(checkpointData));
console.log(` ✅ Checkpoint saved (Progress: ${checkpointData.progress}%)`);
} else {
const data = await stateStore.get(checkpointKey);
return data ? JSON.parse(data) : { completedPhases: [], processedRecords: 0 };
}
return {};
}
// ============================================================================
// WORKFLOW STATE
// ============================================================================
interface WorkflowState {
sourceRecords: any[];
transformedRecords: any[];
validatedRecords: any[];
migratedRecords: any[];
failedRecords: any[];
completedPhases: string[];
currentPhase: string;
progress: number;
recordsProcessed: number;
totalRecords: number;
startTime: number;
lastUpdateTime: number;
attemptCount: number;
errors: string[];
canResume: boolean;
resumeFromPhase: string | null;
}
const workflowState: WorkflowState = {
sourceRecords: [],
transformedRecords: [],
validatedRecords: [],
migratedRecords: [],
failedRecords: [],
completedPhases: [],
currentPhase: '',
progress: 0,
recordsProcessed: 0,
totalRecords: 0,
startTime: Date.now(),
lastUpdateTime: Date.now(),
attemptCount: 0,
errors: [],
canResume: true,
resumeFromPhase: null
};
// ============================================================================
// WORKFLOW DEFINITION
// ============================================================================
const WORKFLOW_ID = `migration-${Date.now()}`;
const SOURCE_API = 'jsonplaceholder.typicode.com';
const migrationPhases: STABLE_WORKFLOW_PHASE[] = [
// PHASE 1: Data Extraction
{
id: 'extract-source-data',
requests: [
{
id: 'extract-users',
requestOptions: {
reqData: {
hostname: SOURCE_API,
path: '/posts/1',
method: REQUEST_METHODS.GET
},
resReq: false,
preExecution: {
preExecutionHook: ({ commonBuffer }) => {
console.log('\n📥 Phase 1: Extracting source data...');
const buffer = commonBuffer as WorkflowState;
// Generate mock data
buffer.sourceRecords = Array.from({ length: 100 }, (_, i) => ({
id: i + 1,
name: `User ${i + 1}`,
email: `user${i + 1}@example.com`,
username: `user${i + 1}`,
company: { name: `Company ${i % 10}` },
address: { geo: { lat: '0.0', lng: '0.0' } }
}));
buffer.totalRecords = buffer.sourceRecords.length;
buffer.progress = 20;
buffer.recordsProcessed = buffer.sourceRecords.length;
console.log(` ✅ Extracted ${buffer.sourceRecords.length} records`);
return {};
},
applyPreExecutionConfigOverride: false
}
}
}
],
phaseDecisionHook: async ({ phaseResult, sharedBuffer }) => {
const buffer = sharedBuffer as WorkflowState;
if (buffer.completedPhases.includes('extract-source-data')) {
console.log(` ⏩ Phase already completed, skipping...`);
return { action: PHASE_DECISION_ACTIONS.SKIP, skipToPhaseId: 'transform-data' };
}
if (phaseResult.success && buffer.sourceRecords.length > 0) {
buffer.completedPhases.push('extract-source-data');
return { action: PHASE_DECISION_ACTIONS.CONTINUE };
}
return { action: PHASE_DECISION_ACTIONS.TERMINATE };
},
statePersistence: {
persistenceFunction: persistToRedis,
persistenceParams: { ttl: 3600, enableLocking: true, namespace: 'migration' },
loadBeforeHooks: true,
storeAfterHooks: true
}
},
// PHASE 2: Data Transformation
{
id: 'transform-data',
requests: [
{
id: 'transform-records',
requestOptions: {
reqData: {
hostname: SOURCE_API,
path: '/posts/1',
method: REQUEST_METHODS.GET
},
resReq: false,
preExecution: {
preExecutionHook: ({ commonBuffer }) => {
console.log('\n🔄 Phase 2: Transforming data...');
const buffer = commonBuffer as WorkflowState;
buffer.transformedRecords = buffer.sourceRecords.map(record => ({
id: record.id,
externalId: record.id,
name: record.name,
email: record.email?.toLowerCase(),
username: record.username,
company: record.company?.name,
metadata: {
importedAt: new Date().toISOString(),
source: 'jsonplaceholder',
workflowId: WORKFLOW_ID
}
}));
buffer.progress = 40;
console.log(` ✅ Transformed ${buffer.transformedRecords.length} records`);
return {};
},
applyPreExecutionConfigOverride: false
}
}
}
],
phaseDecisionHook: async ({ phaseResult, sharedBuffer }) => {
const buffer = sharedBuffer as WorkflowState;
if (buffer.completedPhases.includes('transform-data')) {
console.log(` ⏩ Phase already completed, skipping...`);
return { action: PHASE_DECISION_ACTIONS.SKIP, skipToPhaseId: 'validate-data' };
}
if (phaseResult.success && buffer.transformedRecords.length > 0) {
buffer.completedPhases.push('transform-data');
return { action: PHASE_DECISION_ACTIONS.CONTINUE };
}
if (buffer.attemptCount < 2) {
buffer.attemptCount++;
return { action: PHASE_DECISION_ACTIONS.REPLAY };
}
return { action: PHASE_DECISION_ACTIONS.TERMINATE };
},
allowReplay: true,
maxReplayCount: 2,
statePersistence: {
persistenceFunction: persistToRedis,
persistenceParams: { ttl: 3600, enableLocking: true, namespace: 'migration' },
loadBeforeHooks: true,
storeAfterHooks: true
}
},
// PHASE 3: Data Validation
{
id: 'validate-data',
requests: [
{
id: 'validate-records',
requestOptions: {
reqData: {
hostname: SOURCE_API,
path: '/posts/1',
method: REQUEST_METHODS.GET
},
resReq: false,
preExecution: {
preExecutionHook: ({ commonBuffer }) => {
console.log('\n🔍 Phase 3: Validating data...');
const buffer = commonBuffer as WorkflowState;
buffer.validatedRecords = buffer.transformedRecords.filter(record => {
return record.email && record.email.includes('@') &&
record.name && record.name.length >= 3;
});
buffer.failedRecords = buffer.transformedRecords.filter(record => {
return !record.email || !record.email.includes('@') ||
!record.name || record.name.length < 3;
});
buffer.progress = 70;
console.log(` ✅ Validated: ${buffer.validatedRecords.length} passed, ${buffer.failedRecords.length} failed`);
return {};
},
applyPreExecutionConfigOverride: false
}
}
}
],
phaseDecisionHook: async ({ phaseResult, sharedBuffer }) => {
const buffer = sharedBuffer as WorkflowState;
if (buffer.completedPhases.includes('validate-data')) {
console.log(` ⏩ Phase already completed, skipping...`);
return { action: PHASE_DECISION_ACTIONS.SKIP, skipToPhaseId: 'migrate-data' };
}
if (phaseResult.success && buffer.validatedRecords.length > 0) {
buffer.completedPhases.push('validate-data');
return { action: PHASE_DECISION_ACTIONS.CONTINUE };
}
return { action: PHASE_DECISION_ACTIONS.TERMINATE };
},
statePersistence: {
persistenceFunction: persistToRedis,
persistenceParams: { ttl: 3600, enableLocking: true, namespace: 'migration' },
loadBeforeHooks: true,
storeAfterHooks: true
}
}
];
// ============================================================================
// WORKFLOW EXECUTION
// ============================================================================
console.log('🚀 Starting Distributed Workflow with State Persistence');
console.log(`Workflow ID: ${WORKFLOW_ID}\n`);
const result = await stableWorkflow(migrationPhases, {
workflowId: WORKFLOW_ID,
commonRequestData: {
hostname: SOURCE_API,
protocol: VALID_REQUEST_PROTOCOLS.HTTPS
},
enableNonLinearExecution: true,
stopOnFirstPhaseError: false,
maxWorkflowIterations: 100,
sharedBuffer: workflowState,
commonStatePersistence: {
persistenceFunction: createCheckpoint,
persistenceParams: { ttl: 7200 },
loadBeforeHooks: true,
storeAfterHooks: true
},
handlePhaseCompletion: async ({ phaseResult, sharedBuffer }) => {
const buffer = sharedBuffer as WorkflowState;
console.log(`\n✅ Phase "${phaseResult.phaseId}" completed`);
console.log(` Duration: ${phaseResult.executionTime}ms`);
console.log(` Progress: ${buffer.progress}%`);
}
});
console.log('\n📊 Workflow Results:');
console.log(`Success: ${result.success}`);
console.log(`Total Phases: ${result.totalPhases}`);
console.log(`Duration: ${result.executionTime}ms`);
console.log(`Records Processed: ${workflowState.recordsProcessed}`);
/**
* Resume workflow from last checkpoint
* This function demonstrates how to recover and resume a failed workflow
*/
async function resumeWorkflow(workflowId: string) {
// Load checkpoint from storage
const checkpointData = await stateStore.get(`checkpoint:${workflowId}`);
const checkpoint = checkpointData ? JSON.parse(checkpointData) : {
completedPhases: [],
processedRecords: 0
};
console.log(`📂 Resuming workflow: ${workflowId}`);
console.log(` Last phase: ${checkpoint.lastPhase || 'start'}`);
console.log(` Completed: ${checkpoint.completedPhases.join(', ') || 'none'}`);
// Restore state from checkpoint
const restoredState: WorkflowState = {
...workflowState,
completedPhases: checkpoint.completedPhases,
processedRecords: checkpoint.processedRecords,
progress: checkpoint.progress || 0
};
// Execute workflow - completed phases will be skipped automatically
const result = await stableWorkflow(migrationPhases, {
workflowId,
commonRequestData: {
hostname: SOURCE_API,
protocol: VALID_REQUEST_PROTOCOLS.HTTPS
},
enableNonLinearExecution: true,
sharedBuffer: restoredState,
commonStatePersistence: {
persistenceFunction: createCheckpoint,
persistenceParams: { ttl: 7200 },
loadBeforeHooks: true,
storeAfterHooks: true
}
});
console.log(`\n✅ Workflow resumed and ${result.success ? 'completed' : 'failed'}`);
return result;
}
// Example usage: Can be called multiple times to resume from last checkpoint
// await resumeWorkflow('migration-12345');
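A simple entry point ties the two paths together: pass a workflow id on the command line to resume from its checkpoint; otherwise the fresh run above executes (argument handling is illustrative):
// e.g. `node migrate.js migration-12345` resumes; no argument starts fresh.
const resumeId = process.argv[2];
if (resumeId) {
  await resumeWorkflow(resumeId);
}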