diff --git a/.devrev/repo.yml b/.devrev/repo.yml
new file mode 100644
index 0000000..af3e7a6
--- /dev/null
+++ b/.devrev/repo.yml
@@ -0,0 +1 @@
+deployable: true
\ No newline at end of file
diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
new file mode 100644
index 0000000..122c4a6
--- /dev/null
+++ b/.github/CODEOWNERS
@@ -0,0 +1,2 @@
+# Code owners (if set required to merge PR)
+* @radovanjorgic @navneel99 @samod @patricijabrecko @devrev/airdrop
diff --git a/README.md b/README.md
index 9b2a8e7..841ebb0 100644
--- a/README.md
+++ b/README.md
@@ -2,6 +2,13 @@
 
 ## Release Notes
 
+### v1.2.5
+
+- Add batch size option.
+- Replace DevRev Typescript SDK requests with Axios for uploading and downloading artifacts.
+- Remove unnecessary postState from default workers.
+- Fix bugs related to attachment streaming.
+
 ### v1.2.4
 
 - Do not fail the extraction of attachments if streaming of single attachment fails.
@@ -126,286 +133,3 @@ It provides features such as:
 ```bash
 npm install @devrev/ts-adaas
 ```
-
-# Usage
-
-ADaaS Snap-ins can import data in both directions: from external sources to DevRev and from DevRev to external sources. Both directions are composed of several phases.
-
-From external source to DevRev:
-
-- External Sync Units Extraction
-- Metadata Extraction
-- Data Extraction
-- Attachments Extraction
-
-From DevRev to external source:
-
-- Data Loading
-
-Each phase comes with unique requirements for processing task, and both timeout and error handling.
-
-The ADaaS library exports processTask to structure the work within each phase, and onTimeout function to handle timeouts.
-
-### ADaaS Snap-in Invocation
-
-Each ADaaS snap-in must handle all the phases of ADaaS extraction. In a Snap-in, you typically define a `run` function that iterates over events and invokes workers per extraction phase.
-
-```typescript
-import { AirdropEvent, EventType, spawn } from '@devrev/ts-adaas';
-
-interface DummyExtractorState {
-  issues: { completed: boolean };
-  users: { completed: boolean };
-  attachments: { completed: boolean };
-}
-
-const initialState: DummyExtractorState = {
-  issues: { completed: false },
-  users: { completed: false },
-  attachments: { completed: false },
-};
-
-function getWorkerPerExtractionPhase(event: AirdropEvent) {
-  let path;
-  switch (event.payload.event_type) {
-    case EventType.ExtractionExternalSyncUnitsStart:
-      path = __dirname + '/workers/external-sync-units-extraction';
-      break;
-    case EventType.ExtractionMetadataStart:
-      path = __dirname + '/workers/metadata-extraction';
-      break;
-    case EventType.ExtractionDataStart:
-    case EventType.ExtractionDataContinue:
-      path = __dirname + '/workers/data-extraction';
-      break;
-  }
-  return path;
-}
-
-const run = async (events: AirdropEvent[]) => {
-  for (const event of events) {
-    const file = getWorkerPerExtractionPhase(event);
-    await spawn({
-      event,
-      initialState,
-      workerPath: file,
-      options: {
-        isLocalDevelopment: true,
-      },
-    });
-  }
-};
-
-export default run;
-```
-
-## Extraction
-
-The ADaaS snap-in extraction lifecycle consists of three main phases: External Sync Units Extraction, Metadata Extraction, and Data Extraction. Each phase is defined in a separate file and is responsible for fetching the respective data.
-
-The ADaaS library provides a repository management system to handle artifacts in batches. The `initializeRepos` function initializes the repositories, and the `push` function uploads the artifacts to the repositories. The `postState` function is used to post the state of the extraction task.
-
-State management is crucial for ADaaS Snap-ins to maintain the state of the extraction task. The `postState` function is used to post the state of the extraction task. The state is stored in the adapter and can be retrieved using the `adapter.state` property.
-
-### 1. External Sync Units Extraction
-
-This phase is defined in `external-sync-units-extraction.ts` and is responsible for fetching the external sync units.
-
-```typescript
-import {
-  ExternalSyncUnit,
-  ExtractorEventType,
-  processTask,
-} from '@devrev/ts-adaas';
-
-const externalSyncUnits: ExternalSyncUnit[] = [
-  {
-    id: 'devrev',
-    name: 'devrev',
-    description: 'Demo external sync unit',
-    item_count: 2,
-    item_type: 'issues',
-  },
-];
-
-processTask({
-  task: async ({ adapter }) => {
-    await adapter.emit(ExtractorEventType.ExtractionExternalSyncUnitsDone, {
-      external_sync_units: externalSyncUnits,
-    });
-  },
-  onTimeout: async ({ adapter }) => {
-    await adapter.emit(ExtractorEventType.ExtractionExternalSyncUnitsError, {
-      error: {
-        message: 'Failed to extract external sync units. Lambda timeout.',
-      },
-    });
-  },
-});
-```
-
-### 2. Metadata Extraction
-
-This phase is defined in `metadata-extraction.ts` and is responsible for fetching the metadata.
-
-```typescript
-import { ExtractorEventType, processTask } from '@devrev/ts-adaas';
-import externalDomainMetadata from '../dummy-extractor/external_domain_metadata.json';
-
-const repos = [{ itemType: 'external_domain_metadata' }];
-
-processTask({
-  task: async ({ adapter }) => {
-    adapter.initializeRepos(repos);
-    await adapter
-      .getRepo('external_domain_metadata')
-      ?.push([externalDomainMetadata]);
-    await adapter.emit(ExtractorEventType.ExtractionMetadataDone);
-  },
-  onTimeout: async ({ adapter }) => {
-    await adapter.emit(ExtractorEventType.ExtractionMetadataError, {
-      error: { message: 'Failed to extract metadata. Lambda timeout.' },
-    });
-  },
-});
-```
-
-### 3. Data Extraction
-
-This phase is defined in `data-extraction.ts` and is responsible for fetching the data. In this phase also attachments metadata is extracted.
-
-```typescript
-import { EventType, ExtractorEventType, processTask } from '@devrev/ts-adaas';
-import { normalizeAttachment, normalizeIssue, normalizeUser } from '../dummy-extractor/data-normalization';
-
-const issues = [
-  { id: 'issue-1', created_date: '1999-12-25T01:00:03+01:00', ... },
-  { id: 'issue-2', created_date: '1999-12-27T15:31:34+01:00', ... },
-];
-
-const users = [
-  { id: 'user-1', created_date: '1999-12-25T01:00:03+01:00', ... },
-  { id: 'user-2', created_date: '1999-12-27T15:31:34+01:00', ... },
-];
-
-const attachments = [
-  { url: 'https://app.dev.devrev-eng.ai/favicon.ico', id: 'attachment-1', ... },
-  { url: 'https://app.dev.devrev-eng.ai/favicon.ico', id: 'attachment-2', ... },
-];
-
-const repos = [
-  { itemType: 'issues', normalize: normalizeIssue },
-  { itemType: 'users', normalize: normalizeUser },
-  { itemType: 'attachments', normalize: normalizeAttachment },
-];
-
-processTask({
-  task: async ({ adapter }) => {
-    adapter.initializeRepos(repos);
-
-    if (adapter.event.payload.event_type === EventType.ExtractionDataStart) {
-      await adapter.getRepo('issues')?.push(issues);
-      await adapter.emit(ExtractorEventType.ExtractionDataProgress, { progress: 50 });
-    } else {
-      await adapter.getRepo('users')?.push(users);
-      await adapter.getRepo('attachments')?.push(attachments);
-      await adapter.emit(ExtractorEventType.ExtractionDataDone, { progress: 100 });
-    }
-  },
-  onTimeout: async ({ adapter }) => {
-    await adapter.postState();
-    await adapter.emit(ExtractorEventType.ExtractionDataProgress, { progress: 50 });
-  },
-});
-```
-
-### 4. Attachments Streaming
-
-The ADaaS library handles attachments streaming to improve efficiency and reduce complexity for developers. During the extraction phase, developers need only to provide metadata in a specific format for each attachment, and the library manages the streaming process.
-
-The Snap-in should provide attachment metadata following the `NormalizedAttachment` interface:
-
-```typescript
-export interface NormalizedAttachment {
-  url: string;
-  id: string;
-  file_name: string;
-  author_id: string;
-  parent_id: string;
-}
-```
-
-## Loading phases
-
-### 1. Loading Data
-
-This phase is defined in `load-data.ts` and is responsible for loading the data to the external system.
-
-Loading is done by providing an ordered list of itemTypes to load and their respective create and update functions.
-
-```typescript
- processTask({
-  task: async ({ adapter }) => {
-    const { reports, processed_files } = await adapter.loadItemTypes({
-      itemTypesToLoad: [
-        {
-          itemType: 'tickets',
-          create: createTicket,
-          update: updateTicket,
-        },
-        {
-          itemType: 'conversations',
-          create: createConversation,
-          update: updateConversation,
-        },
-      ],
-    });
-
-    await adapter.emit(LoaderEventType.DataLoadingDone, {
-      reports,
-      processed_files,
-    });
-  },
-  onTimeout: async ({ adapter }) => {
-    await adapter.emit(LoaderEventType.DataLoadingProgress, {
-      reports: adapter.reports,
-      processed_files: adapter.processedFiles,
-    });
-});
-```
-
-The loading functions `create` and `update` provide loading to the external system. They provide denormalization of the records to the schema of the external system and provide HTTP calls to the external system. Both loading functions must handle rate limiting for the external system and handle errors.
-
-Functions return an ID and modified date of the record in the external system, or specify rate-liming offset or errors, if the record could not be created or updated.
-
-### 2. Loading Attachments
-
-This phase is defined in `load-attachments.ts` and is responsible for loading the attachments to the external system.
-
-Loading is done by providing the create function to create attachments in the external system.
-
-```typescript
-processTask({
-  task: async ({ adapter }) => {
-    const { reports, processed_files } = await adapter.loadAttachments({
-      create,
-    });
-
-    await adapter.emit(LoaderEventType.AttachmentLoadingDone, {
-      reports,
-      processed_files,
-    });
-  },
-  onTimeout: async ({ adapter }) => {
-    await adapter.postState();
-    await adapter.emit(LoaderEventType.AttachmentLoadingProgress, {
-      reports: adapter.reports,
-      processed_files: adapter.processedFiles,
-    });
-  },
-});
-```
-
-The loading function `create` provides loading to the external system, to make API calls to the external system to create the attachments and handle errors and external system's rate limiting.
-
-Functions return an ID and modified date of the record in the external system, specify rate-liming back-off, or log errors, if the attachment could not be created.
diff --git a/package.json b/package.json
index 36c57d4..176f1f7 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@devrev/ts-adaas",
-  "version": "1.2.4",
+  "version": "1.2.5",
   "description": "DevRev ADaaS (AirDrop-as-a-Service) Typescript SDK.",
   "type": "commonjs",
   "main": "./dist/index.js",
