Skip to content

Commit e7eab9e

Browse files
committed
Support struct column reading with different schemas
1 parent f64795f commit e7eab9e

File tree

12 files changed

+209
-33
lines changed

12 files changed

+209
-33
lines changed

velox/connectors/hive/SplitReader.cpp

Lines changed: 17 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -255,12 +255,23 @@ std::vector<TypePtr> SplitReader::adaptColumns(
255255
} else {
256256
auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName);
257257
if (!fileTypeIdx.has_value()) {
258-
// Column is missing. Most likely due to schema evolution.
259-
VELOX_CHECK(tableSchema);
260-
childSpec->setConstantValue(BaseVector::createNullConstant(
261-
tableSchema->findChild(fieldName),
262-
1,
263-
connectorQueryCtx_->memoryPool()));
258+
// If the field name exists in the user-specified output type, set the
259+
// column to a null constant. Related PR:
260+
// https://github.com/facebookincubator/velox/pull/6427.
261+
auto outputTypeIdx = readerOutputType_->getChildIdxIfExists(fieldName);
262+
if (outputTypeIdx.has_value()) {
263+
childSpec->setConstantValue(BaseVector::createNullConstant(
264+
readerOutputType_->childAt(outputTypeIdx.value()),
265+
1,
266+
connectorQueryCtx_->memoryPool()));
267+
} else {
268+
// Column is missing. Most likely due to schema evolution.
269+
VELOX_CHECK(tableSchema);
270+
childSpec->setConstantValue(BaseVector::createNullConstant(
271+
tableSchema->findChild(fieldName),
272+
1,
273+
connectorQueryCtx_->memoryPool()));
274+
}
264275
} else {
265276
// Column no longer missing, reset constant value set on the spec.
266277
childSpec->setConstantValue(nullptr);

velox/dwio/common/SelectiveStructColumnReader.cpp

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -157,7 +157,6 @@ void SelectiveStructColumnReaderBase::read(
157157
}
158158

159159
auto& childSpecs = scanSpec_->children();
160-
VELOX_CHECK(!childSpecs.empty());
161160
for (size_t i = 0; i < childSpecs.size(); ++i) {
162161
auto& childSpec = childSpecs[i];
163162
VELOX_TRACE_HISTORY_PUSH("read %s", childSpec->fieldName().c_str());
@@ -243,7 +242,7 @@ bool SelectiveStructColumnReaderBase::isChildConstant(
243242
fileType_->type()->kind() !=
244243
TypeKind::MAP && // If this is the case it means this is a flat map,
245244
// so it can't have "missing" fields.
246-
childSpec.channel() >= fileType_->size());
245+
!fileType_->containsChild(childSpec.fieldName()));
247246
}
248247

