Skip to content

@connectum/events-amqp

AMQP 0-9-1 adapter for the Connectum EventBus. Provides persistent at-least-once delivery through RabbitMQ (or compatible brokers like LavinMQ) using topic exchanges, competing consumers, dead letter exchanges, and metadata propagation via AMQP message headers.

Layer: 2 (Broker Adapters)

Related Guides

Full API Reference

Complete TypeScript API documentation: API Reference (coming soon)

Installation

bash
pnpm add @connectum/events-amqp

Peer dependency: @connectum/events

Transitive dependency: amqplib (installed automatically)

Quick Start

typescript
import { createEventBus } from '@connectum/events';
import { AmqpAdapter } from '@connectum/events-amqp';

const adapter = AmqpAdapter({
  url: 'amqp://guest:guest@localhost:5672',
});

const bus = createEventBus({
  adapter,
  routes: [myRoutes],
  group: 'my-service',
});

await bus.start();

// Publish a typed event
await bus.publish(UserCreatedSchema, { userId: '123', email: '[email protected]' });

// Graceful shutdown
await bus.stop();

API Reference

AmqpAdapter(options)

Factory function that creates an EventAdapter for AMQP 0-9-1 brokers (RabbitMQ, LavinMQ).

typescript
function AmqpAdapter(options: AmqpAdapterOptions): EventAdapter;

Pass the result to createEventBus({ adapter }).

AmqpAdapterOptions

OptionTypeDefaultDescription
urlstring(required)AMQP connection URL (e.g., "amqp://guest:guest@localhost:5672")
socketOptionsRecord<string, unknown>undefinedLow-level socket options passed to amqplib.connect() (TLS certificates, timeouts)
exchangestring"connectum.events"Exchange name. Created automatically on connect() if it does not exist
exchangeType"topic" | "direct" | "fanout" | "headers""topic"Exchange type. "topic" enables wildcard routing keys
exchangeOptionsAmqpExchangeOptionsundefinedExchange declaration options
queueOptionsAmqpQueueOptionsundefinedQueue declaration options
consumerOptionsAmqpConsumerOptionsundefinedConsumer tuning options
publisherOptionsAmqpPublisherOptionsundefinedPublisher tuning options

AmqpExchangeOptions

OptionTypeDefaultDescription
durablebooleantrueSurvive broker restarts
autoDeletebooleanfalseDelete exchange when all queues unbind

AmqpQueueOptions

OptionTypeDefaultDescription
durablebooleantrueSurvive broker restarts (persists queue metadata)
messageTtlnumberundefinedPer-queue message time-to-live in milliseconds
maxLengthnumberundefinedMaximum number of messages in the queue
deadLetterExchangestringundefinedExchange to route rejected/expired messages to
deadLetterRoutingKeystringundefinedRouting key used when publishing to the dead letter exchange

AmqpConsumerOptions

OptionTypeDefaultDescription
prefetchnumber10Channel-level prefetch count (QoS). Controls how many unacknowledged messages a consumer receives at once
exclusivebooleanfalseExclusive consumer -- only this connection can consume from the queue

AmqpPublisherOptions

OptionTypeDefaultDescription
persistentbooleantrueMark messages as persistent (deliveryMode: 2). Messages survive broker restarts when the queue is durable
mandatorybooleanfalseReturn unroutable messages to the publisher via the return event

Configuration Examples

Minimal

typescript
const adapter = AmqpAdapter({
  url: 'amqp://localhost:5672',
});

Full Configuration

typescript
const adapter = AmqpAdapter({
  url: 'amqp://user:[email protected]:5672/my-vhost',
  exchange: 'orders.events',
  exchangeType: 'topic',
  exchangeOptions: {
    durable: true,
    autoDelete: false,
  },
  queueOptions: {
    durable: true,
    messageTtl: 86_400_000,        // 24 hours
    maxLength: 1_000_000,
    deadLetterExchange: 'orders.dlx',
    deadLetterRoutingKey: 'orders.dead',
  },
  consumerOptions: {
    prefetch: 50,
    exclusive: false,
  },
  publisherOptions: {
    persistent: true,
    mandatory: false,
  },
});

TLS Connection

typescript
import { readFileSync } from 'node:fs';

const adapter = AmqpAdapter({
  url: 'amqps://user:[email protected]:5671',
  socketOptions: {
    cert: readFileSync('/path/to/client-cert.pem'),
    key: readFileSync('/path/to/client-key.pem'),
    ca: [readFileSync('/path/to/ca-cert.pem')],
    rejectUnauthorized: true,
  },
});

Virtual Host (vhost)

AMQP virtual hosts provide logical isolation within a single broker. Specify the vhost in the URL path:

typescript
// Default vhost "/"
const adapter = AmqpAdapter({ url: 'amqp://localhost:5672' });

// Named vhost
const adapter = AmqpAdapter({ url: 'amqp://user:pass@localhost:5672/production' });

// URL-encoded vhost (if name contains special characters)
const adapter = AmqpAdapter({ url: 'amqp://user:pass@localhost:5672/%2Fmy-vhost' });

