Skip to content

Commit 7129ac3

Browse files
committed
draft: implement initial clustering worker
1 parent 73731ec commit 7129ac3

File tree

2 files changed

+165
-0
lines changed

2 files changed

+165
-0
lines changed

src/worker/clustering_media/__init__.py

Whitespace-only changes.
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
from core.feluda import ComponentType, Feluda
2+
from core.logger import Logger
3+
from core.operators import audio_vec_embedding_clap
4+
from core.operators import cluster_embeddings
5+
from core.operators import dimension_reduction
6+
import json
7+
from core.models.media_factory import AudioFactory
8+
from time import sleep
9+
import numpy as np
10+
import binascii
11+
12+
log = Logger(__name__)
13+
14+
15+
def make_report_indexed(clustering_results_json, dim_reduction_results_json, status):
    """Serialise the success report for a completed clustering run.

    Args:
        clustering_results_json: Result of the clustering step; embedded in
            the report unchanged.
        dim_reduction_results_json: Result of the dimension-reduction step;
            embedded in the report unchanged.
        status: Status label to place in the report.

    Returns:
        A JSON string carrying the two results, the status, a fixed
        ``indexer_id`` of 1 and a ``status_code`` of 200.
    """
    # Keys are listed in the order they must appear in the serialised output.
    payload = {
        "indexer_id": 1,
        "clustering_results": clustering_results_json,
        "dim_reduction_results": dim_reduction_results_json,
        "status": status,
        "status_code": 200,
    }
    return json.dumps(payload)
25+
26+
def make_report_failed(media_type, status, file_id=None):
    """Serialise a failure report for a media file that could not be processed.

    Args:
        media_type: Media type of the file that failed.
        status: Status label to place in the report.
        file_id: Identifier of the failed file. ``None`` or an empty string
            is reported as ``"unknown"``.

    Returns:
        A JSON string with a fixed ``indexer_id`` of 1 and a ``status_code``
        of 400.
    """
    # Only a genuinely absent id becomes "unknown"; a plain truthiness test
    # would also discard a valid id of 0.
    id_missing = file_id is None or file_id == ""
    report = {
        "indexer_id": 1,
        "media_type": media_type,
        "file_id": "unknown" if id_missing else file_id,
        "status": status,
        "status_code": 400,
    }
    return json.dumps(report)
35+
36+
def make_report_failed_unsupported_media_type(media_type, status, file_id=None):
    """Serialise a failure report for a file whose media type is not handled.

    Args:
        media_type: The unsupported media type that was received.
        status: Status label to place in the report.
        file_id: Identifier of the rejected file. ``None`` or an empty string
            is reported as ``"unknown"``.

    Returns:
        A JSON string with null clustering and dimension-reduction results,
        a fixed ``indexer_id`` of 1 and a ``status_code`` of 415
        (Unsupported Media Type).
    """
    # Only a genuinely absent id becomes "unknown"; a plain truthiness test
    # would also discard a valid id of 0.
    id_missing = file_id is None or file_id == ""
    report = {
        "indexer_id": 1,
        "media_type": media_type,
        "file_id": "unknown" if id_missing else file_id,
        "clustering_results": None,
        "dim_reduction_results": None,
        "status": status,
        "status_code": 415,  # 415 Unsupported Media Type
    }
    return json.dumps(report)
47+
48+
49+
def calc_audio_vec_crc(audio_vector):
    """Return the CRC-32 checksum of an embedding's raw bytes.

    Args:
        audio_vector: Array-like embedding; it is converted with
            ``np.asarray`` before its bytes are read.

    Returns:
        The ``binascii.crc32`` value of the array's C-ordered byte dump.
        The result depends on the dtype that ``np.asarray`` yields, since
        the checksum is taken over the in-memory representation.
    """
    return binascii.crc32(np.asarray(audio_vector).tobytes(order="C"))