249248
namespace {
@@ -327,7 +326,6 @@ void setNullField(
327326
void SelectiveStructColumnReaderBase::getValues(
328327
RowSet rows,
329328
VectorPtr* result) {
330-
VELOX_CHECK(!scanSpec_->children().empty());
331329
VELOX_CHECK_NOT_NULL(
332330
*result, "SelectiveStructColumnReaderBase expects a non-null result");
333331
VELOX_CHECK(

velox/dwio/common/TypeWithId.h

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -63,6 +63,11 @@ class TypeWithId : public velox::Tree<std::shared_ptr<const TypeWithId>> {
6363

6464
const std::shared_ptr<const TypeWithId>& childAt(uint32_t idx) const override;
6565

66+
bool containsChild(const std::string& name) const {
67+
VELOX_CHECK_EQ(type_->kind(), velox::TypeKind::ROW);
68+
return type_->as<velox::TypeKind::ROW>().containsChild(name);
69+
}
70+
6671
const std::shared_ptr<const TypeWithId>& childByName(
6772
const std::string& name) const {
6873
VELOX_CHECK_EQ(type_->kind(), velox::TypeKind::ROW);

velox/dwio/parquet/reader/ParquetColumnReader.cpp

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -35,7 +35,8 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
3535
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
3636
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
3737
ParquetParams& params,
38-
common::ScanSpec& scanSpec) {
38+
common::ScanSpec& scanSpec,
39+
memory::MemoryPool& pool) {
3940
auto colName = scanSpec.fieldName();
4041

4142
switch (fileType->type()->kind()) {
@@ -56,19 +57,19 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
5657

5758
case TypeKind::ROW:
5859
return std::make_unique<StructColumnReader>(
59-
requestedType, fileType, params, scanSpec);
60+
requestedType, fileType, params, scanSpec, pool);
6061

6162
case TypeKind::VARBINARY:
6263
case TypeKind::VARCHAR:
6364
return std::make_unique<StringColumnReader>(fileType, params, scanSpec);
6465

6566
case TypeKind::ARRAY:
6667
return std::make_unique<ListColumnReader>(
67-
requestedType, fileType, params, scanSpec);
68+
requestedType, fileType, params, scanSpec, pool);
6869

6970
case TypeKind::MAP:
7071
return std::make_unique<MapColumnReader>(
71-
requestedType, fileType, params, scanSpec);
72+
requestedType, fileType, params, scanSpec, pool);
7273

7374
case TypeKind::BOOLEAN:
7475
return std::make_unique<BooleanColumnReader>(

velox/dwio/parquet/reader/ParquetColumnReader.h

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,7 @@ class ParquetColumnReader {
4545
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
4646
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
4747
ParquetParams& params,
48-
common::ScanSpec& scanSpec);
48+
common::ScanSpec& scanSpec,
49+
memory::MemoryPool& pool);
4950
};
5051
} // namespace facebook::velox::parquet

velox/dwio/parquet/reader/ParquetReader.cpp

Lines changed: 42 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -84,6 +84,11 @@ class ReaderBase {
8484
/// the data still exists in the buffered inputs.
8585
bool isRowGroupBuffered(int32_t rowGroupIndex) const;
8686

87+
static std::shared_ptr<const dwio::common::TypeWithId> createTypeWithId(
88+
const std::shared_ptr<const dwio::common::TypeWithId>& inputType,
89+
const RowTypePtr& rowTypePtr,
90+
bool fileColumnNamesReadAsLowerCase);
91+
8792
private:
8893
// Reads and parses file footer.
8994
void loadFileMetaData();
@@ -588,6 +593,33 @@ std::shared_ptr<const RowType> ReaderBase::createRowType(
588593
std::move(childNames), std::move(childTypes));
589594
}
590595

596+
std::shared_ptr<const dwio::common::TypeWithId> ReaderBase::createTypeWithId(
597+
const std::shared_ptr<const dwio::common::TypeWithId>& inputType,
598+
const RowTypePtr& rowTypePtr,
599+
bool fileColumnNamesReadAsLowerCase) {
600+
if (!fileColumnNamesReadAsLowerCase) {
601+
return inputType;
602+
}
603+
std::vector<std::string> names;
604+
names.reserve(rowTypePtr->names().size());
605+
std::vector<TypePtr> types = rowTypePtr->children();
606+
for (const auto& name : rowTypePtr->names()) {
607+
std::string childName = name;
608+
folly::toLowerAscii(childName);
609+
names.emplace_back(childName);
610+
}
611+
auto convertedType =
612+
TypeFactory<TypeKind::ROW>::create(std::move(names), std::move(types));
613+
614+
auto children = inputType->getChildren();
615+
return std::make_shared<const dwio::common::TypeWithId>(
616+
convertedType,
617+
std::move(children),
618+
inputType->id(),
619+
inputType->maxId(),
620+
inputType->column());
621+
}
622+
591623
void ReaderBase::scheduleRowGroups(
592624
const std::vector<uint32_t>& rowGroupIds,
593625
int32_t currentGroup,
@@ -661,13 +693,19 @@ class ParquetRowReader::Impl {
661693
}
662694
ParquetParams params(
663695
pool_, columnReaderStats_, readerBase_->fileMetaData());
664-
auto columnSelector = std::make_shared<ColumnSelector>(
665-
ColumnSelector::apply(options_.getSelector(), readerBase_->schema()));
696+
auto columnSelector = options_.getSelector()
697+
? options_.getSelector()
698+
: std::make_shared<ColumnSelector>(ColumnSelector::apply(
699+
options_.getSelector(), readerBase_->schema()));
666700
columnReader_ = ParquetColumnReader::build(
667-
columnSelector->getSchemaWithId(),
701+
ReaderBase::createTypeWithId(
702+
columnSelector->getSchemaWithId(),
703+
asRowType(columnSelector->getSchemaWithId()->type()),
704+
readerBase_->isFileColumnNamesReadAsLowerCase()),
668705
readerBase_->schemaWithId(), // Id is schema id
669706
params,
670-
*options_.getScanSpec());
707+
*options_.getScanSpec(),
708+
pool_);
671709

672710
filterRowGroups();
673711
if (!rowGroupIds_.empty()) {

velox/dwio/parquet/reader/RepeatedColumnReader.cpp

Lines changed: 18 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,9 @@ PageReader* readLeafRepDefs(
3333
return nullptr;
3434
}
3535
auto pageReader = reader->formatData().as<ParquetData>().reader();
36+
if (pageReader == nullptr) {
37+
return nullptr;
38+
}
3639
pageReader->decodeRepDefs(numTop);
3740
return pageReader;
3841
}
@@ -113,7 +116,8 @@ MapColumnReader::MapColumnReader(
113116
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
114117
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
115118
ParquetParams& params,
116-
common::ScanSpec& scanSpec)
119+
common::ScanSpec& scanSpec,
120+
memory::MemoryPool& pool)
117121
: dwio::common::SelectiveMapColumnReader(
118122
requestedType,
119123
fileType,
@@ -123,9 +127,17 @@ MapColumnReader::MapColumnReader(
123127
auto& keyChildType = requestedType->childAt(0);
124128
auto& elementChildType = requestedType->childAt(1);
125129
keyReader_ = ParquetColumnReader::build(
126-
keyChildType, fileType_->childAt(0), params, *scanSpec.children()[0]);
130+
keyChildType,
131+
fileType_->childAt(0),
132+
params,
133+
*scanSpec.children()[0],
134+
pool);
127135
elementReader_ = ParquetColumnReader::build(
128-
elementChildType, fileType_->childAt(1), params, *scanSpec.children()[1]);
136+
elementChildType,
137+
fileType_->childAt(1),
138+
params,
139+
*scanSpec.children()[1],
140+
pool);
129141
reinterpret_cast<const ParquetTypeWithId*>(fileType.get())
130142
->makeLevelInfo(levelInfo_);
131143
children_ = {keyReader_.get(), elementReader_.get()};
@@ -223,15 +235,16 @@ ListColumnReader::ListColumnReader(
223235
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
224236
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
225237
ParquetParams& params,
226-
common::ScanSpec& scanSpec)
238+
common::ScanSpec& scanSpec,
239+
memory::MemoryPool& pool)
227240
: dwio::common::SelectiveListColumnReader(
228241
requestedType,
229242
fileType,
230243
params,
231244
scanSpec) {
232245
auto& childType = requestedType->childAt(0);
233246
child_ = ParquetColumnReader::build(
234-
childType, fileType_->childAt(0), params, *scanSpec.children()[0]);
247+
childType, fileType_->childAt(0), params, *scanSpec.children()[0], pool);
235248
reinterpret_cast<const ParquetTypeWithId*>(fileType.get())
236249
->makeLevelInfo(levelInfo_);
237250
children_ = {child_.get()};

