Skip to content

Commit 4db8d47

Browse files
Yuhtaericyuliu
authored andcommitted
Optimize PagedInputStream::Skip (facebookincubator#6699)
Summary: Pull Request resolved: facebookincubator#6699 Currently when we skip bytes in `PagedInputStream`, we do the decompression unconditionally and it is expensive. Some optimizations are added to address this: 1. Skip decompression of the whole block (frame in case of ZSTD) if 1. We can get the precise decompressed size, and 2. The decompressed size is no larger than the bytes need to skip 2. Accumulate contiguous skip calls to create larger skip region (delayed skipping) Reviewed By: oerling Differential Revision: D49501856 fbshipit-source-id: 07241aaf71e83f0f491050a9be6075dd5500dd52
1 parent 00cfe77 commit 4db8d47

File tree

5 files changed

+178
-54
lines changed

5 files changed

+178
-54
lines changed

velox/dwio/common/compression/Compression.cpp

Lines changed: 39 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -246,24 +246,32 @@ class ZstdDecompressor : public Decompressor {
246246
explicit ZstdDecompressor(
247247
uint64_t blockSize,
248248
const std::string& streamDebugInfo)
249-
: Decompressor{blockSize, streamDebugInfo} {}
249+
: Decompressor{blockSize, streamDebugInfo}, context_{ZSTD_createDCtx()} {}
250+
251+
~ZstdDecompressor() override {
252+
ZSTD_freeDCtx(context_);
253+
}
250254

251255
uint64_t decompress(
252256
const char* src,
253257
uint64_t srcLength,
254258
char* dest,
255259
uint64_t destLength) override;
256260

257-
uint64_t getUncompressedLength(const char* src, uint64_t srcLength)
258-
const override;
261+
std::pair<int64_t, bool> getDecompressedLength(
262+
const char* src,
263+
uint64_t srcLength) const override;
264+
265+
private:
266+
ZSTD_DCtx* context_;
259267
};
260268

261269
uint64_t ZstdDecompressor::decompress(
262270
const char* src,
263271
uint64_t srcLength,
264272
char* dest,
265273
uint64_t destLength) {
266-
auto ret = ZSTD_decompress(dest, destLength, src, srcLength);
274+
auto ret = ZSTD_decompressDCtx(context_, dest, destLength, src, srcLength);
267275
DWIO_ENSURE(
268276
!ZSTD_isError(ret),
269277
"ZSTD returned an error: ",
@@ -273,22 +281,22 @@ uint64_t ZstdDecompressor::decompress(
273281
return ret;
274282
}
275283

276-
uint64_t ZstdDecompressor::getUncompressedLength(
284+
std::pair<int64_t, bool> ZstdDecompressor::getDecompressedLength(
277285
const char* src,
278286
uint64_t srcLength) const {
279287
auto uncompressedLength = ZSTD_getFrameContentSize(src, srcLength);
280288
// in the case when decompression size is not available, return the upper
281289
// bound
282290
if (uncompressedLength == ZSTD_CONTENTSIZE_UNKNOWN ||
283291
uncompressedLength == ZSTD_CONTENTSIZE_ERROR) {
284-
return blockSize_;
292+
return {blockSize_, false};
285293
}
286294
DWIO_ENSURE_LE(
287295
uncompressedLength,
288296
blockSize_,
289297
"Insufficient buffer size. Info: ",
290298
streamDebugInfo_);
291-
return uncompressedLength;
299+
return {uncompressedLength, true};
292300
}
293301

294302
class SnappyDecompressor : public Decompressor {
@@ -304,16 +312,17 @@ class SnappyDecompressor : public Decompressor {
304312
char* dest,
305313
uint64_t destLength) override;
306314

307-
uint64_t getUncompressedLength(const char* src, uint64_t srcLength)
308-
const override;
315+
std::pair<int64_t, bool> getDecompressedLength(
316+
const char* src,
317+
uint64_t srcLength) const override;
309318
};
310319

311320
uint64_t SnappyDecompressor::decompress(
312321
const char* src,
313322
uint64_t srcLength,
314323
char* dest,
315324
uint64_t destLength) {
316-
auto length = getUncompressedLength(src, srcLength);
325+
auto [length, _] = getDecompressedLength(src, srcLength);
317326
DWIO_ENSURE_GE(destLength, length);
318327
DWIO_ENSURE(
319328
snappy::RawUncompress(src, srcLength, dest),
@@ -322,23 +331,24 @@ uint64_t SnappyDecompressor::decompress(
322331
return length;
323332
}
324333

325-
uint64_t SnappyDecompressor::getUncompressedLength(
334+
std::pair<int64_t, bool> SnappyDecompressor::getDecompressedLength(
326335
const char* src,
327336
uint64_t srcLength) const {
328337
size_t uncompressedLength;
329338
// in the case when decompression size is not available, return the upper
330339
// bound
331340
if (!snappy::GetUncompressedLength(src, srcLength, &uncompressedLength)) {
332-
return blockSize_;
341+
return {blockSize_, false};
333342
}
334343
DWIO_ENSURE_LE(
335344
uncompressedLength,
336345
blockSize_,
337346
"Insufficient buffer size. Info: ",
338347
streamDebugInfo_);
339-
return uncompressedLength;
348+
return {uncompressedLength, true};
340349
}
341350

351+
// TODO: Is this really needed?
342352
class ZlibDecompressionStream : public PagedInputStream,
343353
private ZlibDecompressor {
344354
public:
@@ -355,13 +365,18 @@ class ZlibDecompressionStream : public PagedInputStream,
355365
ZlibDecompressor{blockSize, windowBits, streamDebugInfo, isGzip} {}
356366
~ZlibDecompressionStream() override = default;
357367

358-
bool Next(const void** data, int32_t* size) override;
368+
bool readOrSkip(const void** data, int32_t* size) override;
359369
};
360370

361-
bool ZlibDecompressionStream::Next(const void** data, int32_t* size) {
371+
bool ZlibDecompressionStream::readOrSkip(const void** data, int32_t* size) {
372+
if (data) {
373+
VELOX_CHECK_EQ(pendingSkip_, 0);
374+
}
362375
// if the user pushed back, return them the partial buffer
363376
if (outputBufferLength_) {
364-
*data = outputBufferPtr_;
377+
if (data) {
378+
*data = outputBufferPtr_;
379+
}
365380
*size = static_cast<int32_t>(outputBufferLength_);
366381
outputBufferPtr_ += outputBufferLength_;
367382
bytesReturned_ += outputBufferLength_;
@@ -381,7 +396,9 @@ bool ZlibDecompressionStream::Next(const void** data, int32_t* size) {
381396
static_cast<size_t>(inputBufferPtrEnd_ - inputBufferPtr_),
382397
remainingLength_);
383398
if (state_ == State::ORIGINAL) {
384-
*data = inputBufferPtr_;
399+
if (data) {
400+
*data = inputBufferPtr_;
401+
}
385402
*size = static_cast<int32_t>(availSize);
386403
outputBufferPtr_ = inputBufferPtr_ + availSize;
387404
outputBufferLength_ = 0;
@@ -393,7 +410,8 @@ bool ZlibDecompressionStream::Next(const void** data, int32_t* size) {
393410
getName(),
394411
" Info: ",
395412
ZlibDecompressor::streamDebugInfo_);
396-
prepareOutputBuffer(getUncompressedLength(inputBufferPtr_, availSize));
413+
prepareOutputBuffer(
414+
getDecompressedLength(inputBufferPtr_, availSize).first);
397415

398416
reset();
399417
zstream_.next_in =
@@ -432,7 +450,9 @@ bool ZlibDecompressionStream::Next(const void** data, int32_t* size) {
432450
}
433451
} while (result != Z_STREAM_END);
434452
*size = static_cast<int32_t>(blockSize_ - zstream_.avail_out);
435-
*data = outputBufferPtr_;
453+
if (data) {
454+
*data = outputBufferPtr_;
455+
}
436456
outputBufferLength_ = 0;
437457
outputBufferPtr_ += *size;
438458
}

velox/dwio/common/compression/Compression.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,14 @@ class Compressor {
4646
class Decompressor {
4747
public:
4848
explicit Decompressor(uint64_t blockSize, const std::string& streamDebugInfo)
49-
: blockSize_{blockSize}, streamDebugInfo_{streamDebugInfo} {}
49+
: blockSize_{static_cast<int64_t>(blockSize)},
50+
streamDebugInfo_{streamDebugInfo} {}
5051

5152
virtual ~Decompressor() = default;
5253

53-
virtual uint64_t getUncompressedLength(
54-
const char* /* unused */,
55-
uint64_t /* unused */) const {
56-
return blockSize_;
54+
virtual std::pair<int64_t, bool /* Is the size exact? */>
55+
getDecompressedLength(const char* /* src */, uint64_t /* srcLength */) const {
56+
return {blockSize_, false};
5757
}
5858

5959
virtual uint64_t decompress(
@@ -63,7 +63,7 @@ class Decompressor {
6363
uint64_t destLength) = 0;
6464

6565
protected:
66-
uint64_t blockSize_;
66+
int64_t blockSize_;
6767
const std::string streamDebugInfo_;
6868
};
6969

velox/dwio/common/compression/PagedInputStream.cpp

Lines changed: 64 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,21 @@ const char* PagedInputStream::ensureInput(size_t availableInputBytes) {
104104
}
105105

106106
bool PagedInputStream::Next(const void** data, int32_t* size) {
107+
VELOX_CHECK_NOT_NULL(data);
108+
skipAllPending();
109+
return readOrSkip(data, size);
110+
}
111+
112+
// Read into `data' if it is not null; otherwise skip some of the pending.
113+
bool PagedInputStream::readOrSkip(const void** data, int32_t* size) {
114+
if (data) {
115+
VELOX_CHECK_EQ(pendingSkip_, 0);
116+
}
107117
// if the user pushed back, return them the partial buffer
108118
if (outputBufferLength_) {
109-
*data = outputBufferPtr_;
119+
if (data) {
120+
*data = outputBufferPtr_;
121+
}
110122
*size = static_cast<int32_t>(outputBufferLength_);
111123
outputBufferPtr_ += outputBufferLength_;
112124
bytesReturned_ += outputBufferLength_;
@@ -139,7 +151,9 @@ bool PagedInputStream::Next(const void** data, int32_t* size) {
139151
// if no decompression or decryption is needed, simply adjust the output
140152
// pointer. Otherwise, make sure we have continuous block
141153
if (original) {
142-
*data = inputBufferPtr_;
154+
if (data) {
155+
*data = inputBufferPtr_;
156+
}
143157
*size = static_cast<int32_t>(availSize);
144158
outputBufferPtr_ = inputBufferPtr_ + availSize;
145159
inputBufferPtr_ += availSize;
@@ -154,24 +168,35 @@ bool PagedInputStream::Next(const void** data, int32_t* size) {
154168
decrypter_->decrypt(folly::StringPiece{input, remainingLength_});
155169
input = reinterpret_cast<const char*>(decryptionBuffer_->data());
156170
remainingLength_ = decryptionBuffer_->length();
157-
*data = input;
171+
if (data) {
172+
*data = input;
173+
}
158174
*size = remainingLength_;
159175
outputBufferPtr_ = input + remainingLength_;
160176
}
161177

162178
// perform decompression
163179
if (state_ == State::START) {
164180
DWIO_ENSURE_NOT_NULL(decompressor_.get(), "invalid stream state");
165-
prepareOutputBuffer(
166-
decompressor_->getUncompressedLength(input, remainingLength_));
167-
outputBufferLength_ = decompressor_->decompress(
168-
input,
169-
remainingLength_,
170-
outputBuffer_->data(),
171-
outputBuffer_->capacity());
172-
*data = outputBuffer_->data();
173-
*size = static_cast<int32_t>(outputBufferLength_);
174-
outputBufferPtr_ = outputBuffer_->data() + outputBufferLength_;
181+
DWIO_ENSURE_NOT_NULL(input);
182+
auto [decompressedLength, exact] =
183+
decompressor_->getDecompressedLength(input, remainingLength_);
184+
if (!data && exact && decompressedLength <= pendingSkip_) {
185+
*size = decompressedLength;
186+
outputBufferPtr_ = nullptr;
187+
} else {
188+
prepareOutputBuffer(decompressedLength);
189+
outputBufferLength_ = decompressor_->decompress(
190+
input,
191+
remainingLength_,
192+
outputBuffer_->data(),
193+
outputBuffer_->capacity());
194+
if (data) {
195+
*data = outputBuffer_->data();
196+
}
197+
*size = static_cast<int32_t>(outputBufferLength_);
198+
outputBufferPtr_ = outputBuffer_->data() + outputBufferLength_;
199+
}
175200
// release decryption buffer
176201
decryptionBuffer_ = nullptr;
177202
}
@@ -188,6 +213,14 @@ bool PagedInputStream::Next(const void** data, int32_t* size) {
188213
}
189214

190215
void PagedInputStream::BackUp(int32_t count) {
216+
if (pendingSkip_ > 0) {
217+
auto len = std::min<int64_t>(count, pendingSkip_);
218+
pendingSkip_ -= len;
219+
count -= len;
220+
if (count == 0) {
221+
return;
222+
}
223+
}
191224
DWIO_ENSURE(
192225
outputBufferPtr_ != nullptr,
193226
"Backup without previous Next in ",
@@ -207,25 +240,29 @@ void PagedInputStream::BackUp(int32_t count) {
207240
bytesReturned_ -= count;
208241
}
209242

210-
bool PagedInputStream::Skip(int32_t count) {
211-
// this is a stupid implementation for now.
212-
// should skip entire blocks without decompressing
213-
while (count > 0) {
214-
const void* ptr;
243+
bool PagedInputStream::skipAllPending() {
244+
while (pendingSkip_ > 0) {
215245
int32_t len;
216-
if (!Next(&ptr, &len)) {
246+
if (!readOrSkip(nullptr, &len)) {
217247
return false;
218248
}
219-
if (len > count) {
220-
BackUp(len - count);
221-
count = 0;
249+
if (len > pendingSkip_) {
250+
auto toBackUp = len - pendingSkip_;
251+
pendingSkip_ = 0;
252+
BackUp(toBackUp);
222253
} else {
223-
count -= len;
254+
pendingSkip_ -= len;
224255
}
225256
}
226257
return true;
227258
}
228259

260+
bool PagedInputStream::Skip(int32_t count) {
261+
pendingSkip_ += count;
262+
// We never use the return value of this function so this is OK.
263+
return true;
264+
}
265+
229266
void PagedInputStream::clearDecompressionState() {
230267
state_ = State::HEADER;
231268
outputBufferLength_ = 0;
@@ -243,7 +280,8 @@ void PagedInputStream::seekToPosition(
243280
// to the beginning of the last view or last header, whichever is
244281
// later. If we are returning views into the decompression buffer,
245282
// we can backup to the beginning of the decompressed buffer
246-
auto alreadyRead = bytesReturned_ - bytesReturnedAtLastHeaderOffset_;
283+
auto alreadyRead =
284+
bytesReturned_ - bytesReturnedAtLastHeaderOffset_ + pendingSkip_;
247285

248286
// outsideOriginalWindow is true if we are returning views into
249287
// the input stream's buffer and we are seeking below the start of the last
@@ -260,12 +298,12 @@ void PagedInputStream::seekToPosition(
260298
auto provider = dwio::common::PositionProvider(positions);
261299
input_->seekToPosition(provider);
262300
clearDecompressionState();
263-
Skip(uncompressedOffset);
301+
pendingSkip_ = uncompressedOffset;
264302
} else {
265303
if (uncompressedOffset < alreadyRead) {
266304
BackUp(alreadyRead - uncompressedOffset);
267305
} else {
268-
Skip(uncompressedOffset - alreadyRead);
306+
pendingSkip_ += uncompressedOffset - alreadyRead;
269307
}
270308
}
271309
}

0 commit comments

Comments
 (0)