Skip to content

Support for reverse attachments, spawn fixes, logging improvments #25

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

## Release Notes

### v1.2.1

- Reduced the `delayFactor` to minimize unnecessary delays.
- Correct the setting of the `lastSyncStarted` timestamp.
- Improve logging for attachment extraction and loading.
- Fix several bugs related to the control protocol.

### v1.2.0

- Add support for loading attachments from DevRev to external system.
Expand Down Expand Up @@ -361,7 +368,7 @@ This phase is defined in `load-attachments.ts` and is responsible for loading th
Loading is done by providing the create function to create attachments in the external system.

```typescript
processTask({
processTask({
task: async ({ adapter }) => {
const { reports, processed_files } = await adapter.loadAttachments({
create,
Expand All @@ -380,7 +387,6 @@ Loading is done by providing the create function to create attachments in the ex
});
},
});

```

The loading function `create` provides loading to the external system, to make API calls to the external system to create the attachments and handle errors and external system's rate limiting.
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@devrev/ts-adaas",
"version": "1.2.0-beta.1",
"version": "1.2.1",
"description": "DevRev ADaaS (AirDrop-as-a-Service) Typescript SDK.",
"type": "commonjs",
"main": "./dist/index.js",
Expand Down
48 changes: 14 additions & 34 deletions src/common/control-protocol.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { axios, axiosClient } from '../http/axios-client';
import { AxiosResponse } from 'axios';
import { axiosClient } from '../http/axios-client';
import {
AirdropEvent,
EventData,
Expand All @@ -7,7 +8,6 @@ import {
LoaderEvent,
} from '../types/extraction';
import { LoaderEventType } from '../types/loading';
import { serializeAxiosError } from '../logger/logger';

export interface EmitInterface {
event: AirdropEvent;
Expand All @@ -19,7 +19,7 @@ export const emit = async ({
event,
eventType,
data,
}: EmitInterface): Promise<void | Error> => {
}: EmitInterface): Promise<AxiosResponse> => {
const newEvent: ExtractorEvent | LoaderEvent = {
event_type: eventType,
event_context: event.payload.event_context,
Expand All @@ -28,37 +28,17 @@ export const emit = async ({
},
};

return new Promise<void>(async (resolve, reject) => {
console.info('Emitting event', JSON.stringify(newEvent));
console.info('Emitting event', JSON.stringify(newEvent));

try {
await axiosClient.post(
event.payload.event_context.callback_url,
{ ...newEvent },
{
headers: {
Accept: 'application/json, text/plain, */*',
Authorization: event.context.secrets.service_account_token,
'Content-Type': 'application/json',
},
}
);

resolve();
} catch (error) {
if (axios.isAxiosError(error)) {
console.error(
`Failed to emit event with event type ${eventType}.`,
serializeAxiosError(error)
);
} else {
// TODO: Stop it through UI or think about retrying this request. Implement exponential retry mechanism.
console.error(
`Failed to emit event with event type ${eventType}.`,
error
);
}
reject();
return axiosClient.post(
event.payload.event_context.callback_url,
{ ...newEvent },
{
headers: {
Accept: 'application/json, text/plain, */*',
Authorization: event.context.secrets.service_account_token,
'Content-Type': 'application/json',
},
}
});
);
};
4 changes: 2 additions & 2 deletions src/http/axios-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ axiosRetry(axiosClient, {
'Retry attempt: ' + retryCount + 'to url: ' + error.config?.url + '.'
);

// Exponential backoff algorithm: 1 * 2 ^ retryCount * 5000ms
return axiosRetry.exponentialDelay(retryCount, error, 5000);
// Exponential backoff algorithm: 1 * 2 ^ retryCount * 1000ms
return axiosRetry.exponentialDelay(retryCount, error, 1000);
},
retryCondition: (error: AxiosError) => {
if (
Expand Down
2 changes: 1 addition & 1 deletion src/repo/repo.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export interface NormalizedAttachment {
file_name: string;
author_id: string;
parent_id: string;
inline?: boolean;
grand_parent_id?: number;
}

/**
Expand Down
9 changes: 5 additions & 4 deletions src/repo/repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ export class Repo {
async push(items: Item[]): Promise<boolean | ErrorRecord> {
let recordsToPush: (NormalizedItem | NormalizedAttachment | Item)[];

if (!items || items.length === 0) {
console.log(`No items to push for type ${this.itemType}. Skipping push.`);
return true;
}

// Normalize items if needed
if (
this.normalize &&
Expand All @@ -92,10 +97,6 @@ export class Repo {
// Add the new records to the items array
this.items.push(...recordsToPush);

console.info(
`Extracted ${recordsToPush.length} new items of type ${this.itemType}. Total number of items in repo: ${this.items.length}.`
);

// Upload in batches while the number of items exceeds the batch size
while (this.items.length >= ARTIFACT_BATCH_SIZE) {
// Slice out a batch of ARTIFACT_BATCH_SIZE items to upload
Expand Down
14 changes: 11 additions & 3 deletions src/state/state.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { axios, axiosClient } from '../http/axios-client';

import { AirdropEvent, SyncMode } from '../types/extraction';
import { AirdropEvent, EventType, SyncMode } from '../types/extraction';
import { STATELESS_EVENT_TYPES } from '../common/constants';
import { serializeAxiosError, getPrintableState } from '../logger/logger';
import { ErrorRecord } from '../types/common';
Expand All @@ -22,6 +22,14 @@ export async function createAdapterState<ConnectorState>({

if (!STATELESS_EVENT_TYPES.includes(event.payload.event_type)) {
await as.fetchState(newInitialState);

if (
event.payload.event_type === EventType.ExtractionDataStart &&
!as.state.lastSyncStarted
) {
as.state.lastSyncStarted = new Date().toISOString();
console.log(`Setting lastSyncStarted to ${as.state.lastSyncStarted}.`);
}
}

return as;
Expand All @@ -44,7 +52,7 @@ export class State<ConnectorState> {
},
}
: {
lastSyncStarted: new Date().toISOString(),
lastSyncStarted: '',
lastSuccessfulSyncStarted: '',
toDevRev: {
attachmentsMetadata: {
Expand Down Expand Up @@ -155,7 +163,7 @@ export class State<ConnectorState> {
this.state = state;

console.log(
'State not found, returning initial state. Current state:',
'State not found, returning initial state. Current state',
getPrintableState(this.state)
);
await this.postState(this.state);
Expand Down
71 changes: 70 additions & 1 deletion src/types/extraction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import { Artifact } from '../uploader/uploader.interfaces';
import { ErrorRecord } from './common';

import { DonV2, LoaderReport, RateLimited } from './loading';
import { NormalizedAttachment } from 'repo/repo.interfaces';
import { NormalizedAttachment } from '../repo/repo.interfaces';
import { AxiosResponse } from 'axios';
import { WorkerAdapter } from '../workers/worker-adapter';

/**
* EventType is an enum that defines the different types of events that can be sent to the external extractor from ADaaS.
Expand Down Expand Up @@ -186,6 +187,14 @@ export interface EventData {
stats_file?: string;
}


/**
* WorkerMetadata is an interface that defines the structure of the worker metadata that is sent from the external extractor to ADaaS.
*/
export interface WorkerMetadata {
adaas_library_version: string;
}

/**
* DomainObject is an interface that defines the structure of a domain object that can be extracted.
* It must contain a name, a next chunk ID, the pages, the last modified date, whether it is done, and the count.
Expand Down Expand Up @@ -238,6 +247,7 @@ export interface ExtractorEvent {
event_type: string;
event_context: EventContext;
event_data?: EventData;
worker_metadata?: WorkerMetadata;
}

/**
Expand All @@ -247,6 +257,7 @@ export interface LoaderEvent {
event_type: string;
event_context: EventContext;
event_data?: EventData;
worker_metadata?: WorkerMetadata;
}

export type ExternalSystemAttachmentStreamingFunction = ({
Expand All @@ -270,3 +281,61 @@ export interface StreamAttachmentsResponse {
report?: LoaderReport;
rateLimit?: RateLimited;
}

export type ProcessAttachmentReturnType =
| {
delay?: number;
error?: { message: string };
}
| undefined;

export type StreamAttachmentsReturnType =
| {
delay?: number;
error?: ErrorRecord;
}
| undefined;

export type ExternalSystemAttachmentReducerFunction<
Batch,
NewBatch,
ConnectorState,
> = ({
attachments,
adapter,
}: {
attachments: Batch;
adapter: WorkerAdapter<ConnectorState>;
}) => NewBatch;

export type ExternalProcessAttachmentFunction = ({
attachment,
stream,
}: {
attachment: NormalizedAttachment;
stream: ExternalSystemAttachmentStreamingFunction;
}) => Promise<ProcessAttachmentReturnType>;

export type ExternalSystemAttachmentIteratorFunction<NewBatch, ConnectorState> =
({
reducedAttachments,
adapter,
stream,
}: {
reducedAttachments: NewBatch;
adapter: WorkerAdapter<ConnectorState>;
stream: ExternalSystemAttachmentStreamingFunction;
}) => Promise<ProcessAttachmentReturnType>;

export interface ExternalSystemAttachmentProcessors<
ConnectorState,
Batch,
NewBatch,
> {
reducer: ExternalSystemAttachmentReducerFunction<
Batch,
NewBatch,
ConnectorState
>;
iterator: ExternalSystemAttachmentIteratorFunction<NewBatch, ConnectorState>;
}
3 changes: 3 additions & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ export {
ExternalSystemAttachmentStreamingParams,
ExternalSystemAttachmentStreamingResponse,
ExternalSystemAttachmentStreamingFunction,
ExternalProcessAttachmentFunction,
ExternalSystemAttachmentReducerFunction,
ExternalSystemAttachmentIteratorFunction,
} from './extraction';

// Loading
Expand Down
2 changes: 2 additions & 0 deletions src/types/loading.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ export interface ExternalSystemAttachment {
created_date: string;
modified_by_id: string;
modified_date: string;
parent_id?: string;
grand_parent_id?: string;
}

export interface ExternalSystemItem {
Expand Down
4 changes: 1 addition & 3 deletions src/types/workers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ export interface WorkerAdapterOptions {
timeout?: number;
}

export type SpawnResolve = (value: boolean | PromiseLike<boolean>) => void;

/**
* SpawnInterface is an interface for Spawn class.
* @interface SpawnInterface
Expand All @@ -49,7 +47,7 @@ export interface SpawnInterface {
event: AirdropEvent;
worker: Worker;
options?: WorkerAdapterOptions;
resolve: SpawnResolve;
resolve: (value: void | PromiseLike<void>) => void;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/uploader/uploader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ export class Uploader {
item_count: Array.isArray(fetchedObjects) ? fetchedObjects.length : 1,
};

console.log('Successful upload of artifact: ', artifact);
console.log('Successful upload of artifact', artifact);
return { artifact };
}

Expand Down
Loading