Skip to content

Commit 86d860c

Browse files
rui-momarin-ma
authored and committed
Support struct column reading with different schemas (facebookincubator#5962)
1 parent 4c81a22 commit 86d860c

File tree

13 files changed

+216
-33
lines changed

13 files changed

+216
-33
lines changed

velox/connectors/hive/SplitReader.cpp

Lines changed: 17 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -255,12 +255,23 @@ std::vector<TypePtr> SplitReader::adaptColumns(
255255
} else {
256256
auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName);
257257
if (!fileTypeIdx.has_value()) {
258-
// Column is missing. Most likely due to schema evolution.
259-
VELOX_CHECK(tableSchema);
260-
childSpec->setConstantValue(BaseVector::createNullConstant(
261-
tableSchema->findChild(fieldName),
262-
1,
263-
connectorQueryCtx_->memoryPool()));
258+
// If field name exists in the user-specified output type, set the
259+
// column as null constant. Related PR:
260+
// https://github.com/facebookincubator/velox/pull/6427.
261+
auto outputTypeIdx = readerOutputType_->getChildIdxIfExists(fieldName);
262+
if (outputTypeIdx.has_value()) {
263+
childSpec->setConstantValue(BaseVector::createNullConstant(
264+
readerOutputType_->childAt(outputTypeIdx.value()),
265+
1,
266+
connectorQueryCtx_->memoryPool()));
267+
} else {
268+
// Column is missing. Most likely due to schema evolution.
269+
VELOX_CHECK(tableSchema);
270+
childSpec->setConstantValue(BaseVector::createNullConstant(
271+
tableSchema->findChild(fieldName),
272+
1,
273+
connectorQueryCtx_->memoryPool()));
274+
}
264275
} else {
265276
// Column no longer missing, reset constant value set on the spec.
266277
childSpec->setConstantValue(nullptr);

velox/dwio/common/SelectiveStructColumnReader.cpp

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -157,7 +157,6 @@ void SelectiveStructColumnReaderBase::read(
157157
}
158158

159159
auto& childSpecs = scanSpec_->children();
160-
VELOX_CHECK(!childSpecs.empty());
161160
for (size_t i = 0; i < childSpecs.size(); ++i) {
162161
auto& childSpec = childSpecs[i];
163162
VELOX_TRACE_HISTORY_PUSH("read %s", childSpec->fieldName().c_str());
@@ -243,7 +242,7 @@ bool SelectiveStructColumnReaderBase::isChildConstant(
243242
fileType_->type()->kind() !=
244243
TypeKind::MAP && // If this is the case it means this is a flat map,
245244
// so it can't have "missing" fields.
246-
childSpec.channel() >= fileType_->size());
245+
!fileType_->containsChild(childSpec.fieldName()));
247246
}
248247

249248
namespace {
@@ -327,7 +326,6 @@ void setNullField(
327326
void SelectiveStructColumnReaderBase::getValues(
328327
RowSet rows,
329328
VectorPtr* result) {
330-
VELOX_CHECK(!scanSpec_->children().empty());
331329
VELOX_CHECK_NOT_NULL(
332330
*result, "SelectiveStructColumnReaderBase expects a non-null result");
333331
VELOX_CHECK(

velox/dwio/common/TypeWithId.cpp

Lines changed: 39 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,22 @@ std::vector<std::shared_ptr<const TypeWithId>> toShared(
3232
}
3333
return result;
3434
}
35+
36+
TypePtr adjustNameAsLowerCase(const TypePtr& type) {
37+
if (auto rowTypePtr = asRowType(type)) {
38+
std::vector<std::string> names;
39+
names.reserve(rowTypePtr->names().size());
40+
std::vector<TypePtr> types = rowTypePtr->children();
41+
for (const auto& name : rowTypePtr->names()) {
42+
std::string childName = name;
43+
folly::toLowerAscii(childName);
44+
names.emplace_back(childName);
45+
}
46+
return TypeFactory<TypeKind::ROW>::create(
47+
std::move(names), std::move(types));
48+
}
49+
return type;
50+
}
3551
} // namespace
3652

3753
TypeWithId::TypeWithId(
@@ -57,6 +73,29 @@ std::unique_ptr<TypeWithId> TypeWithId::create(
5773
return create(root, next, 0);
5874
}
5975

76+
std::unique_ptr<TypeWithId> TypeWithId::duplicate(bool nameAsLowerCase) const {
77+
if (children_.empty()) {
78+
std::vector<std::unique_ptr<TypeWithId>> children;
79+
return std::make_unique<TypeWithId>(
80+
nameAsLowerCase ? adjustNameAsLowerCase(type_) : type_,
81+
std::move(children),
82+
id_,
83+
maxId_,
84+
column_);
85+
}
86+
std::vector<std::unique_ptr<TypeWithId>> children;
87+
children.reserve(children_.size());
88+
for (const auto& child : children_) {
89+
children.emplace_back(std::move(child->duplicate(nameAsLowerCase)));
90+
}
91+
return std::make_unique<TypeWithId>(
92+
nameAsLowerCase ? adjustNameAsLowerCase(type_) : type_,
93+
std::move(children),
94+
id_,
95+
maxId_,
96+
column_);
97+
}
98+
6099
uint32_t TypeWithId::size() const {
61100
return children_.size();
62101
}

velox/dwio/common/TypeWithId.h

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,8 @@ class TypeWithId : public velox::Tree<std::shared_ptr<const TypeWithId>> {
3939
const std::shared_ptr<const velox::Type>& root,
4040
uint32_t next = 0);
4141

42+
std::unique_ptr<TypeWithId> duplicate(bool nameAsLowerCase) const;
43+
4244
uint32_t size() const override;
4345

4446
const std::shared_ptr<const velox::Type>& type() const {
@@ -63,6 +65,11 @@ class TypeWithId : public velox::Tree<std::shared_ptr<const TypeWithId>> {
6365

6466
const std::shared_ptr<const TypeWithId>& childAt(uint32_t idx) const override;
6567

68+
bool containsChild(const std::string& name) const {
69+
VELOX_CHECK_EQ(type_->kind(), velox::TypeKind::ROW);
70+
return type_->as<velox::TypeKind::ROW>().containsChild(name);
71+
}
72+
6673
const std::shared_ptr<const TypeWithId>& childByName(
6774
const std::string& name) const {
6875
VELOX_CHECK_EQ(type_->kind(), velox::TypeKind::ROW);

velox/dwio/parquet/reader/ParquetColumnReader.cpp

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -35,7 +35,8 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
3535
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
3636
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
3737
ParquetParams& params,
38-
common::ScanSpec& scanSpec) {
38+
common::ScanSpec& scanSpec,
39+
memory::MemoryPool& pool) {
3940
auto colName = scanSpec.fieldName();
4041

4142
switch (fileType->type()->kind()) {
@@ -56,19 +57,19 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
5657

5758
case TypeKind::ROW:
5859
return std::make_unique<StructColumnReader>(
59-
requestedType, fileType, params, scanSpec);
60+
requestedType, fileType, params, scanSpec, pool);
6061

6162
case TypeKind::VARBINARY:
6263
case TypeKind::VARCHAR:
6364
return std::make_unique<StringColumnReader>(fileType, params, scanSpec);
6465

6566
case TypeKind::ARRAY:
6667
return std::make_unique<ListColumnReader>(
67-
requestedType, fileType, params, scanSpec);
68+
requestedType, fileType, params, scanSpec, pool);
6869

6970
case TypeKind::MAP:
7071
return std::make_unique<MapColumnReader>(
71-
requestedType, fileType, params, scanSpec);
72+
requestedType, fileType, params, scanSpec, pool);
7273

7374
case TypeKind::BOOLEAN:
7475
return std::make_unique<BooleanColumnReader>(

velox/dwio/parquet/reader/ParquetColumnReader.h

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,7 @@ class ParquetColumnReader {
4545
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
4646
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
4747
ParquetParams& params,
48-
common::ScanSpec& scanSpec);
48+
common::ScanSpec& scanSpec,
49+
memory::MemoryPool& pool);
4950
};
5051
} // namespace facebook::velox::parquet

velox/dwio/parquet/reader/ParquetReader.cpp

Lines changed: 8 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -661,13 +661,17 @@ class ParquetRowReader::Impl {
661661
}
662662
ParquetParams params(
663663
pool_, columnReaderStats_, readerBase_->fileMetaData());
664-
auto columnSelector = std::make_shared<ColumnSelector>(
665-
ColumnSelector::apply(options_.getSelector(), readerBase_->schema()));
664+
auto columnSelector = options_.getSelector()
665+
? options_.getSelector()
666+
: std::make_shared<ColumnSelector>(ColumnSelector::apply(
667+
options_.getSelector(), readerBase_->schema()));
666668
columnReader_ = ParquetColumnReader::build(
667-
columnSelector->getSchemaWithId(),
669+
columnSelector->getSchemaWithId()->duplicate(
670+
readerBase_->isFileColumnNamesReadAsLowerCase()),
668671
readerBase_->schemaWithId(), // Id is schema id
669672
params,
670-
*options_.getScanSpec());
673+
*options_.getScanSpec(),
674+
pool_);
671675

672676
filterRowGroups();
673677
if (!rowGroupIds_.empty()) {

velox/dwio/parquet/reader/RepeatedColumnReader.cpp

Lines changed: 18 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,9 @@ PageReader* readLeafRepDefs(
3333
return nullptr;
3434
}
3535
auto pageReader = reader->formatData().as<ParquetData>().reader();
36+
if (pageReader == nullptr) {
37+
return nullptr;
38+
}
3639
pageReader->decodeRepDefs(numTop);
3740
return pageReader;
3841
}
@@ -113,7 +116,8 @@ MapColumnReader::MapColumnReader(
113116
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
114117
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
115118
ParquetParams& params,
116-
common::ScanSpec& scanSpec)
119+
common::ScanSpec& scanSpec,
120+
memory::MemoryPool& pool)
117121
: dwio::common::SelectiveMapColumnReader(
118122
requestedType,
119123
fileType,
@@ -123,9 +127,17 @@ MapColumnReader::MapColumnReader(
123127
auto& keyChildType = requestedType->childAt(0);
124128
auto& elementChildType = requestedType->childAt(1);
125129
keyReader_ = ParquetColumnReader::build(
126-
keyChildType, fileType_->childAt(0), params, *scanSpec.children()[0]);
130+
keyChildType,
131+
fileType_->childAt(0),
132+
params,
133+
*scanSpec.children()[0],
134+
pool);
127135
elementReader_ = ParquetColumnReader::build(
128-
elementChildType, fileType_->childAt(1), params, *scanSpec.children()[1]);
136+
elementChildType,
137+
fileType_->childAt(1),
138+
params,
139+
*scanSpec.children()[1],
140+
pool);
129141
reinterpret_cast<const ParquetTypeWithId*>(fileType.get())
130142
->makeLevelInfo(levelInfo_);
131143
children_ = {keyReader_.get(), elementReader_.get()};
@@ -223,15 +235,16 @@ ListColumnReader::ListColumnReader(
223235
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
224236
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
225237
ParquetParams& params,
226-
common::ScanSpec& scanSpec)
238+
common::ScanSpec& scanSpec,
239+
memory::MemoryPool& pool)
227240
: dwio::common::SelectiveListColumnReader(
228241
requestedType,
229242
fileType,
230243
params,
231244
scanSpec) {
232245
auto& childType = requestedType->childAt(0);
233246
child_ = ParquetColumnReader::build(
234-
childType, fileType_->childAt(0), params, *scanSpec.children()[0]);
247+
childType, fileType_->childAt(0), params, *scanSpec.children()[0], pool);
235248
reinterpret_cast<const ParquetTypeWithId*>(fileType.get())
236249
->makeLevelInfo(levelInfo_);
237250
children_ = {child_.get()};