53+
54+
55+
def clustering_worker(feluda):
    """Build the queue callback that embeds, clusters and reports media files.

    Args:
        feluda: Feluda instance whose ``queue``, ``config`` and (when
            configured) ``store`` attributes the callback uses.

    Returns:
        A callback ``worker(ch, method, properties, body)`` to be passed to
        ``feluda.queue.listen``.
    """

    def worker(ch, method, properties, body):
        """Process one queue message and acknowledge it exactly once.

        ``body`` is parsed as a JSON list of objects with ``id``, ``path``
        and ``media_type`` keys. Each audio file is embedded; a per-file
        failure report is published for files that fail or whose media type
        is unsupported. The collected embeddings are then clustered and
        dimension-reduced, and one success report is published.
        """
        print("MESSAGE RECEIVED")
        # All reports are published to the second configured queue.
        report_queue = feluda.config.queue.parameters.queues[1]["name"]
        file_list = json.loads(body)
        audio_embeddings = []

        for file in file_list:
            file_id = file["id"]
            file_path = file["path"]
            media_type = file["media_type"]

            if media_type == "audio":
                log.info("Media Type is Audio")
                try:
                    # download the audio from url (supports s3)
                    audio_path = AudioFactory.make_from_url(file_path)
                    # extract audio vectors
                    audio_vec = audio_vec_embedding_clap.run(audio_path)
                    audio_embeddings.append(
                        {"payload": file_id, "embedding": audio_vec}
                    )

                    # add crc to database
                    if feluda.config.store and "postgresql" in feluda.store:
                        audio_vec_crc = calc_audio_vec_crc(audio_vec)
                        feluda.store["postgresql"].store(
                            str(audio_vec_crc), "audio_vector_crc"
                        )
                        log.info("Audio CRC value added to PostgreSQL")

                    # Add code to store in ES

                except Exception as e:
                    print("Error in generating embeddings", e)
                    # Report this file as failed and carry on with the rest.
                    # The message is NOT acknowledged here: one message has a
                    # single delivery tag, and it is acknowledged once below.
                    report = make_report_failed(media_type, "failed", file_id)
                    feluda.queue.message(report_queue, report)

            elif media_type == "video":
                # Video files are accepted but not processed yet.
                pass

            else:
                log.info("This media type is not supported currently")
                report = make_report_failed_unsupported_media_type(
                    media_type, "failed", file_id
                )
                feluda.queue.message(report_queue, report)

        if audio_embeddings:
            clustering_results_json = cluster_embeddings.run(
                input_data=audio_embeddings, n_clusters=2, modality='audio'
            )
            dim_reduction_results_json = dimension_reduction.perform_reduction(
                audio_embeddings
            )
            report = make_report_indexed(
                clustering_results_json, dim_reduction_results_json, "indexed"
            )
            feluda.queue.message(report_queue, report)
        else:
            # Nothing to cluster; per-file failure reports (if any) have
            # already been published above.
            log.info("No audio embeddings were generated; skipping clustering")

        # Single acknowledgement for the whole message. Acknowledging the
        # same delivery tag more than once is a channel-level protocol error.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    return worker
116+
117+
118+
def handle_exception(
    feluda, queue_name, worker_func, retries, max_retries, retry_interval=60
):
    """Try to reconnect to the queue and resume listening, with bounded retries.

    Args:
        feluda: Feluda instance whose queue component is restarted.
        queue_name: Name of the queue to listen on.
        worker_func: Callback passed to ``feluda.queue.listen``.
        retries: Number of attempts already made.
        max_retries: Attempt count at which the function gives up.
        retry_interval: Seconds to wait between failed attempts
            (defaults to 60).

    Returns:
        None. Returns as soon as ``listen`` returns without raising, or
        after printing a failure message once ``max_retries`` is reached.
    """
    while retries < max_retries:
        print("Inside Handle Exception")
        try:
            feluda.start_component(ComponentType.QUEUE)
            feluda.queue.listen(queue_name, worker_func)
            return
        except Exception as e:
            print("Error handling exception:", e)
            retries += 1
            # Waiting is only useful when another attempt will follow.
            if retries < max_retries:
                sleep(retry_interval)
    print("Failed to re-establish connection after maximum retries.")
133+
134+
135+
# Worker bootstrap: load the config, start the queue (and store, when
# configured), initialise the operators, then block listening on the queue.
feluda = None
clustering_media_index_queue = None
try:
    # Init Feluda and load config
    feluda = Feluda("worker/clustering_media/config.yml")
    feluda.setup()
    # The first configured queue is the one this worker consumes from.
    clustering_media_index_queue = feluda.config.queue.parameters.queues[0]["name"]
    # setup Components
    feluda.start_component(ComponentType.QUEUE)
    if feluda.config.store:
        feluda.start_component(ComponentType.STORE)

    # init all operators
    audio_vec_embedding_clap.initialize(param={})
    cluster_embeddings.initialize(param={})
    dimension_reduction.setup_reduction(model_type='tsne', params={})

    # start listening to the queue
    feluda.queue.listen(clustering_media_index_queue, clustering_worker(feluda))
except Exception as e:
    print("Error Initializing Indexer", e)
    if feluda is None or clustering_media_index_queue is None:
        # The config never loaded, so there is no queue to reconnect to.
        # Fail immediately instead of sleeping through retries that cannot
        # succeed.
        raise
    # Try connecting to Queue again
    retries = 0
    max_retries = 10
    handle_exception(
        feluda,
        clustering_media_index_queue,
        clustering_worker(feluda),
        retries,
        max_retries,
    )

0 commit comments

Comments
 (0)