# Python Reference

> Every modality, every method, every option.

The Python SDK ships **five modality sub-modules** plus a raw MoQT client.
Each sub-module is its own client class. Mix them freely in one process —
they share the connection underneath.

```python
from clutchcall.voice    import Voice
from clutchcall.streams  import Streams, BroadcastPublisher, BroadcastViewer
from clutchcall.robotics import Robotics
from clutchcall.games    import Games
from clutchcall.data     import Data
from clutchcall.moqt     import MoqtClient
```

## Voice

```python
v = Voice(base_url="https://app.clutchcall.dev",
          api_key=os.environ["CLUTCHCALL_API_KEY"],
          org_id="org_abc")
```

### Calls

```python
call = v.calls.originate(to="+15551234567",
                         from_="+15558675309",
                         trunk_id="trunk_main",
                         agent="healthcare-assistant")
# → Call

v.calls.get(sid)                                # → Call
v.calls.list(org_id=None, cursor=None, limit=None)  # → (calls, cursor?)
v.calls.transfer(sid, to=..., trunk_id=None)    # → Call
v.calls.hangup(sid)                             # → None
v.calls.terminate(sid)                          # → None
```

`Call` exposes:

```python
call.sid:    str
call.status: str       # "dialing" | "ringing" | "in_progress" | "completed" | "failed" | "no_answer"
call.from_:  str
call.to:     str
call.refresh()         # → CallData
call.hangup()
call.transfer(to=..., trunk_id=None)  # → Call
call.on_status(cb)     # → Unsubscribe
```

### AudioBridge

```python
bridge = v.audio_bridge.attach(call.sid, codec="opus",
                               on_uplink=lambda frame, ts_us: my_asr.feed(frame))

bridge.publish_uplink(frame: bytes)
bridge.publish_downlink(frame: bytes)
bridge.on_uplink(cb)
bridge.on_downlink(cb)
bridge.close()
```

### Agents

```python
v.agents.attach(call_sid, agent_name)
v.agents.detach(call_sid)
v.agents.list(org_id=None)
```

## Streams

```python
streams = Streams(base_url="https://app.clutchcall.dev",
                  api_key=KEY, org_id="org_abc")
```

### Control plane

```python
streams.live_inputs.create(name="My Show", codecs=None, recording_profile=None)
                                       # → (LiveInput, stream_key)   # secret returned ONCE
streams.live_inputs.get(id=...)        # → LiveInput
streams.live_inputs.list(org_id=..., cursor=None, limit=None)
streams.live_inputs.rotate_stream_key(id=...)   # → {"stream_key": ...}
streams.live_inputs.delete(id=...)

streams.signing_keys.create(org_id=..., label=...)
streams.signing_keys.list(org_id=...)
streams.signing_keys.retire(id=...)

streams.api_keys.create(org_id=..., label=..., scopes=[...])
streams.api_keys.list(org_id=...)
streams.api_keys.revoke(id=...)

streams.webhooks.create(org_id=..., url=..., events=[...])
streams.webhooks.list(org_id=...)
streams.webhooks.delete(id=...)
streams.events.list_deliveries(webhook_id=..., cursor=None, limit=None)

streams.analytics.viewer_minutes(org_id=..., from_=..., to=..., group_by=None)
streams.analytics.pop_cache(org_id=..., from_=..., to=...)
```

`LiveInput`:

```python
input_.external_input_id: str
input_.signed_playback_url(ttl_seconds=3600, scopes=None)  # → {"url", "expires_at"}
```

### Data plane

```python
pub = BroadcastPublisher.open(input_id=..., stream_key=..., codecs={...})
pub.write(chunk: bytes)
pub.close(reason=None)

viewer = BroadcastViewer.open(signed_url,
                              on_chunk=lambda init, chunk: ...,
                              on_close=lambda reason: ...)
viewer.close()
```

## Robotics

```python
r = Robotics(relay_host="relay.clutchcall.dev",
             token=os.environ["CLUTCHCALL_RELAY_TOKEN"],
             robot_id="turtlebot-7")
```

```python
pub = r.publish_telemetry(topic="odom",
                          type_name="nav_msgs/msg/Odometry",
                          qos=QoSProfile(reliability="reliable", depth=10))
pub.write(cdr_bytes)
pub.close()

pub = r.publish_command(topic="cmd_vel",
                        type_name="geometry_msgs/msg/Twist")
pub.write(twist_bytes)

sub = r.subscribe_telemetry(topic="odom",
                            on_frame=lambda payload, type_name: ...)
sub = r.subscribe_command(topic="cmd_vel",
                          on_frame=lambda payload, type_name: ...)
sub.close()
```

`QoSProfile`:

```python
QoSProfile(reliability="reliable" | "best_effort",
           durability="volatile"  | "transient_local",
           depth=int)
```

## Games

```python
g = Games(relay_host="relay.clutchcall.dev",
          token=...,
          room_id="duel-42",
          player_id="alice")   # omit for authority
```

### Player

```python
sub = g.subscribe_state(on_state=lambda bytes_: render(bytes_))
input_ = g.publish_input()
input_.write(serialize_input(local_input))

events = g.publish_event()
events.send({"kind": "chat", "text": "gg"})
g.subscribe_events(on_event=lambda evt, from_player_id: dispatch(evt))
```

### Authority (no `player_id`)

```python
state = g.publish_state(tick_hz=30)
state.write(serialize_state(world))

g.subscribe_inputs(on_input=lambda player_id, payload: apply_input(player_id, payload))
g.subscribe_events(on_event=lambda evt, from_player_id: ...)
```

## Data

```python
d = Data(relay_host="relay.clutchcall.dev",
         token=os.environ["CLUTCHCALL_DATA_TOKEN"],
         client_id="device-7")
```

```python
d.publish(topic="sensors/room1/temperature",
          payload=b"23.5",
          reliable=False,
          retain=False)

sub = d.subscribe(topic_filter="sensors/+/temperature",
                  on_message=lambda msg: print(msg.topic, msg.payload, msg.from_client_id))
sub.close()
```

`DataMessage`:

```python
msg.topic:          str
msg.payload:        bytes
msg.from_client_id: str
msg.retain:         bool
msg.ts_us:          int
```

## Realtime Tracks

See [Realtime Tracks](/concepts/realtime-tracks). Common methods:

```python
moqt = MoqtClient.connect("quic://relay.clutchcall.dev", token, on_state=...)

moqt.publish_frame(namespace, name, capability=None, schema_tag=None)
moqt.subscribe_frame(namespace, name, on_frame=...)
moqt.publish_audio(namespace, name, capability=None, codec="opus")
moqt.subscribe_audio(namespace, name, on_frame=...)
moqt.publish_video(...) / moqt.subscribe_video(...)
moqt.publish_text(...)  / moqt.subscribe_text(...)

moqt.subscribe_namespace(prefix=[...], on_namespace=lambda suffix, active: ...)
moqt.connection_rtt_us()
moqt.max_datagram_size()
moqt.close()
```

## Legacy RPC

The root `ClutchCallClient` (`dial`, `originate_bulk`, `hangup`,
`barge`, `push_audio`, …) remains for backwards compat. New code should
prefer the [Voice modality](#voice).
