Skip to content

Commit 85f002e

Browse files
fix(clients): allow chunked requests on WithTransformation methods (generated)
algolia/api-clients-automation#5011 Co-authored-by: algolia-bot <[email protected]> Co-authored-by: Clément Vannicatte <[email protected]>
1 parent a6bd3a4 commit 85f002e

File tree

2 files changed

+179
-42
lines changed

2 files changed

+179
-42
lines changed

algoliasearch/ingestion/client.py

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,20 @@
2222

2323
from algoliasearch.http.api_response import ApiResponse
2424
from algoliasearch.http.base_config import BaseConfig
25+
from algoliasearch.http.exceptions import RequestException
26+
from algoliasearch.http.helpers import (
27+
RetryTimeout,
28+
create_iterable,
29+
create_iterable_sync,
30+
)
2531
from algoliasearch.http.request_options import RequestOptions
2632
from algoliasearch.http.serializer import body_serializer
2733
from algoliasearch.http.transporter import Transporter
2834
from algoliasearch.http.transporter_sync import TransporterSync
2935
from algoliasearch.http.verb import Verb
3036
from algoliasearch.ingestion.config import IngestionConfig
3137
from algoliasearch.ingestion.models import (
38+
Action,
3239
ActionType,
3340
Authentication,
3441
AuthenticationCreate,
@@ -61,6 +68,7 @@
6168
OrderKeys,
6269
PlatformWithNone,
6370
PushTaskPayload,
71+
PushTaskRecords,
6472
Run,
6573
RunListResponse,
6674
RunResponse,
@@ -193,6 +201,75 @@ async def add_user_agent(self, segment: str, version: Optional[str] = None) -> N
193201
"""adds a segment to the default user agent, and update the headers sent with each requests as well"""
194202
self._transporter.config.add_user_agent(segment, version)
195203

204+
async def chunked_push(
    self,
    index_name: str,
    objects: List[Dict[str, Any]],
    action: Action = Action.ADDOBJECT,
    wait_for_tasks: bool = False,
    batch_size: int = 1000,
    reference_index_name: Optional[str] = None,
    request_options: Optional[Union[dict, RequestOptions]] = None,
) -> List[WatchResponse]:
    """
    Helper: Chunks the given `objects` list in subset of `batch_size` elements max (1000 by default) in order to make it fit in `push` requests by leveraging the Transformation pipeline setup in the Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/).

    :param index_name: name of the index records are pushed to.
    :param objects: list of records to push.
    :param action: batch action applied to every record, defaults to add object.
    :param wait_for_tasks: when True, polls `get_event` until each push event is available.
    :param batch_size: maximum number of records sent per `push` request.
    :param reference_index_name: optional source index used by the transformation pipeline.
    :param request_options: per-request options forwarded to every underlying call.
    :return: one WatchResponse per `push` request issued.
    :raises ValueError: if a push response has no event id while `wait_for_tasks` is True.
    """
    records: List[PushTaskRecords] = []
    responses: List[WatchResponse] = []
    for i, obj in enumerate(objects):
        records.append(obj)  # pyright: ignore
        if len(records) == batch_size or i == len(objects) - 1:
            responses.append(
                await self.push(
                    index_name=index_name,
                    push_task_payload={
                        "action": action,
                        "records": records,
                    },
                    reference_index_name=reference_index_name,
                    request_options=request_options,
                )
            )
            # Start a fresh batch: without this reset the batch-size check
            # never fires again and the final request re-sends every
            # previously pushed record.
            records = []
    if wait_for_tasks:
        for response in responses:

            async def _func(_: Optional[Event]) -> Event:
                if response.event_id is None:
                    raise ValueError(
                        "received unexpected response from the push endpoint, eventID must not be undefined"
                    )
                try:
                    return await self.get_event(
                        run_id=response.run_id,
                        event_id=response.event_id,
                        request_options=request_options,
                    )
                except RequestException as e:
                    # A 404 means the event is not indexed yet; returning
                    # None makes the retry loop poll again.
                    if e.status_code == 404:
                        return None  # pyright: ignore
                    raise e

            _retry_count = 0

            def _aggregator(_: Event | None) -> None:
                nonlocal _retry_count
                _retry_count += 1

            def _validate(_resp: Event | None) -> bool:
                return _resp is not None

            timeout = RetryTimeout()

            await create_iterable(
                func=_func,
                validate=_validate,
                aggregator=_aggregator,
                timeout=lambda: timeout(_retry_count),
                error_validate=lambda _: _retry_count >= 50,
                # Plain f-string interpolation: the generated `${...}`
                # template-literal syntax would print literal `$` signs.
                error_message=lambda _: f"The maximum number of retries exceeded. ({_retry_count}/50)",
            )
    return responses
272+
196273
async def create_authentication_with_http_info(
197274
self,
198275
authentication_create: Union[AuthenticationCreate, dict[str, Any]],
@@ -5204,6 +5281,75 @@ def add_user_agent(self, segment: str, version: Optional[str] = None) -> None:
52045281
"""adds a segment to the default user agent, and update the headers sent with each requests as well"""
52055282
self._transporter.config.add_user_agent(segment, version)
52065283

5284+
def chunked_push(
    self,
    index_name: str,
    objects: List[Dict[str, Any]],
    action: Action = Action.ADDOBJECT,
    wait_for_tasks: bool = False,
    batch_size: int = 1000,
    reference_index_name: Optional[str] = None,
    request_options: Optional[Union[dict, RequestOptions]] = None,
) -> List[WatchResponse]:
    """
    Helper: Chunks the given `objects` list in subset of `batch_size` elements max (1000 by default) in order to make it fit in `push` requests by leveraging the Transformation pipeline setup in the Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/).

    :param index_name: name of the index records are pushed to.
    :param objects: list of records to push.
    :param action: batch action applied to every record, defaults to add object.
    :param wait_for_tasks: when True, polls `get_event` until each push event is available.
    :param batch_size: maximum number of records sent per `push` request.
    :param reference_index_name: optional source index used by the transformation pipeline.
    :param request_options: per-request options forwarded to every underlying call.
    :return: one WatchResponse per `push` request issued.
    :raises ValueError: if a push response has no event id while `wait_for_tasks` is True.
    """
    records: List[PushTaskRecords] = []
    responses: List[WatchResponse] = []
    for i, obj in enumerate(objects):
        records.append(obj)  # pyright: ignore
        if len(records) == batch_size or i == len(objects) - 1:
            responses.append(
                self.push(
                    index_name=index_name,
                    push_task_payload={
                        "action": action,
                        "records": records,
                    },
                    reference_index_name=reference_index_name,
                    request_options=request_options,
                )
            )
            # Start a fresh batch: without this reset the batch-size check
            # never fires again and the final request re-sends every
            # previously pushed record.
            records = []
    if wait_for_tasks:
        for response in responses:

            def _func(_: Optional[Event]) -> Event:
                if response.event_id is None:
                    raise ValueError(
                        "received unexpected response from the push endpoint, eventID must not be undefined"
                    )
                try:
                    return self.get_event(
                        run_id=response.run_id,
                        event_id=response.event_id,
                        request_options=request_options,
                    )
                except RequestException as e:
                    # A 404 means the event is not indexed yet; returning
                    # None makes the retry loop poll again.
                    if e.status_code == 404:
                        return None  # pyright: ignore
                    raise e

            _retry_count = 0

            def _aggregator(_: Event | None) -> None:
                nonlocal _retry_count
                _retry_count += 1

            def _validate(_resp: Event | None) -> bool:
                return _resp is not None

            timeout = RetryTimeout()

            create_iterable_sync(
                func=_func,
                validate=_validate,
                aggregator=_aggregator,
                timeout=lambda: timeout(_retry_count),
                error_validate=lambda _: _retry_count >= 50,
                # Plain f-string interpolation: the generated `${...}`
                # template-literal syntax would print literal `$` signs.
                error_message=lambda _: f"The maximum number of retries exceeded. ({_retry_count}/50)",
            )
    return responses
5352+
52075353
def create_authentication_with_http_info(
52085354
self,
52095355
authentication_create: Union[AuthenticationCreate, dict[str, Any]],

algoliasearch/search/client.py

Lines changed: 33 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
from algoliasearch.http.verb import Verb
4242
from algoliasearch.ingestion.client import IngestionClient, IngestionClientSync
4343
from algoliasearch.ingestion.config import IngestionConfig
44+
from algoliasearch.ingestion.models import Action as IngestionAction
4445
from algoliasearch.ingestion.models import WatchResponse
4546
from algoliasearch.search.config import SearchConfig
4647
from algoliasearch.search.models import (
@@ -291,7 +292,7 @@ async def wait_for_api_key(
291292
"`apiKey` is required when waiting for an `update` operation."
292293
)
293294

294-
async def _func(_prev: Optional[GetApiKeyResponse]) -> GetApiKeyResponse:
295+
async def _func(_: Optional[GetApiKeyResponse]) -> GetApiKeyResponse:
295296
try:
296297
return await self.get_api_key(key=key, request_options=request_options)
297298
except RequestException as e:
@@ -431,9 +432,7 @@ async def browse_synonyms(
431432
page = search_synonyms_params.page or 0
432433
search_synonyms_params.hits_per_page = hits_per_page
433434

434-
async def _func(
435-
_prev: Optional[SearchSynonymsResponse],
436-
) -> SearchSynonymsResponse:
435+
async def _func(_: Optional[SearchSynonymsResponse]) -> SearchSynonymsResponse:
437436
nonlocal page
438437
resp = await self.search_synonyms(
439438
index_name=index_name,
@@ -534,22 +533,20 @@ async def save_objects_with_transformation(
534533
wait_for_tasks: bool = False,
535534
batch_size: int = 1000,
536535
request_options: Optional[Union[dict, RequestOptions]] = None,
537-
) -> WatchResponse:
536+
) -> List[WatchResponse]:
538537
"""
539538
Helper: Similar to the `save_objects` method but requires a Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/) to be created first, in order to transform records before indexing them to Algolia. The `region` must've been passed to the client's config at instantiation.
540539
"""
541540
if self._ingestion_transporter is None:
542541
raise ValueError(
543542
"`region` must be provided at client instantiation before calling this method."
544543
)
545-
546-
return await self._ingestion_transporter.push(
544+
return await self._ingestion_transporter.chunked_push(
547545
index_name=index_name,
548-
push_task_payload={
549-
"action": Action.ADDOBJECT,
550-
"records": objects,
551-
},
552-
watch=wait_for_tasks,
546+
objects=objects,
547+
action=IngestionAction.ADDOBJECT,
548+
wait_for_tasks=wait_for_tasks,
549+
batch_size=batch_size,
553550
request_options=request_options,
554551
)
555552

@@ -604,24 +601,22 @@ async def partial_update_objects_with_transformation(
604601
wait_for_tasks: bool = False,
605602
batch_size: int = 1000,
606603
request_options: Optional[Union[dict, RequestOptions]] = None,
607-
) -> WatchResponse:
604+
) -> List[WatchResponse]:
608605
"""
609606
Helper: Similar to the `partial_update_objects` method but requires a Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/) to be created first, in order to transform records before indexing them to Algolia. The `region` must've been passed to the client instantiation method.
610607
"""
611608
if self._ingestion_transporter is None:
612609
raise ValueError(
613610
"`region` must be provided at client instantiation before calling this method."
614611
)
615-
616-
return await self._ingestion_transporter.push(
612+
return await self._ingestion_transporter.chunked_push(
617613
index_name=index_name,
618-
push_task_payload={
619-
"action": Action.PARTIALUPDATEOBJECT
620-
if create_if_not_exists
621-
else Action.PARTIALUPDATEOBJECTNOCREATE,
622-
"records": objects,
623-
},
624-
watch=wait_for_tasks,
614+
objects=objects,
615+
action=IngestionAction.PARTIALUPDATEOBJECT
616+
if create_if_not_exists
617+
else IngestionAction.PARTIALUPDATEOBJECTNOCREATE,
618+
wait_for_tasks=wait_for_tasks,
619+
batch_size=batch_size,
625620
request_options=request_options,
626621
)
627622

@@ -5426,7 +5421,7 @@ def wait_for_api_key(
54265421
"`apiKey` is required when waiting for an `update` operation."
54275422
)
54285423

5429-
def _func(_prev: Optional[GetApiKeyResponse]) -> GetApiKeyResponse:
5424+
def _func(_: Optional[GetApiKeyResponse]) -> GetApiKeyResponse:
54305425
try:
54315426
return self.get_api_key(key=key, request_options=request_options)
54325427
except RequestException as e:
@@ -5566,7 +5561,7 @@ def browse_synonyms(
55665561
page = search_synonyms_params.page or 0
55675562
search_synonyms_params.hits_per_page = hits_per_page
55685563

5569-
def _func(_prev: Optional[SearchSynonymsResponse]) -> SearchSynonymsResponse:
5564+
def _func(_: Optional[SearchSynonymsResponse]) -> SearchSynonymsResponse:
55705565
nonlocal page
55715566
resp = self.search_synonyms(
55725567
index_name=index_name,
@@ -5667,22 +5662,20 @@ def save_objects_with_transformation(
56675662
wait_for_tasks: bool = False,
56685663
batch_size: int = 1000,
56695664
request_options: Optional[Union[dict, RequestOptions]] = None,
5670-
) -> WatchResponse:
5665+
) -> List[WatchResponse]:
56715666
"""
56725667
Helper: Similar to the `save_objects` method but requires a Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/) to be created first, in order to transform records before indexing them to Algolia. The `region` must've been passed to the client's config at instantiation.
56735668
"""
56745669
if self._ingestion_transporter is None:
56755670
raise ValueError(
56765671
"`region` must be provided at client instantiation before calling this method."
56775672
)
5678-
5679-
return self._ingestion_transporter.push(
5673+
return self._ingestion_transporter.chunked_push(
56805674
index_name=index_name,
5681-
push_task_payload={
5682-
"action": Action.ADDOBJECT,
5683-
"records": objects,
5684-
},
5685-
watch=wait_for_tasks,
5675+
objects=objects,
5676+
action=IngestionAction.ADDOBJECT,
5677+
wait_for_tasks=wait_for_tasks,
5678+
batch_size=batch_size,
56865679
request_options=request_options,
56875680
)
56885681

@@ -5737,24 +5730,22 @@ def partial_update_objects_with_transformation(
57375730
wait_for_tasks: bool = False,
57385731
batch_size: int = 1000,
57395732
request_options: Optional[Union[dict, RequestOptions]] = None,
5740-
) -> WatchResponse:
5733+
) -> List[WatchResponse]:
57415734
"""
57425735
Helper: Similar to the `partial_update_objects` method but requires a Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/) to be created first, in order to transform records before indexing them to Algolia. The `region` must've been passed to the client instantiation method.
57435736
"""
57445737
if self._ingestion_transporter is None:
57455738
raise ValueError(
57465739
"`region` must be provided at client instantiation before calling this method."
57475740
)
5748-
5749-
return self._ingestion_transporter.push(
5741+
return self._ingestion_transporter.chunked_push(
57505742
index_name=index_name,
5751-
push_task_payload={
5752-
"action": Action.PARTIALUPDATEOBJECT
5753-
if create_if_not_exists
5754-
else Action.PARTIALUPDATEOBJECTNOCREATE,
5755-
"records": objects,
5756-
},
5757-
watch=wait_for_tasks,
5743+
objects=objects,
5744+
action=IngestionAction.PARTIALUPDATEOBJECT
5745+
if create_if_not_exists
5746+
else IngestionAction.PARTIALUPDATEOBJECTNOCREATE,
5747+
wait_for_tasks=wait_for_tasks,
5748+
batch_size=batch_size,
57585749
request_options=request_options,
57595750
)
57605751

0 commit comments

Comments
 (0)