Skip to content

Commit 12ca41d

Browse files
committed
Support struct column reading with different schemas
1 parent ccb464b commit 12ca41d

File tree

12 files changed

+206
-32
lines changed

12 files changed

+206
-32
lines changed

velox/connectors/hive/SplitReader.cpp

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -243,12 +243,23 @@ std::vector<TypePtr> SplitReader::adaptColumns(
243243
} else {
244244
auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName);
245245
if (!fileTypeIdx.has_value()) {
246-
// Column is missing. Most likely due to schema evolution.
247-
VELOX_CHECK(tableSchema);
248-
childSpec->setConstantValue(BaseVector::createNullConstant(
249-
tableSchema->findChild(fieldName),
250-
1,
251-
connectorQueryCtx_->memoryPool()));
246+
// If field name exists in the user-specified output type, set the
247+
// column as null constant. Related PR:
248+
// https://github.com/facebookincubator/velox/pull/6427.
249+
auto outputTypeIdx = readerOutputType_->getChildIdxIfExists(fieldName);
250+
if (outputTypeIdx.has_value()) {
251+
childSpec->setConstantValue(BaseVector::createNullConstant(
252+
readerOutputType_->childAt(outputTypeIdx.value()),
253+
1,
254+
connectorQueryCtx_->memoryPool()));
255+
} else {
256+
// Column is missing. Most likely due to schema evolution.
257+
VELOX_CHECK(tableSchema);
258+
childSpec->setConstantValue(BaseVector::createNullConstant(
259+
tableSchema->findChild(fieldName),
260+
1,
261+
connectorQueryCtx_->memoryPool()));
262+
}
252263
} else {
253264
// Column no longer missing, reset constant value set on the spec.
254265
childSpec->setConstantValue(nullptr);

velox/dwio/common/SelectiveStructColumnReader.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,6 @@ void SelectiveStructColumnReaderBase::read(
157157
}
158158

159159
auto& childSpecs = scanSpec_->children();
160-
VELOX_CHECK(!childSpecs.empty());
161160
for (size_t i = 0; i < childSpecs.size(); ++i) {
162161
auto& childSpec = childSpecs[i];
163162
VELOX_TRACE_HISTORY_PUSH("read %s", childSpec->fieldName().c_str());
@@ -243,7 +242,7 @@ bool SelectiveStructColumnReaderBase::isChildConstant(
243242
fileType_->type()->kind() !=
244243
TypeKind::MAP && // If this is the case it means this is a flat map,
245244
// so it can't have "missing" fields.
246-
childSpec.channel() >= fileType_->size());
245+
!fileType_->containsChild(childSpec.fieldName()));
247246
}
248247

