diff --git a/README.md b/README.md index f6853d8..3c704af 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,13 @@ ## Release Notes +### v1.2.1 + +- Reduced the `delayFactor` to minimize unnecessary delays. +- Correct the setting of the `lastSyncStarted` timestamp. +- Improve logging for attachment extraction and loading. +- Fix several bugs related to the control protocol. + ### v1.2.0 - Add support for loading attachments from DevRev to external system. @@ -361,7 +368,7 @@ This phase is defined in `load-attachments.ts` and is responsible for loading th Loading is done by providing the create function to create attachments in the external system. ```typescript - processTask({ +processTask({ task: async ({ adapter }) => { const { reports, processed_files } = await adapter.loadAttachments({ create, @@ -380,7 +387,6 @@ Loading is done by providing the create function to create attachments in the ex }); }, }); - ``` The loading function `create` provides loading to the external system, to make API calls to the external system to create the attachments and handle errors and external system's rate limiting. diff --git a/package-lock.json b/package-lock.json index 9f162f1..91348fb 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@devrev/ts-adaas", - "version": "1.2.0", + "version": "1.2.1", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@devrev/ts-adaas", - "version": "1.2.0", + "version": "1.2.1", "license": "ISC", "dependencies": { "@devrev/typescript-sdk": "^1.1.27", diff --git a/package.json b/package.json index 5d82923..4d51ffc 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@devrev/ts-adaas", - "version": "1.2.0-beta.1", + "version": "1.2.1", "description": "DevRev ADaaS (AirDrop-as-a-Service) Typescript SDK.", "type": "commonjs", "main": "./dist/index.js", diff --git a/src/common/control-protocol.ts b/src/common/control-protocol.ts index ce29f1a..28e70f0 100644 --- a/src/common/control-protocol.ts +++ b/src/common/control-protocol.ts @@ -1,4 +1,5 @@ -import { axios, axiosClient } from '../http/axios-client'; +import { AxiosResponse } from 'axios'; +import { axiosClient } from '../http/axios-client'; import { AirdropEvent, EventData, @@ -7,7 +8,6 @@ import { LoaderEvent, } from '../types/extraction'; import { LoaderEventType } from '../types/loading'; -import { serializeAxiosError } from '../logger/logger'; export interface EmitInterface { event: AirdropEvent; @@ -19,7 +19,7 @@ export const emit = async ({ event, eventType, data, -}: EmitInterface): Promise => { +}: EmitInterface): Promise => { const newEvent: ExtractorEvent | LoaderEvent = { event_type: eventType, event_context: event.payload.event_context, @@ -28,37 +28,17 @@ export const emit = async ({ }, }; - return new Promise(async (resolve, reject) => { - console.info('Emitting event', JSON.stringify(newEvent)); + console.info('Emitting event', JSON.stringify(newEvent)); - try { - await axiosClient.post( - event.payload.event_context.callback_url, - { ...newEvent }, - { - headers: { - Accept: 'application/json, text/plain, */*', - Authorization: event.context.secrets.service_account_token, - 'Content-Type': 'application/json', - }, - } - ); - - resolve(); - } catch (error) { - if (axios.isAxiosError(error)) { - console.error( - `Failed to emit event with event type ${eventType}.`, - serializeAxiosError(error) - ); - } else { - // TODO: Stop it through UI or think about retrying this request. Implement exponential retry mechanism. 
- console.error( - `Failed to emit event with event type ${eventType}.`, - error - ); - } - reject(); + return axiosClient.post( + event.payload.event_context.callback_url, + { ...newEvent }, + { + headers: { + Accept: 'application/json, text/plain, */*', + Authorization: event.context.secrets.service_account_token, + 'Content-Type': 'application/json', + }, } - }); + ); }; diff --git a/src/http/axios-client.ts b/src/http/axios-client.ts index ff14265..d5e7a0e 100644 --- a/src/http/axios-client.ts +++ b/src/http/axios-client.ts @@ -10,8 +10,8 @@ axiosRetry(axiosClient, { 'Retry attempt: ' + retryCount + 'to url: ' + error.config?.url + '.' ); - // Exponential backoff algorithm: 1 * 2 ^ retryCount * 5000ms - return axiosRetry.exponentialDelay(retryCount, error, 5000); + // Exponential backoff algorithm: 1 * 2 ^ retryCount * 1000ms + return axiosRetry.exponentialDelay(retryCount, error, 1000); }, retryCondition: (error: AxiosError) => { if ( diff --git a/src/repo/repo.interfaces.ts b/src/repo/repo.interfaces.ts index c18a456..cd3a58b 100644 --- a/src/repo/repo.interfaces.ts +++ b/src/repo/repo.interfaces.ts @@ -41,7 +41,7 @@ export interface NormalizedAttachment { file_name: string; author_id: string; parent_id: string; - inline?: boolean; + grand_parent_id?: number; } /** diff --git a/src/repo/repo.ts b/src/repo/repo.ts index 1f97ddc..1c14576 100644 --- a/src/repo/repo.ts +++ b/src/repo/repo.ts @@ -78,6 +78,11 @@ export class Repo { async push(items: Item[]): Promise { let recordsToPush: (NormalizedItem | NormalizedAttachment | Item)[]; + if (!items || items.length === 0) { + console.log(`No items to push for type ${this.itemType}. Skipping push.`); + return true; + } + // Normalize items if needed if ( this.normalize && @@ -92,10 +97,6 @@ export class Repo { // Add the new records to the items array this.items.push(...recordsToPush); - console.info( - `Extracted ${recordsToPush.length} new items of type ${this.itemType}. Total number of items in repo: ${this.items.length}.` - ); - // Upload in batches while the number of items exceeds the batch size while (this.items.length >= ARTIFACT_BATCH_SIZE) { // Slice out a batch of ARTIFACT_BATCH_SIZE items to upload diff --git a/src/state/state.ts b/src/state/state.ts index 9385a18..3f2d468 100644 --- a/src/state/state.ts +++ b/src/state/state.ts @@ -1,6 +1,6 @@ import { axios, axiosClient } from '../http/axios-client'; -import { AirdropEvent, SyncMode } from '../types/extraction'; +import { AirdropEvent, EventType, SyncMode } from '../types/extraction'; import { STATELESS_EVENT_TYPES } from '../common/constants'; import { serializeAxiosError, getPrintableState } from '../logger/logger'; import { ErrorRecord } from '../types/common'; @@ -22,6 +22,14 @@ export async function createAdapterState({ if (!STATELESS_EVENT_TYPES.includes(event.payload.event_type)) { await as.fetchState(newInitialState); + + if ( + event.payload.event_type === EventType.ExtractionDataStart && + !as.state.lastSyncStarted + ) { + as.state.lastSyncStarted = new Date().toISOString(); + console.log(`Setting lastSyncStarted to ${as.state.lastSyncStarted}.`); + } } return as; @@ -44,7 +52,7 @@ export class State { }, } : { - lastSyncStarted: new Date().toISOString(), + lastSyncStarted: '', lastSuccessfulSyncStarted: '', toDevRev: { attachmentsMetadata: { @@ -155,7 +163,7 @@ export class State { this.state = state; console.log( - 'State not found, returning initial state. Current state:', + 'State not found, returning initial state. 
Current state', getPrintableState(this.state) ); await this.postState(this.state); diff --git a/src/types/extraction.ts b/src/types/extraction.ts index c8cb9d1..17a2e9d 100644 --- a/src/types/extraction.ts +++ b/src/types/extraction.ts @@ -5,8 +5,9 @@ import { Artifact } from '../uploader/uploader.interfaces'; import { ErrorRecord } from './common'; import { DonV2, LoaderReport, RateLimited } from './loading'; -import { NormalizedAttachment } from 'repo/repo.interfaces'; +import { NormalizedAttachment } from '../repo/repo.interfaces'; import { AxiosResponse } from 'axios'; +import { WorkerAdapter } from '../workers/worker-adapter'; /** * EventType is an enum that defines the different types of events that can be sent to the external extractor from ADaaS. @@ -186,6 +187,14 @@ export interface EventData { stats_file?: string; } + +/** + * WorkerMetadata is an interface that defines the structure of the worker metadata that is sent from the external extractor to ADaaS. + */ +export interface WorkerMetadata { + adaas_library_version: string; +} + /** * DomainObject is an interface that defines the structure of a domain object that can be extracted. * It must contain a name, a next chunk ID, the pages, the last modified date, whether it is done, and the count. @@ -238,6 +247,7 @@ export interface ExtractorEvent { event_type: string; event_context: EventContext; event_data?: EventData; + worker_metadata?: WorkerMetadata; } /** @@ -247,6 +257,7 @@ export interface LoaderEvent { event_type: string; event_context: EventContext; event_data?: EventData; + worker_metadata?: WorkerMetadata; } export type ExternalSystemAttachmentStreamingFunction = ({ @@ -270,3 +281,61 @@ export interface StreamAttachmentsResponse { report?: LoaderReport; rateLimit?: RateLimited; } + +export type ProcessAttachmentReturnType = + | { + delay?: number; + error?: { message: string }; + } + | undefined; + +export type StreamAttachmentsReturnType = + | { + delay?: number; + error?: ErrorRecord; + } + | undefined; + +export type ExternalSystemAttachmentReducerFunction< + Batch, + NewBatch, + ConnectorState, +> = ({ + attachments, + adapter, +}: { + attachments: Batch; + adapter: WorkerAdapter; +}) => NewBatch; + +export type ExternalProcessAttachmentFunction = ({ + attachment, + stream, +}: { + attachment: NormalizedAttachment; + stream: ExternalSystemAttachmentStreamingFunction; +}) => Promise; + +export type ExternalSystemAttachmentIteratorFunction = + ({ + reducedAttachments, + adapter, + stream, + }: { + reducedAttachments: NewBatch; + adapter: WorkerAdapter; + stream: ExternalSystemAttachmentStreamingFunction; + }) => Promise; + +export interface ExternalSystemAttachmentProcessors< + ConnectorState, + Batch, + NewBatch, +> { + reducer: ExternalSystemAttachmentReducerFunction< + Batch, + NewBatch, + ConnectorState + >; + iterator: ExternalSystemAttachmentIteratorFunction; +} diff --git a/src/types/index.ts b/src/types/index.ts index c52edbe..9197c8c 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -25,6 +25,9 @@ export { ExternalSystemAttachmentStreamingParams, ExternalSystemAttachmentStreamingResponse, ExternalSystemAttachmentStreamingFunction, + ExternalProcessAttachmentFunction, + ExternalSystemAttachmentReducerFunction, + ExternalSystemAttachmentIteratorFunction, } from './extraction'; // Loading diff --git a/src/types/loading.ts b/src/types/loading.ts index 743f175..de2e689 100644 --- a/src/types/loading.ts +++ b/src/types/loading.ts @@ -31,6 +31,8 @@ export interface ExternalSystemAttachment { 
created_date: string; modified_by_id: string; modified_date: string; + parent_id?: string; + grand_parent_id?: string; } export interface ExternalSystemItem { diff --git a/src/types/workers.ts b/src/types/workers.ts index 4431f27..2439072 100644 --- a/src/types/workers.ts +++ b/src/types/workers.ts @@ -36,8 +36,6 @@ export interface WorkerAdapterOptions { timeout?: number; } -export type SpawnResolve = (value: boolean | PromiseLike) => void; - /** * SpawnInterface is an interface for Spawn class. * @interface SpawnInterface @@ -49,7 +47,7 @@ export interface SpawnInterface { event: AirdropEvent; worker: Worker; options?: WorkerAdapterOptions; - resolve: SpawnResolve; + resolve: (value: void | PromiseLike) => void; } /** diff --git a/src/uploader/uploader.ts b/src/uploader/uploader.ts index 0b966f5..bf62586 100644 --- a/src/uploader/uploader.ts +++ b/src/uploader/uploader.ts @@ -84,7 +84,7 @@ export class Uploader { item_count: Array.isArray(fetchedObjects) ? fetchedObjects.length : 1, }; - console.log('Successful upload of artifact: ', artifact); + console.log('Successful upload of artifact', artifact); return { artifact }; } diff --git a/src/workers/default-workers/attachments-extraction.ts b/src/workers/default-workers/attachments-extraction.ts index 2ca6315..56d37db 100644 --- a/src/workers/default-workers/attachments-extraction.ts +++ b/src/workers/default-workers/attachments-extraction.ts @@ -22,13 +22,16 @@ const getAttachmentStream = async ({ return { httpStream: fileStreamResponse }; } catch (error) { if (axios.isAxiosError(error)) { - console.error( - 'Error while fetching attachment from URL.', + console.warn( + `Error while fetching attachment ${id} from URL.`, serializeAxiosError(error) ); + console.warn('Failed attachment metadata', item); } else { - console.error('Error while fetching attachment from URL.', error); + console.warn(`Error while fetching attachment ${id} from URL.`, error); + console.warn('Failed attachment metadata', item); } + return { error: { message: 'Error while fetching attachment ' + id + ' from URL.', @@ -39,20 +42,25 @@ const getAttachmentStream = async ({ processTask({ task: async ({ adapter }) => { - const { error, delay } = await adapter.streamAttachments({ - stream: getAttachmentStream, - }); - - if (delay) { - await adapter.emit(ExtractorEventType.ExtractionAttachmentsDelay, { - delay, - }); - } else if (error) { - await adapter.emit(ExtractorEventType.ExtractionAttachmentsError, { - error, + try { + const response = await adapter.streamAttachments({ + stream: getAttachmentStream, }); + + if (response?.delay) { + await adapter.emit(ExtractorEventType.ExtractionAttachmentsDelay, { + delay: response.delay, + }); + } else if (response?.error) { + await adapter.emit(ExtractorEventType.ExtractionAttachmentsError, { + error: response.error, + }); + } else { + await adapter.emit(ExtractorEventType.ExtractionAttachmentsDone); + } + } catch (error) { + console.error('An error occured while processing a task.', error); } - await adapter.emit(ExtractorEventType.ExtractionAttachmentsDone); }, onTimeout: async ({ adapter }) => { await adapter.postState(); diff --git a/src/workers/default-workers/data-loading.ts b/src/workers/default-workers/data-loading.ts deleted file mode 100644 index e3a1762..0000000 --- a/src/workers/default-workers/data-loading.ts +++ /dev/null @@ -1,18 +0,0 @@ -import { processTask } from 'workers/process-task'; -import { LoaderEventType } from '../../types/loading'; - -processTask({ - task: async ({ adapter }) => { - await 
adapter.emit(LoaderEventType.DataLoadingDone, { - reports: adapter.reports, - processed_files: adapter.processedFiles, - }); - }, - onTimeout: async ({ adapter }) => { - await adapter.postState(); - await adapter.emit(LoaderEventType.DataLoadingProgress, { - reports: adapter.reports, - processed_files: adapter.processedFiles, - }); - }, -}); diff --git a/src/workers/default-workers/loader-state-deletion.ts b/src/workers/default-workers/loader-state-deletion.ts deleted file mode 100644 index 1858dd6..0000000 --- a/src/workers/default-workers/loader-state-deletion.ts +++ /dev/null @@ -1,14 +0,0 @@ -import { processTask, LoaderEventType } from '../../index'; - -processTask({ - task: async ({ adapter }) => { - await adapter.emit(LoaderEventType.LoaderStateDeletionDone); - }, - onTimeout: async ({ adapter }) => { - await adapter.emit(LoaderEventType.LoaderStateDeletionError, { - error: { - message: 'Failed to delete data. Lambda timeout.', - }, - }); - }, -}); diff --git a/src/workers/spawn.ts b/src/workers/spawn.ts index 81b2404..2a9c169 100644 --- a/src/workers/spawn.ts +++ b/src/workers/spawn.ts @@ -1,5 +1,4 @@ -import { Worker } from 'node:worker_threads'; - +import axios from 'axios'; import { AirdropEvent, EventType, @@ -7,14 +6,13 @@ import { } from '../types/extraction'; import { emit } from '../common/control-protocol'; import { getTimeoutErrorEventType } from '../common/helpers'; -import { Logger } from '../logger/logger'; +import { Logger, serializeAxiosError } from '../logger/logger'; import { GetWorkerPathInterface, WorkerEvent, WorkerMessageSubject, SpawnFactoryInterface, SpawnInterface, - SpawnResolve, } from '../types/workers'; import { createWorker } from './create-worker'; @@ -86,11 +84,8 @@ export async function spawn({ initialState, workerPath, options, -}: SpawnFactoryInterface): Promise< - boolean | PromiseLike -> { +}: SpawnFactoryInterface): Promise { const logger = new Logger({ event, options }); - const script = getWorkerPath({ event, connectorWorkerPath: workerPath, @@ -106,7 +101,7 @@ export async function spawn({ options, }); - return new Promise((resolve) => { + return new Promise((resolve) => { new Spawn({ event, worker, @@ -115,58 +110,70 @@ export async function spawn({ }); }); } catch (error) { - logger.error('Worker error while processing task.', error); - return false; + logger.error('Worker error while processing task', error); } } else { - await emit({ - event, - eventType: ExtractorEventType.UnknownEventType, - data: { - error: { - message: - 'Unrecognized event type in spawn ' + - event.payload.event_type + - '.', + console.error( + 'Script was not found for event type: ' + event.payload.event_type + '.' 
+ ); + + try { + await emit({ + event, + eventType: ExtractorEventType.UnknownEventType, + data: { + error: { + message: + 'Unrecognized event type in spawn ' + + event.payload.event_type + + '.', + }, }, - }, - }); - return false; + }); + } catch (error) { + if (axios.isAxiosError(error)) { + console.error('Error while emitting event', serializeAxiosError(error)); + } else { + console.error('Error while emitting event', error); + } + } } } export class Spawn { private event: AirdropEvent; - private hasWorkerEmitted: boolean; + private alreadyEmitted: boolean; private defaultLambdaTimeout: number = 10 * 60 * 1000; // 10 minutes in milliseconds private lambdaTimeout: number; - private worker: Worker | null; - private resolve: SpawnResolve; private timer: ReturnType | undefined; private logger: Logger; + private resolve: (value: void | PromiseLike) => void; constructor({ event, worker, options, resolve }: SpawnInterface) { - this.hasWorkerEmitted = false; + this.alreadyEmitted = false; this.event = event; + this.logger = new Logger({ event, options }); this.lambdaTimeout = options?.timeout ? Math.min(options.timeout, this.defaultLambdaTimeout) : this.defaultLambdaTimeout; - this.resolve = resolve; - this.timer = setTimeout(async () => { - this.logger.log('Lambda timeout reached. Exiting.'); - if (this.worker) { - this.worker.postMessage({ + // if lambda timeout is reached, then send a message to the worker to gracefully exit + this.timer = setTimeout(async () => { + this.logger.log( + 'Lambda timeout reached. Sending a message to the worker to gracefully exit.' + ); + if (worker) { + worker.postMessage({ subject: WorkerMessageSubject.WorkerMessageExit, }); } else { + console.log("Worker doesn't exist. Exiting from main thread."); await this.exitFromMainThread(); } }, this.lambdaTimeout); - this.logger = new Logger({ event, options }); - this.worker = worker; + // if worker exits with process.exit(code) then we need to clear the timer and exit from main thread worker.on(WorkerEvent.WorkerExit, async (code) => { this.logger.info('Worker exited with exit code: ' + code + '.'); if (this.timer) { @@ -174,50 +181,67 @@ export class Spawn { } await this.exitFromMainThread(); }); + worker.on(WorkerEvent.WorkerMessage, async (message) => { - if (message?.subject === WorkerMessageSubject.WorkerMessageEmitted) { - this.logger.info('Worker has emitted message to ADaaS.'); - this.hasWorkerEmitted = true; + // if worker send a log message, then log it from the main thread with logger + if (message?.subject === WorkerMessageSubject.WorkerMessageLog) { + const args = message.payload?.args; + const level = message.payload?.level as LogLevel; + this.logger.logFn(args, level); } + + // if worker sends a message that it has completed work, then clear the timer and exit from main thread if (message?.subject === WorkerMessageSubject.WorkerMessageDone) { - this.logger.info('Worker has completed work.'); - clearTimeout(this.timer); + this.logger.info('Worker has completed with executing the task.'); + if (this.timer) { + clearTimeout(this.timer); + } await this.exitFromMainThread(); } - }); - worker.on(WorkerEvent.WorkerMessage, (message) => { - if (message?.subject === WorkerMessageSubject.WorkerMessageLog) { - const args = message.payload?.args; - const level = message.payload?.level as LogLevel; - this.logger.logFn(args, level); + // if worker sends a message that it has emitted an event, then set alreadyEmitted to true + if (message?.subject === WorkerMessageSubject.WorkerMessageEmitted) { + 
this.logger.info('Worker has emitted message to ADaaS.'); + this.alreadyEmitted = true; } }); } private async exitFromMainThread(): Promise { - if (this.hasWorkerEmitted) { - this.resolve(true); + if (this.alreadyEmitted) { + this.resolve(); return; } + this.alreadyEmitted = true; const timeoutEventType = getTimeoutErrorEventType( this.event.payload.event_type ); - if (timeoutEventType !== null) { + + if (timeoutEventType) { const { eventType } = timeoutEventType; - await emit({ - eventType, - event: this.event, - data: { - error: { - message: 'Worker has not emitted anything. Exited.', + + try { + await emit({ + eventType, + event: this.event, + data: { + error: { + message: 'Worker has not emitted anything. Exited.', + }, }, - }, - }).then(() => { - this.logger.error('Worker has not emitted anything. Exited.'); - this.resolve(true); - }); + }); + this.resolve(); + } catch (error) { + if (axios.isAxiosError(error)) { + console.error( + 'Error while emitting event', + serializeAxiosError(error) + ); + } else { + console.error('Error while emitting event', error); + } + } } } } diff --git a/src/workers/worker-adapter.ts b/src/workers/worker-adapter.ts index ed4ae3f..613d2ba 100644 --- a/src/workers/worker-adapter.ts +++ b/src/workers/worker-adapter.ts @@ -5,6 +5,9 @@ import { EventData, EventType, ExternalSystemAttachmentStreamingFunction, + ExternalSystemAttachmentProcessors, + ProcessAttachmentReturnType, + StreamAttachmentsReturnType, } from '../types/extraction'; import { ActionType, @@ -27,7 +30,7 @@ import { MessagePort } from 'node:worker_threads'; import { emit } from '../common/control-protocol'; import { WorkerMessageEmitted, WorkerMessageSubject } from '../types/workers'; import { Repo } from '../repo/repo'; -import { RepoInterface } from '../repo/repo.interfaces'; +import { NormalizedAttachment, RepoInterface } from '../repo/repo.interfaces'; import { ExternalSystemItem, ItemTypesToLoadParams, @@ -209,6 +212,7 @@ export class WorkerAdapter { // If the extraction is done, we want to save the timestamp of the last successful sync if (newEventType === ExtractorEventType.ExtractionAttachmentsDone) { this.state.lastSuccessfulSyncStarted = this.state.lastSyncStarted; + this.state.lastSyncStarted = ''; } // We want to save the state every time we emit an event, except for the start and delete events @@ -240,11 +244,20 @@ export class WorkerAdapter { }; this.artifacts = []; this.parentPort.postMessage(message); + this.hasWorkerEmitted = true; } catch (error) { - console.error( - 'Error while emitting event with event type: ' + newEventType + '.', - error - ); + if (axios.isAxiosError(error)) { + console.error( + 'Error while emitting event with event type: ' + newEventType + '.', + serializeAxiosError(error) + ); + } else { + console.error( + 'Error while emitting event with event type: ' + newEventType + '.', + error + ); + } + this.parentPort.postMessage(WorkerMessageSubject.WorkerMessageExit); } } @@ -657,6 +670,69 @@ export class WorkerAdapter { } } + processAttachment = async ( + attachment: NormalizedAttachment, + stream: ExternalSystemAttachmentStreamingFunction + ): Promise => { + const { httpStream, delay, error } = await stream({ + item: attachment, + event: this.event, + }); + + if (error) { + console.warn('Error while streaming attachment', error?.message); + return { error }; + } else if (delay) { + return { delay }; + } + + if (httpStream) { + const fileType = + httpStream.headers?.['content-type'] || 'application/octet-stream'; + + const preparedArtifact = await 
this.uploader.prepareArtifact( + attachment.file_name, + fileType + ); + if (!preparedArtifact) { + console.warn( + 'Error while preparing artifact for attachment ID ' + + attachment.id + + '. Skipping attachment' + ); + return; + } + + const uploadedArtifact = await this.uploader.streamToArtifact( + preparedArtifact, + httpStream + ); + + if (!uploadedArtifact) { + console.warn( + 'Error while preparing artifact for attachment ID ' + attachment.id + ); + return; + } + + const ssorAttachment: SsorAttachment = { + id: { + devrev: preparedArtifact.id, + external: attachment.id, + }, + parent_id: { + external: attachment.parent_id, + }, + actor_id: { + external: attachment.author_id, + }, + }; + + await this.getRepo('ssor_attachment')?.push([ssorAttachment]); + } + return; + }; + async loadAttachment({ item, create, @@ -698,29 +774,36 @@ export class WorkerAdapter { /** * Streams the attachments to the DevRev platform. * The attachments are streamed to the platform and the artifact information is returned. - * @param {string} attachmentsMetadataArtifactId - The artifact ID of the attachments metadata - * @returns {Promise} - The response object containing the ssoAttachment artifact information + * @param {{ stream, processors }: { stream: ExternalSystemAttachmentStreamingFunction, processors?: ExternalSystemAttachmentProcessors }} Params - The parameters to stream the attachments + * @returns {Promise} - The response object containing the ssoAttachment artifact information * or error information if there was an error */ - async streamAttachments({ + async streamAttachments({ stream, + processors, }: { stream: ExternalSystemAttachmentStreamingFunction; - }) { + processors?: ExternalSystemAttachmentProcessors< + ConnectorState, + NormalizedAttachment[], + NewBatch + >; + }): Promise { const repos = [ { itemType: 'ssor_attachment', }, ]; this.initializeRepos(repos); + const attachmentsState = ( + this.state.toDevRev?.attachmentsMetadata.artifactIds || [] + ).slice(); - for (const attachmentsMetadataArtifactId of this.state.toDevRev - ?.attachmentsMetadata.artifactIds || []) { - if (this.state.toDevRev?.attachmentsMetadata.artifactIds.length === 0) { - return { report: {} }; - } - - console.log('Started streaming attachments to the platform.'); + console.log('Attachments metadata artifact IDs', attachmentsState); + for (const attachmentsMetadataArtifactId of attachmentsState) { + console.log( + `Started processing attachments for artifact ID: ${attachmentsMetadataArtifactId}.` + ); const { attachments, error } = await this.uploader.getAttachmentsFromArtifactId({ @@ -728,92 +811,60 @@ export class WorkerAdapter { }); if (error) { + console.error( + `Failed to get attachments for artifact ID: ${attachmentsMetadataArtifactId}.` + ); return { error }; } - if (attachments) { + if (!attachments || attachments.length === 0) { + console.warn( + `No attachments found for artifact ID: ${attachmentsMetadataArtifactId}.` + ); + continue; + } + + if (processors) { + console.log(`Using custom processors for attachments.`); + const { reducer, iterator } = processors; + const reducedAttachments = reducer({ attachments, adapter: this }); + + const response = await iterator({ + reducedAttachments, + adapter: this, + stream, + }); + if (response?.delay || response?.error) { + return response; + } + } else { + console.log(`Using default processors for attachments.`); const attachmentsToProcess = attachments.slice( this.state.toDevRev?.attachmentsMetadata?.lastProcessed, attachments.length ); for (const attachment of 
attachmentsToProcess) { - const { httpStream, delay, error } = await stream({ - item: attachment, - event: this.event, - }); - - if (error) { - console.warn('Error while streaming attachment', error?.message); - continue; - } else if (delay) { - return { delay }; + const response = await this.processAttachment(attachment, stream); + if (response?.delay || response?.error) { + return response; } - if (httpStream) { - const fileType = - httpStream.headers?.['content-type'] || - 'application/octet-stream'; - - const preparedArtifact = await this.uploader.prepareArtifact( - attachment.file_name, - fileType - ); - if (!preparedArtifact) { - console.warn( - 'Error while preparing artifact for attachment ID ' + - attachment.id + - '. Skipping attachment' - ); - if (this.state.toDevRev) { - this.state.toDevRev.attachmentsMetadata.lastProcessed++; - } - continue; - } - - const uploadedArtifact = await this.uploader.streamToArtifact( - preparedArtifact, - httpStream - ); - - if (!uploadedArtifact) { - console.warn( - 'Error while preparing artifact for attachment ID ' + - attachment.id - ); - if (this.state.toDevRev) { - this.state.toDevRev.attachmentsMetadata.lastProcessed++; - } - continue; - } - - const ssorAttachment: SsorAttachment = { - id: { - devrev: preparedArtifact.id, - external: attachment.id, - }, - parent_id: { - external: attachment.parent_id, - }, - actor_id: { - external: attachment.author_id, - }, - }; - - await this.getRepo('ssor_attachment')?.push([ssorAttachment]); - if (this.state.toDevRev) { - this.state.toDevRev.attachmentsMetadata.lastProcessed++; - } + if (this.state.toDevRev) { + this.state.toDevRev.attachmentsMetadata.lastProcessed += 1; } } } if (this.state.toDevRev) { + console.log( + `Finished processing attachments for artifact ID. Setting last processed to 0 and removing artifact ID from state.` + ); this.state.toDevRev.attachmentsMetadata.artifactIds.shift(); this.state.toDevRev.attachmentsMetadata.lastProcessed = 0; } } - return { report: {} }; + return; } }
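
Below is a minimal sketch of how a connector worker might use the `processors` option that this release adds to `adapter.streamAttachments`, pairing a custom reducer/iterator with the SDK's new per-attachment `processAttachment` helper. It assumes `processTask` and `ExtractorEventType` are re-exported from the package root (as the SDK's own default workers suggest); the `./attachment-stream` helper and the batch size of ten are purely illustrative.

```typescript
// Hypothetical connector worker (e.g. attachments-extraction.ts).
// `getAttachmentStream` is a connector-specific streaming function,
// not part of this SDK.
import { ExtractorEventType, processTask } from '@devrev/ts-adaas';
import { getAttachmentStream } from './attachment-stream';

processTask({
  task: async ({ adapter }) => {
    const response = await adapter.streamAttachments({
      stream: getAttachmentStream,
      processors: {
        // Reducer: group the normalized attachments into batches of ten
        // (the batch size is illustrative only).
        reducer: ({ attachments }) =>
          Array.from({ length: Math.ceil(attachments.length / 10) }, (_, i) =>
            attachments.slice(i * 10, (i + 1) * 10)
          ),
        // Iterator: walk the batches, delegating each attachment to the
        // SDK's processAttachment, and stop early on a delay or an error.
        iterator: async ({ reducedAttachments, adapter, stream }) => {
          for (const batch of reducedAttachments) {
            for (const attachment of batch) {
              const result = await adapter.processAttachment(
                attachment,
                stream
              );
              if (result?.delay || result?.error) {
                return result;
              }
            }
          }
          return undefined;
        },
      },
    });

    if (response?.delay) {
      await adapter.emit(ExtractorEventType.ExtractionAttachmentsDelay, {
        delay: response.delay,
      });
    } else if (response?.error) {
      await adapter.emit(ExtractorEventType.ExtractionAttachmentsError, {
        error: response.error,
      });
    } else {
      await adapter.emit(ExtractorEventType.ExtractionAttachmentsDone);
    }
  },
  onTimeout: async ({ adapter }) => {
    // Persist progress so the next invocation can resume from the saved state.
    await adapter.postState();
  },
});
```

Splitting the work into a reducer and an iterator lets a connector batch or reorder attachment metadata before streaming, while still reusing the SDK's per-attachment upload path and its delay and error handling.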