Skip to content

Commit 9dec2bb

Browse files
authored
fix(clients): allow chunked requests on WithTransformation methods (#5011)
1 parent 0ad7f6a commit 9dec2bb

22 files changed

+685
-455
lines changed

clients/algoliasearch-client-javascript/packages/algoliasearch/__tests__/algoliasearch.common.test.ts

Lines changed: 0 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -168,86 +168,6 @@ describe('api', () => {
168168
}),
169169
).rejects.toThrow('`transformation.region` must be provided at client instantiation before calling this method.');
170170
});
171-
172-
test('exposes the transformation methods at the root of the client', async () => {
173-
const ingestionClient = algoliasearch('APP_ID', 'API_KEY', {
174-
requester: browserEchoRequester(),
175-
transformation: { region: 'us' },
176-
});
177-
178-
expect(ingestionClient.saveObjectsWithTransformation).not.toBeUndefined();
179-
180-
let res = (await ingestionClient.saveObjectsWithTransformation({
181-
indexName: 'foo',
182-
objects: [{ objectID: 'bar', baz: 42 }],
183-
waitForTasks: true,
184-
})) as unknown as EchoResponse;
185-
186-
expect(res.headers).toEqual(
187-
expect.objectContaining({
188-
'x-algolia-application-id': 'APP_ID',
189-
'x-algolia-api-key': 'API_KEY',
190-
}),
191-
);
192-
expect(res.url.startsWith('https://data.us.algolia.com/1/push/foo?watch=true')).toBeTruthy();
193-
expect(res.data).toEqual({
194-
action: 'addObject',
195-
records: [
196-
{
197-
baz: 42,
198-
objectID: 'bar',
199-
},
200-
],
201-
});
202-
expect(ingestionClient.partialUpdateObjectsWithTransformation).not.toBeUndefined();
203-
204-
res = (await ingestionClient.partialUpdateObjectsWithTransformation({
205-
indexName: 'foo',
206-
objects: [{ objectID: 'bar', baz: 42 }],
207-
waitForTasks: true,
208-
createIfNotExists: true,
209-
})) as unknown as EchoResponse;
210-
211-
expect(res.headers).toEqual(
212-
expect.objectContaining({
213-
'x-algolia-application-id': 'APP_ID',
214-
'x-algolia-api-key': 'API_KEY',
215-
}),
216-
);
217-
expect(res.url.startsWith('https://data.us.algolia.com/1/push/foo?watch=true')).toBeTruthy();
218-
expect(res.data).toEqual({
219-
action: 'partialUpdateObject',
220-
records: [
221-
{
222-
baz: 42,
223-
objectID: 'bar',
224-
},
225-
],
226-
});
227-
228-
res = (await ingestionClient.partialUpdateObjectsWithTransformation({
229-
indexName: 'foo',
230-
objects: [{ objectID: 'bar', baz: 42 }],
231-
waitForTasks: true,
232-
})) as unknown as EchoResponse;
233-
234-
expect(res.headers).toEqual(
235-
expect.objectContaining({
236-
'x-algolia-application-id': 'APP_ID',
237-
'x-algolia-api-key': 'API_KEY',
238-
}),
239-
);
240-
expect(res.url.startsWith('https://data.us.algolia.com/1/push/foo?watch=true')).toBeTruthy();
241-
expect(res.data).toEqual({
242-
action: 'partialUpdateObjectNoCreate',
243-
records: [
244-
{
245-
baz: 42,
246-
objectID: 'bar',
247-
},
248-
],
249-
});
250-
});
251171
});
252172
});
253173

specs/search/helpers/chunkedPush.yml renamed to specs/ingestion/helpers/chunkedPush.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ method:
55
- Records
66
x-available-languages:
77
- javascript
8+
- go
9+
- java
10+
- php
11+
- python
812
operationId: chunkedPush
913
summary: Replace all records in an index
1014
description: |

specs/ingestion/spec.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,3 +202,6 @@ paths:
202202
# ###############
203203
/setClientApiKey:
204204
$ref: '../common/helpers/setClientApiKey.yml#/method'
205+
206+
/chunkedPush:
207+
$ref: 'helpers/chunkedPush.yml#/method'

specs/search/helpers/partialUpdateObjectsWithTransformation.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ method:
6161
content:
6262
application/json:
6363
schema:
64-
$ref: '../../common/schemas/ingestion/WatchResponse.yml'
64+
type: array
65+
items:
66+
$ref: '../../common/schemas/ingestion/WatchResponse.yml'
6567
'400':
6668
$ref: '../../common/responses/IndexNotFound.yml'

