Longer, realistic builds that combine several robotics methods. Each is a small but complete app you can lift into your own project.

Recipe 1 — On-robot ROS 2 bridge

Run this beside a robot’s ROS 2 graph. It forwards local telemetry topics onto the mesh as raw CDR, and applies cloud commands back onto the local graph. Local nodes never learn the relay exists — they publish and subscribe their topics as usual; this process is the off-box transport.
1

Connect for this robot

One Robotics client per robot. The robotId builds both namespaces.
2

Forward telemetry up

Subscribe local /odom and /battery_state; push each message’s raw CDR to a robot/<id> telemetry track. Reliable for odometry, best-effort for battery.
3

Apply commands down

Subscribe the robot/<id>/ctl command track; re-publish cmd_vel onto the local DDS. Guard on the wire type name so schema drift is dropped.
import os, signal, threading
from rclpy.node import Node
import rclpy
from rclpy.serialization import serialize_message, deserialize_message
from nav_msgs.msg import Odometry
from sensor_msgs.msg import BatteryState
from geometry_msgs.msg import Twist

from clutchcall.robotics import Robotics, QoSProfile

def main() -> int:
    token    = os.environ["CLUTCHCALL_CREDENTIALS"]
    robot_id = os.environ.get("ROBOT_ID", "spot-12")

    rclpy.init()
    node = Node("clutchcall_bridge")
    r = Robotics(relay_host="relay.clutchcall.dev", robot_id=robot_id, token=token)

    # ── telemetry: ROS → mesh ────────────────────────────────────────────
    odom_pub = r.publish_telemetry(
        topic="odom", type_name="nav_msgs/msg/Odometry",
        qos=QoSProfile(reliability="reliable", depth=10))
    batt_pub = r.publish_telemetry(
        topic="battery_state", type_name="sensor_msgs/msg/BatteryState",
        qos=QoSProfile(reliability="best_effort", depth=1))

    node.create_subscription(Odometry,     "/odom",
        lambda m: odom_pub.write(serialize_message(m)), 10)
    node.create_subscription(BatteryState, "/battery_state",
        lambda m: batt_pub.write(serialize_message(m)), 1)

    # ── commands: mesh → ROS ─────────────────────────────────────────────
    cmd_vel = node.create_publisher(Twist, "/cmd_vel", 1)
    def on_cmd(cdr: bytes, type_name: str) -> None:
        if type_name != "geometry_msgs/msg/Twist":
            return  # schema guard; the relay logs the drop
        cmd_vel.publish(deserialize_message(cdr, Twist))
    r.subscribe_command(topic="cmd_vel", on_message=on_cmd)

    print(f"bridge up: robot_id={robot_id}")

    stop = threading.Event()
    threading.Thread(
        target=lambda: [rclpy.spin_once(node, timeout_sec=0.1)
                        for _ in iter(lambda: not stop.is_set(), False)],
        daemon=True).start()
    signal.signal(signal.SIGTERM, lambda *_: stop.set())
    try:
        stop.wait()
    finally:
        odom_pub.close(); batt_pub.close(); r.close()
        node.destroy_node(); rclpy.shutdown()
    return 0

if __name__ == "__main__":
    raise SystemExit(main())
If the robot speaks Zenoh natively, drop the rclpy parts — connect over Zenoh-over-QUIC and the MoQT side is identical, since both land on the same namespaces and the same type-name-prefixed frame.

Recipe 2 — Cloud teleop console

A browser console that drives one robot and renders its live pose. Telemetry comes up best-effort (latency wins); the drive command goes down reliable so a stop is never dropped. The same QUIC session carries both directions.
import { Robotics } from "@clutchcall/sdk/robotics";

const robotId = "spot-12";
const r = new Robotics({
  relayHost: "relay.clutchcall.dev",
  robotId,
  token: await fetchRelayToken(robotId),   // your control-plane API mints it
});

// ── render live pose (best-effort, freshest sample) ─────────────────────
await r.subscribeTelemetry({ topic: "odom" }, (cdr, typeName) => {
  if (typeName !== "nav_msgs/msg/Odometry") return;
  const { x, y, yaw } = decodeOdom(cdr);
  drawRobot(x, y, yaw);
});

// ── show battery (latched, so a fresh tab sees it instantly) ────────────
await r.subscribeTelemetry({ topic: "battery_state" }, (cdr, typeName) => {
  if (typeName === "sensor_msgs/msg/BatteryState") setBattery(decodeBattery(cdr));
});

// ── drive: reliable lane, with an urgent priority for E-stop ────────────
const cmd = await r.publishCommand({
  topic:    "cmd_vel",
  typeName: "geometry_msgs/msg/Twist",
  qos:      { reliability: "reliable" },
});

joystick.on("move", ({ linear, angular }) => cmd.write(encodeTwist(linear, angular)));
estopButton.on("press", () => cmd.write(encodeTwist(0, 0), /* priority */ 0));

window.addEventListener("beforeunload", () => { cmd.write(encodeTwist(0, 0), 0); r.close(); });
Always send a zero-velocity command on teardown, and give it priority 0 so it jumps any queued frames. A reliable command lane guarantees delivery, but the robot only stops when it receives the stop — make it the last thing you send.

Recipe 3 — Fleet telemetry collector

A headless cloud worker that subscribes to battery and odometry across a whole fleet and writes them to your time-series store. One Robotics client per robot; every client multiplexes its tracks over its own QUIC session, fanned out by the relay mesh.
import { Robotics } from "@clutchcall/sdk/robotics";

interface Sink { write(robot: string, topic: string, typeName: string, cdr: Uint8Array): void }

async function collectFleet(robotIds: string[], sink: Sink): Promise<() => void> {
  const clients = robotIds.map(robotId => {
    const r = new Robotics({
      relayHost: "relay.clutchcall.dev",
      robotId,
      token: process.env.CLUTCHCALL_CREDENTIALS!,
      onError: (e) => console.warn(`[${robotId}] relay error:`, e.message),
    });
    return { robotId, r };
  });

  await Promise.all(clients.flatMap(({ robotId, r }) =>
    ["odom", "battery_state"].map(topic =>
      r.subscribeTelemetry({ topic }, (cdr, typeName) =>
        sink.write(robotId, topic, typeName, cdr)))));

  console.log(`collecting ${robotIds.length} robots × 2 topics`);
  return () => clients.forEach(({ r }) => r.close()); // shutdown handle
}

// usage
const stop = await collectFleet(["spot-12", "spot-13", "spot-14"], myTsdbSink);
process.on("SIGTERM", stop);
Because each robot carries its own auth token scoped to (tenant, robotId), you can revoke one robot’s access without touching the rest of the fleet — the collector just stops receiving that robot’s frames on the next relay re-auth.

Mixed-transport fan-in

The collector above doesn’t care how each robot reached the mesh. A robot on the ROS 2 (CDR) path, one on Zenoh-over-QUIC, and a constrained sensor on MQTT 3.1.1 all publish the same type-name-prefixed frame onto robot/<id>, so a single subscribeTelemetry handler ingests all three without special-casing the source.