from clutchcall.voice import Voice
from clutchcall.streams import Streams, BroadcastPublisher, BroadcastViewer
from clutchcall.robotics import Robotics
from clutchcall.games import Games
from clutchcall.data import Data
from clutchcall.moqt import MoqtClient
Voice
v = Voice(base_url="https://app.clutchcall.dev",
api_key=os.environ["CLUTCHCALL_API_KEY"],
org_id="org_abc")
Calls
call = v.calls.originate(to="+15551234567",
from_="+15558675309",
trunk_id="trunk_main",
agent="healthcare-assistant")
# → Call
v.calls.get(sid) # → Call
v.calls.list(org_id=None, cursor=None, limit=None) # → (calls, cursor?)
v.calls.transfer(sid, to=..., trunk_id=None) # → Call
v.calls.hangup(sid) # → None
v.calls.terminate(sid) # → None
Call exposes:
call.sid: str
call.status: str # "dialing" | "ringing" | "in_progress" | "completed" | "failed" | "no_answer"
call.from_: str
call.to: str
call.refresh() # → CallData
call.hangup()
call.transfer(to=..., trunk_id=None) # → Call
call.on_status(cb) # → Unsubscribe
AudioBridge
bridge = v.audio_bridge.attach(call.sid, codec="opus",
on_uplink=lambda frame, ts_us: my_asr.feed(frame))
bridge.publish_uplink(frame: bytes)
bridge.publish_downlink(frame: bytes)
bridge.on_uplink(cb)
bridge.on_downlink(cb)
bridge.close()
Agents
v.agents.attach(call_sid, agent_name)
v.agents.detach(call_sid)
v.agents.list(org_id=None)
Streams
streams = Streams(base_url="https://app.clutchcall.dev",
api_key=KEY, org_id="org_abc")
Control plane
streams.live_inputs.create(name="My Show", codecs=None, recording_profile=None)
# → (LiveInput, stream_key) # secret returned ONCE
streams.live_inputs.get(id=...) # → LiveInput
streams.live_inputs.list(org_id=..., cursor=None, limit=None)
streams.live_inputs.rotate_stream_key(id=...) # → {"stream_key": ...}
streams.live_inputs.delete(id=...)
streams.signing_keys.create(org_id=..., label=...)
streams.signing_keys.list(org_id=...)
streams.signing_keys.retire(id=...)
streams.api_keys.create(org_id=..., label=..., scopes=[...])
streams.api_keys.list(org_id=...)
streams.api_keys.revoke(id=...)
streams.webhooks.create(org_id=..., url=..., events=[...])
streams.webhooks.list(org_id=...)
streams.webhooks.delete(id=...)
streams.events.list_deliveries(webhook_id=..., cursor=None, limit=None)
streams.analytics.viewer_minutes(org_id=..., from_=..., to=..., group_by=None)
streams.analytics.pop_cache(org_id=..., from_=..., to=...)
LiveInput:
input_.external_input_id: str
input_.signed_playback_url(ttl_seconds=3600, scopes=None) # → {"url", "expires_at"}
Data plane
pub = BroadcastPublisher.open(input_id=..., stream_key=..., codecs={...})
pub.write(chunk: bytes)
pub.close(reason=None)
viewer = BroadcastViewer.open(signed_url,
on_chunk=lambda init, chunk: ...,
on_close=lambda reason: ...)
viewer.close()
Robotics
r = Robotics(relay_host="relay.clutchcall.dev",
token=os.environ["CLUTCHCALL_RELAY_TOKEN"],
robot_id="turtlebot-7")
pub = r.publish_telemetry(topic="odom",
type_name="nav_msgs/msg/Odometry",
qos=QoSProfile(reliability="reliable", depth=10))
pub.write(cdr_bytes)
pub.close()
pub = r.publish_command(topic="cmd_vel",
type_name="geometry_msgs/msg/Twist")
pub.write(twist_bytes)
sub = r.subscribe_telemetry(topic="odom",
on_frame=lambda payload, type_name: ...)
sub = r.subscribe_command(topic="cmd_vel",
on_frame=lambda payload, type_name: ...)
sub.close()
QoSProfile:
QoSProfile(reliability="reliable" | "best_effort",
durability="volatile" | "transient_local",
depth=int)
Games
g = Games(relay_host="relay.clutchcall.dev",
token=...,
room_id="duel-42",
player_id="alice") # omit for authority
Player
sub = g.subscribe_state(on_state=lambda bytes_: render(bytes_))
input_ = g.publish_input()
input_.write(serialize_input(local_input))
events = g.publish_event()
events.send({"kind": "chat", "text": "gg"})
g.subscribe_events(on_event=lambda evt, from_player_id: dispatch(evt))
Authority (no player_id)
state = g.publish_state(tick_hz=30)
state.write(serialize_state(world))
g.subscribe_inputs(on_input=lambda player_id, payload: apply_input(player_id, payload))
g.subscribe_events(on_event=lambda evt, from_player_id: ...)
Data
d = Data(relay_host="relay.clutchcall.dev",
token=os.environ["CLUTCHCALL_DATA_TOKEN"],
client_id="device-7")
d.publish(topic="sensors/room1/temperature",
payload=b"23.5",
reliable=False,
retain=False)
sub = d.subscribe(topic_filter="sensors/+/temperature",
on_message=lambda msg: print(msg.topic, msg.payload, msg.from_client_id))
sub.close()
DataMessage:
msg.topic: str
msg.payload: bytes
msg.from_client_id: str
msg.retain: bool
msg.ts_us: int
Realtime Tracks
See Realtime Tracks. Common methods:moqt = MoqtClient.connect("quic://relay.clutchcall.dev", token, on_state=...)
moqt.publish_frame(namespace, name, capability=None, schema_tag=None)
moqt.subscribe_frame(namespace, name, on_frame=...)
moqt.publish_audio(namespace, name, capability=None, codec="opus")
moqt.subscribe_audio(namespace, name, on_frame=...)
moqt.publish_video(...) / moqt.subscribe_video(...)
moqt.publish_text(...) / moqt.subscribe_text(...)
moqt.subscribe_namespace(prefix=[...], on_namespace=lambda suffix, active: ...)
moqt.connection_rtt_us()
moqt.max_datagram_size()
moqt.close()
Legacy RPC
The rootClutchCallClient (dial, originate_bulk, hangup,
barge, push_audio, …) remains for backwards compat. New code should
prefer the Voice modality.
