Skip to content

Commit d702ebd

Browse files
committed
Add frame_order parameter to SyncParallelPipeline
Adds a FrameOrder enum with ARRIVAL (default, existing behavior) and PIPELINE (pushes frames in pipeline definition order). This lets callers guarantee output ordering between parallel pipelines — e.g. ensuring image frames precede audio frames — without needing a separate reordering processor downstream. Updates the 05-sync-speech-and-image example to use FrameOrder.PIPELINE, removing the ImageBeforeAudioReorderer class entirely.
1 parent 26fc238 commit d702ebd

File tree

4 files changed

+215
-89
lines changed

4 files changed

+215
-89
lines changed

changelog/4029.added.2.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
- Added `frame_order` parameter to `SyncParallelPipeline`. Set `frame_order=FrameOrder.PIPELINE` to push synchronized output frames in pipeline definition order (all frames from the first pipeline, then the second, etc.) instead of the default arrival order.

examples/foundational/05-sync-speech-and-image.py

Lines changed: 9 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,16 @@
1212
from loguru import logger
1313

1414
from pipecat.frames.frames import (
15-
AggregatedTextFrame,
1615
DataFrame,
1716
Frame,
1817
LLMContextFrame,
1918
LLMFullResponseStartFrame,
2019
OutputImageRawFrame,
2120
TextFrame,
22-
TTSAudioRawFrame,
23-
TTSStartedFrame,
24-
TTSStoppedFrame,
25-
TTSTextFrame,
2621
)
2722
from pipecat.pipeline.pipeline import Pipeline
2823
from pipecat.pipeline.runner import PipelineRunner
29-
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
24+
from pipecat.pipeline.sync_parallel_pipeline import FrameOrder, SyncParallelPipeline
3025
from pipecat.pipeline.task import PipelineTask
3126
from pipecat.processors.aggregators.llm_context import LLMContext
3227
from pipecat.processors.aggregators.sentence import SentenceAggregator
@@ -63,61 +58,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
6358
await self.push_frame(frame, direction)
6459

6560

66-
class ImageBeforeAudioReorderer(FrameProcessor):
67-
"""Ensures each image frame precedes its corresponding TTS audio frames.
68-
69-
SyncParallelPipeline guarantees that each image is in the same synchronized
70-
batch as its audio, but doesn't guarantee which branch's output comes first.
71-
This processor detects when TTS frames arrive before their image and holds
72-
them until the image arrives.
73-
74-
All frames pass through immediately unless we detect an ordering problem:
75-
TTS frames arrived without a preceding image for the current batch (identified
76-
by context_id). In that case, the TTS frames are held until the next image
77-
frame, which is pushed first.
78-
"""
79-
80-
def __init__(self):
81-
super().__init__()
82-
self._held_tts_frames = []
83-
self._seen_image = False
84-
self._current_context_id = None
85-
86-
async def process_frame(self, frame: Frame, direction: FrameDirection):
87-
await super().process_frame(frame, direction)
88-
89-
if isinstance(frame, OutputImageRawFrame):
90-
self._seen_image = True
91-
if self._held_tts_frames:
92-
# Image arrived after TTS frames — push image first, then release held frames.
93-
logger.debug("ImageBeforeAudioReorderer: reordered — moved image before audio")
94-
await self.push_frame(frame, direction)
95-
for f in self._held_tts_frames:
96-
await self.push_frame(f, direction)
97-
self._held_tts_frames = []
98-
else:
99-
logger.debug(
100-
"ImageBeforeAudioReorderer: no reorder needed — image was already first"
101-
)
102-
await self.push_frame(frame, direction)
103-
elif isinstance(
104-
frame,
105-
(AggregatedTextFrame, TTSStartedFrame, TTSAudioRawFrame, TTSStoppedFrame, TTSTextFrame),
106-
):
107-
# A new context_id means a new batch — reset image tracking.
108-
context_id = frame.context_id
109-
if context_id and context_id != self._current_context_id:
110-
self._current_context_id = context_id
111-
self._seen_image = False
112-
113-
if self._seen_image:
114-
await self.push_frame(frame, direction)
115-
else:
116-
self._held_tts_frames.append(frame)
117-
else:
118-
await self.push_frame(frame, direction)
119-
120-
12161
class MonthPrepender(FrameProcessor):
12262
def __init__(self):
12363
super().__init__()
@@ -197,22 +137,27 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
197137
# that, each pipeline runs concurrently and `SyncParallelPipeline` will
198138
# wait for the input frame to be processed.
199139
#
140+
# We use `FrameOrder.PIPELINE` so that each synchronized batch of output
141+
# frames is pushed in the order the pipelines are listed: image first,
142+
# then audio. This ensures the transport receives the image before the
143+
# audio frames it should accompany.
144+
#
200145
# Note that `SyncParallelPipeline` requires the last processor in each
201146
# of the pipelines to be synchronous. In this case, we use
202-
# `CartesiaHttpTTSService` and `FalImageGenService` which make HTTP
147+
# `FalImageGenService` and `CartesiaHttpTTSService` which make HTTP
203148
# requests and wait for the response.
204149
pipeline = Pipeline(
205150
[
206151
llm, # LLM
207152
sentence_aggregator, # Aggregates LLM output into full sentences
208153
SyncParallelPipeline( # Run pipelines in parallel aggregating the result
209-
[month_prepender, tts], # Create "Month: sentence" and output audio
210154
[
211155
imagegen, # Generate image
212156
MarkImageForPlaybackSync(), # Mark image as needing sync w/audio during playback
213157
],
158+
[month_prepender, tts], # Create "Month: sentence" and output audio
159+
frame_order=FrameOrder.PIPELINE,
214160
),
215-
ImageBeforeAudioReorderer(), # Ensure each image precedes its audio (important for playback)
216161
transport.output(), # Transport output
217162
]
218163
)

