19 changes: 15 additions & 4 deletions examples/video_understanding/README.md
@@ -87,7 +87,16 @@ The container.yaml file is a configuration file that manages dependencies and se
 
 ## Running the Example
 
-3. Run the video understanding example, currently only supports CLI usage:
+1. Run the video understanding example via the webpage:
+
+```bash
+python run_webpage.py
+```
+
+First, select a video or upload a video file on the left; after the video preprocessing completes, ask questions about the video content on the right.
+
+2. Run the video understanding example via the CLI:
 
 ```bash
 python run_cli.py
@@ -96,7 +105,6 @@ The container.yaml file is a configuration file that manages dependencies and se
 The first time, you need to input the video file path; it takes a while to preprocess the video and store the information in the vector database.
 After the video is preprocessed, you can input questions about the video and the system will answer them. Note that the agent may give wrong or vague answers, especially for questions about the names of characters in the video.
 
-
 ## Troubleshooting
 
 If you encounter issues:
@@ -110,7 +118,10 @@ If you encounter issues:
 - **Open an issue on GitHub if you can't find a solution; we will do our best to help you out!**
 
 
-## Building the Example
+4. Run the video understanding example via the webpage:
 
-Coming soon! This section will provide detailed instructions for building and packaging the general_dnc example step by step.
+```bash
+python run_webpage.py
+```
 
+First, select a video or upload a video file on the left; after the video preprocessing completes, ask questions about the video content on the right.
519 changes: 519 additions & 0 deletions examples/video_understanding/agent/client/webpage.py

Large diffs are not rendered by default.

81 changes: 81 additions & 0 deletions examples/video_understanding/agent/conclude/webpage_conclude.py
@@ -0,0 +1,81 @@
from pathlib import Path
from typing import Iterator, List

from omagent_core.advanced_components.workflow.dnc.schemas.dnc_structure import \
TaskTree
from omagent_core.engine.worker.base import BaseWorker
from omagent_core.memories.ltms.ltm import LTM
from omagent_core.models.llms.base import BaseLLMBackend
from omagent_core.models.llms.prompt import PromptTemplate
from omagent_core.utils.logger import logging
from omagent_core.utils.registry import registry
from openai import Stream
from pydantic import Field

CURRENT_PATH = root_path = Path(__file__).parents[0]


@registry.register_worker()
class WebpageConclude(BaseLLMBackend, BaseWorker):
prompts: List[PromptTemplate] = Field(
default=[
PromptTemplate.from_file(
CURRENT_PATH.joinpath("sys_prompt.prompt"), role="system"
),
PromptTemplate.from_file(
CURRENT_PATH.joinpath("user_prompt.prompt"), role="user"
),
]
)

def _run(self, dnc_structure: dict, last_output: str, *args, **kwargs):
"""A conclude node that summarizes and completes the root task.

This component acts as the final node that:
- Takes the root task and its execution results
- Generates a final conclusion/summary of the entire task execution
- Formats and presents the final output in a clear way
- Cleans up any temporary state/memory used during execution

The conclude node is responsible for providing a coherent final response that
addresses the original root task objective based on all the work done by
previous nodes.

Args:
            dnc_structure (dict): The task tree containing the root task and results
last_output (str): The final output from previous task execution
*args: Additional arguments
**kwargs: Additional keyword arguments

Returns:
dict: Final response containing the conclusion/summary
"""
task = TaskTree(**dnc_structure)
self.callback.info(
agent_id=self.workflow_instance_id,
            progress="Conclude",
message=f"{task.get_current_node().task}",
)
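        # Stitch together the placeholder keys of all frames cached in
        # short-term memory so the final prompt can reference them.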
chat_complete_res = self.simple_infer(
task=task.get_root().task,
result=str(last_output),
img_placeholders="".join(
list(self.stm(self.workflow_instance_id).get("image_cache", {}).keys())
),
)
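        # Streaming backends return an iterator of chunks; accumulate the
        # deltas and push the growing answer to the webpage via the callback.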
if isinstance(chat_complete_res, Iterator):
last_output = "Answer: "
for chunk in chat_complete_res:
if len(chunk.choices) > 0:
                    current_msg = chunk.choices[0].delta.content or ""
last_output += current_msg
self.callback.send_answer(agent_id=self.workflow_instance_id, msg=last_output)
else:
last_output = chat_complete_res["choices"][0]["message"]["content"]
self.callback.send_answer(
agent_id=self.workflow_instance_id,
msg=f'Answer: {chat_complete_res["choices"][0]["message"]["content"]}',
)
self.callback.send_answer(agent_id=self.workflow_instance_id, msg=f"Token usage: {self.token_usage}")
self.stm(self.workflow_instance_id).clear()
return {"last_output": last_output}
252 changes: 252 additions & 0 deletions examples/video_understanding/agent/video_preprocessor/webpage_vp.py
@@ -0,0 +1,252 @@
import hashlib
import pickle
import time
from pathlib import Path
from typing import List, Optional, Union

import json_repair
from omagent_core.engine.worker.base import BaseWorker
from omagent_core.models.asr.stt import STT
from omagent_core.models.encoders.openai_encoder import OpenaiTextEmbeddingV3
from omagent_core.models.llms.base import BaseLLMBackend
from omagent_core.models.llms.prompt import PromptTemplate
from omagent_core.utils.registry import registry
from pydantic import Field, field_validator
from pydub import AudioSegment
from pydub.effects import normalize
from scenedetect import open_video

from ..misc.scene import VideoScenes

CURRENT_PATH = root_path = Path(__file__).parents[0]


@registry.register_worker()
class WebpageVideoPreprocessor(BaseLLMBackend, BaseWorker):
prompts: List[PromptTemplate] = Field(
default=[
PromptTemplate.from_file(
CURRENT_PATH.joinpath("sys_prompt.prompt"), role="system"
),
PromptTemplate.from_file(
CURRENT_PATH.joinpath("user_prompt.prompt"), role="user"
),
]
)
text_encoder: OpenaiTextEmbeddingV3

stt: STT
scene_detect_threshold: Union[float, int] = 27
min_scene_len: int = 1
frame_extraction_interval: int = 5
kernel_size: Optional[int] = None
show_progress: bool = True

use_cache: bool = False
cache_dir: str = "./video_cache"

@field_validator("stt", mode="before")
@classmethod
def validate_asr(cls, stt):
if isinstance(stt, STT):
return stt
elif isinstance(stt, dict):
return STT(**stt)
else:
raise ValueError("Invalid STT type.")

def calculate_md5(self, file_path):
md5_hash = hashlib.md5()
with open(file_path, "rb") as file:
for byte_block in iter(lambda: file.read(4096), b""):
md5_hash.update(byte_block)
return md5_hash.hexdigest()

def _run(self, video_path: str, *args, **kwargs):
"""
Process video files by:
1. Calculating MD5 hash of input video for caching
2. Loading video from cache if available and use_cache=True
3. Otherwise, processing video by:
- Extracting audio and video streams
- Detecting scene boundaries
- Extracting frames at specified intervals
- Generating scene summaries using LLM
- Caching results for future use

Args:
video_path (str): Path to input video file
*args: Variable length argument list
**kwargs: Arbitrary keyword arguments

Returns:
dict: Dictionary containing video_md5 and video_path
"""
video_md5 = self.calculate_md5(video_path)
kwargs["video_md5"] = video_md5

cache_path = (
Path(self.cache_dir)
.joinpath(video_path.replace("/", "-"))
.joinpath("video_cache.pkl")
)
# Load video from cache if available
if self.use_cache and cache_path.exists():
with open(cache_path, "rb") as f:
loaded_scene = pickle.load(f)
try:
audio = AudioSegment.from_file(video_path)
audio = normalize(audio)
except Exception:
audio = None
video = VideoScenes(
stream=open_video(video_path),
audio=audio,
scenes=loaded_scene,
frame_extraction_interval=self.frame_extraction_interval,
)
self.callback.send_block(
agent_id=self.workflow_instance_id,
msg="Loaded video scenes from cache.\nResume the interrupted transfer for results with scene.summary of None.",
)
for index, scene in enumerate(video.scenes):
if scene.summary is None:
self.callback.send_block(
agent_id=self.workflow_instance_id,
msg=f"Resume the interrupted transfer for scene {index}.",
)
video_frames, time_stamps = video.get_video_frames(scene)
try:
chat_complete_res = self.infer(
input_list=[
{
"stt_res": scene.conversation,
"img_placeholders": "".join(
[
f"<image_{i}>"
for i in range(len(video_frames))
]
),
}
],
images=video_frames,
)
                        raw = chat_complete_res[0]["choices"][0]["message"]["content"]
                        # The prompt requests JSON; parse it when it arrives as a string.
                        scene.summary = json_repair.loads(raw) if isinstance(raw, str) else raw
scene_info = scene.summary.get("scene", [])
events = scene.summary.get("events", [])
start_time = scene.start.get_seconds()
end_time = scene.end.get_seconds()
content = (
f"Time in video: {scene.summary.get('time', 'null')}\n"
f"Location: {scene.summary.get('location', 'null')}\n"
f"Character': {scene.summary.get('character', 'null')}\n"
f"Events: {events}\n"
f"Scene: {scene_info}\n"
f"Summary: {scene.summary.get('summary', '')}"
)
content_vector = self.text_encoder.infer([content])[0]
self.ltm[index] = {
"value": {
"video_md5": video_md5,
"content": content,
"start_time": start_time,
"end_time": end_time,
},
"embedding": content_vector,
}
except Exception as e:
self.callback.error(
f"Failed to resume scene {index}: {e}. Set to default."
)
scene.summary = {
"time": "",
"location": "",
"character": "",
"events": [],
"scene": [],
"summary": "",
}
self.stm(self.workflow_instance_id)["video"] = video.to_serializable()
# Cache the processed video scenes
with open(cache_path, "wb") as f:
pickle.dump(video.scenes, f)

# Process video if not loaded from cache
if not self.stm(self.workflow_instance_id).get("video", None):
video = VideoScenes.load(
video_path=video_path,
threshold=self.scene_detect_threshold,
min_scene_len=self.min_scene_len,
frame_extraction_interval=self.frame_extraction_interval,
show_progress=self.show_progress,
kernel_size=self.kernel_size,
)
self.stm(self.workflow_instance_id)["video"] = video.to_serializable()

for index, scene in enumerate(video.scenes):
print(f"Processing scene {index} / {len(video.scenes)}...")
audio_clip = video.get_audio_clip(scene)
if audio_clip is None:
scene.stt_res = {"text": ""}
else:
scene.stt_res = self.stt.infer(audio_clip)
video_frames, time_stamps = video.get_video_frames(scene)
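                # Optional enrichment: when a FaceRecognition tool is registered,
                # annotate recognized faces onto the frames; otherwise skip silently.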
try:
face_rec = registry.get_tool("FaceRecognition")
for frame in video_frames:
objs = face_rec.infer(frame)
face_rec.visual_prompting(frame, objs)
except Exception:
pass
try:
chat_complete_res = self.infer(
input_list=[
{
"stt_res": scene.conversation,
"img_placeholders": "".join(
[f"<image_{i}>" for i in range(len(video_frames))]
),
}
],
images=video_frames,
)
                    raw = chat_complete_res[0]["choices"][0]["message"]["content"]
                    # The prompt requests JSON; parse it when it arrives as a string.
                    scene.summary = json_repair.loads(raw) if isinstance(raw, str) else raw
scene_info = scene.summary.get("scene", [])
events = scene.summary.get("events", [])
start_time = scene.start.get_seconds()
end_time = scene.end.get_seconds()
content = (
f"Time in video: {scene.summary.get('time', 'null')}\n"
f"Location: {scene.summary.get('location', 'null')}\n"
f"Character': {scene.summary.get('character', 'null')}\n"
f"Events: {events}\n"
f"Scene: {scene_info}\n"
f"Summary: {scene.summary.get('summary', '')}"
)
content_vector = self.text_encoder.infer([content])[0]
self.ltm[index] = {
"value": {
"video_md5": video_md5,
"content": content,
"start_time": start_time,
"end_time": end_time,
},
"embedding": content_vector,
}
except Exception as e:
self.callback.error(f"Failed to process scene {index}: {e}")
scene.summary = None

if self.use_cache and not cache_path.exists():
cache_path.parent.mkdir(parents=True, exist_ok=True)
with open(cache_path, "wb") as f:
pickle.dump(video.scenes, f)
return {
"video_md5": video_md5,
"video_path": video_path,
"instance_id": self.workflow_instance_id,
}
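
The caching layout is worth making explicit: the pickled scene list is stored under a directory derived from the video path, while the MD5 of the raw file bytes keys the per-scene records written to long-term memory. Below is a minimal sketch of the cache lookup under those assumptions; the `load_cached_scenes` helper is hypothetical, not part of this PR:

```python
import pickle
from pathlib import Path

def load_cached_scenes(cache_dir: str, video_path: str):
    """Return the pickled scene list if this video was preprocessed before."""
    # Mirrors the worker: <cache_dir>/<video_path with "/" -> "-">/video_cache.pkl
    cache_path = Path(cache_dir) / video_path.replace("/", "-") / "video_cache.pkl"
    if not cache_path.exists():
        return None  # no cache yet; the worker will run full preprocessing
    with open(cache_path, "rb") as f:
        # Scenes whose summary is still None are retried on the next run.
        return pickle.load(f)
```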