@protoutil/pubsub

Kafka transport entry point for @protoutil/pubsub.

npm install @protoutil/pubsub @confluentinc/kafka-javascript

@confluentinc/kafka-javascript is an optional peer dependency. The core package does not import it directly; the Kafka entry point uses the real Confluent client when the application imports @protoutil/pubsub/kafka.

import { KafkaJS } from "@confluentinc/kafka-javascript";
import { createPublisher, createRouter } from "@protoutil/pubsub";
import { createKafkaScheduler, createKafkaTransport } from "@protoutil/pubsub/kafka";

// BillingEvents and processInvoice() come from application code and are not shown here.

const kafka = new KafkaJS.Kafka({
  "bootstrap.servers": "localhost:9092",
});

// Durable scheduler that backs notBefore and retry delays.
const scheduler = createKafkaScheduler({
  client: kafka,
  options: {
    schedulesTopic: "protoutil.pubsub.schedules",
    historyTopic: "protoutil.pubsub.schedule_history",
    consumerGroup: "protoutil.pubsub.scheduler",
    historyRetentionMs: 604_800_000, // 7 days
    deliveryConcurrency: 16,
    deliveryRetryDelayMs: 1_000,
  },
});

const transport = createKafkaTransport({
  client: kafka,
  scheduler,
  publishTimeoutMs: 10_000,
  defaultSource: "billing-service",
});

const publisher = createPublisher(BillingEvents, transport, {
  source: "billing-service",
  topic: {
    invoiceCreated: "billing.invoice.created",
  },
});

const router = createRouter(BillingEvents, transport, {
  topic: {
    invoiceCreated: "billing.invoice.created",
  },
  deadLetterTopic: "billing.__deadletter",
});

router.service({
  async invoiceCreated(request, ctx) {
    await processInvoice(request.invoiceId);
    await ctx.ack();
  },
});

const subscription = await router.subscribe({
  consumerGroup: "billing-workers",
  concurrency: 10,
});

await publisher.invoiceCreated({
  invoiceId: "inv_123",
  customerId: "cus_456",
});

// Stop this subscriber, then close the Kafka clients owned by the transport.
await subscription.unsubscribe();
await transport.close();

Application event code stays transport-neutral. Switching from Kafka to another broker should only change the transport construction.

The transport does not require topics at construction. The router resolves subscribe topics when router.subscribe() runs, and delayed delivery only works when you pass an explicit scheduler.

router.subscribe() returns a subscription handle. Call subscription.unsubscribe() when that subscriber should stop receiving messages.

transport.close() closes the backing Kafka admin, producer, scheduler consumer, and active subscriber consumers. Call it from application shutdown hooks when the process is exiting or when the whole transport is no longer needed.

Use both methods at their own scopes:

  • subscription.unsubscribe() stops one router subscription.
  • transport.close() closes the Kafka clients owned by the transport.

If you prefer one lifecycle handle, pass the same abort signal as signal to createKafkaTransport(), createKafkaScheduler(), and router.subscribe({ signal }), then call abort() on its controller once to stop subscriptions and close connections.
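The two-scope shutdown described above can be sketched as a small helper. This is only an illustration: shutdownPubsub and the two structural interfaces are hypothetical names, and only the unsubscribe() and close() methods are taken from the text.

```typescript
// Structural stand-ins for the handles described above. Only the
// unsubscribe() and close() methods come from the documentation.
interface SubscriptionHandle {
  unsubscribe(): Promise<void>;
}

interface ClosableTransport {
  close(): Promise<void>;
}

// Hypothetical helper: stop each subscription first, then close the
// transport, so subscribers stop receiving messages before the Kafka
// clients owned by the transport are closed.
async function shutdownPubsub(
  subscriptions: SubscriptionHandle[],
  transport: ClosableTransport,
): Promise<void> {
  try {
    for (const subscription of subscriptions) {
      await subscription.unsubscribe();
    }
  } finally {
    // Close even if an unsubscribe failed, so connections are not leaked.
    await transport.close();
  }
}
```

In a Node.js service, a helper like this would typically be called from a SIGTERM or SIGINT handler with the handles created at startup.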

Kafka has no native durable notBefore or retry-delay primitive, so the transport owns a scheduler topic pattern:

  • schedulesTopic stores durable schedule state keyed by CloudEvent id.
  • historyTopic is append-only history for future scheduler auditing and handoff records.
  • consumerGroup identifies scheduler workers that coordinate ownership of schedule partitions.
  • historyRetentionMs configures retention for the scheduler history topic.
  • deliveryConcurrency bounds how many due schedules a scheduler worker tries to deliver at once.
  • deliveryRetryDelayMs controls in-process backoff after a scheduler delivery failure.
  • The schedules topic must have cleanup.policy=compact.
  • The schedules topic must have retention.ms=-1.

By default, the transport creates this topology before the first publish or subscribe. If the application disables auto-creation, the topics must already exist with the same configs before delayed publish can be considered durable.

For production, provision the scheduler topics deliberately:

  • use enough partitions for the expected scheduled-event volume
  • set the schedules topic cleanup.policy=compact
  • set the schedules topic retention.ms=-1
  • set a replication factor appropriate for the cluster
  • set historyRetentionMs or an equivalent broker-level retention on the history topic
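As a rough sketch of that checklist, the helper below builds topic specifications for both scheduler topics. The TopicSpec shape is modelled on the KafkaJS-style admin.createTopics({ topics }) input and should be treated as an assumption to check against your client version. schedulerTopicSpecs, the input type, and the choice to give the history topic the same partition count are hypothetical.

```typescript
// Modelled on the KafkaJS-style createTopics input (assumption: verify the
// field names against the admin client you actually use).
interface TopicSpec {
  topic: string;
  numPartitions: number;
  replicationFactor: number;
  configEntries: { name: string; value: string }[];
}

// Hypothetical input type for this sketch.
interface SchedulerTopologyInput {
  schedulesTopic: string;
  historyTopic: string;
  historyRetentionMs: number;
  partitions: number; // size for the expected scheduled-event volume
  replicationFactor: number; // size for the cluster
}

function schedulerTopicSpecs(input: SchedulerTopologyInput): TopicSpec[] {
  return [
    {
      // Schedule state must be compacted and never expire by time.
      topic: input.schedulesTopic,
      numPartitions: input.partitions,
      replicationFactor: input.replicationFactor,
      configEntries: [
        { name: "cleanup.policy", value: "compact" },
        { name: "retention.ms", value: "-1" },
      ],
    },
    {
      // History is append-only and bounded by time-based retention.
      // Reusing the same partition count here is an assumption.
      topic: input.historyTopic,
      numPartitions: input.partitions,
      replicationFactor: input.replicationFactor,
      configEntries: [
        { name: "retention.ms", value: String(input.historyRetentionMs) },
      ],
    },
  ];
}
```

The returned array is intended to be handed to whatever provisioning path you use, whether an admin client call or a generated infrastructure manifest.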

Delayed events are stored as Kafka messages where the value is the serialized io.cloudevents.v1.CloudEvent protobuf bytes and the scheduler metadata lives in headers:

  • CloudEvent id
  • scheduler record version
  • target delivery topic
  • RFC 3339 notBefore
  • one-based delivery attempt count

On startup, the scheduler reads the compacted schedules topic from the beginning and rebuilds timers from the persisted headers and CloudEvent value. Tombstone messages cancel pending timers, so delivered schedules are removed from active state after the target event is produced.
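The replay step can be pictured as a fold over the compacted log. The sketch below is not the transport's implementation: the record and header field names are hypothetical (the text lists the header fields but not their wire names), and the CloudEvent value is kept as opaque bytes.

```typescript
// One record read from the schedules topic. A null value is a tombstone.
// Header field names are hypothetical placeholders.
interface ScheduleLogRecord {
  key: string; // CloudEvent id
  value: Uint8Array | null; // serialized CloudEvent, or null for a tombstone
  headers: { targetTopic: string; notBefore: string; attempt: string } | null;
}

interface PendingSchedule {
  id: string;
  targetTopic: string;
  dueAtMs: number;
  attempt: number;
  event: Uint8Array;
}

// Fold the log in offset order: a later record replaces an earlier one with
// the same key, and a tombstone removes the key from active state.
function replaySchedules(records: ScheduleLogRecord[]): Map<string, PendingSchedule> {
  const pending = new Map<string, PendingSchedule>();
  for (const record of records) {
    if (record.value === null || record.headers === null) {
      pending.delete(record.key);
      continue;
    }
    pending.set(record.key, {
      id: record.key,
      targetTopic: record.headers.targetTopic,
      dueAtMs: Date.parse(record.headers.notBefore), // RFC 3339 timestamp
      attempt: Number(record.headers.attempt),
      event: record.value,
    });
  }
  return pending;
}

// Delay to arm a timer with. Schedules that are already due get a delay of 0.
function delayUntilDue(schedule: PendingSchedule, nowMs: number): number {
  return Math.max(0, schedule.dueAtMs - nowMs);
}
```

After replay, each remaining entry would get a timer armed with delayUntilDue(schedule, Date.now()), which is how a schedule that came due while the process was down fires promptly on restart.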

Scheduled delivery is durable and at least once. A crash near the delivery boundary can duplicate a delivery, so handlers should be idempotent.
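One common way to make a handler idempotent is to deduplicate on the CloudEvent id. The wrapper below is a minimal sketch under stated assumptions: idempotent and EventHandler are hypothetical names, the event id is passed in explicitly because the text does not show how a handler reads it, and the in-memory Set stands in for a durable store.

```typescript
type EventHandler<T> = (eventId: string, request: T) => Promise<void>;

// Wrap a handler so that ids which were already processed successfully are
// skipped. The in-memory Set is for illustration only: it does not survive a
// restart and does not guard against two concurrent deliveries of one id.
function idempotent<T>(
  handler: EventHandler<T>,
  seen: Set<string> = new Set<string>(),
): EventHandler<T> {
  return async (eventId, request) => {
    if (seen.has(eventId)) {
      return; // duplicate delivery: skip side effects
    }
    await handler(eventId, request);
    seen.add(eventId); // record only after the handler succeeded
  };
}
```

Recording the id only after success means a failed attempt can still be retried, which matches at-least-once delivery.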

The public event envelope remains CloudEvents 1.0. The scheduler record is Kafka transport state, not a replacement event envelope.

Kafka subscriber consumers default to manual offset commits. The transport commits an offset only after the handler disposition has been handled:

  • ack commits the consumed offset.
  • retry persists a durable retry schedule, then commits the consumed offset.
  • reject and dead_letter publish to deadLetterTopic when configured, then commit the consumed offset.

If deadLetterTopic is not configured, reject and dead_letter are committed without a secondary publish.
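The commit ordering above can be modelled as a single settle step. This is a sketch of the described behavior, not the transport's code: settleDelivery and SettleDeps are hypothetical names, and the Kafka operations are injected as plain async functions.

```typescript
type Disposition = "ack" | "retry" | "reject" | "dead_letter";

// Hypothetical dependencies standing in for the real Kafka operations.
interface SettleDeps {
  scheduleRetry(): Promise<void>;
  // Left undefined when no deadLetterTopic is configured.
  publishDeadLetter?: (disposition: Disposition) => Promise<void>;
  commitOffset(): Promise<void>;
}

async function settleDelivery(disposition: Disposition, deps: SettleDeps): Promise<void> {
  switch (disposition) {
    case "ack":
      break;
    case "retry":
      // Persist the durable retry schedule before touching the offset.
      await deps.scheduleRetry();
      break;
    case "reject":
    case "dead_letter":
      if (deps.publishDeadLetter) {
        await deps.publishDeadLetter(disposition);
      }
      break;
  }
  // Reached only if the step above succeeded, so in this sketch a failed
  // retry schedule or dead-letter publish leaves the offset uncommitted.
  await deps.commitOffset();
}
```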

Dead-letter Kafka messages keep the original CloudEvent payload and include Kafka headers for the disposition, original topic, original partition, and original offset. Those headers are transport diagnostics; application handlers should continue to use CloudEvent data and extensions.

Malformed subscribed Kafka records that cannot be decoded as io.cloudevents.v1.CloudEvent are reported through the parseFailed interceptor operation and committed so one poison record cannot pin the consumer group forever.

Retry attempts are tracked in Kafka transport headers and passed to handlers as ctx.attempt. The first delivery is attempt 1; each durable retry increments the value before redelivery.

When router.subscribe({ maxAttempts }) is set and a handler retries on the final allowed attempt, Kafka publishes the original CloudEvent to deadLetterTopic when configured, fires the retryExhausted interceptor operation, commits the consumed offset, and does not create another retry schedule.
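The attempt arithmetic is easy to get off by one, so here is a small sketch of the decision as described. decideRetry and RetryDecision are hypothetical names. The only rules encoded are the ones stated above: attempts are one-based, each retry increments the attempt, and a retry requested on the final allowed attempt is treated as exhausted.

```typescript
type RetryDecision =
  | { kind: "schedule"; nextAttempt: number }
  | { kind: "exhausted" };

// attempt is the one-based attempt the handler just ran (ctx.attempt).
// maxAttempts is optional; without it, retries are never exhausted.
function decideRetry(attempt: number, maxAttempts?: number): RetryDecision {
  if (maxAttempts !== undefined && attempt >= maxAttempts) {
    // Final allowed attempt: dead-letter if configured, no new schedule.
    return { kind: "exhausted" };
  }
  return { kind: "schedule", nextAttempt: attempt + 1 };
}
```

With maxAttempts set to 3, a handler sees attempts 1, 2, and 3. A retry requested on attempt 3 is exhausted instead of producing attempt 4.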

Use interceptors for transport-local diagnostics without changing the transport-neutral application API:

const scheduler = createKafkaScheduler({
  client: kafka,
  options: {
    schedulesTopic: "protoutil.pubsub.schedules",
    historyTopic: "protoutil.pubsub.schedule_history",
  },
});

const transport = createKafkaTransport({
  client: kafka,
  scheduler,
  interceptors: [
    (next) => async (ctx) => {
      switch (ctx.operation) {
        case "scheduled":
          logger.info(ctx.event, "scheduled pubsub event");
          break;
        case "delivered":
          logger.info(ctx.event, "delivered scheduled pubsub event");
          break;
        case "retried":
          logger.info(ctx.event, "scheduled retry for pubsub event");
          break;
        case "retryExhausted":
          logger.warn(ctx.event, "pubsub retry attempts exhausted");
          break;
        case "deadLettered":
          logger.warn(ctx.event, "published pubsub event to dead letter topic");
          break;
        case "deliveryFailed":
          logger.error(ctx.event, "failed to deliver scheduled pubsub event");
          break;
        case "parseFailed":
          logger.error(ctx.event, "failed to parse Kafka pubsub record");
          break;
      }
      return next(ctx);
    },
  ],
});
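The same interceptor shape works for metrics as well as logs. The sketch below counts operations by name. The (next) => async (ctx) => ... shape and ctx.operation are taken from the example above, while the local type definitions and countingInterceptor are assumptions made so the snippet stands alone.

```typescript
// Local stand-ins for the interceptor types; the real package may type
// these differently.
interface InterceptorContext {
  operation: string;
  event?: unknown;
}
type Next = (ctx: InterceptorContext) => Promise<unknown>;
type Interceptor = (next: Next) => Next;

// Hypothetical helper: increment a per-operation counter, then continue
// the chain unchanged.
function countingInterceptor(counts: Map<string, number>): Interceptor {
  return (next) => async (ctx) => {
    counts.set(ctx.operation, (counts.get(ctx.operation) ?? 0) + 1);
    return next(ctx);
  };
}
```

The Map could be swapped for a metrics client counter; the interceptor itself stays free of any broker-specific logic.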

The current Kafka transport supports:

  • lazy setup and explicit teardown for real Confluent admin, producer, and consumer clients
  • scheduler topic creation with the required compacted/unlimited-retention config
  • immediate CloudEvent publish to the resolved Kafka topic
  • durable notBefore schedule publish to the compacted schedules topic and delivery no earlier than notBefore
  • durable retry scheduling from ctx.retry({ delay }) and redelivery no earlier than the requested delay
  • retry attempt tracking through transport headers and ctx.attempt
  • portable router.subscribe({ maxAttempts }) enforcement for retry exhaustion
  • versioned scheduler records in Kafka headers
  • router.subscribe({ concurrency }) mapped to Kafka partition concurrency when provided
  • scheduled publish recovery after transport restart
  • schedule replacement by CloudEvent id
  • tombstone replay preventing delivered schedules from redelivering after restart
  • multiple scheduler workers using a shared scheduler consumer group
  • manual offset commits after dispositions are handled
  • optional dead-letter topic publishing for reject and dead_letter
  • dead-letter diagnostic headers for original topic, partition, offset, and disposition
  • poison-record parse failure reporting and offset commits
  • interceptors for scheduled, retried, recovered, delivered, tombstoned, dead-lettered, failed, parse-failed, and committed operations
  • isolation of lifecycle interceptor errors, which are caught and do not break delivery flow
  • publish and handle interceptors that can modify requests, deliveries, and dispositions
  • subscriber consumption of CloudEvent protobuf records from configured topics
  • subscription.unsubscribe() to stop one subscriber without replacing the transport

The Docker-backed test suite starts a real Kafka broker with Docker Compose and verifies public pubsub behavior against Kafka itself:

  • publish and consume through createPublisher() and createRouter()
  • subscription.unsubscribe() stops later deliveries
  • notBefore does not deliver early
  • scheduled publishes recover after transport restart
  • duplicate schedule ids use the latest schedule
  • tombstoned schedules do not redeliver after restart
  • multiple scheduler instances do not duplicate scheduled delivery
  • dead-letter dispositions publish to a configured dead-letter topic
  • ctx.retry({ delay }) does not redeliver early
  • maxAttempts sends exhausted retries to dead-letter instead of scheduling again
  • interceptor failures do not break scheduled delivery

Run the opt-in load test locally with:

pnpm moon run pubsub:load-test

The load test is not a dependency of build, test, or conformance. It runs the shared pubsub load scenario through the Kafka transport adapter.

Configure the run with environment variables:

| Variable | Default | Description |
| --- | --- | --- |
| PUBSUB_LOAD_EVENT_COUNT | 1000 | Number of delayed events to publish and consume. |
| PUBSUB_LOAD_NOT_BEFORE_MS | 1000 | Delay before scheduled delivery. |
| PUBSUB_LOAD_PUBLISH_CONCURRENCY | 50 | Concurrent publish calls. |
| PUBSUB_LOAD_SUBSCRIBE_CONCURRENCY | 32 | Requested subscriber concurrency. |
| PUBSUB_LOAD_TIMEOUT_MS | max(60000, eventCount * 25) | Timeout for the delivery assertion. |
| PUBSUB_LOAD_TEST_TIMEOUT_MS | 120000 | Vitest timeout for the load-test file. |
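To make the derived default concrete, the sketch below resolves these variables from an environment map. How the real load test parses its environment is not shown in this document, so loadTestConfig, readInt, and the error handling are assumptions. Only the variable names and defaults come from the table.

```typescript
interface LoadTestConfig {
  eventCount: number;
  notBeforeMs: number;
  publishConcurrency: number;
  subscribeConcurrency: number;
  timeoutMs: number;
  testTimeoutMs: number;
}

type EnvMap = Record<string, string | undefined>;

// Hypothetical helper: read an integer variable or fall back to a default.
function readInt(env: EnvMap, name: string, fallback: number): number {
  const raw = env[name];
  if (raw === undefined || raw === "") {
    return fallback;
  }
  const parsed = Number.parseInt(raw, 10);
  if (Number.isNaN(parsed)) {
    throw new Error(`${name} must be an integer, got "${raw}"`);
  }
  return parsed;
}

function loadTestConfig(env: EnvMap): LoadTestConfig {
  const eventCount = readInt(env, "PUBSUB_LOAD_EVENT_COUNT", 1000);
  return {
    eventCount,
    notBeforeMs: readInt(env, "PUBSUB_LOAD_NOT_BEFORE_MS", 1000),
    publishConcurrency: readInt(env, "PUBSUB_LOAD_PUBLISH_CONCURRENCY", 50),
    subscribeConcurrency: readInt(env, "PUBSUB_LOAD_SUBSCRIBE_CONCURRENCY", 32),
    // The delivery timeout scales with event count unless set explicitly.
    timeoutMs: readInt(env, "PUBSUB_LOAD_TIMEOUT_MS", Math.max(60000, eventCount * 25)),
    testTimeoutMs: readInt(env, "PUBSUB_LOAD_TEST_TIMEOUT_MS", 120000),
  };
}
```

Under these defaults, the delivery timeout stays at 60000 ms up to 2400 events and grows by 25 ms per event beyond that.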

Example heavier local run:

PUBSUB_LOAD_EVENT_COUNT=10000 PUBSUB_LOAD_TIMEOUT_MS=300000 pnpm moon run pubsub:load-test

Local Docker is useful for catching obvious scheduler, memory, and broker-pressure issues. Before relying on a new deployment shape, run the same task against broker resources that look like production.

Run the opt-in benchmark locally with:

pnpm moon run pubsub:benchmark

The benchmark is not a dependency of build, test, conformance, or the load test. It runs shared transport-neutral benchmark scenarios through the Kafka adapter so later RabbitMQ and NATS adapters can publish comparable results. Each scenario is measured on both a cold path and a warmed-up path.

Benchmark output is printed as a Markdown table and written to the package-level BENCHMARK.md:

| Transport | Scenario | Mode | Messages | Publish Concurrency | Subscribe Concurrency | Duration ms | Msg/s | p50 ms | p95 ms | p99 ms | Duplicates |
| --- | --- | --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: |
| kafka | immediate publish/consume | cold start | 1000 | 50 | 32 | 3319 | 301.21 | 3089.44 | 3209.61 | 3222.48 | 0 |
| kafka | immediate publish/consume | warmed up | 1000 | 50 | 32 | 412 | 2427.18 | 14.82 | 21.45 | 26.90 | 0 |
| kafka | scheduled publish/consume (scheduler only) | warmed up | 1000 | 50 | 32 | 615 | 1626.02 | 29.10 | 41.55 | 48.20 | 0 |

Configure the run with environment variables:

| Variable | Default | Description |
| --- | --- | --- |
| PUBSUB_BENCHMARK_EVENT_COUNT | 1000 | Number of events per benchmark scenario. |
| PUBSUB_BENCHMARK_PUBLISH_CONCURRENCY | 50 | Concurrent publish calls. |
| PUBSUB_BENCHMARK_SUBSCRIBE_CONCURRENCY | 32 | Requested subscriber concurrency. |
| PUBSUB_BENCHMARK_NOT_BEFORE_MS | 1000 | Delay before scheduled delivery in the scheduled scenario. |
| PUBSUB_BENCHMARK_SCHEDULER_ONLY_NOT_BEFORE_MS | 0 | Delay before scheduled delivery in the scheduler-only scenario. |
| PUBSUB_BENCHMARK_TIMEOUT_MS | 120000 | Per-scenario delivery timeout. |
| PUBSUB_BENCHMARK_TEST_TIMEOUT_MS | 300000 | Vitest timeout for the benchmark file. |

Example shorter smoke run:

PUBSUB_BENCHMARK_EVENT_COUNT=100 pnpm moon run pubsub:benchmark

Benchmark methodology and interpretation notes live in BENCHMARK.md.

For a production Kafka deployment:

  • provision scheduler topics with compaction, unlimited schedule retention, adequate partitions, and adequate replication
  • configure deadLetterTopic
  • set router.subscribe({ maxAttempts })
  • make handlers idempotent; delivery is durable and at least once, not exactly once
  • wire interceptors into logs or metrics
  • run pnpm moon run pubsub:load-test at an event count that reflects expected traffic
  • run pnpm moon run pubsub:benchmark when publishing throughput and latency numbers
  • configure Confluent client authentication, TLS, timeouts, retries, and broker settings outside the transport-neutral API

Kafka-specific tuning such as brokers, TLS, SASL, offsets, partitions, and group behavior belongs in the Confluent client configuration passed to this transport, not in the transport-neutral pubsub core.