@qualithm/kafka-client - v0.1.6

    Kafka Client

    Native Apache Kafka client for JavaScript and TypeScript runtimes. Implements the Kafka binary protocol directly for producing, consuming, and administering Kafka clusters.

    • Zero native dependencies — pure TypeScript binary protocol implementation
    • Multi-runtime — Bun, Node.js 20+, and Deno
    • Producer — batching, partitioning (murmur2/round-robin/custom), retries, idempotent mode
    • Consumer — group coordination, offset management, rebalance listeners, auto-commit
    • Admin — topic/partition CRUD, config describe/alter
    • SASL authentication — PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
    • SSL/TLS — mutual TLS support via runtime socket adapters
    • Serialisation — built-in JSON/string, pluggable Avro and Protobuf via Schema Registry
    • Compression — gzip, snappy, lz4, zstd

    bun add @qualithm/kafka-client
    # or
    npm install @qualithm/kafka-client

    import { createKafka, createNodeSocketFactory } from "@qualithm/kafka-client"

    const kafka = createKafka({
      config: { brokers: ["localhost:9092"], clientId: "my-app" },
      socketFactory: createNodeSocketFactory()
    })
    await kafka.connect()

    const producer = kafka.producer()
    await producer.send("my-topic", [
      { key: new TextEncoder().encode("key-1"), value: new TextEncoder().encode("hello") }
    ])
    await producer.close()
    await kafka.disconnect()
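
The feature list mentions murmur2 key partitioning. As a rough illustration of how a keyed record can be mapped to a partition, here is a minimal sketch of the murmur2 scheme used by the Apache Kafka Java client. Whether this library's built-in partitioner matches it bit for bit is an assumption, and `murmur2` and `partitionForKey` are illustrative names, not exports of `@qualithm/kafka-client`.

```typescript
// Sketch of murmur2-based key partitioning as done by the Apache Kafka Java client.
// Assumption: the library's murmur2 partitioner follows the same scheme.

const SEED = 0x9747b28c
const M = 0x5bd1e995
const R = 24

/** 32-bit murmur2 over raw key bytes; returns a signed 32-bit integer. */
function murmur2(data: Uint8Array): number {
  const length = data.length
  const tailStart = length & ~3 // index where the 1-3 trailing bytes begin
  let h = SEED ^ length

  // Mix the input four bytes (little-endian) at a time.
  for (let i = 0; i < tailStart; i += 4) {
    let k = data[i] | (data[i + 1] << 8) | (data[i + 2] << 16) | (data[i + 3] << 24)
    k = Math.imul(k, M)
    k ^= k >>> R
    k = Math.imul(k, M)
    h = Math.imul(h, M) ^ k
  }

  // Mix in the remaining 1-3 bytes, if any.
  const rem = length & 3
  if (rem === 3) h ^= data[tailStart + 2] << 16
  if (rem >= 2) h ^= data[tailStart + 1] << 8
  if (rem >= 1) {
    h ^= data[tailStart]
    h = Math.imul(h, M)
  }

  // Final avalanche.
  h ^= h >>> 13
  h = Math.imul(h, M)
  h ^= h >>> 15
  return h | 0
}

/** Map a key to a partition index in [0, numPartitions). */
function partitionForKey(key: Uint8Array, numPartitions: number): number {
  // Clear the sign bit so the modulo result is never negative.
  return (murmur2(key) & 0x7fffffff) % numPartitions
}
```

Because the hash depends only on the key bytes, records with the same key land on the same partition for as long as the partition count stays the same.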

    import { createKafka, createNodeSocketFactory } from "@qualithm/kafka-client"

    const kafka = createKafka({
      config: { brokers: ["localhost:9092"], clientId: "my-app" },
      socketFactory: createNodeSocketFactory()
    })
    await kafka.connect()

    const consumer = kafka.consumer({ groupId: "my-group" })
    consumer.subscribe(["my-topic"])
    await consumer.connect()

    const records = await consumer.poll()
    for (const record of records) {
      console.log(new TextDecoder().decode(record.message.value!))
    }

    await consumer.close()
    await kafka.disconnect()
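
The consumer example uses a non-null assertion on `record.message.value`. Kafka records may carry a null value (a tombstone on a compacted topic, for instance), so a defensive variant can be useful. The sketch below assumes only the `record.message.value` shape visible in the example; `RecordLike` and `decodeValues` are hypothetical names, not types or functions exported by the library.

```typescript
// Hypothetical minimal record shape, inferred from `record.message.value` in the
// consumer example. The library's real record type may carry more fields.
interface RecordLike {
  message: { value: Uint8Array | null | undefined }
}

const utf8 = new TextDecoder()

/** Decode record values as UTF-8 text, skipping records that have no value. */
function decodeValues(records: readonly RecordLike[]): string[] {
  const out: string[] = []
  for (const record of records) {
    const value = record.message.value
    if (value == null) continue // tombstone or missing value: nothing to decode
    out.push(utf8.decode(value))
  }
  return out
}
```

With this helper the loop body becomes `for (const text of decodeValues(records)) console.log(text)`, and a null value no longer causes a runtime error inside `TextDecoder.decode`.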

    // Assumes `kafka` is a connected client, created as in the examples above
    const admin = kafka.admin()
    await admin.createTopics({
      topics: [{ name: "new-topic", numPartitions: 3, replicationFactor: 1 }],
      timeoutMs: 30000
    })
    const topics = await admin.listTopics()

    // Bun
    import { createBunSocketFactory } from "@qualithm/kafka-client"
    const socketFactory = createBunSocketFactory()

    // Node.js
    import { createNodeSocketFactory } from "@qualithm/kafka-client"
    const socketFactory = createNodeSocketFactory()

    // Deno
    import { createDenoSocketFactory } from "@qualithm/kafka-client"
    const socketFactory = createDenoSocketFactory()
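
When the same code has to run on more than one runtime, the adapter can be chosen at startup. A minimal sketch of the detection step, assuming the usual `Bun` and `Deno` globals are present on those runtimes; `detectRuntime` is a hypothetical helper and not part of the package.

```typescript
type Runtime = "bun" | "deno" | "node"

/** Best-effort runtime detection based on well-known globals. */
function detectRuntime(): Runtime {
  const g = globalThis as unknown as Record<string, unknown>
  // Check Bun first: it also exposes Node-compatible globals such as `process`.
  if (typeof g["Bun"] !== "undefined") return "bun"
  if (typeof g["Deno"] !== "undefined") return "deno"
  return "node"
}

// Intended mapping to the factories shown above (not imported in this sketch):
//   "bun"  -> createBunSocketFactory()
//   "deno" -> createDenoSocketFactory()
//   "node" -> createNodeSocketFactory()
```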

    Register compression providers before producing or consuming compressed record batches:

    import { registerCompressionProvider, createSnappyProvider } from "@qualithm/kafka-client"
    import snappy from "snappy" // bring your own codec

    registerCompressionProvider(createSnappyProvider(snappy))

    Available: gzipProvider, deflateProvider, createSnappyProvider, createLz4Provider, createZstdProvider.
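
To show why registration has to happen first: a Kafka record batch stores its compression codec in the low three bits of the attributes field (0 none, 1 gzip, 2 snappy, 3 lz4, 4 zstd), and a client needs a matching codec implementation to read or write the batch. The sketch below models that lookup with a hypothetical registry. `ProviderSketch`, `registerSketch`, and `providerFor` are illustrative names only and do not describe the library's actual provider interface.

```typescript
// Codec ids from the Kafka record batch format (low 3 bits of `attributes`).
const CODEC = { none: 0, gzip: 1, snappy: 2, lz4: 3, zstd: 4 } as const
type CodecId = (typeof CODEC)[keyof typeof CODEC]

// Hypothetical provider shape for illustration; not the library's exported type.
interface ProviderSketch {
  codec: CodecId
  compress(data: Uint8Array): Uint8Array
  decompress(data: Uint8Array): Uint8Array
}

const providers = new Map<number, ProviderSketch>()

/** Make a codec available for later batches. */
function registerSketch(provider: ProviderSketch): void {
  providers.set(provider.codec, provider)
}

/** Pick the provider for a batch from its attributes field. */
function providerFor(attributes: number): ProviderSketch | undefined {
  const codec = attributes & 0x07
  if (codec === CODEC.none) return undefined // uncompressed batch
  const provider = providers.get(codec)
  if (!provider) {
    throw new Error(`no compression provider registered for codec ${codec}`)
  }
  return provider
}
```

In this model, a snappy-compressed batch that arrives before a snappy provider has been registered fails at lookup time, which is the situation the registration step is meant to prevent.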

    import { SchemaRegistry, createAvroSerde } from "@qualithm/kafka-client"

    // MyType, avroCodec, myData, and record are placeholders supplied by your application
    const registry = new SchemaRegistry({ baseUrl: "http://localhost:8081" })
    const serde = createAvroSerde<MyType>({ registry, subject: "my-topic-value", codec: avroCodec })

    // Serialize for producing
    const encoded = await serde.serialize("my-topic", myData)

    // Deserialize when consuming
    const decoded = await serde.deserialize("my-topic", record.message.value!)
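
Schema Registry serialisers conventionally frame each message as a zero magic byte, a 4-byte big-endian schema id, and then the encoded payload. The sketch below shows that framing on its own. It is an assumption that the serde above produces exactly this layout, and `frame` and `unframe` are hypothetical helpers, not library functions.

```typescript
const MAGIC_BYTE = 0
const HEADER_SIZE = 5 // 1 magic byte + 4-byte schema id

/** Prefix an encoded payload with the magic byte and a big-endian schema id. */
function frame(schemaId: number, payload: Uint8Array): Uint8Array {
  const out = new Uint8Array(HEADER_SIZE + payload.length)
  const view = new DataView(out.buffer)
  view.setUint8(0, MAGIC_BYTE)
  view.setInt32(1, schemaId, false) // false = big-endian
  out.set(payload, HEADER_SIZE)
  return out
}

/** Split a framed message into its schema id and payload bytes. */
function unframe(message: Uint8Array): { schemaId: number; payload: Uint8Array } {
  if (message.length < HEADER_SIZE || message[0] !== MAGIC_BYTE) {
    throw new Error("not a Schema Registry framed message")
  }
  // Respect byteOffset so views into a larger buffer are read correctly.
  const view = new DataView(message.buffer, message.byteOffset, message.byteLength)
  return { schemaId: view.getInt32(1, false), payload: message.subarray(HEADER_SIZE) }
}
```

The schema id in the header is what lets a consumer fetch the writer's schema from the registry before decoding the payload.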

    Full API documentation is generated with TypeDoc:

    bun run docs
    # Output in docs/

    See the examples/ directory for runnable examples:

    Example               Description
    basic-usage.ts        Connect, produce, and consume
    batch-processing.ts   Batch produce and consume
    produce-consume.ts    End-to-end produce/consume flow
    error-handling.ts     Error handling patterns

    bun run examples/basic-usage.ts
    
    • Bun (recommended), Node.js 20+, or Deno
    bun install
    
    bun run build
    
    bun run test              # unit tests
    bun run test:integration  # integration tests (requires a running broker)
    bun run test:coverage     # with coverage report
    bun run lint
    bun run format
    bun run typecheck
    bun run bench
    

    The package is published to npm automatically when CI passes on main. Bump the version in package.json before merging to trigger a new release.

    Apache-2.0