Skip to content

Commit 1409c8e

Browse files
peterenescu authored and meta-codesync[bot] committed
refactor(nimble): Preserve MAP spec in flat map column readers
Summary: X-link: facebookincubator/velox#15803. A production issue (S602695), a VeloxRuntimeError that blocked a feature injection pipeline, exposed a bug in the Nimble flat map reader. For context, ROO feature injection was enabled in production earlier this half and is supported by both the Velox map and flat map vector encodings. A particular pipeline was blocked this past weekend because the flat map reader was crashing. As a temporary mitigation, the pipeline used the regular map encoding instead of the flat map encoding, since the flat map reader appeared to be attempting to set values that the reader had never initialized. The culprit was how the scan spec is constructed for flat maps. Rather than appending all features to the keys and values children, we should follow the map scan spec construction: preserve the keys and values specs, and create the child columns using the preserved values child spec. Differential Revision: D89326920
1 parent b3c4a64 commit 1409c8e

File tree

3 files changed

+105
-23
lines changed

3 files changed

+105
-23
lines changed

dwio/nimble/velox/FieldWriter.cpp

Lines changed: 7 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1060,10 +1060,6 @@ class FlatMapFieldWriter : public FieldWriter {
10601060
return;
10611061
}
10621062

1063-
// Only create keys on first call to write (with valid ranges). Subsequent
1064-
// calls must have the same set of keys, otherwise writer will throw.
1065-
bool populateMap = currentPassthroughFields_.empty();
1066-
10671063
const auto& values = flatMapVector->mapValues();
10681064
const auto& inMaps = flatMapVector->inMaps();
10691065

@@ -1082,8 +1078,13 @@ class FlatMapFieldWriter : public FieldWriter {
10821078
"FlatMapVector keys are not distinct.");
10831079
distinctKeySet.insert(key);
10841080

1085-
auto& writer = populateMap ? createPassthroughValueFieldWriter(key)
1086-
: findPassthroughValueFieldWriter(key);
1081+
// Only create keys on first call to write (with valid ranges). To
1082+
// account for key pruning, let's avoid failing on unfound subsequent
1083+
// keys.
1084+
auto existingPair = currentPassthroughFields_.find(key);
1085+
auto& writer = existingPair != currentPassthroughFields_.end()
1086+
? *existingPair->second
1087+
: createPassthroughValueFieldWriter(key);
10871088