249248
namespace {
@@ -327,7 +326,6 @@ void setNullField(
327326
void SelectiveStructColumnReaderBase::getValues(
328327
RowSet rows,
329328
VectorPtr* result) {
330-
VELOX_CHECK(!scanSpec_->children().empty());
331329
VELOX_CHECK_NOT_NULL(
332330
*result, "SelectiveStructColumnReaderBase expects a non-null result");
333331
VELOX_CHECK(

velox/dwio/common/TypeWithId.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ class TypeWithId : public velox::Tree<std::shared_ptr<const TypeWithId>> {
5959

6060
const std::shared_ptr<const TypeWithId>& childAt(uint32_t idx) const override;
6161

62+
bool containsChild(const std::string& name) const {
63+
VELOX_CHECK_EQ(type_->kind(), velox::TypeKind::ROW);
64+
return type_->as<velox::TypeKind::ROW>().containsChild(name);
65+
}
66+
6267
const std::shared_ptr<const TypeWithId>& childByName(
6368
const std::string& name) const {
6469
VELOX_CHECK_EQ(type_->kind(), velox::TypeKind::ROW);

velox/dwio/parquet/reader/ParquetColumnReader.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
3535
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
3636
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
3737
ParquetParams& params,
38-
common::ScanSpec& scanSpec) {
38+
common::ScanSpec& scanSpec,
39+
memory::MemoryPool& pool) {
3940
auto colName = scanSpec.fieldName();
4041

4142
switch (fileType->type()->kind()) {
@@ -56,19 +57,19 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
5657

5758
case TypeKind::ROW:
5859
return std::make_unique<StructColumnReader>(
59-
requestedType, fileType, params, scanSpec);
60+
requestedType, fileType, params, scanSpec, pool);
6061

6162
case TypeKind::VARBINARY:
6263
case TypeKind::VARCHAR:
6364
return std::make_unique<StringColumnReader>(fileType, params, scanSpec);
6465

6566
case TypeKind::ARRAY:
6667
return std::make_unique<ListColumnReader>(
67-
requestedType, fileType, params, scanSpec);
68+
requestedType, fileType, params, scanSpec, pool);
6869

6970
case TypeKind::MAP:
7071
return std::make_unique<MapColumnReader>(
71-
requestedType, fileType, params, scanSpec);
72+
requestedType, fileType, params, scanSpec, pool);
7273

7374
case TypeKind::BOOLEAN:
7475
return std::make_unique<BooleanColumnReader>(

velox/dwio/parquet/reader/ParquetColumnReader.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class ParquetColumnReader {
4545
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
4646
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
4747
ParquetParams& params,
48-
common::ScanSpec& scanSpec);
48+
common::ScanSpec& scanSpec,
49+
memory::MemoryPool& pool);
4950
};
5051
} // namespace facebook::velox::parquet

velox/dwio/parquet/reader/ParquetReader.cpp

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ class ReaderBase {
8484
/// the data still exists in the buffered inputs.
8585
bool isRowGroupBuffered(int32_t rowGroupIndex) const;
8686

87+
/// Returns 'inputType' unchanged when 'fileColumnNamesReadAsLowerCase' is
/// false. Otherwise returns a new TypeWithId whose ROW type carries the
/// names of 'rowTypePtr' converted to ASCII lower case (child types kept as
/// is), with children, id, maxId and column copied from 'inputType'.
static std::shared_ptr<const dwio::common::TypeWithId> createTypeWithId(
88+
const std::shared_ptr<const dwio::common::TypeWithId>& inputType,
89+
const RowTypePtr& rowTypePtr,
90+
bool fileColumnNamesReadAsLowerCase);
91+
8792
private:
8893
// Reads and parses file footer.
8994
void loadFileMetaData();
@@ -589,6 +594,33 @@ std::shared_ptr<const RowType> ReaderBase::createRowType(
589594
std::move(childNames), std::move(childTypes));
590595
}
591596

597+
std::shared_ptr<const dwio::common::TypeWithId> ReaderBase::createTypeWithId(
598+
const std::shared_ptr<const dwio::common::TypeWithId>& inputType,
599+
const RowTypePtr& rowTypePtr,
600+
bool fileColumnNamesReadAsLowerCase) {
601+
if (!fileColumnNamesReadAsLowerCase) {
602+
return inputType;
603+
}
604+
std::vector<std::string> names;
605+
names.reserve(rowTypePtr->names().size());
606+
std::vector<TypePtr> types = rowTypePtr->children();
607+
for (const auto& name : rowTypePtr->names()) {
608+
std::string childName = name;
609+
folly::toLowerAscii(childName);
610+
names.emplace_back(childName);
611+
}
612+
auto convertedType =
613+
TypeFactory<TypeKind::ROW>::create(std::move(names), std::move(types));
614+
615+
auto children = inputType->getChildren();
616+
return std::make_shared<const dwio::common::TypeWithId>(
617+
convertedType,
618+
std::move(children),
619+
inputType->id(),
620+
inputType->maxId(),
621+
inputType->column());
622+
}
623+
592624
void ReaderBase::scheduleRowGroups(
593625
const std::vector<uint32_t>& rowGroupIds,
594626
int32_t currentGroup,
@@ -662,13 +694,19 @@ class ParquetRowReader::Impl {
662694
}
663695
ParquetParams params(
664696
pool_, columnReaderStats_, readerBase_->fileMetaData());
665-
auto columnSelector = std::make_shared<ColumnSelector>(
666-
ColumnSelector::apply(options_.getSelector(), readerBase_->schema()));
697+
auto columnSelector = options_.getSelector()
698+
? options_.getSelector()
699+
: std::make_shared<ColumnSelector>(ColumnSelector::apply(
700+
options_.getSelector(), readerBase_->schema()));
667701
columnReader_ = ParquetColumnReader::build(
668-
columnSelector->getSchemaWithId(),
702+
ReaderBase::createTypeWithId(
703+
columnSelector->getSchemaWithId(),
704+
asRowType(columnSelector->getSchemaWithId()->type()),
705+
readerBase_->isFileColumnNamesReadAsLowerCase()),
669706
readerBase_->schemaWithId(), // Id is schema id
670707
params,
671-
*options_.getScanSpec());
708+
*options_.getScanSpec(),
709+
pool_);
672710

673711
filterRowGroups();
674712
if (!rowGroupIds_.empty()) {

velox/dwio/parquet/reader/RepeatedColumnReader.cpp

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ PageReader* FOLLY_NULLABLE readLeafRepDefs(
3333
return nullptr;
3434
}
3535
auto pageReader = reader->formatData().as<ParquetData>().reader();
36+
if (pageReader == nullptr) {
37+
return nullptr;
38+
}
3639
pageReader->decodeRepDefs(numTop);
3740
return pageReader;
3841
}
@@ -113,7 +116,8 @@ MapColumnReader::MapColumnReader(
113116
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
114117
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
115118
ParquetParams& params,
116-
common::ScanSpec& scanSpec)
119+
common::ScanSpec& scanSpec,
120+
memory::MemoryPool& pool)
117121
: dwio::common::SelectiveMapColumnReader(
118122
requestedType,
119123
fileType,
@@ -123,9 +127,17 @@ MapColumnReader::MapColumnReader(
123127
auto& keyChildType = requestedType->childAt(0);
124128
auto& elementChildType = requestedType->childAt(1);
125129
keyReader_ = ParquetColumnReader::build(
126-
keyChildType, fileType_->childAt(0), params, *scanSpec.children()[0]);
130+
keyChildType,
131+
fileType_->childAt(0),
132+
params,
133+
*scanSpec.children()[0],
134+
pool);
127135
elementReader_ = ParquetColumnReader::build(
128-
elementChildType, fileType_->childAt(1), params, *scanSpec.children()[1]);
136+
elementChildType,
137+
fileType_->childAt(1),
138+
params,
139+
*scanSpec.children()[1],
140+
pool);
129141
reinterpret_cast<const ParquetTypeWithId*>(fileType.get())
130142
->makeLevelInfo(levelInfo_);
131143
children_ = {keyReader_.get(), elementReader_.get()};
@@ -223,15 +235,16 @@ ListColumnReader::ListColumnReader(
223235
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
224236
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
225237
ParquetParams& params,
226-
common::ScanSpec& scanSpec)
238+
common::ScanSpec& scanSpec,
239+
memory::MemoryPool& pool)
227240
: dwio::common::SelectiveListColumnReader(
228241
requestedType,
229242
fileType,
230243
params,
231244
scanSpec) {
232245
auto& childType = requestedType->childAt(0);
233246
child_ = ParquetColumnReader::build(
234-
childType, fileType_->childAt(0), params, *scanSpec.children()[0]);
247+
childType, fileType_->childAt(0), params, *scanSpec.children()[0], pool);
235248
reinterpret_cast<const ParquetTypeWithId*>(fileType.get())
236249
->makeLevelInfo(levelInfo_);
237250
children_ = {child_.get()};

