Middleware

The EventBus middleware pipeline wraps event handlers in a composable onion model. Built-in middleware provides retry with configurable backoff and dead letter queue (DLQ) routing. You can add custom middleware for logging, metrics, validation, or any cross-cutting concern.

Pipeline Order

Middleware executes from outermost to innermost:

Custom[0] → Custom[1] → ... → DLQ → Retry → Handler

Each middleware receives three arguments:

typescript
type EventMiddleware = (
  event: RawEvent,
  ctx: EventContext,
  next: EventMiddlewareNext,
) => Promise<void>;

Call next() to pass control to the inner middleware. Errors thrown from next() propagate outward through the chain.
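To make the onion model concrete, here is a minimal sketch of how such a chain could be composed. It is not the library's implementation: `SketchEvent`, `SketchContext`, `composeSketch`, and `layer` are hypothetical stand-ins, and the real `RawEvent` and `EventContext` types carry more fields.

```typescript
// Hypothetical simplified types, for illustration only.
type SketchEvent = { payload: Uint8Array };
type SketchContext = { eventId: string };
type SketchNext = () => Promise<void>;
type SketchMiddleware = (
  event: SketchEvent,
  ctx: SketchContext,
  next: SketchNext,
) => Promise<void>;
type SketchHandler = (event: SketchEvent, ctx: SketchContext) => Promise<void>;

// Wrap the handler from the inside out so that middlewares[0] ends up outermost.
function composeSketch(
  middlewares: SketchMiddleware[],
  handler: SketchHandler,
): SketchHandler {
  return middlewares.reduceRight<SketchHandler>(
    (inner, mw) => (event, ctx) => mw(event, ctx, () => inner(event, ctx)),
    handler,
  );
}

// Records when each layer is entered and left.
const trace: string[] = [];
const layer = (label: string): SketchMiddleware => async (_event, _ctx, next) => {
  trace.push(`enter ${label}`);
  await next();
  trace.push(`leave ${label}`);
};

const composedSketch = composeSketch([layer('outer'), layer('inner')], async () => {
  trace.push('handler');
});

// Once this promise resolves, trace reads:
// enter outer, enter inner, handler, leave inner, leave outer
const sketchDone = composedSketch({ payload: new Uint8Array() }, { eventId: 'evt-1' });
```

Because each layer awaits `next()`, code placed after that call runs on the way back out, which is what lets an outer middleware observe errors thrown further in.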

Retry Middleware

Retries failed event handlers with a configurable backoff strategy. Once all attempts are exhausted, the error is re-thrown to the enclosing middleware in the chain (typically DLQ).

Configuration

typescript
const eventBus = createEventBus({
  adapter,
  routes: [myEvents],
  middleware: {
    retry: {
      maxRetries: 3,
      backoff: 'exponential',
      initialDelay: 1000,
      maxDelay: 30_000,
      multiplier: 2,
    },
  },
});

RetryOptions

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| maxRetries | number | 3 | Maximum retry attempts before giving up |
| backoff | "exponential" \| "linear" \| "fixed" | "exponential" | Backoff strategy between retries |
| initialDelay | number | 1000 | Initial delay in milliseconds |
| maxDelay | number | 30000 | Maximum delay cap in milliseconds |
| multiplier | number | 2 | Multiplier for exponential backoff |
| retryableErrors | (error: unknown) => boolean | undefined | Filter function: only retry if it returns true |

Backoff Strategies

| Strategy | Delay Formula | Example (initialDelay=1000, multiplier=2) |
| --- | --- | --- |
| exponential | initialDelay * multiplier^(attempt-1) | 1000ms, 2000ms, 4000ms, 8000ms |
| linear | initialDelay * attempt | 1000ms, 2000ms, 3000ms, 4000ms |
| fixed | initialDelay | 1000ms, 1000ms, 1000ms, 1000ms |

All strategies are capped at maxDelay.
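The formulas and the cap can be written out as a small function. This is a sketch of the arithmetic described in the table, not the library's code; `DelayOptions` and `computeDelay` are hypothetical names, and `attempt` is assumed to be 1-based.

```typescript
type Backoff = 'exponential' | 'linear' | 'fixed';

interface DelayOptions {
  backoff: Backoff;
  initialDelay: number; // milliseconds
  maxDelay: number; // milliseconds
  multiplier: number; // only used by the exponential strategy
}

// attempt is 1-based: 1 yields the delay before the first retry.
function computeDelay(attempt: number, opts: DelayOptions): number {
  const uncapped =
    opts.backoff === 'exponential'
      ? opts.initialDelay * opts.multiplier ** (attempt - 1)
      : opts.backoff === 'linear'
        ? opts.initialDelay * attempt
        : opts.initialDelay;
  // Every strategy is capped at maxDelay.
  return Math.min(uncapped, opts.maxDelay);
}

const documentedDefaults: DelayOptions = {
  backoff: 'exponential',
  initialDelay: 1000,
  maxDelay: 30_000,
  multiplier: 2,
};

// [1000, 2000, 4000, 8000, 16000, 30000]: the sixth delay would be 32000 uncapped.
const exponentialDelays = [1, 2, 3, 4, 5, 6].map((attempt) =>
  computeDelay(attempt, documentedDefaults),
);
```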

Selective Retry

Use retryableErrors to only retry specific error types:

typescript
middleware: {
  retry: {
    maxRetries: 3,
    backoff: 'exponential',
    retryableErrors: (error) => {
      // Only retry transient errors
      if (error instanceof Error) {
        return error.message.includes('ECONNREFUSED')
            || error.message.includes('timeout');
      }
      return false;
    },
  },
}

Errors that the predicate rejects are re-thrown immediately, without any backoff delay.

Typed Error Classes

Instead of (or in addition to) the retryableErrors predicate, you can use typed error classes for declarative retry control:

typescript
import { NonRetryableError, RetryableError } from '@connectum/events';

// Handler that uses typed errors
const orderEvents: EventRoute = (events) => {
  events.service(OrderEventHandlers, {
    onOrderCreated: async (msg, ctx) => {
      // Validation errors -- never retry
      if (!msg.orderId) {
        throw new NonRetryableError('Missing orderId');
      }

      try {
        await db.insertOrder(msg);
      } catch (err) {
        // Transient DB errors -- always retry
        if (isConnectionError(err)) {
          throw new RetryableError('DB connection lost', { cause: err });
        }
        throw err; // Other errors use predicate or default behavior
      }
    },
  });
};

Priority order (first match wins):

| Priority | Check | Behavior |
| --- | --- | --- |
| 1 | NonRetryableError | Skip retry, throw immediately |
| 2 | RetryableError | Force retry (ignores retryableErrors predicate) |
| 3 | retryableErrors predicate | Retry if predicate returns true |
| 4 | Default | Retry all errors |

Both classes use Symbol.for() branding, so they work across module boundaries and realms.
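The priority order and the branding idea can be sketched together. Everything here is illustrative: the symbol keys, the `Sketch*` classes, and `shouldRetry` are hypothetical, and the keys the library actually registers are not documented on this page.

```typescript
// Symbol.for() reads from the global symbol registry, so two copies of this
// module (or two realms) resolve the same key to the same symbol.
const RETRYABLE_BRAND = Symbol.for('sketch.retryable'); // hypothetical key
const NON_RETRYABLE_BRAND = Symbol.for('sketch.non-retryable'); // hypothetical key

class SketchRetryableError extends Error {
  constructor(message: string) {
    super(message);
    Object.defineProperty(this, RETRYABLE_BRAND, { value: true });
  }
}

class SketchNonRetryableError extends Error {
  constructor(message: string) {
    super(message);
    Object.defineProperty(this, NON_RETRYABLE_BRAND, { value: true });
  }
}

// Checks the brand instead of using instanceof, which fails across realms.
function hasBrand(error: unknown, brand: symbol): boolean {
  return typeof error === 'object' && error !== null && Reflect.get(error, brand) === true;
}

type RetryPredicate = (error: unknown) => boolean;

function shouldRetry(error: unknown, retryableErrors?: RetryPredicate): boolean {
  if (hasBrand(error, NON_RETRYABLE_BRAND)) return false; // 1. never retry
  if (hasBrand(error, RETRYABLE_BRAND)) return true; // 2. always retry, predicate ignored
  if (retryableErrors) return retryableErrors(error); // 3. predicate decides
  return true; // 4. default: retry everything
}
```

With this sketch, `shouldRetry(new SketchRetryableError('x'), () => false)` returns `true`, because the typed error is checked before the predicate.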

DLQ Middleware

When a handler fails after all retries are exhausted, the DLQ middleware publishes the failed event to a dedicated dead letter topic and acknowledges the original event. This prevents poison messages from blocking the queue.

Configuration

typescript
const eventBus = createEventBus({
  adapter,
  routes: [myEvents],
  middleware: {
    retry: { maxRetries: 3, backoff: 'exponential' },
    dlq: { topic: 'my-service.dlq' },
  },
});

DlqOptions

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| topic | string | required | Topic name for dead letter events |
| errorSerializer | (error: unknown) => Record<string, unknown> | undefined | Custom error serializer for DLQ metadata |
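As an illustration of the errorSerializer option, the function below matches the documented signature. The field names it returns (`name`, `message`, `stack`) are an assumption made for this sketch; how the middleware maps the returned record onto DLQ metadata is not specified on this page.

```typescript
// Matches the documented option type: (error: unknown) => Record<string, unknown>.
function serializeError(error: unknown): Record<string, unknown> {
  if (error instanceof Error) {
    return {
      name: error.name,
      message: error.message,
      // Keep only the first three stack lines so the metadata stays small.
      stack: error.stack?.split('\n').slice(0, 3).join('\n'),
    };
  }
  // Non-Error throwables (strings, numbers, plain objects) are stringified.
  return { name: 'UnknownError', message: String(error) };
}

// Usage (fragment):
//   middleware: { dlq: { topic: 'my-service.dlq', errorSerializer: serializeError } }
```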

DLQ Metadata

When an event is routed to the DLQ, the middleware attaches diagnostic metadata:

| Key | Description |
| --- | --- |
| dlq.original-topic | Topic the event was originally published to |
| dlq.original-id | Unique event ID from the original message |
| dlq.error | Error message from the last failed handler invocation |
| dlq.attempt | Number of delivery attempts before DLQ routing |

Monitoring the DLQ

Subscribe directly to the DLQ topic using the adapter to monitor and process failed events:

typescript
adapter.subscribe(
  ['my-service.dlq'],
  async (rawEvent) => {
    const originalTopic = rawEvent.metadata.get('dlq.original-topic') ?? 'unknown';
    const error = rawEvent.metadata.get('dlq.error') ?? 'unknown';
    console.error(`DLQ event from ${originalTopic}: ${error}`);
    // Log, alert, or attempt recovery
  },
  { group: 'dlq-monitor' },
);

Retry + DLQ Together

The most common pattern combines retry and DLQ: retry transient errors, route persistent failures to DLQ.

typescript
const eventBus = createEventBus({
  adapter: NatsAdapter({ servers: 'nats://localhost:4222', stream: 'orders' }),
  routes: [orderEvents],
  group: 'order-service',
  middleware: {
    retry: { maxRetries: 2, backoff: 'fixed', initialDelay: 200 },
    dlq: { topic: 'dead-letter-queue' },
  },
});

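Execution flow on handler error:

Handler throws → Retry waits 200 ms and re-invokes the handler (up to 2 retries) → retries exhausted, error re-thrown → DLQ publishes the event to dead-letter-queue → original event acknowledged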
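A self-contained simulation of this flow, using the same numbers as the configuration (2 retries, fixed 200 ms delay). It is a sketch under stated assumptions rather than the library's code: `withRetry` and `withDlq` are hypothetical helpers, the sleep is replaced by a recorder so nothing actually waits, and maxRetries is assumed to count retries after the first attempt.

```typescript
type SimHandler = () => Promise<void>;

// Assumption: the handler runs at most maxRetries + 1 times.
async function withRetry(
  handler: SimHandler,
  maxRetries: number,
  delayMs: number,
  sleep: (ms: number) => Promise<void>,
): Promise<void> {
  for (let attempt = 1; ; attempt++) {
    try {
      await handler();
      return;
    } catch (error) {
      if (attempt > maxRetries) throw error; // exhausted: hand over to the DLQ layer
      await sleep(delayMs);
    }
  }
}

// Hypothetical DLQ layer: publish the failure, then swallow the error,
// which stands in for acknowledging the original event.
async function withDlq(
  inner: SimHandler,
  publish: (topic: string, errorMessage: string) => Promise<void>,
  topic: string,
): Promise<void> {
  try {
    await inner();
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    await publish(topic, message);
  }
}

const steps: string[] = [];
let handlerCalls = 0;
const alwaysFailing: SimHandler = async () => {
  handlerCalls += 1;
  steps.push(`attempt ${handlerCalls}`);
  throw new Error('boom');
};

// Once this promise resolves, steps reads:
// attempt 1, wait 200ms, attempt 2, wait 200ms, attempt 3,
// publish to dead-letter-queue: boom
const simulation = withDlq(
  () => withRetry(alwaysFailing, 2, 200, async (ms) => { steps.push(`wait ${ms}ms`); }),
  async (topic, errorMessage) => { steps.push(`publish to ${topic}: ${errorMessage}`); },
  'dead-letter-queue',
);
```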

Custom Middleware

Writing Custom Middleware

A middleware is any function matching the EventMiddleware signature:

typescript
import type { EventMiddleware } from '@connectum/events';

const loggingMiddleware: EventMiddleware = async (event, ctx, next) => {
  const start = performance.now();
  console.log(`[${ctx.eventType}] Processing event ${ctx.eventId}`);

  try {
    await next();
    const duration = performance.now() - start;
    console.log(`[${ctx.eventType}] Completed in ${duration.toFixed(1)}ms`);
  } catch (error) {
    const duration = performance.now() - start;
    console.error(`[${ctx.eventType}] Failed after ${duration.toFixed(1)}ms:`, error);
    throw error; // Re-throw to let retry/DLQ handle it
  }
};

Registering Custom Middleware

Pass custom middleware in the middleware.custom array. Custom middleware runs outermost, in array order, wrapping both DLQ and retry:

typescript
const eventBus = createEventBus({
  adapter,
  routes: [myEvents],
  middleware: {
    custom: [loggingMiddleware, metricsMiddleware],
    retry: { maxRetries: 3 },
    dlq: { topic: 'my-service.dlq' },
  },
});

Execution order: loggingMiddleware → metricsMiddleware → DLQ → retry → handler

Metrics Middleware Example

typescript
import type { EventMiddleware } from '@connectum/events';

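// eventCounter is assumed to be defined elsewhere: any counter exposing
// inc(labels), such as a prom-client Counter, fits this usage.
const metricsMiddleware: EventMiddleware = async (event, ctx, next) => {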
  const labels = { event_type: ctx.eventType, attempt: String(ctx.attempt) };

  try {
    await next();
    eventCounter.inc({ ...labels, status: 'success' });
  } catch (error) {
    eventCounter.inc({ ...labels, status: 'error' });
    throw error;
  }
};

Validation Middleware Example

typescript
import type { EventMiddleware } from '@connectum/events';

const payloadSizeMiddleware: EventMiddleware = async (event, ctx, next) => {
  const MAX_PAYLOAD_SIZE = 1024 * 1024; // 1 MB

  if (event.payload.length > MAX_PAYLOAD_SIZE) {
    console.warn(`Event ${ctx.eventId} payload exceeds 1MB, skipping`);
    await ctx.nack(false); // Don't requeue
    return;
  }

  await next();
};

Advanced: Manual Composition

For advanced use cases, you can compose middleware manually using composeMiddleware():

typescript
import { composeMiddleware, retryMiddleware, dlqMiddleware } from '@connectum/events';
import type { EventMiddleware } from '@connectum/events';

const middlewares: EventMiddleware[] = [
  loggingMiddleware,
  retryMiddleware({ maxRetries: 3 }),
  dlqMiddleware({ topic: 'my.dlq' }, adapter),
];

const composed = composeMiddleware(middlewares, async (rawEvent, ctx) => {
  // Final handler
  console.log(`Processing ${ctx.eventId}`);
  await ctx.ack();
});