
Commit 8c8d26c

yugan95 and Yu Gan authored
apache#118 s3 file read retry (apache#128)
Co-authored-by: Yu Gan <[email protected]>
1 parent 941ffcb commit 8c8d26c

2 files changed (+106, -93 lines)

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
Lines changed: 2 additions & 0 deletions

@@ -199,6 +199,8 @@ protected void initialize(String path, List<String> columns) throws IOException
     config.set("spark.sql.parquet.int96AsTimestamp", "false");
 
     this.file = new Path(path);
+    // read retry before getFileStatus
+    this.file.getFileSystem(config).open(this.file).close();
     long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
     ParquetMetadata footer = readFooter(config, file, range(0, length));
 
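Note: the two added lines open and immediately close the Parquet file before getFileStatus, so that a transient S3 read failure surfaces (and can be handled) before the footer is read. Below is a minimal sketch of that probe as a standalone helper; the object name ReadProbe, the maxAttempts parameter, and the explicit backoff loop are illustrative additions and not part of this commit, which performs a single open/close.

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Sketch only: probe-open a file so transient S3 read errors surface (and are retried)
// before any metadata or footer access. The commit itself does a single open/close.
object ReadProbe {
  def probeWithRetry(path: Path, conf: Configuration, maxAttempts: Int = 3): Unit = {
    var attempt = 0
    var succeeded = false
    while (!succeeded) {
      attempt += 1
      try {
        // Opening forces a round trip to the object store; closing releases the stream.
        path.getFileSystem(conf).open(path).close()
        succeeded = true
      } catch {
        case _: IOException if attempt < maxAttempts =>
          // Transient failure: back off briefly, then retry.
          // On the final attempt the guard fails and the exception propagates.
          Thread.sleep(100L * attempt)
      }
    }
  }
}

In the Java change above, the equivalent single probe is this.file.getFileSystem(config).open(this.file).close().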

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
Lines changed: 104 additions & 93 deletions

@@ -352,113 +352,124 @@ class ParquetFileFormat
 
     (file: PartitionedFile) => {
       assert(file.partitionValues.numFields == partitionSchema.size)
-
       val fileSplit =
         new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
       val filePath = fileSplit.getPath
-
-      val split =
-        new org.apache.parquet.hadoop.ParquetInputSplit(
-          filePath,
-          fileSplit.getStart,
-          fileSplit.getStart + fileSplit.getLength,
-          fileSplit.getLength,
-          fileSplit.getLocations,
-          null)
-
-      val sharedConf = broadcastedHadoopConf.value.value
-
-      lazy val footerFileMetaData =
-        ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
-      // Try to push down filters when filter push-down is enabled.
-      val pushed = if (enableParquetFilterPushDown) {
-        val parquetSchema = footerFileMetaData.getSchema
-        val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal,
-          pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
-        filters
-          // Collects all converted Parquet filter predicates. Notice that not all predicates can be
-          // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
-          // is used here.
-          .flatMap(parquetFilters.createFilter(parquetSchema, _))
-          .reduceOption(FilterApi.and)
-      } else {
-        None
-      }
-
-      // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
-      // *only* if the file was created by something other than "parquet-mr", so check the actual
-      // writer here for this file. We have to do this per-file, as each file in the table may
-      // have different writers.
-      // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
-      def isCreatedByParquetMr: Boolean =
-        footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
-
-      val convertTz =
-        if (timestampConversion && !isCreatedByParquetMr) {
-          Some(DateTimeUtils.getTimeZone(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+      try {
+        val split =
+          new org.apache.parquet.hadoop.ParquetInputSplit(
+            filePath,
+            fileSplit.getStart,
+            fileSplit.getStart + fileSplit.getLength,
+            fileSplit.getLength,
+            fileSplit.getLocations,
+            null)
+
+        val sharedConf = broadcastedHadoopConf.value.value
+
+        logInfo(s"READ_RETRY before footerFileMetaData $filePath")
+        filePath.getFileSystem(sharedConf).open(filePath).close()
+        lazy val footerFileMetaData =
+          ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+        // Try to push down filters when filter push-down is enabled.
+        val pushed = if (enableParquetFilterPushDown) {
+          val parquetSchema = footerFileMetaData.getSchema
+          val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal,
+            pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+          filters
+            // Collects all converted Parquet filter predicates.
+            // Notice that not all predicates can be
+            // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+            // is used here.
+            .flatMap(parquetFilters.createFilter(parquetSchema, _))
+            .reduceOption(FilterApi.and)
         } else {
           None
         }
 
-      val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
-      val hadoopAttemptContext =
-        new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
+        // PARQUET_INT96_TIMESTAMP_CONVERSION
+        // says to apply timezone conversions to int96 timestamps'
+        // *only* if the file was created by something other than "parquet-mr", so check the actual
+        // writer here for this file. We have to do this per-file, as each file in the table may
+        // have different writers.
+        // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
+        def isCreatedByParquetMr: Boolean =
+          footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+        val convertTz =
+          if (timestampConversion && !isCreatedByParquetMr) {
+            Some(DateTimeUtils.getTimeZone(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+          } else {
+            None
+          }
 
-      // Try to push down filters when filter push-down is enabled.
-      // Notice: This push-down is RowGroups level, not individual records.
-      if (pushed.isDefined) {
-        ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
-      }
-      val taskContext = Option(TaskContext.get())
-      val iter = if (enableVectorizedReader) {
-        val vectorizedReader = new VectorizedParquetRecordReader(
-          convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
-        val iter = new RecordReaderIterator(vectorizedReader)
-        // SPARK-23457 Register a task completion lister before `initialization`.
-        taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
-        vectorizedReader.initialize(split, hadoopAttemptContext)
-        logDebug(s"Appending $partitionSchema ${file.partitionValues}")
-        vectorizedReader.initBatch(partitionSchema, file.partitionValues)
-        if (returningBatch) {
-          vectorizedReader.enableReturningBatches()
-        }
+        val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+        val hadoopAttemptContext =
+          new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
 
-        // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
-        iter.asInstanceOf[Iterator[InternalRow]]
-      } else {
-        logDebug(s"Falling back to parquet-mr")
-        // ParquetRecordReader returns UnsafeRow
-        val reader = if (pushed.isDefined && enableRecordFilter) {
-          val parquetFilter = FilterCompat.get(pushed.get, null)
-          new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz), parquetFilter)
-        } else {
-          new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
+        // Try to push down filters when filter push-down is enabled.
+        // Notice: This push-down is RowGroups level, not individual records.
+        if (pushed.isDefined) {
+          ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
         }
-        val iter = new RecordReaderIterator(reader)
-        // SPARK-23457 Register a task completion lister before `initialization`.
-        taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
-        reader.initialize(split, hadoopAttemptContext)
-
-        val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
-        val joinedRow = new JoinedRow()
-        val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
-
-        // This is a horrible erasure hack... if we type the iterator above, then it actually check
-        // the type in next() and we get a class cast exception. If we make that function return
-        // Object, then we can defer the cast until later!
-        if (partitionSchema.length == 0) {
-          // There is no partition columns
+        val taskContext = Option(TaskContext.get())
+        val iter = if (enableVectorizedReader) {
+          logInfo(s"FILE_SYSTEM_CHECK VectorizedParquetRecordReader $filePath")
+          val vectorizedReader = new VectorizedParquetRecordReader(
+            convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
+          val iter = new RecordReaderIterator(vectorizedReader)
+          // SPARK-23457 Register a task completion lister before `initialization`.
+          taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+          vectorizedReader.initialize(split, hadoopAttemptContext)
+          logDebug(s"Appending $partitionSchema ${file.partitionValues}")
+          vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+          if (returningBatch) {
+            vectorizedReader.enableReturningBatches()
+          }
+
+          // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
          iter.asInstanceOf[Iterator[InternalRow]]
        } else {
-          iter.asInstanceOf[Iterator[InternalRow]]
-            .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
+          logInfo(s"FILE_SYSTEM_CHECK ParquetRecordReader $filePath")
+          logDebug(s"Falling back to parquet-mr")
+          // ParquetRecordReader returns UnsafeRow
+          val reader = if (pushed.isDefined && enableRecordFilter) {
+            val parquetFilter = FilterCompat.get(pushed.get, null)
+            new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz), parquetFilter)
+          } else {
+            new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
+          }
+          val iter = new RecordReaderIterator(reader)
+          // SPARK-23457 Register a task completion lister before `initialization`.
+          taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+          reader.initialize(split, hadoopAttemptContext)
+
+          val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+          val joinedRow = new JoinedRow()
+          val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+
+          // This is a horrible erasure hack...
+          // if we type the iterator above, then it actually check
+          // the type in next() and we get a class cast exception. If we make that function return
+          // Object, then we can defer the cast until later!
+          if (partitionSchema.length == 0) {
+            // There is no partition columns
+            iter.asInstanceOf[Iterator[InternalRow]]
+          } else {
+            iter.asInstanceOf[Iterator[InternalRow]]
+              .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
+          }
        }
+        if (taskContext.isDefined) {
+          val metrics = taskContext.get.taskMetrics()
+          metrics.setAdditionalMetric("Parquet Metric:" + ParquetMetrics.get().toString)
+        }
+        iter
+      } catch {
+        case e: IOException =>
+          logError(s"RECORD_ERROR_STACK_INFO $filePath", e)
+          throw e
      }
-      if (taskContext.isDefined) {
-        val metrics = taskContext.get.taskMetrics()
-        metrics.setAdditionalMetric("Parquet Metric:" + ParquetMetrics.get().toString)
-      }
-      iter
    }
  }
 
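For reference, a minimal sketch of the try/catch pattern introduced above, isolated as a reusable wrapper: log the failing file path with the full stack trace on IOException, then rethrow so the task still fails and Spark's normal retry machinery takes over. The object and method names below are hypothetical; the commit inlines this logic directly in ParquetFileFormat.buildReaderWithPartitionValues and uses Spark's own logError rather than a raw SLF4J logger.

import java.io.IOException

import org.slf4j.LoggerFactory

// Sketch only: run reader-construction work and, on IOException, log the offending
// file path together with the stack trace before propagating the error.
object IoErrorLogging {
  private val logger = LoggerFactory.getLogger(getClass)

  def withIoErrorLogging[T](filePath: String)(body: => T): T = {
    try {
      body
    } catch {
      case e: IOException =>
        // Mirror the commit's behavior: record RECORD_ERROR_STACK_INFO, then rethrow.
        logger.error(s"RECORD_ERROR_STACK_INFO $filePath", e)
        throw e
    }
  }
}

A caller would wrap the split/footer/reader setup, e.g. withIoErrorLogging(filePath.toString) { buildIter() }, where buildIter stands for the body of the try block in the diff above.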
