
Commit 6145844

ProKil and XuhuiZhou authored
feat: Example for Integrating Realtime API into Sotopia. (#234)
* add aact as a dependency
* minimal demo example of running custom model
* devcontainer setup and example
* remove default_bad_process_model to allow using custom model entirely
* improve the demo to show parallel execution
* CI: update tests trigger from pull request target to pull request
* fix mypy errors
* adding stubs to pyproject.toml
* poetry lock
* install all extras in the devcontainer start script
* add dev containers instruction
* migration to uv
* update mypy
* Update index.mdx
* update uv venv path in the devcontainer and contributor's guide
* simple examples of using aact for multi-agent async communication
* allowing agents' aact function to return None
* import Self for 3.10
* Create readme.md
* dockerfile
* record node log
* frequency -> interval
* docker compose (it works)
* use published images to speed up
* add ci test with docker
* use compose action github action
* update docker compose file
* update compose file path
* use github-action-docker-compose-test-run
* remove unused port binding in docker-compose
* add quotes to docker compose command
* test run
* test run
* write test script in tests.sh
* use docker compose
* test run
* --rm
* ./ -> .
* test
* change to arm64
* fix docker platform problem
* change test os
* fix some build bugs
* fix runner dir
* fix a test case for sample
* update cli test to test_install
* update test benchmark to improve coverage
* remove unused and maintain structured output compatibility
* fix evaluator bug
* add a test script which contributors can run locally
* bump the version to 0.1.1
* add langchain openai back
* add langchain openai in uv lock
* remove redundant cast
* add test case
* test base agent
* more coverage for agent.py
* add __init__ to sotopia.experimental
* chore: Add experimental page and agents documentation
* first example of realtime api
* user input text and realtime replys audio
* finally got audio in audio out right
* agent-to-agent async conv simulation example
* fix mypy and add more instructions in readme
* fix contribution
* add install pyaudio to mypy workflow
* accidentally changed test script
* fix private msg issues
* robust structure gen
* refactor generate
* robust gen for structure decode
* pass mypy
* mock tests for llm call
* mock tests

---------

Co-authored-by: XuhuiZhou <[email protected]>
1 parent: 0def793

22 files changed: +3417 -1394 lines

.github/workflows/mypy.yml

Lines changed: 4 additions & 2 deletions
```diff
@@ -29,14 +29,16 @@ jobs:
         uses: actions/setup-python@v5
         with:
           python-version: ${{ matrix.python-version }}
+
+      - name: Install Pyaudio
+        run: sudo apt-get install -y portaudio19-dev
       - name: Display Python version
         run: python -c "import sys; print(sys.version)"
       - name: Install dependencies
         run: |
           python -m pip install --upgrade pip
           python -m pip install uv
-          uv sync --extra test --extra api
       - name: Type-checking package with mypy
         run: |
           # Run this mypy instance against our main package.
-          uv run mypy --strict .
+          uv run --all-extras mypy --strict .
```
examples/experimental/realtime/audio_mixer.py

Lines changed: 101 additions & 0 deletions
```python
from typing import AsyncIterator, Literal

from aact import Message, Node, NodeFactory
from aact.messages import Tick, Audio
import numpy as np


def merge_audio_streams(
    streams: list[bytes], sample_width: Literal[1, 2, 4] = 2
) -> bytes:
    # Convert byte streams to numpy arrays of audio samples
    format_str = {1: "B", 2: "h", 4: "i"}[sample_width]
    stream_samples = [
        np.frombuffer(stream, dtype=np.dtype(format_str)) for stream in streams
    ]

    # Make sure all streams are the same length
    stream_length = 0
    for stream in stream_samples:
        assert stream_length == 0 or len(stream) == stream_length
        if not stream_length:
            stream_length = len(stream)

    # Mix audio by adding the samples and avoiding clipping
    mixed_samples = np.zeros(stream_length, dtype=np.int32)
    for stream in stream_samples:
        mixed_samples += stream
    mixed_samples //= len(stream_samples)

    # Clip the values to ensure they remain within valid range for the bit depth
    mixed_samples = np.clip(
        mixed_samples, np.iinfo(format_str).min, np.iinfo(format_str).max
    )

    # Convert back to byte stream
    return mixed_samples.astype(np.dtype(format_str)).tobytes()


@NodeFactory.register("audio_mixer")
class AudioMixerNode(Node[Tick | Audio, Audio]):
    def __init__(
        self,
        input_channels: list[str],
        tick_input_channel: str,
        output_channel: str,
        redis_url: str,
        buffer_size: int = 1024,
    ):
        super().__init__(
            input_channel_types=[(channel, Audio) for channel in input_channels]
            + [(tick_input_channel, Tick)],
            output_channel_types=[(output_channel, Audio)],
            redis_url=redis_url,
        )
        self.input_channels = input_channels
        self.tick_input_channel = tick_input_channel
        self.output_channel = output_channel
        self.buffers: dict[str, bytes] = {channel: b"" for channel in input_channels}
        self.overflow_buffers: dict[str, bytes] = {
            channel: b"" for channel in input_channels
        }
        self.buffer_size = buffer_size

    async def event_handler(
        self, channel: str, message: Message[Tick | Audio]
    ) -> AsyncIterator[tuple[str, Message[Audio]]]:
        if channel == self.tick_input_channel:
            # On each tick, emit one buffer_size chunk per input channel
            # (zero-padded if short), then promote data from the overflow buffer.
            output_buffers = []
            for audio_channel in self.input_channels:
                output_buffers.append(
                    self.buffers[audio_channel]
                    + b"\x00" * (self.buffer_size - len(self.buffers[audio_channel]))
                )
                self.buffers[audio_channel] = self.overflow_buffers[audio_channel][
                    : self.buffer_size
                ]
                self.overflow_buffers[audio_channel] = self.overflow_buffers[
                    audio_channel
                ][self.buffer_size :]
            output_buffer = merge_audio_streams(output_buffers)
            yield self.output_channel, Message[Audio](data=Audio(audio=output_buffer))

        elif channel in self.input_channels:
            # Buffer incoming audio; anything beyond buffer_size spills into
            # the overflow buffer for this channel.
            assert isinstance(message.data, Audio)
            if len(self.buffers[channel]) == self.buffer_size:
                self.overflow_buffers[channel] += message.data.audio
            else:
                self.buffers[channel] += message.data.audio
                if len(self.buffers[channel]) >= self.buffer_size:
                    self.overflow_buffers[channel] = self.buffers[channel][
                        self.buffer_size :
                    ]
                    self.buffers[channel] = self.buffers[channel][: self.buffer_size]
        else:
            raise ValueError(f"Unexpected channel: {channel}")
            yield (
                self.output_channel,
                Message(data=Audio(audio=b"")),
            )  # Unreachable code
```
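For reference, a minimal sketch (not part of the commit) of how `merge_audio_streams` mixes two equal-length 16-bit PCM buffers; it assumes the module is importable as `examples.experimental.realtime.audio_mixer`, matching the `extra_modules` entry in the dataflow configs below.

```python
import numpy as np

# Assumed import path, taken from the extra_modules entries in the TOML configs below.
from examples.experimental.realtime.audio_mixer import merge_audio_streams

# Two 4-sample, 16-bit PCM buffers of equal byte length.
left = np.array([1000, -2000, 3000, 0], dtype=np.int16).tobytes()
right = np.array([500, 500, -500, -500], dtype=np.int16).tobytes()

mixed = merge_audio_streams([left, right], sample_width=2)
# Each output sample is the average of the input samples, clipped to the int16 range.
print(np.frombuffer(mixed, dtype=np.int16))  # [ 750 -750 1250 -250]
```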
examples/experimental/realtime/input_node.py

Lines changed: 35 additions & 0 deletions
```python
import sys
from typing import AsyncIterator

from sotopia.agents.llm_agent import ainput

if sys.version_info < (3, 11):
    pass
else:
    pass
from aact import Message, Node, NodeFactory
from aact.messages import Text, Zero


@NodeFactory.register("input")
class InputNode(Node[Zero, Text]):
    def __init__(self, output_channel: str, redis_url: str) -> None:
        super().__init__(
            input_channel_types=[],
            output_channel_types=[(output_channel, Text)],
            redis_url=redis_url,
        )
        self.output_channel = output_channel

    async def event_loop(self) -> None:
        while True:
            text = await ainput("Enter text: ")
            await self.r.publish(
                self.output_channel,
                Message[Text](data=Text(text=text)).model_dump_json(),
            )

    async def event_handler(
        self, _: str, __: Message[Zero]
    ) -> AsyncIterator[tuple[str, Message[Text]]]:
        yield self.output_channel, Message[Text](data=Text(text=""))
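```

As a hypothetical illustration (not part of this commit), a downstream node consuming the `Text` messages that `InputNode` publishes could follow the same aact pattern shown above; the `text_printer` registration name and the channel arguments here are made up.

```python
from typing import AsyncIterator

from aact import Message, Node, NodeFactory
from aact.messages import Text


@NodeFactory.register("text_printer")  # hypothetical node, mirroring the pattern above
class TextPrinterNode(Node[Text, Text]):
    def __init__(self, input_channel: str, output_channel: str, redis_url: str) -> None:
        super().__init__(
            input_channel_types=[(input_channel, Text)],
            output_channel_types=[(output_channel, Text)],
            redis_url=redis_url,
        )
        self.output_channel = output_channel

    async def event_handler(
        self, channel: str, message: Message[Text]
    ) -> AsyncIterator[tuple[str, Message[Text]]]:
        # Print the incoming text, then forward it unchanged.
        print(f"[{channel}] {message.data.text}")
        yield self.output_channel, Message[Text](data=Text(text=message.data.text))
```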
examples/experimental/realtime/readme.md

Lines changed: 17 additions & 0 deletions
## Demo Realtime API

You would need `portaudio` to run this demo:

```bash
# On Mac
brew install portaudio

# On Linux
apt-get install portaudio19-dev
```

Execute this command in the repo folder to run the example:

```bash
uv run --extra realtime aact run-dataflow examples/experimental/realtime/realtime_chat.toml
```
examples/experimental/realtime/realtime_chat.toml

Lines changed: 74 additions & 0 deletions
```toml
redis_url = "redis://localhost:6379/0"
extra_modules = [
    "examples.experimental.realtime.realtime_websocket",
    "examples.experimental.realtime.input_node",
    "examples.experimental.realtime.audio_mixer"
]

[[nodes]]
node_name = "speaker"
node_class = "speaker"

[nodes.node_args]
rate = 24000
input_channel = "Eve"

[[nodes]]
node_name = "speaker2"
node_class = "speaker"

[nodes.node_args]
rate = 24000
input_channel = "Jane"

# [[nodes]]
# node_name = "listener"
# node_class = "listener"

# [nodes.node_args]
# rate = 24000
# output_channel = "audio_input"


[[nodes]]
node_name = "Eve"
node_class = "openai_realtime"

[nodes.node_args]
input_channel = "Jane_mixed"
output_channel = "Eve"
instruction = "Your name is Eve, you are talking to your friend Jane. You want to convince her to play poker with you tonight. Please start every sentence with \"Jane,\""


[[nodes]]
node_name = "Jane"
node_class = "openai_realtime"

[nodes.node_args]
input_channel = "Eve_mixed"
output_channel = "Jane"
instruction = "Your name is Jane, you are talking to your friend Eve. You want to convince her to play soccer with you tonight. Please let him say the first sentence. Please start every sentence with \"Eve,\""

[[nodes]]
node_name = "audio_mixer_Jane"
node_class = "audio_mixer"

[nodes.node_args]
input_channels = ["Jane"]
tick_input_channel = "tick/millis/20"
output_channel = "Jane_mixed"
buffer_size = 960

[[nodes]]
node_name = "audio_mixer_Eve"
node_class = "audio_mixer"

[nodes.node_args]
input_channels = ["Eve"]
tick_input_channel = "tick/millis/20"
output_channel = "Eve_mixed"
buffer_size = 960

[[nodes]]
node_name = "tick"
node_class = "tick"
```
Lines changed: 74 additions & 0 deletions

This second dataflow config is a variant of realtime_chat.toml with the listener node enabled: it presumably captures microphone audio and publishes it on the "Jack" channel, which both audio mixers fold into the agents' inputs so a human can join the conversation.
```toml
redis_url = "redis://localhost:6379/0"
extra_modules = [
    "examples.experimental.realtime.realtime_websocket",
    "examples.experimental.realtime.input_node",
    "examples.experimental.realtime.audio_mixer"
]

[[nodes]]
node_name = "speaker"
node_class = "speaker"

[nodes.node_args]
rate = 24000
input_channel = "Eve"

[[nodes]]
node_name = "speaker2"
node_class = "speaker"

[nodes.node_args]
rate = 24000
input_channel = "Jane"

[[nodes]]
node_name = "listener"
node_class = "listener"

[nodes.node_args]
rate = 24000
output_channel = "Jack"


[[nodes]]
node_name = "Eve"
node_class = "openai_realtime"

[nodes.node_args]
input_channel = "Jane_mixed"
output_channel = "Eve"
instruction = "Your name is Eve, you are talking to your friend Jane and Jack. You want to convince them to play poker with you tonight. Please start every sentence with \"Jane,\" or \"Jack,\""


[[nodes]]
node_name = "Jane"
node_class = "openai_realtime"

[nodes.node_args]
input_channel = "Eve_mixed"
output_channel = "Jane"
instruction = "Your name is Jane, you are talking to your friend Eve and Jack. You want to convince them to play soccer with you tonight. Please let him say the first sentence. Please start every sentence with \"Eve,\" or \"Jack,\""

[[nodes]]
node_name = "audio_mixer_Jane"
node_class = "audio_mixer"

[nodes.node_args]
input_channels = ["Jane", "Jack"]
tick_input_channel = "tick/millis/20"
output_channel = "Jane_mixed"
buffer_size = 960

[[nodes]]
node_name = "audio_mixer_Eve"
node_class = "audio_mixer"

[nodes.node_args]
input_channels = ["Eve", "Jack"]
tick_input_channel = "tick/millis/20"
output_channel = "Eve_mixed"
buffer_size = 960

[[nodes]]
node_name = "tick"
node_class = "tick"
```
