Skip to content

Commit e847a3b

Browse files
committed
Support struct column reading with different schemas
1 parent 4667f6d commit e847a3b

File tree

12 files changed

+201
-29
lines changed

12 files changed

+201
-29
lines changed

velox/connectors/hive/SplitReader.cpp

Lines changed: 12 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -220,9 +220,18 @@ std::vector<TypePtr> SplitReader::adaptColumns(
220220
} else {
221221
auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName);
222222
if (!fileTypeIdx.has_value()) {
223-
// Column is missing. Most likely due to schema evolution.
224-
VELOX_CHECK(tableSchema);
225-
setNullConstantValue(childSpec, tableSchema->findChild(fieldName));
223+
// If field name exists in the user-specified output type,
224+
// set the column as null constant.
225+
// Related PR: https://github.com/facebookincubator/velox/pull/6427.
226+
auto outputTypeIdx = readerOutputType_->getChildIdxIfExists(fieldName);
227+
if (outputTypeIdx.has_value()) {
228+
setNullConstantValue(
229+
childSpec, readerOutputType_->childAt(outputTypeIdx.value()));
230+
} else {
231+
// Column is missing. Most likely due to schema evolution.
232+
VELOX_CHECK(tableSchema);
233+
setNullConstantValue(childSpec, tableSchema->findChild(fieldName));
234+
}
226235
} else {
227236
// Column no longer missing, reset constant value set on the spec.
228237
childSpec->setConstantValue(nullptr);

velox/dwio/common/SelectiveStructColumnReader.cpp

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -133,7 +133,6 @@ void SelectiveStructColumnReaderBase::read(
133133
}
134134

135135
auto& childSpecs = scanSpec_->children();
136-
VELOX_CHECK(!childSpecs.empty());
137136
for (size_t i = 0; i < childSpecs.size(); ++i) {
138137
auto& childSpec = childSpecs[i];
139138
if (isChildConstant(*childSpec)) {
@@ -218,7 +217,7 @@ bool SelectiveStructColumnReaderBase::isChildConstant(
218217
fileType_->type()->kind() !=
219218
TypeKind::MAP && // If this is the case it means this is a flat map,
220219
// so it can't have "missing" fields.
221-
childSpec.channel() >= fileType_->size());
220+
!fileType_->containsChild(childSpec.fieldName()));
222221
}
223222

224223
namespace {
@@ -302,7 +301,6 @@ void setNullField(
302301
void SelectiveStructColumnReaderBase::getValues(
303302
RowSet rows,
304303
VectorPtr* result) {
305-
VELOX_CHECK(!scanSpec_->children().empty());
306304
VELOX_CHECK_NOT_NULL(
307305
*result, "SelectiveStructColumnReaderBase expects a non-null result");
308306
VELOX_CHECK(

velox/dwio/common/TypeWithId.h

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -59,6 +59,11 @@ class TypeWithId : public velox::Tree<std::shared_ptr<const TypeWithId>> {
5959

6060
const std::shared_ptr<const TypeWithId>& childAt(uint32_t idx) const override;
6161

62+
// Reports whether the wrapped ROW type has a child field called `name`.
// The wrapped type must be a ROW: VELOX_CHECK_EQ fails otherwise. The lookup
// itself is forwarded to containsChild() on the ROW view of type_.
bool containsChild(const std::string& name) const {
63+
VELOX_CHECK_EQ(type_->kind(), velox::TypeKind::ROW);
64+
return type_->as<velox::TypeKind::ROW>().containsChild(name);
65+
}
66+
6267
const std::shared_ptr<const TypeWithId>& childByName(
6368
const std::string& name) const {
6469
VELOX_CHECK_EQ(type_->kind(), velox::TypeKind::ROW);

velox/dwio/parquet/reader/ParquetColumnReader.cpp

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -37,7 +37,8 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
3737
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
3838
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
3939
ParquetParams& params,
40-
common::ScanSpec& scanSpec) {
40+
common::ScanSpec& scanSpec,
41+
memory::MemoryPool& pool) {
4142
auto colName = scanSpec.fieldName();
4243

4344
switch (fileType->type()->kind()) {
@@ -58,19 +59,19 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
5859

5960
case TypeKind::ROW:
6061
return std::make_unique<StructColumnReader>(
61-
requestedType, fileType, params, scanSpec);
62+
requestedType, fileType, params, scanSpec, pool);
6263

6364
case TypeKind::VARBINARY:
6465
case TypeKind::VARCHAR:
6566
return std::make_unique<StringColumnReader>(fileType, params, scanSpec);
6667

6768
case TypeKind::ARRAY:
6869
return std::make_unique<ListColumnReader>(
69-
requestedType, fileType, params, scanSpec);
70+
requestedType, fileType, params, scanSpec, pool);
7071

7172
case TypeKind::MAP:
7273
return std::make_unique<MapColumnReader>(
73-
requestedType, fileType, params, scanSpec);
74+
requestedType, fileType, params, scanSpec, pool);
7475

7576
case TypeKind::BOOLEAN:
7677
return std::make_unique<BooleanColumnReader>(

velox/dwio/parquet/reader/ParquetColumnReader.h

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,7 @@ class ParquetColumnReader {
4545
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
4646
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
4747
ParquetParams& params,
48-
common::ScanSpec& scanSpec);
48+
common::ScanSpec& scanSpec,
49+
memory::MemoryPool& pool);
4950
};
5051
} // namespace facebook::velox::parquet

velox/dwio/parquet/reader/ParquetReader.cpp

Lines changed: 42 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -84,6 +84,11 @@ class ReaderBase {
8484
/// the data still exists in the buffered inputs.
8585
bool isRowGroupBuffered(int32_t rowGroupIndex) const;
8686

87+
static std::shared_ptr<const dwio::common::TypeWithId> createTypeWithId(
88+
const std::shared_ptr<const dwio::common::TypeWithId>& inputType,
89+
const RowTypePtr& rowTypePtr,
90+
bool fileColumnNamesReadAsLowerCase);
91+
8792
private:
8893
// Reads and parses file footer.
8994
void loadFileMetaData();
@@ -564,6 +569,33 @@ std::shared_ptr<const RowType> ReaderBase::createRowType(
564569
std::move(childNames), std::move(childTypes));
565570
}
566571

572+
// Produces the TypeWithId to use as the requested type when building column
// readers.
//
// When `fileColumnNamesReadAsLowerCase` is false, `inputType` is returned
// unchanged. Otherwise a new ROW type is created whose field names are
// ASCII-lowercased copies of `rowTypePtr`'s names (only the names held directly
// by `rowTypePtr` are converted; its child types are copied as-is), and that
// ROW type is wrapped in a new TypeWithId that reuses `inputType`'s children,
// id, maxId and column.
std::shared_ptr<const dwio::common::TypeWithId> ReaderBase::createTypeWithId(
573+
const std::shared_ptr<const dwio::common::TypeWithId>& inputType,
574+
const RowTypePtr& rowTypePtr,
575+
bool fileColumnNamesReadAsLowerCase) {
576+
if (!fileColumnNamesReadAsLowerCase) {
577+
return inputType;
578+
}
579+
std::vector<std::string> names;
580+
names.reserve(rowTypePtr->names().size());
581+
std::vector<TypePtr> types = rowTypePtr->children();
582+
for (const auto& name : rowTypePtr->names()) {
583+
std::string childName = name;
584+
folly::toLowerAscii(childName);
585+
names.emplace_back(childName);
586+
}
587+
auto convertedType =
588+
TypeFactory<TypeKind::ROW>::create(std::move(names), std::move(types));
589+
590+
auto children = inputType->getChildren();
591+
return std::make_shared<const dwio::common::TypeWithId>(
592+
convertedType,
593+
std::move(children),
594+
inputType->id(),
595+
inputType->maxId(),
596+
inputType->column());
597+
}
598+
567599
void ReaderBase::scheduleRowGroups(
568600
const std::vector<uint32_t>& rowGroupIds,
569601
int32_t currentGroup,
@@ -630,13 +662,19 @@ ParquetRowReader::ParquetRowReader(
630662
return; // TODO
631663
}
632664
ParquetParams params(pool_, columnReaderStats_, readerBase_->fileMetaData());
633-
auto columnSelector = std::make_shared<ColumnSelector>(
634-
ColumnSelector::apply(options_.getSelector(), readerBase_->schema()));
665+
// ColumnSelector::apply does not work for schema pruning case.
666+
auto columnSelector = options_.getSelector() == nullptr
667+
? std::make_shared<ColumnSelector>(ColumnSelector(readerBase_->schema()))
668+
: options_.getSelector();
635669
columnReader_ = ParquetColumnReader::build(
636-
columnSelector->getSchemaWithId(),
670+
ReaderBase::createTypeWithId(
671+
columnSelector->getSchemaWithId(),
672+
asRowType(columnSelector->getSchemaWithId()->type()),
673+
readerBase_->isFileColumnNamesReadAsLowerCase()),
637674
readerBase_->schemaWithId(), // Id is schema id
638675
params,
639-
*options_.getScanSpec());
676+
*options_.getScanSpec(),
677+
pool_);
640678

641679
filterRowGroups();
642680
if (!rowGroupIds_.empty()) {

velox/dwio/parquet/reader/RepeatedColumnReader.cpp

Lines changed: 18 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,9 @@ PageReader* FOLLY_NULLABLE readLeafRepDefs(
3333
return nullptr;
3434
}
3535
auto pageReader = reader->formatData().as<ParquetData>().reader();
36+
if (pageReader == nullptr) {
37+
return nullptr;
38+
}
3639
pageReader->decodeRepDefs(numTop);
3740
return pageReader;
3841
}
@@ -113,7 +116,8 @@ MapColumnReader::MapColumnReader(
113116
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
114117
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
115118
ParquetParams& params,
116-
common::ScanSpec& scanSpec)
119+
common::ScanSpec& scanSpec,
120+
memory::MemoryPool& pool)
117121
: dwio::common::SelectiveMapColumnReader(
118122
requestedType,
119123
fileType,
@@ -123,9 +127,17 @@ MapColumnReader::MapColumnReader(
123127
auto& keyChildType = requestedType->childAt(0);
124128
auto& elementChildType = requestedType->childAt(1);
125129
keyReader_ = ParquetColumnReader::build(
126-
keyChildType, fileType_->childAt(0), params, *scanSpec.children()[0]);
130+
keyChildType,
131+
fileType_->childAt(0),
132+
params,
133+
*scanSpec.children()[0],
134+
pool);
127135
elementReader_ = ParquetColumnReader::build(
128-
elementChildType, fileType_->childAt(1), params, *scanSpec.children()[1]);
136+
elementChildType,
137+
fileType_->childAt(1),
138+
params,
139+
*scanSpec.children()[1],
140+
pool);
129141
reinterpret_cast<const ParquetTypeWithId*>(fileType.get())
130142
->makeLevelInfo(levelInfo_);
131143
children_ = {keyReader_.get(), elementReader_.get()};
@@ -223,15 +235,16 @@ ListColumnReader::ListColumnReader(
223235
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
224236
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
225237
ParquetParams& params,
226-
common::ScanSpec& scanSpec)
238+
common::ScanSpec& scanSpec,
239+
memory::MemoryPool& pool)
227240
: dwio::common::SelectiveListColumnReader(
228241
requestedType,
229242
fileType,
230243
params,
231244
scanSpec) {
232245
auto& childType = requestedType->childAt(0);
233246
child_ = ParquetColumnReader::build(
234-
childType, fileType_->childAt(0), params, *scanSpec.children()[0]);
247+
childType, fileType_->childAt(0), params, *scanSpec.children()[0], pool);
235248
reinterpret_cast<const ParquetTypeWithId*>(fileType.get())
236249
->makeLevelInfo(levelInfo_);
237250
children_ = {child_.get()};

velox/dwio/parquet/reader/RepeatedColumnReader.h

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -59,7 +59,8 @@ class MapColumnReader : public dwio::common::SelectiveMapColumnReader {
5959
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
6060
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
6161
ParquetParams& params,
62-
common::ScanSpec& scanSpec);
62+
common::ScanSpec& scanSpec,
63+
memory::MemoryPool& pool);
6364