velox/dwio/parquet/reader/RepeatedColumnReader.h

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -59,7 +59,8 @@ class MapColumnReader : public dwio::common::SelectiveMapColumnReader {
5959
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
6060
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
6161
ParquetParams& params,
62-
common::ScanSpec& scanSpec);
62+
common::ScanSpec& scanSpec,
63+
memory::MemoryPool& pool);
6364

6465
void prepareRead(
6566
vector_size_t offset,
@@ -115,7 +116,8 @@ class ListColumnReader : public dwio::common::SelectiveListColumnReader {
115116
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
116117
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
117118
ParquetParams& params,
118-
common::ScanSpec& scanSpec);
119+
common::ScanSpec& scanSpec,
120+
memory::MemoryPool& pool);
119121

120122
void prepareRead(
121123
vector_size_t offset,

velox/dwio/parquet/reader/StructColumnReader.cpp

Lines changed: 34 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -30,21 +30,46 @@ StructColumnReader::StructColumnReader(
3030
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
3131
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
3232
ParquetParams& params,
33-
common::ScanSpec& scanSpec)
33+
common::ScanSpec& scanSpec,
34+
memory::MemoryPool& pool)
3435
: SelectiveStructColumnReader(requestedType, fileType, params, scanSpec) {
3536
auto& childSpecs = scanSpec_->stableChildren();
37+
std::vector<int> missingFields;
3638
for (auto i = 0; i < childSpecs.size(); ++i) {
3739
auto childSpec = childSpecs[i];
3840
if (childSpecs[i]->isConstant()) {
3941
continue;
4042
}
41-
auto childFileType = fileType_->childByName(childSpec->fieldName());
42-
auto childRequestedType =
43-
requestedType_->childByName(childSpec->fieldName());
43+
const auto& fieldName = childSpec->fieldName();
44+
if (!fileType_->containsChild(fieldName)) {
45+
missingFields.emplace_back(i);
46+
continue;
47+
}
48+
auto childFileType = fileType_->childByName(fieldName);
49+
auto childRequestedType = requestedType_->childByName(fieldName);
4450
addChild(ParquetColumnReader::build(
45-
childRequestedType, childFileType, params, *childSpec));
51+
childRequestedType, childFileType, params, *childSpec, pool));
4652
childSpecs[i]->setSubscript(children_.size() - 1);
4753
}
54+
55+
if (missingFields.size() > 0) {
56+
// Set the struct as null if all the children fields in the output type are
57+
// missing and the number of child fields is more than one.
58+
if (childSpecs.size() > 1 && missingFields.size() == childSpecs.size()) {
59+
scanSpec_->setConstantValue(
60+
BaseVector::createNullConstant(requestedType_->type(), 1, &pool));
61+
} else {
62+
// Set null constant for the missing child field of output type.
63+
for (int channel : missingFields) {
64+
childSpecs[channel]->setConstantValue(BaseVector::createNullConstant(
65+
requestedType_->childByName(childSpecs[channel]->fieldName())
66+
->type(),
67+
1,
68+
&pool));
69+
}
70+
}
71+
}
72+
4873
auto type = reinterpret_cast<const ParquetTypeWithId*>(fileType_.get());
4974
if (type->parent()) {
5075
levelMode_ = reinterpret_cast<const ParquetTypeWithId*>(fileType_.get())
@@ -54,7 +79,10 @@ StructColumnReader::StructColumnReader(
5479
// this and the child.
5580
auto child = childForRepDefs_;
5681
for (;;) {
57-
assert(child);
82+
if (child == nullptr) {
83+
levelMode_ = LevelMode::kNulls;
84+
break;
85+
}
5886
if (child->fileType().type()->kind() == TypeKind::ARRAY ||
5987
child->fileType().type()->kind() == TypeKind::MAP) {
6088
levelMode_ = LevelMode::kStructOverLists;
@@ -91,7 +119,6 @@ StructColumnReader::findBestLeaf() {
91119
best = child;
92120
}
93121
}
94-
assert(best);
95122
return best;
96123
}
97124

0 commit comments

Comments
 (0)