Native Apache Kafka client for JavaScript and TypeScript runtimes. Implements the Kafka binary protocol directly for producing, consuming, and administering Kafka clusters.
Install with your package manager of choice:

```bash
bun add @qualithm/kafka-client
# or
npm install @qualithm/kafka-client
```
Connect to a cluster and produce messages:

```typescript
import { createKafka, createNodeSocketFactory } from "@qualithm/kafka-client"

const kafka = createKafka({
  config: { brokers: ["localhost:9092"], clientId: "my-app" },
  socketFactory: createNodeSocketFactory()
})
await kafka.connect()

const producer = kafka.producer()
await producer.send("my-topic", [
  { key: new TextEncoder().encode("key-1"), value: new TextEncoder().encode("hello") }
])
await producer.close()
await kafka.disconnect()
```
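Keys and values are raw bytes (`Uint8Array`). If you mostly produce strings, a small wrapper keeps call sites tidy; `toRecord` below is our own helper, not part of the library:

```typescript
// Hypothetical helper (not part of @qualithm/kafka-client): encode string
// key/value pairs into the byte records that producer.send expects.
const enc = new TextEncoder()
const toRecord = (key: string, value: string) => ({
  key: enc.encode(key),
  value: enc.encode(value)
})

await producer.send("my-topic", [toRecord("key-1", "hello"), toRecord("key-2", "world")])
```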
Consume messages:

```typescript
import { createKafka, createNodeSocketFactory } from "@qualithm/kafka-client"

const kafka = createKafka({
  config: { brokers: ["localhost:9092"], clientId: "my-app" },
  socketFactory: createNodeSocketFactory()
})
await kafka.connect()

const consumer = kafka.consumer({ groupId: "my-group" })
consumer.subscribe(["my-topic"])
await consumer.connect()

const records = await consumer.poll()
for (const record of records) {
  console.log(new TextDecoder().decode(record.message.value!))
}

await consumer.close()
await kafka.disconnect()
```
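For continuous consumption, `poll()` can be driven in a loop. A minimal sketch, reusing the consumer above; the shutdown flag and signal handling are our own, not library features (shown for Node/Bun — on Deno use `Deno.addSignalListener`):

```typescript
// Poll until a shutdown signal flips the flag, then release resources.
let running = true
process.on("SIGINT", () => { running = false })

while (running) {
  const records = await consumer.poll()
  for (const record of records) {
    console.log(new TextDecoder().decode(record.message.value!))
  }
}

await consumer.close()
await kafka.disconnect()
```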
Create and list topics with the admin client:

```typescript
const admin = kafka.admin()
await admin.createTopics({
  topics: [{ name: "new-topic", numPartitions: 3, replicationFactor: 1 }],
  timeoutMs: 30000
})
const topics = await admin.listTopics()
```
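Combining the two calls gives idempotent topic setup. A hedged sketch, assuming `listTopics()` resolves to an array of topic names:

```typescript
// Assumption: listTopics() resolves to string[] of topic names.
const existing = await admin.listTopics()
if (!existing.includes("new-topic")) {
  await admin.createTopics({
    topics: [{ name: "new-topic", numPartitions: 3, replicationFactor: 1 }],
    timeoutMs: 30000
  })
}
```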
A socket factory adapts the client to the runtime's TCP API. Use the factory that matches your runtime:

```typescript
// Bun
import { createBunSocketFactory } from "@qualithm/kafka-client"
const socketFactory = createBunSocketFactory()

// Node.js
import { createNodeSocketFactory } from "@qualithm/kafka-client"
const socketFactory = createNodeSocketFactory()

// Deno
import { createDenoSocketFactory } from "@qualithm/kafka-client"
const socketFactory = createDenoSocketFactory()
```
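If the same codebase runs on more than one runtime, a factory can be picked at startup by feature detection. The detection heuristic below is our own, not part of the library:

```typescript
import {
  createBunSocketFactory,
  createDenoSocketFactory,
  createNodeSocketFactory
} from "@qualithm/kafka-client"

// Detect the runtime via its global marker object (our own heuristic).
const g = globalThis as Record<string, unknown>
const socketFactory =
  g.Bun ? createBunSocketFactory()
  : g.Deno ? createDenoSocketFactory()
  : createNodeSocketFactory()
```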
Register compression providers before producing or consuming compressed record batches:
```typescript
import { registerCompressionProvider, createSnappyProvider } from "@qualithm/kafka-client"
import snappy from "snappy" // bring your own codec

registerCompressionProvider(createSnappyProvider(snappy))
```
Available providers: `gzipProvider`, `deflateProvider`, `createSnappyProvider`, `createLz4Provider`, and `createZstdProvider`.
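For example, wiring up gzip alongside a bring-your-own LZ4 codec might look like the sketch below; we assume `gzipProvider` ships ready to use, and that `createLz4Provider` wraps a user-supplied codec (here `lz4js`, one option among several) the same way `createSnappyProvider` does:

```typescript
import {
  registerCompressionProvider,
  gzipProvider,
  createLz4Provider
} from "@qualithm/kafka-client"
import lz4 from "lz4js" // bring your own codec; lz4js is one assumed option

// Assumption: gzipProvider is a ready-made provider instance, while
// createLz4Provider wraps the user-supplied codec, mirroring createSnappyProvider.
registerCompressionProvider(gzipProvider)
registerCompressionProvider(createLz4Provider(lz4))
```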
To encode and decode records against a schema registry, create a `SchemaRegistry` client and an Avro serde:

```typescript
import { SchemaRegistry, createAvroSerde } from "@qualithm/kafka-client"

const registry = new SchemaRegistry({ baseUrl: "http://localhost:8081" })
const serde = createAvroSerde<MyType>({ registry, subject: "my-topic-value", codec: avroCodec })

// Serialize for producing
const encoded = await serde.serialize("my-topic", myData)

// Deserialize when consuming
const decoded = await serde.deserialize("my-topic", record.message.value!)
```
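Putting the serde and producer together, a produce call might look like the sketch below, which reuses `serde`, `producer`, and `myData` from the snippets above and assumes `serialize` resolves to the encoded bytes:

```typescript
// Serialize through the registry-backed serde, then send the raw bytes.
const payload = await serde.serialize("my-topic", myData)
await producer.send("my-topic", [
  { key: new TextEncoder().encode("key-1"), value: payload }
])
```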
Full API documentation is generated with TypeDoc:

```bash
bun run docs
# Output in docs/
```
See the `examples/` directory for runnable examples:

| Example | Description |
|---|---|
| `basic-usage.ts` | Connect, produce, and consume |
| `batch-processing.ts` | Batch produce and consume |
| `produce-consume.ts` | End-to-end produce/consume flow |
| `error-handling.ts` | Error handling patterns |
Run an example with:

```bash
bun run examples/basic-usage.ts
```
To work on the client locally:

```bash
bun install
bun run build
bun run test              # unit tests
bun run test:integration  # integration tests (requires a running broker)
bun run test:coverage     # with coverage report
bun run lint
bun run format
bun run typecheck
bun run bench
```
The package is published to npm automatically when CI passes on `main`. Bump the version in `package.json` before merging to trigger a new release.
Apache-2.0