Skip to content

Commit 3987d35

Browse files
authored
Merge pull request #6546 from gchq/6538-add-configuration-option-to-set-parquet-maximum-row-group-size-by-rows
Issue 6538 - Add configuration option to set parquet maximum row group size by rows
2 parents b2c311c + 05d0509 commit 3987d35

File tree

11 files changed

+50
-13
lines changed

11 files changed

+50
-13
lines changed

docs/usage/properties/instance/user/table_property_defaults.md

Lines changed: 2 additions & 1 deletion
Large diffs are not rendered by default.

docs/usage/properties/table/data_storage.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ The following table properties relate to the storage of data inside a table.
44

55
| Property Name | Description | Default Value |
66
|----------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|
7-
| sleeper.table.rowgroup.size | The size of the row group in the Parquet files - defaults to the value in the instance properties. | 8388608 |
7+
| sleeper.table.rowgroup.size | Maximum number of bytes to write in a Parquet row group (defaults to value set in instance properties). This property is NOT used by DataFusion data engine. | 8388608 |
88
| sleeper.table.page.size | The size of the page in the Parquet files - defaults to the value in the instance properties. | 131072 |
99
| sleeper.table.parquet.dictionary.encoding.rowkey.fields | Whether dictionary encoding should be used for row key columns in the Parquet files. | false |
1010
| sleeper.table.parquet.dictionary.encoding.sortkey.fields | Whether dictionary encoding should be used for sort key columns in the Parquet files. | false |
@@ -14,6 +14,7 @@ The following table properties relate to the storage of data inside a table.
1414
| sleeper.table.datafusion.s3.readahead.enabled | Enables a cache of data when reading from S3 with the DataFusion data engine, to hold data in larger blocks than are requested by DataFusion. | true |
1515
| sleeper.table.parquet.writer.version | Used to set parquet.writer.version, see documentation here:<br>https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/README.md<br>Can be either v1 or v2. The v2 pages store levels uncompressed while v1 pages compress levels with the data. | v2 |
1616
| sleeper.table.parquet.query.column.index.enabled | Used during Parquet queries to determine whether the column indexes are used. | false |
17+
| sleeper.table.parquet.rowgroup.rows.max | Maximum number of rows to write in a Parquet row group. | 1000000 |
1718
| sleeper.table.fs.s3a.readahead.range | The S3 readahead range - defaults to the row group size. | 8388608 |
1819
| sleeper.table.compression.codec | The compression codec to use for this table. Defaults to the value in the instance properties.<br>Valid values are: [uncompressed, snappy, gzip, lzo, brotli, lz4, zstd] | zstd |
1920
| sleeper.table.gc.delay.minutes | A file will not be deleted until this number of minutes have passed after it has been marked as ready for garbage collection. The reason for not deleting files immediately after they have been marked as ready for garbage collection is that they may still be in use by queries. Defaults to the value set in the instance properties. | 15 |

example/full/instance.properties

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1774,7 +1774,8 @@ sleeper.logging.root.level=INFO
17741774
# (default value shown below, uncomment to set a value)
17751775
# sleeper.default.table.parquet.query.column.index.enabled=false
17761776

1777-
# The size of the row group in the Parquet files (default is 8MiB).
1777+
# Maximum number of bytes to write in a Parquet row group (default is 8MiB). This property is NOT used
1778+
# by DataFusion data engine.
17781779
# (default value shown below, uncomment to set a value)
17791780
# sleeper.default.table.rowgroup.size=8388608
17801781

@@ -1823,6 +1824,10 @@ sleeper.logging.root.level=INFO
18231824
# (default value shown below, uncomment to set a value)
18241825
# sleeper.default.table.parquet.writer.version=v2
18251826

1827+
# Maximum number of rows to write in a Parquet row group.
1828+
# (default value shown below, uncomment to set a value)
1829+
# sleeper.default.table.parquet.rowgroup.rows.max=1000000
1830+
18261831
# The number of attempts to make when applying a transaction to the state store. This default can be
18271832
# overridden by a table property.
18281833
# (default value shown below, uncomment to set a value)

example/full/table.properties

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ sleeper.table.splits.file=example/full/splits.txt
1717

1818
## The following table properties relate to the storage of data inside a table.
1919

