Skip to content

Commit d2b44a4

Browse files
committed
Allow rename and deletion of subfields
1 parent 161401d commit d2b44a4

15 files changed

+220
-14
lines changed

velox/connectors/hive/HiveConfig.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,20 @@ bool HiveConfig::isParquetUseColumnNames(
110110
config_->get<bool>(kParquetUseColumnNames, false));
111111
}
112112

113+
bool HiveConfig::isOrcAllowEnhancedSchemaEvolution(
114+
const config::ConfigBase* session) const {
115+
return session->get<bool>(
116+
kOrcAllowEnhancedSchemaEvolutionSession,
117+
config_->get<bool>(kOrcAllowEnhancedSchemaEvolution, false));
118+
}
119+
120+
bool HiveConfig::isParquetAllowEnhancedSchemaEvolution(
121+
const config::ConfigBase* session) const {
122+
return session->get<bool>(
123+
kParquetAllowEnhancedSchemaEvolutionSession,
124+
config_->get<bool>(kParquetAllowEnhancedSchemaEvolution, false));
125+
}
126+
113127
bool HiveConfig::isFileColumnNamesReadAsLowerCase(
114128
const config::ConfigBase* session) const {
115129
return session->get<bool>(

velox/connectors/hive/HiveConfig.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,16 @@ class HiveConfig {
8585
static constexpr const char* kParquetUseColumnNamesSession =
8686
"parquet_use_column_names";
8787

88+
static constexpr const char* kOrcAllowEnhancedSchemaEvolution =
89+
"hive.orc.allow-enhanced-schema-evolution";
90+
static constexpr const char* kOrcAllowEnhancedSchemaEvolutionSession =
91+
"orc_allow_enhanced_schema_evolution";
92+
93+
static constexpr const char* kParquetAllowEnhancedSchemaEvolution =
94+
"hive.parquet.allow-enhanced-schema-evolution";
95+
static constexpr const char* kParquetAllowEnhancedSchemaEvolutionSession =
96+
"parquet_allow_enhanced_schema_evolution";
97+
8898
/// Reads the source file column name as lower case.
8999
static constexpr const char* kFileColumnNamesReadAsLowerCase =
90100
"file-column-names-read-as-lower-case";
@@ -231,6 +241,12 @@ class HiveConfig {
231241

232242
bool isParquetUseColumnNames(const config::ConfigBase* session) const;
233243

244+
bool isOrcAllowEnhancedSchemaEvolution(
245+
const config::ConfigBase* session) const;
246+
247+
bool isParquetAllowEnhancedSchemaEvolution(
248+
const config::ConfigBase* session) const;
249+
234250
bool isFileColumnNamesReadAsLowerCase(
235251
const config::ConfigBase* session) const;
236252

velox/connectors/hive/HiveConnectorUtil.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -556,24 +556,30 @@ void configureReaderOptions(
556556
hiveConfig->isFileColumnNamesReadAsLowerCase(sessionProperties));
557557
readerOptions.setAllowEmptyFile(true);
558558
bool useColumnNamesForColumnMapping = false;
559+
bool allowEnhancedSchemaEvolution = false;
559560
switch (hiveSplit->fileFormat) {
560561
case dwio::common::FileFormat::DWRF:
561562
case dwio::common::FileFormat::ORC: {
562563
useColumnNamesForColumnMapping =
563564
hiveConfig->isOrcUseColumnNames(sessionProperties);
565+
allowEnhancedSchemaEvolution =
566+
hiveConfig->isOrcAllowEnhancedSchemaEvolution(sessionProperties);
564567
break;
565568
}
566569
case dwio::common::FileFormat::PARQUET: {
567570
useColumnNamesForColumnMapping =
568571
hiveConfig->isParquetUseColumnNames(sessionProperties);
572+
allowEnhancedSchemaEvolution =
573+
hiveConfig->isParquetAllowEnhancedSchemaEvolution(sessionProperties);
569574
break;
570575
}
571576
default:
572-
useColumnNamesForColumnMapping = false;
577+
break;
573578
}
574579

575580
readerOptions.setUseColumnNamesForColumnMapping(
576581
useColumnNamesForColumnMapping);
582+
readerOptions.setAllowEnhancedSchemaEvolution(allowEnhancedSchemaEvolution);
577583
readerOptions.setFileSchema(fileSchema);
578584
readerOptions.setFooterEstimatedSize(hiveConfig->footerEstimatedSize());
579585
readerOptions.setFilePreloadThreshold(hiveConfig->filePreloadThreshold());

velox/connectors/hive/SplitReader.cpp

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -440,12 +440,22 @@ std::vector<TypePtr> SplitReader::adaptColumns(
440440
auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName);
441441
if (!fileTypeIdx.has_value()) {
442442
// Column is missing. Most likely due to schema evolution.
443-
VELOX_CHECK(tableSchema, "Unable to resolve column '{}'", fieldName);
443+
TypePtr fieldType = nullptr;
444+
if (baseReaderOpts_.allowEnhancedSchemaEvolution()) {
445+
auto outputTypeIdx =
446+
readerOutputType_->getChildIdxIfExists(fieldName);
447+
if (outputTypeIdx.has_value()) {
448+
// Field name exists in the user-specified output type.
449+
fieldType = readerOutputType_->childAt(outputTypeIdx.value());
450+
}
451+
}
452+
if (fieldType == nullptr) {
453+
VELOX_CHECK(tableSchema, "Unable to resolve column '{}'", fieldName);
454+
fieldType = tableSchema->findChild(fieldName);
455+
}
444456
childSpec->setConstantValue(
445457
BaseVector::createNullConstant(
446-
tableSchema->findChild(fieldName),
447-
1,
448-
connectorQueryCtx_->memoryPool()));
458+
fieldType, 1, connectorQueryCtx_->memoryPool()));
449459
} else {
450460
// Column no longer missing, reset constant value set on the spec.
451461
childSpec->setConstantValue(nullptr);

velox/dwio/common/Options.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ ColumnReaderOptions makeColumnReaderOptions(const ReaderOptions& options) {
7474
ColumnReaderOptions columnReaderOptions;
7575
columnReaderOptions.useColumnNamesForColumnMapping_ =
7676
options.useColumnNamesForColumnMapping();
77+
columnReaderOptions.allowEnhancedSchemaEvolution_ =
78+
options.allowEnhancedSchemaEvolution();
7779
return columnReaderOptions;
7880
}
7981

velox/dwio/common/Options.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,11 @@ class ReaderOptions : public io::ReaderOptions {
600600
return *this;
601601
}
602602

603+
ReaderOptions& setAllowEnhancedSchemaEvolution(bool flag) {
604+
allowEnhancedSchemaEvolution_ = flag;
605+
return *this;
606+
}
607+
603608
ReaderOptions& setIOExecutor(std::shared_ptr<folly::Executor> executor) {
604609
ioExecutor_ = std::move(executor);
605610
return *this;
@@ -675,6 +680,10 @@ class ReaderOptions : public io::ReaderOptions {
675680
return useColumnNamesForColumnMapping_;
676681
}
677682

683+
bool allowEnhancedSchemaEvolution() const {
684+
return allowEnhancedSchemaEvolution_;
685+
}
686+
678687
const std::shared_ptr<random::RandomSkipTracker>& randomSkip() const {
679688
return randomSkip_;
680689
}
@@ -726,6 +735,7 @@ class ReaderOptions : public io::ReaderOptions {
726735
uint64_t filePreloadThreshold_{kDefaultFilePreloadThreshold};
727736
bool fileColumnNamesReadAsLowerCase_{false};
728737
bool useColumnNamesForColumnMapping_{false};
738+
bool allowEnhancedSchemaEvolution_{false};
729739
std::shared_ptr<folly::Executor> ioExecutor_;
730740
std::shared_ptr<random::RandomSkipTracker> randomSkip_;
731741
std::shared_ptr<velox::common::ScanSpec> scanSpec_;
@@ -771,6 +781,10 @@ struct ColumnReaderOptions {
771781
// Whether to map table field names to file field names using names, not
772782
// indices.
773783
bool useColumnNamesForColumnMapping_{false};
784+
785+
// Whether enhanced schema evolution operations, such as renaming or deleting
786+
// fields and creating NULL values, are allowed.
787+
bool allowEnhancedSchemaEvolution_{false};
774788
};
775789

776790
ColumnReaderOptions makeColumnReaderOptions(const ReaderOptions& options);

velox/dwio/common/SelectiveFlatMapColumnReader.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@ namespace facebook::velox::dwio::common {
2424
class SelectiveFlatMapColumnReader : public SelectiveStructColumnReaderBase {
2525
protected:
2626
SelectiveFlatMapColumnReader(
27+
const dwio::common::ColumnReaderOptions& columnReaderOptions,
2728
const TypePtr& requestedType,
2829
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
2930
FormatParams& params,
3031
velox::common::ScanSpec& scanSpec)
3132
: SelectiveStructColumnReaderBase(
33+
columnReaderOptions,
3234
requestedType,
3335
fileType,
3436
params,

velox/dwio/common/SelectiveStructColumnReader.cpp

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,9 @@ void SelectiveStructColumnReaderBase::read(
425425
}
426426

427427
const auto& childSpecs = scanSpec_->children();
428-
VELOX_CHECK(!childSpecs.empty());
428+
if (!columnReaderOptions_.allowEnhancedSchemaEvolution_) {
429+
VELOX_CHECK(!childSpecs.empty());
430+
}
429431
for (size_t i = 0; i < childSpecs.size(); ++i) {
430432
const auto& childSpec = childSpecs[i];
431433

@@ -512,6 +514,9 @@ void SelectiveStructColumnReaderBase::recordParentNullsInChildren(
512514

513515
bool SelectiveStructColumnReaderBase::isChildMissing(
514516
const velox::common::ScanSpec& childSpec) const {
517+
bool enhancedSchemaEvolutionByName =
518+
columnReaderOptions_.useColumnNamesForColumnMapping_ &&
519+
columnReaderOptions_.allowEnhancedSchemaEvolution_;
515520
return
516521
// The below check is trying to determine if this is a missing field in a
517522
// struct that should be constant null.
@@ -524,9 +529,12 @@ bool SelectiveStructColumnReaderBase::isChildMissing(
524529
// row type that doesn't exist
525530
// in the output.
526531
fileType_->type()->kind() !=
527-
TypeKind::MAP && // If this is the case it means this is a flat map,
528-
// so it can't have "missing" fields.
529-
childSpec.channel() >= fileType_->size());
532+
TypeKind::MAP // If this is the case it means this is a flat map,
533+
// so it can't have "missing" fields.
534+
) &&
535+
(enhancedSchemaEvolutionByName
536+
? !asRowType(fileType_->type())->containsChild(childSpec.fieldName())
537+
: childSpec.channel() >= fileType_->size());
530538
}
531539

532540
std::unique_ptr<velox::dwio::common::ColumnLoader>
@@ -538,7 +546,9 @@ SelectiveStructColumnReaderBase::makeColumnLoader(vector_size_t index) {
538546
void SelectiveStructColumnReaderBase::getValues(
539547
const RowSet& rows,
540548
VectorPtr* result) {
541-
VELOX_CHECK(!scanSpec_->children().empty());
549+
if (!columnReaderOptions_.allowEnhancedSchemaEvolution_) {
550+
VELOX_CHECK(!scanSpec_->children().empty());
551+
}
542552
VELOX_CHECK_NOT_NULL(
543553
*result, "SelectiveStructColumnReaderBase expects a non-null result");
544554
VELOX_CHECK(

velox/dwio/common/SelectiveStructColumnReader.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#pragma once
1818

19+
#include "velox/dwio/common/Options.h"
1920
#include "velox/dwio/common/SelectiveColumnReaderInternal.h"
2021

2122
namespace facebook::velox::dwio::common {
@@ -113,13 +114,15 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader {
113114
static constexpr int32_t kConstantChildSpecSubscript{-1};
114115

115116
SelectiveStructColumnReaderBase(
117+
const dwio::common::ColumnReaderOptions& columnReaderOptions,
116118
const TypePtr& requestedType,
117119
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
118120
FormatParams& params,
119121
velox::common::ScanSpec& scanSpec,
120122
bool isRoot = false,
121123
bool generateLazyChildren = true)
122124
: SelectiveColumnReader(requestedType, fileType, params, scanSpec),
125+
columnReaderOptions_(columnReaderOptions),
123126
debugString_(
124127
getExceptionContext().message(VeloxException::Type::kSystem)),
125128
isRoot_(isRoot),
@@ -180,6 +183,8 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader {
180183
}
181184
}
182185

186+
const dwio::common::ColumnReaderOptions& columnReaderOptions_;
187+
183188
// Context information obtained from ExceptionContext. Stored here
184189
// so that LazyVector readers under this can add this to their
185190
// ExceptionContext. Allows contextualizing reader errors to split

velox/dwio/dwrf/reader/DwrfReader.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -320,13 +320,14 @@ DwrfRowReader::DwrfRowReader(
320320
makeProjectedNodes(*getReader().schemaWithId(), *projectedNodes_);
321321
}
322322

323+
// Reader options must be configured before calling 'getUnitLoader()',
324+
// which triggers 'SelectiveDwrfReader::build'.
325+
columnReaderOptions_ = dwio::common::makeColumnReaderOptions(
326+
readerBaseShared()->readerOptions());
323327
unitLoader_ = getUnitLoader();
324328
if (!emptyFile()) {
325329
getReader().loadCache();
326330
}
327-
328-
columnReaderOptions_ = dwio::common::makeColumnReaderOptions(
329-
readerBaseShared()->readerOptions());
330331
}
331332

332333
std::unique_ptr<ColumnReader>& DwrfRowReader::getColumnReader() {

0 commit comments

Comments
 (0)