Production-Ready Examples

Real-world use cases demonstrating the power of stable-request

01

Multi-Source Data Synchronization Pipeline

Non-Linear Workflow · Circuit Breaker · Caching · Rate Limiting

A comprehensive data synchronization workflow that fetches data from multiple API endpoints, validates and transforms it, then uploads to an internal system with full observability.

Key Features:

  • ✅ Multi-phase workflow orchestration
  • ✅ Concurrent data fetching from multiple sources
  • ✅ Phase decision hooks with REPLAY action
  • ✅ Circuit breaker integration
  • ✅ Response caching with TTL
  • ✅ Rate limiting (50 requests per 10 seconds)
  • ✅ Exponential backoff retry strategy
  • ✅ Pre-execution hooks for data enrichment
  • ✅ Shared buffer for cross-phase state management

Use Case:

Enterprise data synchronization from external APIs with validation, transformation, and batch uploading.

Complete Working Example

import { 
  stableWorkflow, 
  RETRY_STRATEGIES, 
  PHASE_DECISION_ACTIONS,
  REQUEST_METHODS,
  VALID_REQUEST_PROTOCOLS,
  type STABLE_WORKFLOW_PHASE
} from '@emmvish/stable-request';

// Shared state for the workflow. A single instance is handed to stableWorkflow
// as `sharedBuffer`, and the phase hooks below read and mutate it in place.
interface SyncState {
  // Identifier of this sync run; stamped onto every enriched record.
  syncId: string;
  // Epoch milliseconds captured when the state object is created.
  startTime: number;
  // Raw payloads stored by the three fetch requests of the 'fetch-source-data' phase.
  users: any[];
  posts: any[];
  comments: any[];
  // One record per post, built by the 'enrich-data' phase.
  enrichedData: any[];
  // Messages collected by the 'validate-data' phase; reset each time that phase runs.
  validationErrors: string[];
  // Upload counters; nothing in this example updates them.
  uploadedRecords: number;
  failedRecords: number;
  // Number of replays requested so far by the validation phase's decision hook.
  retryCount: number;
}

// Initial (empty) state for one run. Note that syncId and startTime come from
// two separate Date.now() calls, so they may differ by a millisecond.
const syncState: SyncState = {
  syncId: `sync-${Date.now()}`,
  startTime: Date.now(),
  users: [],
  posts: [],
  comments: [],
  enrichedData: [],
  validationErrors: [],
  uploadedRecords: 0,
  failedRecords: 0,
  retryCount: 0
};

// Circuit breaker configuration for source API.
// Passed to stableWorkflow as its `circuitBreaker` option. The exact meaning of
// each field is defined by the library — presumably percentages are 0-100 and
// recoveryTimeoutMs is in milliseconds; confirm against the library docs.
const sourceApiBreaker = {
  failureThresholdPercentage: 50,
  minimumRequests: 3,
  recoveryTimeoutMs: 30000,
  successThresholdPercentage: 60,
  halfOpenMaxRequests: 2,
  trackIndividualAttempts: false
};

// Upper bound on validation replays. Used both as the phase's maxReplayCount
// and as the guard in its decision hook so the two limits cannot drift apart.
const MAX_VALIDATION_REPLAYS = 2;

// Define the multi-phase workflow.
// Phases are listed in execution order; every hook receives the shared
// SyncState buffer and communicates with later phases by mutating it.
const syncPhases: STABLE_WORKFLOW_PHASE[] = [
  // Phase 1: Fetch data from multiple sources concurrently.
  // Each request stores its response payload on the shared buffer.
  {
    id: 'fetch-source-data',
    concurrentExecution: true,
    requests: [
      {
        id: 'fetch-users',
        requestOptions: {
          reqData: {
            path: '/users?_limit=5',
            method: REQUEST_METHODS.GET
          },
          resReq: true,
          handleSuccessfulAttemptData: async ({ successfulAttemptData, commonBuffer }) => {
            const buffer = commonBuffer as SyncState;
            buffer.users = successfulAttemptData.data;
            console.log(`✅ Fetched ${buffer.users.length} users`);
          }
        }
      },
      {
        id: 'fetch-posts',
        requestOptions: {
          reqData: {
            path: '/posts?_limit=10',
            method: REQUEST_METHODS.GET
          },
          resReq: true,
          handleSuccessfulAttemptData: async ({ successfulAttemptData, commonBuffer }) => {
            const buffer = commonBuffer as SyncState;
            buffer.posts = successfulAttemptData.data;
            console.log(`✅ Fetched ${buffer.posts.length} posts`);
          }
        }
      },
      {
        id: 'fetch-comments',
        requestOptions: {
          reqData: {
            path: '/comments?_limit=20',
            method: REQUEST_METHODS.GET
          },
          resReq: true,
          handleSuccessfulAttemptData: async ({ successfulAttemptData, commonBuffer }) => {
            const buffer = commonBuffer as SyncState;
            buffer.comments = successfulAttemptData.data;
            console.log(`✅ Fetched ${buffer.comments.length} comments`);
          }
        }
      }
    ]
  },

  // Phase 2: Data enrichment and transformation.
  // The real work happens in the pre-execution hook; the GET to /posts/1 only
  // serves as the request that carries the hook (its response is not requested).
  {
    id: 'enrich-data',
    requests: [
      {
        id: 'enrich-posts-with-users',
        requestOptions: {
          reqData: { path: '/posts/1', method: REQUEST_METHODS.GET },
          resReq: false,
          preExecution: {
            preExecutionHook: ({ commonBuffer }) => {
              const buffer = commonBuffer as SyncState;

              // Enrich posts with user information and a comment count
              buffer.enrichedData = buffer.posts.map(post => {
                const user = buffer.users.find(u => u.id === post.userId);
                const postComments = buffer.comments.filter(c => c.postId === post.id);

                return {
                  postId: post.id,
                  title: post.title,
                  body: post.body,
                  // null when the post's author was not among the fetched users
                  author: user ? {
                    id: user.id,
                    name: user.name,
                    email: user.email,
                    company: user.company?.name
                  } : null,
                  commentCount: postComments.length,
                  enrichedAt: new Date().toISOString(),
                  syncId: buffer.syncId
                };
              });

              console.log(`✅ Enriched ${buffer.enrichedData.length} posts`);
              return {};
            },
            applyPreExecutionConfigOverride: false
          }
        }
      }
    ]
  },

  // Phase 3: Data validation with conditional replay.
  // The hook rebuilds validationErrors from scratch on every run; the decision
  // hook then asks for a replay while errors remain and the limit is not reached.
  {
    id: 'validate-data',
    allowReplay: true,
    maxReplayCount: MAX_VALIDATION_REPLAYS,
    requests: [
      {
        id: 'validate-enriched-data',
        requestOptions: {
          reqData: { path: '/posts/1', method: REQUEST_METHODS.GET },
          resReq: false,
          preExecution: {
            preExecutionHook: ({ commonBuffer }) => {
              const buffer = commonBuffer as SyncState;
              buffer.validationErrors = [];

              // Validate enriched data
              buffer.enrichedData.forEach((item, index) => {
                if (!item.author) {
                  buffer.validationErrors.push(`Record ${index}: Missing author`);
                }
                if (!item.title || item.title.length < 5) {
                  buffer.validationErrors.push(`Record ${index}: Invalid title`);
                }
              });

              console.log(`✅ Validation complete: ${buffer.validationErrors.length} errors`);
              return {};
            },
            applyPreExecutionConfigOverride: false
          }
        }
      }
    ],
    phaseDecisionHook: async ({ sharedBuffer }) => {
      const buffer = sharedBuffer as SyncState;
      const errorCount = buffer.validationErrors.length;

      if (errorCount > 0 && buffer.retryCount < MAX_VALIDATION_REPLAYS) {
        buffer.retryCount++;
        console.log(`⚠️  Validation errors found, retrying...`);
        return { action: PHASE_DECISION_ACTIONS.REPLAY };
      }

      // Replays exhausted or nothing to fix: continue either way, but do not
      // report success when errors are still present.
      if (errorCount > 0) {
        console.log(`⚠️  Proceeding with ${errorCount} unresolved validation errors after ${buffer.retryCount} replays`);
      } else {
        console.log(`✅ Validation passed, proceeding...`);
      }
      return { action: PHASE_DECISION_ACTIONS.CONTINUE };
    }
  },

  // Phase 4: Batch upload.
  // NOTE: this example never fills in the request list, so the phase is
  // declared with no requests; add upload requests here before relying on it.
  {
    id: 'upload-data',
    concurrentExecution: true,
    requests: [] // Will be populated dynamically
  }
];

