1
1
import { boundMethod } from 'autobind-decorator' ;
2
2
import { EventEmitter } from 'events' ;
3
3
import CloudWatchLogs , {
4
+ DescribeLogStreamsResponse ,
5
+ LogStream ,
4
6
InputLogEvent ,
5
7
PutLogEventsRequest ,
6
8
PutLogEventsResponse ,
7
9
} from 'aws-sdk/clients/cloudwatchlogs' ;
8
10
import S3 , { PutObjectRequest , PutObjectOutput } from 'aws-sdk/clients/s3' ;
11
+ import promiseSequential from 'promise-sequential' ;
9
12
10
13
import { SessionProxy } from './proxy' ;
11
- import { HandlerRequest , runInSequence } from './utils' ;
14
+ import { delay , HandlerRequest } from './utils' ;
12
15
13
16
type Console = globalThis . Console ;
17
+ type PromiseFunction = ( ) => Promise < any > ;
14
18
15
19
interface LogOptions {
16
20
groupName : string ;
@@ -26,13 +30,14 @@ export class ProviderLogHandler {
26
30
private static instance : ProviderLogHandler ;
27
31
public emitter : LogEmitter ;
28
32
public client : CloudWatchLogs ;
29
- public sequenceToken = '' ;
33
+ public sequenceToken : string = null ;
30
34
public accountId : string ;
31
35
public groupName : string ;
32
36
public stream : string ;
33
37
public logger : Console ;
34
38
public clientS3 : S3 ;
35
- private stack : Array < Promise < any > > = [ ] ;
39
+ private stack : Array < PromiseFunction > = [ ] ;
40
+ private isProcessing = false ;
36
41
37
42
/**
38
43
* The ProviderLogHandler's constructor should always be private to prevent direct
@@ -50,28 +55,50 @@ export class ProviderLogHandler {
50
55
const logger = options . logger || global . console ;
51
56
this . logger = logger ;
52
57
this . emitter . on ( 'log' , ( ...args : any [ ] ) => {
53
- this . stack . push ( this . deliverLog ( args ) ) ;
58
+ // this.logger.debug('Emitting log event...' );
54
59
} ) ;
55
60
// Create maps of each logger method and then alias that.
56
61
Object . entries ( this . logger ) . forEach ( ( [ key , val ] ) => {
57
62
if ( typeof val === 'function' ) {
58
63
if ( [ 'log' , 'error' , 'warn' , 'info' ] . includes ( key ) ) {
59
- this . logger [ key as 'log' | 'error' | 'warn' | 'info' ] = function (
64
+ this . logger [ key as 'log' | 'error' | 'warn' | 'info' ] = (
60
65
...args : any [ ]
61
- ) : void {
62
- // For adding other event watchers later.
63
- setImmediate ( ( ) => emitter . emit ( 'log' , ...args ) ) ;
66
+ ) : void => {
67
+ if ( ! this . isProcessing ) {
68
+ const logLevel = key . toUpperCase ( ) ;
69
+ // Add log level when not present
70
+ if (
71
+ args . length &&
72
+ ( typeof args [ 0 ] !== 'string' ||
73
+ args [ 0 ]
74
+ . substring ( 0 , logLevel . length )
75
+ . toUpperCase ( ) !== logLevel )
76
+ ) {
77
+ args . unshift ( logLevel ) ;
78
+ }
79
+ this . stack . push ( ( ) =>
80
+ this . deliverLog ( args ) . catch ( this . logger . debug )
81
+ ) ;
82
+ // For adding other event watchers later.
83
+ setImmediate ( ( ) => {
84
+ this . emitter . emit ( 'log' , ...args ) ;
85
+ } ) ;
86
+ } else {
87
+ this . logger . debug (
88
+ 'Logs are being delivered at the moment...'
89
+ ) ;
90
+ }
64
91
65
92
// Calls the logger method.
66
- val . apply ( this , args ) ;
93
+ val . apply ( this . logger , args ) ;
67
94
} ;
68
95
}
69
96
}
70
97
} ) ;
71
98
}
72
99
73
100
private async initialize ( ) : Promise < void > {
74
- this . sequenceToken = '' ;
101
+ this . sequenceToken = null ;
75
102
this . stack = [ ] ;
76
103
try {
77
104
await this . deliverLogCloudWatch ( [ 'Initialize CloudWatch' ] ) ;
@@ -142,11 +169,13 @@ export class ProviderLogHandler {
142
169
143
170
@boundMethod
144
171
public async processLogs ( ) : Promise < void > {
172
+ this . isProcessing = true ;
145
173
if ( this . stack . length > 0 ) {
146
- this . stack . push ( this . deliverLog ( [ 'Log delivery finalized.' ] ) ) ;
174
+ this . stack . push ( ( ) => this . deliverLog ( [ 'Log delivery finalized.' ] ) ) ;
147
175
}
148
- await runInSequence ( this . stack ) ;
176
+ await promiseSequential ( this . stack ) ;
149
177
this . stack = [ ] ;
178
+ this . isProcessing = false ;
150
179
}
151
180
152
181
private async createLogGroup ( ) : Promise < void > {
@@ -199,27 +228,48 @@ export class ProviderLogHandler {
199
228
const response : PutLogEventsResponse = await this . client
200
229
. putLogEvents ( logEventsParams )
201
230
. promise ( ) ;
202
- this . sequenceToken = response ?. nextSequenceToken ;
231
+ this . sequenceToken = response ?. nextSequenceToken || null ;
203
232
this . logger . debug ( 'Response from "putLogEvents"' , response ) ;
204
233
return response ;
205
234
} catch ( err ) {
206
235
const errorCode = err . code || err . name ;
207
- this . logger . debug ( 'Error from "deliverLogCloudWatch"' , err ) ;
208
- this . logger . debug ( `Error from 'putLogEvents' ${ JSON . stringify ( err ) } ` ) ;
236
+ this . logger . debug (
237
+ `Error from "putLogEvents" with sequence token ${ this . sequenceToken } ` ,
238
+ err
239
+ ) ;
209
240
if (
210
241
errorCode === 'DataAlreadyAcceptedException' ||
211
242
errorCode === 'InvalidSequenceTokenException'
212
243
) {
213
- this . sequenceToken = ( err . message || '' ) . split ( ' ' ) . pop ( ) ;
214
- this . putLogEvents ( record ) ;
244
+ this . sequenceToken = null ;
245
+ // Delay to avoid throttling
246
+ await delay ( 1 ) ;
247
+ try {
248
+ const response : DescribeLogStreamsResponse = await this . client
249
+ . describeLogStreams ( {
250
+ logGroupName : this . groupName ,
251
+ logStreamNamePrefix : this . stream ,
252
+ limit : 1 ,
253
+ } )
254
+ . promise ( ) ;
255
+ this . logger . debug ( 'Response from "describeLogStreams"' , response ) ;
256
+ if ( response . logStreams && response . logStreams . length ) {
257
+ const logStream = response . logStreams [ 0 ] as LogStream ;
258
+ this . sequenceToken = logStream . uploadSequenceToken ;
259
+ }
260
+ } catch ( err ) {
261
+ this . logger . debug ( 'Error from "describeLogStreams"' , err ) ;
262
+ }
215
263
} else {
216
264
throw err ;
217
265
}
218
266
}
219
267
}
220
268
221
269
@boundMethod
222
- private async deliverLogCloudWatch ( messages : any [ ] ) : Promise < PutLogEventsResponse > {
270
+ private async deliverLogCloudWatch (
271
+ messages : any [ ]
272
+ ) : Promise < PutLogEventsResponse | void > {
223
273
const currentTime = new Date ( Date . now ( ) ) ;
224
274
const record : InputLogEvent = {
225
275
message : JSON . stringify ( { messages } ) ,
@@ -236,8 +286,20 @@ export class ProviderLogHandler {
236
286
await this . createLogGroup ( ) ;
237
287
}
238
288
await this . createLogStream ( ) ;
239
- return this . putLogEvents ( record ) ;
240
- } else {
289
+ } else if (
290
+ errorCode !== 'DataAlreadyAcceptedException' &&
291
+ errorCode !== 'InvalidSequenceTokenException'
292
+ ) {
293
+ throw err ;
294
+ }
295
+ try {
296
+ const response = await this . putLogEvents ( record ) ;
297
+ return response ;
298
+ } catch ( err ) {
299
+ // Additional retry for sequence token error
300
+ if ( this . sequenceToken ) {
301
+ return this . putLogEvents ( record ) ;
302
+ }
241
303
throw err ;
242
304
}
243
305
}
@@ -316,7 +378,7 @@ export class ProviderLogHandler {
316
378
@boundMethod
317
379
private async deliverLog (
318
380
messages : any [ ]
319
- ) : Promise < PutLogEventsResponse | PutObjectOutput > {
381
+ ) : Promise < PutLogEventsResponse | PutObjectOutput | void > {
320
382
if ( this . clientS3 ) {
321
383
return this . deliverLogS3 ( messages ) ;
322
384
}
0 commit comments