Skip to content

Commit a850215

Browse files
Support for reverse attachments, spawn fixes, logging improvments (#25)
1 parent a7e25db commit a850215

18 files changed

+360
-242
lines changed

README.md

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,13 @@
22

33
## Release Notes
44

5+
### v1.2.1
6+
7+
- Reduced the `delayFactor` to minimize unnecessary delays.
8+
- Correct the setting of the `lastSyncStarted` timestamp.
9+
- Improve logging for attachment extraction and loading.
10+
- Fix several bugs related to the control protocol.
11+
512
### v1.2.0
613

714
- Add support for loading attachments from DevRev to external system.
@@ -361,7 +368,7 @@ This phase is defined in `load-attachments.ts` and is responsible for loading th
361368
Loading is done by providing the create function to create attachments in the external system.
362369
363370
```typescript
364-
processTask({
371+
processTask({
365372
task: async ({ adapter }) => {
366373
const { reports, processed_files } = await adapter.loadAttachments({
367374
create,
@@ -380,7 +387,6 @@ Loading is done by providing the create function to create attachments in the ex
380387
});
381388
},
382389
});
383-
384390
```
385391
386392
The loading function `create` provides loading to the external system, to make API calls to the external system to create the attachments and handle errors and external system's rate limiting.

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@devrev/ts-adaas",
3-
"version": "1.2.0-beta.1",
3+
"version": "1.2.1",
44
"description": "DevRev ADaaS (AirDrop-as-a-Service) Typescript SDK.",
55
"type": "commonjs",
66
"main": "./dist/index.js",

src/common/control-protocol.ts

Lines changed: 14 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import { axios, axiosClient } from '../http/axios-client';
1+
import { AxiosResponse } from 'axios';
2+
import { axiosClient } from '../http/axios-client';
23
import {
34
AirdropEvent,
45
EventData,
@@ -7,7 +8,6 @@ import {
78
LoaderEvent,
89
} from '../types/extraction';
910
import { LoaderEventType } from '../types/loading';
10-
import { serializeAxiosError } from '../logger/logger';
1111

1212
export interface EmitInterface {
1313
event: AirdropEvent;
@@ -19,7 +19,7 @@ export const emit = async ({
1919
event,
2020
eventType,
2121
data,
22-
}: EmitInterface): Promise<void | Error> => {
22+
}: EmitInterface): Promise<AxiosResponse> => {
2323
const newEvent: ExtractorEvent | LoaderEvent = {
2424
event_type: eventType,
2525
event_context: event.payload.event_context,
@@ -28,37 +28,17 @@ export const emit = async ({
2828
},
2929
};
3030

31-
return new Promise<void>(async (resolve, reject) => {
32-
console.info('Emitting event', JSON.stringify(newEvent));
31+
console.info('Emitting event', JSON.stringify(newEvent));
3332

34-
try {
35-
await axiosClient.post(
36-
event.payload.event_context.callback_url,
37-
{ ...newEvent },
38-
{
39-
headers: {
40-
Accept: 'application/json, text/plain, */*',
41-
Authorization: event.context.secrets.service_account_token,
42-
'Content-Type': 'application/json',
43-
},
44-
}
45-
);
46-
47-
resolve();
48-
} catch (error) {
49-
if (axios.isAxiosError(error)) {
50-
console.error(
51-
`Failed to emit event with event type ${eventType}.`,
52-
serializeAxiosError(error)
53-
);
54-
} else {
55-
// TODO: Stop it through UI or think about retrying this request. Implement exponential retry mechanism.
56-
console.error(
57-
`Failed to emit event with event type ${eventType}.`,
58-
error
59-
);
60-
}
61-
reject();
33+
return axiosClient.post(
34+
event.payload.event_context.callback_url,
35+
{ ...newEvent },
36+
{
37+
headers: {
38+
Accept: 'application/json, text/plain, */*',
39+
Authorization: event.context.secrets.service_account_token,
40+
'Content-Type': 'application/json',
41+
},
6242
}
63-
});
43+
);
6444
};

src/http/axios-client.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ axiosRetry(axiosClient, {
1010
'Retry attempt: ' + retryCount + 'to url: ' + error.config?.url + '.'
1111
);
1212

13-
// Exponential backoff algorithm: 1 * 2 ^ retryCount * 5000ms
14-
return axiosRetry.exponentialDelay(retryCount, error, 5000);
13+
// Exponential backoff algorithm: 1 * 2 ^ retryCount * 1000ms
14+
return axiosRetry.exponentialDelay(retryCount, error, 1000);
1515
},
1616
retryCondition: (error: AxiosError) => {
1717
if (

src/repo/repo.interfaces.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ export interface NormalizedAttachment {
4141
file_name: string;
4242
author_id: string;
4343
parent_id: string;
44-
inline?: boolean;
44+
grand_parent_id?: number;
4545
}
4646

4747
/**

src/repo/repo.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ export class Repo {
7878
async push(items: Item[]): Promise<boolean | ErrorRecord> {
7979
let recordsToPush: (NormalizedItem | NormalizedAttachment | Item)[];
8080

81+
if (!items || items.length === 0) {
82+
console.log(`No items to push for type ${this.itemType}. Skipping push.`);
83+
return true;
84+
}
85+
8186
// Normalize items if needed
8287
if (
8388
this.normalize &&
@@ -92,10 +97,6 @@ export class Repo {
9297
// Add the new records to the items array
9398
this.items.push(...recordsToPush);
9499

95-
console.info(
96-
`Extracted ${recordsToPush.length} new items of type ${this.itemType}. Total number of items in repo: ${this.items.length}.`
97-
);
98-
99100
// Upload in batches while the number of items exceeds the batch size
100101
while (this.items.length >= ARTIFACT_BATCH_SIZE) {
101102
// Slice out a batch of ARTIFACT_BATCH_SIZE items to upload

src/state/state.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { axios, axiosClient } from '../http/axios-client';
22

3-
import { AirdropEvent, SyncMode } from '../types/extraction';
3+
import { AirdropEvent, EventType, SyncMode } from '../types/extraction';
44
import { STATELESS_EVENT_TYPES } from '../common/constants';
55
import { serializeAxiosError, getPrintableState } from '../logger/logger';
66
import { ErrorRecord } from '../types/common';
@@ -22,6 +22,14 @@ export async function createAdapterState<ConnectorState>({
2222

2323
if (!STATELESS_EVENT_TYPES.includes(event.payload.event_type)) {
2424
await as.fetchState(newInitialState);
25+
26+
if (
27+
event.payload.event_type === EventType.ExtractionDataStart &&
28+
!as.state.lastSyncStarted
29+
) {
30+
as.state.lastSyncStarted = new Date().toISOString();
31+
console.log(`Setting lastSyncStarted to ${as.state.lastSyncStarted}.`);
32+
}
2533
}
2634

2735
return as;
@@ -44,7 +52,7 @@ export class State<ConnectorState> {
4452
},
4553
}
4654
: {
47-
lastSyncStarted: new Date().toISOString(),
55+
lastSyncStarted: '',
4856
lastSuccessfulSyncStarted: '',
4957
toDevRev: {
5058
attachmentsMetadata: {
@@ -155,7 +163,7 @@ export class State<ConnectorState> {
155163
this.state = state;
156164

157165
console.log(
158-
'State not found, returning initial state. Current state:',
166+
'State not found, returning initial state. Current state',
159167
getPrintableState(this.state)
160168
);
161169
await this.postState(this.state);

src/types/extraction.ts

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ import { Artifact } from '../uploader/uploader.interfaces';
55
import { ErrorRecord } from './common';
66

77
import { DonV2, LoaderReport, RateLimited } from './loading';
8-
import { NormalizedAttachment } from 'repo/repo.interfaces';
8+
import { NormalizedAttachment } from '../repo/repo.interfaces';
99
import { AxiosResponse } from 'axios';
10+
import { WorkerAdapter } from '../workers/worker-adapter';
1011

1112
/**
1213
* EventType is an enum that defines the different types of events that can be sent to the external extractor from ADaaS.
@@ -186,6 +187,14 @@ export interface EventData {
186187
stats_file?: string;
187188
}
188189

190+
191+
/**
192+
* WorkerMetadata is an interface that defines the structure of the worker metadata that is sent from the external extractor to ADaaS.
193+
*/
194+
export interface WorkerMetadata {
195+
adaas_library_version: string;
196+
}
197+
189198
/**
190199
* DomainObject is an interface that defines the structure of a domain object that can be extracted.
191200
* It must contain a name, a next chunk ID, the pages, the last modified date, whether it is done, and the count.
@@ -238,6 +247,7 @@ export interface ExtractorEvent {
238247
event_type: string;
239248
event_context: EventContext;
240249
event_data?: EventData;
250+
worker_metadata?: WorkerMetadata;
241251
}
242252

243253
/**
@@ -247,6 +257,7 @@ export interface LoaderEvent {
247257
event_type: string;
248258
event_context: EventContext;
249259
event_data?: EventData;
260+
worker_metadata?: WorkerMetadata;
250261
}
251262

252263
export type ExternalSystemAttachmentStreamingFunction = ({
@@ -270,3 +281,61 @@ export interface StreamAttachmentsResponse {
270281
report?: LoaderReport;
271282
rateLimit?: RateLimited;
272283
}
284+
285+
export type ProcessAttachmentReturnType =
286+
| {
287+
delay?: number;
288+
error?: { message: string };
289+
}
290+
| undefined;
291+
292+
export type StreamAttachmentsReturnType =
293+
| {
294+
delay?: number;
295+
error?: ErrorRecord;
296+
}
297+
| undefined;
298+
299+
export type ExternalSystemAttachmentReducerFunction<
300+
Batch,
301+
NewBatch,
302+
ConnectorState,
303+
> = ({
304+
attachments,
305+
adapter,
306+
}: {
307+
attachments: Batch;
308+
adapter: WorkerAdapter<ConnectorState>;
309+
}) => NewBatch;
310+
311+
export type ExternalProcessAttachmentFunction = ({
312+
attachment,
313+
stream,
314+
}: {
315+
attachment: NormalizedAttachment;
316+
stream: ExternalSystemAttachmentStreamingFunction;
317+
}) => Promise<ProcessAttachmentReturnType>;
318+
319+
export type ExternalSystemAttachmentIteratorFunction<NewBatch, ConnectorState> =
320+
({
321+
reducedAttachments,
322+
adapter,
323+
stream,
324+
}: {
325+
reducedAttachments: NewBatch;
326+
adapter: WorkerAdapter<ConnectorState>;
327+
stream: ExternalSystemAttachmentStreamingFunction;
328+
}) => Promise<ProcessAttachmentReturnType>;
329+
330+
export interface ExternalSystemAttachmentProcessors<
331+
ConnectorState,
332+
Batch,
333+
NewBatch,
334+
> {
335+
reducer: ExternalSystemAttachmentReducerFunction<
336+
Batch,
337+
NewBatch,
338+
ConnectorState
339+
>;
340+
iterator: ExternalSystemAttachmentIteratorFunction<NewBatch, ConnectorState>;
341+
}

src/types/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ export {
2525
ExternalSystemAttachmentStreamingParams,
2626
ExternalSystemAttachmentStreamingResponse,
2727
ExternalSystemAttachmentStreamingFunction,
28+
ExternalProcessAttachmentFunction,
29+
ExternalSystemAttachmentReducerFunction,
30+
ExternalSystemAttachmentIteratorFunction,
2831
} from './extraction';
2932

3033
// Loading

0 commit comments

Comments
 (0)