Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkgs/cli/src/main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const Chain = configs.Chain;
const ChainOptions = configs.ChainOptions;

const utils_lib = @import("@zeam/utils");
const zeam_metrics = @import("@zeam/metrics");

const database = @import("@zeam/database");

Expand Down Expand Up @@ -305,6 +306,10 @@ fn mainInner() !void {
return err;
};

// Set node lifecycle metrics
zeam_metrics.metrics.lean_node_info.set(.{ .name = "zeam", .version = build_options.version }, 1) catch {};
zeam_metrics.metrics.lean_node_start_time_seconds.set(@intCast(std.time.timestamp()));

// Create logger config for API server
var api_logger_config = utils_lib.getLoggerConfig(console_log_level, utils_lib.FileBehaviourParams{ .fileActiveLevel = log_file_active_level, .filePath = beamcmd.data_dir, .fileName = log_filename, .monocolorFile = monocolor_file_log });

Expand Down
5 changes: 5 additions & 0 deletions pkgs/cli/src/node.zig
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const database = @import("@zeam/database");
const json = std.json;
const utils = @import("@zeam/utils");
const ssz = @import("ssz");
const zeam_metrics = @import("@zeam/metrics");
const build_options = @import("build_options");

// Structure to hold parsed ENR fields from validator-config.yaml
const EnrFields = struct {
Expand Down Expand Up @@ -237,6 +239,9 @@ pub const Node = struct {
// Start API server after chain is initialized so we can pass the chain pointer
if (options.metrics_enable) {
try api.init(allocator);
// Set node lifecycle metrics
zeam_metrics.metrics.lean_node_info.set(.{ .name = "zeam", .version = build_options.version }, 1) catch {};
zeam_metrics.metrics.lean_node_start_time_seconds.set(@intCast(std.time.timestamp()));
try api_server.startAPIServer(allocator, options.api_port, options.logger_config, self.beam_node.chain);
}
}
Expand Down
30 changes: 30 additions & 0 deletions pkgs/metrics/src/lib.zig
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ const Metrics = struct {
lean_connected_peers: LeanConnectedPeersGauge,
lean_peer_connection_events_total: PeerConnectionEventsCounter,
lean_peer_disconnection_events_total: PeerDisconnectionEventsCounter,
// Node lifecycle metrics
lean_node_info: LeanNodeInfoGauge,
lean_node_start_time_seconds: LeanNodeStartTimeGauge,
lean_current_slot: LeanCurrentSlotGauge,
lean_safe_target_slot: LeanSafeTargetSlotGauge,
// Fork choice reorg metrics
lean_fork_choice_reorgs_total: LeanForkChoiceReorgsTotalCounter,
lean_fork_choice_reorg_depth: LeanForkChoiceReorgDepthHistogram,
// Finalization metrics
lean_finalizations_total: LeanFinalizationsTotalCounter,

const ChainHistogram = metrics_lib.Histogram(f32, &[_]f32{ 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10 });
const BlockProcessingHistogram = metrics_lib.Histogram(f32, &[_]f32{ 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10 });
Expand All @@ -73,6 +83,16 @@ const Metrics = struct {
const LeanConnectedPeersGauge = metrics_lib.Gauge(u64);
const PeerConnectionEventsCounter = metrics_lib.CounterVec(u64, struct { direction: []const u8, result: []const u8 });
const PeerDisconnectionEventsCounter = metrics_lib.CounterVec(u64, struct { direction: []const u8, reason: []const u8 });
// Node lifecycle metric types
const LeanNodeInfoGauge = metrics_lib.GaugeVec(u64, struct { name: []const u8, version: []const u8 });
const LeanNodeStartTimeGauge = metrics_lib.Gauge(u64);
const LeanCurrentSlotGauge = metrics_lib.Gauge(u64);
const LeanSafeTargetSlotGauge = metrics_lib.Gauge(u64);
// Fork choice reorg metric types
const LeanForkChoiceReorgsTotalCounter = metrics_lib.Counter(u64);
const LeanForkChoiceReorgDepthHistogram = metrics_lib.Histogram(f32, &[_]f32{ 1, 2, 3, 5, 7, 10, 20, 30, 50, 100 });
// Finalization metric types
const LeanFinalizationsTotalCounter = metrics_lib.CounterVec(u64, struct { result: []const u8 });
};

/// Timer struct returned to the application.
Expand Down Expand Up @@ -247,6 +267,16 @@ pub fn init(allocator: std.mem.Allocator) !void {
.lean_connected_peers = Metrics.LeanConnectedPeersGauge.init("lean_connected_peers", .{ .help = "Number of currently connected peers." }, .{}),
.lean_peer_connection_events_total = try Metrics.PeerConnectionEventsCounter.init(allocator, "lean_peer_connection_events_total", .{ .help = "Total peer connection events by direction and result." }, .{}),
.lean_peer_disconnection_events_total = try Metrics.PeerDisconnectionEventsCounter.init(allocator, "lean_peer_disconnection_events_total", .{ .help = "Total peer disconnection events by direction and reason." }, .{}),
// Node lifecycle metrics
.lean_node_info = try Metrics.LeanNodeInfoGauge.init(allocator, "lean_node_info", .{ .help = "Node information (always 1)." }, .{}),
.lean_node_start_time_seconds = Metrics.LeanNodeStartTimeGauge.init("lean_node_start_time_seconds", .{ .help = "Unix timestamp when the node started." }, .{}),
.lean_current_slot = Metrics.LeanCurrentSlotGauge.init("lean_current_slot", .{ .help = "Current slot of the lean chain based on wall clock." }, .{}),
.lean_safe_target_slot = Metrics.LeanSafeTargetSlotGauge.init("lean_safe_target_slot", .{ .help = "Safe target slot with 2/3 weight threshold." }, .{}),
// Fork choice reorg metrics
.lean_fork_choice_reorgs_total = Metrics.LeanForkChoiceReorgsTotalCounter.init("lean_fork_choice_reorgs_total", .{ .help = "Total number of fork choice reorganizations." }, .{}),
.lean_fork_choice_reorg_depth = Metrics.LeanForkChoiceReorgDepthHistogram.init("lean_fork_choice_reorg_depth", .{ .help = "Depth of fork choice reorgs in blocks." }, .{}),
// Finalization metrics
.lean_finalizations_total = try Metrics.LeanFinalizationsTotalCounter.init(allocator, "lean_finalizations_total", .{ .help = "Total finalization attempts by result." }, .{}),
};

// Set context for histogram wrappers (observe functions already assigned at compile time)
Expand Down
8 changes: 8 additions & 0 deletions pkgs/node/src/chain.zig
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ pub const BeamChain = struct {
const slot = @divFloor(time_intervals, constants.INTERVALS_PER_SLOT);
const interval = time_intervals % constants.INTERVALS_PER_SLOT;

// Update current slot metric (wall-clock time slot)
zeam_metrics.metrics.lean_current_slot.set(slot);

var has_proposal = false;
if (interval == 0) {
const num_validators: usize = @intCast(self.config.genesis.numValidators());
Expand Down Expand Up @@ -760,6 +763,8 @@ pub const BeamChain = struct {
// note use presaved local last_emitted_finalized as self.last_emitted_finalized has been updated above
if (latest_finalized.slot > last_emitted_finalized.slot) {
self.processFinalizationAdvancement(last_emitted_finalized, latest_finalized, pruneForkchoice) catch |err| {
// Record failed finalization attempt
zeam_metrics.metrics.lean_finalizations_total.incr(.{ .result = "error" }) catch {};
self.module_logger.err("failed to process finalization advancement from slot {d} to {d}: {any}", .{
last_emitted_finalized.slot,
latest_finalized.slot,
Expand Down Expand Up @@ -939,6 +944,9 @@ pub const BeamChain = struct {
// }
// }

// Record successful finalization
zeam_metrics.metrics.lean_finalizations_total.incr(.{ .result = "success" }) catch {};

self.module_logger.debug("finalization advanced previousFinalized slot={d} to latestFinalized slot={d}", .{ previousFinalized.slot, latestFinalized.slot });
}

Expand Down
75 changes: 75 additions & 0 deletions pkgs/node/src/forkchoice.zig
Original file line number Diff line number Diff line change
Expand Up @@ -750,15 +750,90 @@ pub const ForkChoice = struct {
}

/// Recomputes the fork-choice head, stores it in `self.head`, and returns it.
///
/// Side effects visible here:
///   * `lean_head_slot` is set to the new head's slot on every call;
///   * when the head root changes and the previous head is not on the new
///     head's parent chain, the reorg counter is bumped, the reorg depth is
///     observed in its histogram, and an info line is logged.
///
/// Any error from `computeFCHead` is propagated before `self.head` is touched.
pub fn updateHead(self: *Self) !ProtoBlock {
    const old_head = self.head;
    const new_head = try self.computeFCHead(true, 0);
    self.head = new_head;

    // Keep the head-slot gauge in sync with the freshly computed head.
    zeam_metrics.metrics.lean_head_slot.set(new_head.slot);

    // A reorg is a head change that does not simply extend the previous head.
    // `and` short-circuits, so the ancestry walk only runs when the root moved.
    const head_moved = !std.mem.eql(u8, &new_head.blockRoot, &old_head.blockRoot);
    if (head_moved and !self.isAncestorOf(old_head.blockRoot, new_head.blockRoot)) {
        const depth = self.calculateReorgDepth(old_head.blockRoot, new_head.blockRoot);
        zeam_metrics.metrics.lean_fork_choice_reorgs_total.incr();
        zeam_metrics.metrics.lean_fork_choice_reorg_depth.observe(@floatFromInt(depth));
        self.logger.info("fork choice reorg detected: depth={d} old_head_slot={d} new_head_slot={d}", .{
            depth,
            old_head.slot,
            new_head.slot,
        });
    }

    return self.head;
}

/// Reports whether `potential_ancestor` appears on the parent chain of `descendant`.
///
/// The walk starts at `descendant` itself and follows `parent` links until a node
/// without a parent is reached, so a root is considered an ancestor of itself.
///
/// `descendant` must be present in `protoArray.indices`. The only caller visible
/// here passes the root of the head returned by `computeFCHead`; a missing entry
/// is treated as an invariant violation rather than a recoverable condition.
fn isAncestorOf(self: *Self, potential_ancestor: types.Root, descendant: types.Root) bool {
    // Invariant: the descendant is tracked by the proto array (see doc comment).
    var maybe_idx: ?usize = self.protoArray.indices.get(descendant) orelse unreachable;

    while (maybe_idx) |idx| {
        const current_node = self.protoArray.nodes.items[idx];
        if (std.mem.eql(u8, &current_node.blockRoot, &potential_ancestor)) {
            return true;
        }
        // Step one level up; a null parent ends the walk.
        maybe_idx = current_node.parent;
    }
    return false;
}

/// Calculates the reorg depth: the number of nodes on the old head's parent chain
/// (starting at the old head itself) that come before the first node also found
/// on the new head's parent chain.
///
/// Returns 0 when `old_head_root` is no longer in `protoArray.indices` (the
/// original author notes it may have been pruned). If the two chains share no
/// node at all, the full length of the old chain is returned.
///
/// `new_head_root` must be present in `protoArray.indices`; a missing entry is
/// treated as an invariant violation.
///
/// This implementation performs no allocation. The earlier hash-set approach
/// discarded `error.OutOfMemory` from `put`, which could silently drop ancestors
/// from the set and report an inflated depth.
fn calculateReorgDepth(self: *Self, old_head_root: types.Root, new_head_root: types.Root) usize {
    const nodes = self.protoArray.nodes.items;

    // Invariant: the new head is tracked by the proto array (see doc comment).
    const new_head_idx: usize = self.protoArray.indices.get(new_head_root) orelse unreachable;
    // Defensive: the old head may no longer be tracked.
    const old_head_idx: usize = self.protoArray.indices.get(old_head_root) orelse return 0;

    // Pass 1: measure both parent chains (head included).
    var old_len: usize = 0;
    var cursor: ?usize = old_head_idx;
    while (cursor) |idx| : (cursor = nodes[idx].parent) old_len += 1;

    var new_len: usize = 0;
    cursor = new_head_idx;
    while (cursor) |idx| : (cursor = nodes[idx].parent) new_len += 1;

    var depth: usize = 0;
    var old_cursor: ?usize = old_head_idx;
    var new_cursor: ?usize = new_head_idx;

    // Pass 2a: bring both cursors to the same distance from the end of their chain.
    // Every node has a single parent, so chains that share a node share everything
    // above it; a shared node therefore sits at the same distance from the end on
    // both chains. Old-chain nodes skipped here cannot be shared and count as
    // rolled back.
    var old_extra = old_len -| new_len;
    while (old_extra > 0) : (old_extra -= 1) {
        // Non-null: fewer than old_len steps have been taken on this chain.
        old_cursor = nodes[old_cursor.?].parent;
        depth += 1;
    }
    var new_extra = new_len -| old_len;
    while (new_extra > 0) : (new_extra -= 1) {
        // Non-null: fewer than new_len steps have been taken on this chain.
        new_cursor = nodes[new_cursor.?].parent;
    }

    // Pass 2b: climb in lockstep until the roots match (the common ancestor).
    while (old_cursor) |old_idx| {
        // Both chains have equal remaining length here, so this never breaks early.
        const new_idx = new_cursor orelse break;
        if (std.mem.eql(u8, &nodes[old_idx].blockRoot, &nodes[new_idx].blockRoot)) {
            return depth;
        }
        depth += 1;
        old_cursor = nodes[old_idx].parent;
        new_cursor = nodes[new_idx].parent;
    }

    // No shared node: every node of the old chain was counted.
    return depth;
}

/// Recomputes the safe target, stores it in `self.safeTarget`, and returns it.
///
/// The cutoff weight handed to `computeFCHead` is ceil(2 * numValidators / 3).
/// The `lean_safe_target_slot` gauge is updated with the new target's slot.
/// Errors from `divCeil` or `computeFCHead` are propagated before any state changes.
pub fn updateSafeTarget(self: *Self) !ProtoBlock {
    const validator_count = self.config.genesis.numValidators();
    const two_thirds_weight = try std.math.divCeil(u64, 2 * validator_count, 3);

    const target = try self.computeFCHead(false, two_thirds_weight);
    self.safeTarget = target;

    // Publish the slot of the new safe target.
    zeam_metrics.metrics.lean_safe_target_slot.set(target.slot);
    return self.safeTarget;
}

Expand Down