Skip to content

Commit 06f7da4

Browse files
committed
Clarify SyncParallelPipeline docstrings
Rewrite docstrings to more clearly explain what SyncParallelPipeline does: hold all output until every parallel branch finishes, so frames produced in response to a single input are released together.
1 parent d702ebd commit 06f7da4

File tree

1 file changed

+36
-29
lines changed

1 file changed

+36
-29
lines changed

src/pipecat/pipeline/sync_parallel_pipeline.py

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,16 @@
44
# SPDX-License-Identifier: BSD 2-Clause License
55
#
66

7-
"""Synchronous parallel pipeline implementation for concurrent frame processing.
7+
"""Synchronized parallel pipeline that holds output until all branches finish.
88
9-
This module provides a pipeline that processes frames through multiple parallel
10-
pipelines simultaneously, synchronizing their output to maintain frame ordering
11-
and prevent duplicate processing.
9+
A SyncParallelPipeline fans each inbound frame out to multiple parallel pipelines
10+
and waits for every pipeline to finish processing before releasing any of the
11+
resulting output frames. This ensures that all frames produced in response to a
12+
single input frame are emitted together.
13+
14+
System frames (except EndFrame) are exempt from this synchronization — they pass
15+
straight through without waiting, since they are expected to race ahead of
16+
regular data frames.
1217
"""
1318

1419
import asyncio
@@ -46,20 +51,21 @@ class FrameOrder(Enum):
4651

4752
@dataclass
4853
class SyncFrame(ControlFrame):
49-
"""Control frame used to synchronize parallel pipeline processing.
54+
"""Sentinel frame used to detect when a parallel pipeline has finished processing.
5055
51-
This frame is sent through parallel pipelines to determine when the
52-
internal pipelines have finished processing a batch of frames.
56+
After sending a real frame into a parallel pipeline, a SyncFrame is sent
57+
behind it. When the SyncFrame emerges from the pipeline's output, we know
58+
all output frames for the preceding input have been produced.
5359
"""
5460

5561
pass
5662

5763

5864
class SyncParallelPipelineSource(FrameProcessor):
59-
"""Source processor for synchronous parallel pipeline processing.
65+
"""Bookend processor placed at the start of each parallel pipeline.
6066
61-
Routes frames to parallel pipelines and collects upstream responses
62-
for synchronization purposes.
67+
Forwards downstream frames into the pipeline and captures upstream frames
68+
into a queue so the parent SyncParallelPipeline can release them later.
6369
"""
6470

6571
def __init__(self, upstream_queue: asyncio.Queue):
@@ -88,10 +94,11 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
8894

8995

9096
class SyncParallelPipelineSink(FrameProcessor):
91-
"""Sink processor for synchronous parallel pipeline processing.
97+
"""Bookend processor placed at the end of each parallel pipeline.
9298
93-
Collects downstream frames from parallel pipelines and routes
94-
upstream frames back through the pipeline.
99+
Captures downstream output frames into a queue so the parent
100+
SyncParallelPipeline can release them later, and forwards upstream
101+
frames back through the pipeline.
95102
"""
96103

97104
def __init__(self, downstream_queue: asyncio.Queue):
@@ -120,15 +127,20 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
120127

121128

122129
class SyncParallelPipeline(BasePipeline):
123-
"""Pipeline that processes frames through multiple parallel pipelines synchronously.
130+
"""Fans each input frame to parallel pipelines then holds output until every pipeline finishes.
131+
132+
For each inbound frame the pipeline:
124133
125-
Creates multiple parallel processing paths that all receive the same input frames
126-
and produces synchronized output. Each parallel path is a separate pipeline that
127-
processes frames independently, with synchronization points to ensure consistent
128-
ordering and prevent duplicate frame processing.
134+
1. Sends the frame into every parallel pipeline.
135+
2. Sends a ``SyncFrame`` sentinel behind it in each pipeline.
136+
3. Waits until every pipeline has produced its ``SyncFrame``, meaning all
137+
output for that input is ready.
138+
4. Releases the collected output frames (deduplicating by frame id, since
139+
the same frame may emerge from more than one branch).
129140
130-
The pipeline uses SyncFrame control frames to coordinate between parallel paths
131-
and ensure all paths have completed processing before moving to the next frame.
141+
System frames (except ``EndFrame``) bypass this mechanism entirely — they are
142+
forwarded through each pipeline and pushed immediately, since system frames
143+
are expected to race ahead of regular data frames.
132144
133145
By default, output frames are pushed in the order they arrive from any pipeline
134146
(``FrameOrder.ARRIVAL``). Set ``frame_order=FrameOrder.PIPELINE`` to push frames
@@ -239,16 +251,11 @@ async def cleanup(self):
239251
await asyncio.gather(*[p.cleanup() for p in self._pipelines])
240252

241253
async def process_frame(self, frame: Frame, direction: FrameDirection):
242-
"""Process frames through all parallel pipelines with synchronization.
243-
244-
Distributes frames to all parallel pipelines and synchronizes their output
245-
to maintain proper ordering and prevent duplicate processing. Uses SyncFrame
246-
control frames to coordinate between parallel paths.
254+
"""Send a frame through all parallel pipelines and release output once all finish.
247255
248-
When ``frame_order`` is ``FrameOrder.ARRIVAL``, output frames are pushed in
249-
the order they arrive from any pipeline (via a shared queue). When it is
250-
``FrameOrder.PIPELINE``, each pipeline collects its output into a separate
251-
list and the lists are drained in pipeline definition order.
256+
System frames (except EndFrame) skip synchronization and pass straight
257+
through. All other frames are fanned out to every pipeline, and output is
258+
held until every pipeline signals completion (via SyncFrame).
252259
253260
Args:
254261
frame: The frame to process.

0 commit comments

Comments
 (0)