// Execute workflow.
// Uses top-level await, so this snippet must run as an ES module (or inside an
// async function).
const result = await stableWorkflow(syncPhases, {
  workflowId: 'data-sync-pipeline',
  // Host and protocol shared by every request; the phases above only supply
  // path and method.
  commonRequestData: { 
    hostname: 'jsonplaceholder.typicode.com',
    protocol: VALID_REQUEST_PROTOCOLS.HTTPS
  },
  // Default retry settings for all requests (presumably 3 attempts starting
  // from a 1000 ms wait — confirm exact semantics in the library docs).
  commonAttempts: 3,
  commonWait: 1000,
  commonRetryStrategy: RETRY_STRATEGIES.EXPONENTIAL,
  enableNonLinearExecution: true,
  circuitBreaker: sourceApiBreaker,
  // 300000 ms = 5 minutes
  commonCache: { ttl: 300000, enabled: true },
  // 50 requests per 10000 ms window
  rateLimit: { maxRequests: 50, windowMs: 10000 },
  // The same object the phase hooks cast to SyncState
  sharedBuffer: syncState,
  handlePhaseCompletion: async ({ phaseResult }) => {
    console.log(`✅ Phase ${phaseResult.phaseId} completed in ${phaseResult.executionTime}ms`);
  }
});

// Print a short summary of the run
console.log(`\n📊 Workflow Results:`);
console.log(`Total Phases: ${result.totalPhases}`);
console.log(`Success: ${result.success}`);
console.log(`Duration: ${result.executionTime}ms`);
02

Resilient Microservice Orchestration

Branched Workflows · Circuit Breakers · Request Grouping · Fallback Strategies

A sophisticated microservice orchestration pattern coordinating user validation, inventory management, payment processing, and notifications with proper failure handling.

Key Features:

  • ✅ Branch workflow execution
  • ✅ Mixed concurrent and sequential branches
  • ✅ Multiple circuit breakers (one per service)
  • ✅ Request grouping with different retry policies
  • ✅ Branch decision hooks (REPLAY and TERMINATE actions)
  • ✅ Workflow termination on critical failures
  • ✅ Graceful handling of non-critical failures
  • ✅ Complex state management across branches

Use Case:

E-commerce order processing coordinating multiple microservices with isolated failure handling and proper SLA requirements.

Complete Working Example

import { 
  stableWorkflow, 
  RETRY_STRATEGIES, 
  PHASE_DECISION_ACTIONS,
  CircuitBreaker,
  REQUEST_METHODS,
  type STABLE_WORKFLOW_BRANCH
} from '@emmvish/stable-request';

// Order processing state. One instance is shared by all branches through the
// workflow's `sharedBuffer`.
interface OrderContext {
  orderId: string;
  customerId: number;
  items: Array<{ productId: number; quantity: number; price: number }>;
  totalAmount: number;
  // Set by the 'get-user' request once the customer lookup succeeds.
  customerData?: any;
  // Set by the 'verify-stock' request on success.
  inventoryCheck?: { available: boolean; reservationId?: string };
  // Set by the 'process-payment' request on success.
  paymentResult?: { success: boolean; transactionId?: string };
  // Channels recorded by the notification branch.
  notificationsSent?: string[];
  // Epoch milliseconds captured when the context is created.
  startTime: number;
  // Replay counters; only `inventory` is incremented in this example.
  retryAttempts: {
    payment: number;
    inventory: number;
    notification: number;
  };
  // Per-service failure counters; only `userService` is incremented in this example.
  serviceFailures: {
    userService: number;
    inventoryService: number;
    paymentService: number;
    notificationService: number;
  };
  // Not written anywhere in this example.
  fallbacksUsed: string[];
}

// Sample order: 2 × 29.99 + 1 × 49.99 = 109.97.
const orderContext: OrderContext = {
  orderId: `ORD-${Date.now()}`,
  customerId: 1,
  items: [
    { productId: 1, quantity: 2, price: 29.99 },
    { productId: 2, quantity: 1, price: 49.99 }
  ],
  totalAmount: 109.97,
  startTime: Date.now(),
  retryAttempts: { payment: 0, inventory: 0, notification: 0 },
  serviceFailures: {
    userService: 0,
    inventoryService: 0,
    paymentService: 0,
    notificationService: 0
  },
  fallbacksUsed: [],
  notificationsSent: []
};

// Circuit breakers for each microservice.
// Each service gets its own instance with its own thresholds, and each request
// below is wired to the breaker of the service it targets.
const circuitBreakers = {
  userService: new CircuitBreaker({
    failureThresholdPercentage: 60,
    minimumRequests: 2,
    recoveryTimeoutMs: 20000,
    successThresholdPercentage: 50,
    halfOpenMaxRequests: 1
  }),
  inventoryService: new CircuitBreaker({
    failureThresholdPercentage: 50,
    minimumRequests: 2,
    recoveryTimeoutMs: 30000,
    successThresholdPercentage: 60,
    halfOpenMaxRequests: 2
  }),
  // Lowest failure threshold and longest recovery timeout of the four.
  paymentService: new CircuitBreaker({
    failureThresholdPercentage: 40,
    minimumRequests: 2,
    recoveryTimeoutMs: 45000,
    successThresholdPercentage: 70,
    halfOpenMaxRequests: 1
  }),
  // Highest failure threshold and shortest recovery timeout of the four.
  notificationService: new CircuitBreaker({
    failureThresholdPercentage: 70,
    minimumRequests: 3,
    recoveryTimeoutMs: 15000,
    successThresholdPercentage: 50,
    halfOpenMaxRequests: 3
  })
};

