Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1224,7 +1224,8 @@ Transform Name Source Types
``Identity`` ``boolean``, ``int``, ``bigint``, ``real``, ``double``, ``decimal``,
``varchar``, ``varbinary``, ``date``, ``time``, ``timestamp``

``Bucket`` ``int``, ``bigint``, ``decimal``, ``varchar``, ``varbinary``, ``date``
``Bucket`` ``int``, ``bigint``, ``decimal``, ``varchar``, ``varbinary``, ``date``,
``time``
Comment thread
ZacBlanco marked this conversation as resolved.

``Truncate`` ``int``, ``bigint``, ``decimal``, ``varchar``, ``varbinary``

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import static com.facebook.presto.common.type.Decimals.isShortDecimal;
import static com.facebook.presto.common.type.Decimals.readBigDecimal;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.TimeType.TIME;
import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
import static com.facebook.presto.common.type.TypeUtils.readNativeValue;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
Expand All @@ -51,6 +52,7 @@
import static java.lang.Math.floorDiv;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.joda.time.chrono.ISOChronology.getInstanceUTC;

public final class PartitionTransforms
Expand Down Expand Up @@ -157,6 +159,11 @@ public static ColumnTransform getColumnTransform(PartitionField field, Type type
block -> bucketDate(block, count),
(block, position) -> bucketValueDate(block, position, count));
}
if (type.equals(TIME)) {
return new ColumnTransform(transform, INTEGER,
block -> bucketTime(block, count),
(block, position) -> bucketValueTime(block, position, count));
}
if (type instanceof VarcharType) {
return new ColumnTransform(transform, INTEGER,
block -> bucketVarchar(block, count),
Expand Down Expand Up @@ -309,6 +316,16 @@ private static int bucketValueDate(Block block, int position, int count)
return bucketValue(block, position, count, pos -> bucketHash(DATE.getLong(block, pos)));
}

private static Block bucketTime(Block block, int count)
{
return bucketBlock(block, count, position -> bucketHash(MILLISECONDS.toMicros(TIME.getLong(block, position))));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm my understanding: we need the MILLISECONDS.toMicros conversion here because the bucketing has to be compatible with the bucketing of other systems? Is there anything else stopping us from just calling bucketHash directly on TIME.getLong(...)?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Being compatible with the bucketing of other systems is one of the reasons, but the more important reason is to be compatible with Iceberg's file planning with filters. Consider querying the partitioned table with a filter on a column of type TimeType, as follows:

select * from test_bucket_transform_on_time where a = time '01:02:03.123';

We first use the predicate a = time '01:02:03.123' to run Iceberg's file planning. Iceberg uses this time value internally to calculate partition values and then scans only the matching partitions. If the partition value we calculated when writing is not compatible, we won't find the corresponding data. So we have to make sure that the partition value is calculated exactly as the Iceberg library calculates it, that is, from the microsecond value.

So if we call bucketHash directly on TIME.getLong(...), the example query above returns an empty result.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was roughly my thinking. Thanks for confirming

}

/**
 * Delegates to {@code bucketValue} for the TIME value at {@code position} of {@code block},
 * hashing the value after converting it from milliseconds to microseconds.
 */
private static int bucketValueTime(Block block, int position, int count)
{
    return bucketValue(block, position, count, index -> {
        // Hash the microsecond value, not the raw millisecond value: the partition value has to
        // match the one the Iceberg library computes, otherwise filtered scans miss the data.
        long micros = MILLISECONDS.toMicros(TIME.getLong(block, index));
        return bucketHash(micros);
    });
}

private static Block bucketVarchar(Block block, int count)
{
return bucketBlock(block, count, position -> bucketHash(VARCHAR.getSlice(block, position)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1537,12 +1537,39 @@ public void testPartitionTransformOnUUID()
@Test
public void testBucketTransformOnTime()
{
//TODO: Not yet support bucket transform for time, which was supported by Iceberg
assertUpdate("create table test_bucket_transform_time(col_time time)" +
"with (partitioning = ARRAY['bucket(col_time, 2)'])");
assertQueryFails("insert into test_bucket_transform_time values(time '01:02:03.123')",
"Unsupported type for 'bucket': 1000: col_time_bucket: bucket\\[2\\]\\(1\\)");
assertUpdate("drop table if exists test_bucket_transform_time");
testWithAllFileFormats(this::testBucketTransformsOnTimeForFormat);
}

/**
 * Exercises the bucket partition transform on a TIME column for a single file format.
 *
 * Creates a table partitioned by {@code bucket(a, 4)}, inserts seven rows, checks the contents of
 * the {@code $partitions} table for each bucket, and then checks that equality, range and IN
 * predicates on the TIME column return the expected rows.
 *
 * The table is dropped in a {@code finally} block so that a failed assertion does not leave it
 * behind; this helper is invoked once per file format with the same table name, and a leftover
 * table would make the next CREATE TABLE fail and hide the original failure.
 *
 * @param session session used for every statement
 * @param format file format written into the table's {@code write.format.default} property
 */
private void testBucketTransformsOnTimeForFormat(Session session, FileFormat format)
{
    String select = "SELECT a_bucket, row_count, a.min AS a_min, a.max AS a_max, b.min AS b_min, b.max AS b_max FROM \"test_bucket_transform_on_time$partitions\"";

    // Created outside the try block: if creation itself fails there is nothing to drop.
    assertUpdate(session, format("CREATE TABLE test_bucket_transform_on_time (a TIME, b BIGINT)" +
            " WITH (\"write.format.default\" = '%s', partitioning = ARRAY['bucket(a, 4)'])", format.name()));
    try {
        String insertSql = "INSERT INTO test_bucket_transform_on_time VALUES" +
                "(time '01:02:03.123', 1)," +
                "(time '21:22:50.002', 2)," +
                "(time '12:13:14.345', 3)," +
                "(time '00:00:01.001', 4)," +
                "(time '23:23:59.999', 5)," +
                "(time '00:00:00.000', 6)," +
                "(time '07:31:55.425', 7)";
        assertUpdate(session, insertSql, 7);

        // All four buckets are expected to be populated by the seven rows.
        assertQuery(session, "SELECT COUNT(*) FROM \"test_bucket_transform_on_time$partitions\"", "SELECT 4");

        assertQuery(session, select + " WHERE a_bucket = 0", "VALUES(0, 2, time '00:00:00.000', time '12:13:14.345', 3, 6)");
        assertQuery(session, select + " WHERE a_bucket = 1", "VALUES(1, 1, time '23:23:59.999', time '23:23:59.999', 5, 5)");
        assertQuery(session, select + " WHERE a_bucket = 2", "VALUES(2, 1, time '21:22:50.002', time '21:22:50.002', 2, 2)");
        assertQuery(session, select + " WHERE a_bucket = 3", "VALUES(3, 3, time '00:00:01.001', time '07:31:55.425', 1, 7)");

        // Predicates on the partition source column must still find the rows.
        assertQuery(session, "select * from test_bucket_transform_on_time where a = time '01:02:03.123'",
                "VALUES(time '01:02:03.123', 1)");
        assertQuery(session, "select * from test_bucket_transform_on_time where a > time '01:02:03.123' and a <= time '12:13:14.345'",
                "VALUES(time '07:31:55.425', 7), (time '12:13:14.345', 3)");
        assertQuery(session, "select * from test_bucket_transform_on_time where a in (time '00:00:01.001', time '21:22:50.002')",
                "VALUES(time '00:00:01.001', 4), (time '21:22:50.002', 2)");
    }
    finally {
        dropTable(session, "test_bucket_transform_on_time");
    }
}

@Test
Expand Down