specs/search/helpers/saveObjectsWithTransformation.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ method:
5454
content:
5555
application/json:
5656
schema:
57-
$ref: '../../common/schemas/ingestion/WatchResponse.yml'
57+
type: array
58+
items:
59+
$ref: '../../common/schemas/ingestion/WatchResponse.yml'
5860
'400':
5961
$ref: '../../common/responses/IndexNotFound.yml'

templates/go/api.mustache

Lines changed: 32 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,17 @@ type config struct {
3333
headerParams map[string]string
3434
timeouts transport.RequestConfiguration
3535
36+
{{#isIngestionClient}}
37+
// -- ChunkedPush options
38+
waitForTasks bool
39+
batchSize int
40+
41+
// -- Iterable options
42+
maxRetries int
43+
timeout func(int) time.Duration
44+
aggregator func(any, error)
45+
{{/isIngestionClient}}
46+
3647
{{#isSearchClient}}
3748
// -- ChunkedBatch options
3849
waitForTasks bool
@@ -102,42 +113,6 @@ func WithConnectTimeout(timeout time.Duration) requestOption {
102113

103114
{{#isSearchClient}}
104115

105-
// --------- ChunkedBatch options ---------
106-
107-
type ChunkedBatchOption interface {
108-
RequestOption
109-
chunkedBatch()
110-
}
111-
112-
type chunkedBatchOption func(*config)
113-
114-
var (
115-
_ ChunkedBatchOption = (*chunkedBatchOption)(nil)
116-
_ ChunkedBatchOption = (*requestOption)(nil)
117-
)
118-
119-
func (c chunkedBatchOption) apply(conf *config) {
120-
c(conf)
121-
}
122-
123-
func (c chunkedBatchOption) chunkedBatch() {}
124-
125-
func (r requestOption) chunkedBatch() {}
126-
127-
// WithWaitForTasks whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable.
128-
func WithWaitForTasks(waitForTasks bool) chunkedBatchOption {
129-
return chunkedBatchOption(func(c *config) {
130-
c.waitForTasks = waitForTasks
131-
})
132-
}
133-
134-
// WithBatchSize the size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000.
135-
func WithBatchSize(batchSize int) chunkedBatchOption {
136-
return chunkedBatchOption(func(c *config) {
137-
c.batchSize = batchSize
138-
})
139-
}
140-
141116
// --------- PartialUpdateObjects options ---------
142117

143118
type PartialUpdateObjectsOption interface {
@@ -206,49 +181,6 @@ func WithScopes(scopes []ScopeType) replaceAllObjectsOption {
206181
})
207182
}
208183

209-
// --------- Iterable options ---------.
210-
211-
type IterableOption interface {
212-
RequestOption
213-
iterable()
214-
}
215-
216-
type iterableOption func(*config)
217-
218-
var (
219-
_ IterableOption = (*iterableOption)(nil)
220-
_ IterableOption = (*requestOption)(nil)
221-
)
222-
223-
func (i iterableOption) apply(c *config) {
224-
i(c)
225-
}
226-
227-
func (r requestOption) iterable() {}
228-
229-
func (i iterableOption) iterable() {}
230-
231-
// WithMaxRetries the maximum number of retry. Default to 50.
232-
func WithMaxRetries(maxRetries int) iterableOption {
233-
return iterableOption(func(c *config) {
234-
c.maxRetries = maxRetries
235-
})
236-
}
237-
238-
// WithTimeout he function to decide how long to wait between retries. Default to min(retryCount * 200, 5000)
239-
func WithTimeout(timeout func(int) time.Duration) iterableOption {
240-
return iterableOption(func(c *config) {
241-
c.timeout = timeout
242-
})
243-
}
244-
245-
// WithAggregator the function to aggregate the results of the iterable.
246-
func WithAggregator(aggregator func(any, error)) iterableOption {
247-
return iterableOption(func(c *config) {
248-
c.aggregator = aggregator
249-
})
250-
}
251-
252184
// --------- WaitForKey options ---------.
253185

254186
type WaitForApiKeyOption interface {
@@ -295,6 +227,18 @@ func toRequestOptions[T RequestOption](opts []T) []RequestOption {
295227
return requestOpts
296228
}
297229

230+
func toIngestionRequestOptions(opts []RequestOption) []ingestion.RequestOption {
231+
requestOpts := make([]ingestion.RequestOption, 0, len(opts))
232+
233+
for _, opt := range opts {
234+
if opt, ok := opt.(ingestion.RequestOption); ok {
235+
requestOpts = append(requestOpts, opt)
236+
}
237+
}
238+
239+
return requestOpts
240+
}
241+
298242
func toIterableOptions(opts []ChunkedBatchOption) []IterableOption {
299243
iterableOpts := make([]IterableOption, 0, len(opts))
300244
@@ -571,5 +515,13 @@ func (c *APIClient) {{nickname}}({{#hasParams}}r {{#structPrefix}}{{&classname}}
571515
{{/operations}}
572516

573517
{{#isSearchClient}}
518+
{{> helpers}}
519+
574520
{{> search_helpers}}
575-
{{/isSearchClient}}
521+
{{/isSearchClient}}
522+
523+
{{#isIngestionClient}}
524+
{{> helpers}}
525+
526+
{{> ingestion_helpers}}
527+
{{/isIngestionClient}}

templates/go/helpers.mustache

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
// --------- ChunkedBatch options ---------
2+
3+
type ChunkedBatchOption interface {
4+
RequestOption
5+
chunkedBatch()
6+
}
7+
8+
type chunkedBatchOption func(*config)
9+
10+
var (
11+
_ ChunkedBatchOption = (*chunkedBatchOption)(nil)
12+
_ ChunkedBatchOption = (*requestOption)(nil)
13+
)
14+
15+
func (c chunkedBatchOption) apply(conf *config) {
16+
c(conf)
17+
}
18+
19+
func (c chunkedBatchOption) chunkedBatch() {}
20+
21+
func (r requestOption) chunkedBatch() {}
22+
23+
// WithWaitForTasks whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable.
24+
func WithWaitForTasks(waitForTasks bool) chunkedBatchOption {
25+
return chunkedBatchOption(func(c *config) {
26+
c.waitForTasks = waitForTasks
27+
})
28+
}
29+
30+
// WithBatchSize the size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000.
31+
func WithBatchSize(batchSize int) chunkedBatchOption {
32+
return chunkedBatchOption(func(c *config) {
33+
c.batchSize = batchSize
34+
})
35+
}
36+
37+
// --------- Iterable options ---------.
38+
39+
type IterableOption interface {
40+
RequestOption
41+
iterable()
42+
}
43+
44+
type iterableOption func(*config)
45+
46+
var (
47+
_ IterableOption = (*iterableOption)(nil)
48+
_ IterableOption = (*requestOption)(nil)
49+
)
50+
51+
func (i iterableOption) apply(c *config) {
52+
i(c)
53+
}
54+
55+
func (r requestOption) iterable() {}
56+
57+
func (i iterableOption) iterable() {}
58+
59+
// WithMaxRetries the maximum number of retry. Default to 50.
60+
func WithMaxRetries(maxRetries int) iterableOption {
61+
return iterableOption(func(c *config) {
62+
c.maxRetries = maxRetries
63+
})
64+
}
65+
66+
// WithTimeout he function to decide how long to wait between retries. Default to min(retryCount * 200, 5000)
67+
func WithTimeout(timeout func(int) time.Duration) iterableOption {
68+
return iterableOption(func(c *config) {
69+
c.timeout = timeout
70+
})
71+
}
72+
73+
// WithAggregator the function to aggregate the results of the iterable.
74+
func WithAggregator(aggregator func(any, error)) iterableOption {
75+
return iterableOption(func(c *config) {
76+
c.aggregator = aggregator
77+
})
78+
}
79+
80+
func CreateIterable[T any](execute func(*T, error) (*T, error), validate func(*T, error) (bool, error), opts ...IterableOption) (*T, error) {
81+
conf := config{
82+
headerParams: map[string]string{},
83+
maxRetries: -1,
84+
timeout: func(count int) time.Duration {
85+
return 0 * time.Millisecond
86+
},
87+
}
88+
89+
for _, opt := range opts {
90+
opt.apply(&conf)
91+
}
92+
93+
var executor func(*T, error) (*T, error)
94+
95+
retryCount := 0
96+
97+
executor = func(previousResponse *T, previousError error) (*T, error) {
98+
response, responseErr := execute(previousResponse, previousError)
99+
100+
retryCount++
101+
102+
if conf.aggregator != nil {
103+
conf.aggregator(response, responseErr)
104+
}
105+
106+
canStop, err := validate(response, responseErr)
107+
if canStop || err != nil {
108+
return response, err
109+
}
110+
111+
if conf.maxRetries >= 0 && retryCount >= conf.maxRetries {
112+
return nil, errs.NewWaitError(fmt.Sprintf("The maximum number of retries exceeded. (%d/%d)", retryCount, conf.maxRetries))
113+
}
114+
115+
time.Sleep(conf.timeout(retryCount))
116+
117+
return executor(response, responseErr)
118+
}
119+
120+
return executor(nil, nil)
121+
}

0 commit comments

Comments
 (0)