Skip to content

Commit daed824

Browse files
committed
Add metrics iceberg.numSplits and iceberg.numDeletes
1 parent 7effb9e commit daed824

File tree

4 files changed

+101
-3
lines changed

4 files changed

+101
-3
lines changed

velox/connectors/hive/iceberg/IcebergSplitReader.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ void IcebergSplitReader::prepareSplit(
7575
splitOffset_ = baseRowReader_->nextRowNumber();
7676
positionalDeleteFileReaders_.clear();
7777

78+
runtimeStats_ = &runtimeStats;
79+
runtimeStats.unitLoaderStats.addCounter("iceberg.numSplits", RuntimeCounter(1));
80+
7881
const auto& deleteFiles = icebergSplit->deleteFiles;
7982
for (const auto& deleteFile : deleteFiles) {
8083
if (deleteFile.content == FileContent::kPositionalDeletes) {
@@ -138,6 +141,11 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) {
138141

139142
auto rowsScanned = baseRowReader_->next(actualSize, output, &mutation);
140143

144+
if (mutation.deletedRows != nullptr && runtimeStats_ != nullptr) {
145+
auto numDeletes = bits::countBits(mutation.deletedRows, 0, actualSize);
146+
runtimeStats_->unitLoaderStats.addCounter("iceberg.numDeletes", RuntimeCounter(numDeletes));
147+
}
148+
141149
return rowsScanned;
142150
}
143151

velox/connectors/hive/iceberg/IcebergSplitReader.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,5 +101,7 @@ class IcebergSplitReader : public SplitReader {
101101
std::list<std::unique_ptr<PositionalDeleteFileReader>>
102102
positionalDeleteFileReaders_;
103103
BufferPtr deleteBitmap_;
104+
// Non-owning; set in prepareSplit() and used in next() to record Iceberg metrics.
105+
dwio::common::RuntimeStatistics* runtimeStats_{nullptr};
104106
};
105107
} // namespace facebook::velox::connector::hive::iceberg

velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
2727
#include "velox/exec/tests/utils/HiveConnectorTestBase.h"
2828
#include "velox/exec/tests/utils/PlanBuilder.h"
29+
#include "velox/exec/TaskStats.h"
2930

3031
#ifdef VELOX_ENABLE_PARQUET
3132
#include "velox/dwio/parquet/RegisterParquetReader.h"
@@ -181,7 +182,7 @@ class HiveIcebergTest : public HiveConnectorTestBase {
181182
/// positions for data_file_1 and data_file_2. There are 3 RowGroups in this
182183
/// delete file, the first two contain positions for data_file_1, and the last
183184
/// contains positions for data_file_2.
184-
void assertPositionalDeletes(
185+
std::shared_ptr<exec::Task> assertPositionalDeletes(
185186
const std::map<std::string, std::vector<int64_t>>& rowGroupSizesForFiles,
186187
const std::unordered_map<
187188
std::string,
@@ -242,8 +243,11 @@ class HiveIcebergTest : public HiveConnectorTestBase {
242243
auto planStats = toPlanStats(task->taskStats());
243244

244245
auto it = planStats.find(plan->id());
245-
ASSERT_TRUE(it != planStats.end());
246-
ASSERT_TRUE(it->second.peakMemoryBytes > 0);
246+
EXPECT_TRUE(it != planStats.end());
247+
if (it != planStats.end()) {
248+
EXPECT_TRUE(it->second.peakMemoryBytes > 0);
249+
}
250+
return task;
247251
}
248252

249253
const static int rowCount = 20000;
@@ -947,4 +951,48 @@ TEST_F(HiveIcebergTest, positionalDeleteFileWithRowGroupFilter) {
947951
0);
948952
}
949953
#endif
954+
955+
TEST_F(HiveIcebergTest, icebergMetrics) {
956+
folly::SingletonVault::singleton()->registrationComplete();
957+
958+
// Helper function to aggregate a runtime metric across all pipelines and operators
959+
auto getAggregatedRuntimeMetric = [](const exec::TaskStats& taskStats, const std::string& metricName) -> int64_t {
960+
int64_t total = 0;
961+
for (const auto& pipelineStats : taskStats.pipelineStats) {
962+
for (const auto& operatorStats : pipelineStats.operatorStats) {
963+
auto it = operatorStats.runtimeStats.find(metricName);
964+
if (it != operatorStats.runtimeStats.end()) {
965+
total += it->second.sum;
966+
}
967+
}
968+
}
969+
return total;
970+
};
971+
972+
std::map<std::string, std::vector<int64_t>> rowGroupSizesForFiles = {
973+
{"data_file_1", {100, 85}}};
974+
std::unordered_map<
975+
std::string,
976+
std::multimap<std::string, std::vector<int64_t>>>
977+
deleteFilesForBaseDatafiles;
978+
deleteFilesForBaseDatafiles["delete_file_1"] = {{"data_file_1", {0, 1, 99}}};
979+
auto task =
980+
assertPositionalDeletes(rowGroupSizesForFiles, deleteFilesForBaseDatafiles);
981+
const auto& taskStats = task->taskStats();
982+
983+
ASSERT_EQ(getAggregatedRuntimeMetric(taskStats, "iceberg.numSplits"), 1);
984+
ASSERT_EQ(getAggregatedRuntimeMetric(taskStats, "iceberg.numDeletes"), 3);
985+
986+
rowGroupSizesForFiles = {
987+
{"data_file_1", {100, 85}}, {"data_file_2", {99, 1}}};
988+
deleteFilesForBaseDatafiles.clear();
989+
deleteFilesForBaseDatafiles["delete_file_1"] = {
990+
{"data_file_1", {0, 100, 102, 184}}, {"data_file_2", {1, 98, 99}}};
991+
task =
992+
assertPositionalDeletes(rowGroupSizesForFiles, deleteFilesForBaseDatafiles);
993+
const auto& taskStats2 = task->taskStats();
994+
995+
ASSERT_EQ(getAggregatedRuntimeMetric(taskStats2, "iceberg.numSplits"), 2);
996+
ASSERT_EQ(getAggregatedRuntimeMetric(taskStats2, "iceberg.numDeletes"), 7);
997+
}
950998
} // namespace facebook::velox::connector::hive::iceberg

velox/docs/develop/debugging/print-plan-with-stats.rst

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,3 +283,43 @@ TableScan operator shows how many rows were processed by pushing down aggregatio
283283
.. code-block::
284284
285285
loadedToValueHook sum: 50000, count: 5, min: 10000, max: 10000
286+
287+
For Iceberg tables, the TableScan operator reports Iceberg-specific statistics:
288+
289+
.. code-block::
290+
291+
-> TableScan[Table: iceberg_table]
292+
iceberg.numSplits sum: 2, count: 1, min: 2, max: 2
293+
iceberg.numDeletes sum: 7, count: 1, min: 7, max: 7
294+
295+
The ``iceberg.numSplits`` metric shows the total number of Iceberg splits processed,
296+
while ``iceberg.numDeletes`` shows the total number of rows deleted.
297+
These metrics help you understand the overhead of Iceberg's delete
298+
operations and split processing.
299+
300+
**Note:** These Iceberg metrics are collected by the IcebergSplitReader and are
301+
only available in the TableScan operator statistics after all splits have been
302+
processed (when the query completes or when no more splits are available).
303+
304+
To programmatically aggregate these metrics across all operators in a task:
305+
306+
.. code-block:: cpp
307+
308+
auto getAggregatedRuntimeMetric = [](const exec::TaskStats& taskStats,
309+
const std::string& metricName) -> int64_t {
310+
int64_t total = 0;
311+
for (const auto& pipelineStats : taskStats.pipelineStats) {
312+
for (const auto& operatorStats : pipelineStats.operatorStats) {
313+
auto it = operatorStats.runtimeStats.find(metricName);
314+
if (it != operatorStats.runtimeStats.end()) {
315+
total += it->second.sum;
316+
}
317+
}
318+
}
319+
return total;
320+
};
321+
322+
// Example usage:
323+
const auto& taskStats = task->taskStats();
324+
int64_t totalSplits = getAggregatedRuntimeMetric(taskStats, "iceberg.numSplits");
325+
int64_t totalDeletes = getAggregatedRuntimeMetric(taskStats, "iceberg.numDeletes");

0 commit comments

Comments
 (0)