Adapters

The EventBus uses pluggable adapters to communicate with different message brokers. Each adapter implements the EventAdapter interface, handling connection management, serialization at the wire level, and subscription lifecycle. Broker-specific configuration is isolated in the adapter constructor.

Adapter Comparison

| Feature | Memory | NATS JetStream | Kafka | Redis Streams | AMQP / RabbitMQ |
| --- | --- | --- | --- | --- | --- |
| Package | @connectum/events | @connectum/events-nats | @connectum/events-kafka | @connectum/events-redis | @connectum/events-amqp |
| Broker | None (in-process) | NATS 2.x+ | Apache Kafka | Redis 5+ | RabbitMQ 3.x+ |
| Compatible with | -- | -- | Redpanda | Valkey | LavinMQ |
| Client library | -- | @nats-io/transport-node | KafkaJS | ioredis | amqplib |
| Persistence | No | Yes (JetStream) | Yes (log-based) | Yes (AOF/RDB) | Yes (durable queues) |
| Consumer groups | No | Yes (durable consumers) | Yes (native) | Yes (XREADGROUP) | Yes (competing consumers) |
| Ordering | Per-publish | Per-subject | Per-partition | Per-stream | Per-queue |
| Wildcard topics | Yes (*, >) | Yes (NATS native) | No | No | Yes (*, #) |
| Delivery guarantee | At-most-once | At-least-once | At-least-once | At-least-once | At-least-once |
| Ideal for | Unit tests, dev | Low-latency, cloud-native | High-throughput, event sourcing | Existing Redis stack | Complex routing, enterprise integration |

Memory Adapter

Built into @connectum/events. Delivers events synchronously in-process with no external dependencies. Supports wildcard patterns (* and >).

Use case: Unit testing, local development, prototyping.

```typescript
import { MemoryAdapter } from '@connectum/events';

const adapter = MemoryAdapter();
```

No configuration options. Connect and disconnect are no-ops (they only toggle an internal connected flag).

Not for Production

MemoryAdapter provides no persistence, no consumer groups, and at-most-once delivery. Events are lost on process restart. Use it only for testing.
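
Since MemoryAdapter advertises NATS-style wildcard patterns, a sketch of how such matching behaves may help: * matches exactly one dot-separated token, and > matches one or more trailing tokens. This is an illustrative stand-alone function, not the adapter's actual implementation:

```typescript
// Illustrative NATS-style wildcard matcher (not the library's internal code):
// '*' matches exactly one dot-separated token; '>' matches the rest of the topic.
function matchesPattern(pattern: string, topic: string): boolean {
  const p = pattern.split('.');
  const t = topic.split('.');
  for (let i = 0; i < p.length; i++) {
    if (p[i] === '>') return t.length > i; // '>' requires at least one more token
    if (i >= t.length) return false; // topic ran out of tokens
    if (p[i] !== '*' && p[i] !== t[i]) return false;
  }
  return p.length === t.length; // no trailing topic tokens left over
}

matchesPattern('order.*', 'order.created');    // → true
matchesPattern('order.>', 'order.item.added'); // → true
matchesPattern('order.*', 'order.item.added'); // → false
```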

NATS JetStream Adapter

Provides persistent at-least-once delivery through NATS JetStream with durable consumers, wildcard routing, and metadata propagation via NATS headers.

```bash
pnpm add @connectum/events-nats
```

```typescript
import { NatsAdapter } from '@connectum/events-nats';

const adapter = NatsAdapter({
  servers: 'nats://localhost:4222',
  stream: 'orders',
  consumerOptions: {
    deliverPolicy: 'new',
    ackWait: 30_000,
    maxDeliver: 5,
  },
});
```

NatsAdapterOptions

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| servers | string \| string[] | required | NATS server URL(s) |
| stream | string | "events" | JetStream stream name. Subjects are prefixed with {stream}. |
| connectionOptions | Partial&lt;NodeConnectionOptions&gt; | undefined | Advanced NATS connection config |
| consumerOptions | NatsConsumerOptions | undefined | JetStream consumer tuning |

NatsConsumerOptions

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| deliverPolicy | "new" \| "all" \| "last" | "new" | Where new consumers start reading |
| ackWait | number | 30000 | Ack timeout in ms before redelivery |
| maxDeliver | number | 5 | Max delivery attempts before server-side discard |
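
As a concrete illustration of the subject naming described above (subjects are prefixed with the configured stream name), the mapping can be sketched as follows; toSubject is a hypothetical helper for illustration, not part of the adapter's public API:

```typescript
// Hypothetical helper illustrating the documented {stream}.{eventType} prefixing.
function toSubject(stream: string, eventType: string): string {
  return `${stream}.${eventType}`;
}

toSubject('orders', 'order.created'); // → 'orders.order.created'
```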

When to Use NATS

  • Cloud-native microservices that need lightweight, low-latency messaging
  • Wildcard routing for flexible topic hierarchies
  • Built-in clustering with no external coordination service to operate
  • Persistence, replay, and durable consumers through JetStream

Kafka Adapter

KafkaJS-based adapter for Apache Kafka and Kafka-compatible brokers like Redpanda.

```bash
pnpm add @connectum/events-kafka
```

```typescript
import { KafkaAdapter } from '@connectum/events-kafka';

const adapter = KafkaAdapter({
  brokers: ['localhost:9092'],
  clientId: 'order-service',
  consumerOptions: {
    sessionTimeout: 30_000,
    fromBeginning: false,
  },
});
```

KafkaAdapterOptions

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| brokers | string[] | required | Kafka broker addresses |
| clientId | string | "connectum" | Client identifier for the producer/consumer |
| kafkaConfig | Omit&lt;Partial&lt;KafkaConfig&gt;, "brokers" \| "clientId"&gt; | undefined | Additional KafkaJS configuration overrides |
| producerOptions.compression | CompressionTypes | undefined | Message compression type |
| consumerOptions.sessionTimeout | number | 30000 | Session timeout in ms |
| consumerOptions.fromBeginning | boolean | false | Start consuming from the beginning of topics |
| consumerOptions.allowAutoTopicCreation | boolean | false | Allow automatic topic creation when subscribing |

Redpanda Compatibility

Redpanda implements the Kafka wire protocol. The @connectum/events-kafka adapter works with Redpanda out of the box -- no configuration changes needed:

```typescript
const adapter = KafkaAdapter({
  brokers: (process.env.REDPANDA_BROKERS ?? 'localhost:9092').split(','),
  clientId: 'my-service',
});
```

See the with-events-redpanda example for a full saga pattern with Redpanda and Redpanda Console.

When to Use Kafka

  • High-throughput event streaming with millions of events/second
  • Event sourcing with persistent log-based storage
  • Existing Kafka infrastructure or need for the Kafka ecosystem (Connect, Streams, ksqlDB)
  • Redpanda deployments for Kafka-compatible, ZooKeeper-free operation

Redis Streams Adapter

Uses Redis Streams (XADD / XREADGROUP / XACK) for durable, ordered event delivery with consumer groups.

```bash
pnpm add @connectum/events-redis
```

```typescript
import { RedisAdapter } from '@connectum/events-redis';

const adapter = RedisAdapter({
  url: 'redis://localhost:6379',
  brokerOptions: {
    maxLen: 100_000,
    blockMs: 5_000,
    count: 10,
  },
});
```

RedisAdapterOptions

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| url | string | undefined | Redis connection URL (e.g., redis://localhost:6379) |
| redisOptions | RedisOptions | undefined | ioredis connection options (alternative to url) |
| brokerOptions | RedisBrokerOptions | undefined | Redis Streams tuning |

RedisBrokerOptions

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| maxLen | number | undefined | Maximum stream length (approximate MAXLEN trimming on XADD) |
| blockMs | number | 5000 | Block timeout in ms for XREADGROUP |
| count | number | 10 | Messages per XREADGROUP call |
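
These options map onto the underlying Redis commands roughly as follows (an illustrative sketch of the command flow with placeholder group and consumer names; the adapter's exact calls may differ):

```
XADD order.created MAXLEN ~ 100000 * payload <bytes>      # publish; approximate trimming via maxLen
XGROUP CREATE order.created my-group $ MKSTREAM           # ensure the consumer group exists
XREADGROUP GROUP my-group consumer-1 COUNT 10 BLOCK 5000 STREAMS order.created >
XACK order.created my-group <message-id>                  # acknowledge after the handler succeeds
```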

Valkey Compatibility

Valkey is an open-source Redis fork that implements the same Streams API. The @connectum/events-redis adapter works with Valkey without modification:

```typescript
const adapter = RedisAdapter({
  url: 'redis://valkey-host:6379',
});
```

When to Use Redis Streams

  • Existing Redis/Valkey infrastructure you want to reuse for messaging
  • Simple streaming without the complexity of a dedicated broker
  • Ordered delivery within a single stream
  • Moderate throughput with low operational overhead

AMQP / RabbitMQ Adapter

Uses the AMQP 0-9-1 protocol via amqplib for durable messaging with topic exchanges, competing consumers, and native dead letter exchange (DLX) support.

```bash
pnpm add @connectum/events-amqp
```

```typescript
import { AmqpAdapter } from '@connectum/events-amqp';

const adapter = AmqpAdapter({
  url: 'amqp://guest:guest@localhost:5672',
  exchange: 'orders.events',
  queueOptions: {
    durable: true,
    deadLetterExchange: 'orders.dlx',
  },
  consumerOptions: {
    prefetch: 50,
  },
});
```

AmqpAdapterOptions

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| url | string | required | AMQP connection URL |
| socketOptions | Record&lt;string, unknown&gt; | undefined | Socket options for TLS and advanced config |
| exchange | string | "connectum.events" | Exchange name (auto-created on connect) |
| exchangeType | "topic" \| "direct" \| "fanout" \| "headers" | "topic" | Exchange type |
| exchangeOptions | AmqpExchangeOptions | undefined | Exchange declaration options |
| queueOptions | AmqpQueueOptions | undefined | Queue declaration options (durable, TTL, max length, DLX) |
| consumerOptions | AmqpConsumerOptions | undefined | Consumer tuning (prefetch, exclusive) |
| publisherOptions | AmqpPublisherOptions | undefined | Publisher options (persistent, mandatory) |

LavinMQ Compatibility

LavinMQ is a lightweight, high-performance AMQP 0-9-1 broker compatible with RabbitMQ. The AmqpAdapter works with LavinMQ without modification:

```typescript
const adapter = AmqpAdapter({
  url: 'amqp://guest:guest@lavinmq-host:5672',
});
```

When to Use AMQP / RabbitMQ

  • Complex routing with topic exchanges, headers-based routing, or multi-exchange topologies
  • Enterprise integration patterns (DLX, TTL, priority queues, message deduplication)
  • Existing RabbitMQ infrastructure or AMQP-compatible brokers (LavinMQ)
  • Wildcard routing with per-queue ordering and competing consumers
  • Mature ecosystem with management UI, federation, and shovel plugins
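
Note that AMQP topic exchanges use * (exactly one word) and # (zero or more words), rather than the NATS-style * and > used elsewhere in this document. A pattern translation along these lines is presumably applied when subscribing; toAmqpBindingKey is a hypothetical helper for illustration only:

```typescript
// Hypothetical translation of NATS-style patterns to AMQP binding keys:
// '>' (rest of topic) becomes '#'; '*' (single token) is the same in both.
function toAmqpBindingKey(pattern: string): string {
  return pattern
    .split('.')
    .map((token) => (token === '>' ? '#' : token))
    .join('.');
}

toAmqpBindingKey('order.>'); // → 'order.#'
toAmqpBindingKey('order.*'); // → 'order.*'
```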

Choosing an Adapter

Quick Reference

| Scenario | Recommended Adapter |
| --- | --- |
| Unit / integration tests | MemoryAdapter |
| Cloud-native, Kubernetes-first | NatsAdapter |
| High-throughput event streaming | KafkaAdapter |
| Redpanda deployment | KafkaAdapter |
| Already running Redis/Valkey | RedisAdapter |
| Already running RabbitMQ/LavinMQ | AmqpAdapter |
| Wildcard topic routing needed | NatsAdapter, AmqpAdapter, or MemoryAdapter |
| Complex routing, enterprise integration | AmqpAdapter |
| Event sourcing / audit log | KafkaAdapter |
| Minimal infrastructure | NatsAdapter (single binary) |

Automatic Client Identification

When the EventBus starts, it derives a service name from registered proto service descriptors and passes it to the adapter via AdapterContext. Adapters use this for broker-level client identification, which improves observability in broker dashboards and monitoring tools.

The derived name follows the format {packageNames}@{hostname}:

| Registered Services | Derived Name |
| --- | --- |
| order.v1.OrderEventService | order.v1@pod-abc123 |
| order.v1.OrderEventService + payment.v1.PaymentEventService | order.v1/payment.v1@pod-abc123 |

Each adapter maps this to the appropriate broker concept:

| Adapter | Broker Concept | Config Override |
| --- | --- | --- |
| Kafka | clientId | KafkaAdapterOptions.clientId |
| NATS | Connection name (visible in /connz) | connectionOptions.name |
| Redis | CLIENT SETNAME | redisOptions.connectionName |
| AMQP | Connection name (clientProperties.connection_name, visible in Management UI) | socketOptions |
| Memory | Not used | -- |

Explicit adapter options always take priority over the derived name. If you set clientId, connectionOptions.name, or redisOptions.connectionName directly, the adapter uses your value.
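
The derivation itself can be sketched as a pure function. This is a hypothetical reconstruction of the behavior shown in the tables above (the real logic lives inside the EventBus), assuming duplicate packages are collapsed and registration order is preserved:

```typescript
// Hypothetical sketch: derive "{packageNames}@{hostname}" from proto service
// full names such as "order.v1.OrderEventService".
function deriveClientName(serviceFullNames: string[], hostname: string): string {
  const packages = serviceFullNames.map(
    (fullName) => fullName.slice(0, fullName.lastIndexOf('.')), // drop the service name
  );
  const unique = [...new Set(packages)]; // de-duplicate, keep registration order
  return `${unique.join('/')}@${hostname}`;
}

deriveClientName(['order.v1.OrderEventService'], 'pod-abc123');
// → 'order.v1@pod-abc123'
```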

EventAdapter Interface

All adapters implement this interface:

```typescript
interface EventAdapter {
  /** Adapter name (e.g., "nats", "kafka", "redis", "amqp", "memory") */
  readonly name: string;

  /** Connect to the message broker */
  connect(context?: AdapterContext): Promise<void>;

  /** Disconnect from the message broker */
  disconnect(): Promise<void>;

  /** Publish a serialized event to a topic */
  publish(eventType: string, payload: Uint8Array, options?: PublishOptions): Promise<void>;

  /** Subscribe to event patterns with a raw handler */
  subscribe(
    patterns: string[],
    handler: RawEventHandler,
    options?: RawSubscribeOptions,
  ): Promise<EventSubscription>;
}
```

Implementing a Custom Adapter

To integrate with a broker not covered by the built-in adapters, implement the EventAdapter interface:

```typescript
import type { EventAdapter, EventSubscription, PublishOptions, RawEventHandler, RawSubscribeOptions } from '@connectum/events';

export function MyBrokerAdapter(options: MyOptions): EventAdapter {
  return {
    name: 'my-broker',

    async connect() {
      // Establish connection to broker
    },

    async disconnect() {
      // Clean up connections and subscriptions
    },

    async publish(eventType, payload, publishOptions) {
      // Serialize and send to broker
    },

    async subscribe(patterns, handler, subscribeOptions): Promise<EventSubscription> {
      // Set up subscription, deliver events to handler
      return {
        async unsubscribe() {
          // Clean up this subscription
        },
      };
    },
  };
}
```
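
To sanity-check the wiring before pointing a custom adapter at a real broker, a complete in-process toy implementation of the same shape can be useful. The type aliases below are minimal local stand-ins for the @connectum/events types (assumptions made so the sketch is self-contained), and matching is literal only, with no wildcard support:

```typescript
// Minimal local stand-ins for the @connectum/events types (assumptions).
type RawEventHandler = (eventType: string, payload: Uint8Array) => void | Promise<void>;
interface EventSubscription {
  unsubscribe(): Promise<void>;
}
interface ToyAdapter {
  readonly name: string;
  connect(): Promise<void>;
  disconnect(): Promise<void>;
  publish(eventType: string, payload: Uint8Array): Promise<void>;
  subscribe(patterns: string[], handler: RawEventHandler): Promise<EventSubscription>;
}

function ToyBrokerAdapter(): ToyAdapter {
  const subs = new Set<{ patterns: string[]; handler: RawEventHandler }>();
  return {
    name: 'toy',
    async connect() {}, // nothing to connect to in-process
    async disconnect() {
      subs.clear(); // drop all subscriptions on disconnect
    },
    async publish(eventType, payload) {
      // Deliver synchronously to every subscription with a literal topic match.
      for (const sub of subs) {
        if (sub.patterns.includes(eventType)) {
          await sub.handler(eventType, payload);
        }
      }
    },
    async subscribe(patterns, handler) {
      const entry = { patterns, handler };
      subs.add(entry);
      return {
        async unsubscribe() {
          subs.delete(entry);
        },
      };
    },
  };
}
```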