LavinMQ

LavinMQ is a lightweight, high-performance AMQP 0-9-1 broker compatible with RabbitMQ. The AmqpAdapter works with LavinMQ without modification:

typescript
const adapter = AmqpAdapter({
  url: 'amqp://guest:guest@lavinmq-host:5672',
});

Adapter Lifecycle

The AmqpAdapter follows the EventAdapter interface lifecycle managed by createEventBus():

connect() → publish() / subscribe() → disconnect()
MethodDescription
connect(context?)Opens an AMQP connection, creates a channel, asserts the exchange. Uses context.serviceName as clientProperties.connection_name if not set explicitly.
disconnect()Cancels all active consumers, closes the channel and connection
publish()Publishes a serialized event to the exchange with eventType as the routing key. Metadata is propagated as AMQP message headers
subscribe()Declares queues (named or auto-delete), binds them to the exchange with topic patterns, and starts consuming with explicit ack/nack

Automatic Exchange Creation

On connect(), the adapter asserts the configured exchange (default: "connectum.events" with type "topic"). If the exchange does not exist, it is created. For production, you may want to pre-create exchanges with specific policies via RabbitMQ management.

AMQP Concepts

Understanding a few AMQP concepts helps configure the adapter effectively:

Exchanges

An exchange receives published messages and routes them to bound queues based on the routing key and exchange type. The adapter uses a topic exchange by default, which supports wildcard routing patterns.

Queues

A queue stores messages until they are consumed. The adapter creates queues automatically:

  • With group: named queue {exchange}.{group} -- durable, shared across instances (competing consumers)
  • Without group: anonymous auto-delete queue -- exclusive to this consumer, deleted on disconnect

Routing Keys

Events are published with eventType as the routing key. For example, publishing a user.created event results in the AMQP routing key user.created.

Wildcard Binding

When using a topic exchange, AMQP supports two wildcard tokens in binding patterns:

EventBus PatternAMQP Binding KeyDescription
user.createduser.createdExact match
user.*user.*Matches exactly one word (user.created, user.deleted)
user.>user.#Matches zero or more words (user, user.created, user.profile.updated)

The adapter translates > (EventBus multi-segment wildcard) to # (AMQP multi-word wildcard) automatically.

Consumer Groups (Competing Consumers)

When a group name is set on the EventBus, all instances sharing the same group bind to the same named queue. RabbitMQ distributes messages round-robin across consumers on that queue, ensuring each message is processed by exactly one instance.

typescript
// Two instances share load for order events
const bus = createEventBus({
  adapter: AmqpAdapter({ url: 'amqp://localhost:5672' }),
  routes: [orderRoutes],
  group: 'order-service', // Same group = competing consumers
});

Dead Letter Exchange (DLX)

AMQP natively supports dead letter exchanges. When a message is rejected (nack without requeue) or expires, RabbitMQ routes it to the configured dead letter exchange:

typescript
const adapter = AmqpAdapter({
  url: 'amqp://localhost:5672',
  queueOptions: {
    deadLetterExchange: 'my-service.dlx',
    deadLetterRoutingKey: 'my-service.dead',
  },
});

This works alongside the EventBus-level DLQ middleware. For broker-native DLQ handling, configure queueOptions.deadLetterExchange. For application-level DLQ, use the middleware.dlq option in createEventBus().

At-Least-Once Delivery

The adapter uses manual acknowledgment (AMQP noAck: false). Each message must be acknowledged by the handler. If the handler throws an error, the message is negatively acknowledged (nack) and requeued for redelivery (subject to maxDeliver / retry limits).

Delivery Attempts

AMQP does not natively track delivery attempt counts. The adapter infers the attempt number from the redelivered flag on the message:

  • redelivered: falseattempt: 1 (first delivery)
  • redelivered: trueattempt: 2 (redelivery)

For fine-grained retry control, use the EventBus retry middleware which tracks attempts independently.

Metadata Propagation

Event metadata is transmitted as AMQP message headers, enabling cross-service context propagation:

typescript
// Publishing with metadata
await bus.publish(OrderCreatedSchema, orderData, {
  metadata: {
    'x-correlation-id': correlationId,
    'x-tenant-id': tenantId,
  },
});

On the consumer side, metadata is available through EventContext:

typescript
const routes = (events: EventRouter) => {
  events.service(OrderEventHandlers, {
    async onOrderCreated(event, ctx) {
      const correlationId = ctx.metadata.get('x-correlation-id');
      // ...
    },
  });
};

Exports Summary

ExportDescription
AmqpAdapterFactory function that creates an AMQP adapter
toAmqpPatternConverts EventBus wildcard pattern (>) to AMQP routing key pattern (#)
AmqpAdapterOptionsConfiguration options type
AmqpExchangeOptionsExchange declaration options type
AmqpQueueOptionsQueue declaration options type
AmqpConsumerOptionsConsumer tuning options type
AmqpPublisherOptionsPublisher tuning options type

Learn More