diff --git a/src/deprecated/uploader/index.ts b/src/deprecated/uploader/index.ts
index 3abeaf9..4809323 100644
--- a/src/deprecated/uploader/index.ts
+++ b/src/deprecated/uploader/index.ts
@@ -1,4 +1,4 @@
-import { axiosDevRevClient } from '../../http/axios-devrev-client';
+import { axiosClient } from '../../http/axios-client';
 import { betaSDK, client } from '@devrev/typescript-sdk';
 import fs, { promises as fsPromises } from 'fs';
 import { createFormData } from '../common/helpers';
@@ -108,15 +108,11 @@ export class Uploader {
   ): Promise {
     const formData = createFormData(preparedArtifact, fetchedObjects);
     try {
-      const response = await axiosDevRevClient.post(
-        preparedArtifact.url,
-        formData,
-        {
-          headers: {
-            'Content-Type': 'multipart/form',
-          },
-        }
-      );
+      const response = await axiosClient.post(preparedArtifact.url, formData, {
+        headers: {
+          'Content-Type': 'multipart/form-data',
+        },
+      });
 
       return response;
     } catch (error) {
diff --git a/src/http/axios-devrev-client.ts b/src/http/axios-devrev-client.ts
deleted file mode 100644
index 01e577f..0000000
--- a/src/http/axios-devrev-client.ts
+++ /dev/null
@@ -1,34 +0,0 @@
-import axios, { AxiosError } from 'axios';
-import axiosRetry from 'axios-retry';
-
-const axiosDevRevClient = axios.create();
-
-axiosRetry(axiosDevRevClient, {
-  retries: 5,
-  retryDelay: (retryCount, error) => {
-    console.warn(
-      'Retry attempt: ' + retryCount + ' to url: ' + error.config?.url + '.'
- ); - if (error.response) { - const retry_after = error.response?.headers['retry-after']; - if (retry_after) { - return retry_after; - } - } - // Exponential backoff algorithm: 1 * 2 ^ retryCount * 1000ms - return axiosRetry.exponentialDelay(retryCount, error, 1000); - }, - retryCondition: (error: AxiosError) => { - return ( - axiosRetry.isNetworkOrIdempotentRequestError(error) || - error.response?.status === 429 - ); - }, - onMaxRetryTimesExceeded(error: AxiosError, retryCount) { - console.log(`Max retries attempted: ${retryCount}`); - delete error.config?.headers.Authorization; - delete error.request._header; - }, -}); - -export { axios, axiosDevRevClient }; diff --git a/src/repo/repo.test.ts b/src/repo/repo.test.ts index 23add4f..ca5fb7c 100644 --- a/src/repo/repo.test.ts +++ b/src/repo/repo.test.ts @@ -19,7 +19,6 @@ describe('Repo class push method', () => { itemType: 'test_item_type', normalize, onUpload: jest.fn(), - options: {}, }); }); @@ -105,4 +104,44 @@ describe('Repo class push method', () => { uploadSpy.mockRestore(); }); + + describe('should take batch size into account', () => { + beforeEach(() => { + repo = new Repo({ + event: createEvent({ eventType: EventType.ExtractionDataStart }), + itemType: 'test_item_type', + normalize, + onUpload: jest.fn(), + options: { + batchSize: 50, + }, + }); + }); + + it('should empty the items array after pushing 50 items with batch size of 50', async () => { + const items = createItems(50); + await repo.push(items); + expect(repo.getItems()).toEqual([]); + }); + + it('should leave 5 items in the items array after pushing 205 items with batch size of 50', async () => { + const items = createItems(205); + await repo.push(items); + + expect(repo.getItems().length).toBe(5); + }); + + it('should upload 4 batches of 50 and leave 5 items in the items array after pushing 205 items with batch size of 50', async () => { + const uploadSpy = jest.spyOn(repo, 'upload'); + + const items = createItems(205); + await repo.push(items); + + expect(normalize).toHaveBeenCalledTimes(205); + expect(repo.getItems().length).toBe(5); + expect(uploadSpy).toHaveBeenCalledTimes(4); + + uploadSpy.mockRestore(); + }); + }); }); diff --git a/src/repo/repo.ts b/src/repo/repo.ts index 1c14576..e863e34 100644 --- a/src/repo/repo.ts +++ b/src/repo/repo.ts @@ -12,6 +12,7 @@ import { NormalizedItem, NormalizedAttachment, } from './repo.interfaces'; +import { WorkerAdapterOptions } from 'types/workers'; export class Repo { readonly itemType: string; @@ -19,6 +20,7 @@ export class Repo { private normalize?: (item: Item) => NormalizedItem | NormalizedAttachment; private uploader: Uploader; private onUpload: (artifact: Artifact) => void; + private options?: WorkerAdapterOptions; constructor({ event, @@ -32,6 +34,7 @@ export class Repo { this.normalize = normalize; this.onUpload = onUpload; this.uploader = new Uploader({ event, options }); + this.options = options; } getItems(): (NormalizedItem | NormalizedAttachment | Item)[] { @@ -98,9 +101,10 @@ export class Repo { this.items.push(...recordsToPush); // Upload in batches while the number of items exceeds the batch size - while (this.items.length >= ARTIFACT_BATCH_SIZE) { - // Slice out a batch of ARTIFACT_BATCH_SIZE items to upload - const batch = this.items.splice(0, ARTIFACT_BATCH_SIZE); + const batchSize = this.options?.batchSize || ARTIFACT_BATCH_SIZE; + while (this.items.length >= batchSize) { + // Slice out a batch of batchSize items to upload + const batch = this.items.splice(0, batchSize); try { // Upload the batch diff 
diff --git a/src/types/workers.ts b/src/types/workers.ts
index c7f859b..51c24dd 100644
--- a/src/types/workers.ts
+++ b/src/types/workers.ts
@@ -27,10 +27,12 @@ export interface WorkerAdapterInterface {
  * @constructor
  * @param {boolean=} isLocalDevelopment - A flag to indicate if the adapter is being used in local development
  * @param {number=} timeout - The timeout for the worker thread
+ * @param {number=} batchSize - Maximum number of extracted items in a batch
  */
 export interface WorkerAdapterOptions {
   isLocalDevelopment?: boolean;
   timeout?: number;
+  batchSize?: number;
 }
 
 /**
diff --git a/src/uploader/uploader.interfaces.ts b/src/uploader/uploader.interfaces.ts
index 2a2cbac..c6d01ae 100644
--- a/src/uploader/uploader.interfaces.ts
+++ b/src/uploader/uploader.interfaces.ts
@@ -19,7 +19,6 @@ export interface Artifact {
 
 /**
  * ArtifactsPrepareResponse is an interface that defines the structure of the response from the prepare artifacts endpoint.
- * @deprecated
  */
 export interface ArtifactsPrepareResponse {
   url: string;
diff --git a/src/uploader/uploader.ts b/src/uploader/uploader.ts
index 37d6226..5f01f58 100644
--- a/src/uploader/uploader.ts
+++ b/src/uploader/uploader.ts
@@ -3,7 +3,6 @@ import { axios, axiosClient } from '../http/axios-client';
 import zlib from 'zlib';
 import { jsonl } from 'js-jsonl';
 import FormData from 'form-data';
-import { betaSDK, client } from '@devrev/typescript-sdk';
 
 import { MAX_DEVREV_ARTIFACT_SIZE } from '../common/constants';
 import { NormalizedAttachment } from '../repo/repo.interfaces';
@@ -11,6 +10,7 @@ import { AirdropEvent } from '../types/extraction';
 
 import {
   Artifact,
+  ArtifactsPrepareResponse,
   UploadResponse,
   UploaderFactoryInterface,
 } from './uploader.interfaces';
@@ -19,15 +19,14 @@ import { AxiosResponse } from 'axios';
 
 export class Uploader {
   private event: AirdropEvent;
-  private betaDevrevSdk: betaSDK.Api;
   private isLocalDevelopment?: boolean;
+  private devrevApiEndpoint: string;
+  private devrevApiToken: string;
 
   constructor({ event, options }: UploaderFactoryInterface) {
     this.event = event;
-    this.betaDevrevSdk = client.setupBeta({
-      endpoint: event.execution_metadata.devrev_endpoint,
-      token: event.context.secrets.service_account_token,
-    });
+    this.devrevApiEndpoint = event.execution_metadata.devrev_endpoint;
+    this.devrevApiToken = event.context.secrets.service_account_token;
     this.isLocalDevelopment = options?.isLocalDevelopment;
   }
 
@@ -91,12 +90,20 @@ export class Uploader {
   public async prepareArtifact(
     filename: string,
     fileType: string
-  ): Promise {
+  ): Promise {
     try {
-      const response = await this.betaDevrevSdk.artifactsPrepare({
-        file_name: filename,
-        file_type: fileType,
-      });
+      const response = await axiosClient.post(
+        `${this.devrevApiEndpoint}/artifacts.prepare`,
+        {
+          file_name: filename,
+          file_type: fileType,
+        },
+        {
+          headers: {
+            Authorization: `Bearer ${this.devrevApiToken}`,
+          },
+        }
+      );
 
       return response.data;
     } catch (error) {
@@ -112,9 +119,8 @@
   }
 
   private async uploadToArtifact(
-    preparedArtifact: betaSDK.ArtifactsPrepareResponse,
+    preparedArtifact: ArtifactsPrepareResponse,
     file: Buffer
-    // eslint-disable-next-line @typescript-eslint/no-explicit-any
   ): Promise {
     const formData = new FormData();
     for (const field of preparedArtifact.form_data) {
@@ -143,29 +149,33 @@
   }
 
   public async streamToArtifact(
-    preparedArtifact: betaSDK.ArtifactsPrepareResponse,
-    // eslint-disable-next-line @typescript-eslint/no-explicit-any
+    preparedArtifact: ArtifactsPrepareResponse,
     fileStreamResponse: any
   ): Promise {
     const formData = new FormData();
     for (const field of preparedArtifact.form_data) {
       formData.append(field.key, field.value);
     }
 
-    formData.append('file', fileStreamResponse.data);
 
     if (
       fileStreamResponse.headers['content-length'] > MAX_DEVREV_ARTIFACT_SIZE
     ) {
+      console.warn(
+        `File size exceeds the maximum limit of ${MAX_DEVREV_ARTIFACT_SIZE} bytes.`
+      );
       return;
     }
+
     try {
       const response = await axiosClient.post(preparedArtifact.url, formData, {
         headers: {
           ...formData.getHeaders(),
-          ...(!fileStreamResponse.headers['content-length'] && {
-            'Content-Length': MAX_DEVREV_ARTIFACT_SIZE,
-          }),
+          ...(!fileStreamResponse.headers['content-length']
+            ? {
+                'Content-Length': MAX_DEVREV_ARTIFACT_SIZE,
+              }
+            : {}),
         },
       });
       return response;
@@ -190,7 +200,7 @@
     attachments?: NormalizedAttachment[];
     error?: { message: string };
   }> {
-    // 1. Get the URL of the attachments metadata artifact
+    // Get the URL of the attachments metadata artifact
     const artifactUrl = await this.getArtifactDownloadUrl(artifact);
     if (!artifactUrl) {
       return {
@@ -199,7 +209,7 @@
       };
     }
 
-    // 2. Download artifact from the URL
+    // Download artifact from the URL
     const gzippedJsonlObject = await this.downloadArtifact(artifactUrl);
     if (!gzippedJsonlObject) {
       return {
@@ -207,7 +217,7 @@
       };
     }
 
-    // 3. Decompress the gzipped jsonl object
+    // Decompress the gzipped jsonl object
     const jsonlObject = this.decompressGzip(gzippedJsonlObject);
     if (!jsonlObject) {
       return {
@@ -215,7 +225,7 @@
       };
     }
 
-    // 4. Parse the jsonl object to get the attachment metadata
+    // Parse the jsonl object to get the attachment metadata
     const jsonObject = this.parseJsonl(jsonlObject) as NormalizedAttachment[];
     if (!jsonObject) {
       return {
@@ -230,9 +240,17 @@
     artifactId: string
   ): Promise {
     try {
-      const response = await this.betaDevrevSdk.artifactsLocate({
-        id: artifactId,
-      });
+      const response = await axiosClient.post(
+        `${this.devrevApiEndpoint}/artifacts.locate`,
+        {
+          id: artifactId,
+        },
+        {
+          headers: {
+            Authorization: `Bearer ${this.devrevApiToken}`,
+          },
+        }
+      );
 
       return response.data.url;
     } catch (error) {
diff --git a/src/workers/default-workers/attachments-extraction.ts b/src/workers/default-workers/attachments-extraction.ts
index 56d37db..3fe393c 100644
--- a/src/workers/default-workers/attachments-extraction.ts
+++ b/src/workers/default-workers/attachments-extraction.ts
@@ -17,6 +17,9 @@ const getAttachmentStream = async ({
   try {
     const fileStreamResponse = await axiosClient.get(url, {
       responseType: 'stream',
+      headers: {
+        'Accept-Encoding': 'identity',
+      },
     });
 
     return { httpStream: fileStreamResponse };
@@ -63,7 +66,6 @@ processTask({
     }
   },
   onTimeout: async ({ adapter }) => {
-    await adapter.postState();
     await adapter.emit(ExtractorEventType.ExtractionAttachmentsProgress, {
       progress: 50,
     });
diff --git a/src/workers/default-workers/data-extraction.ts b/src/workers/default-workers/data-extraction.ts
index 8e5979a..682e3cf 100644
--- a/src/workers/default-workers/data-extraction.ts
+++ b/src/workers/default-workers/data-extraction.ts
@@ -100,7 +100,6 @@ processTask({
     }
   },
   onTimeout: async ({ adapter }) => {
-    await adapter.postState();
     await adapter.emit(ExtractorEventType.ExtractionDataProgress, {
       progress: 50,
     });
diff --git a/src/workers/default-workers/load-attachments.ts b/src/workers/default-workers/load-attachments.ts
index 0865a93..2c0cdc3 100644
--- a/src/workers/default-workers/load-attachments.ts
+++ b/src/workers/default-workers/load-attachments.ts
@@ -11,7 +11,6 @@ processTask({
     });
   },
   onTimeout: async ({ adapter }) => {
-    await adapter.postState();
     await adapter.emit(LoaderEventType.AttachmentLoadingError, {
       reports: adapter.reports,
       processed_files: adapter.processedFiles,
diff --git a/src/workers/default-workers/load-data.ts b/src/workers/default-workers/load-data.ts
index 9a5148c..9965d84 100644
--- a/src/workers/default-workers/load-data.ts
+++ b/src/workers/default-workers/load-data.ts
@@ -9,7 +9,6 @@ processTask({
     });
   },
   onTimeout: async ({ adapter }) => {
-    await adapter.postState();
     await adapter.emit(LoaderEventType.DataLoadingError, {
       reports: adapter.reports,
       processed_files: adapter.processedFiles,
diff --git a/src/workers/worker-adapter.ts b/src/workers/worker-adapter.ts
index 267444b..03230fa 100644
--- a/src/workers/worker-adapter.ts
+++ b/src/workers/worker-adapter.ts
@@ -1,4 +1,4 @@
-import { axios } from '../http/axios-devrev-client';
+import { axios } from '../http/axios-client';
 import {
   AirdropEvent,
   ExtractorEventType,
@@ -700,9 +700,7 @@ export class WorkerAdapter {
       );
       if (!preparedArtifact) {
         console.warn(
-          'Error while preparing artifact for attachment ID ' +
-            attachment.id +
-            '. Skipping attachment'
+          `Error while preparing artifact for attachment ID ${attachment.id}. Skipping attachment.`
         );
         return;
       }
@@ -714,7 +712,7 @@ export class WorkerAdapter {
 
       if (!uploadedArtifact) {
         console.warn(
-          'Error while preparing artifact for attachment ID ' + attachment.id
+          `Error while streaming to artifact for attachment ID ${attachment.id}. Skipping attachment.`
        );
         return;
       }
@@ -802,14 +800,26 @@ export class WorkerAdapter {
       },
     ];
     this.initializeRepos(repos);
 
-    const attachmentsState = (
-      this.state.toDevRev?.attachmentsMetadata.artifactIds || []
-    ).slice();
-    console.log('Attachments metadata artifact IDs', attachmentsState);
-    for (const attachmentsMetadataArtifactId of attachmentsState) {
+    const attachmentsMetadataArtifactIds =
+      this.state.toDevRev?.attachmentsMetadata?.artifactIds;
+
+    if (
+      !attachmentsMetadataArtifactIds ||
+      attachmentsMetadataArtifactIds.length === 0
+    ) {
+      console.log(`No attachments metadata artifact IDs found in state.`);
+
+      return;
+    } else {
       console.log(
-        `Started processing attachments for artifact ID: ${attachmentsMetadataArtifactId}.`
+        `Found ${attachmentsMetadataArtifactIds.length} attachments metadata artifact IDs in state.`
+      );
+    }
+
+    for (const attachmentsMetadataArtifactId of attachmentsMetadataArtifactIds) {
+      console.log(
+        `Started processing attachments for attachments metadata artifact ID: ${attachmentsMetadataArtifactId}.`
       );
 
       const { attachments, error } =
@@ -831,8 +841,13 @@ export class WorkerAdapter {
         continue;
       }
 
+      console.log(
+        `Found ${attachments.length} attachments for artifact ID: ${attachmentsMetadataArtifactId}.`
+      );
+
       if (processors) {
         console.log(`Using custom processors for attachments.`);
+
         const { reducer, iterator } = processors;
         const reducedAttachments = reducer({ attachments, adapter: this });
 
@@ -841,6 +856,7 @@
           adapter: this,
           stream,
         });
+
         if (response?.delay || response?.error) {
           return response;
         }