Skip to content

Commit ffba9cd

Browse files
committed
feat: media worker supports video
1 parent f43f205 commit ffba9cd

File tree

3 files changed

+88
-20
lines changed

3 files changed

+88
-20
lines changed

src/worker/media/config.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ operators:
2323
- name: "Video Vector Representation"
2424
type: "vid_vec_rep_resnet"
2525
parameters: { index_name: "video" }
26-
- name: "Audio Vector Representation"
27-
type: "audio_vec_embedding"
28-
parameters: { index_name: "audio" }
26+
# - name: "Audio Vector Representation"
27+
# type: "audio_vec_embedding"
28+
# parameters: { index_name: "audio" }
2929

3030
postgresql:
3131
label: "PostgreSQL"
Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,24 @@
11
from core.feluda import ComponentType, Feluda
22
from core.logger import Logger
33
from time import sleep
4+
import uuid
45

56
log = Logger(__name__)
67

78
try:
89
feluda = Feluda("worker/media/config.yml")
910
feluda.setup()
10-
video_index_queue = feluda.config.queue.parameters.queues[0]["name"]
11-
feluda.start_component(ComponentType.STORE)
11+
media_index_queue = feluda.config.queue.parameters.queues[0]["name"]
1212
feluda.start_component(ComponentType.QUEUE)
1313

1414
for _ in range(1):
15+
unique_id = str(uuid.uuid4())
1516
dummy_payload = {
16-
"id": str(12345),
17+
"id": unique_id,
1718
"path": "https://raw.githubusercontent.com/tattle-made/feluda/main/src/core/operators/sample_data/sample-cat-video.mp4",
1819
"media_type": "video"
1920
}
20-
feluda.queue.message(video_index_queue, dummy_payload)
21+
feluda.queue.message(media_index_queue, dummy_payload)
22+
sleep(0.3)
2123
except Exception as e:
22-
print("Error Initializing Indexer", e)
24+
print("Error Sending Payload", e)

src/worker/media/media_worker.py

Lines changed: 78 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,16 @@ def make_report_indexed(data, status):
1919
report = {}
2020
report["indexer_id"] = 1
2121
report["post_id"] = data["id"]
22+
report["media_type"] = data["media_type"]
2223
report["status"] = status
2324
report["status_code"] = 200
2425
return json.dumps(report)
2526

26-
2727
def make_report_failed(data, status):
2828
report = {}
2929
report["indexer_id"] = 1
3030
report["post_id"] = data["id"]
31+
report["media_type"] = data["media_type"]
3132
report["status"] = status
3233
report["status_code"] = 400
3334
return json.dumps(report)
@@ -51,14 +52,68 @@ def generator_doc():
5152

5253
return generator_doc
5354

55+
def calc_video_vec_crc(video_vec_gen):
56+
count = 0
57+
combined_vec = [[]]
58+
for vector in video_vec_gen:
59+
if count == 0:
60+
# skip first vector - mean of keyframes
61+
count += 1
62+
else:
63+
combined_vec.append(vector["vid_vec"])
64+
# remove first list which is empty
65+
combined_vec = combined_vec[1:]
66+
combined_vec_arr = np.asarray(combined_vec)
67+
arr_crc = binascii.crc32(combined_vec_arr.tobytes(order='C'))
68+
return arr_crc
69+
5470
def indexer(feluda):
5571
def worker(ch, method, properties, body):
5672
print("MESSAGE RECEIVED")
5773
file_content = json.loads(body)
5874
media_type = file_content["media_type"]
59-
print(media_type)
75+
if media_type == "video":
76+
try:
77+
# download the video from url (supports s3)
78+
video_path = VideoFactory.make_from_url(file_content["path"])
79+
# extract video vectors
80+
video_vec = vid_vec_rep_resnet.run(video_path)
81+
# add crc to database
82+
if feluda.config.postgresql:
83+
video_vec_crc = calc_video_vec_crc(video_vec)
84+
pg_manager.store(
85+
"user_message_inbox_perceptually_similar",
86+
str(video_vec_crc),
87+
"video_vector_crc")
88+
log.info("CRC value added to PostgreSQL")
89+
# generate document to report
90+
doc = generate_document(video_path["path"], video_vec)
91+
media_type = MediaType.VIDEO
92+
# store in ES
93+
if feluda.config.store:
94+
result = feluda.store.store(media_type, doc)
95+
log.info(result)
96+
# send indexed report to report queue
97+
report = make_report_indexed(file_content, "indexed")
98+
feluda.queue.message(feluda.config.queue.parameters.queues[1]["name"], report)
99+
# send ack
100+
ch.basic_ack(delivery_tag=method.delivery_tag)
101+
except Exception as e:
102+
print("Error indexing media", e)
103+
# send failed report to report queue
104+
report = make_report_failed(file_content, "failed")
105+
feluda.queue.message(feluda.config.queue.parameters.queues[1]["name"], report)
106+
# requeue the media file
107+
ch.basic_nack(delivery_tag=method.delivery_tag)
108+
elif media_type == "audio":
109+
pass
110+
else:
111+
log.info("This media type is not supported currently")
112+
# TODO: send a customised report and then report it to the queue with a ack
113+
60114
return worker
61115

116+
62117
def handle_exception(feluda, queue_name, worker_func, retries, max_retries):
63118
retry_interval = 60
64119
if retries < max_retries:
@@ -75,27 +130,38 @@ def handle_exception(feluda, queue_name, worker_func, retries, max_retries):
75130
else:
76131
print("Failed to re-establish connection after maximum retries.")
77132

133+
78134
feluda = None
79135
pg_manager = None
80136
media_index_queue = None
81137
try:
138+
# Init Feluda and load config
82139
feluda = Feluda("worker/media/config.yml")
83140
feluda.setup()
84-
# pg_manager = PostgreSQLManager()
85-
# pg_manager.connect()
86-
# pg_manager.create_trigger_function()
87-
# pg_manager.create_table("user_message_inbox_perceptually_similar")
88-
# pg_manager.create_trigger("user_message_inbox_perceptually_similar")
141+
# check if postgresql exists in config
142+
if feluda.config.postgresql:
143+
pg_manager = PostgreSQLManager()
144+
pg_manager.connect()
145+
pg_manager.create_trigger_function()
146+
pg_manager.create_table("user_message_inbox_perceptually_similar")
147+
pg_manager.create_trigger("user_message_inbox_perceptually_similar")
148+
else:
149+
log.info("PostgreSQL is not defined in the config file")
89150
media_index_queue = feluda.config.queue.parameters.queues[0]["name"]
90-
print(media_index_queue)
91-
feluda.start_component(ComponentType.STORE)
151+
# start components and init operators
92152
feluda.start_component(ComponentType.QUEUE)
93-
vid_vec_rep_resnet.initialize(param=None)
94-
audio_vec_embedding.initialize(param=None)
153+
# check if store is present in config
154+
if feluda.config.store:
155+
feluda.start_component(ComponentType.STORE)
156+
else:
157+
log.info("Store (ES) is not defined in the config file")
158+
# start listening to the queue
95159
feluda.queue.listen(media_index_queue, indexer(feluda))
96160
except Exception as e:
97161
print("Error Initializing Indexer", e)
162+
# Try connecting to Queue again
98163
retries = 0
99164
max_retries = 10
100165
handle_exception(feluda, media_index_queue, indexer(feluda), retries, max_retries)
101-
pg_manager.close_connection()
166+
if feluda.config.postgresql:
167+
pg_manager.close_connection()

0 commit comments

Comments
 (0)