@connectum/events
Universal event adapter layer for Connectum. Provides proto-first pub/sub with pluggable broker adapters, typed event handlers mirroring ConnectRPC's router pattern, and a composable middleware pipeline with built-in retry and dead letter queue (DLQ) support.
Layer: 1 (Events)
Related Guides
- Events Overview -- architecture and core concepts
- Getting Started -- step-by-step setup
- Middleware -- retry, DLQ, custom middleware
- Custom Topics -- proto options for topic naming
- Adapters -- Memory, NATS, Kafka, Redis, AMQP comparison
Full API Reference
Complete TypeScript API documentation: API Reference (coming soon)
Installation
pnpm add @connectum/events
Peer dependency: @connectum/core
You also need at least one adapter package for production use:
# Choose one (or more) broker adapters:
pnpm add @connectum/events-nats # NATS JetStream
pnpm add @connectum/events-kafka # Kafka / Redpanda
pnpm add @connectum/events-redis # Redis Streams / Valkey
pnpm add @connectum/events-amqp # AMQP / RabbitMQ
The built-in MemoryAdapter is included in @connectum/events for testing.
Quick Start
import { createServer } from '@connectum/core';
import { createEventBus, MemoryAdapter } from '@connectum/events';
import type { EventRoute } from '@connectum/events';
import { UserEventHandlers, UserCreatedSchema } from '#gen/user/v1/user_pb.js';
// 1. Define event handlers (mirrors ConnectRPC router pattern)
const userEvents: EventRoute = (events) => {
  events.service(UserEventHandlers, {
    onUserCreated: async (msg, ctx) => {
      console.log(`User created: ${msg.id}, ${msg.email}`);
      await ctx.ack();
    },
  });
};
// 2. Create an EventBus
const eventBus = createEventBus({
  adapter: MemoryAdapter(),
  routes: [userEvents],
  group: 'my-service',
  middleware: {
    retry: { maxRetries: 3, backoff: 'exponential' },
    dlq: { topic: 'my-service.dlq' },
  },
});
// 3. Integrate with Connectum server
// `routes` stands for your ConnectRPC service routes, defined elsewhere.
const server = createServer({
  services: [routes],
  eventBus,
  shutdown: { autoShutdown: true },
});
await server.start();
// 4. Publish typed events
await eventBus.publish(UserCreatedSchema, {
  id: '123',
  email: 'alice@example.com',
  name: 'Alice',
});
Core Concepts
EventBus
The central component managing adapter lifecycle, event routes, middleware pipeline, and publishing. Created via createEventBus(), it implements EventBusLike for integration with createServer().
EventAdapter
A minimal interface for message brokers. Each adapter (NATS, Kafka, Redis, AMQP, Memory) implements connect(context?), disconnect(), publish(), and subscribe(). The optional AdapterContext parameter on connect() carries service-level information (like serviceName) derived from registered proto service descriptors, enabling adapters to identify themselves to brokers automatically. Broker-specific configuration is passed to the adapter constructor, not to the interface methods.
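
To make the adapter contract concrete, here is a minimal loopback sketch. Every name in it (`EventAdapterSketch`, `AdapterContextSketch`, `LoopbackAdapterSketch`, `clientIdPrefix`) is hypothetical, and the method signatures are assumptions; the real `EventAdapter` types in @connectum/events may differ.

```typescript
// Hypothetical shapes for illustration only; not the library's actual types.
interface AdapterContextSketch {
  serviceName?: string;
}

type RawHandlerSketch = (topic: string, payload: Uint8Array) => Promise<void>;

interface EventAdapterSketch {
  connect(context?: AdapterContextSketch): Promise<void>;
  disconnect(): Promise<void>;
  publish(topic: string, payload: Uint8Array): Promise<void>;
  subscribe(topic: string, handler: RawHandlerSketch): Promise<void>;
}

class LoopbackAdapterSketch implements EventAdapterSketch {
  clientId = "";
  private connected = false;
  private readonly options: { clientIdPrefix: string };
  private readonly handlers = new Map<string, RawHandlerSketch[]>();

  // Broker-specific settings go to the constructor, not to the interface methods.
  constructor(options: { clientIdPrefix: string }) {
    this.options = options;
  }

  async connect(context?: AdapterContextSketch): Promise<void> {
    // Use the service name from the context, if present, to identify this client.
    const name = context?.serviceName ?? "anonymous";
    this.clientId = `${this.options.clientIdPrefix}-${name}`;
    this.connected = true;
  }

  async disconnect(): Promise<void> {
    this.handlers.clear();
    this.connected = false;
  }

  async subscribe(topic: string, handler: RawHandlerSketch): Promise<void> {
    const existing = this.handlers.get(topic) ?? [];
    this.handlers.set(topic, [...existing, handler]);
  }

  async publish(topic: string, payload: Uint8Array): Promise<void> {
    if (!this.connected) throw new Error("adapter is not connected");
    // Deliver to every handler registered for exactly this topic.
    for (const handler of this.handlers.get(topic) ?? []) {
      await handler(topic, payload);
    }
  }
}
```

The sketch delivers in-process and ignores consumer groups, acknowledgements, and wildcard topics, which a production adapter would need to handle.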
EventRouter
Mirrors ConnectRPC's ConnectRouter pattern for event handlers. Register typed handlers per proto service:
const myEvents: EventRoute = (events) => {
  events.service(OrderEventHandlers, {
    onOrderCreated: async (msg, ctx) => { /* ... */ },
    onOrderCancelled: async (msg, ctx) => { /* ... */ },
  });
};
EventContext
Per-event context passed to handlers alongside the deserialized protobuf message. Provides explicit ack() / nack() control, event metadata, and an abort signal for graceful shutdown.
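
The explicit ack() / nack() control can be sketched as follows. `EventContextSketch`, `processWithExplicitAck`, and `recordingContext` are hypothetical names, and the sketch assumes that `nack(true)` requests redelivery while `nack(false)` does not; check the adapter you use for the actual semantics. It also assumes no retry middleware is wrapping the handler, since it catches errors itself instead of rethrowing them.

```typescript
// Hypothetical subset of EventContext; only the members this sketch needs.
interface EventContextSketch {
  readonly attempt: number;
  ack(): Promise<void>;
  nack(requeue?: boolean): Promise<void>;
}

// Acks on success; on failure, requests redelivery until maxAttempts is reached.
async function processWithExplicitAck<T>(
  msg: T,
  ctx: EventContextSketch,
  work: (msg: T) => Promise<void>,
  maxAttempts = 3,
): Promise<"acked" | "requeued" | "rejected"> {
  try {
    await work(msg);
    await ctx.ack();
    return "acked";
  } catch {
    const requeue = ctx.attempt < maxAttempts;
    await ctx.nack(requeue);
    return requeue ? "requeued" : "rejected";
  }
}

// Test double that records which acknowledgement was sent.
function recordingContext(attempt: number): EventContextSketch & { calls: string[] } {
  const calls: string[] = [];
  return {
    attempt,
    calls,
    ack: async () => {
      calls.push("ack");
    },
    nack: async (requeue?: boolean) => {
      calls.push(`nack(${requeue === true})`);
    },
  };
}
```

A recording context like this lets a handler's acknowledgement behavior be unit-tested without a broker.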
API Reference
createEventBus(options)
Factory function that creates an EventBus instance.
function createEventBus(options: EventBusOptions): EventBus & EventBusLike;
EventBusOptions
| Option | Type | Default | Description |
|---|---|---|---|
| adapter | EventAdapter | required | Adapter instance (e.g., NatsAdapter, KafkaAdapter, MemoryAdapter) |
| routes | EventRoute[] | [] | Event routes to register |
| group | string | undefined | Consumer group name for load-balanced consumption |
| signal | AbortSignal | undefined | Abort signal for graceful shutdown |
| handlerTimeout | number | undefined | Timeout in ms for event handler execution |
| drainTimeout | number | 30000 | Max ms to wait for in-flight handlers during stop() |
| middleware | MiddlewareConfig | undefined | Middleware configuration (retry, DLQ, custom) |
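
As an illustration of what a handler timeout such as `handlerTimeout` implies, the sketch below bounds an async handler with a timer and aborts a signal when the limit is hit. `withHandlerTimeout` is a hypothetical helper, not part of @connectum/events, and the library may implement its timeout differently.

```typescript
// Rejects if `work` does not settle within `timeoutMs`, and aborts the signal
// passed to `work` so that cooperative handlers can stop early.
function withHandlerTimeout<T>(
  work: (signal: AbortSignal) => Promise<T>,
  timeoutMs: number,
): Promise<T> {
  const controller = new AbortController();
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => {
      controller.abort();
      reject(new Error(`handler timed out after ${timeoutMs} ms`));
    }, timeoutMs);
    work(controller.signal).then(
      (value) => {
        clearTimeout(timer);
        resolve(value);
      },
      (error: unknown) => {
        clearTimeout(timer);
        reject(error);
      },
    );
  });
}
```

Rejecting the promise does not stop the underlying work by itself; a handler has to observe the abort signal to cancel in-flight operations.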
EventBus
| Method | Description |
|---|---|
| start() | Connect adapter, set up subscriptions |
| stop() | Drain subscriptions, disconnect adapter |
| publish(schema, data, options?) | Publish a typed protobuf event |
PublishOptions
| Option | Type | Default | Description |
|---|---|---|---|
| topic | string | schema.typeName | Override topic name |
| key | string | undefined | Partition/routing key for ordered delivery |
| sync | boolean | false | Wait for broker confirmation |
| group | string | undefined | Named group tag for workflow grouping |
| metadata | Record<string, string> | undefined | Additional metadata / headers |
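
The defaults in the table can be read as the following resolution step. `SchemaSketch`, `PublishOptionsSketch`, and `resolvePublish` are hypothetical names used only to illustrate the documented defaults; the library's internal resolution (for example, topic names set through proto options) may involve more than this.

```typescript
// Hypothetical minimal shapes; `typeName` mirrors the default named in the table.
interface SchemaSketch {
  typeName: string;
}

interface PublishOptionsSketch {
  topic?: string;
  key?: string;
  sync?: boolean;
  group?: string;
  metadata?: Record<string, string>;
}

// Resolves the values a publish call would use when options are omitted.
function resolvePublish(schema: SchemaSketch, options: PublishOptionsSketch = {}) {
  return {
    topic: options.topic ?? schema.typeName, // default: the schema's type name
    key: options.key,                        // default: undefined
    sync: options.sync ?? false,             // default: do not wait for the broker
    group: options.group,                    // default: undefined
    metadata: options.metadata,              // default: undefined
  };
}
```

For instance, publishing with only `{ key: 'user-123' }` would keep the default topic while routing by that key.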
EventContext
| Property/Method | Type | Description |
|---|---|---|
| signal | AbortSignal | Aborted when server is shutting down |
| eventId | string | Unique event identifier |
| eventType | string | Event type / topic name |
| publishedAt | Date | When the event was published |
| attempt | number | Delivery attempt number (1-based) |
| metadata | ReadonlyMap<string, string> | Event metadata (headers) |
| ack() | Promise<void> | Acknowledge successful processing |
| nack(requeue?) | Promise<void> | Negative acknowledge -- request redelivery or send to DLQ |
MiddlewareConfig
| Option | Type | Description |
|---|---|---|
| retry | RetryOptions | Retry middleware configuration |
| dlq | DlqOptions | Dead letter queue configuration |
| custom | EventMiddleware[] | Custom user middleware (executed outermost) |
RetryOptions
| Option | Type | Default | Description |
|---|---|---|---|
| maxRetries | number | 3 | Maximum retry attempts |
| backoff | "exponential" \| "linear" \| "fixed" | "exponential" | Backoff strategy |
| initialDelay | number | 1000 | Initial delay in ms |
| maxDelay | number | 30000 | Maximum delay in ms |
| multiplier | number | 2 | Multiplier for exponential backoff |
| retryableErrors | (error: unknown) => boolean | undefined | Filter: only retry matching errors |
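
To show how these options could combine, here is one plausible way to compute the delay before each retry. The formulas are an assumption (the exact ones used by the retry middleware are not specified here), and `retryDelayMs` and `BackoffOptionsSketch` are hypothetical names.

```typescript
type BackoffSketch = "exponential" | "linear" | "fixed";

interface BackoffOptionsSketch {
  backoff: BackoffSketch;
  initialDelay: number;
  maxDelay: number;
  multiplier: number;
}

// Defaults copied from the RetryOptions table.
const backoffDefaults: BackoffOptionsSketch = {
  backoff: "exponential",
  initialDelay: 1000,
  maxDelay: 30000,
  multiplier: 2,
};

// retryNumber is 1-based: 1 is the first retry after the initial failure.
function retryDelayMs(
  retryNumber: number,
  overrides: Partial<BackoffOptionsSketch> = {},
): number {
  const o = { ...backoffDefaults, ...overrides };
  const uncapped =
    o.backoff === "fixed"
      ? o.initialDelay
      : o.backoff === "linear"
        ? o.initialDelay * retryNumber
        : o.initialDelay * Math.pow(o.multiplier, retryNumber - 1);
  // Never wait longer than maxDelay, whatever the strategy.
  return Math.min(uncapped, o.maxDelay);
}
```

Under these assumed formulas and the default options, successive retries would wait 1000, 2000, and 4000 ms, with later delays capped at 30000 ms.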
DlqOptions
| Option | Type | Default | Description |
|---|---|---|---|
| topic | string | required | DLQ topic name |
| errorSerializer | (error: unknown) => Record<string, unknown> | undefined | Custom error serializer for DLQ metadata |
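
A function matching the `errorSerializer` signature might look like the sketch below. The name `serializeErrorForDlq` and the chosen output fields are illustrative assumptions, not the library's default behavior.

```typescript
// Converts any thrown value into a plain record suitable for DLQ metadata.
function serializeErrorForDlq(error: unknown): Record<string, unknown> {
  if (error instanceof Error) {
    return {
      name: error.name,
      message: error.message,
      stack: error.stack ?? null,
    };
  }
  // Non-Error values (strings, numbers, objects) are stringified.
  return { name: "NonErrorThrown", message: String(error) };
}
```

It could then be supplied as `dlq: { topic: 'my-service.dlq', errorSerializer: serializeErrorForDlq }`.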
Middleware
The middleware pipeline uses an onion model (outer to inner):
Custom → DLQ → Retry → Handler
- Custom middleware runs outermost, wrapping everything
- DLQ catches errors after all retries are exhausted and publishes to a dead letter topic
- Retry catches handler errors and retries with configurable backoff
See Middleware Guide for detailed configuration and custom middleware examples.
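
The onion ordering can be sketched with a small self-contained pipeline. All names here (`composeSketch`, `retrySketch`, `dlqSketch`, `tracingSketch`, `runPipelineDemo`) and the `(event, next)` middleware signature are assumptions made for illustration; they are not the signatures of `composeMiddleware`, `retryMiddleware`, or `dlqMiddleware`. The retry stand-in also retries immediately, without backoff.

```typescript
type NextSketch = () => Promise<void>;
type MiddlewareSketch = (event: string, next: NextSketch) => Promise<void>;
type HandlerSketch = (event: string) => Promise<void>;

// The first middleware in the array ends up outermost.
function composeSketch(middlewares: MiddlewareSketch[], handler: HandlerSketch): HandlerSketch {
  return middlewares.reduceRight<HandlerSketch>(
    (next, middleware) => (event) => middleware(event, () => next(event)),
    handler,
  );
}

// Retry: re-invokes the inner layers until they succeed or retries run out.
function retrySketch(maxRetries: number): MiddlewareSketch {
  return async (_event, next) => {
    for (let retriesUsed = 0; ; retriesUsed++) {
      try {
        await next();
        return;
      } catch (error) {
        if (retriesUsed >= maxRetries) throw error;
      }
    }
  };
}

// DLQ: sees an error only after retry has given up, then records the event.
function dlqSketch(deadLetters: string[]): MiddlewareSketch {
  return async (event, next) => {
    try {
      await next();
    } catch {
      deadLetters.push(event);
    }
  };
}

// Custom: outermost layer, wraps everything else.
function tracingSketch(trace: string[]): MiddlewareSketch {
  return async (event, next) => {
    trace.push(`enter ${event}`);
    try {
      await next();
    } finally {
      trace.push(`leave ${event}`);
    }
  };
}

async function runPipelineDemo() {
  const trace: string[] = [];
  const deadLetters: string[] = [];
  let handlerCalls = 0;
  const pipeline = composeSketch(
    [tracingSketch(trace), dlqSketch(deadLetters), retrySketch(2)],
    async () => {
      handlerCalls++;
      throw new Error("always fails");
    },
  );
  await pipeline("evt-1");
  return { trace, deadLetters, handlerCalls };
}
```

With a handler that always fails and two retries, the handler runs three times, the event lands in the dead letter list once, and the outer tracing layer sees a single enter/leave pair with no error.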
Configuration
Integration with createServer
Pass the eventBus to createServer() for automatic lifecycle management:
const server = createServer({
  services: [routes],
  eventBus,
  shutdown: { autoShutdown: true },
});
The server calls eventBus.start() on startup and eventBus.stop() on shutdown.
Consumer Groups
Set group to enable load-balanced consumption across multiple service instances:
const eventBus = createEventBus({
  adapter: KafkaAdapter({ brokers: ['localhost:9092'] }),
  routes: [orderEvents],
  group: 'order-service', // All instances share this group
});
Exports Summary
| Export | Description |
|---|---|
| createEventBus | EventBus factory function |
| deriveServiceName | Derives a service identifier from proto service type names (format: {packages}@{hostname}) |
| createEventContext | EventContext factory (advanced) |
| EventRouterImpl | EventRouter implementation class |
| MemoryAdapter | In-memory adapter for testing |
| retryMiddleware | Retry middleware factory |
| dlqMiddleware | DLQ middleware factory |
| composeMiddleware | Middleware composition utility |
| resolveTopicName | Topic resolution from proto method descriptors |
| matchPattern | Wildcard pattern matching for topics |
| NonRetryableError | Error class that skips retry middleware |
| RetryableError | Error class that forces retry regardless of predicate |
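
The interplay between `NonRetryableError`, `RetryableError`, and the `retryableErrors` predicate can be sketched as a single decision function. The classes below are local stand-ins, not the exported ones, and `shouldRetrySketch` is a hypothetical helper. The sketch assumes that every error is retried when no predicate is configured.

```typescript
// Local stand-ins for the exported error classes.
class NonRetryableErrorSketch extends Error {}
class RetryableErrorSketch extends Error {}

function shouldRetrySketch(
  error: unknown,
  retryableErrors?: (error: unknown) => boolean,
): boolean {
  // Explicit markers win over the predicate in both directions.
  if (error instanceof NonRetryableErrorSketch) return false;
  if (error instanceof RetryableErrorSketch) return true;
  // Assumption: with no predicate configured, every error is retried.
  return retryableErrors ? retryableErrors(error) : true;
}
```

A handler could throw the non-retryable variant for a malformed payload, where retrying cannot help, and the retryable variant for a transient dependency failure.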
Type Exports
| Type | Description |
|---|---|
| EventBus | EventBus interface |
| EventBusOptions | EventBus configuration |
| AdapterContext | Context passed to adapters on connect (contains serviceName) |
| EventAdapter | Adapter interface |
| EventRouter | Router interface |
| EventRoute | Route function type |
| EventContext | Per-event context |
| EventMiddleware | Middleware function type |
| PublishOptions | Publish options |
| RetryOptions | Retry middleware options |
| DlqOptions | DLQ middleware options |
| MiddlewareConfig | Built-in middleware configuration |
| RawEvent | Raw event data from adapter |
| ServiceEventHandlers | Typed handler map for a service |
| TypedEventHandler | Typed handler function |
Learn More
- Events Overview -- architecture and design decisions
- Getting Started -- step-by-step tutorial
- Custom Topics -- proto options for topic naming
- Middleware -- retry, DLQ, custom middleware
- Adapters -- Memory, NATS, Kafka, Redis, AMQP comparison
- with-events-redpanda -- Saga pattern example with Redpanda
- with-events-dlq -- DLQ example with NATS JetStream
Related Packages
- @connectum/core -- Server that hosts the EventBus (peer dependency)
- @connectum/events-nats -- NATS JetStream adapter
- @connectum/events-kafka -- Kafka / Redpanda adapter
- @connectum/events-redis -- Redis Streams / Valkey adapter
- @connectum/events-amqp -- AMQP / RabbitMQ adapter