20-
# The size of the row group in the Parquet files - defaults to the value in the instance properties.
20+
# Maximum number of bytes to write in a Parquet row group (defaults to value set in instance
21+
# properties). This property is NOT used by DataFusion data engine.
2122
sleeper.table.rowgroup.size=8388608
2223

2324
# The size of the page in the Parquet files - defaults to the value in the instance properties.
@@ -177,6 +178,10 @@ sleeper.table.statestore.classname=DynamoDBTransactionLogStateStore
177178
# (default value shown below, uncomment to set a value)
178179
# sleeper.table.parquet.query.column.index.enabled=false
179180

181+
# Maximum number of rows to write in a Parquet row group.
182+
# (default value shown below, uncomment to set a value)
183+
# sleeper.table.parquet.rowgroup.rows.max=1000000
184+
180185
# The S3 readahead range - defaults to the row group size.
181186
# (default value shown below, uncomment to set a value)
182187
# sleeper.table.fs.s3a.readahead.range=8388608

java/clients/src/test/java/sleeper/clients/admin/InstanceConfigurationScreenTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,9 @@ void shouldEditAPropertyThatWasPreviouslyUnsetButHadADefaultProperty() throws Ex
622622
.contains("Found changes to properties:\n" +
623623
"\n" +
624624
"sleeper.table.rowgroup.size\n" +
625-
"The size of the row group in the Parquet files - defaults to the value in the instance properties.\n" +
625+
"Maximum number of bytes to write in a Parquet row group " +
626+
"(defaults to value set in instance\n" +
627+
"properties). This property is NOT used by DataFusion data engine.\n" +
626628
"Unset before, default value: 8388608\n" +
627629
"After: 123\n")
628630
.endsWith(PROPERTY_SAVE_CHANGES_SCREEN + PROMPT_SAVE_SUCCESSFUL_RETURN_TO_MAIN + DISPLAY_MAIN_SCREEN);

