Small, task-focused snippets for the data modality. Each assumes a constructed
client:
import { Data } from "@clutchcall/sdk/data";
const data = new Data({
token: process.env.CLUTCHCALL_CREDENTIALS!,
clientId: "device-7",
});
Publish a reading (lossy, default)
The default lane is the QUIC datagram lane — lowest latency, best for high-rate
telemetry where the freshest value wins.
await data.publish({
topic: "sensors/room1/temperature",
payload: new TextEncoder().encode("23.5"),
});
Publish an event that must arrive (reliable lane)
Flip reliable: true for application events, commands, or settlements — ordered
and retried per publisher.
await data.publish({
topic: "events/orders/shipped",
payload: new TextEncoder().encode(JSON.stringify({ orderId: "A-91" })),
reliable: true,
});
Subscribe with a + (single-segment) wildcard
+ matches exactly one path segment — here, any room.
const sub = await data.subscribe(
{ topicFilter: "sensors/+/temperature" },
(msg) => console.log(msg.topic, new TextDecoder().decode(msg.payload)),
);
Subscribe to a whole subtree with #
# matches the rest of the path and must be the trailing segment. One
subscription covers everything under events/.
await data.subscribe({ topicFilter: "events/#" }, (msg) => {
console.log("event:", msg.topic, "from", msg.fromClientId);
});
Publish retained current state
A retained message is cached by the relay and delivered to every future
subscriber on attach. Use it for “current state.”
await data.publish({
topic: "devices/device-7/state",
payload: new TextEncoder().encode(JSON.stringify({ online: true, version: "1.4.2" })),
retained: true,
});
Clear a retained topic
Publish a zero-length payload with retained: true — typically on clean
shutdown so dashboards flip to “offline” without a heartbeat timeout.
await data.publish({
topic: "devices/device-7/state",
payload: new Uint8Array(0),
retained: true,
});
Tell a bootstrap value from a live one
Retained values re-emitted on attach arrive with msg.retained === true. Render
the snapshot, then track live updates.
await data.subscribe({ topicFilter: "devices/+/state" }, (msg) => {
if (msg.retained) applySnapshot(msg); // cached current state
else applyUpdate(msg); // live change
});
Fan in from a whole fleet to one dashboard
Every message carries fromClientId, so one subscription demuxes many
publishers without a track per device.
const latest = new Map<string, number>();
await data.subscribe({ topicFilter: "sensors/+/temperature" }, (msg) => {
latest.set(msg.fromClientId, Number(new TextDecoder().decode(msg.payload)));
});
Publish and consume JSON
Payloads are opaque bytes — encode and decode JSON yourself.
const enc = (o: unknown) => new TextEncoder().encode(JSON.stringify(o));
const dec = (b: Uint8Array) => JSON.parse(new TextDecoder().decode(b));
await data.publish({ topic: "events/users/signup", payload: enc({ userId: "u-7" }) });
await data.subscribe({ topicFilter: "events/users/#" }, (msg) => {
const body = dec(msg.payload);
console.log("signup", body.userId);
});
Route by topic with a dispatch table
One subscription on a subtree, many handlers keyed by sub-filter — the classic
event-bus shape.
import { topicMatches } from "@clutchcall/sdk/data";
const routes: [string, (m: any) => void][] = [
["events/orders/shipped", (m) => onShipped(m)],
["events/users/signup", (m) => onSignup(m)],
];
await data.subscribe({ topicFilter: "events/#" }, (msg) => {
for (const [pattern, fn] of routes)
if (topicMatches(msg.topic, pattern)) return fn(msg);
});
Stop a single subscription
const sub = await data.subscribe({ topicFilter: "sensors/#" }, handle);
// later…
sub.close();
Watch connection state
Pass onState to react to reconnects. Subscriptions and retained values are
replayed automatically after a drop.
const data = new Data({
token, clientId: "device-7",
onState: (state, reason) => console.log("data session:", state, reason ?? ""),
});
Publish from Python
The Python surface mirrors the TS one with snake_case keywords.
from clutchcall.data import Data
data = Data(token=token, client_id="device-7")
data.publish(topic="sensors/room1/temperature", payload=b"23.5")
sub = data.subscribe(
topic_filter="sensors/+/temperature",
on_message=lambda m: print(m.topic, m.from_client_id, m.payload),
)
Stay wire-compatible without the client
Use the exported codec to build or parse frames yourself.
import { encodeDataFrame, decodeDataFrame } from "@clutchcall/sdk/data";
const frame = encodeDataFrame("device-7", "sensors/room1/temp", payload);
const decoded = decodeDataFrame(frame); // { fromClientId, topic, payload }
Keep the top-level topic segment concrete and meaningful (sensors, events,
devices) — it selects the underlying track, so it is also your coarse fan-out
and routing boundary.