6465
void prepareRead(
6566
vector_size_t offset,
@@ -115,7 +116,8 @@ class ListColumnReader : public dwio::common::SelectiveListColumnReader {
115116
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
116117
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
117118
ParquetParams& params,
118-
common::ScanSpec& scanSpec);
119+
common::ScanSpec& scanSpec,
120+
memory::MemoryPool& pool);
119121

120122
void prepareRead(
121123
vector_size_t offset,

velox/dwio/parquet/reader/StructColumnReader.cpp

Lines changed: 34 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -30,21 +30,46 @@ StructColumnReader::StructColumnReader(
3030
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
3131
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
3232
ParquetParams& params,
33-
common::ScanSpec& scanSpec)
33+
common::ScanSpec& scanSpec,
34+
memory::MemoryPool& pool)
3435
: SelectiveStructColumnReader(requestedType, fileType, params, scanSpec) {
3536
auto& childSpecs = scanSpec_->stableChildren();
37+
std::vector<int> missingFields;
3638
for (auto i = 0; i < childSpecs.size(); ++i) {
3739
auto childSpec = childSpecs[i];
3840
if (childSpecs[i]->isConstant()) {
3941
continue;
4042
}
41-
auto childFileType = fileType_->childByName(childSpec->fieldName());
42-
auto childRequestedType =
43-
requestedType_->childByName(childSpec->fieldName());
43+
const auto& fieldName = childSpec->fieldName();
44+
if (!fileType_->containsChild(fieldName)) {
45+
missingFields.emplace_back(i);
46+
continue;
47+
}
48+
auto childFileType = fileType_->childByName(fieldName);
49+
auto childRequestedType = requestedType_->childByName(fieldName);
4450
addChild(ParquetColumnReader::build(
45-
childRequestedType, childFileType, params, *childSpec));
51+
childRequestedType, childFileType, params, *childSpec, pool));
4652
childSpecs[i]->setSubscript(children_.size() - 1);
4753
}
54+
55+
if (missingFields.size() > 0) {
56+
// Set the struct as null if all the children fields in the output type are
57+
// missing and the number of child fields is more than one.
58+
if (childSpecs.size() > 1 && missingFields.size() == childSpecs.size()) {
59+
scanSpec_->setConstantValue(
60+
BaseVector::createNullConstant(requestedType_->type(), 1, &pool));
61+
} else {
62+
// Set null constant for the missing child field of output type.
63+
for (int channel : missingFields) {
64+
childSpecs[channel]->setConstantValue(BaseVector::createNullConstant(
65+
requestedType_->childByName(childSpecs[channel]->fieldName())
66+
->type(),
67+
1,
68+
&pool));
69+
}
70+
}
71+
}
72+
4873
auto type = reinterpret_cast<const ParquetTypeWithId*>(fileType_.get());
4974
if (type->parent()) {
5075
levelMode_ = reinterpret_cast<const ParquetTypeWithId*>(fileType_.get())
@@ -54,7 +79,10 @@ StructColumnReader::StructColumnReader(
5479
// this and the child.
5580
auto child = childForRepDefs_;
5681
for (;;) {
57-
assert(child);
82+
if (child == nullptr) {
83+
levelMode_ = LevelMode::kNulls;
84+
break;
85+
}
5886
if (child->fileType().type()->kind() == TypeKind::ARRAY ||
5987
child->fileType().type()->kind() == TypeKind::MAP) {
6088
levelMode_ = LevelMode::kStructOverLists;

velox/dwio/parquet/reader/StructColumnReader.h

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,8 @@ class StructColumnReader : public dwio::common::SelectiveStructColumnReader {
3636
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
3737
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
3838
ParquetParams& params,
39-
common::ScanSpec& scanSpec);
39+
common::ScanSpec& scanSpec,
40+
memory::MemoryPool& pool);
4041

4142
void read(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls)
4243
override;

0 commit comments

Comments
 (0)