Skip to content

Commit e577fe0

Browse files
anshalshuklag11techzclawzclaude
authored
fix: robust syncing, misc changes (#591)
* fix: robust syncing * fix recency calc for fc tree log * chain: queue gossip blocks that arrive before forkchoice ticks to the… (#593) * chain: queue gossip blocks that arrive before forkchoice ticks to their slot When a peer gossips a block for slot N and our local interval timer hasn't yet advanced the forkchoice clock to N*INTERVALS_PER_SLOT, forkChoice.onBlockUnlocked returns FutureSlot and the block is dropped. Fix by queuing such blocks in BeamChain.pending_blocks (an ArrayList of SSZ-cloned SignedBlockWithAttestation). After validateBlock passes, we check if block.slot * INTERVALS_PER_SLOT > fcStore.time; if so the block is cloned into the queue instead of being processed immediately. In onInterval, after forkChoice.onInterval has advanced the clock, the new processPendingBlocks helper iterates the queue and replays every entry whose slot is now reachable (slot * INTERVALS_PER_SLOT <= fc_time) by calling onBlock + onBlockFollowup in the normal way. Entries still in the future stay in the queue for the next tick. https://claude.ai/code/session_0116R4pk8fiu5Ve5PAfjE3op * node: call processPendingBlocks from onInterval and fetch missing roots Move the processPendingBlocks call out of chain.onInterval and into node.onInterval so that any missing attestation-head roots discovered while replaying queued blocks can be immediately fetched via fetchBlockByRoots, mirroring the pattern used after onGossip in handleGossipProcessingResult. - Make processPendingBlocks pub and change its return type from void to []types.Root, accumulating all missing roots across every replayed block into a single caller-owned slice. - Remove the processPendingBlocks call from chain.onInterval (it now belongs to the node layer). - In node.onInterval, call chain.processPendingBlocks() right after chain.onInterval() and pass the returned roots to fetchBlockByRoots. https://claude.ai/code/session_0116R4pk8fiu5Ve5PAfjE3op --------- Co-authored-by: Claude <noreply@anthropic.com> * further improvs * remove redudant check and improve the comment --------- Co-authored-by: harkamal <gajinder@zeam.in> Co-authored-by: zclawz <claw@zeam.in> Co-authored-by: Claude <noreply@anthropic.com>
1 parent f05c9dd commit e577fe0

File tree

10 files changed

+344
-65
lines changed

10 files changed

+344
-65
lines changed

.github/workflows/ci.yml

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ jobs:
2828
uses: actions/cache@v4
2929
with:
3030
path: ~/.cache/zig
31-
key: ${{ runner.os }}-zig-packages-${{ hashFiles('build.zig.zon') }}
31+
key: ${{ runner.os }}-zig-${{ hashFiles('build.zig.zon', 'build.zig') }}
3232
restore-keys: |
33-
${{ runner.os }}-zig-packages-
33+
${{ runner.os }}-zig-
3434
3535
- name: Fetch Zig dependencies with retry
3636
run: |
@@ -97,9 +97,9 @@ jobs:
9797
uses: actions/cache@v4
9898
with:
9999
path: ~/.cache/zig
100-
key: ${{ runner.os }}-zig-packages-${{ hashFiles('build.zig.zon') }}
100+
key: ${{ runner.os }}-zig-${{ hashFiles('build.zig.zon', 'build.zig') }}
101101
restore-keys: |
102-
${{ runner.os }}-zig-packages-
102+
${{ runner.os }}-zig-
103103
104104
- name: Cache Rust dependencies
105105
uses: Swatinem/rust-cache@v2
@@ -151,9 +151,9 @@ jobs:
151151
uses: actions/cache@v4
152152
with:
153153
path: ~/.cache/zig
154-
key: ${{ runner.os }}-zig-packages-${{ hashFiles('build.zig.zon') }}
154+
key: ${{ runner.os }}-zig-${{ hashFiles('build.zig.zon', 'build.zig') }}
155155
restore-keys: |
156-
${{ runner.os }}-zig-packages-
156+
${{ runner.os }}-zig-
157157
158158
- name: Cache Rust dependencies
159159
uses: Swatinem/rust-cache@v2
@@ -205,9 +205,9 @@ jobs:
205205
uses: actions/cache@v4
206206
with:
207207
path: ~/.cache/zig
208-
key: ${{ runner.os }}-zig-packages-${{ hashFiles('build.zig.zon') }}
208+
key: ${{ runner.os }}-zig-${{ hashFiles('build.zig.zon', 'build.zig') }}
209209
restore-keys: |
210-
${{ runner.os }}-zig-packages-
210+
${{ runner.os }}-zig-
211211
212212
- name: Cache Rust dependencies
213213
uses: Swatinem/rust-cache@v2
@@ -276,9 +276,9 @@ jobs:
276276
uses: actions/cache@v4
277277
with:
278278
path: ~/.cache/zig
279-
key: ${{ runner.os }}-zig-packages-${{ hashFiles('build.zig.zon') }}
279+
key: ${{ runner.os }}-zig-${{ hashFiles('build.zig.zon', 'build.zig') }}
280280
restore-keys: |
281-
${{ runner.os }}-zig-packages-
281+
${{ runner.os }}-zig-
282282
283283
- name: Cache Rust dependencies
284284
uses: Swatinem/rust-cache@v2
@@ -315,9 +315,9 @@ jobs:
315315
uses: actions/cache@v4
316316
with:
317317
path: ~/.cache/zig
318-
key: ${{ runner.os }}-zig-packages-${{ hashFiles('build.zig.zon') }}
318+
key: ${{ runner.os }}-zig-${{ hashFiles('build.zig.zon', 'build.zig') }}
319319
restore-keys: |
320-
${{ runner.os }}-zig-packages-
320+
${{ runner.os }}-zig-
321321
322322
- name: Cache Rust dependencies
323323
uses: Swatinem/rust-cache@v2

build.zig

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,11 +127,10 @@ pub fn build(b: *Builder) !void {
127127
.optimize = optimize,
128128
}).module("yaml");
129129

130-
// add rocksdb — always build with ReleaseSafe to avoid LLD UnableToWriteArchive
131-
// on Debug builds (the Debug archive exceeds LLD's size limits on CI runners)
130+
// add rocksdb
132131
const rocksdb = b.dependency("rocksdb", .{
133132
.target = target,
134-
.optimize = .ReleaseSafe,
133+
.optimize = optimize,
135134
}).module("bindings");
136135

137136
// add snappyz
@@ -552,6 +551,8 @@ pub fn build(b: *Builder) !void {
552551
const database_tests = b.addTest(.{
553552
.root_module = zeam_database,
554553
});
554+
database_tests.step.dependOn(&build_rust_lib_steps.step);
555+
addRustGlueLib(b, database_tests, target, prover);
555556
const run_database_tests = b.addRunArtifact(database_tests);
556557
setTestRunLabelFromCompile(b, run_database_tests, database_tests);
557558
test_step.dependOn(&run_database_tests.step);

pkgs/api/src/event_broadcaster.zig

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ test "global broadcaster functionality" {
272272
.root = [_]u8{5} ** 32,
273273
};
274274

275-
const just_event = try events.NewJustificationEvent.fromCheckpoint(allocator, checkpoint, 123);
275+
const just_event = try events.NewJustificationEvent.fromCheckpoint(allocator, checkpoint, 123, null);
276276

277277
var chain_event = events.ChainEvent{ .new_justification = just_event };
278278

pkgs/api/src/events.zig

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ pub const NewHeadEvent = struct {
2626
parent_root: []const u8,
2727
state_root: []const u8,
2828
timely: bool,
29+
node_id: ?u32,
2930

3031
pub fn fromProtoBlock(allocator: Allocator, proto_block: types.ProtoBlock) !NewHeadEvent {
3132
const block_root_hex = try std.fmt.allocPrint(allocator, "0x{x}", .{&proto_block.blockRoot});
@@ -38,6 +39,7 @@ pub const NewHeadEvent = struct {
3839
.parent_root = parent_root_hex,
3940
.state_root = state_root_hex,
4041
.timely = proto_block.timeliness,
42+
.node_id = null,
4143
};
4244
}
4345

@@ -48,6 +50,9 @@ pub const NewHeadEvent = struct {
4850
try obj.put("parent_root", json.Value{ .string = self.parent_root });
4951
try obj.put("state_root", json.Value{ .string = self.state_root });
5052
try obj.put("timely", json.Value{ .bool = self.timely });
53+
if (self.node_id) |node_id| {
54+
try obj.put("node_id", json.Value{ .integer = @as(i64, @intCast(node_id)) });
55+
}
5156
return json.Value{ .object = obj };
5257
}
5358

@@ -69,14 +74,16 @@ pub const NewJustificationEvent = struct {
6974
slot: u64,
7075
root: []const u8,
7176
justified_slot: u64,
77+
node_id: ?u32,
7278

73-
pub fn fromCheckpoint(allocator: Allocator, checkpoint: Checkpoint, current_slot: u64) !NewJustificationEvent {
79+
pub fn fromCheckpoint(allocator: Allocator, checkpoint: Checkpoint, current_slot: u64, node_id: ?u32) !NewJustificationEvent {
7480
const root_hex = try std.fmt.allocPrint(allocator, "0x{x}", .{&checkpoint.root});
7581

7682
return NewJustificationEvent{
7783
.slot = current_slot,
7884
.root = root_hex,
7985
.justified_slot = checkpoint.slot,
86+
.node_id = node_id,
8087
};
8188
}
8289

@@ -85,6 +92,9 @@ pub const NewJustificationEvent = struct {
8592
try obj.put("slot", json.Value{ .integer = @as(i64, @intCast(self.slot)) });
8693
try obj.put("root", json.Value{ .string = self.root });
8794
try obj.put("justified_slot", json.Value{ .integer = @as(i64, @intCast(self.justified_slot)) });
95+
if (self.node_id) |node_id| {
96+
try obj.put("node_id", json.Value{ .integer = @as(i64, @intCast(node_id)) });
97+
}
8898
return json.Value{ .object = obj };
8999
}
90100

@@ -104,14 +114,16 @@ pub const NewFinalizationEvent = struct {
104114
slot: u64,
105115
root: []const u8,
106116
finalized_slot: u64,
117+
node_id: ?u32,
107118

108-
pub fn fromCheckpoint(allocator: Allocator, checkpoint: Checkpoint, current_slot: u64) !NewFinalizationEvent {
119+
pub fn fromCheckpoint(allocator: Allocator, checkpoint: Checkpoint, current_slot: u64, node_id: ?u32) !NewFinalizationEvent {
109120
const root_hex = try std.fmt.allocPrint(allocator, "0x{x}", .{&checkpoint.root});
110121

111122
return NewFinalizationEvent{
112123
.slot = current_slot,
113124
.root = root_hex,
114125
.finalized_slot = checkpoint.slot,
126+
.node_id = node_id,
115127
};
116128
}
117129

@@ -120,6 +132,9 @@ pub const NewFinalizationEvent = struct {
120132
try obj.put("slot", json.Value{ .integer = @as(i64, @intCast(self.slot)) });
121133
try obj.put("root", json.Value{ .string = self.root });
122134
try obj.put("finalized_slot", json.Value{ .integer = @as(i64, @intCast(self.finalized_slot)) });
135+
if (self.node_id) |node_id| {
136+
try obj.put("node_id", json.Value{ .integer = @as(i64, @intCast(node_id)) });
137+
}
123138
return json.Value{ .object = obj };
124139
}
125140

@@ -217,7 +232,7 @@ test "serialize new justification event" {
217232
.root = [_]u8{5} ** 32,
218233
};
219234

220-
var just_event = try NewJustificationEvent.fromCheckpoint(allocator, checkpoint, 123);
235+
var just_event = try NewJustificationEvent.fromCheckpoint(allocator, checkpoint, 123, null);
221236
defer just_event.deinit(allocator);
222237

223238
const chain_event = ChainEvent{ .new_justification = just_event };
@@ -237,7 +252,7 @@ test "serialize new finalization event" {
237252
.root = [_]u8{4} ** 32,
238253
};
239254

240-
var final_event = try NewFinalizationEvent.fromCheckpoint(allocator, checkpoint, 123);
255+
var final_event = try NewFinalizationEvent.fromCheckpoint(allocator, checkpoint, 123, null);
241256
defer final_event.deinit(allocator);
242257

243258
const chain_event = ChainEvent{ .new_finalization = final_event };
@@ -270,15 +285,15 @@ fn makeSampleChainEvent(allocator: Allocator, tag: ChainEventType) !ChainEvent {
270285
.slot = 999_999,
271286
.root = [_]u8{0x12} ** 32,
272287
};
273-
const just_event = try NewJustificationEvent.fromCheckpoint(allocator, checkpoint, 999_999);
288+
const just_event = try NewJustificationEvent.fromCheckpoint(allocator, checkpoint, 999_999, null);
274289
break :blk ChainEvent{ .new_justification = just_event };
275290
},
276291
.new_finalization => blk: {
277292
const checkpoint = Checkpoint{
278293
.slot = 999_999,
279294
.root = [_]u8{0x34} ** 32,
280295
};
281-
const final_event = try NewFinalizationEvent.fromCheckpoint(allocator, checkpoint, 999_999);
296+
const final_event = try NewFinalizationEvent.fromCheckpoint(allocator, checkpoint, 999_999, null);
282297
break :blk ChainEvent{ .new_finalization = final_event };
283298
},
284299
};

pkgs/cli/test/integration.zig

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ const ChainEvent = struct {
227227
event_type: []const u8,
228228
justified_slot: ?u64,
229229
finalized_slot: ?u64,
230+
node_id: ?u32,
230231

231232
/// Free the memory allocated for this event
232233
fn deinit(self: ChainEvent, allocator: std.mem.Allocator) void {
@@ -380,6 +381,7 @@ const SSEClient = struct {
380381

381382
var justified_slot: ?u64 = null;
382383
var finalized_slot: ?u64 = null;
384+
var node_id: ?u32 = null;
383385

384386
if (parsed.value.object.get("justified_slot")) |js| {
385387
switch (js) {
@@ -395,10 +397,18 @@ const SSEClient = struct {
395397
}
396398
}
397399

400+
if (parsed.value.object.get("node_id")) |nid| {
401+
switch (nid) {
402+
.integer => |ival| node_id = @intCast(ival),
403+
else => {},
404+
}
405+
}
406+
398407
return ChainEvent{
399408
.event_type = event_type_owned,
400409
.justified_slot = justified_slot,
401410
.finalized_slot = finalized_slot,
411+
.node_id = node_id,
402412
};
403413
}
404414

@@ -499,6 +509,7 @@ test "CLI beam command with mock network - complete integration test" {
499509

500510
test "SSE events integration test - wait for justification and finalization" {
501511
const allocator = std.testing.allocator;
512+
const node3_id = 2;
502513

503514
// Get executable path
504515
const exe_path = try getZeamExecutable();
@@ -519,18 +530,14 @@ test "SSE events integration test - wait for justification and finalization" {
519530

520531
std.debug.print("INFO: Connected to SSE endpoint, waiting for events...\n", .{});
521532

522-
// Read events until justification, finalization, AND node3 parent sync are verified, or timeout.
523-
// Node3 starts after first finalization and syncs via parent block requests (blocks_by_root).
524-
// We verify sync by waiting for finalization to advance beyond the first finalized slot,
525-
// which proves the chain continued progressing after node3 joined.
526-
const timeout_ms: u64 = 240000; // 240 seconds timeout
533+
// Read events until justification, any finalization, AND explicit node3 finalization sync are verified, or timeout.
534+
// Node3 sync is proven only when node3 itself emits new_finalization with finalized_slot > 0.
535+
const timeout_ms: u64 = 480000; // 480 seconds timeout
527536
const start_ns = std.time.nanoTimestamp();
528537
const deadline_ns = start_ns + timeout_ms * std.time.ns_per_ms;
529538
var got_justification = false;
530539
var got_finalization = false;
531540
var got_node3_sync = false;
532-
var first_finalized_slot: u64 = 0;
533-
var head_count_at_finalization: usize = 0;
534541

535542
var current_ns = std.time.nanoTimestamp();
536543
while (current_ns < deadline_ns and !(got_justification and got_finalization and got_node3_sync)) {
@@ -549,27 +556,22 @@ test "SSE events integration test - wait for justification and finalization" {
549556
// Check for finalization events
550557
if (std.mem.eql(u8, e.event_type, "new_finalization")) {
551558
if (e.finalized_slot) |slot| {
552-
std.debug.print("DEBUG: Found finalization event with slot {}\n", .{slot});
559+
std.debug.print("DEBUG: Found finalization event with slot {} node_id={any}\n", .{ slot, e.node_id });
553560
if (slot > 0 and !got_finalization) {
554561
// First finalization — this triggers node3 to start syncing
555562
got_finalization = true;
556-
first_finalized_slot = slot;
557-
head_count_at_finalization = sse_client.getEventCount("new_head");
558-
std.debug.print("INFO: First finalization at slot {} — node 3 will start syncing via parent block requests\n", .{slot});
559-
std.debug.print("INFO: Head events at finalization: {}\n", .{head_count_at_finalization});
560-
} else if (got_finalization and slot > first_finalized_slot and !got_node3_sync) {
561-
// Finalization advanced beyond the first finalized slot.
562-
// This means the chain continued progressing after node3 joined.
563+
std.debug.print("INFO: Found first finalization with slot {}\n", .{slot});
564+
}
565+
566+
if (!got_node3_sync and slot > 0 and e.node_id != null and e.node_id.? == node3_id) {
563567
got_node3_sync = true;
564-
const head_count_now = sse_client.getEventCount("new_head");
565-
std.debug.print("INFO: Advanced finalization at slot {} (first was {}) — chain progressed after node 3 joined\n", .{ slot, first_finalized_slot });
566-
std.debug.print("INFO: Head events since finalization: {} (total: {})\n", .{ head_count_now - head_count_at_finalization, head_count_now });
568+
std.debug.print("INFO: Found node3 finalization with slot {}\n", .{slot});
567569
}
568-
} else {
569-
std.debug.print("DEBUG: Found finalization event with null slot\n", .{});
570570
}
571571
}
572572

573+
std.debug.print("SUCCESS: SSE events integration test completed — including node 3 finalization sync verification\n", .{});
574+
573575
// IMPORTANT: Free the event memory after processing
574576
e.deinit(allocator);
575577
}

0 commit comments

Comments
 (0)