src/pipecat/pipeline/sync_parallel_pipeline.py

Lines changed: 88 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import asyncio
1515
from dataclasses import dataclass
16+
from enum import Enum
1617
from itertools import chain
1718
from typing import List
1819

@@ -24,6 +25,25 @@
2425
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
2526

2627

28+
class FrameOrder(Enum):
29+
"""Controls the order in which synchronized frames are pushed downstream.
30+
31+
When multiple parallel pipelines produce output for the same input frame,
32+
this setting determines the order in which those output frames are pushed.
33+
34+
Attributes:
35+
ARRIVAL: Frames are pushed in the order they arrive from any pipeline.
36+
This is the default and matches the behavior of prior versions.
37+
PIPELINE: Frames are pushed in pipeline definition order — all frames
38+
from the first pipeline are pushed, then all frames from the second
39+
pipeline, and so on. Useful when the relative ordering between
40+
pipelines matters (e.g. ensuring image frames precede audio frames).
41+
"""
42+
43+
ARRIVAL = "arrival"
44+
PIPELINE = "pipeline"
45+
46+
2747
@dataclass
2848
class SyncFrame(ControlFrame):
2949
"""Control frame used to synchronize parallel pipeline processing.
@@ -109,20 +129,30 @@ class SyncParallelPipeline(BasePipeline):
109129
110130
The pipeline uses SyncFrame control frames to coordinate between parallel paths
111131
and ensure all paths have completed processing before moving to the next frame.
132+
133+
By default, output frames are pushed in the order they arrive from any pipeline
134+
(``FrameOrder.ARRIVAL``). Set ``frame_order=FrameOrder.PIPELINE`` to push frames
135+
in pipeline definition order instead — all output from the first pipeline, then
136+
the second, and so on.
112137
"""
113138

114-
def __init__(self, *args):
139+
def __init__(self, *args, frame_order: FrameOrder = FrameOrder.ARRIVAL):
115140
"""Initialize the synchronous parallel pipeline.
116141
117142
Args:
118-
*args: Variable number of processor lists, each representing a parallel pipeline path.
119-
Each argument should be a list of FrameProcessor instances.
143+
*args: Variable number of processor lists, each representing a parallel
144+
pipeline path. Each argument should be a list of FrameProcessor instances.
145+
frame_order: Controls the order in which synchronized output frames are
146+
pushed. ``FrameOrder.ARRIVAL`` (default) pushes frames in the order they arrive.
147+
``FrameOrder.PIPELINE`` pushes all frames from the first pipeline
148+
before the second, and so on.
120149
121150
Raises:
122151
Exception: If no arguments are provided.
123152
TypeError: If any argument is not a list of processors.
124153
"""
125154
super().__init__()
155+
self._frame_order = frame_order
126156

127157
if len(args) == 0:
128158
raise Exception(f"SyncParallelPipeline needs at least one argument")
@@ -215,6 +245,11 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
215245
to maintain proper ordering and prevent duplicate processing. Uses SyncFrame
216246
control frames to coordinate between parallel paths.
217247
248+
When ``frame_order`` is ``FrameOrder.ARRIVAL``, output frames are pushed in
249+
the order they arrive from any pipeline (via a shared queue). When it is
250+
``FrameOrder.PIPELINE``, each pipeline collects its output into a separate
251+
list and the lists are drained in pipeline definition order.
252+
218253
Args:
219254
frame: The frame to process.
220255
direction: The direction of frame flow.
@@ -235,60 +270,88 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
235270
await self.push_frame(frame, direction)
236271
return
237272