10881089
if (inMaps[i]) {
10891090
writer.write(values[i], inMaps[i], childRanges);

dwio/nimble/velox/selective/FlatMapColumnReader.cpp

Lines changed: 60 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -66,9 +66,10 @@ std::vector<KeyNode<T>> makeKeyNodes(
6666

6767
// Adjust the scan spec according to the output type.
6868
switch (outputType) {
69-
// For a kMap output, just need a scan spec for map keys and one for map
70-
// values.
71-
case FlatMapOutput::kMap: {
69+
// For a kMap and kFlatMap output, just need a scan spec for map keys and
70+
// one for map values.
71+
case FlatMapOutput::kMap:
72+
case FlatMapOutput::kFlatMap: {
7273
keysSpec = scanSpec.getOrCreateChild(
7374
common::Subfield(common::ScanSpec::kMapKeysFieldName));
7475
valuesSpec = scanSpec.getOrCreateChild(
@@ -78,18 +79,6 @@ std::vector<KeyNode<T>> makeKeyNodes(
7879
valuesSpec->setProjectOut(true);
7980
break;
8081
}
81-
// For a kFlatMap output, need to find the streams (distinct keys) to read
82-
// from the file (nimbleType).
83-
case FlatMapOutput::kFlatMap: {
84-
for (int i = 0; i < childrenCount; ++i) {
85-
auto key = parseKeyValue<T>(nimbleType.nameAt(i));
86-
auto spec = scanSpec.getOrCreateChild(nimbleType.nameAt(i));
87-
spec->setProjectOut(true);
88-
spec->setChannel(i);
89-
childSpecs[key] = spec;
90-
}
91-
break;
92-
}
9382
// For a kStruct output, the streams to be read are part of the scan spec
9483
// already.
9584
case FlatMapOutput::kStruct: {
@@ -113,7 +102,7 @@ std::vector<KeyNode<T>> makeKeyNodes(
113102
if (auto it = childSpecs.find(node.key);
114103
it != childSpecs.end() && !it->second->isConstant()) {
115104
childSpec = it->second;
116-
} else if (outputType != FlatMapOutput::kMap) {
105+
} else if (outputType == FlatMapOutput::kStruct) {
117106
// Column not selected in 'scanSpec', skipping it.
118107
continue;
119108
} else {
@@ -243,6 +232,7 @@ class FlatMapColumnReader
243232
for (int i = 0; i < keyNodes_.size(); ++i) {
244233
keyNodes_[i].reader->scanSpec()->setSubscript(i);
245234
children_[i] = keyNodes_[i].reader.get();
235+
children_[i]->setIsFlatMapValue(true);
246236

247237
rawKeys[i] = keyNodes_[i].key.get();
248238
}
@@ -305,6 +295,60 @@ class FlatMapColumnReader
305295
// decoders.
306296
}
307297

298+
// Same as FlatMapAsMapColumnReader.
299+
void read(int64_t offset, const RowSet& rows, const uint64_t* incomingNulls) {
300+
numReads_ = scanSpec_->newRead();
301+
prepareRead<char>(offset, rows, incomingNulls);
302+
VELOX_DCHECK(!hasDeletion());
303+
auto activeRows = rows;
304+
auto* mapNulls =
305+
nullsInReadRange_ ? nullsInReadRange_->as<uint64_t>() : nullptr;
306+
if (scanSpec_->filter()) {
307+
auto kind = scanSpec_->filter()->kind();
308+
VELOX_CHECK(
309+
kind == velox::common::FilterKind::kIsNull ||
310+
kind == velox::common::FilterKind::kIsNotNull);
311+
filterNulls<int32_t>(
312+
rows, kind == velox::common::FilterKind::kIsNull, false);
313+
if (outputRows_.empty()) {
314+
for (auto* child : children_) {
315+
child->addParentNulls(offset, mapNulls, rows);
316+
}
317+
readOffset_ = offset + rows.back() + 1;
318+
return;
319+
}
320+
activeRows = outputRows_;
321+
}
322+
// Separate the loop to be cache friendly.
323+
for (auto* child : children_) {
324+
advanceFieldReader(child, offset);
325+
}
326+
for (auto* child : children_) {
327+
child->read(offset, activeRows, mapNulls);
328+
child->addParentNulls(offset, mapNulls, rows);
329+
}
330+
readOffset_ = offset + rows.back() + 1;
331+
}
332+
333+
void getValues(const RowSet& rows, VectorPtr* result) override {
334+
SelectiveFlatMapColumnReader::getValues(rows, result);
335+
336+
// After reading the flat map streams recursively, need to read the in map
337+
// buffers.
338+
if (result && *result) {
339+
auto flatMapVector = (*result)->as<FlatMapVector>();
340+
VELOX_CHECK(flatMapVector);
341+
342+
for (int i = 0; i < keyNodes_.size(); ++i) {
343+
auto& nimbleData = children_[i]->formatData().template as<NimbleData>();
344+
if (nimbleData.inMapBuffer()) {
345+
auto inMapBuffer = nimbleData.inMapBuffer();
346+
flatMapVector->inMapsAt(i, true) = inMapBuffer;
347+
}
348+
}
349+
}
350+
}
351+
308352
private:
309353
std::vector<KeyNode<T>> keyNodes_;
310354
};

dwio/nimble/velox/selective/tests/SelectiveNimbleReaderTest.cpp

Lines changed: 38 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1523,7 +1523,6 @@ TEST_P(SelectiveNimbleReaderTest, nativeFlatMap) {
15231523

15241524
// Dictionary wrapped keys
15251525
{
1526-
LOG(INFO) << "Dictionary wrapped keys";
15271526
testRoundtrip(constructFlatMap(
15281527
BaseVector::wrapInDictionary(
15291528
nullptr,
@@ -1564,6 +1563,44 @@ TEST_P(SelectiveNimbleReaderTest, nativeFlatMap) {
15641563
3, 0, makeFlatVector<int32_t>({1, 2, 3})))),
15651564
"FlatMapVector keys are not distinct.");
15661565
}
1566+
1567+
// Scan spec with map key filter
1568+
{
1569+
auto inputFlatMap = makeFlatMapVector<int64_t, int64_t>({
1570+
{{{1, 100}, {5, 500}, {10, 1000}, {15, 1500}, {20, 2000}}},
1571+
{{{2, 200}, {8, 800}, {12, 1200}}},
1572+
{{{3, 300}, {7, 700}, {18, 1800}, {25, 2500}}},
1573+
});
1574+
auto input = makeRowVector({inputFlatMap, inputFlatMap->toMapVector()});
1575+
1576+
VeloxWriterOptions writerOptions;
1577+
writerOptions.flatMapColumns = {"c0"};
1578+
auto fileContent =
1579+
test::createNimbleFile(*rootPool(), input, writerOptions, true);
1580+
1581+
// Test with map key filter [5, 12]
1582+
auto scanSpec = std::make_shared<common::ScanSpec>("root");
1583+
scanSpec->addAllChildFields(*input->type());
1584+
scanSpec->childByName("c0")
1585+
->childByName(common::ScanSpec::kMapKeysFieldName)
1586+
->setFilter(std::make_unique<common::BigintRange>(5, 12, false));
1587+
scanSpec->childByName("c1")
1588+
->childByName(common::ScanSpec::kMapKeysFieldName)
1589+
->setFilter(std::make_unique<common::BigintRange>(5, 12, false));
1590+
auto readers = makeReaders(input, fileContent, scanSpec, true);
1591+
1592+
// Expected output after filtering: only keys in [5, 12] remain
1593+
auto expectedFlatMap = makeFlatMapVector<int64_t, int64_t>({
1594+
{{{5, 500}, {10, 1000}}},
1595+
{{{8, 800}, {12, 1200}}},
1596+
{{{7, 700}}},
1597+
});
1598+
auto expected =
1599+
makeRowVector({expectedFlatMap->toMapVector(), expectedFlatMap});
1600+
validate(*expected, *readers.rowReader, inputFlatMap->size(), [](auto) {
1601+
return true;
1602+
});
1603+
}
15671604
}
15681605

15691606
TEST_P(SelectiveNimbleReaderTest, mapAsStruct) {

0 commit comments

Comments
 (0)