@connectum/events-kafka

Apache Kafka / Redpanda adapter for the Connectum EventBus. Implements the EventAdapter interface using KafkaJS, providing production-grade pub/sub with consumer groups, message compression, and topic-pattern subscriptions.

Layer: 2 (Broker Adapters)

Related Guides

Full API Reference

Complete TypeScript API documentation: API Reference (coming soon)

Installation

```bash
pnpm add @connectum/events-kafka
```

Peer dependency: @connectum/events

Transitive dependency: kafkajs ^2.2.4 (installed automatically)

Quick Start

Topic Creation

Kafka does not create topics automatically by default (`allowAutoTopicCreation: false`). You must create topics before publishing. Use `kafka-topics --create` or set `allowAutoTopicCreation: true` for development.
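For example, a topic can be created ahead of time with the Kafka CLI (the topic name, partition count, and replication factor below are illustrative, not values the adapter requires):

```shell
# Create the topic before the service publishes to it.
# Partition and replication values are illustrative.
kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --topic user.created \
  --partitions 3 \
  --replication-factor 1
```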

```typescript
import { createEventBus } from '@connectum/events';
import { KafkaAdapter } from '@connectum/events-kafka';

const adapter = KafkaAdapter({
  brokers: ['localhost:9092'],
  clientId: 'my-service',
  // Enable auto-creation for development (not recommended for production)
  // consumerOptions: { allowAutoTopicCreation: true },
});

const eventBus = createEventBus({
  adapter,
  routes: [myEventRoutes],
  group: 'my-consumer-group',
});

await eventBus.start();
await eventBus.publish(UserCreatedSchema, { id: '1', email: '[email protected]', name: 'Alice' });
await eventBus.stop();
```

API Reference

KafkaAdapter(options)

Factory function that creates an EventAdapter for Apache Kafka or Redpanda brokers.

```typescript
import { KafkaAdapter } from '@connectum/events-kafka';

function KafkaAdapter(options: KafkaAdapterOptions): EventAdapter;
```

Pass the result to createEventBus({ adapter: ... }).

KafkaAdapterOptions

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `brokers` | `string[]` | -- | **Required.** Kafka broker addresses (e.g., `["localhost:9092"]`) |
| `clientId` | `string` | derived from proto | Client ID for this producer/consumer. Defaults to the service name derived from proto descriptors (e.g., `order.v1@pod-abc123`), falling back to `"connectum"` when no services are registered. See Automatic Client Identification. |
| `kafkaConfig` | `Omit<Partial<KafkaConfig>, "brokers" \| "clientId">` | -- | Additional KafkaJS configuration overrides (merged with `brokers` and `clientId`) |
| `producerOptions.compression` | `CompressionTypes` | -- | Compression type for produced messages (e.g., `CompressionTypes.GZIP`) |
| `consumerOptions.sessionTimeout` | `number` | `30000` | Session timeout in milliseconds |
| `consumerOptions.fromBeginning` | `boolean` | `false` | Whether to start consuming from the beginning of topics |
| `consumerOptions.allowAutoTopicCreation` | `boolean` | `false` | Allow automatic topic creation when subscribing |

EventAdapter Interface

The returned adapter implements the standard EventAdapter interface from @connectum/events:

```typescript
interface EventAdapter {
  readonly name: string;                        // "kafka"
  connect(context?: AdapterContext): Promise<void>;  // Connect producer to broker
  disconnect(): Promise<void>;                  // Disconnect all consumers and producer
  publish(eventType: string, payload: Uint8Array, options?: PublishOptions): Promise<void>;
  subscribe(patterns: string[], handler: RawEventHandler, options?: RawSubscribeOptions): Promise<EventSubscription>;
}
```

The AdapterContext is provided automatically by the EventBus and contains a serviceName derived from proto service descriptors. The Kafka adapter uses it as the default clientId when none is explicitly configured.
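The defaulting behavior can be pictured with a short sketch. The helper name `resolveClientId` is hypothetical (the adapter keeps this logic internal); only the precedence order is taken from the description above:

```typescript
// Hypothetical sketch of how the adapter resolves its clientId.
// `resolveClientId` is NOT part of the public API.
interface AdapterContext {
  serviceName?: string; // e.g. "order.v1@pod-abc123", derived from proto descriptors
}

function resolveClientId(explicit: string | undefined, context?: AdapterContext): string {
  if (explicit) return explicit;                        // options.clientId wins
  if (context?.serviceName) return context.serviceName; // derived service name
  return 'connectum';                                   // fallback: no services registered
}

console.log(resolveClientId('my-service'));                                    // "my-service"
console.log(resolveClientId(undefined, { serviceName: 'order.v1@pod-abc123' })); // "order.v1@pod-abc123"
console.log(resolveClientId(undefined));                                       // "connectum"
```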

Topic Pattern Matching

The adapter converts NATS-style wildcard patterns to Kafka-compatible regular expressions for topic subscription:

| Pattern | Matches | Description |
| --- | --- | --- |
| `user.created` | `user.created` | Exact topic match |
| `user.*` | `user.created`, `user.deleted` | Single-segment wildcard |
| `user.>` | `user.created`, `user.profile.updated` | Multi-segment wildcard (greedy) |

Literal patterns (without * or >) are passed directly as Kafka topic names. Wildcard patterns are converted to RegExp for Kafka's regex-based subscription.
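A conversion along these lines can be sketched as follows. The function name `patternToSubscription` and the exact regex shape are assumptions for illustration; consult the package source for the real implementation:

```typescript
// Hypothetical sketch: NATS-style pattern -> Kafka subscription.
// Literal patterns pass through as topic names; wildcards become anchored RegExps.
function patternToSubscription(pattern: string): string | RegExp {
  if (!pattern.includes('*') && !pattern.includes('>')) {
    return pattern; // literal topic name, used as-is
  }
  const body = pattern
    .split('.')
    .map((segment) => {
      if (segment === '*') return '[^.]+'; // exactly one segment
      if (segment === '>') return '.+';    // one or more segments (greedy)
      return segment.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); // escape literal chars
    })
    .join('\\.');
  return new RegExp(`^${body}$`);
}

patternToSubscription('user.created');                                     // "user.created"
(patternToSubscription('user.*') as RegExp).test('user.created');          // true
(patternToSubscription('user.*') as RegExp).test('user.profile.updated');  // false
(patternToSubscription('user.>') as RegExp).test('user.profile.updated');  // true
```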

Message Headers

The adapter automatically manages message headers:

Published headers (set automatically):

| Header | Description |
| --- | --- |
| `x-event-id` | Unique UUID for the event |
| `x-published-at` | ISO 8601 timestamp of publication |

Custom metadata passed via PublishOptions.metadata is encoded as additional Kafka headers.

On the consumer side, all headers are parsed and exposed via RawEvent.metadata (with internal headers stripped).
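This round trip can be sketched with a pair of helpers. The names `encodeHeaders` and `decodeHeaders` are hypothetical (the adapter performs this internally); they only illustrate the add-internal-headers-on-publish, strip-on-consume behavior described above:

```typescript
import { randomUUID } from 'node:crypto';

// Headers the adapter sets itself and strips before exposing RawEvent.metadata.
const INTERNAL_HEADERS = new Set(['x-event-id', 'x-published-at']);

// Publish side: internal headers plus custom PublishOptions.metadata.
function encodeHeaders(metadata: Record<string, string>): Record<string, string> {
  return {
    'x-event-id': randomUUID(),                 // unique event id
    'x-published-at': new Date().toISOString(), // ISO 8601 publish time
    ...metadata,                                // custom metadata as extra headers
  };
}

// Consume side: strip internal headers, keep custom metadata.
function decodeHeaders(headers: Record<string, string>): Record<string, string> {
  return Object.fromEntries(
    Object.entries(headers).filter(([key]) => !INTERNAL_HEADERS.has(key)),
  );
}

const wire = encodeHeaders({ 'trace-id': 'abc' });
decodeHeaders(wire); // { 'trace-id': 'abc' }
```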

Redpanda Compatibility

Redpanda is a Kafka-compatible streaming platform. The KafkaAdapter works with Redpanda out of the box -- simply point brokers to your Redpanda cluster:

```typescript
const adapter = KafkaAdapter({
  brokers: ['localhost:19092'],
  clientId: 'my-service',
});
```

No additional configuration is needed. Redpanda supports the Kafka protocol natively, including consumer groups, topic patterns, and message compression.

Configuration

Minimal Configuration

```typescript
const adapter = KafkaAdapter({
  brokers: ['localhost:9092'],
});
```

Full Configuration

```typescript
import { CompressionTypes } from 'kafkajs';
import { KafkaAdapter } from '@connectum/events-kafka';

const adapter = KafkaAdapter({
  brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
  clientId: 'order-service',
  kafkaConfig: {
    connectionTimeout: 3000,
    requestTimeout: 25000,
    retry: {
      initialRetryTime: 100,
      retries: 8,
    },
    ssl: true,
    sasl: {
      mechanism: 'plain',
      username: 'user',
      password: 'password',
    },
  },
  producerOptions: {
    compression: CompressionTypes.GZIP,
  },
  consumerOptions: {
    sessionTimeout: 30000,
    fromBeginning: false,
  },
});
```

With EventBus Middleware

```typescript
import { createEventBus } from '@connectum/events';
import { KafkaAdapter } from '@connectum/events-kafka';

const eventBus = createEventBus({
  adapter: KafkaAdapter({
    brokers: ['localhost:9092'],
    clientId: 'order-service',
  }),
  routes: [orderEvents],
  group: 'order-service',
  middleware: {
    retry: { maxRetries: 3, backoff: 'exponential', initialDelay: 1000 },
    dlq: { topic: 'order-service.dlq' },
  },
});
```

With Server Integration

```typescript
import { createServer } from '@connectum/core';
import { createEventBus } from '@connectum/events';
import { KafkaAdapter } from '@connectum/events-kafka';

const eventBus = createEventBus({
  adapter: KafkaAdapter({ brokers: ['localhost:9092'] }),
  routes: [orderEvents],
  group: 'order-service',
});

const server = createServer({
  services: [routes],
  eventBus,
  shutdown: { autoShutdown: true },
});

await server.start();
```

Exports Summary

| Export | Description |
| --- | --- |
| `KafkaAdapter` | Adapter factory function |
| `KafkaAdapterOptions` | Options type (TypeScript only) |