velox/dwio/parquet/reader/RepeatedColumnReader.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ class MapColumnReader : public dwio::common::SelectiveMapColumnReader {
5959
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
6060
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
6161
ParquetParams& params,
62-
common::ScanSpec& scanSpec);
62+
common::ScanSpec& scanSpec,
63+
memory::MemoryPool& pool);
6364

6465
void prepareRead(
6566
vector_size_t offset,
@@ -115,7 +116,8 @@ class ListColumnReader : public dwio::common::SelectiveListColumnReader {
115116
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
116117
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
117118
ParquetParams& params,
118-
common::ScanSpec& scanSpec);
119+
common::ScanSpec& scanSpec,
120+
memory::MemoryPool& pool);
119121

120122
void prepareRead(
121123
vector_size_t offset,

velox/dwio/parquet/reader/StructColumnReader.cpp

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,46 @@ StructColumnReader::StructColumnReader(
3030
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
3131
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
3232
ParquetParams& params,
33-
common::ScanSpec& scanSpec)
33+
common::ScanSpec& scanSpec,
34+
memory::MemoryPool& pool)
3435
: SelectiveStructColumnReader(requestedType, fileType, params, scanSpec) {
3536
auto& childSpecs = scanSpec_->stableChildren();
37+
std::vector<int> missingFields;
3638
for (auto i = 0; i < childSpecs.size(); ++i) {
3739
auto childSpec = childSpecs[i];
3840
if (childSpecs[i]->isConstant()) {
3941
continue;
4042
}
41-
auto childFileType = fileType_->childByName(childSpec->fieldName());
42-
auto childRequestedType =
43-
requestedType_->childByName(childSpec->fieldName());
43+
const auto& fieldName = childSpec->fieldName();
44+
if (!fileType_->containsChild(fieldName)) {
45+
missingFields.emplace_back(i);
46+
continue;
47+
}
48+
auto childFileType = fileType_->childByName(fieldName);
49+
auto childRequestedType = requestedType_->childByName(fieldName);
4450
addChild(ParquetColumnReader::build(
45-
childRequestedType, childFileType, params, *childSpec));
51+
childRequestedType, childFileType, params, *childSpec, pool));
4652
childSpecs[i]->setSubscript(children_.size() - 1);
4753
}
54+
55+
if (missingFields.size() > 0) {
56+
// Set the struct as null if all the children fields in the output type are
57+
// missing and the number of child fields is more than one.
58+
if (childSpecs.size() > 1 && missingFields.size() == childSpecs.size()) {
59+
scanSpec_->setConstantValue(
60+
BaseVector::createNullConstant(requestedType_->type(), 1, &pool));
61+
} else {
62+
// Set null constant for the missing child field of output type.
63+
for (int channel : missingFields) {
64+
childSpecs[channel]->setConstantValue(BaseVector::createNullConstant(
65+
requestedType_->childByName(childSpecs[channel]->fieldName())
66+
->type(),
67+
1,
68+
&pool));
69+
}
70+
}
71+
}
72+
4873
auto type = reinterpret_cast<const ParquetTypeWithId*>(fileType_.get());
4974
if (type->parent()) {
5075
levelMode_ = reinterpret_cast<const ParquetTypeWithId*>(fileType_.get())
@@ -54,7 +79,10 @@ StructColumnReader::StructColumnReader(
5479
// this and the child.
5580
auto child = childForRepDefs_;
5681
for (;;) {
57-
assert(child);
82+
if (child == nullptr) {
83+
levelMode_ = LevelMode::kNulls;
84+
break;
85+
}
5886
if (child->fileType().type()->kind() == TypeKind::ARRAY ||
5987
child->fileType().type()->kind() == TypeKind::MAP) {
6088
levelMode_ = LevelMode::kStructOverLists;

velox/dwio/parquet/reader/StructColumnReader.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ class StructColumnReader : public dwio::common::SelectiveStructColumnReader {
3535
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
3636
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
3737
ParquetParams& params,
38-
common::ScanSpec& scanSpec);
38+
common::ScanSpec& scanSpec,
39+
memory::MemoryPool& pool);
3940

4041
void read(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls)
4142
override;

0 commit comments

Comments
 (0)