Skip to content

Commit c03152b

Browse files
committed
Support struct reading with different schemas
1 parent 7614668 commit c03152b

File tree

11 files changed

+223
-35
lines changed

11 files changed

+223
-35
lines changed

velox/dwio/common/SelectiveStructColumnReader.cpp

Lines changed: 3 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -128,8 +128,7 @@ void SelectiveStructColumnReaderBase::read(
128128
activeRows = outputRows_;
129129
}
130130

131-
auto& childSpecs = scanSpec_->children();
132-
VELOX_CHECK(!childSpecs.empty());
131+
auto& childSpecs = scanSpec_->stableChildren();
133132
for (size_t i = 0; i < childSpecs.size(); ++i) {
134133
auto& childSpec = childSpecs[i];
135134
if (isChildConstant(*childSpec)) {
@@ -214,7 +213,7 @@ bool SelectiveStructColumnReaderBase::isChildConstant(
214213
fileType_->type()->kind() !=
215214
TypeKind::MAP && // If this is the case it means this is a flat map,
216215
// so it can't have "missing" fields.
217-
childSpec.channel() >= fileType_->size());
216+
!fileType_->containsChild(childSpec.fieldName()));
218217
}
219218

220219
namespace {
@@ -294,7 +293,6 @@ void setNullField(vector_size_t size, VectorPtr& field) {
294293
void SelectiveStructColumnReaderBase::getValues(
295294
RowSet rows,
296295
VectorPtr* result) {
297-
VELOX_CHECK(!scanSpec_->children().empty());
298296
VELOX_CHECK(
299297
*result != nullptr,
300298
"SelectiveStructColumnReaderBase expects a non-null result");
@@ -329,7 +327,7 @@ void SelectiveStructColumnReaderBase::getValues(
329327
resultRow->clearNulls(0, rows.size());
330328
}
331329
bool lazyPrepared = false;
332-
for (auto& childSpec : scanSpec_->children()) {
330+
for (auto& childSpec : scanSpec_->stableChildren()) {
333331
if (!childSpec->projectOut()) {
334332
continue;
335333
}

velox/dwio/common/TypeWithId.h

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -59,6 +59,11 @@ class TypeWithId : public velox::Tree<std::shared_ptr<const TypeWithId>> {
5959

6060
const std::shared_ptr<const TypeWithId>& childAt(uint32_t idx) const override;
6161

62+
bool containsChild(const std::string& name) const {
63+
VELOX_CHECK_EQ(type_->kind(), velox::TypeKind::ROW);
64+
return type_->as<velox::TypeKind::ROW>().containsChild(name);
65+
}
66+
6267
const std::shared_ptr<const TypeWithId>& childByName(
6368
const std::string& name) const {
6469
VELOX_CHECK_EQ(type_->kind(), velox::TypeKind::ROW);

velox/dwio/parquet/reader/ParquetColumnReader.cpp

Lines changed: 23 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,9 @@ namespace facebook::velox::parquet {
3636
std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
3737
const std::shared_ptr<const dwio::common::TypeWithId>& dataType,
3838
ParquetParams& params,
39-
common::ScanSpec& scanSpec) {
39+
common::ScanSpec& scanSpec,
40+
const TypePtr& outputType,
41+
memory::MemoryPool& pool) {
4042
auto colName = scanSpec.fieldName();
4143

4244
switch (dataType->type()->kind()) {
@@ -56,17 +58,34 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
5658
dataType->type(), dataType, params, scanSpec);
5759

5860
case TypeKind::ROW:
59-
return std::make_unique<StructColumnReader>(dataType, params, scanSpec);
61+
return std::make_unique<StructColumnReader>(
62+
dataType,
63+
params,
64+
scanSpec,
65+
outputType ? asRowType(outputType) : nullptr,
66+
pool);
6067

6168
case TypeKind::VARBINARY:
6269
case TypeKind::VARCHAR:
6370
return std::make_unique<StringColumnReader>(dataType, params, scanSpec);
6471

6572
case TypeKind::ARRAY:
66-
return std::make_unique<ListColumnReader>(dataType, params, scanSpec);
73+
return std::make_unique<ListColumnReader>(
74+
dataType,
75+
params,
76+
scanSpec,
77+
outputType ? std::dynamic_pointer_cast<const ArrayType>(outputType)
78+
: nullptr,
79+
pool);
6780

6881
case TypeKind::MAP:
69-
return std::make_unique<MapColumnReader>(dataType, params, scanSpec);
82+
return std::make_unique<MapColumnReader>(
83+
dataType,
84+
params,
85+
scanSpec,
86+
outputType ? std::dynamic_pointer_cast<const MapType>(outputType)
87+
: nullptr,
88+
pool);
7089

7190
case TypeKind::BOOLEAN:
7291
return std::make_unique<BooleanColumnReader>(dataType, params, scanSpec);

velox/dwio/parquet/reader/ParquetColumnReader.h

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,8 @@ class ParquetColumnReader {
4444
static std::unique_ptr<dwio::common::SelectiveColumnReader> build(
4545
const std::shared_ptr<const dwio::common::TypeWithId>& dataType,
4646
ParquetParams& params,
47-
common::ScanSpec& scanSpec);
47+
common::ScanSpec& scanSpec,
48+
const TypePtr& outputType,
49+
memory::MemoryPool& pool);
4850
};
4951
} // namespace facebook::velox::parquet

velox/dwio/parquet/reader/ParquetReader.cpp

Lines changed: 42 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -85,6 +85,16 @@ class ReaderBase {
8585
int32_t rowGroupIndex,
8686
const dwio::common::TypeWithId& type) const;
8787

88+
/// @brief Convert the names of row type to lower case when
89+
/// fileColumnNamesReadAsLowerCase is true.
90+
/// @param rowTypePtr the input row type.
91+
/// @param fileColumnNamesReadAsLowerCase whether to convert names into lower
92+
/// case.
93+
/// @return row type with names converted.
94+
static std::shared_ptr<const RowType> convertRowTypeNames(
95+
const RowTypePtr& rowTypePtr,
96+
bool fileColumnNamesReadAsLowerCase);
97+
8898
private:
8999
// Reads and parses file footer.
90100
void loadFileMetaData();
@@ -547,22 +557,39 @@ TypePtr ReaderBase::convertType(
547557
}
548558
}
549559

560+
std::shared_ptr<const RowType> ReaderBase::convertRowTypeNames(
561+
const RowTypePtr& rowTypePtr,
562+
bool fileColumnNamesReadAsLowerCase) {
563+
if (!fileColumnNamesReadAsLowerCase) {
564+
return rowTypePtr;
565+
}
566+
std::vector<std::string> names;
567+
names.reserve(rowTypePtr->names().size());
568+
std::vector<TypePtr> types = rowTypePtr->children();
569+
for (const auto& name : rowTypePtr->names()) {
570+
std::string childName = name;
571+
folly::toLowerAscii(childName);
572+
names.emplace_back(childName);
573+
}
574+
return TypeFactory<TypeKind::ROW>::create(std::move(names), std::move(types));
575+
}
576+
550577
std::shared_ptr<const RowType> ReaderBase::createRowType(
551578
std::vector<std::shared_ptr<const ParquetTypeWithId::TypeWithId>> children,
552579
bool fileColumnNamesReadAsLowerCase) {
553580
std::vector<std::string> childNames;
581+
childNames.reserve(children.size());
554582
std::vector<TypePtr> childTypes;
555-
for (auto& child : children) {
556-
auto childName =
557-
std::static_pointer_cast<const ParquetTypeWithId>(child)->name_;
558-
if (fileColumnNamesReadAsLowerCase) {
559-
folly::toLowerAscii(childName);
560-
}
561-
childNames.push_back(std::move(childName));
562-
childTypes.push_back(child->type());
583+
childTypes.reserve(children.size());
584+
for (const auto& child : children) {
585+
childNames.emplace_back(
586+
std::static_pointer_cast<const ParquetTypeWithId>(child)->name_);
587+
childTypes.emplace_back(child->type());
563588
}
564-
return TypeFactory<TypeKind::ROW>::create(
565-
std::move(childNames), std::move(childTypes));
589+
return convertRowTypeNames(
590+
TypeFactory<TypeKind::ROW>::create(
591+
std::move(childNames), std::move(childTypes)),
592+
fileColumnNamesReadAsLowerCase);
566593
}
567594

568595
void ReaderBase::scheduleRowGroups(
@@ -640,7 +667,11 @@ ParquetRowReader::ParquetRowReader(
640667
columnReader_ = ParquetColumnReader::build(
641668
readerBase_->schemaWithId(), // Id is schema id
642669
params,
643-
*options_.getScanSpec());
670+
*options_.getScanSpec(),
671+
ReaderBase::convertRowTypeNames(
672+
asRowType(options_.getSelector()->getSchemaWithId()->type()),
673+
readerBase_->isFileColumnNamesReadAsLowerCase()),
674+
pool_);
644675

645676
filterRowGroups();
646677
if (!rowGroupIds_.empty()) {

velox/dwio/parquet/reader/RepeatedColumnReader.cpp

Lines changed: 26 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,9 @@ PageReader* FOLLY_NULLABLE readLeafRepDefs(
3131
return nullptr;
3232
}
3333
auto pageReader = reader->formatData().as<ParquetData>().reader();
34+
if (pageReader == nullptr) {
35+
return nullptr;
36+
}
3437
pageReader->decodeRepDefs(numTop);
3538
return pageReader;
3639
}
@@ -110,18 +113,28 @@ void ensureRepDefs(
110113
MapColumnReader::MapColumnReader(
111114
std::shared_ptr<const dwio::common::TypeWithId> requestedType,
112115
ParquetParams& params,
113-
common::ScanSpec& scanSpec)
116+
common::ScanSpec& scanSpec,
117+
const std::shared_ptr<const MapType>& outputType,
118+
memory::MemoryPool& pool)
114119
: dwio::common::SelectiveMapColumnReader(
115120
requestedType,
116121
requestedType,
117122
params,
118123
scanSpec) {
119124
auto& keyChildType = requestedType->childAt(0);
120125
auto& elementChildType = requestedType->childAt(1);
121-
keyReader_ =
122-
ParquetColumnReader::build(keyChildType, params, *scanSpec.children()[0]);
126+
keyReader_ = ParquetColumnReader::build(
127+
keyChildType,
128+
params,
129+
*scanSpec.children()[0],
130+
outputType ? outputType->keyType() : nullptr,
131+
pool);
123132
elementReader_ = ParquetColumnReader::build(
124-
elementChildType, params, *scanSpec.children()[1]);
133+
elementChildType,
134+
params,
135+
*scanSpec.children()[1],
136+
outputType ? outputType->valueType() : nullptr,
137+
pool);
125138
reinterpret_cast<const ParquetTypeWithId*>(requestedType.get())
126139
->makeLevelInfo(levelInfo_);
127140
children_ = {keyReader_.get(), elementReader_.get()};
@@ -218,15 +231,21 @@ void MapColumnReader::filterRowGroups(
218231
ListColumnReader::ListColumnReader(
219232
std::shared_ptr<const dwio::common::TypeWithId> requestedType,
220233
ParquetParams& params,
221-
common::ScanSpec& scanSpec)
234+
common::ScanSpec& scanSpec,
235+
const std::shared_ptr<const ArrayType>& outputType,
236+
memory::MemoryPool& pool)
222237
: dwio::common::SelectiveListColumnReader(
223238
requestedType,
224239
requestedType,
225240
params,
226241
scanSpec) {
227242
auto& childType = requestedType->childAt(0);
228-
child_ =
229-
ParquetColumnReader::build(childType, params, *scanSpec.children()[0]);
243+
child_ = ParquetColumnReader::build(
244+
childType,
245+
params,
246+
*scanSpec.children()[0],
247+
outputType ? outputType->elementType() : nullptr,
248+
pool);
230249
reinterpret_cast<const ParquetTypeWithId*>(requestedType.get())
231250
->makeLevelInfo(levelInfo_);
232251
children_ = {child_.get()};

velox/dwio/parquet/reader/RepeatedColumnReader.h

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -58,7 +58,9 @@ class MapColumnReader : public dwio::common::SelectiveMapColumnReader {
5858
MapColumnReader(
5959
std::shared_ptr<const dwio::common::TypeWithId> requestedType,
6060
ParquetParams& params,
61-
common::ScanSpec& scanSpec);
61+
common::ScanSpec& scanSpec,
62+
const std::shared_ptr<const MapType>& outputType,
63+
memory::MemoryPool& pool);
6264

6365
void prepareRead(
6466
vector_size_t offset,
@@ -113,7 +115,9 @@ class ListColumnReader : public dwio::common::SelectiveListColumnReader {
113115
ListColumnReader(
114116
std::shared_ptr<const dwio::common::TypeWithId> requestedType,
115117
ParquetParams& params,
116-
common::ScanSpec& scanSpec);
118+
common::ScanSpec& scanSpec,
119+
const std::shared_ptr<const ArrayType>& outputType,
120+
memory::MemoryPool& pool);
117121

118122
void prepareRead(
119123
vector_size_t offset,

velox/dwio/parquet/reader/StructColumnReader.cpp

Lines changed: 36 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -22,18 +22,47 @@ namespace facebook::velox::parquet {
2222
StructColumnReader::StructColumnReader(
2323
const std::shared_ptr<const dwio::common::TypeWithId>& dataType,
2424
ParquetParams& params,
25-
common::ScanSpec& scanSpec)
25+
common::ScanSpec& scanSpec,
26+
const RowTypePtr& outputType,
27+
memory::MemoryPool& pool)
2628
: SelectiveStructColumnReader(dataType, dataType, params, scanSpec) {
2729
auto& childSpecs = scanSpec_->stableChildren();
30+
std::vector<int> missingFields;
2831
for (auto i = 0; i < childSpecs.size(); ++i) {
2932
if (childSpecs[i]->isConstant()) {
3033
continue;
3134
}
32-
auto childDataType = fileType_->childByName(childSpecs[i]->fieldName());
35+
const auto& fieldName = childSpecs[i]->fieldName();
36+
if (outputType && !fileType_->containsChild(fieldName)) {
37+
missingFields.emplace_back(i);
38+
continue;
39+
}
40+
auto childDataType = fileType_->childByName(fieldName);
3341

34-
addChild(ParquetColumnReader::build(childDataType, params, *childSpecs[i]));
42+
addChild(ParquetColumnReader::build(
43+
childDataType,
44+
params,
45+
*childSpecs[i],
46+
outputType ? outputType->findChild(fieldName) : nullptr,
47+
pool));
3548
childSpecs[i]->setSubscript(children_.size() - 1);
3649
}
50+
51+
if (outputType) {
52+
// Set the struct as null if all the children fields in the output type are
53+
// missing and the number of child fields is more than one.
54+
if (childSpecs.size() > 1 && missingFields.size() == childSpecs.size()) {
55+
scanSpec_->setConstantValue(
56+
BaseVector::createNullConstant(outputType, 1, &pool));
57+
} else {
58+
// Set null constant for the missing child field of output type.
59+
for (int channel : missingFields) {
60+
childSpecs[channel]->setConstantValue(BaseVector::createNullConstant(
61+
outputType->findChild(childSpecs[channel]->fieldName()), 1, &pool));
62+
}
63+
}
64+
}
65+
3766
auto type = reinterpret_cast<const ParquetTypeWithId*>(fileType_.get());
3867
if (type->parent()) {
3968
levelMode_ = reinterpret_cast<const ParquetTypeWithId*>(fileType_.get())
@@ -43,7 +72,10 @@ StructColumnReader::StructColumnReader(
4372
// this and the child.
4473
auto child = childForRepDefs_;
4574
for (;;) {
46-
assert(child);
75+
if (child == nullptr) {
76+
levelMode_ = LevelMode::kNulls;
77+
break;
78+
}
4779
if (child->fileType().type()->kind() == TypeKind::ARRAY ||
4880
child->fileType().type()->kind() == TypeKind::MAP) {
4981
levelMode_ = LevelMode::kStructOverLists;

velox/dwio/parquet/reader/StructColumnReader.h

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,9 @@ class StructColumnReader : public dwio::common::SelectiveStructColumnReader {
2626
StructColumnReader(
2727
const std::shared_ptr<const dwio::common::TypeWithId>& dataType,
2828
ParquetParams& params,
29-
common::ScanSpec& scanSpec);
29+
common::ScanSpec& scanSpec,
30+
const RowTypePtr& outputType,
31+
memory::MemoryPool& pool);
3032

3133
void read(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls)
3234
override;
1.32 KB
Binary file not shown.

0 commit comments

Comments
 (0)