From efbdb35a15e87d93f67c4cdd07a9b21cd29c3d5d Mon Sep 17 00:00:00 2001 From: Reetika Agrawal Date: Sat, 29 Mar 2025 02:18:43 +0530 Subject: [PATCH 1/2] Add Iceberg StatisticsFile cache invalidation procedure --- .../src/main/sphinx/connector/iceberg.rst | 7 +++ .../presto/iceberg/IcebergCommonModule.java | 2 + ...tisticsFileCacheInvalidationProcedure.java | 60 +++++++++++++++++++ .../iceberg/IcebergDistributedTestBase.java | 36 +++++++++++ 4 files changed, 105 insertions(+) create mode 100644 presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/StatisticsFileCacheInvalidationProcedure.java diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst index 68c74720c37fc..9b357839ee06f 100644 --- a/presto-docs/src/main/sphinx/connector/iceberg.rst +++ b/presto-docs/src/main/sphinx/connector/iceberg.rst @@ -1073,6 +1073,13 @@ Examples: CALL iceberg.system.fast_forward('schema_name', 'table_name', 'branch1', 'main'); +Statistics file cache invalidation procedure +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* Invalidate Statistics file cache :: + + CALL .system.invalidate_statistics_file_cache(); + Set Table Property ^^^^^^^^^^^^^^^^^^ diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java index 59c19af45b08b..2e0fe16d81dad 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java @@ -52,6 +52,7 @@ import com.facebook.presto.iceberg.procedure.RollbackToTimestampProcedure; import com.facebook.presto.iceberg.procedure.SetCurrentSnapshotProcedure; import com.facebook.presto.iceberg.procedure.SetTablePropertyProcedure; +import com.facebook.presto.iceberg.procedure.StatisticsFileCacheInvalidationProcedure; import 
com.facebook.presto.iceberg.procedure.UnregisterTableProcedure; import com.facebook.presto.iceberg.statistics.StatisticsFileCache; import com.facebook.presto.iceberg.statistics.StatisticsFileCacheKey; @@ -183,6 +184,7 @@ protected void setup(Binder binder) procedures.addBinding().toProvider(FastForwardBranchProcedure.class).in(Scopes.SINGLETON); procedures.addBinding().toProvider(SetCurrentSnapshotProcedure.class).in(Scopes.SINGLETON); procedures.addBinding().toProvider(SetTablePropertyProcedure.class).in(Scopes.SINGLETON); + procedures.addBinding().toProvider(StatisticsFileCacheInvalidationProcedure.class).in(Scopes.SINGLETON); if (buildConfigObject(MetastoreClientConfig.class).isInvalidateMetastoreCacheProcedureEnabled()) { procedures.addBinding().toProvider(InvalidateMetastoreCacheProcedure.class).in(Scopes.SINGLETON); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/StatisticsFileCacheInvalidationProcedure.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/StatisticsFileCacheInvalidationProcedure.java new file mode 100644 index 0000000000000..0ddc4fd2c98f4 --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/StatisticsFileCacheInvalidationProcedure.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package com.facebook.presto.iceberg.procedure;
+
+import com.facebook.presto.iceberg.statistics.StatisticsFileCache;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.facebook.presto.spi.procedure.Procedure;
+import com.google.common.collect.ImmutableList;
+
+import javax.inject.Inject;
+import javax.inject.Provider;
+
+import java.lang.invoke.MethodHandle;
+
+import static com.facebook.presto.common.block.MethodHandleUtil.methodHandle;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Provider of the {@code system.invalidate_statistics_file_cache} procedure, which invalidates all entries of the injected {@link StatisticsFileCache}.
+ */
+public class StatisticsFileCacheInvalidationProcedure
+        implements Provider<Procedure>
+{
+    private static final MethodHandle CACHE_DATA_INVALIDATION =
+            methodHandle(StatisticsFileCacheInvalidationProcedure.class, "statisticsFileCacheInvalidation");
+
+    private final StatisticsFileCache statisticsFileCache;
+
+    @Inject
+    public StatisticsFileCacheInvalidationProcedure(StatisticsFileCache statisticsFileCache)
+    {
+        this.statisticsFileCache = requireNonNull(statisticsFileCache, "statisticsFileCache is null");
+    }
+
+    /** Builds the procedure with an empty argument list; its method handle is bound to this provider instance. */
+    @Override
+    public Procedure get()
+    {
+        return new Procedure("system", "invalidate_statistics_file_cache", ImmutableList.of(), CACHE_DATA_INVALIDATION.bindTo(this));
+    }
+
+    /** Procedure body: calls {@code invalidateAll()} on the cache inside a ThreadContextClassLoader scope for this class's loader. */
+    public void statisticsFileCacheInvalidation()
+    {
+        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
+            statisticsFileCache.invalidateAll();
+        }
+    }
+}
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java
index 504cfc6696c66..1d5c3c2b70002 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java
@@ -2744,6 +2744,42 @@ public void testCreateSortedTableWithSortTransform(String columnName, String sor
         assertQueryFails(query,
Pattern.quote(format("Unable to parse sort field: [%s]", sortField)));
     }
 