// Define the branched workflow.
// Four branches, one per service. Requests are tagged with a groupId
// ('critical' or 'optional') that selects a retry policy from the
// `requestGroups` passed to stableWorkflow.
const orderBranches: STABLE_WORKFLOW_BRANCH[] = [
  // Branch 1: User Validation (Critical)
  // The only branch not marked concurrent; it has two phases.
  {
    id: 'user-validation',
    markConcurrentBranch: false,
    phases: [
      {
        id: 'fetch-user-details',
        requests: [
          {
            id: 'get-user',
            groupId: 'critical',
            requestOptions: {
              reqData: {
                // The template is evaluated once, when this array is built.
                path: `/users/${orderContext.customerId}`,
                method: REQUEST_METHODS.GET
              },
              resReq: true,
              circuitBreaker: circuitBreakers.userService,
              handleSuccessfulAttemptData: async ({ successfulAttemptData, commonBuffer }) => {
                const buffer = commonBuffer as OrderContext;
                buffer.customerData = successfulAttemptData.data;
                console.log(`✅ Customer validated: ${buffer.customerData.name}`);
              },
              // Counts failures on the shared context; errorLog itself is not inspected.
              handleErrors: async ({ errorLog, commonBuffer }) => {
                const buffer = commonBuffer as OrderContext;
                buffer.serviceFailures.userService++;
                console.log(`⚠️  User Service attempt failed`);
              }
            }
          }
        ]
      },
      {
        id: 'verify-user-status',
        requests: [
          {
            id: 'check-user-status',
            groupId: 'critical',
            requestOptions: {
              reqData: {
                path: `/users/${orderContext.customerId}/todos?_limit=1`,
                method: REQUEST_METHODS.GET
              },
              resReq: false,
              preExecution: {
                // NOTE(review): this is a pre-execution hook, so the "verified"
                // message is presumably logged before the request is sent — confirm.
                preExecutionHook: ({ commonBuffer }) => {
                  const buffer = commonBuffer as OrderContext;
                  console.log(`✅ Account status verified for ${buffer.customerData?.name}`);
                  return {};
                },
                applyPreExecutionConfigOverride: false,
                continueOnPreExecutionHookFailure: false
              }
            }
          }
        ]
      }
    ]
  },

  // Branch 2: Inventory Management (Critical with fallback)
  // Replayable up to maxReplayCount; the decision hook keeps its own counter
  // on the shared context.
  {
    id: 'inventory-management',
    markConcurrentBranch: true,
    allowReplay: true,
    maxReplayCount: 2,
    phases: [
      {
        id: 'check-inventory',
        requests: [
          {
            id: 'verify-stock',
            groupId: 'critical',
            requestOptions: {
              reqData: {
                path: '/posts?_limit=5',
                method: REQUEST_METHODS.GET
              },
              resReq: true,
              circuitBreaker: circuitBreakers.inventoryService,
              // Any successful response counts as "in stock"; the payload is not inspected.
              handleSuccessfulAttemptData: async ({ successfulAttemptData, commonBuffer }) => {
                const buffer = commonBuffer as OrderContext;
                buffer.inventoryCheck = {
                  available: true,
                  reservationId: `RES-${Date.now()}`
                };
                console.log(`✅ Inventory available, reserved: ${buffer.inventoryCheck.reservationId}`);
              }
            }
          }
        ]
      }
    ],
    // Failed branch: replay while fewer than 2 retries were made, then terminate.
    // Successful branch: continue.
    branchDecisionHook: async ({ branchResult, sharedBuffer }) => {
      const buffer = sharedBuffer as OrderContext;
      
      if (!branchResult.success && buffer.retryAttempts.inventory < 2) {
        buffer.retryAttempts.inventory++;
        console.log(`🔄 Retrying inventory check (attempt ${buffer.retryAttempts.inventory})...`);
        return { action: PHASE_DECISION_ACTIONS.REPLAY };
      }
      
      if (!branchResult.success) {
        console.log(`❌ Inventory check failed after retries`);
        return { action: PHASE_DECISION_ACTIONS.TERMINATE };
      }
      
      return { action: PHASE_DECISION_ACTIONS.CONTINUE };
    }
  },

  // Branch 3: Payment Processing (Critical)
  // Marked concurrent, and nothing in this branch checks inventoryCheck
  // before recording the payment.
  {
    id: 'payment-processing',
    markConcurrentBranch: true,
    phases: [
      {
        id: 'authorize-payment',
        requests: [
          {
            id: 'process-payment',
            groupId: 'critical',
            requestOptions: {
              reqData: {
                path: '/posts/1',
                method: REQUEST_METHODS.GET
              },
              resReq: true,
              circuitBreaker: circuitBreakers.paymentService,
              // Any successful response counts as a completed payment.
              handleSuccessfulAttemptData: async ({ successfulAttemptData, commonBuffer }) => {
                const buffer = commonBuffer as OrderContext;
                buffer.paymentResult = {
                  success: true,
                  transactionId: `TXN-${Date.now()}`
                };
                console.log(`✅ Payment processed: ${buffer.paymentResult.transactionId}`);
              }
            }
          }
        ]
      }
    ]
  },

  // Branch 4: Notification Service (Optional)
  // Uses the 'optional' request group.
  {
    id: 'notification-service',
    markConcurrentBranch: true,
    phases: [
      {
        id: 'send-notifications',
        requests: [
          {
            id: 'email-notification',
            groupId: 'optional',
            requestOptions: {
              reqData: {
                path: '/comments/1',
                method: REQUEST_METHODS.GET
              },
              resReq: false,
              circuitBreaker: circuitBreakers.notificationService,
              preExecution: {
                // NOTE(review): records both channels and logs "sent" from a
                // pre-execution hook, i.e. presumably before the request runs — confirm.
                preExecutionHook: ({ commonBuffer }) => {
                  const buffer = commonBuffer as OrderContext;
                  buffer.notificationsSent = ['email', 'sms'];
                  console.log(`✅ Notifications sent to customer`);
                  return {};
                },
                applyPreExecutionConfigOverride: false
              }
            }
          }
        ]
      }
    ]
  }
];

// Execute branched workflow.
// The phases argument is an empty array because all work is supplied through
// the `branches` option. Uses top-level await (ES module context required).
const result = await stableWorkflow([], {
  workflowId: 'order-processing',
  // Host and protocol shared by every request in every branch.
  commonRequestData: { 
    hostname: 'jsonplaceholder.typicode.com',
    protocol: 'https'
  },
  enableBranchExecution: true,
  branches: orderBranches,
  concurrentBranchExecution: true,
  // Retry policies selected by each request's groupId.
  requestGroups: [
    {
      id: 'critical',
      commonConfig: {
        commonAttempts: 5,
        commonWait: 1000,
        commonRetryStrategy: RETRY_STRATEGIES.EXPONENTIAL
      }
    },
    {
      // Single attempt: optional requests are not retried.
      id: 'optional',
      commonConfig: {
        commonAttempts: 1,
        commonWait: 500
      }
    }
  ],
  // The same object the branch hooks cast to OrderContext
  sharedBuffer: orderContext,
  // Report every failed branch except the optional notification branch.
  handleBranchCompletion: async ({ branchId, success }) => {
    if (!success && branchId !== 'notification-service') {
      console.error(`❌ Critical branch ${branchId} failed`);
    }
  }
});

// Print a short summary of the run
console.log(`\n📊 Order Processing Results:`);
console.log(`Success: ${result.success}`);
console.log(`Order ID: ${orderContext.orderId}`);
console.log(`Duration: ${result.executionTime}ms`);
03

Production API Health Monitoring

Health Checks · SLA Tracking · Alerting · Metrics

A comprehensive health monitoring system that tracks service availability, response times, and SLA compliance with real-time alerting.

Key Features:

  • ✅ Individual service health checks with stableRequest
  • ✅ Circuit breaker per service
  • ✅ Exponential backoff retry strategy
  • ✅ Response caching with 5-second TTL
  • ✅ SLA threshold validation (200ms - 1000ms, configured per service)
  • ✅ Consecutive failure tracking with alerting
  • ✅ Critical vs optional service differentiation
  • ✅ Real-time performance metrics

Use Case:

Production monitoring system tracking API health with automatic alerting on SLA breaches or consecutive failures.

Complete Working Example

import { 
  stableRequest, 
  RETRY_STRATEGIES,
  CircuitBreaker,
  VALID_REQUEST_PROTOCOLS,
  REQUEST_METHODS
} from '@emmvish/stable-request';

// Service endpoint configuration
interface ServiceEndpoint {
  // Display name; also the key used for the circuit-breaker and failure-tracker maps.
  name: string;
  // Base URL including scheme; the scheme is stripped before it is used as a hostname.
  url: string;
  // Critical services get more retry attempts, stricter breaker settings and alerting.
  critical: boolean;
  // Maximum response time (ms) considered SLA-compliant; the request timeout is twice this.
  slaThresholdMs: number;
  // Path requested for the health check.
  healthCheckPath: string;
}

// Outcome of a single health check.
interface HealthCheckResult {
  service: string;
  // 'healthy': succeeded within the SLA; 'degraded': succeeded but slower than
  // the SLA; 'down': the request failed.
  status: 'healthy' | 'degraded' | 'down';
  // Wall-clock milliseconds for the whole check.
  responseTime: number;
  // ISO-8601 timestamp of when the result was produced.
  timestamp: string;
  consecutiveFailures: number;
  circuitBreakerState: string;
  slaCompliant: boolean;
}

// Define monitored services.
// All three entries point at the same demo host and differ only in path,
// criticality and SLA threshold.
const services: ServiceEndpoint[] = [
  {
    name: 'User Authentication API',
    url: 'https://jsonplaceholder.typicode.com',
    critical: true,
    slaThresholdMs: 200,
    healthCheckPath: '/users/1'
  },
  {
    name: 'Payment Gateway',
    url: 'https://jsonplaceholder.typicode.com',
    critical: true,
    slaThresholdMs: 500,
    healthCheckPath: '/posts/1'
  },
  {
    name: 'Notification Service',
    url: 'https://jsonplaceholder.typicode.com',
    critical: false,
    slaThresholdMs: 1000,
    healthCheckPath: '/comments/1'
  }
];

// Initialize one circuit breaker per service, keyed by service name.
// Critical services get a lower failureThresholdPercentage (50 vs 70) and a
// shorter recoveryTimeoutMs (10 s vs 20 s) than non-critical ones.
// The explicit type arguments keep lookups typed instead of Map<any, any>.
const circuitBreakers = new Map<string, CircuitBreaker>();
for (const service of services) {
  circuitBreakers.set(
    service.name,
    new CircuitBreaker({
      failureThresholdPercentage: service.critical ? 50 : 70,
      minimumRequests: 3,
      recoveryTimeoutMs: service.critical ? 10000 : 20000,
      successThresholdPercentage: 60,
      halfOpenMaxRequests: 2
    })
  );
}

// Track consecutive failures per service (service name -> failure count)
const failureTracker = new Map<string, number>();

