@@ -51,7 +51,7 @@ const $readCSV = async (filePath: string, options?: CsvInputOptionsNode): Promis
51
51
const frameConfig = options ?. frameConfig || { }
52
52
53
53
if ( filePath . startsWith ( "http" ) || filePath . startsWith ( "https" ) ) {
54
- return new Promise ( resolve => {
54
+ return new Promise ( ( resolve , reject ) => {
55
55
const optionsWithDefaults = {
56
56
header : true ,
57
57
dynamicTyping : true ,
@@ -60,6 +60,13 @@ const $readCSV = async (filePath: string, options?: CsvInputOptionsNode): Promis
60
60
}
61
61
62
62
const dataStream = request . get ( filePath ) ;
63
+ // reject any non-2xx status codes
64
+ dataStream . on ( 'response' , ( response : any ) => {
65
+ if ( response . statusCode < 200 || response . statusCode >= 300 ) {
66
+ reject ( new Error ( `HTTP ${ response . statusCode } : ${ response . statusMessage } ` ) ) ;
67
+ }
68
+ } ) ;
69
+
63
70
const parseStream : any = Papa . parse ( Papa . NODE_STREAM_INPUT , optionsWithDefaults as any ) ;
64
71
dataStream . pipe ( parseStream ) ;
65
72
@@ -74,17 +81,24 @@ const $readCSV = async (filePath: string, options?: CsvInputOptionsNode): Promis
74
81
} ) ;
75
82
76
83
} else {
77
- return new Promise ( resolve => {
78
- const fileStream = fs . createReadStream ( filePath )
79
- Papa . parse ( fileStream , {
80
- header : true ,
81
- dynamicTyping : true ,
82
- ...options ,
83
- complete : results => {
84
- const df = new DataFrame ( results . data , frameConfig ) ;
85
- resolve ( df ) ;
84
+ return new Promise ( ( resolve , reject ) => {
85
+ fs . access ( filePath , fs . constants . F_OK , ( err ) => {
86
+ if ( err ) {
87
+ reject ( "ENOENT: no such file or directory" ) ;
86
88
}
87
- } ) ;
89
+
90
+ const fileStream = fs . createReadStream ( filePath )
91
+
92
+ Papa . parse ( fileStream , {
93
+ header : true ,
94
+ dynamicTyping : true ,
95
+ ...options ,
96
+ complete : results => {
97
+ const df = new DataFrame ( results . data , frameConfig ) ;
98
+ resolve ( df ) ;
99
+ }
100
+ } ) ;
101
+ } )
88
102
} ) ;
89
103
}
90
104
} ;
@@ -113,9 +127,17 @@ const $streamCSV = async (filePath: string, callback: (df: DataFrame) => void, o
113
127
dynamicTyping : true ,
114
128
...options ,
115
129
}
116
- return new Promise ( resolve => {
130
+ return new Promise ( ( resolve , reject ) => {
117
131
let count = - 1
118
132
const dataStream = request . get ( filePath ) ;
133
+
134
+ // reject any non-2xx status codes
135
+ dataStream . on ( 'response' , ( response : any ) => {
136
+ if ( response . statusCode < 200 || response . statusCode >= 300 ) {
137
+ reject ( new Error ( `HTTP ${ response . statusCode } : ${ response . statusMessage } ` ) ) ;
138
+ }
139
+ } ) ;
140
+
119
141
const parseStream : any = Papa . parse ( Papa . NODE_STREAM_INPUT , optionsWithDefaults ) ;
120
142
dataStream . pipe ( parseStream ) ;
121
143
@@ -130,19 +152,26 @@ const $streamCSV = async (filePath: string, callback: (df: DataFrame) => void, o
130
152
131
153
} ) ;
132
154
} else {
133
- const fileStream = fs . createReadStream ( filePath )
134
155
135
- return new Promise ( resolve => {
136
- let count = - 1
137
- Papa . parse ( fileStream , {
138
- header : true ,
139
- dynamicTyping : true ,
140
- ...options ,
141
- step : results => {
142
- const df = new DataFrame ( [ results . data ] , { ...frameConfig , index : [ count ++ ] } ) ;
143
- callback ( df ) ;
144
- } ,
145
- complete : ( ) => resolve ( null )
156
+ return new Promise ( ( resolve , reject ) => {
157
+ fs . access ( filePath , fs . constants . F_OK , ( err ) => {
158
+ if ( err ) {
159
+ reject ( "ENOENT: no such file or directory" ) ;
160
+ }
161
+
162
+ const fileStream = fs . createReadStream ( filePath )
163
+
164
+ let count = - 1
165
+ Papa . parse ( fileStream , {
166
+ header : true ,
167
+ dynamicTyping : true ,
168
+ ...options ,
169
+ step : results => {
170
+ const df = new DataFrame ( [ results . data ] , { ...frameConfig , index : [ count ++ ] } ) ;
171
+ callback ( df ) ;
172
+ } ,
173
+ complete : ( ) => resolve ( null )
174
+ } ) ;
146
175
} ) ;
147
176
} ) ;
148
177
}
@@ -228,6 +257,14 @@ const $openCsvInputStream = (filePath: string, options: CsvInputOptionsNode) =>
228
257
229
258
if ( filePath . startsWith ( "http" ) || filePath . startsWith ( "https" ) ) {
230
259
const dataStream = request . get ( filePath ) ;
260
+
261
+ // reject any non-2xx status codes
262
+ dataStream . on ( 'response' , ( response : any ) => {
263
+ if ( response . statusCode < 200 || response . statusCode >= 300 ) {
264
+ throw new Error ( `HTTP ${ response . statusCode } : ${ response . statusMessage } ` ) ;
265
+ }
266
+ } ) ;
267
+
231
268
const parseStream : any = Papa . parse ( Papa . NODE_STREAM_INPUT , { header, dynamicTyping : true , ...options } ) ;
232
269
dataStream . pipe ( parseStream ) ;
233
270
let count = - 1
@@ -258,37 +295,44 @@ const $openCsvInputStream = (filePath: string, options: CsvInputOptionsNode) =>
258
295
return csvInputStream ;
259
296
} else {
260
297
const fileStream = fs . createReadStream ( filePath )
261
- let count = - 1
262
- Papa . parse ( fileStream , {
263
- ...{ header, dynamicTyping : true , ...options } ,
264
- step : results => {
265
- if ( isFirstChunk ) {
266
- if ( header === true ) {
267
- ndFrameColumnNames = results . meta . fields || [ ]
268
- } else {
269
- ndFrameColumnNames = results . data
298
+
299
+ fs . access ( filePath , fs . constants . F_OK , ( err ) => {
300
+ if ( err ) {
301
+ throw new Error ( "ENOENT: no such file or directory" ) ;
302
+ }
303
+
304
+ let count = - 1
305
+ Papa . parse ( fileStream , {
306
+ ...{ header, dynamicTyping : true , ...options } ,
307
+ step : results => {
308
+ if ( isFirstChunk ) {
309
+ if ( header === true ) {
310
+ ndFrameColumnNames = results . meta . fields || [ ]
311
+ } else {
312
+ ndFrameColumnNames = results . data
313
+ }
314
+ isFirstChunk = false
315
+ return
270
316
}
271
- isFirstChunk = false
272
- return
317
+
318
+ const df = new DataFrame ( [ results . data ] , {
319
+ columns : ndFrameColumnNames ,
320
+ index : [ count ++ ]
321
+ } )
322
+
323
+ csvInputStream . push ( df ) ;
324
+ } ,
325
+ complete : ( result : any ) => {
326
+ csvInputStream . push ( null ) ;
327
+ return null
328
+ } ,
329
+ error : ( err ) => {
330
+ csvInputStream . emit ( "error" , err ) ;
273
331
}
332
+ } ) ;
274
333
275
- const df = new DataFrame ( [ results . data ] , {
276
- columns : ndFrameColumnNames ,
277
- index : [ count ++ ]
278
- } )
279
-
280
- csvInputStream . push ( df ) ;
281
- } ,
282
- complete : ( result : any ) => {
283
- csvInputStream . push ( null ) ;
284
- return null
285
- } ,
286
- error : ( err ) => {
287
- csvInputStream . emit ( "error" , err ) ;
288
- }
334
+ return csvInputStream ;
289
335
} ) ;
290
-
291
- return csvInputStream ;
292
336
}
293
337
} ;
294
338
@@ -314,39 +358,45 @@ const $openCsvInputStream = (filePath: string, options: CsvInputOptionsNode) =>
314
358
* ```
315
359
*/
316
360
const $writeCsvOutputStream = ( filePath : string , options : CsvInputOptionsNode ) => {
317
- let isFirstRow = true
318
- const fileOutputStream = fs . createWriteStream ( filePath )
319
- const csvOutputStream = new stream . Writable ( { objectMode : true } )
361
+ fs . access ( filePath , fs . constants . F_OK , ( err ) => {
362
+ if ( err ) {
363
+ throw new Error ( "ENOENT: no such file or directory" ) ;
364
+ }
365
+
366
+ let isFirstRow = true
367
+ const fileOutputStream = fs . createWriteStream ( filePath )
368
+ const csvOutputStream = new stream . Writable ( { objectMode : true } )
320
369
321
- csvOutputStream . _write = ( chunk : DataFrame | Series , encoding , callback ) => {
370
+ csvOutputStream . _write = ( chunk : DataFrame | Series , encoding , callback ) => {
322
371
323
- if ( chunk instanceof DataFrame ) {
372
+ if ( chunk instanceof DataFrame ) {
324
373
325
- if ( isFirstRow ) {
326
- isFirstRow = false
327
- fileOutputStream . write ( $toCSV ( chunk , { header : true , ...options } ) ) ;
374
+ if ( isFirstRow ) {
375
+ isFirstRow = false
376
+ fileOutputStream . write ( $toCSV ( chunk , { header : true , ...options } ) ) ;
377
+ callback ( ) ;
378
+ } else {
379
+ fileOutputStream . write ( $toCSV ( chunk , { header : false , ...options } ) ) ;
380
+ callback ( ) ;
381
+ }
382
+
383
+ } else if ( chunk instanceof Series ) {
384
+
385
+ fileOutputStream . write ( $toCSV ( chunk ) ) ;
328
386
callback ( ) ;
387
+
329
388
} else {
330
- fileOutputStream . write ( $toCSV ( chunk , { header : false , ...options } ) ) ;
331
- callback ( ) ;
389
+ csvOutputStream . emit ( "error" , new Error ( "ValueError: Intermediate chunk must be either a Series or DataFrame" ) )
332
390
}
333
391
334
- } else if ( chunk instanceof Series ) {
335
-
336
- fileOutputStream . write ( $toCSV ( chunk ) ) ;
337
- callback ( ) ;
338
-
339
- } else {
340
- csvOutputStream . emit ( "error" , new Error ( "ValueError: Intermediate chunk must be either a Series or DataFrame" ) )
341
392
}
342
393
343
- }
344
-
345
- csvOutputStream . on ( "finish" , ( ) => {
346
- fileOutputStream . end ( )
347
- } )
394
+ csvOutputStream . on ( "finish" , ( ) => {
395
+ fileOutputStream . end ( )
396
+ } )
348
397
349
- return csvOutputStream
398
+ return csvOutputStream
399
+ } ) ;
350
400
}
351
401
352
402
@@ -358,4 +408,4 @@ export {
358
408
$toCSV ,
359
409
$writeCsvOutputStream ,
360
410
$openCsvInputStream ,
361
- }
411
+ }
0 commit comments