Skip to content

Commit e14fc1d

Browse files
svm1facebook-github-bot
authored andcommitted
feat(parquet): Add config for datapage version (#11151)
Summary: Add config and session properties `hive.parquet.writer.datapage-version`, `hive.parquet.writer.datapage_version`, to determine the parquet writer datapage version (V1 or V2). Defaults to V1. Pull Request resolved: #11151 Reviewed By: kagamiori Differential Revision: D69496525 Pulled By: kgpai fbshipit-source-id: 0a25a7f3383ad779a44e38b0c49825c52080ae92
1 parent 8e96f69 commit e14fc1d

File tree

4 files changed

+161
-0
lines changed

4 files changed

+161
-0
lines changed

velox/docs/configs.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,6 +657,12 @@ Each query can override the config by setting corresponding query session proper
657657
- 9
658658
- Timestamp unit used when writing timestamps into Parquet through Arrow bridge.
659659
Valid values are 3 (millisecond), 6 (microsecond), and 9 (nanosecond).
660+
* - hive.parquet.writer.datapage-version
661+
- hive.parquet.writer.datapage_version
662+
- string
663+
- V1
664+
- Data Page version used when writing into Parquet through Arrow bridge.
665+
Valid values are "V1" and "V2".
660666

661667
``Amazon S3 Configuration``
662668
^^^^^^^^^^^^^^^^^^^^^^^^^^^

velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "velox/connectors/hive/HiveConnector.h" // @manual
2323
#include "velox/core/QueryCtx.h"
2424
#include "velox/dwio/parquet/RegisterParquetWriter.h" // @manual
25+
#include "velox/dwio/parquet/reader/PageReader.h"
2526
#include "velox/dwio/parquet/tests/ParquetTestBase.h"
2627
#include "velox/exec/Cursor.h"
2728
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
@@ -146,6 +147,124 @@ TEST_F(ParquetWriterTest, compression) {
146147
assertReadWithReaderAndExpected(schema, *rowReader, data, *leafPool_);
147148
};
148149

150+
TEST_F(ParquetWriterTest, toggleDataPageVersion) {
151+
auto schema = ROW({"c0"}, {INTEGER()});
152+
const int64_t kRows = 1;
153+
const auto data = makeRowVector({
154+
makeFlatVector<int32_t>(kRows, [](auto row) { return 987; }),
155+
});
156+
157+
// Write Parquet test data, then read and return the DataPage
158+
// (thrift::PageType::type) used.
159+
const auto testDataPageVersion =
160+
[&](std::unordered_map<std::string, std::string> configFromFile,
161+
std::unordered_map<std::string, std::string> sessionProperties) {
162+
// Create an in-memory writer.
163+
auto sink = std::make_unique<MemorySink>(
164+
200 * 1024 * 1024,
165+
dwio::common::FileSink::Options{.pool = leafPool_.get()});
166+
auto sinkPtr = sink.get();
167+
parquet::WriterOptions writerOptions;
168+
writerOptions.memoryPool = leafPool_.get();
169+
170+
// Simulate setting of Hive config & connector session properties, then
171+
// write test data.
172+
auto connectorConfig = config::ConfigBase(std::move(configFromFile));
173+
auto connectorSessionProperties =
174+
config::ConfigBase(std::move(sessionProperties));
175+
176+
writerOptions.processConfigs(
177+
connectorConfig, connectorSessionProperties);
178+
auto writer = std::make_unique<parquet::Writer>(
179+
std::move(sink), writerOptions, rootPool_, schema);
180+
writer->write(data);
181+
writer->close();
182+
183+
// Read to identify DataPage used.
184+
dwio::common::ReaderOptions readerOptions{leafPool_.get()};
185+
auto reader = createReaderInMemory(*sinkPtr, readerOptions);
186+
187+
auto colChunkPtr = reader->fileMetaData().rowGroup(0).columnChunk(0);
188+
std::string_view sinkData(sinkPtr->data(), sinkPtr->size());
189+
190+
auto readFile = std::make_shared<InMemoryReadFile>(sinkData);
191+
auto file = std::make_shared<ReadFileInputStream>(std::move(readFile));
192+
193+
auto inputStream = std::make_unique<SeekableFileInputStream>(
194+
std::move(file),
195+
colChunkPtr.dataPageOffset(),
196+
150,
197+
*leafPool_,
198+
LogType::TEST);
199+
auto pageReader = std::make_unique<PageReader>(
200+
std::move(inputStream),
201+
*leafPool_,
202+
colChunkPtr.compression(),
203+
colChunkPtr.totalCompressedSize());
204+
205+
return pageReader->readPageHeader().type;
206+
};
207+
208+
// Test default behavior - DataPage should be V1.
209+
ASSERT_EQ(testDataPageVersion({}, {}), thrift::PageType::type::DATA_PAGE);
210+
211+
// Simulate setting DataPage version to V2 via Hive config from file.
212+
std::unordered_map<std::string, std::string> configFromFile = {
213+
{parquet::WriterOptions::kParquetHiveConnectorDataPageVersion, "V2"}};
214+
215+
ASSERT_EQ(
216+
testDataPageVersion(configFromFile, {}),
217+
thrift::PageType::type::DATA_PAGE_V2);
218+
219+
// Simulate setting DataPage version to V1 via Hive config from file.
220+
configFromFile = {
221+
{parquet::WriterOptions::kParquetHiveConnectorDataPageVersion, "V1"}};
222+
223+
ASSERT_EQ(
224+
testDataPageVersion(configFromFile, {}),
225+
thrift::PageType::type::DATA_PAGE);
226+
227+
// Simulate setting DataPage version to V2 via connector session property.
228+
std::unordered_map<std::string, std::string> sessionProperties = {
229+
{parquet::WriterOptions::kParquetSessionDataPageVersion, "V2"}};
230+
231+
ASSERT_EQ(
232+
testDataPageVersion({}, sessionProperties),
233+
thrift::PageType::type::DATA_PAGE_V2);
234+
235+
// Simulate setting DataPage version to V1 via connector session property.
236+
sessionProperties = {
237+
{parquet::WriterOptions::kParquetSessionDataPageVersion, "V1"}};
238+
239+
ASSERT_EQ(
240+
testDataPageVersion({}, sessionProperties),
241+
thrift::PageType::type::DATA_PAGE);
242+
243+
// Simulate setting DataPage version to V1 via connector session property,
244+
// and to V2 via Hive config from file. Session property should take
245+
// precedence.
246+
sessionProperties = {
247+
{parquet::WriterOptions::kParquetSessionDataPageVersion, "V1"}};
248+
configFromFile = {
249+
{parquet::WriterOptions::kParquetHiveConnectorDataPageVersion, "V2"}};
250+
251+
ASSERT_EQ(
252+
testDataPageVersion({}, sessionProperties),
253+
thrift::PageType::type::DATA_PAGE);
254+
255+
// Simulate setting DataPage version to V2 via connector session property,
256+
// and to V1 via Hive config from file. Session property should take
257+
// precedence.
258+
sessionProperties = {
259+
{parquet::WriterOptions::kParquetSessionDataPageVersion, "V2"}};
260+
configFromFile = {
261+
{parquet::WriterOptions::kParquetHiveConnectorDataPageVersion, "V1"}};
262+
263+
ASSERT_EQ(
264+
testDataPageVersion({}, sessionProperties),
265+
thrift::PageType::type::DATA_PAGE_V2);
266+
}
267+
149268
DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromWriterOptions) {
150269
SCOPED_TESTVALUE_SET(
151270
"facebook::velox::parquet::Writer::write",

velox/dwio/parquet/writer/Writer.cpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,13 @@ std::shared_ptr<WriterProperties> getArrowParquetWriterOptions(
147147
static_cast<int64_t>(flushPolicy->rowsInRowGroup()));
148148
properties = properties->codec_options(options.codecOptions);
149149
properties = properties->enable_store_decimal_as_integer();
150+
if (options.useParquetDataPageV2.value_or(false)) {
151+
properties =
152+
properties->data_page_version(arrow::ParquetDataPageVersion::V2);
153+
} else {
154+
properties =
155+
properties->data_page_version(arrow::ParquetDataPageVersion::V1);
156+
}
150157
return properties->build();
151158
}
152159

@@ -238,6 +245,21 @@ std::optional<std::string> getTimestampTimeZone(
238245
return std::nullopt;
239246
}
240247

248+
std::optional<bool> getParquetDataPageVersion(
249+
const config::ConfigBase& config,
250+
const char* configKey) {
251+
if (const auto version = config.get<std::string>(configKey)) {
252+
if (version == "V1") {
253+
return false;
254+
} else if (version == "V2") {
255+
return true;
256+
} else {
257+
VELOX_FAIL("Unsupported parquet datapage version {}", version.value());
258+
}
259+
}
260+
return std::nullopt;
261+
}
262+
241263
} // namespace
242264

