Longer, realistic builds that combine several robotics methods. Each is a small
but complete app you can lift into your own project.
Recipe 1 — On-robot ROS 2 bridge
Run this beside a robot’s ROS 2 graph. It forwards local telemetry topics onto
the mesh as raw CDR, and applies cloud commands back onto the local graph. Local
nodes never learn the relay exists — they publish and subscribe their topics as
usual; this process is the off-box transport.
Connect for this robot
One Robotics client per robot. The robotId builds both namespaces.
Forward telemetry up
Subscribe local /odom and /battery_state; push each message’s raw CDR to
a robot/<id> telemetry track. Reliable for odometry, best-effort for
battery.
Apply commands down
Subscribe the robot/<id>/ctl command track; re-publish cmd_vel onto the
local DDS. Guard on the wire type name so schema drift is dropped.
import os, signal, threading
from rclpy.node import Node
import rclpy
from rclpy.serialization import serialize_message, deserialize_message
from nav_msgs.msg import Odometry
from sensor_msgs.msg import BatteryState
from geometry_msgs.msg import Twist
from clutchcall.robotics import Robotics, QoSProfile
def main() -> int:
token = os.environ["CLUTCHCALL_CREDENTIALS"]
robot_id = os.environ.get("ROBOT_ID", "spot-12")
rclpy.init()
node = Node("clutchcall_bridge")
r = Robotics(relay_host="relay.clutchcall.dev", robot_id=robot_id, token=token)
# ── telemetry: ROS → mesh ────────────────────────────────────────────
odom_pub = r.publish_telemetry(
topic="odom", type_name="nav_msgs/msg/Odometry",
qos=QoSProfile(reliability="reliable", depth=10))
batt_pub = r.publish_telemetry(
topic="battery_state", type_name="sensor_msgs/msg/BatteryState",
qos=QoSProfile(reliability="best_effort", depth=1))
node.create_subscription(Odometry, "/odom",
lambda m: odom_pub.write(serialize_message(m)), 10)
node.create_subscription(BatteryState, "/battery_state",
lambda m: batt_pub.write(serialize_message(m)), 1)
# ── commands: mesh → ROS ─────────────────────────────────────────────
cmd_vel = node.create_publisher(Twist, "/cmd_vel", 1)
def on_cmd(cdr: bytes, type_name: str) -> None:
if type_name != "geometry_msgs/msg/Twist":
return # schema guard; the relay logs the drop
cmd_vel.publish(deserialize_message(cdr, Twist))
r.subscribe_command(topic="cmd_vel", on_message=on_cmd)
print(f"bridge up: robot_id={robot_id}")
stop = threading.Event()
threading.Thread(
target=lambda: [rclpy.spin_once(node, timeout_sec=0.1)
for _ in iter(lambda: not stop.is_set(), False)],
daemon=True).start()
signal.signal(signal.SIGTERM, lambda *_: stop.set())
try:
stop.wait()
finally:
odom_pub.close(); batt_pub.close(); r.close()
node.destroy_node(); rclpy.shutdown()
return 0
if __name__ == "__main__":
raise SystemExit(main())
If the robot speaks Zenoh natively, drop the rclpy parts — connect over
Zenoh-over-QUIC and the MoQT side is identical, since both land on the same
namespaces and the same type-name-prefixed frame.
Recipe 2 — Cloud teleop console
A browser console that drives one robot and renders its live pose. Telemetry
comes up best-effort (latency wins); the drive command goes down reliable so a
stop is never dropped. The same QUIC session carries both directions.
import { Robotics } from "@clutchcall/sdk/robotics";
const robotId = "spot-12";
const r = new Robotics({
relayHost: "relay.clutchcall.dev",
robotId,
token: await fetchRelayToken(robotId), // your control-plane API mints it
});
// ── render live pose (best-effort, freshest sample) ─────────────────────
await r.subscribeTelemetry({ topic: "odom" }, (cdr, typeName) => {
if (typeName !== "nav_msgs/msg/Odometry") return;
const { x, y, yaw } = decodeOdom(cdr);
drawRobot(x, y, yaw);
});
// ── show battery (latched, so a fresh tab sees it instantly) ────────────
await r.subscribeTelemetry({ topic: "battery_state" }, (cdr, typeName) => {
if (typeName === "sensor_msgs/msg/BatteryState") setBattery(decodeBattery(cdr));
});
// ── drive: reliable lane, with an urgent priority for E-stop ────────────
const cmd = await r.publishCommand({
topic: "cmd_vel",
typeName: "geometry_msgs/msg/Twist",
qos: { reliability: "reliable" },
});
joystick.on("move", ({ linear, angular }) => cmd.write(encodeTwist(linear, angular)));
estopButton.on("press", () => cmd.write(encodeTwist(0, 0), /* priority */ 0));
window.addEventListener("beforeunload", () => { cmd.write(encodeTwist(0, 0), 0); r.close(); });
Always send a zero-velocity command on teardown, and give it priority 0 so it
jumps any queued frames. A reliable command lane guarantees delivery, but the
robot only stops when it receives the stop — make it the last thing you send.
Recipe 3 — Fleet telemetry collector
A headless cloud worker that subscribes to battery and odometry across a whole
fleet and writes them to your time-series store. One Robotics client per
robot; every client multiplexes its tracks over its own QUIC session, fanned out
by the relay mesh.
import { Robotics } from "@clutchcall/sdk/robotics";
interface Sink { write(robot: string, topic: string, typeName: string, cdr: Uint8Array): void }
async function collectFleet(robotIds: string[], sink: Sink): Promise<() => void> {
const clients = robotIds.map(robotId => {
const r = new Robotics({
relayHost: "relay.clutchcall.dev",
robotId,
token: process.env.CLUTCHCALL_CREDENTIALS!,
onError: (e) => console.warn(`[${robotId}] relay error:`, e.message),
});
return { robotId, r };
});
await Promise.all(clients.flatMap(({ robotId, r }) =>
["odom", "battery_state"].map(topic =>
r.subscribeTelemetry({ topic }, (cdr, typeName) =>
sink.write(robotId, topic, typeName, cdr)))));
console.log(`collecting ${robotIds.length} robots × 2 topics`);
return () => clients.forEach(({ r }) => r.close()); // shutdown handle
}
// usage
const stop = await collectFleet(["spot-12", "spot-13", "spot-14"], myTsdbSink);
process.on("SIGTERM", stop);
Because each robot carries its own auth token scoped to (tenant, robotId), you
can revoke one robot’s access without touching the rest of the fleet — the
collector just stops receiving that robot’s frames on the next relay re-auth.
Mixed-transport fan-in
The collector above doesn’t care how each robot reached the mesh. A robot on the
ROS 2 (CDR) path, one on Zenoh-over-QUIC, and a constrained sensor on
MQTT 3.1.1 all publish the same type-name-prefixed frame onto robot/<id>,
so a single subscribeTelemetry handler ingests all three without special-casing
the source.