+    public void testStatisticsFileCacheInvalidationProcedure()
+    {
+        assertQuerySucceeds("CREATE TABLE test_statistics_file_cache_procedure(i int)");
+        try {
+            assertUpdate("INSERT INTO test_statistics_file_cache_procedure VALUES 1, 2, 3, 4, 5", 5);
+            assertQuerySucceeds("ANALYZE test_statistics_file_cache_procedure");
+            for (int i = 0; i < 3; i++) {
+                assertQuerySucceeds("SHOW STATS FOR test_statistics_file_cache_procedure");
+            }
+            String jmxMetricsQuery = format("SELECT sum(\"cachestats.hitcount\"), sum(\"cachestats.size\"), sum(\"cachestats.misscount\") " +
+                    "from jmx.current.\"com.facebook.presto.iceberg.statistics:name=%s,type=statisticsfilecache\"", getSession().getCatalog().get());
+            MaterializedResult result = computeActual(jmxMetricsQuery);
+            long afterHitCount = (long) result.getMaterializedRows().get(0).getField(0);
+            long afterCacheSize = (long) result.getMaterializedRows().get(0).getField(1);
+            long afterMissCount = (long) result.getMaterializedRows().get(0).getField(2);
+            assertTrue(afterHitCount > 0);
+            assertTrue(afterCacheSize > 0);
+            // The procedure must leave the statistics file cache empty
+            assertQuerySucceeds(format("CALL %s.system.invalidate_statistics_file_cache()", getSession().getCatalog().get()));
+            MaterializedResult resultAfterProcedure = computeActual(jmxMetricsQuery);
+            long afterProcedureCacheSize = (long) resultAfterProcedure.getMaterializedRows().get(0).getField(1);
+            assertTrue(afterProcedureCacheSize == 0);
+            // Reading the stats again must miss the emptied cache and repopulate it
+            assertQuerySucceeds("SHOW STATS FOR test_statistics_file_cache_procedure");
+            MaterializedResult resultAfter = computeActual(jmxMetricsQuery);
+            long newCacheSize = (long) resultAfter.getMaterializedRows().get(0).getField(1);
+            long newMissCount = (long) resultAfter.getMaterializedRows().get(0).getField(2);
+            assertTrue(newCacheSize > 0);
+            assertTrue(afterMissCount < newMissCount);
+        }
+        finally {
+            // Runs even when an assertion above fails, so the table cannot leak into other tests
+            getQueryRunner().execute("DROP TABLE test_statistics_file_cache_procedure");
+        }
+    }
+
     @DataProvider(name = "sortedTableWithSortTransform")
     public static Object[][] sortedTableWithSortTransform()
     {

From bdf92049ead17a3fab4e12ae9e7bcf7c7671275b Mon Sep 17 00:00:00 2001
From: Reetika Agrawal
Date: Wed, 2 Apr 2025 00:06:21 +0530
Subject: [PATCH 2/2] Add Iceberg ManifestFile cache invalidation procedure

---
 .../src/main/sphinx/connector/iceberg.rst     |  7 +++
 .../presto/iceberg/IcebergCommonModule.java   |  2 +
 ...anifestFileCacheInvalidationProcedure.java | 60 +++++++++++++++++++
 .../hive/TestIcebergDistributedHive.java      |  8 +++
 4 files changed, 77 insertions(+)
 create mode 100644 presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/ManifestFileCacheInvalidationProcedure.java

diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst
index 9b357839ee06f..9b417a2efbd5d 100644
--- a/presto-docs/src/main/sphinx/connector/iceberg.rst
+++ b/presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -1080,6 +1080,13 @@ Statistics file cache invalidation procedure
 
     CALL .system.invalidate_statistics_file_cache();
 
+Manifest file cache invalidation procedure
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+* Invalidate Manifest file cache ::
+
+    CALL .system.invalidate_manifest_file_cache();
+
 Set Table Property
 ^^^^^^^^^^^^^^^^^^
 
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java
index 2e0fe16d81dad..129a3fda539f8 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java
@@ -46,6 +46,7 @@
 import com.facebook.presto.iceberg.optimizer.IcebergPlanOptimizerProvider;
 import com.facebook.presto.iceberg.procedure.ExpireSnapshotsProcedure;
 import com.facebook.presto.iceberg.procedure.FastForwardBranchProcedure;
+import
com.facebook.presto.iceberg.procedure.ManifestFileCacheInvalidationProcedure; import com.facebook.presto.iceberg.procedure.RegisterTableProcedure; import com.facebook.presto.iceberg.procedure.RemoveOrphanFiles; import com.facebook.presto.iceberg.procedure.RollbackToSnapshotProcedure; @@ -185,6 +186,7 @@ protected void setup(Binder binder) procedures.addBinding().toProvider(SetCurrentSnapshotProcedure.class).in(Scopes.SINGLETON); procedures.addBinding().toProvider(SetTablePropertyProcedure.class).in(Scopes.SINGLETON); procedures.addBinding().toProvider(StatisticsFileCacheInvalidationProcedure.class).in(Scopes.SINGLETON); + procedures.addBinding().toProvider(ManifestFileCacheInvalidationProcedure.class).in(Scopes.SINGLETON); if (buildConfigObject(MetastoreClientConfig.class).isInvalidateMetastoreCacheProcedureEnabled()) { procedures.addBinding().toProvider(InvalidateMetastoreCacheProcedure.class).in(Scopes.SINGLETON); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/ManifestFileCacheInvalidationProcedure.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/ManifestFileCacheInvalidationProcedure.java new file mode 100644 index 0000000000000..6779af23be3e8 --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/ManifestFileCacheInvalidationProcedure.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package com.facebook.presto.iceberg.procedure;
+
+import com.facebook.presto.iceberg.ManifestFileCache;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.facebook.presto.spi.procedure.Procedure;
+import com.google.common.collect.ImmutableList;
+
+import javax.inject.Inject;
+import javax.inject.Provider;
+
+import java.lang.invoke.MethodHandle;
+
+import static com.facebook.presto.common.block.MethodHandleUtil.methodHandle;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Provider of the {@code system.invalidate_manifest_file_cache} procedure, which invalidates all entries of the injected {@link ManifestFileCache}.
+ */
+public class ManifestFileCacheInvalidationProcedure
+        implements Provider<Procedure>
+{
+    private static final MethodHandle CACHE_DATA_INVALIDATION =
+            methodHandle(ManifestFileCacheInvalidationProcedure.class, "manifestFileCacheInvalidation");
+
+    private final ManifestFileCache manifestFileCache;
+
+    @Inject
+    public ManifestFileCacheInvalidationProcedure(ManifestFileCache manifestFileCache)
+    {
+        this.manifestFileCache = requireNonNull(manifestFileCache, "manifestFileCache is null");
+    }
+
+    /** Builds the procedure with an empty argument list; its method handle is bound to this provider instance. */
+    @Override
+    public Procedure get()
+    {
+        return new Procedure("system", "invalidate_manifest_file_cache", ImmutableList.of(), CACHE_DATA_INVALIDATION.bindTo(this));
+    }
+
+    /** Procedure body: calls {@code invalidateAll()} on the cache inside a ThreadContextClassLoader scope for this class's loader. */
+    public void manifestFileCacheInvalidation()
+    {
+        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
+            manifestFileCache.invalidateAll();
+        }
+    }
+}
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java
index 32b4893126f3a..73f322d856153 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java
@@ -45,6 +45,7 @@
 import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
 import static
com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES; import static com.facebook.presto.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES; +import static java.lang.String.format; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; @@ -125,6 +126,13 @@ public void testManifestFileCaching() assertTrue(secondQuery.minus(firstQuery).hitCount() > 0); assertTrue(manifestFileCache.size() > 0); + //test invalidate_manifest_file_cache procedure + assertQuerySucceeds(session, format("CALL %s.system.invalidate_manifest_file_cache()", catalogName)); + assertTrue(manifestFileCache.size() == 0); + assertQuerySucceeds(session, "SELECT count(*) from test_manifest_file_cache group by i"); + CacheStats thirdQuery = manifestFileCache.stats(); + assertTrue(secondQuery.missCount() < thirdQuery.missCount()); + assertQuerySucceeds(session, "DROP TABLE test_manifest_file_cache"); assertQuerySucceeds(session, "DROP SCHEMA default"); }