243265
Writer::Writer(
@@ -467,6 +489,15 @@ void WriterOptions::processConfigs(
467489
if (!parquetWriteTimestampTimeZone) {
468490
parquetWriteTimestampTimeZone = parquetWriterOptions->sessionTimezoneName;
469491
}
492+
493+
if (!useParquetDataPageV2) {
494+
useParquetDataPageV2 =
495+
getParquetDataPageVersion(session, kParquetSessionDataPageVersion)
496+
.has_value()
497+
? getParquetDataPageVersion(session, kParquetSessionDataPageVersion)
498+
: getParquetDataPageVersion(
499+
connectorConfig, kParquetHiveConnectorDataPageVersion);
500+
}
470501
}
471502

472503
} // namespace facebook::velox::parquet

velox/dwio/parquet/writer/Writer.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ struct WriterOptions : public dwio::common::WriterOptions {
108108
/// Timestamp time zone for Parquet write through Arrow bridge.
109109
std::optional<std::string> parquetWriteTimestampTimeZone;
110110
bool writeInt96AsTimestamp = false;
111+
std::optional<bool> useParquetDataPageV2;
111112

112113
// Parsing session and hive configs.
113114

@@ -117,6 +118,10 @@ struct WriterOptions : public dwio::common::WriterOptions {
117118
"hive.parquet.writer.timestamp_unit";
118119
static constexpr const char* kParquetHiveConnectorWriteTimestampUnit =
119120
"hive.parquet.writer.timestamp-unit";
121+
static constexpr const char* kParquetSessionDataPageVersion =
122+
"hive.parquet.writer.datapage_version";
123+
static constexpr const char* kParquetHiveConnectorDataPageVersion =
124+
"hive.parquet.writer.datapage-version";
120125

121126
// Process hive connector and session configs.
122127
void processConfigs(

0 commit comments

Comments
 (0)