Skip to content

Commit 6dc6b0f

Browse files
committed
Support struct column reading with different schemas
1 parent 7b11611 commit 6dc6b0f

File tree

12 files changed

+201
-31
lines changed

12 files changed

+201
-31
lines changed

velox/connectors/hive/SplitReader.cpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -220,9 +220,18 @@ std::vector<TypePtr> SplitReader::adaptColumns(
220220
} else {
221221
auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName);
222222
if (!fileTypeIdx.has_value()) {
223-
// Column is missing. Most likely due to schema evolution.
224-
VELOX_CHECK(tableSchema);
225-
setNullConstantValue(childSpec, tableSchema->findChild(fieldName));
223+
// If field name exists in the user-specified output type,
224+
// set the column as null constant.
225+
// Related PR: https://github.com/facebookincubator/velox/pull/6427.
226+
auto outputTypeIdx = readerOutputType_->getChildIdxIfExists(fieldName);
227+
if (outputTypeIdx.has_value()) {
228+
setNullConstantValue(
229+
childSpec, readerOutputType_->childAt(outputTypeIdx.value()));
230+
} else {
231+
// Column is missing. Most likely due to schema evolution.
232+
VELOX_CHECK(tableSchema);
233+
setNullConstantValue(childSpec, tableSchema->findChild(fieldName));
234+
}
226235
} else {
227236
// Column no longer missing, reset constant value set on the spec.
228237
childSpec->setConstantValue(nullptr);

velox/dwio/common/SelectiveStructColumnReader.cpp

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,7 @@ void SelectiveStructColumnReaderBase::read(
132132
activeRows = outputRows_;
133133
}
134134

135-
auto& childSpecs = scanSpec_->children();
136-
VELOX_CHECK(!childSpecs.empty());
135+
auto& childSpecs = scanSpec_->stableChildren();
137136
for (size_t i = 0; i < childSpecs.size(); ++i) {
138137
auto& childSpec = childSpecs[i];
139138
if (isChildConstant(*childSpec)) {
@@ -218,7 +217,7 @@ bool SelectiveStructColumnReaderBase::isChildConstant(
218217
fileType_->type()->kind() !=
219218
TypeKind::MAP && // If this is the case it means this is a flat map,
220219
// so it can't have "missing" fields.
221-
childSpec.channel() >= fileType_->size());
220+
!fileType_->containsChild(childSpec.fieldName()));
222221
}
223222

224223
namespace {
@@ -302,7 +301,6 @@ void setNullField(
302301
void SelectiveStructColumnReaderBase::getValues(
303302
RowSet rows,
304303
VectorPtr* result) {
305-
VELOX_CHECK(!scanSpec_->children().empty());
306304
VELOX_CHECK(
307305
*result != nullptr,
308306
"SelectiveStructColumnReaderBase expects a non-null result");
@@ -339,7 +337,7 @@ void SelectiveStructColumnReaderBase::getValues(
339337
resultRow->clearNulls(0, rows.size());
340338
}
341339
bool lazyPrepared = false;
342-
for (auto& childSpec : scanSpec_->children()) {
340+
for (auto& childSpec : scanSpec_->stableChildren()) {
343341
if (!childSpec->projectOut()) {
344342
continue;
345343
}

velox/dwio/common/TypeWithId.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ class TypeWithId : public velox::Tree<std::shared_ptr<const TypeWithId>> {
5959

6060
const std::shared_ptr<const TypeWithId>& childAt(uint32_t idx) const override;
6161

62+
bool containsChild(const std::string& name) const {
63+
VELOX_CHECK_EQ(type_->kind(), velox::TypeKind::ROW);
64+
return type_->as<velox::TypeKind::ROW>().containsChild(name);
65+
}
66+
6267
const std::shared_ptr<const TypeWithId>& childByName(
6368
const std::string& name) const {
6469
VELOX_CHECK_EQ(type_->kind(), velox::TypeKind::ROW);

velox/dwio/parquet/reader/ParquetColumnReader.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
3737
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
3838
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
3939
ParquetParams& params,
40-
common::ScanSpec& scanSpec) {
40+
common::ScanSpec& scanSpec,
41+
memory::MemoryPool& pool) {
4142
auto colName = scanSpec.fieldName();
4243

4344
switch (fileType->type()->kind()) {
@@ -58,19 +59,19 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
5859

5960
case TypeKind::ROW:
6061
return std::make_unique<StructColumnReader>(
61-
requestedType, fileType, params, scanSpec);
62+
requestedType, fileType, params, scanSpec, pool);
6263

6364
case TypeKind::VARBINARY:
6465
case TypeKind::VARCHAR:
6566
return std::make_unique<StringColumnReader>(fileType, params, scanSpec);
6667

6768
case TypeKind::ARRAY:
6869
return std::make_unique<ListColumnReader>(
69-
requestedType, fileType, params, scanSpec);
70+
requestedType, fileType, params, scanSpec, pool);
7071

7172
case TypeKind::MAP:
7273
return std::make_unique<MapColumnReader>(
73-
requestedType, fileType, params, scanSpec);
74+
requestedType, fileType, params, scanSpec, pool);
7475

7576
case TypeKind::BOOLEAN:
7677
return std::make_unique<BooleanColumnReader>(

velox/dwio/parquet/reader/ParquetColumnReader.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class ParquetColumnReader {
4545
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
4646
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
4747
ParquetParams& params,
48-
common::ScanSpec& scanSpec);
48+
common::ScanSpec& scanSpec,
49+
memory::MemoryPool& pool);
4950
};
5051
} // namespace facebook::velox::parquet

velox/dwio/parquet/reader/ParquetReader.cpp

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ class ReaderBase {
8383
/// the data still exists in the buffered inputs.
8484
bool isRowGroupBuffered(int32_t rowGroupIndex) const;
8585

86+
static std::shared_ptr<const dwio::common::TypeWithId> createTypeWithId(
87+
const std::shared_ptr<const dwio::common::TypeWithId>& inputType,
88+
const RowTypePtr& rowTypePtr,
89+
bool fileColumnNamesReadAsLowerCase);
90+
8691
private:
8792
// Reads and parses file footer.
8893
void loadFileMetaData();
@@ -563,6 +568,33 @@ std::shared_ptr<const RowType> ReaderBase::createRowType(
563568
std::move(childNames), std::move(childTypes));
564569
}
565570

571+
std::shared_ptr<const dwio::common::TypeWithId> ReaderBase::createTypeWithId(
572+
const std::shared_ptr<const dwio::common::TypeWithId>& inputType,
573+
const RowTypePtr& rowTypePtr,
574+
bool fileColumnNamesReadAsLowerCase) {
575+
if (!fileColumnNamesReadAsLowerCase) {
576+
return inputType;
577+
}
578+
std::vector<std::string> names;
579+
names.reserve(rowTypePtr->names().size());
580+
std::vector<TypePtr> types = rowTypePtr->children();
581+
for (const auto& name : rowTypePtr->names()) {
582+
std::string childName = name;
583+
folly::toLowerAscii(childName);
584+
names.emplace_back(childName);
585+
}
586+
auto convertedType =
587+
TypeFactory<TypeKind::ROW>::create(std::move(names), std::move(types));
588+
589+
auto children = inputType->getChildren();
590+
return std::make_shared<const dwio::common::TypeWithId>(
591+
convertedType,
592+
std::move(children),
593+
inputType->id(),
594+
inputType->maxId(),
595+
inputType->column());
596+
}
597+
566598
void ReaderBase::scheduleRowGroups(
567599
const std::vector<uint32_t>& rowGroupIds,
568600
int32_t currentGroup,
@@ -637,13 +669,17 @@ ParquetRowReader::ParquetRowReader(
637669
return; // TODO
638670
}
639671
ParquetParams params(pool_, columnReaderStats_, readerBase_->fileMetaData());
640-
auto columnSelector = std::make_shared<ColumnSelector>(
641-
ColumnSelector::apply(options_.getSelector(), readerBase_->schema()));
672+
// ColumnSelector::apply does not work for schema pruning case.
673+
auto columnSelector = options_.getSelector();
642674
columnReader_ = ParquetColumnReader::build(
643-
columnSelector->getSchemaWithId(),
675+
ReaderBase::createTypeWithId(
676+
columnSelector->getSchemaWithId(),
677+
asRowType(options_.getSelector()->getSchemaWithId()->type()),
678+
readerBase_->isFileColumnNamesReadAsLowerCase()),
644679
readerBase_->schemaWithId(), // Id is schema id
645680
params,
646-
*options_.getScanSpec());
681+
*options_.getScanSpec(),
682+
pool_);
647683

648684
filterRowGroups();
649685
if (!rowGroupIds_.empty()) {

velox/dwio/parquet/reader/RepeatedColumnReader.cpp

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ PageReader* FOLLY_NULLABLE readLeafRepDefs(
3131
return nullptr;
3232
}
3333
auto pageReader = reader->formatData().as<ParquetData>().reader();
34+
if (pageReader == nullptr) {
35+
return nullptr;
36+
}
3437
pageReader->decodeRepDefs(numTop);
3538
return pageReader;
3639
}
@@ -111,7 +114,8 @@ MapColumnReader::MapColumnReader(
111114
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
112115
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
113116
ParquetParams& params,
114-
common::ScanSpec& scanSpec)
117+
common::ScanSpec& scanSpec,
118+
memory::MemoryPool& pool)
115119
: dwio::common::SelectiveMapColumnReader(
116120
requestedType,
117121
fileType,
@@ -121,9 +125,17 @@ MapColumnReader::MapColumnReader(
121125
auto& keyChildType = requestedType->childAt(0);
122126
auto& elementChildType = requestedType->childAt(1);
123127
keyReader_ = ParquetColumnReader::build(
124-
keyChildType, fileType_->childAt(0), params, *scanSpec.children()[0]);
128+
keyChildType,
129+
fileType_->childAt(0),
130+
params,
131+
*scanSpec.children()[0],
132+
pool);
125133
elementReader_ = ParquetColumnReader::build(
126-
elementChildType, fileType_->childAt(1), params, *scanSpec.children()[1]);
134+
elementChildType,
135+
fileType_->childAt(1),
136+
params,
137+
*scanSpec.children()[1],
138+
pool);
127139
reinterpret_cast<const ParquetTypeWithId*>(fileType.get())
128140
->makeLevelInfo(levelInfo_);
129141
children_ = {keyReader_.get(), elementReader_.get()};
@@ -221,15 +233,16 @@ ListColumnReader::ListColumnReader(
221233
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
222234
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
223235
ParquetParams& params,
224-
common::ScanSpec& scanSpec)
236+
common::ScanSpec& scanSpec,
237+
memory::MemoryPool& pool)
225238
: dwio::common::SelectiveListColumnReader(
226239
requestedType,
227240
fileType,
228241
params,
229242
scanSpec) {
230243
auto& childType = requestedType->childAt(0);
231244
child_ = ParquetColumnReader::build(
232-
childType, fileType_->childAt(0), params, *scanSpec.children()[0]);
245+
childType, fileType_->childAt(0), params, *scanSpec.children()[0], pool);
233246
reinterpret_cast<const ParquetTypeWithId*>(fileType.get())
234247
->makeLevelInfo(levelInfo_);
235248
children_ = {child_.get()};

velox/dwio/parquet/reader/RepeatedColumnReader.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ class MapColumnReader : public dwio::common::SelectiveMapColumnReader {
5959
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
6060
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
6161
ParquetParams& params,
62-
common::ScanSpec& scanSpec);
62+
common::ScanSpec& scanSpec,
63+
memory::MemoryPool& pool);
6364

6465
void prepareRead(
6566
vector_size_t offset,
@@ -115,7 +116,8 @@ class ListColumnReader : public dwio::common::SelectiveListColumnReader {
115116
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
116117
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
117118
ParquetParams& params,
118-
common::ScanSpec& scanSpec);
119+
common::ScanSpec& scanSpec,
120+
memory::MemoryPool& pool);
119121

120122
void prepareRead(
121123
vector_size_t offset,

velox/dwio/parquet/reader/StructColumnReader.cpp

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,21 +23,46 @@ StructColumnReader::StructColumnReader(
2323
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
2424
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
2525
ParquetParams& params,
26-
common::ScanSpec& scanSpec)
26+
common::ScanSpec& scanSpec,
27+
memory::MemoryPool& pool)
2728
: SelectiveStructColumnReader(requestedType, fileType, params, scanSpec) {
2829
auto& childSpecs = scanSpec_->stableChildren();
30+
std::vector<int> missingFields;
2931
for (auto i = 0; i < childSpecs.size(); ++i) {
3032
auto childSpec = childSpecs[i];
3133
if (childSpecs[i]->isConstant()) {
3234
continue;
3335
}
34-
auto childFileType = fileType_->childByName(childSpec->fieldName());
35-
auto childRequestedType =
36-
requestedType_->childByName(childSpec->fieldName());
36+
const auto& fieldName = childSpec->fieldName();
37+
if (!fileType_->containsChild(fieldName)) {
38+
missingFields.emplace_back(i);
39+
continue;
40+
}
41+
auto childFileType = fileType_->childByName(fieldName);
42+
auto childRequestedType = requestedType_->childByName(fieldName);
3743
addChild(ParquetColumnReader::build(
38-
childRequestedType, childFileType, params, *childSpec));
44+
childRequestedType, childFileType, params, *childSpec, pool));
3945
childSpecs[i]->setSubscript(children_.size() - 1);
4046
}
47+
48+
if (missingFields.size() > 0) {
49+
// Set the struct as null if all the children fields in the output type are
50+
// missing and the number of child fields is more than one.
51+
if (childSpecs.size() > 1 && missingFields.size() == childSpecs.size()) {
52+
scanSpec_->setConstantValue(
53+
BaseVector::createNullConstant(requestedType_->type(), 1, &pool));
54+
} else {
55+
// Set null constant for the missing child field of output type.
56+
for (int channel : missingFields) {
57+
childSpecs[channel]->setConstantValue(BaseVector::createNullConstant(
58+
requestedType_->childByName(childSpecs[channel]->fieldName())
59+
->type(),
60+
1,
61+
&pool));
62+
}
63+
}
64+
}
65+
4166
auto type = reinterpret_cast<const ParquetTypeWithId*>(fileType_.get());
4267
if (type->parent()) {
4368
levelMode_ = reinterpret_cast<const ParquetTypeWithId*>(fileType_.get())
@@ -47,7 +72,10 @@ StructColumnReader::StructColumnReader(
4772
// this and the child.
4873
auto child = childForRepDefs_;
4974
for (;;) {
50-
assert(child);
75+
if (child == nullptr) {
76+
levelMode_ = LevelMode::kNulls;
77+
break;
78+
}
5179
if (child->fileType().type()->kind() == TypeKind::ARRAY ||
5280
child->fileType().type()->kind() == TypeKind::MAP) {
5381
levelMode_ = LevelMode::kStructOverLists;

velox/dwio/parquet/reader/StructColumnReader.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ class StructColumnReader : public dwio::common::SelectiveStructColumnReader {
2727
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
2828
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
2929
ParquetParams& params,
30-
common::ScanSpec& scanSpec);
30+
common::ScanSpec& scanSpec,
31+
memory::MemoryPool& pool);
3132

3233
void read(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls)
3334
override;

0 commit comments

Comments
 (0)