273+
use_pipeline_order = self._frame_order == FrameOrder.PIPELINE
274+
238275
# The last processor of each pipeline needs to be synchronous otherwise
239-
# this element won't work. Since, we know it should be synchronous we
276+
# this element won't work. Since we know it should be synchronous we
240277
# push a SyncFrame. Since frames are ordered we know this frame will be
241278
# pushed after the synchronous processor has pushed its data allowing us
242279
# to synchronize all the internal pipelines by waiting for the
243280
# SyncFrame in all of them.
281+
#
282+
# In ARRIVAL mode, output frames are put onto a shared main_queue as
283+
# they arrive. In PIPELINE mode, they are accumulated in a per-pipeline
284+
# list and returned so the caller can drain them in definition order.
244285
async def wait_for_sync(
245286
obj, main_queue: asyncio.Queue, frame: Frame, direction: FrameDirection
246-
):
287+
) -> list[Frame]:
247288
processor = obj["processor"]
248289
queue = obj["queue"]
290+
output_frames: list[Frame] = []
249291

250292
await processor.process_frame(frame, direction)
251293

252294
if isinstance(frame, EndFrame):
253295
new_frame = await queue.get()
254296
if isinstance(new_frame, EndFrame):
255-
await main_queue.put(new_frame)
297+
if use_pipeline_order:
298+
output_frames.append(new_frame)
299+
else:
300+
await main_queue.put(new_frame)
256301
else:
257302
while not isinstance(new_frame, EndFrame):
258-
await main_queue.put(new_frame)
303+
if use_pipeline_order:
304+
output_frames.append(new_frame)
305+
else:
306+
await main_queue.put(new_frame)
259307
queue.task_done()
260308
new_frame = await queue.get()
261309
else:
262310
await processor.process_frame(SyncFrame(), direction)
263311
new_frame = await queue.get()
264312
while not isinstance(new_frame, SyncFrame):
265-
await main_queue.put(new_frame)
313+
if use_pipeline_order:
314+
output_frames.append(new_frame)
315+
else:
316+
await main_queue.put(new_frame)
266317
queue.task_done()
267318
new_frame = await queue.get()
268319

320+
return output_frames
321+
269322
if direction == FrameDirection.UPSTREAM:
270323
# If we get an upstream frame we process it in each sink.
271-
await asyncio.gather(
324+
frames_per_pipeline = await asyncio.gather(
272325
*[wait_for_sync(s, self._up_queue, frame, direction) for s in self._sinks]
273326
)
274327
elif direction == FrameDirection.DOWNSTREAM:
275328
# If we get a downstream frame we process it in each source.
276-
await asyncio.gather(
329+
frames_per_pipeline = await asyncio.gather(
277330
*[wait_for_sync(s, self._down_queue, frame, direction) for s in self._sources]
278331
)
279332

280-
seen_ids = set()
281-
while not self._up_queue.empty():
282-
frame = await self._up_queue.get()
283-
if frame.id not in seen_ids:
284-
await self.push_frame(frame, FrameDirection.UPSTREAM)
285-
seen_ids.add(frame.id)
286-
self._up_queue.task_done()
287-
288-
seen_ids = set()
289-
while not self._down_queue.empty():
290-
frame = await self._down_queue.get()
291-
if frame.id not in seen_ids:
292-
await self.push_frame(frame, FrameDirection.DOWNSTREAM)
293-
seen_ids.add(frame.id)
294-
self._down_queue.task_done()
333+
if use_pipeline_order:
334+
# Push frames in pipeline definition order, deduplicating by id.
335+
seen_ids = set()
336+
for pipeline_frames in frames_per_pipeline:
337+
for f in pipeline_frames:
338+
if f.id not in seen_ids:
339+
await self.push_frame(f, direction)
340+
seen_ids.add(f.id)
341+
else:
342+
# ARRIVAL mode: drain the shared queues in the order frames arrived.
343+
seen_ids = set()
344+
while not self._up_queue.empty():
345+
frame = await self._up_queue.get()
346+
if frame.id not in seen_ids:
347+
await self.push_frame(frame, FrameDirection.UPSTREAM)
348+
seen_ids.add(frame.id)
349+
self._up_queue.task_done()
350+
351+
seen_ids = set()
352+
while not self._down_queue.empty():
353+
frame = await self._down_queue.get()
354+
if frame.id not in seen_ids:
355+
await self.push_frame(frame, FrameDirection.DOWNSTREAM)
356+
seen_ids.add(frame.id)
357+
self._down_queue.task_done()

0 commit comments

Comments
 (0)