Server-side MQTT protocol codec and connection state machine for JavaScript and TypeScript runtimes. Accepts connections from MQTT clients, parses bytes into typed packets, encodes packets into bytes, and manages per-connection protocol state.
bun add @qualithm/mqtt-wire
# or
npm install @qualithm/mqtt-wire
import * as net from "node:net"
import { MqttWire, PacketType } from "@qualithm/mqtt-wire"
const server = net.createServer((socket) => {
const wire = new MqttWire({
onSend: (data) => socket.write(data),
onConnect: (connect) => {
console.log(`Client connected: ${connect.clientId}`)
return {
type: PacketType.CONNACK,
sessionPresent: false,
reasonCode: 0x00
}
},
onPublish: (packet) => {
const payload = new TextDecoder().decode(packet.payload)
console.log(`[${packet.topic}] ${payload}`)
},
onSubscribe: (packet) => ({
type: PacketType.SUBACK,
packetId: packet.packetId,
reasonCodes: packet.subscriptions.map((s) => s.options.qos)
}),
onDisconnect: () => console.log("Client disconnected")
})
socket.on("data", (chunk) => wire.receive(chunk))
socket.on("close", () => wire.close())
})
server.listen(1883, () => console.log("MQTT server on port 1883"))
import {
BinaryReader,
BinaryWriter,
decodeVariableByteInteger,
encodeVariableByteIntegerToArray
} from "@qualithm/mqtt-wire"
// Encode a variable byte integer
const encoded = encodeVariableByteIntegerToArray(16384)
console.log(encoded) // Uint8Array [0x80, 0x80, 0x01]
// Decode it back
const decoded = decodeVariableByteInteger(encoded, 0)
if (decoded.ok) {
console.log(decoded.value.value) // 16384
}
// Build a packet manually
const writer = new BinaryWriter()
writer
.writeUint8(0x10) // CONNECT packet type
.writeVariableByteInteger(12) // Remaining length
.writeMqttString("MQTT") // Protocol name
.writeUint8(5) // Protocol version
const packet = writer.toUint8Array()
MqttWire uses lifecycle hooks for error reporting — receive() does not throw protocol errors.
import { MqttWire, ProtocolError, StateError, type DecodeResult } from "@qualithm/mqtt-wire"
// Protocol errors from receive() are reported via the onError hook
const wire = new MqttWire({
onSend: (data) => socket.write(data),
onConnect: (connect) => ({
/* ... */
}),
onError: (error) => {
// error is a ProtocolError with an MQTT reason code
console.error(`protocol error: ${error.message}`, {
reasonCode: error.reasonCode
})
socket.destroy()
}
})
// receive() handles protocol errors internally; guard against unexpected failures
socket.on("data", (chunk) => {
wire.receive(chunk).catch((err) => {
console.error("unexpected receive error", err)
socket.destroy()
})
})
// StateError is thrown by outbound methods when called in the wrong state
try {
await wire.publish("topic", payload)
} catch (error) {
if (error instanceof StateError) {
console.error(`state error: ${error.message}`, { state: error.state })
}
}
// Codec functions return Result types (no exceptions)
const result: DecodeResult<number> = decodeVariableByteInteger(data, 0)
if (result.ok) {
console.log(result.value)
} else {
console.error(`[${result.error.code}] ${result.error.message}`)
}
Full API documentation is generated with TypeDoc:
bun run docs
# Output in docs/
See the examples/ directory for runnable examples:
| Example | Description |
|---|---|
| `node-tcp.ts` | Node.js TCP server |
| `bun-tcp.ts` | Bun TCP server |
| `deno-tcp.ts` | Deno TCP server |
| `websocket.ts` | WebSocket server |
| `basic-usage.ts` | Low-level codec utilities |
| `error-handling.ts` | Result type patterns |
bun run examples/node-tcp.ts
bun install
bun run build
bun run test # unit tests
bun run test:coverage # with coverage report
bun run lint
bun run format
bun run typecheck
bun run bench
The package is automatically published to npm when CI passes on `main`. Update the version in
`package.json` before merging to trigger a new release.
Apache-2.0