java/common/parquet/src/main/java/sleeper/parquet/row/ParquetRowWriterFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import static sleeper.core.properties.table.TableProperty.DICTIONARY_ENCODING_FOR_SORT_KEY_FIELDS;
4040
import static sleeper.core.properties.table.TableProperty.DICTIONARY_ENCODING_FOR_VALUE_FIELDS;
4141
import static sleeper.core.properties.table.TableProperty.PAGE_SIZE;
42+
import static sleeper.core.properties.table.TableProperty.PARQUET_ROW_GROUP_SIZE_ROWS;
4243
import static sleeper.core.properties.table.TableProperty.PARQUET_WRITER_VERSION;
4344
import static sleeper.core.properties.table.TableProperty.ROW_GROUP_SIZE;
4445
import static sleeper.core.properties.table.TableProperty.STATISTICS_TRUNCATE_LENGTH;
@@ -72,6 +73,7 @@ public static Builder parquetRowWriterBuilder(Path path, TableProperties tablePr
7273
return new Builder(path, tableProperties.getSchema())
7374
.withCompressionCodec(tableProperties.get(COMPRESSION_CODEC))
7475
.withRowGroupSize(tableProperties.getLong(ROW_GROUP_SIZE))
76+
.withRowGroupRowCountLimit(tableProperties.getInt(PARQUET_ROW_GROUP_SIZE_ROWS))
7577
.withPageSize(tableProperties.getInt(PAGE_SIZE))
7678
.withDictionaryEncodingForRowKeyFields(tableProperties.getBoolean(DICTIONARY_ENCODING_FOR_ROW_KEY_FIELDS))
7779
.withDictionaryEncodingForSortKeyFields(tableProperties.getBoolean(DICTIONARY_ENCODING_FOR_SORT_KEY_FIELDS))

java/compaction/compaction-datafusion/src/main/java/sleeper/compaction/datafusion/DataFusionCompactionRunner.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,13 @@
4747
import static sleeper.core.properties.table.TableProperty.DICTIONARY_ENCODING_FOR_SORT_KEY_FIELDS;
4848
import static sleeper.core.properties.table.TableProperty.DICTIONARY_ENCODING_FOR_VALUE_FIELDS;
4949
import static sleeper.core.properties.table.TableProperty.PAGE_SIZE;
50+
import static sleeper.core.properties.table.TableProperty.PARQUET_ROW_GROUP_SIZE_ROWS;
5051
import static sleeper.core.properties.table.TableProperty.PARQUET_WRITER_VERSION;
5152
import static sleeper.core.properties.table.TableProperty.STATISTICS_TRUNCATE_LENGTH;
5253

5354
@SuppressFBWarnings("UUF_UNUSED_FIELD")
5455
public class DataFusionCompactionRunner implements CompactionRunner {
5556
private static final Logger LOGGER = LoggerFactory.getLogger(DataFusionCompactionRunner.class);
56-
/** Maximum number of rows in a Parquet row group. */
57-
public static final long DATAFUSION_MAX_ROW_GROUP_ROWS = 1_000_000;
5857

5958
private final DataFusionAwsConfig awsConfig;
6059
private final Configuration hadoopConf;
@@ -118,7 +117,7 @@ private static FFICommonConfig createCompactionParams(CompactionJob job, TablePr
118117
params.row_key_cols.populate(schema.getRowKeyFieldNames().toArray(String[]::new), false);
119118
params.row_key_schema.populate(FFICommonConfig.getKeyTypes(schema.getRowKeyTypes()), false);
120119
params.sort_key_cols.populate(schema.getSortKeyFieldNames().toArray(String[]::new), false);
121-
params.max_row_group_size.set(DATAFUSION_MAX_ROW_GROUP_ROWS);
120+
params.max_row_group_size.set(tableProperties.getInt(PARQUET_ROW_GROUP_SIZE_ROWS));
122121
params.max_page_size.set(tableProperties.getInt(PAGE_SIZE));
123122
params.compression.set(tableProperties.get(COMPRESSION_CODEC));
124123
params.writer_version.set(tableProperties.get(PARQUET_WRITER_VERSION));

java/core/src/main/java/sleeper/core/properties/instance/TableDefaultProperty.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ public interface TableDefaultProperty {
4040
.validationPredicate(SleeperPropertyValueUtils::isTrueOrFalse)
4141
.propertyGroup(InstancePropertyGroup.TABLE_PROPERTY_DEFAULT).build();
4242
UserDefinedInstanceProperty DEFAULT_ROW_GROUP_SIZE = Index.propertyBuilder("sleeper.default.table.rowgroup.size")
43-
.description("The size of the row group in the Parquet files (default is 8MiB).")
43+
.description("Maximum number of bytes to write in a Parquet row group (default is 8MiB). " +
44+
"This property is NOT used by DataFusion data engine.")
4445
.defaultValue("" + (8 * 1024 * 1024)) // 8 MiB
4546
.propertyGroup(InstancePropertyGroup.TABLE_PROPERTY_DEFAULT).build();
4647
UserDefinedInstanceProperty DEFAULT_PAGE_SIZE = Index.propertyBuilder("sleeper.default.table.page.size")
@@ -88,14 +89,18 @@ public interface TableDefaultProperty {
8889
.defaultValue("true")
8990
.validationPredicate(SleeperPropertyValueUtils::isTrueOrFalse)
9091
.propertyGroup(InstancePropertyGroup.TABLE_PROPERTY_DEFAULT).build();
91-
9292
UserDefinedInstanceProperty DEFAULT_PARQUET_WRITER_VERSION = Index.propertyBuilder("sleeper.default.table.parquet.writer.version")
9393
.description("Used to set parquet.writer.version, see documentation here:\n" +
9494
"https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/README.md\n" +
9595
"Can be either v1 or v2. The v2 pages store levels uncompressed while v1 pages compress levels with the data.")
9696
.defaultValue("v2")
9797
.validationPredicate(List.of("v1", "v2")::contains)
9898
.propertyGroup(InstancePropertyGroup.TABLE_PROPERTY_DEFAULT).build();
99+
UserDefinedInstanceProperty DEFAULT_PARQUET_ROWGROUP_ROWS = Index.propertyBuilder("sleeper.default.table.parquet.rowgroup.rows.max")
100+
.description("Maximum number of rows to write in a Parquet row group.")
101+
.defaultValue("1000000")
102+
.validationPredicate(SleeperPropertyValueUtils::isPositiveInteger)
103+
.propertyGroup(InstancePropertyGroup.TABLE_PROPERTY_DEFAULT).build();
99104
UserDefinedInstanceProperty DEFAULT_ADD_TRANSACTION_MAX_ATTEMPTS = Index.propertyBuilder("sleeper.default.table.statestore.transactionlog.add.transaction.max.attempts")
100105
.description("The number of attempts to make when applying a transaction to the state store. " +
101106
"This default can be overridden by a table property.")

0 commit comments

Comments (0)