// Perform a health check for one service using stableRequest.
//
// Sends a GET to the service's health-check path with retries, the service's
// circuit breaker and a short-lived cache, and classifies the outcome:
//   - 'healthy'  : request succeeded within slaThresholdMs
//   - 'degraded' : request succeeded but the whole check took longer than the SLA
//   - 'down'     : stableRequest threw
//
// Never rejects: any error from stableRequest is converted into a 'down' result.
// The returned responseTime is wall-clock time for the whole call, so it
// includes any retries and waits.
async function checkServiceHealth(service: ServiceEndpoint): Promise<HealthCheckResult> {
  const startTime = Date.now();
  // A breaker is registered for every entry of `services` at module load.
  const circuitBreaker = circuitBreakers.get(service.name)!;

  console.log(`🔍 Checking ${service.name}...`);

  try {
    await stableRequest({
      reqData: {
        // The library takes hostname and protocol separately, so drop the scheme.
        hostname: service.url.replace(/^https?:\/\//, ''),
        protocol: VALID_REQUEST_PROTOCOLS.HTTPS,
        path: service.healthCheckPath,
        method: REQUEST_METHODS.GET,
        headers: {
          'User-Agent': 'HealthMonitor/1.0',
          'Accept': 'application/json'
        },
        // Allow up to twice the SLA before giving up on an attempt
        timeout: service.slaThresholdMs * 2
      },

      // Retry configuration based on criticality
      attempts: service.critical ? 3 : 2,
      wait: 1000,
      retryStrategy: RETRY_STRATEGIES.EXPONENTIAL,
      maxAllowedWait: 5000,

      // Circuit breaker to prevent cascade failures
      circuitBreaker: circuitBreaker,

      // Cache successful health checks (5 second TTL)
      cache: { ttl: 5000, enabled: true },

      // Response validation: accept only a non-empty object payload.
      // Always returns a strict boolean.
      resReq: true,
      responseAnalyzer: ({ data }) => {
        return data != null && typeof data === 'object' && Object.keys(data).length > 0;
      },

      // Performance tracking (per successful attempt)
      handleSuccessfulAttemptData: async ({ successfulAttemptData }) => {
        const responseTime = successfulAttemptData.executionTime ?? 0;
        const slaCompliant = responseTime <= service.slaThresholdMs;

        if (!slaCompliant) {
          console.log(`  ⚠️  SLA Warning: ${responseTime}ms (threshold: ${service.slaThresholdMs}ms)`);
        } else {
          console.log(`  ✅ Healthy - ${responseTime}ms`);
        }
      },

      // Error handling: count the failure and alert on the third consecutive
      // failure (and every one after) of a critical service.
      finalErrorAnalyzer: async ({ error }) => {
        console.log(`  ❌ Health check failed: ${error}`);

        const currentFailures = failureTracker.get(service.name) ?? 0;
        failureTracker.set(service.name, currentFailures + 1);

        if (service.critical && currentFailures >= 2) {
          console.log(`  🚨 ALERT: Critical service has ${currentFailures + 1} consecutive failures!`);
        }

        return false; // Throw error
      }
    });

    // Success - reset failure counter
    failureTracker.set(service.name, 0);

    const responseTime = Date.now() - startTime;
    const slaCompliant = responseTime <= service.slaThresholdMs;

    return {
      service: service.name,
      status: slaCompliant ? 'healthy' : 'degraded',
      responseTime,
      timestamp: new Date().toISOString(),
      consecutiveFailures: 0,
      // NOTE(review): hard-coded label, not read from the breaker instance.
      circuitBreakerState: 'CLOSED',
      slaCompliant
    };

  } catch {
    // The failure was already counted by finalErrorAnalyzer; just report it.
    const consecutiveFailures = failureTracker.get(service.name) ?? 0;

    return {
      service: service.name,
      status: 'down',
      responseTime: Date.now() - startTime,
      timestamp: new Date().toISOString(),
      consecutiveFailures,
      // NOTE(review): hard-coded label; a failed request does not necessarily
      // mean the breaker is open.
      circuitBreakerState: 'OPEN',
      slaCompliant: false
    };
  }
}

// Run one monitoring round: check every configured service in parallel and
// print a one-line status summary per service.
async function monitorServices() {
  console.log('🏥 Starting Health Monitoring\n');

  const pendingChecks = services.map(svc => checkServiceHealth(svc));
  const results = await Promise.all(pendingChecks);

  console.log('\n📊 Health Monitoring Summary:');
  for (const health of results) {
    // Default to the "down" marker and upgrade it for the two success states.
    let icon = '❌';
    if (health.status === 'healthy') {
      icon = '✅';
    } else if (health.status === 'degraded') {
      icon = '⚠️';
    }
    console.log(`${icon} ${health.service}: ${health.status} (${health.responseTime}ms)`);
  }
}

// Schedule a round every 10 seconds, and kick off the first one right away
setInterval(() => {
  monitorServices();
}, 10000);
monitorServices();
04

Batch Image Processing Pipeline

Batch Processing · Concurrency Control · Rate Limiting · Progress Tracking

Process large batches of images through an external API with concurrency control, rate limiting, and progress tracking.

Key Features:

  • ✅ Batch request processing with stableApiGateway
  • ✅ Concurrency limiting (max 5 simultaneous)
  • ✅ Rate limiting to respect API quotas
  • ✅ Progress tracking and reporting
  • ✅ Retry on transient failures
  • ✅ Error aggregation and reporting

Complete Working Example

import { 
  stableApiGateway, 
  RETRY_STRATEGIES,
  RateLimiter,
  ConcurrencyLimiter,
  REQUEST_METHODS,
  VALID_REQUEST_PROTOCOLS,
  type API_GATEWAY_REQUEST
} from '@emmvish/stable-request';

// Image processing job types
interface ImageJob {
  // Job identifier; also used as the request id and in progress logs.
  id: string;
  // Request path for the image (sent as the request's `path`).
  imageUrl: string;
  // Operation names; only used for logging in this example.
  operations: string[];
  // Selects the request group and the per-request retry settings.
  priority: 'high' | 'normal' | 'low';
  // Not read anywhere in this example.
  size: 'small' | 'medium' | 'large';
}

// Simulated image processing jobs
const imageBatch: ImageJob[] = [
  { id: 'img-001', imageUrl: '/photos/1', operations: ['resize', 'thumbnail'], priority: 'high', size: 'large' },
  { id: 'img-002', imageUrl: '/photos/2', operations: ['watermark'], priority: 'normal', size: 'medium' },
  { id: 'img-003', imageUrl: '/photos/3', operations: ['compress'], priority: 'low', size: 'small' },
  { id: 'img-004', imageUrl: '/photos/4', operations: ['resize', 'compress'], priority: 'high', size: 'large' },
  { id: 'img-005', imageUrl: '/photos/5', operations: ['thumbnail'], priority: 'normal', size: 'small' },
  { id: 'img-006', imageUrl: '/photos/6', operations: ['watermark', 'compress'], priority: 'low', size: 'medium' },
  { id: 'img-007', imageUrl: '/photos/7', operations: ['resize'], priority: 'high', size: 'large' },
  { id: 'img-008', imageUrl: '/photos/8', operations: ['thumbnail', 'watermark'], priority: 'normal', size: 'medium' },
  { id: 'img-009', imageUrl: '/photos/9', operations: ['compress'], priority: 'low', size: 'small' },
  { id: 'img-010', imageUrl: '/photos/10', operations: ['resize', 'watermark'], priority: 'high', size: 'large' }
];

// Initialize rate limiter (20 requests per second)
// NOTE(review): the comment implies the arguments are (windowMs = 1000,
// maxRequests = 20). Confirm the constructor's parameter order in the library
// docs — if it is (maxRequests, windowMs), this allows 1000 requests per 20 ms.
const rateLimiter = new RateLimiter(1000, 20);

// Initialize concurrency limiter (max 5 concurrent requests)
const concurrencyLimiter = new ConcurrencyLimiter(5);

// Track processing results.
// Module-level accumulator filled by the per-request hooks; it is never reset,
// so values accumulate across repeated batch runs.
const processingResults = {
  successful: [] as string[],
  failed: [] as string[],
  processingTimes: [] as number[]
};

// Process the whole image batch through stableApiGateway and print a report.
//
// Each job becomes one gateway request with priority-based retry settings and
// the shared rate/concurrency limiters. Per-request hooks record successes,
// failures and timings in the module-level `processingResults`.
//
// Returns a summary object: totalJobs, successful, failed, duration (ms) and
// averageTime (ms per successfully processed image, 0 when none succeeded).
async function processBatchImages() {
  const startTime = Date.now();

  console.log('🖼️  Starting Batch Image Processing');
  console.log('═'.repeat(70));
  console.log(`Total Images: ${imageBatch.length}`);
  console.log(`Rate Limit: 20 req/sec | Concurrency Limit: 5 concurrent`);
  console.log('');

  // Convert jobs to API requests
  const requests: API_GATEWAY_REQUEST[] = imageBatch.map(job => ({
    id: job.id,
    groupId: job.priority,
    requestOptions: {
      reqData: {
        path: job.imageUrl,
        method: REQUEST_METHODS.GET
      },
      resReq: true,

      // Retry configuration based on priority
      attempts: job.priority === 'high' ? 5 : job.priority === 'normal' ? 3 : 2,
      wait: job.priority === 'high' ? 2000 : 1000,
      retryStrategy: RETRY_STRATEGIES.EXPONENTIAL,
      maxAllowedWait: 10000,

      // Apply rate and concurrency limiting
      rateLimiter: rateLimiter,
      concurrencyLimiter: concurrencyLimiter,

      // Success tracking
      handleSuccessfulAttemptData: async ({ successfulAttemptData }) => {
        processingResults.successful.push(job.id);
        processingResults.processingTimes.push(successfulAttemptData.executionTime || 0);

        const priorityIcon = job.priority === 'high' ? '🔴' :
                           job.priority === 'normal' ? '🟡' : '🟢';
        console.log(`  ${priorityIcon} ${job.id}: Processed ${job.operations.join(', ')} (${successfulAttemptData.executionTime}ms)`);
      },

      // Error handling
      finalErrorAnalyzer: async ({ error }) => {
        processingResults.failed.push(job.id);
        console.log(`  ❌ ${job.id}: Processing failed - ${error}`);
        return true; // Suppress error to continue with other images
      }
    }
  }));

  console.log('🔄 Processing images...\n');

  // Execute batch processing with stableApiGateway
  const results = await stableApiGateway(requests, {
    concurrentExecution: true,

    // Common configuration
    commonRequestData: {
      hostname: 'jsonplaceholder.typicode.com',
      protocol: VALID_REQUEST_PROTOCOLS.HTTPS,
      headers: {
        'Content-Type': 'application/json',
        'X-Batch-ID': `BATCH-${Date.now()}`,
        'User-Agent': 'ImageProcessor/2.0'
      },
      timeout: 10000
    },

    // Request groups with different retry strategies
    requestGroups: [
      {
        id: 'high',
        commonConfig: {
          commonAttempts: 5,
          commonWait: 2000,
          commonRetryStrategy: RETRY_STRATEGIES.EXPONENTIAL
        }
      },
      {
        id: 'normal',
        commonConfig: {
          commonAttempts: 3,
          commonWait: 1000,
          commonRetryStrategy: RETRY_STRATEGIES.LINEAR
        }
      },
      {
        id: 'low',
        commonConfig: {
          commonAttempts: 2,
          commonWait: 500,
          commonRetryStrategy: RETRY_STRATEGIES.FIXED
        }
      }
    ],

    stopOnFirstError: false,
    commonAttempts: 3,
    commonWait: 1000
  });

  const duration = Date.now() - startTime;

  // Generate processing report
  console.log('\n' + '═'.repeat(70));
  console.log('📊 BATCH PROCESSING SUMMARY');
  console.log('═'.repeat(70));

  const successfulResults = results.filter(r => r.success);
  const failedResults = results.filter(r => !r.success);

  console.log(`\nProcessing Results:`);
  console.log(`  ✅ Successful: ${successfulResults.length}/${imageBatch.length}`);
  console.log(`  ❌ Failed: ${failedResults.length}/${imageBatch.length}`);
  console.log(`  ⏱️  Total Duration: ${duration}ms`);

  // Compute the average once. With no recorded timings the division would be
  // 0 / 0 = NaN, so report 0 instead.
  const sampleCount = processingResults.processingTimes.length;
  const totalTime = processingResults.processingTimes.reduce((a, b) => a + b, 0);
  const averageTime = sampleCount > 0 ? totalTime / sampleCount : 0;

  if (sampleCount > 0) {
    console.log(`  📈 Average Time per Image: ${averageTime.toFixed(0)}ms`);
  }

  console.log('\n✨ Batch processing complete!');

  return {
    totalJobs: imageBatch.length,
    successful: successfulResults.length,
    failed: failedResults.length,
    duration,
    averageTime
  };
}

// Run the batch processing
processBatchImages();
05

Feature Flag Testing with Trial Mode

A/B Testing · Trial Mode · Feature Flags · Testing

Test feature flags and new implementations without making real API calls using trial mode with configurable success rates.

Key Features:

  • ✅ Trial mode for testing without real requests
  • ✅ Configurable success/failure probabilities
  • ✅ A/B testing simulation
  • ✅ Mock data injection
  • ✅ Performance testing under various conditions

Complete Working Example

import { 
  stableRequest, 
  RETRY_STRATEGIES,
  REQUEST_METHODS,
  VALID_REQUEST_PROTOCOLS
} from '@emmvish/stable-request';

// Test scenarios
interface ChaosScenario {
  name: string;
  description: string;
  failureProbability: number;
  retryFailureProbability?: number;
  expectedBehavior: string;
}

// Define chaos engineering scenarios
const chaosScenarios: ChaosScenario[] = [
  {
    name: 'Healthy System',
    description: 'No failures - baseline test',
    failureProbability: 0,
    expectedBehavior: 'All requests succeed immediately'
  },
  {
    name: 'Intermittent Failures',
    description: '30% initial failure rate with recovery',
    failureProbability: 0.3,
    retryFailureProbability: 0.1,
    expectedBehavior: 'Some requests fail initially but recover on retry'
  },
  {
    name: 'High Failure Rate',
    description: '70% failure rate with some recovery',
    failureProbability: 0.7,
    retryFailureProbability: 0.4,
    expectedBehavior: 'Most requests require multiple retries'
  },
  {
    name: 'Persistent Failures',
    description: '50% failure rate that persists',
    failureProbability: 0.5,
    retryFailureProbability: 0.9,
    expectedBehavior: 'Significant failures even after retries'
  },
  {
    name: 'Complete Outage',
    description: '100% failure rate - total unavailability',
    failureProbability: 1.0,
    retryFailureProbability: 1.0,
    expectedBehavior: 'All requests fail after exhausting retries'
  }
];

/**
 * Outcome of running a single chaos scenario.
 */
interface TestResult {
  /** Name of the scenario that produced this result. */
  scenario: string;
  /** True when the request call resolved, false when it threw. */
  success: boolean;
  /**
   * Attempt number recorded by the successful-attempt hook.
   * Remains 0 if that hook never ran (for example when the request failed).
   */
  attempts: number;
  /** Elapsed time in milliseconds, measured with `Date.now()` around the request. */
  duration: number;
  /** Error message (or stringified error) for failed runs only. */
  error?: string;
}

const testResults: TestResult[] = [];

/**
 * Runs one chaos scenario through `stableRequest` and summarises the outcome.
 *
 * Trial mode is enabled and receives the scenario's probabilities, so failures
 * are injected by the library's simulation (presumably without a real network
 * call; confirm against the library documentation).
 *
 * @param scenario - Scenario whose name and probabilities configure the request.
 * @returns A `TestResult`. Errors thrown by the request are caught and reported
 *          as `success: false` instead of being propagated.
 */
async function runChaosTest(scenario: ChaosScenario): Promise<TestResult> {
  console.log(`\n🧪 Testing: ${scenario.name}`);
  console.log(`   ${scenario.description}`);
  console.log(`   Failure Rate: ${(scenario.failureProbability * 100).toFixed(0)}% initial`);

  const startTime = Date.now();
  // Only updated by the successful-attempt hook, so a failed run reports 0.
  let attemptCount = 0;

  try {
    await stableRequest({
      reqData: {
        hostname: 'api.example.com',
        protocol: VALID_REQUEST_PROTOCOLS.HTTPS,
        path: '/chaos-test',
        method: REQUEST_METHODS.GET,
        headers: {
          'X-Test-Scenario': scenario.name
        },
        timeout: 5000
      },

      // Configure retry behavior
      attempts: 5,
      wait: 500,
      retryStrategy: RETRY_STRATEGIES.EXPONENTIAL,
      maxAllowedWait: 3000,

      // Enable trial mode for failure simulation
      trialMode: {
        enabled: true,
        reqFailureProbability: scenario.failureProbability,
        retryFailureProbability: scenario.retryFailureProbability
      },

      // Track the attempt on which the request finally succeeded
      handleSuccessfulAttemptData: async ({ successfulAttemptData }) => {
        // `attempt` is presumably formatted as "<current>/<total>"; take the first part.
        // An explicit radix and a NaN guard keep the report numeric even if
        // the field is missing or has an unexpected shape.
        const parsedAttempt = Number.parseInt(
          successfulAttemptData.attempt?.split('/')[0] || '1',
          10
        );
        attemptCount = Number.isNaN(parsedAttempt) ? 1 : parsedAttempt;

        if (attemptCount > 1) {
          console.log(`   ✅ Recovered on attempt ${attemptCount} (${successfulAttemptData.executionTime}ms)`);
        } else {
          console.log(`   ✅ Succeeded immediately (${successfulAttemptData.executionTime}ms)`);
        }
      },

      // Error analysis
      finalErrorAnalyzer: async ({ error }) => {
        console.log(`   ❌ Failed: ${error}`);
        return false; // Re-throw
      },

      resReq: true
    });

    return {
      scenario: scenario.name,
      success: true,
      attempts: attemptCount,
      duration: Date.now() - startTime
    };
  } catch (error: unknown) {
    return {
      scenario: scenario.name,
      success: false,
      attempts: attemptCount,
      duration: Date.now() - startTime,
      error: error instanceof Error ? error.message : String(error)
    };
  }
}

/**
 * Runs every scenario in `chaosScenarios` sequentially, appends each outcome to
 * the module-level `testResults`, and prints a resilience report.
 *
 * The report looks up each result's expectation by scenario name rather than by
 * array index. `testResults` accumulates across calls, so an index-based lookup
 * would read past the end of `chaosScenarios` on a second invocation and throw.
 *
 * @returns Totals plus the shared `testResults` array itself (not a copy).
 */
async function runChaosEngineeringTests() {
  const divider = '═'.repeat(70);

  console.log('🌪️  Starting Chaos Engineering Tests');
  console.log(divider);
  console.log('Testing system resilience under various failure scenarios');
  console.log(`Total Scenarios: ${chaosScenarios.length}`);
  console.log(divider);

  // Run each scenario, pausing briefly between runs
  for (const scenario of chaosScenarios) {
    const result = await runChaosTest(scenario);
    testResults.push(result);
    await new Promise<void>(resolve => setTimeout(resolve, 200));
  }

  // Generate comprehensive resilience report
  console.log('\n' + divider);
  console.log('📊 CHAOS ENGINEERING RESULTS');
  console.log(divider);

  const successfulTests = testResults.filter(r => r.success).length;
  const failedTests = testResults.length - successfulTests;
  console.log(`\nOverall Statistics:`);
  console.log(`  Tests Passed: ${successfulTests}/${testResults.length}`);
  console.log(`  Tests Failed: ${failedTests}/${testResults.length}`);

  // Name -> expected behaviour, so results stay matched to their scenario
  // regardless of how many entries testResults already holds.
  const expectedByScenario = new Map(
    chaosScenarios.map((s): [string, string] => [s.name, s.expectedBehavior])
  );

  console.log(`\nResilience Analysis:`);
  for (const result of testResults) {
    const icon = result.success ? '✅' : '❌';

    console.log(`\n${icon} ${result.scenario}:`);
    console.log(`   Expected: ${expectedByScenario.get(result.scenario) ?? 'n/a'}`);
    console.log(`   Result: ${result.success ? 'SUCCESS' : 'FAILED'}`);
    console.log(`   Attempts Used: ${result.attempts}`);
    console.log(`   Duration: ${result.duration}ms`);

    if (result.error) {
      console.log(`   Error: ${result.error}`);
    }
  }

  console.log('\n' + divider);
  console.log('✅ Chaos engineering tests complete!');

  return {
    totalTests: testResults.length,
    passed: successfulTests,
    failed: failedTests,
    results: testResults
  };
}

// Run chaos engineering tests
runChaosEngineeringTests();
06

Distributed Workflow State Persistence

State Persistence Workflow Recovery Distributed Systems Redis Checkpointing

A production-grade distributed data processing workflow with Redis-based state persistence, enabling workflow recovery, resumption, and distributed execution across multiple instances.

Key Features:

  • ✅ State persistence to Redis with TTL
  • ✅ Workflow recovery and resumption after failures
  • ✅ Multi-stage data pipeline with checkpoints
  • ✅ Distributed lock mechanisms for safety
  • ✅ State versioning and audit trails
  • ✅ Real-time progress tracking across instances
  • ✅ Automatic cleanup of completed workflows
  • ✅ Phase completion tracking and skip logic
  • ✅ Hierarchical state keys for organization
  • ✅ Batch processing with concurrent migrations

Use Case:

Large-scale data migration pipeline that can be resumed from any checkpoint, run across multiple server instances, and provide real-time progress visibility. Perfect for long-running workflows that need resilience against failures.

Complete Working Example - State Persistence

import { 
  stableWorkflow, 
  PHASE_DECISION_ACTIONS,
  REQUEST_METHODS,
  VALID_REQUEST_PROTOCOLS,
  type STABLE_WORKFLOW_PHASE,
  type StatePersistenceOptions
} from '@emmvish/stable-request';

// ============================================================================
// STATE PERSISTENCE LAYER
// ============================================================================

/**
 * In-memory stand-in for a Redis-like key/value store with per-key expiry.
 * In production, use an actual Redis client (ioredis or node-redis).
 *
 * Values are kept in a `Map` together with an absolute expiry timestamp.
 * Expiry is checked lazily on `get`; there is no background sweeper.
 */
class StateStorage {
  private readonly storage = new Map<string, { value: string; expiresAt: number }>();

  /**
   * Stores `value` under `key`, overwriting any existing entry.
   *
   * @param key - Storage key.
   * @param ttl - Time to live in seconds, measured from now.
   * @param value - String payload to store.
   */
  async setex(key: string, ttl: number, value: string): Promise<void> {
    const expiresAt = Date.now() + (ttl * 1000);
    this.storage.set(key, { value, expiresAt });
    console.log(`    💾 Stored state: ${key}`);
  }

  /**
   * Reads the value stored under `key`.
   *
   * @param key - Storage key.
   * @returns The stored string, or `null` when the key is absent or expired.
   */
  async get(key: string): Promise<string | null> {
    const entry = this.storage.get(key);
    if (!entry) return null;

    if (Date.now() > entry.expiresAt) {
      // Drop the stale entry so expired keys do not pile up in the map.
      this.storage.delete(key);
      return null;
    }

    console.log(`    📥 Loaded state: ${key}`);
    return entry.value;
  }

  /**
   * Removes `key`; a missing key is a no-op.
   *
   * @param key - Storage key.
   */
  async del(key: string): Promise<void> {
    this.storage.delete(key);
  }
}

const stateStore = new StateStorage();

/**
 * Persistence function that saves or loads phase state through `stateStore`.
 *
 * Mode is chosen from the buffer: a buffer with at least one key is stored
 * (with `_meta` bookkeeping and an audit entry); an empty or missing buffer
 * triggers a load of the previously stored state.
 *
 * NOTE: the "lock" written when `enableLocking` is true is an unconditional
 * `setex`, so it overwrites any existing lock entry rather than waiting for it.
 * It marks activity but does not enforce mutual exclusion. A real deployment
 * would need an atomic set-if-absent operation.
 *
 * @param options - Execution context (workflow/phase/branch ids), optional
 *        params (`ttl` seconds, `enableLocking`, `namespace`) and the buffer.
 * @returns The loaded state in load mode (or `{}` when nothing is stored);
 *          always `{}` in store mode.
 */
async function persistToRedis({ executionContext, params, buffer }: StatePersistenceOptions): Promise<Record<string, any>> {
  const { workflowId, phaseId, branchId } = executionContext;
  const { ttl = 86400, enableLocking = false, namespace = 'workflow' } = params || {};

  // Generate hierarchical state key: <namespace>:<workflow>:<branch>:<phase>
  const stateKey = `${namespace}:${workflowId}:${branchId || 'main'}:${phaseId || 'global'}`;
  const lockKey = `lock:${stateKey}`;

  const isStoring = buffer && Object.keys(buffer).length > 0;

  if (enableLocking) {
    // Short-lived (5 s) marker; see NOTE above about its limits.
    await stateStore.setex(lockKey, 5, `${Date.now()}-${Math.random()}`);
  }

  try {
    if (isStoring) {
      // STORE MODE
      const stateWithMeta = {
        ...buffer,
        _meta: {
          workflowId,
          phaseId,
          timestamp: new Date().toISOString(),
          // Version increases by one on every save of this buffer.
          version: (buffer._meta?.version || 0) + 1
        }
      };

      await stateStore.setex(stateKey, ttl, JSON.stringify(stateWithMeta));

      // Audit log entry, kept twice as long as the state itself
      const auditKey = `${namespace}:audit:${workflowId}:${Date.now()}`;
      await stateStore.setex(auditKey, ttl * 2, JSON.stringify({
        action: 'state_saved',
        phaseId,
        timestamp: new Date().toISOString()
      }));
    } else {
      // LOAD MODE
      const data = await stateStore.get(stateKey);
      return data ? JSON.parse(data) : {};
    }
  } finally {
    // Always release the marker, even when storing or loading throws.
    if (enableLocking) {
      await stateStore.del(lockKey);
    }
  }

  return {};
}

/**
 * Checkpoint persistence used to track which phases have completed.
 *
 * With a non-empty buffer, the existing checkpoint for the workflow is read,
 * merged with the buffer's progress information and written back. With an
 * empty or missing buffer, the stored checkpoint is returned instead.
 *
 * @param options - Execution context (workflow and phase ids), optional params
 *        (`ttl` in seconds, default 86400) and the buffer.
 * @returns In load mode, the stored checkpoint or a default of
 *          `{ completedPhases: [], processedRecords: 0 }`; `{}` in store mode.
 */
async function createCheckpoint({ executionContext, params, buffer }: StatePersistenceOptions): Promise<Record<string, any>> {
  const { workflowId, phaseId } = executionContext;
  const { ttl = 86400 } = params || {};

  // One checkpoint per workflow, shared by all of its phases.
  const checkpointKey = `checkpoint:${workflowId || 'default'}`;

  if (buffer && Object.keys(buffer).length > 0) {
    const existingData = await stateStore.get(checkpointKey);
    const existing = existingData ? JSON.parse(existingData) : {};

    const checkpointData = {
      ...existing,
      // Union of previously recorded and newly completed phases, without duplicates.
      completedPhases: [...new Set([...(existing.completedPhases || []), ...(buffer.completedPhases || [])])],
      lastPhase: phaseId || existing.lastPhase,
      lastUpdated: new Date().toISOString(),
      progress: buffer.progress || existing.progress || 0,
      // The buffer calls this field `recordsProcessed`; the checkpoint stores it
      // as `processedRecords`.
      processedRecords: buffer.recordsProcessed || existing.processedRecords || 0
    };

    await stateStore.setex(checkpointKey, ttl, JSON.stringify(checkpointData));
    console.log(`    ✅ Checkpoint saved (Progress: ${checkpointData.progress}%)`);
  } else {
    const data = await stateStore.get(checkpointKey);
    return data ? JSON.parse(data) : { completedPhases: [], processedRecords: 0 };
  }

  return {};
}

// ============================================================================
// WORKFLOW STATE
// ============================================================================

/**
 * Shape of the buffer shared by all migration phases.
 */
interface WorkflowState {
  /** Records produced by the extraction phase. */
  sourceRecords: any[];
  /** Records produced by the transformation phase from `sourceRecords`. */
  transformedRecords: any[];
  /** Transformed records that passed the validation phase's checks. */
  validatedRecords: any[];
  /** Not written by any phase visible in this example. */
  migratedRecords: any[];
  /** Transformed records that failed the validation phase's checks. */
  failedRecords: any[];
  /** Ids of phases whose decision hook recorded them as complete. */
  completedPhases: string[];
  /** Not written by any phase visible in this example. */
  currentPhase: string;
  /** Progress percentage; phases set it to 20, 40 and 70 respectively. */
  progress: number;
  /** Set by the extraction phase to the number of extracted records. */
  recordsProcessed: number;
  /** Set by the extraction phase to the number of extracted records. */
  totalRecords: number;
  /** Epoch milliseconds captured when the state object was created. */
  startTime: number;
  /** Epoch milliseconds; initialised at creation and not updated in this example. */
  lastUpdateTime: number;
  /** Incremented by the transformation phase each time it requests a replay. */
  attemptCount: number;
  /** Not written by any phase visible in this example. */
  errors: string[];
  /** Not read by any code visible in this example. */
  canResume: boolean;
  /** Not read by any code visible in this example. */
  resumeFromPhase: string | null;
}

// Initial, empty state for a fresh run. This object is handed to the workflow
// as its shared buffer and is also used as the base when restoring from a checkpoint.
const workflowState: WorkflowState = {
  sourceRecords: [],
  transformedRecords: [],
  validatedRecords: [],
  migratedRecords: [],
  failedRecords: [],
  completedPhases: [],
  currentPhase: '',
  progress: 0,
  recordsProcessed: 0,
  totalRecords: 0,
  startTime: Date.now(),
  lastUpdateTime: Date.now(),
  attemptCount: 0,
  errors: [],
  canResume: true,
  resumeFromPhase: null
};

// ============================================================================
// WORKFLOW DEFINITION
// ============================================================================

const WORKFLOW_ID = `migration-${Date.now()}`;
const SOURCE_API = 'jsonplaceholder.typicode.com';

const migrationPhases: STABLE_WORKFLOW_PHASE[] = [
  // PHASE 1: Data Extraction
  // Issues a single GET to the source API and, in the pre-execution hook,
  // fills the shared buffer with 100 generated mock records.
  {
    id: 'extract-source-data',
    requests: [
      {
        id: 'extract-users',
        requestOptions: {
          reqData: { 
            hostname: SOURCE_API,
            path: '/posts/1',
            method: REQUEST_METHODS.GET
          },
          // NOTE(review): presumably means the response body is not needed; confirm with the library docs.
          resReq: false,
          preExecution: {
            preExecutionHook: ({ commonBuffer }) => {
              console.log('\n📥 Phase 1: Extracting source data...');
              const buffer = commonBuffer as WorkflowState;
              
              // Generate mock data
              buffer.sourceRecords = Array.from({ length: 100 }, (_, i) => ({
                id: i + 1,
                name: `User ${i + 1}`,
                email: `user${i + 1}@example.com`,
                username: `user${i + 1}`,
                company: { name: `Company ${i % 10}` },
                address: { geo: { lat: '0.0', lng: '0.0' } }
              }));
              
              buffer.totalRecords = buffer.sourceRecords.length;
              buffer.progress = 20;
              buffer.recordsProcessed = buffer.sourceRecords.length;
              console.log(`  ✅ Extracted ${buffer.sourceRecords.length} records`);
              // The hook returns an empty object; all results are written to the buffer.
              return {};
            },
            applyPreExecutionConfigOverride: false
          }
        }
      }
    ],
    // Decides what happens next: skip ahead if this phase is already recorded
    // as complete, continue on success with data, otherwise terminate.
    // NOTE(review): confirm when the library invokes this hook relative to the
    // phase's requests; if it runs afterwards, the extraction above has already
    // executed by the time the skip is decided.
    phaseDecisionHook: async ({ phaseResult, sharedBuffer }) => {
      const buffer = sharedBuffer as WorkflowState;
      
      if (buffer.completedPhases.includes('extract-source-data')) {
        console.log(`  ⏩ Phase already completed, skipping...`);
        return { action: PHASE_DECISION_ACTIONS.SKIP, skipToPhaseId: 'transform-data' };
      }
      
      if (phaseResult.success && buffer.sourceRecords.length > 0) {
        buffer.completedPhases.push('extract-source-data');
        return { action: PHASE_DECISION_ACTIONS.CONTINUE };
      }
      
      return { action: PHASE_DECISION_ACTIONS.TERMINATE };
    },
    // Per-phase state is saved/loaded via persistToRedis with a 1-hour TTL.
    statePersistence: {
      persistenceFunction: persistToRedis,
      persistenceParams: { ttl: 3600, enableLocking: true, namespace: 'migration' },
      loadBeforeHooks: true,
      storeAfterHooks: true
    }
  },

  // PHASE 2: Data Transformation
  // Maps every source record into the target shape and stores the result in
  // the shared buffer. This is the only phase that may be replayed.
  {
    id: 'transform-data',
    requests: [
      {
        id: 'transform-records',
        requestOptions: {
          reqData: { 
            hostname: SOURCE_API,
            path: '/posts/1',
            method: REQUEST_METHODS.GET
          },
          resReq: false,
          preExecution: {
            preExecutionHook: ({ commonBuffer }) => {
              console.log('\n🔄 Phase 2: Transforming data...');
              const buffer = commonBuffer as WorkflowState;
              
              // Flatten company to its name, lower-case the email, and attach
              // import metadata to each record.
              buffer.transformedRecords = buffer.sourceRecords.map(record => ({
                id: record.id,
                externalId: record.id,
                name: record.name,
                email: record.email?.toLowerCase(),
                username: record.username,
                company: record.company?.name,
                metadata: {
                  importedAt: new Date().toISOString(),
                  source: 'jsonplaceholder',
                  workflowId: WORKFLOW_ID
                }
              }));
              
              buffer.progress = 40;
              console.log(`  ✅ Transformed ${buffer.transformedRecords.length} records`);
              return {};
            },
            applyPreExecutionConfigOverride: false
          }
        }
      }
    ],
    // Skip if already complete, continue on success with data, otherwise
    // replay up to two times (tracked in buffer.attemptCount) before terminating.
    phaseDecisionHook: async ({ phaseResult, sharedBuffer }) => {
      const buffer = sharedBuffer as WorkflowState;
      
      if (buffer.completedPhases.includes('transform-data')) {
        console.log(`  ⏩ Phase already completed, skipping...`);
        return { action: PHASE_DECISION_ACTIONS.SKIP, skipToPhaseId: 'validate-data' };
      }
      
      if (phaseResult.success && buffer.transformedRecords.length > 0) {
        buffer.completedPhases.push('transform-data');
        return { action: PHASE_DECISION_ACTIONS.CONTINUE };
      }
      
      if (buffer.attemptCount < 2) {
        buffer.attemptCount++;
        return { action: PHASE_DECISION_ACTIONS.REPLAY };
      }
      
      return { action: PHASE_DECISION_ACTIONS.TERMINATE };
    },
    // The replay limit here matches the attemptCount bound used in the hook above.
    allowReplay: true,
    maxReplayCount: 2,
    statePersistence: {
      persistenceFunction: persistToRedis,
      persistenceParams: { ttl: 3600, enableLocking: true, namespace: 'migration' },
      loadBeforeHooks: true,
      storeAfterHooks: true
    }
  },

  // PHASE 3: Data Validation
  // Splits the transformed records into those that pass and those that fail
  // basic email/name checks.
  {
    id: 'validate-data',
    requests: [
      {
        id: 'validate-records',
        requestOptions: {
          reqData: { 
            hostname: SOURCE_API,
            path: '/posts/1',
            method: REQUEST_METHODS.GET
          },
          resReq: false,
          preExecution: {
            preExecutionHook: ({ commonBuffer }) => {
              console.log('\n🔍 Phase 3: Validating data...');
              const buffer = commonBuffer as WorkflowState;
              
              // Valid: email contains '@' and name has at least 3 characters.
              buffer.validatedRecords = buffer.transformedRecords.filter(record => {
                return record.email && record.email.includes('@') && 
                       record.name && record.name.length >= 3;
              });
              
              // Failed: the exact negation of the predicate above.
              buffer.failedRecords = buffer.transformedRecords.filter(record => {
                return !record.email || !record.email.includes('@') || 
                       !record.name || record.name.length < 3;
              });
              
              buffer.progress = 70;
              console.log(`  ✅ Validated: ${buffer.validatedRecords.length} passed, ${buffer.failedRecords.length} failed`);
              return {};
            },
            applyPreExecutionConfigOverride: false
          }
        }
      }
    ],
    // Skip if already complete, continue when at least one record validated,
    // otherwise terminate.
    phaseDecisionHook: async ({ phaseResult, sharedBuffer }) => {
      const buffer = sharedBuffer as WorkflowState;
      
      if (buffer.completedPhases.includes('validate-data')) {
        console.log(`  ⏩ Phase already completed, skipping...`);
        // NOTE(review): no phase with id 'migrate-data' is defined in this array
        // as shown; confirm how the library handles an unknown skip target, or
        // add the missing migration phase.
        return { action: PHASE_DECISION_ACTIONS.SKIP, skipToPhaseId: 'migrate-data' };
      }
      
      if (phaseResult.success && buffer.validatedRecords.length > 0) {
        buffer.completedPhases.push('validate-data');
        return { action: PHASE_DECISION_ACTIONS.CONTINUE };
      }
      
      return { action: PHASE_DECISION_ACTIONS.TERMINATE };
    },
    statePersistence: {
      persistenceFunction: persistToRedis,
      persistenceParams: { ttl: 3600, enableLocking: true, namespace: 'migration' },
      loadBeforeHooks: true,
      storeAfterHooks: true
    }
  }
];

// ============================================================================
// WORKFLOW EXECUTION
// ============================================================================

// Kick off a fresh run. The top-level `await` below requires this file to be
// evaluated as an ES module.
console.log('🚀 Starting Distributed Workflow with State Persistence');
console.log(`Workflow ID: ${WORKFLOW_ID}\n`);

const result = await stableWorkflow(migrationPhases, {
  workflowId: WORKFLOW_ID,
  // Defaults merged into every request; the phases themselves omit the protocol.
  commonRequestData: { 
    hostname: SOURCE_API,
    protocol: VALID_REQUEST_PROTOCOLS.HTTPS
  },
  enableNonLinearExecution: true,
  stopOnFirstPhaseError: false,
  maxWorkflowIterations: 100,
  // The module-level state object is shared (by reference) with all phases.
  sharedBuffer: workflowState,
  // Workflow-level checkpointing, in addition to each phase's own persistence.
  commonStatePersistence: {
    persistenceFunction: createCheckpoint,
    persistenceParams: { ttl: 7200 },
    loadBeforeHooks: true,
    storeAfterHooks: true
  },
  // Progress log emitted after each phase.
  handlePhaseCompletion: async ({ phaseResult, sharedBuffer }) => {
    const buffer = sharedBuffer as WorkflowState;
    console.log(`\n✅ Phase "${phaseResult.phaseId}" completed`);
    console.log(`   Duration: ${phaseResult.executionTime}ms`);
    console.log(`   Progress: ${buffer.progress}%`);
  }
});

// Final summary; recordsProcessed is read from the shared state object.
console.log('\n📊 Workflow Results:');
console.log(`Success: ${result.success}`);
console.log(`Total Phases: ${result.totalPhases}`);
console.log(`Duration: ${result.executionTime}ms`);
console.log(`Records Processed: ${workflowState.recordsProcessed}`);

Workflow Recovery Pattern

/**
 * Resumes a workflow from its last stored checkpoint.
 *
 * Reads `checkpoint:<workflowId>` from `stateStore`, rebuilds a shared buffer
 * from it and re-runs `migrationPhases`. The phases' decision hooks consult
 * `completedPhases` to decide whether a phase should be skipped.
 *
 * The checkpoint stores the record count as `processedRecords`, whereas
 * `WorkflowState` names the field `recordsProcessed`; the value is mapped
 * across here so the restored count is not lost.
 *
 * @param workflowId - Id of the workflow whose checkpoint should be loaded.
 * @returns The result returned by `stableWorkflow` for the resumed run.
 */
async function resumeWorkflow(workflowId: string) {
  // Load checkpoint from storage, falling back to an empty checkpoint
  const checkpointData = await stateStore.get(`checkpoint:${workflowId}`);
  const checkpoint = checkpointData ? JSON.parse(checkpointData) : {
    completedPhases: [],
    processedRecords: 0
  };

  // Tolerate checkpoints that lack the phase list (for example, written by other code).
  const completedPhases: string[] = Array.isArray(checkpoint.completedPhases)
    ? [...checkpoint.completedPhases]
    : [];

  console.log(`📂 Resuming workflow: ${workflowId}`);
  console.log(`   Last phase: ${checkpoint.lastPhase || 'start'}`);
  console.log(`   Completed: ${completedPhases.join(', ') || 'none'}`);

  // Restore state from checkpoint on top of the default initial state
  const restoredState: WorkflowState = {
    ...workflowState,
    completedPhases,
    recordsProcessed: checkpoint.processedRecords || 0,
    progress: checkpoint.progress || 0
  };

  // Execute workflow with the restored buffer
  const result = await stableWorkflow(migrationPhases, {
    workflowId,
    commonRequestData: { 
      hostname: SOURCE_API,
      protocol: VALID_REQUEST_PROTOCOLS.HTTPS
    },
    enableNonLinearExecution: true,
    sharedBuffer: restoredState,
    commonStatePersistence: {
      persistenceFunction: createCheckpoint,
      persistenceParams: { ttl: 7200 },
      loadBeforeHooks: true,
      storeAfterHooks: true
    }
  });

  console.log(`\n✅ Workflow resumed and ${result.success ? 'completed' : 'failed'}`);
  return result;
}

// Example usage: Can be called multiple times to resume from last checkpoint
// await resumeWorkflow('migration-12345');

Ready to Get Started?

Explore the documentation to learn how to implement these patterns in your own applications.