# Data — Recipes

> End-to-end worked examples for the data modality: an IoT device + dashboard with retained state, and a typed microservice event bus.

Longer, end-to-end examples that combine several `Data` methods into realistic
mini-apps. Each is self-contained. They assume the SDK is installed and a relay
token is available in `CLUTCHCALL_CREDENTIALS`.

  - **[IoT device + dashboard](#recipe-1-iot-device-dashboard)** — Retained boot state, lossy telemetry, reliable alerts, and a dashboard that
    bootstraps from retained values then tracks live.
  - **[Microservice event bus](#recipe-2-typed-event-bus)** — Typed pub/sub across services: producers publish once, many consumers each
    subscribe to the slice of the topic tree they care about.

## Recipe 1 — IoT device + dashboard

A fleet of devices each report:

- **Retained state** at boot (`devices/<id>/state`) so a dashboard joining late
  sees who is online without waiting for a heartbeat.
- **Lossy telemetry** (`sensors/<id>/temperature`) on the datagram lane.
- **Reliable alerts** (`events/alerts/<id>`) on the ordered lane.

  1. **The device publisher**
Announce retained state, loop telemetry on the lossy lane, and emit alerts
    reliably. On shutdown, clear the retained state so the dashboard reads
    "offline."

    
    **TypeScript:**
```ts
    import { Data } from "@clutchcall/sdk/data";

    const enc = (o: unknown) => new TextEncoder().encode(JSON.stringify(o));
    const deviceId = process.argv[2] ?? "device-7";

    const data = new Data({
      token:    process.env.CLUTCHCALL_CREDENTIALS!,
      clientId: deviceId,
    });

    // 1. Retained boot state — sticks for late joiners.
    await data.publish({
      topic:    `devices/${deviceId}/state`,
      payload:  enc({ online: true, version: "1.4.2" }),
      retained: true,
    });

    // 2. Telemetry on the lossy lane; alert reliably past a threshold.
    const timer = setInterval(async () => {
      const temp = 20 + Math.random() * 8;
      await data.publish({
        topic:   `sensors/${deviceId}/temperature`,
        payload: new TextEncoder().encode(temp.toFixed(2)),
      });
      if (temp > 27) {
        await data.publish({
          topic:    `events/alerts/${deviceId}`,
          payload:  enc({ kind: "over_temp", temp }),
          reliable: true,
        });
      }
    }, 1000);

    // 3. Clean shutdown — clear retained state, then close.
    process.on("SIGTERM", async () => {
      clearInterval(timer);
      await data.publish({
        topic:    `devices/${deviceId}/state`,
        payload:  new Uint8Array(0),   // zero-length clears the retained value
        retained: true,
      });
      await data.close();
      process.exit(0);
    });
    ```

    **Python:**
```python
    import json, os, random, signal, sys, threading
    from clutchcall.data import Data

    device_id = sys.argv[1] if len(sys.argv) > 1 else "device-7"
    data = Data(token=os.environ["CLUTCHCALL_CREDENTIALS"], client_id=device_id)

    # 1. Retained boot state.
    data.publish(
        topic=f"devices/{device_id}/state",
        payload=json.dumps({"online": True, "version": "1.4.2"}).encode(),
        retained=True,
    )

    stop = threading.Event()
    signal.signal(signal.SIGTERM, lambda *_: stop.set())

    try:
        while not stop.is_set():
            temp = 20 + random.random() * 8
            data.publish(
                topic=f"sensors/{device_id}/temperature",
                payload=f"{temp:.2f}".encode(),
            )
            if temp > 27:
                data.publish(
                    topic=f"events/alerts/{device_id}",
                    payload=json.dumps({"kind": "over_temp", "temp": temp}).encode(),
                    reliable=True,
                )
            stop.wait(1.0)
    finally:
        # 3. Clear retained state, then close.
        data.publish(topic=f"devices/{device_id}/state", payload=b"", retained=True)
        data.close()
    ```

  2. **The dashboard**
One client, three subscriptions. The retained `devices/+/state` values arrive
    on attach (`msg.retained === true`) so the dashboard bootstraps the fleet
    roster immediately, then tracks live telemetry and alerts.

    
    **TypeScript:**
```ts
    import { Data, type DataMessage } from "@clutchcall/sdk/data";

    const dec = (m: DataMessage) => new TextDecoder().decode(m.payload);
    const online  = new Map<string, boolean>();
    const lastTemp = new Map<string, number>();

    const data = new Data({
      token:    process.env.CLUTCHCALL_CREDENTIALS!,
      clientId: "dashboard",
      onState:  (s) => console.log("session:", s),
    });

    // Roster: retained snapshots arrive first, then live changes.
    await data.subscribe({ topicFilter: "devices/+/state" }, (msg) => {
      const id = msg.topic.split("/")[1];
      online.set(id, msg.payload.length > 0 && JSON.parse(dec(msg)).online === true);
      console.log(msg.retained ? "snapshot" : "update", id, online.get(id));
    });

    // Live telemetry from every device.
    await data.subscribe({ topicFilter: "sensors/+/temperature" }, (msg) => {
      lastTemp.set(msg.fromClientId, Number(dec(msg)));
    });

    // Reliable alerts.
    await data.subscribe({ topicFilter: "events/alerts/#" }, (msg) => {
      console.warn("ALERT", msg.fromClientId, dec(msg));
    });
    ```

    **Python:**
```python
    import json, os
    from clutchcall.data import Data, DataMessage

    online, last_temp = {}, {}
    data = Data(token=os.environ["CLUTCHCALL_CREDENTIALS"], client_id="dashboard")

    def on_state(m: DataMessage) -> None:
        dev = m.topic.split("/")[1]
        online[dev] = bool(m.payload) and json.loads(m.payload).get("online") is True
        print("snapshot" if m.retained else "update", dev, online[dev])

    data.subscribe(topic_filter="devices/+/state", on_message=on_state)
    data.subscribe(
        topic_filter="sensors/+/temperature",
        on_message=lambda m: last_temp.__setitem__(m.from_client_id, float(m.payload)),
    )
    data.subscribe(
        topic_filter="events/alerts/#",
        on_message=lambda m: print("ALERT", m.from_client_id, m.payload),
    )
    ```

> **NOTE:**
> The dashboard never needed a per-device subscription. `fromClientId` plus the
> topic do the demux, and the retained `state` topics make the roster instantly
> correct on (re)connect — the SDK replays subscriptions and the relay re-emits
> retained values after any transient drop.

## Recipe 2 — Typed event bus

Producers publish domain events once to `events/<svc>/<kind>`; many consumers
each subscribe to the subtree they care about and dispatch by topic. No broker
to run — the relay mesh fans out.

  1. **The producer**
Emit events on the reliable lane so nothing is dropped.

    ```ts
    import { Data } from "@clutchcall/sdk/data";

    const data = new Data({ token: process.env.CLUTCHCALL_CREDENTIALS!, clientId: "orders-svc" });
    const emit = (kind: string, body: unknown) =>
      data.publish({
        topic:    `events/orders/${kind}`,
        payload:  new TextEncoder().encode(JSON.stringify(body)),
        reliable: true,
      });

    await emit("placed",  { orderId: "A-91", total: 42_00 });
    await emit("shipped", { orderId: "A-91", carrier: "acme" });
    ```

  2. **The consumer**
One subscription on `events/#`, a dispatch table of handlers keyed by
    sub-filter, and a fallback for anything unmatched. The SDK filters
    client-side, so adding a handler costs nothing on the wire.

    ```ts
    import { Data, topicMatches, type DataMessage } from "@clutchcall/sdk/data";

    const data = new Data({
      token: process.env.CLUTCHCALL_CREDENTIALS!,
      clientId: `consumer-${process.pid}`,
    });

    const routes: [string, (m: DataMessage) => void][] = [
      ["events/orders/shipped", (m) => fulfil(m)],
      ["events/orders/+",       (m) => audit(m)],   // catch-all for orders
      ["events/users/signup",   (m) => welcome(m)],
    ];

    await data.subscribe({ topicFilter: "events/#" }, (msg) => {
      let matched = false;
      for (const [pattern, fn] of routes) {
        if (topicMatches(msg.topic, pattern)) { fn(msg); matched = true; }
      }
      if (!matched) console.log("unhandled:", msg.topic, "from", msg.fromClientId);
    });

    function fulfil(m: DataMessage)  { /* … */ }
    function audit(m: DataMessage)   { /* … */ }
    function welcome(m: DataMessage) { /* … */ }
    ```

  3. **Scale out consumers**
Run as many consumer instances as you like. Each opens its own session and
    subscription; the relay fans every matching event to all of them. Give each
    a distinct `clientId` so producers and audit logs can attribute them.

> **TIP:**
> Reserve top-level segments as bounded domains — `events`, `sensors`, `devices`,
> `fleet`. Because the top-level segment maps to one underlying track, a small,
> stable set keeps fan-out coarse and predictable while `+` / `#` give you all the
> routing flexibility below it.

## Where to go next

  - **[Details](/modalities/data/details)** — Wire model, lanes/QoS, retained semantics, and the architecture.
  - **[SDK Methods](/modalities/data/sdk-methods)** — Every method, parameter, type, and event on the `Data` client.
