Skip to content

Commit ba01287

Browse files
committed
chore: add error handling to clustering and reduction operations
1 parent 60315b5 commit ba01287

File tree

1 file changed

+40
-20
lines changed

1 file changed

+40
-20
lines changed

src/worker/clustering_media/clustering_media_worker.py

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -111,38 +111,58 @@ def worker(ch, method, properties, body):
111111
# send failed report to report queue
112112
report = make_report_failed(media_type, "failed", file_id)
113113
feluda.queue.message(
114-
feluda.config.queue.parameters.queues[1]["name"], report
114+
feluda.config.queue.parameters.queues[1]["name"], report
115115
)
116116
# requeue the media file
117117
ch.basic_ack(delivery_tag=method.delivery_tag)
118118
else:
119119
pass
120120

121121
log.info("Clustering embeddings")
122-
clustering_results_audio = cluster_embeddings.run(input_data=audio_embeddings, n_clusters=audio_config.get("n_clusters"), modality='audio')
123-
if "labels" in video_config:
124-
clustering_results_video = video_classifications
125-
else:
126-
clustering_results_video = cluster_embeddings.run(input_data=video_embeddings, n_clusters=video_config.get("n_clusters"), modality='video')
127-
clustering_results_json = {
128-
"audio": clustering_results_audio,
129-
"video": clustering_results_video
130-
}
122+
try:
123+
clustering_results_audio = cluster_embeddings.run(input_data=audio_embeddings, n_clusters=audio_config.get("n_clusters"), modality='audio')
124+
if "labels" in video_config:
125+
clustering_results_video = video_classifications
126+
else:
127+
clustering_results_video = cluster_embeddings.run(input_data=video_embeddings, n_clusters=video_config.get("n_clusters"), modality='video')
128+
clustering_results_json = {
129+
"audio": clustering_results_audio,
130+
"video": clustering_results_video
131+
}
132+
except Exception as e:
133+
print("Error in clustering:", e)
134+
report = make_report_failed("clustering", "failed")
135+
feluda.queue.message(
136+
feluda.config.queue.parameters.queues[1]["name"], report
137+
)
138+
ch.basic_ack(delivery_tag=method.delivery_tag)
139+
131140
log.info("Calculating t-SNE co-ordinates")
132-
if audio_config.get("tsne"):
133-
dim_reduction_results_audio = dimension_reduction.run(audio_embeddings)
134-
if video_config.get("tsne"):
135-
dim_reduction_results_video = dimension_reduction.run(video_embeddings)
136-
137-
dim_reduction_results_json = {
138-
"audio": dim_reduction_results_audio,
139-
"video": dim_reduction_results_video
140-
}
141+
try:
142+
dim_reduction_results_audio = None
143+
dim_reduction_results_video = None
144+
if audio_config.get("tsne"):
145+
dim_reduction_results_audio = dimension_reduction.run(audio_embeddings)
146+
if video_config.get("tsne"):
147+
dim_reduction_results_video = dimension_reduction.run(video_embeddings)
148+
149+
dim_reduction_results_json = {
150+
"audio": dim_reduction_results_audio,
151+
"video": dim_reduction_results_video
152+
}
153+
except Exception as e:
154+
print("Error in dimension reduction:", e)
155+
report = make_report_failed("dimension reduction", "failed")
156+
feluda.queue.message(
157+
feluda.config.queue.parameters.queues[1]["name"], report
158+
)
159+
ch.basic_ack(delivery_tag=method.delivery_tag)
160+
141161
report = make_report_indexed(clustering_results_json, dim_reduction_results_json, "indexed")
142162
log.info("Report generated")
143163
feluda.queue.message(
144164
feluda.config.queue.parameters.queues[1]["name"], report
145-
)
165+
)
146166
ch.basic_ack(delivery_tag=method.delivery_tag)
147167

148168
return worker

0 commit comments

Comments
 (0)