@connectum/events

Universal event adapter layer for Connectum. Provides proto-first pub/sub with pluggable broker adapters, typed event handlers mirroring ConnectRPC's router pattern, and a composable middleware pipeline with built-in retry and dead letter queue (DLQ) support.

Layer: 1 (Events)

Full API Reference

Complete TypeScript API documentation: API Reference (coming soon)

Installation

bash
pnpm add @connectum/events

Peer dependency: @connectum/core

You also need at least one adapter package for production use:

bash
# Choose one (or more) broker adapters:
pnpm add @connectum/events-nats    # NATS JetStream
pnpm add @connectum/events-kafka   # Kafka / Redpanda
pnpm add @connectum/events-redis   # Redis Streams / Valkey
pnpm add @connectum/events-amqp    # AMQP / RabbitMQ

The built-in MemoryAdapter is included in @connectum/events for testing.

Quick Start

typescript
import { createServer } from '@connectum/core';
import { createEventBus, MemoryAdapter } from '@connectum/events';
import type { EventRoute } from '@connectum/events';
import { UserEventHandlers, UserCreatedSchema } from '#gen/user/v1/user_pb.js';

// 1. Define event handlers (mirrors ConnectRPC router pattern)
const userEvents: EventRoute = (events) => {
  events.service(UserEventHandlers, {
    onUserCreated: async (msg, ctx) => {
      console.log(`User created: ${msg.id}, ${msg.email}`);
      await ctx.ack();
    },
  });
};

// 2. Create an EventBus
const eventBus = createEventBus({
  adapter: MemoryAdapter(),
  routes: [userEvents],
  group: 'my-service',
  middleware: {
    retry: { maxRetries: 3, backoff: 'exponential' },
    dlq: { topic: 'my-service.dlq' },
  },
});

// 3. Integrate with Connectum server
const server = createServer({
  services: [routes], // `routes`: your ConnectRPC service routes, defined elsewhere
  eventBus,
  shutdown: { autoShutdown: true },
});

await server.start();

// 4. Publish typed events
await eventBus.publish(UserCreatedSchema, {
  id: '123',
  email: 'alice@example.com',
  name: 'Alice',
});

Core Concepts

EventBus

The central component managing adapter lifecycle, event routes, middleware pipeline, and publishing. Created via createEventBus(), it implements EventBusLike for integration with createServer().

EventAdapter

A minimal interface for message brokers. Each adapter (NATS, Kafka, Redis, AMQP, Memory) implements connect(context?), disconnect(), publish(), and subscribe(). The optional AdapterContext parameter on connect() carries service-level information (like serviceName) derived from registered proto service descriptors, enabling adapters to identify themselves to brokers automatically. Broker-specific configuration is passed to the adapter constructor, not to the interface methods.
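To make the adapter contract more concrete, here is a minimal in-memory sketch. Only the four method names and the optional context on connect() come from the description above; the `SketchAdapter` and `SketchRawEvent` types, the parameter lists of publish() and subscribe(), and the `createSketchMemoryAdapter` factory are assumptions made for illustration and are not the package's actual types or its MemoryAdapter.

```typescript
// Hypothetical shapes for illustration; the real EventAdapter types may differ.
interface SketchAdapterContext {
  serviceName?: string;
}

interface SketchRawEvent {
  topic: string;
  payload: Uint8Array;
}

type SketchListener = (event: SketchRawEvent) => Promise<void>;

interface SketchAdapter {
  connect(context?: SketchAdapterContext): Promise<void>;
  disconnect(): Promise<void>;
  publish(topic: string, payload: Uint8Array): Promise<void>;
  subscribe(topic: string, listener: SketchListener): Promise<void>;
}

// Broker-specific settings go to the factory, not to the interface methods.
function createSketchMemoryAdapter(
  options: { name?: string } = {},
): SketchAdapter & { clientId(): string } {
  const listeners = new Map<string, SketchListener[]>();
  let connected = false;
  let clientId = options.name ?? 'anonymous';

  return {
    async connect(context) {
      // Prefer the service name from the context to identify this client.
      if (context?.serviceName) clientId = context.serviceName;
      connected = true;
    },
    async disconnect() {
      listeners.clear();
      connected = false;
    },
    async publish(topic, payload) {
      if (!connected) throw new Error('adapter is not connected');
      // Deliver to every listener registered for this exact topic.
      for (const listener of listeners.get(topic) ?? []) {
        await listener({ topic, payload });
      }
    },
    async subscribe(topic, listener) {
      if (!connected) throw new Error('adapter is not connected');
      listeners.set(topic, [...(listeners.get(topic) ?? []), listener]);
    },
    clientId: () => clientId,
  };
}
```

The factory takes its settings up front and the interface methods stay broker-agnostic, which matches the split between constructor configuration and interface methods described above.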

EventRouter

Mirrors ConnectRPC's ConnectRouter pattern for event handlers. Register typed handlers per proto service:

typescript
const myEvents: EventRoute = (events) => {
  events.service(OrderEventHandlers, {
    onOrderCreated: async (msg, ctx) => { /* ... */ },
    onOrderCancelled: async (msg, ctx) => { /* ... */ },
  });
};

EventContext

Per-event context passed to handlers alongside the deserialized protobuf message. Provides explicit ack() / nack() control, event metadata, and an abort signal for graceful shutdown.
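As a rough illustration of explicit acknowledgement, the handler below decides between ack() and nack() based on the abort signal and the payload. The `SketchEventContext` interface is a local stand-in that mirrors only some documented members, the `UserCreated` shape is hypothetical, and the meaning given to the `requeue` flag (true requests redelivery, false gives up on the event) is an assumption rather than confirmed behavior.

```typescript
// Local stand-in mirroring documented EventContext members (not imported from the package).
interface SketchEventContext {
  readonly signal: AbortSignal;
  readonly eventId: string;
  readonly attempt: number;
  ack(): Promise<void>;
  nack(requeue?: boolean): Promise<void>;
}

// Hypothetical message shape for the example.
interface UserCreated {
  id: string;
  email: string;
}

async function onUserCreated(msg: UserCreated, ctx: SketchEventContext): Promise<void> {
  // Shutdown in progress: hand the event back so another instance can process it.
  if (ctx.signal.aborted) {
    await ctx.nack(true);
    return;
  }
  // Malformed payload: redelivery will not help, so reject without requeue
  // (assumed here to route the event toward the DLQ).
  if (!msg.email.includes('@')) {
    await ctx.nack(false);
    return;
  }
  console.log(`[${ctx.eventId}] attempt ${ctx.attempt}: user ${msg.id} created`);
  await ctx.ack();
}
```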

API Reference

createEventBus(options)

Factory function that creates an EventBus instance.

typescript
function createEventBus(options: EventBusOptions): EventBus & EventBusLike;

EventBusOptions

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| adapter | EventAdapter | required | Adapter instance (e.g., NatsAdapter, KafkaAdapter, MemoryAdapter) |
| routes | EventRoute[] | [] | Event routes to register |
| group | string | undefined | Consumer group name for load-balanced consumption |
| signal | AbortSignal | undefined | Abort signal for graceful shutdown |
| handlerTimeout | number | undefined | Timeout in ms for event handler execution |
| drainTimeout | number | 30000 | Max ms to wait for in-flight handlers during stop() |
| middleware | MiddlewareConfig | undefined | Middleware configuration (retry, DLQ, custom) |
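For intuition about what a handler timeout such as handlerTimeout implies, the sketch below races a handler against a timer. `withHandlerTimeout` is a hypothetical helper written for this example; how the package actually enforces the timeout is not documented here and may differ.

```typescript
// Hypothetical helper: rejects if `work` does not settle within `timeoutMs`.
async function withHandlerTimeout<T>(work: () => Promise<T>, timeoutMs: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`handler timed out after ${timeoutMs} ms`)),
      timeoutMs,
    );
  });
  try {
    // Whichever settles first wins: the handler result or the timeout rejection.
    return await Promise.race([work(), timeout]);
  } finally {
    // Always clear the timer so a finished handler leaves nothing pending.
    clearTimeout(timer);
  }
}
```

Racing does not cancel the underlying work. A handler that should stop early still needs to observe an abort signal itself.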

EventBus

| Method | Description |
| --- | --- |
| start() | Connect adapter, set up subscriptions |
| stop() | Drain subscriptions, disconnect adapter |
| publish(schema, data, options?) | Publish a typed protobuf event |

PublishOptions

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| topic | string | schema.typeName | Override topic name |
| key | string | undefined | Partition/routing key for ordered delivery |
| sync | boolean | false | Wait for broker confirmation |
| group | string | undefined | Named group tag for workflow grouping |
| metadata | Record<string, string> | undefined | Additional metadata / headers |
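The defaults in this table can be read as a small resolution step applied before a message reaches the adapter. The sketch below shows that reading. `applyPublishDefaults`, the `Sketch*` types, and the example type name are hypothetical; only the option names and default values are taken from the table.

```typescript
// Hypothetical local types; only the option names and defaults come from the table.
interface SketchSchema {
  typeName: string;
}

interface SketchPublishOptions {
  topic?: string;
  key?: string;
  sync?: boolean;
  group?: string;
  metadata?: Record<string, string>;
}

interface ResolvedPublish {
  topic: string;
  key: string | undefined;
  sync: boolean;
  group: string | undefined;
  metadata: Record<string, string>;
}

function applyPublishDefaults(
  schema: SketchSchema,
  options: SketchPublishOptions = {},
): ResolvedPublish {
  return {
    topic: options.topic ?? schema.typeName, // default topic is the message type name
    key: options.key,
    sync: options.sync ?? false, // fire-and-forget unless confirmation is requested
    group: options.group,
    metadata: { ...options.metadata },
  };
}

// Example type name, assumed for illustration.
const userCreated: SketchSchema = { typeName: 'user.v1.UserCreated' };
console.log(applyPublishDefaults(userCreated).topic);
console.log(applyPublishDefaults(userCreated, { topic: 'users.created', key: '123' }).topic);
```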

EventContext

| Property/Method | Type | Description |
| --- | --- | --- |
| signal | AbortSignal | Aborted when server is shutting down |
| eventId | string | Unique event identifier |
| eventType | string | Event type / topic name |
| publishedAt | Date | When the event was published |
| attempt | number | Delivery attempt number (1-based) |
| metadata | ReadonlyMap<string, string> | Event metadata (headers) |
| ack() | Promise<void> | Acknowledge successful processing |
| nack(requeue?) | Promise<void> | Negative acknowledge: request redelivery or send to DLQ |

MiddlewareConfig

| Option | Type | Description |
| --- | --- | --- |
| retry | RetryOptions | Retry middleware configuration |
| dlq | DlqOptions | Dead letter queue configuration |
| custom | EventMiddleware[] | Custom user middleware (executed outermost) |

RetryOptions

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| maxRetries | number | 3 | Maximum retry attempts |
| backoff | "exponential" \| "linear" \| "fixed" | "exponential" | Backoff strategy |
| initialDelay | number | 1000 | Initial delay in ms |
| maxDelay | number | 30000 | Maximum delay in ms |
| multiplier | number | 2 | Multiplier for exponential backoff |
| retryableErrors | (error: unknown) => boolean | undefined | Filter: only retry matching errors |
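To see how these options could interact, the sketch below computes the wait before each retry using the conventional formulas for the three strategies. The formulas are an assumption: the package may compute delays differently, for example by adding jitter. `retryDelayMs` and `SketchRetryOptions` are hypothetical names.

```typescript
type SketchBackoff = 'exponential' | 'linear' | 'fixed';

interface SketchRetryOptions {
  backoff?: SketchBackoff;
  initialDelay?: number;
  maxDelay?: number;
  multiplier?: number;
}

// `retryNumber` is 1 for the first retry, 2 for the second, and so on.
function retryDelayMs(retryNumber: number, options: SketchRetryOptions = {}): number {
  const {
    backoff = 'exponential',
    initialDelay = 1000,
    maxDelay = 30000,
    multiplier = 2,
  } = options;

  const raw =
    backoff === 'fixed'
      ? initialDelay // same wait every time
      : backoff === 'linear'
        ? initialDelay * retryNumber // grows by initialDelay per retry
        : initialDelay * multiplier ** (retryNumber - 1); // grows geometrically

  // Never wait longer than maxDelay, whatever the strategy.
  return Math.min(raw, maxDelay);
}

console.log(retryDelayMs(1)); // 1000
console.log(retryDelayMs(3)); // 4000
console.log(retryDelayMs(6)); // 30000 (32000 capped by maxDelay)
console.log(retryDelayMs(3, { backoff: 'linear' })); // 3000
```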

DlqOptions

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| topic | string | required | DLQ topic name |
| errorSerializer | (error: unknown) => Record<string, unknown> | undefined | Custom error serializer for DLQ metadata |
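A custom errorSerializer only needs to match the signature in the table. The following is one possible sketch. The field names it emits (`name`, `message`, `cause`) are choices made for this example, not a format the package requires.

```typescript
// Matches the documented signature: (error: unknown) => Record<string, unknown>.
function serializeErrorForDlq(error: unknown): Record<string, unknown> {
  if (error instanceof Error) {
    const serialized: Record<string, unknown> = {
      name: error.name,
      message: error.message,
    };
    // Include the cause when present, flattened to a string for readability.
    const cause = (error as { cause?: unknown }).cause;
    if (cause !== undefined) {
      serialized.cause = cause instanceof Error ? cause.message : String(cause);
    }
    return serialized;
  }
  // Thrown non-Error values (strings, numbers, plain objects) are stringified.
  return { name: 'NonError', message: String(error) };
}
```

It would then be passed as `dlq: { topic: 'my-service.dlq', errorSerializer: serializeErrorForDlq }` in the middleware configuration.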

Middleware

The middleware pipeline uses an onion model (outer to inner):

Custom → DLQ → Retry → Handler
  • Custom middleware runs outermost, wrapping everything
  • DLQ catches errors after all retries are exhausted and publishes to a dead letter topic
  • Retry catches handler errors and retries with configurable backoff
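The ordering above can be sketched with a tiny composition function. This is a simplified model rather than the package's composeMiddleware, retryMiddleware, or dlqMiddleware: the `SketchMiddleware` signature is an assumption, backoff delays are omitted, and the DLQ step only records that it would publish.

```typescript
type SketchHandler = () => Promise<void>;
type SketchMiddleware = (next: SketchHandler) => SketchHandler; // assumed signature

// The first middleware in the list ends up outermost.
function composeSketch(middlewares: SketchMiddleware[], handler: SketchHandler): SketchHandler {
  return middlewares.reduceRight((next, mw) => mw(next), handler);
}

const pipelineTrace: string[] = [];

const customMw: SketchMiddleware = (next) => async () => {
  pipelineTrace.push('custom:in');
  await next();
  pipelineTrace.push('custom:out');
};

const dlqMw: SketchMiddleware = (next) => async () => {
  try {
    await next();
  } catch {
    // Reached only after retries are exhausted; a real DLQ step would publish here.
    pipelineTrace.push('dlq:publish');
  }
};

const retryMw = (maxRetries: number): SketchMiddleware => (next) => async () => {
  for (let attempt = 1; ; attempt++) {
    try {
      await next();
      return;
    } catch (error) {
      if (attempt > maxRetries) throw error; // give up, let the DLQ layer handle it
      pipelineTrace.push(`retry:${attempt}`);
    }
  }
};

const alwaysFails: SketchHandler = async () => {
  pipelineTrace.push('handler');
  throw new Error('boom');
};

const sketchPipeline = composeSketch([customMw, dlqMw, retryMw(2)], alwaysFails);
```

Running `sketchPipeline()` with the always-failing handler invokes the handler three times (one initial attempt plus two retries), then the DLQ layer records the failure, and the custom layer completes normally because the DLQ layer swallowed the error.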

See Middleware Guide for detailed configuration and custom middleware examples.

Configuration

Integration with createServer

Pass the eventBus to createServer() for automatic lifecycle management:

typescript
const server = createServer({
  services: [routes],
  eventBus,
  shutdown: { autoShutdown: true },
});

The server calls eventBus.start() on startup and eventBus.stop() on shutdown.

Consumer Groups

Set group to enable load-balanced consumption across multiple service instances:

typescript
const eventBus = createEventBus({
  adapter: KafkaAdapter({ brokers: ['localhost:9092'] }),
  routes: [orderEvents],
  group: 'order-service', // All instances share this group
});
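The effect of a shared group name can be modeled with a toy broker: every group receives each message once, and within a group only one member handles it. `SketchGroupBroker` is a hypothetical illustration of these semantics with simple round-robin selection. Real brokers distribute work differently (for example, by partition in Kafka).

```typescript
type SketchMember = (message: string) => void;

class SketchGroupBroker {
  private groups = new Map<string, { members: SketchMember[]; next: number }>();

  // Register a consumer instance under a group name.
  join(group: string, member: SketchMember): void {
    const state = this.groups.get(group) ?? { members: [], next: 0 };
    state.members.push(member);
    this.groups.set(group, state);
  }

  // Every group sees the message, but only one member per group handles it.
  publish(message: string): void {
    this.groups.forEach((state) => {
      const member = state.members[state.next % state.members.length];
      state.next += 1;
      if (member) member(message);
    });
  }
}

const sketchBroker = new SketchGroupBroker();
const received: Record<string, string[]> = { a: [], b: [], audit: [] };

// Two instances of the same service share the 'order-service' group.
sketchBroker.join('order-service', (m) => received.a.push(m));
sketchBroker.join('order-service', (m) => received.b.push(m));
// A different service uses its own group and sees every message.
sketchBroker.join('audit-service', (m) => received.audit.push(m));

for (const id of ['o1', 'o2', 'o3', 'o4']) sketchBroker.publish(id);
```

After the four publishes, the two 'order-service' instances have handled two messages each, while the single 'audit-service' instance has handled all four.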

Exports Summary

| Export | Description |
| --- | --- |
| createEventBus | EventBus factory function |
| deriveServiceName | Derives a service identifier from proto service type names (format: {packages}@{hostname}) |
| createEventContext | EventContext factory (advanced) |
| EventRouterImpl | EventRouter implementation class |
| MemoryAdapter | In-memory adapter for testing |
| retryMiddleware | Retry middleware factory |
| dlqMiddleware | DLQ middleware factory |
| composeMiddleware | Middleware composition utility |
| resolveTopicName | Topic resolution from proto method descriptors |
| matchPattern | Wildcard pattern matching for topics |
| NonRetryableError | Error class that skips retry middleware |
| RetryableError | Error class that forces retry regardless of predicate |

Type Exports

| Type | Description |
| --- | --- |
| EventBus | EventBus interface |
| EventBusOptions | EventBus configuration |
| AdapterContext | Context passed to adapters on connect (contains serviceName) |
| EventAdapter | Adapter interface |
| EventRouter | Router interface |
| EventRoute | Route function type |
| EventContext | Per-event context |
| EventMiddleware | Middleware function type |
| PublishOptions | Publish options |
| RetryOptions | Retry middleware options |
| DlqOptions | DLQ middleware options |
| MiddlewareConfig | Built-in middleware configuration |
| RawEvent | Raw event data from adapter |
| ServiceEventHandlers | Typed handler map for a service |
| TypedEventHandler | Typed handler function |
