Skip to content

Commit dfe16d4

Browse files
Yuhtafacebook-github-bot
authored andcommitted
Optimize PagedInputStream::Skip (#6699)
Summary: Currently when we skip bytes in `PagedInputStream`, we do the decompression unconditionally and it is expensive. Some optimizations are added to address this: 1. Skip decompression of the whole block (frame in case of ZSTD) if 1. We can get the precise decompressed size, and 2. The decompressed size is no larger than the bytes need to skip 2. Accumulate contiguous skip calls to create larger skip region (delayed skipping) 3. Fix `ByteRleDecoder::skipBytes` to avoid reading data and breaking contiguous skips Differential Revision: D49501856
1 parent eb4d908 commit dfe16d4

File tree

7 files changed

+185
-59
lines changed

7 files changed

+185
-59
lines changed

velox/dwio/common/CacheInputStream.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ void CacheInputStream::loadSync(Region region) {
212212
} else {
213213
// Hit memory cache.
214214
if (!entry->getAndClearFirstUseFlag()) {
215-
ioStats_->ramHit().increment(entry->size());
215+
ioStats_->ramHit().increment(region.length);
216216
}
217217
return;
218218
}
@@ -269,7 +269,7 @@ bool CacheInputStream::loadFromSsd(
269269
throw;
270270
}
271271
pin_ = std::move(pins[0]);
272-
ioStats_->ssdRead().increment(entry.size());
272+
ioStats_->ssdRead().increment(region.length);
273273
ioStats_->queryThreadIoLatency().increment(usec);
274274
entry.setExclusiveToShared();
275275
return true;

velox/dwio/common/compression/Compression.cpp

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -246,24 +246,31 @@ class ZstdDecompressor : public Decompressor {
246246
explicit ZstdDecompressor(
247247
uint64_t blockSize,
248248
const std::string& streamDebugInfo)
249-
: Decompressor{blockSize, streamDebugInfo} {}
249+
: Decompressor{blockSize, streamDebugInfo}, context_{ZSTD_createDCtx()} {}
250+
251+
~ZstdDecompressor() override {
252+
ZSTD_freeDCtx(context_);
253+
}
250254

251255
uint64_t decompress(
252256
const char* src,
253257
uint64_t srcLength,
254258
char* dest,
255259
uint64_t destLength) override;
256260

257-
uint64_t getUncompressedLength(const char* src, uint64_t srcLength)
261+
int64_t getDecompressedLength(const char* src, uint64_t srcLength)
258262
const override;
263+
264+
private:
265+
ZSTD_DCtx* context_;
259266
};
260267

261268
uint64_t ZstdDecompressor::decompress(
262269
const char* src,
263270
uint64_t srcLength,
264271
char* dest,
265272
uint64_t destLength) {
266-
auto ret = ZSTD_decompress(dest, destLength, src, srcLength);
273+
auto ret = ZSTD_decompressDCtx(context_, dest, destLength, src, srcLength);
267274
DWIO_ENSURE(
268275
!ZSTD_isError(ret),
269276
"ZSTD returned an error: ",
@@ -273,15 +280,15 @@ uint64_t ZstdDecompressor::decompress(
273280
return ret;
274281
}
275282

276-
uint64_t ZstdDecompressor::getUncompressedLength(
283+
int64_t ZstdDecompressor::getDecompressedLength(
277284
const char* src,
278285
uint64_t srcLength) const {
279286
auto uncompressedLength = ZSTD_getFrameContentSize(src, srcLength);
280287
// in the case when decompression size is not available, return the upper
281288
// bound
282289
if (uncompressedLength == ZSTD_CONTENTSIZE_UNKNOWN ||
283290
uncompressedLength == ZSTD_CONTENTSIZE_ERROR) {
284-
return blockSize_;
291+
return -blockSize_;
285292
}
286293
DWIO_ENSURE_LE(
287294
uncompressedLength,
@@ -304,7 +311,7 @@ class SnappyDecompressor : public Decompressor {
304311
char* dest,
305312
uint64_t destLength) override;
306313

307-
uint64_t getUncompressedLength(const char* src, uint64_t srcLength)
314+
int64_t getDecompressedLength(const char* src, uint64_t srcLength)
308315
const override;
309316
};
310317

@@ -313,7 +320,7 @@ uint64_t SnappyDecompressor::decompress(
313320
uint64_t srcLength,
314321
char* dest,
315322
uint64_t destLength) {
316-
auto length = getUncompressedLength(src, srcLength);
323+
auto length = std::abs(getDecompressedLength(src, srcLength));
317324
DWIO_ENSURE_GE(destLength, length);
318325
DWIO_ENSURE(
319326
snappy::RawUncompress(src, srcLength, dest),
@@ -322,14 +329,14 @@ uint64_t SnappyDecompressor::decompress(
322329
return length;
323330
}
324331

325-
uint64_t SnappyDecompressor::getUncompressedLength(
332+
int64_t SnappyDecompressor::getDecompressedLength(
326333
const char* src,
327334
uint64_t srcLength) const {
328335
size_t uncompressedLength;
329336
// in the case when decompression size is not available, return the upper
330337
// bound
331338
if (!snappy::GetUncompressedLength(src, srcLength, &uncompressedLength)) {
332-
return blockSize_;
339+
return -blockSize_;
333340
}
334341
DWIO_ENSURE_LE(
335342
uncompressedLength,
@@ -339,6 +346,7 @@ uint64_t SnappyDecompressor::getUncompressedLength(
339346
return uncompressedLength;
340347
}
341348

349+
// TODO: Is this really needed?
342350
class ZlibDecompressionStream : public PagedInputStream,
343351
private ZlibDecompressor {
344352
public:
@@ -355,13 +363,18 @@ class ZlibDecompressionStream : public PagedInputStream,
355363
ZlibDecompressor{blockSize, windowBits, streamDebugInfo, isGzip} {}
356364
~ZlibDecompressionStream() override = default;
357365

358-
bool Next(const void** data, int32_t* size) override;
366+
bool readOrSkip(const void** data, int32_t* size) override;
359367
};
360368

361-
bool ZlibDecompressionStream::Next(const void** data, int32_t* size) {
369+
bool ZlibDecompressionStream::readOrSkip(const void** data, int32_t* size) {
370+
if (data) {
371+
VELOX_CHECK_EQ(pendingSkip_, 0);
372+
}
362373
// if the user pushed back, return them the partial buffer
363374
if (outputBufferLength_) {
364-
*data = outputBufferPtr_;
375+
if (data) {
376+
*data = outputBufferPtr_;
377+
}
365378
*size = static_cast<int32_t>(outputBufferLength_);
366379
outputBufferPtr_ += outputBufferLength_;
367380
bytesReturned_ += outputBufferLength_;
@@ -381,7 +394,9 @@ bool ZlibDecompressionStream::Next(const void** data, int32_t* size) {
381394
static_cast<size_t>(inputBufferPtrEnd_ - inputBufferPtr_),
382395
remainingLength_);
383396
if (state_ == State::ORIGINAL) {
384-
*data = inputBufferPtr_;
397+
if (data) {
398+
*data = inputBufferPtr_;
399+
}
385400
*size = static_cast<int32_t>(availSize);
386401
outputBufferPtr_ = inputBufferPtr_ + availSize;
387402
outputBufferLength_ = 0;
@@ -393,7 +408,8 @@ bool ZlibDecompressionStream::Next(const void** data, int32_t* size) {
393408
getName(),
394409
" Info: ",
395410
ZlibDecompressor::streamDebugInfo_);
396-
prepareOutputBuffer(getUncompressedLength(inputBufferPtr_, availSize));
411+
prepareOutputBuffer(
412+
std::abs(getDecompressedLength(inputBufferPtr_, availSize)));
397413

398414
reset();
399415
zstream_.next_in =
@@ -432,7 +448,9 @@ bool ZlibDecompressionStream::Next(const void** data, int32_t* size) {
432448
}
433449
} while (result != Z_STREAM_END);
434450
*size = static_cast<int32_t>(blockSize_ - zstream_.avail_out);
435-
*data = outputBufferPtr_;
451+
if (data) {
452+
*data = outputBufferPtr_;
453+
}
436454
outputBufferLength_ = 0;
437455
outputBufferPtr_ += *size;
438456
}

velox/dwio/common/compression/Compression.h

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,17 @@ class Compressor {
4646
class Decompressor {
4747
public:
4848
explicit Decompressor(uint64_t blockSize, const std::string& streamDebugInfo)
49-
: blockSize_{blockSize}, streamDebugInfo_{streamDebugInfo} {}
49+
: blockSize_{static_cast<int64_t>(blockSize)},
50+
streamDebugInfo_{streamDebugInfo} {}
5051

5152
virtual ~Decompressor() = default;
5253

53-
virtual uint64_t getUncompressedLength(
54-
const char* /* unused */,
55-
uint64_t /* unused */) const {
56-
return blockSize_;
54+
// Return negative number indicating this is an estimation and should not be
55+
// used for compressed skipping.
56+
virtual int64_t getDecompressedLength(
57+
const char* /* src */,
58+
uint64_t /* srcLength */) const {
59+
return -blockSize_;
5760
}
5861

5962
virtual uint64_t decompress(
@@ -63,7 +66,7 @@ class Decompressor {
6366
uint64_t destLength) = 0;
6467

6568
protected:
66-
uint64_t blockSize_;
69+
int64_t blockSize_;
6770
const std::string streamDebugInfo_;
6871
};
6972

velox/dwio/common/compression/PagedInputStream.cpp

Lines changed: 65 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,21 @@ const char* PagedInputStream::ensureInput(size_t availableInputBytes) {
104104
}
105105

106106
bool PagedInputStream::Next(const void** data, int32_t* size) {
107+
VELOX_CHECK_NOT_NULL(data);
108+
skipAllPending();
109+
return readOrSkip(data, size);
110+
}
111+
112+
// Read into `data' if it is not null; otherwise skip some of the pending.
113+
bool PagedInputStream::readOrSkip(const void** data, int32_t* size) {
114+
if (data) {
115+
VELOX_CHECK_EQ(pendingSkip_, 0);
116+
}
107117
// if the user pushed back, return them the partial buffer
108118
if (outputBufferLength_) {
109-
*data = outputBufferPtr_;
119+
if (data) {
120+
*data = outputBufferPtr_;
121+
}
110122
*size = static_cast<int32_t>(outputBufferLength_);
111123
outputBufferPtr_ += outputBufferLength_;
112124
bytesReturned_ += outputBufferLength_;
@@ -139,7 +151,9 @@ bool PagedInputStream::Next(const void** data, int32_t* size) {
139151
// if no decompression or decryption is needed, simply adjust the output
140152
// pointer. Otherwise, make sure we have continuous block
141153
if (original) {
142-
*data = inputBufferPtr_;
154+
if (data) {
155+
*data = inputBufferPtr_;
156+
}
143157
*size = static_cast<int32_t>(availSize);
144158
outputBufferPtr_ = inputBufferPtr_ + availSize;
145159
inputBufferPtr_ += availSize;
@@ -154,24 +168,36 @@ bool PagedInputStream::Next(const void** data, int32_t* size) {
154168
decrypter_->decrypt(folly::StringPiece{input, remainingLength_});
155169
input = reinterpret_cast<const char*>(decryptionBuffer_->data());
156170
remainingLength_ = decryptionBuffer_->length();
157-
*data = input;
171+
if (data) {
172+
*data = input;
173+
}
158174
*size = remainingLength_;
159175
outputBufferPtr_ = input + remainingLength_;
160176
}
161177

162178
// perform decompression
163179
if (state_ == State::START) {
164180
DWIO_ENSURE_NOT_NULL(decompressor_.get(), "invalid stream state");
165-
prepareOutputBuffer(
166-
decompressor_->getUncompressedLength(input, remainingLength_));
167-
outputBufferLength_ = decompressor_->decompress(
168-
input,
169-
remainingLength_,
170-
outputBuffer_->data(),
171-
outputBuffer_->capacity());
172-
*data = outputBuffer_->data();
173-
*size = static_cast<int32_t>(outputBufferLength_);
174-
outputBufferPtr_ = outputBuffer_->data() + outputBufferLength_;
181+
DWIO_ENSURE_NOT_NULL(input);
182+
auto decompressedLength =
183+
decompressor_->getDecompressedLength(input, remainingLength_);
184+
if (!data && 0 <= decompressedLength &&
185+
decompressedLength <= pendingSkip_) {
186+
*size = decompressedLength;
187+
outputBufferPtr_ = nullptr;
188+
} else {
189+
prepareOutputBuffer(std::abs(decompressedLength));
190+
outputBufferLength_ = decompressor_->decompress(
191+
input,
192+
remainingLength_,
193+
outputBuffer_->data(),
194+
outputBuffer_->capacity());
195+
if (data) {
196+
*data = outputBuffer_->data();
197+
}
198+
*size = static_cast<int32_t>(outputBufferLength_);
199+
outputBufferPtr_ = outputBuffer_->data() + outputBufferLength_;
200+
}
175201
// release decryption buffer
176202
decryptionBuffer_ = nullptr;
177203
}
@@ -188,6 +214,14 @@ bool PagedInputStream::Next(const void** data, int32_t* size) {
188214
}
189215

190216
void PagedInputStream::BackUp(int32_t count) {
217+
if (pendingSkip_ > 0) {
218+
auto len = std::min<int64_t>(count, pendingSkip_);
219+
pendingSkip_ -= len;
220+
count -= len;
221+
if (count == 0) {
222+
return;
223+
}
224+
}
191225
DWIO_ENSURE(
192226
outputBufferPtr_ != nullptr,
193227
"Backup without previous Next in ",
@@ -207,25 +241,29 @@ void PagedInputStream::BackUp(int32_t count) {
207241
bytesReturned_ -= count;
208242
}
209243

210-
bool PagedInputStream::Skip(int32_t count) {
211-
// this is a stupid implementation for now.
212-
// should skip entire blocks without decompressing
213-
while (count > 0) {
214-
const void* ptr;
244+
bool PagedInputStream::skipAllPending() {
245+
while (pendingSkip_ > 0) {
215246
int32_t len;
216-
if (!Next(&ptr, &len)) {
247+
if (!readOrSkip(nullptr, &len)) {
217248
return false;
218249
}
219-
if (len > count) {
220-
BackUp(len - count);
221-
count = 0;
250+
if (len > pendingSkip_) {
251+
auto toBackUp = len - pendingSkip_;
252+
pendingSkip_ = 0;
253+
BackUp(toBackUp);
222254
} else {
223-
count -= len;
255+
pendingSkip_ -= len;
224256
}
225257
}
226258
return true;
227259
}
228260

261+
bool PagedInputStream::Skip(int32_t count) {
262+
pendingSkip_ += count;
263+
// We never use the return value of this function so this is OK.
264+
return true;
265+
}
266+
229267
void PagedInputStream::clearDecompressionState() {
230268
state_ = State::HEADER;
231269
outputBufferLength_ = 0;
@@ -243,7 +281,8 @@ void PagedInputStream::seekToPosition(
243281
// to the beginning of the last view or last header, whichever is
244282
// later. If we are returning views into the decompression buffer,
245283
// we can backup to the beginning of the decompressed buffer
246-
auto alreadyRead = bytesReturned_ - bytesReturnedAtLastHeaderOffset_;
284+
auto alreadyRead =
285+
bytesReturned_ - bytesReturnedAtLastHeaderOffset_ + pendingSkip_;
247286

248287
// outsideOriginalWindow is true if we are returning views into
249288
// the input stream's buffer and we are seeking below the start of the last
@@ -260,12 +299,12 @@ void PagedInputStream::seekToPosition(
260299
auto provider = dwio::common::PositionProvider(positions);
261300
input_->seekToPosition(provider);
262301
clearDecompressionState();
263-
Skip(uncompressedOffset);
302+
pendingSkip_ = uncompressedOffset;
264303
} else {
265304
if (uncompressedOffset < alreadyRead) {
266305
BackUp(alreadyRead - uncompressedOffset);
267306
} else {
268-
Skip(uncompressedOffset - alreadyRead);
307+
pendingSkip_ += uncompressedOffset - alreadyRead;
269308
}
270309
}
271310
}

0 commit comments

Comments
 (0)