Skip to content

Commit e64095c

Browse files
sdruzkinfacebook-github-bot
authored andcommitted
Fix writer regression caused by memory reallocation in ensureNullsCapacity (facebookincubator#247)
Summary: Pull Request resolved: facebookincubator#247 Fix up to 32x writer regression caused by memory reallocation of the exact size without growing in ensureNullsCapacity by providing the grows policy. This regression was identified by DISCO runs on biggest tables and then narrowed by CPU profiling. Differential Revision: D82172070
1 parent 70939ce commit e64095c

File tree

4 files changed

+55
-22
lines changed

4 files changed

+55
-22
lines changed

dwio/nimble/velox/CMakeLists.txt

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ target_link_libraries(nimble_velox_schema_builder nimble_velox_schema_reader
2929

3030
add_library(nimble_velox_stream_data StreamData.cpp)
3131
target_link_libraries(nimble_velox_stream_data nimble_velox_schema_builder
32-
nimble_common)
32+
nimble_common nimble_velox_buffer_growth_policy)
3333

3434
add_library(nimble_velox_field_reader FieldReader.cpp)
3535
target_link_libraries(
@@ -40,11 +40,13 @@ add_library(nimble_velox_layout_planner LayoutPlanner.cpp)
4040
target_link_libraries(nimble_velox_layout_planner nimble_velox_schema_reader
4141
velox_file)
4242

43-
add_library(nimble_velox_field_writer BufferGrowthPolicy.cpp
44-
DeduplicationUtils.cpp FieldWriter.cpp)
43+
add_library(nimble_velox_buffer_growth_policy BufferGrowthPolicy.cpp)
44+
target_link_libraries(nimble_velox_buffer_growth_policy nimble_common)
45+
46+
add_library(nimble_velox_field_writer DeduplicationUtils.cpp FieldWriter.cpp)
4547
target_link_libraries(
4648
nimble_velox_field_writer nimble_velox_schema nimble_velox_stream_data
47-
nimble_velox_schema_builder Folly::folly)
49+
nimble_velox_schema_builder nimble_velox_buffer_growth_policy Folly::folly)
4850

4951
build_flatbuffers(
5052
"${CMAKE_CURRENT_SOURCE_DIR}/Schema.fbs"
@@ -111,6 +113,7 @@ add_library(
111113
ChunkedStreamWriter.cpp VeloxWriterDefaultMetadataOSS.cpp)
112114
target_link_libraries(
113115
nimble_velox_writer
116+
nimble_velox_buffer_growth_policy
114117
nimble_encodings
115118
nimble_common
116119
nimble_tablet_writer

dwio/nimble/velox/FieldWriter.cpp

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
#include "dwio/nimble/velox/FieldWriter.h"
1717
#include "dwio/nimble/common/Exceptions.h"
18-
#include "dwio/nimble/common/Types.h"
1918
#include "dwio/nimble/velox/DeduplicationUtils.h"
2019
#include "dwio/nimble/velox/SchemaBuilder.h"
2120
#include "dwio/nimble/velox/SchemaTypes.h"
@@ -296,7 +295,8 @@ class SimpleFieldWriter : public FieldWriter {
296295
uint64_t nullCount = 0;
297296

298297
if (auto flat = vector->asFlatVector<SourceType>()) {
299-
valuesStream_.ensureNullsCapacity(flat->mayHaveNulls(), size);
298+
valuesStream_.ensureNullsCapacity(
299+
flat->mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get());
300300
bool rangeCopied = false;
301301
if (!flat->mayHaveNulls()) {
302302
if constexpr (
@@ -342,7 +342,8 @@ class SimpleFieldWriter : public FieldWriter {
342342
} else {
343343
auto decodingContext = context_.getDecodingContext();
344344
auto& decoded = decodingContext.decode(vector, ranges);
345-
valuesStream_.ensureNullsCapacity(decoded.mayHaveNulls(), size);
345+
valuesStream_.ensureNullsCapacity(
346+
decoded.mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get());
346347
auto nonNullCount = iterateNonNullValues(
347348
ranges,
348349
valuesStream_.mutableNonNulls(),
@@ -408,7 +409,8 @@ class RowFieldWriter : public FieldWriter {
408409
"Schema mismatch: expected {} fields, but got {} fields",
409410
fields_.size(),
410411
row->childrenSize()));
411-
nullsStream_.ensureNullsCapacity(vector->mayHaveNulls(), size);
412+
nullsStream_.ensureNullsCapacity(
413+
vector->mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get());
412414
if (row->mayHaveNulls()) {
413415
childRangesPtr = &childRanges;
414416
auto nonNullCount = iterateNonNullIndices<true>(
@@ -432,7 +434,8 @@ class RowFieldWriter : public FieldWriter {
432434
fields_.size(),
433435
row->childrenSize()));
434436
childRangesPtr = &childRanges;
435-
nullsStream_.ensureNullsCapacity(decoded.mayHaveNulls(), size);
437+
nullsStream_.ensureNullsCapacity(
438+
decoded.mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get());
436439
auto nonNullCount = iterateNonNullIndices<true>(
437440
ranges,
438441
nullsStream_.mutableNonNulls(),
@@ -521,7 +524,8 @@ class MultiValueFieldWriter : public FieldWriter {
521524
offsets = casted->rawOffsets();
522525
lengths = casted->rawSizes();
523526

524-
lengthsStream_.ensureNullsCapacity(casted->mayHaveNulls(), size);
527+
lengthsStream_.ensureNullsCapacity(
528+
casted->mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get());
525529
auto nonNullCount = iterateNonNullIndices<true>(
526530
ranges, lengthsStream_.mutableNonNulls(), Flat{vector}, proc);
527531
nullCount = size - nonNullCount;
@@ -533,7 +537,8 @@ class MultiValueFieldWriter : public FieldWriter {
533537
offsets = casted->rawOffsets();
534538
lengths = casted->rawSizes();
535539

536-
lengthsStream_.ensureNullsCapacity(decoded.mayHaveNulls(), size);
540+
lengthsStream_.ensureNullsCapacity(
541+
decoded.mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get());
537542
auto nonNullCount = iterateNonNullIndices<true>(
538543
ranges, lengthsStream_.mutableNonNulls(), Decoded{decoded}, proc);
539544
nullCount = size - nonNullCount;
@@ -747,7 +752,10 @@ class SlidingWindowMapFieldWriter : public FieldWriter {
747752
if (mapVector) {
748753
rawOffsets = mapVector->rawOffsets();
749754
rawLengths = mapVector->rawSizes();
750-
offsetsStream_.ensureNullsCapacity(mapVector->mayHaveNulls(), size);
755+
offsetsStream_.ensureNullsCapacity(
756+
mapVector->mayHaveNulls(),
757+
size,
758+
context_.inputBufferGrowthPolicy.get());
751759
Flat iterableVector{vector};
752760
auto nonNullCount = iterateNonNullIndices<true>(
753761
ranges,
@@ -762,7 +770,8 @@ class SlidingWindowMapFieldWriter : public FieldWriter {
762770
NIMBLE_ASSERT(mapVector, "Unexpected vector type");
763771
rawOffsets = mapVector->rawOffsets();
764772
rawLengths = mapVector->rawSizes();
765-
offsetsStream_.ensureNullsCapacity(decoded.mayHaveNulls(), size);
773+
offsetsStream_.ensureNullsCapacity(
774+
decoded.mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get());
766775
Decoded iterableVector{decoded};
767776
auto nonNullCount = iterateNonNullIndices<true>(
768777
ranges,
@@ -974,7 +983,8 @@ class FlatMapFieldWriter : public FieldWriter {
974983
flatMap,
975984
"Unexpected vector type. Vector must be a decoded ROW vector.");
976985
const auto size = ranges.size();
977-
nullsStream_.ensureNullsCapacity(flatMap->mayHaveNulls(), size);
986+
nullsStream_.ensureNullsCapacity(
987+
flatMap->mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get());
978988
const auto& keys = flatMap->type()->asRow().names();
979989
const auto& values = flatMap->children();
980990

@@ -1080,7 +1090,8 @@ class FlatMapFieldWriter : public FieldWriter {
10801090
offsets = map->rawOffsets();
10811091
lengths = map->rawSizes();
10821092

1083-
nullsStream_.ensureNullsCapacity(map->mayHaveNulls(), size);
1093+
nullsStream_.ensureNullsCapacity(
1094+
map->mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get());
10841095
processVector(map, Flat{vector});
10851096
} else {
10861097
// Map is encoded. Decode.
@@ -1091,7 +1102,10 @@ class FlatMapFieldWriter : public FieldWriter {
10911102
offsets = map->rawOffsets();
10921103
lengths = map->rawSizes();
10931104

1094-
nullsStream_.ensureNullsCapacity(decodedMap.mayHaveNulls(), size);
1105+
nullsStream_.ensureNullsCapacity(
1106+
decodedMap.mayHaveNulls(),
1107+
size,
1108+
context_.inputBufferGrowthPolicy.get());
10951109
processVector(map, Decoded{decodedMap});
10961110
}
10971111

@@ -1357,7 +1371,10 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
13571371
const OrderedRanges& ranges,
13581372
OrderedRanges& filteredRanges) {
13591373
auto size = ranges.size();
1360-
offsetsStream_.ensureNullsCapacity(dictionaryVector.mayHaveNulls(), size);
1374+
offsetsStream_.ensureNullsCapacity(
1375+
dictionaryVector.mayHaveNulls(),
1376+
size,
1377+
context_.inputBufferGrowthPolicy.get());
13611378

13621379
auto& offsetsData = offsetsStream_.mutableData();
13631380
auto& lengthsData = lengthsStream_.mutableData();
@@ -1575,7 +1592,10 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
15751592
rawOffsets = arrayVector->rawOffsets();
15761593
rawLengths = arrayVector->rawSizes();
15771594

1578-
offsetsStream_.ensureNullsCapacity(arrayVector->mayHaveNulls(), size);
1595+
offsetsStream_.ensureNullsCapacity(
1596+
arrayVector->mayHaveNulls(),
1597+
size,
1598+
context_.inputBufferGrowthPolicy.get());
15791599
Flat iterableVector{vector};
15801600
iterateNonNullIndices<true>(
15811601
ranges, offsetsStream_.mutableNonNulls(), iterableVector, proc);
@@ -1589,7 +1609,8 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
15891609
rawOffsets = arrayVector->rawOffsets();
15901610
rawLengths = arrayVector->rawSizes();
15911611

1592-
offsetsStream_.ensureNullsCapacity(decoded.mayHaveNulls(), size);
1612+
offsetsStream_.ensureNullsCapacity(
1613+
decoded.mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get());
15931614
Decoded iterableVector{decoded};
15941615
iterateNonNullIndices<true>(
15951616
ranges, offsetsStream_.mutableNonNulls(), iterableVector, proc);

dwio/nimble/velox/StreamData.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,15 @@
1818

1919
namespace facebook::nimble {
2020

21-
void NullsStreamData::ensureNullsCapacity(bool mayHaveNulls, uint32_t size) {
21+
void NullsStreamData::ensureNullsCapacity(
22+
bool mayHaveNulls,
23+
uint32_t size,
24+
InputBufferGrowthPolicy* growthPolicy) {
2225
if (mayHaveNulls || hasNulls_) {
2326
auto newSize = bufferedCount_ + size;
24-
nonNulls_.reserve(newSize);
27+
auto newCapacity = growthPolicy->getExtendedCapacity(
28+
bufferedCount_ + size, nonNulls_.capacity());
29+
nonNulls_.reserve(newCapacity);
2530
if (!hasNulls_) {
2631
hasNulls_ = true;
2732
std::fill(nonNulls_.data(), nonNulls_.data() + bufferedCount_, true);

dwio/nimble/velox/StreamData.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <string_view>
2121

2222
#include "dwio/nimble/common/Vector.h"
23+
#include "dwio/nimble/velox/BufferGrowthPolicy.h"
2324
#include "dwio/nimble/velox/SchemaBuilder.h"
2425
#include "velox/common/memory/Memory.h"
2526

@@ -160,7 +161,10 @@ class NullsStreamData : public StreamData {
160161
}
161162
}
162163

163-
void ensureNullsCapacity(bool mayHaveNulls, uint32_t size);
164+
void ensureNullsCapacity(
165+
bool mayHaveNulls,
166+
uint32_t size,
167+
InputBufferGrowthPolicy* growthPolicy);
164168

165169
protected:
166170
Vector<bool> nonNulls_;

0 commit comments

Comments
 (0)