End-to-end worked examples for the data modality: an IoT device + dashboard with retained state, and a typed microservice event bus.
Longer, end-to-end examples that combine several Data methods into realistic
mini-apps. Each is self-contained. They assume the SDK is installed and a relay
token is available in CLUTCHCALL_CREDENTIALS.
IoT device + dashboard
Retained boot state, lossy telemetry, reliable alerts, and a dashboard that
bootstraps from retained values then tracks live.
Microservice event bus
Typed pub/sub across services: producers publish once, many consumers each
subscribe to the slice of the topic tree they care about.
Retained state at boot (devices/<id>/state) so a dashboard joining late
sees who is online without waiting for a heartbeat.
Lossy telemetry (sensors/<id>/temperature) on the datagram lane.
Reliable alerts (events/alerts/<id>) on the ordered lane.
1
The device publisher
Announce retained state, loop telemetry on the lossy lane, and emit alerts
reliably. On shutdown, clear the retained state so the dashboard reads
“offline.”
import { Data } from "@clutchcall/sdk/data";const enc = (o: unknown) => new TextEncoder().encode(JSON.stringify(o));const deviceId = process.argv[2] ?? "device-7";const data = new Data({ token: process.env.CLUTCHCALL_CREDENTIALS!, clientId: deviceId,});// 1. Retained boot state — sticks for late joiners.await data.publish({ topic: `devices/${deviceId}/state`, payload: enc({ online: true, version: "1.4.2" }), retained: true,});// 2. Telemetry on the lossy lane; alert reliably past a threshold.const timer = setInterval(async () => { const temp = 20 + Math.random() * 8; await data.publish({ topic: `sensors/${deviceId}/temperature`, payload: new TextEncoder().encode(temp.toFixed(2)), }); if (temp > 27) { await data.publish({ topic: `events/alerts/${deviceId}`, payload: enc({ kind: "over_temp", temp }), reliable: true, }); }}, 1000);// 3. Clean shutdown — clear retained state, then close.process.on("SIGTERM", async () => { clearInterval(timer); await data.publish({ topic: `devices/${deviceId}/state`, payload: new Uint8Array(0), // zero-length clears the retained value retained: true, }); await data.close(); process.exit(0);});
2
The dashboard
One client, three subscriptions. The retained devices/+/state values arrive
on attach (msg.retained === true) so the dashboard bootstraps the fleet
roster immediately, then tracks live telemetry and alerts.
import { Data, type DataMessage } from "@clutchcall/sdk/data";const dec = (m: DataMessage) => new TextDecoder().decode(m.payload);const online = new Map<string, boolean>();const lastTemp = new Map<string, number>();const data = new Data({ token: process.env.CLUTCHCALL_CREDENTIALS!, clientId: "dashboard", onState: (s) => console.log("session:", s),});// Roster: retained snapshots arrive first, then live changes.await data.subscribe({ topicFilter: "devices/+/state" }, (msg) => { const id = msg.topic.split("/")[1]; online.set(id, msg.payload.length > 0 && JSON.parse(dec(msg)).online === true); console.log(msg.retained ? "snapshot" : "update", id, online.get(id));});// Live telemetry from every device.await data.subscribe({ topicFilter: "sensors/+/temperature" }, (msg) => { lastTemp.set(msg.fromClientId, Number(dec(msg)));});// Reliable alerts.await data.subscribe({ topicFilter: "events/alerts/#" }, (msg) => { console.warn("ALERT", msg.fromClientId, dec(msg));});
The dashboard never needed a per-device subscription. fromClientId plus the
topic do the demux, and the retained state topics make the roster instantly
correct on (re)connect — the SDK replays subscriptions and the relay re-emits
retained values after any transient drop.
Producers publish domain events once to events/<svc>/<kind>; many consumers
each subscribe to the subtree they care about and dispatch by topic. No broker
to run — the relay mesh fans out.
1
The producer
Emit events on the reliable lane so nothing is dropped.
import { Data } from "@clutchcall/sdk/data";const data = new Data({ token: process.env.CLUTCHCALL_CREDENTIALS!, clientId: "orders-svc" });const emit = (kind: string, body: unknown) => data.publish({ topic: `events/orders/${kind}`, payload: new TextEncoder().encode(JSON.stringify(body)), reliable: true, });await emit("placed", { orderId: "A-91", total: 42_00 });await emit("shipped", { orderId: "A-91", carrier: "acme" });
2
The consumer
One subscription on events/#, a dispatch table of handlers keyed by
sub-filter, and a fallback for anything unmatched. The SDK filters
client-side, so adding a handler costs nothing on the wire.
import { Data, topicMatches, type DataMessage } from "@clutchcall/sdk/data";const data = new Data({ token: process.env.CLUTCHCALL_CREDENTIALS!, clientId: `consumer-${process.pid}`,});const routes: [string, (m: DataMessage) => void][] = [ ["events/orders/shipped", (m) => fulfil(m)], ["events/orders/+", (m) => audit(m)], // catch-all for orders ["events/users/signup", (m) => welcome(m)],];await data.subscribe({ topicFilter: "events/#" }, (msg) => { let matched = false; for (const [pattern, fn] of routes) { if (topicMatches(msg.topic, pattern)) { fn(msg); matched = true; } } if (!matched) console.log("unhandled:", msg.topic, "from", msg.fromClientId);});function fulfil(m: DataMessage) { /* … */ }function audit(m: DataMessage) { /* … */ }function welcome(m: DataMessage) { /* … */ }
3
Scale out consumers
Run as many consumer instances as you like. Each opens its own session and
subscription; the relay fans every matching event to all of them. Give each
a distinct clientId so producers and audit logs can attribute them.
Reserve top-level segments as bounded domains — events, sensors, devices,
fleet. Because the top-level segment maps to one underlying track, a small,
stable set keeps fan-out coarse and predictable while + / # give you all the
routing flexibility below it.