velox/dwio/parquet/reader/RepeatedColumnReader.h

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -59,7 +59,8 @@ class MapColumnReader : public dwio::common::SelectiveMapColumnReader {
5959
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
6060
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
6161
ParquetParams& params,
62-
common::ScanSpec& scanSpec);
62+
common::ScanSpec& scanSpec,
63+
memory::MemoryPool& pool);
6364

6465
void prepareRead(
6566
vector_size_t offset,
@@ -115,7 +116,8 @@ class ListColumnReader : public dwio::common::SelectiveListColumnReader {
115116
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
116117
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
117118
ParquetParams& params,
118-
common::ScanSpec& scanSpec);
119+
common::ScanSpec& scanSpec,
120+
memory::MemoryPool& pool);
119121

120122
void prepareRead(
121123
vector_size_t offset,

velox/dwio/parquet/reader/StructColumnReader.cpp

Lines changed: 34 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -30,21 +30,46 @@ StructColumnReader::StructColumnReader(
3030
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
3131
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
3232
ParquetParams& params,
33-
common::ScanSpec& scanSpec)
33+
common::ScanSpec& scanSpec,
34+
memory::MemoryPool& pool)
3435
: SelectiveStructColumnReader(requestedType, fileType, params, scanSpec) {
3536
auto& childSpecs = scanSpec_->stableChildren();
37+
std::vector<int> missingFields;
3638
for (auto i = 0; i < childSpecs.size(); ++i) {
3739
auto childSpec = childSpecs[i];
3840
if (childSpecs[i]->isConstant()) {
3941
continue;
4042
}
41-
auto childFileType = fileType_->childByName(childSpec->fieldName());
42-
auto childRequestedType =
43-
requestedType_->childByName(childSpec->fieldName());
43+
const auto& fieldName = childSpec->fieldName();
44+
if (!fileType_->containsChild(fieldName)) {
45+
missingFields.emplace_back(i);
46+
continue;
47+
}
48+
auto childFileType = fileType_->childByName(fieldName);
49+
auto childRequestedType = requestedType_->childByName(fieldName);
4450
addChild(ParquetColumnReader::build(
45-
childRequestedType, childFileType, params, *childSpec));
51+
childRequestedType, childFileType, params, *childSpec, pool));
4652
childSpecs[i]->setSubscript(children_.size() - 1);
4753
}
54+
55+
if (missingFields.size() > 0) {
56+
// Set the struct as null if all the child fields in the output type are
57+
// missing and the number of child fields is more than one.
58+
if (childSpecs.size() > 1 && missingFields.size() == childSpecs.size()) {
59+
scanSpec_->setConstantValue(
60+
BaseVector::createNullConstant(requestedType_->type(), 1, &pool));
61+
} else {
62+
// Set a null constant for each missing child field of the output type.
63+
for (int channel : missingFields) {
64+
childSpecs[channel]->setConstantValue(BaseVector::createNullConstant(
65+
requestedType_->childByName(childSpecs[channel]->fieldName())
66+
->type(),
67+
1,
68+
&pool));
69+
}
70+
}
71+
}
72+
4873
auto type = reinterpret_cast<const ParquetTypeWithId*>(fileType_.get());
4974
if (type->parent()) {
5075
levelMode_ = reinterpret_cast<const ParquetTypeWithId*>(fileType_.get())
@@ -54,7 +79,10 @@ StructColumnReader::StructColumnReader(
5479
// this and the child.
5580
auto child = childForRepDefs_;
5681
for (;;) {
57-
assert(child);
82+
if (child == nullptr) {
83+
levelMode_ = LevelMode::kNulls;
84+
break;
85+
}
5886
if (child->fileType().type()->kind() == TypeKind::ARRAY ||
5987
child->fileType().type()->kind() == TypeKind::MAP) {
6088
levelMode_ = LevelMode::kStructOverLists;
@@ -91,7 +119,6 @@ StructColumnReader::findBestLeaf() {
91119
best = child;
92120
}
93121
}
94-
assert(best);
95122
return best;
96123
}
97124

velox/dwio/parquet/reader/StructColumnReader.h

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -35,7 +35,8 @@ class StructColumnReader : public dwio::common::SelectiveStructColumnReader {
3535
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
3636
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
3737
ParquetParams& params,
38-
common::ScanSpec& scanSpec);
38+
common::ScanSpec& scanSpec,
39+
memory::MemoryPool& pool);
3940

4041
void read(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls)
4142
override;

0 commit comments

Comments
 (0)