Skip to content

Commit 059de39

Browse files
ch4r10t33ranshalshuklag11tech
authored
hosting and consuming checkpoint sync server API (#479)
* feat:Implement checkpoint sync: serve finalized state API and client checkpoint sync * chore: updated documentation * fix: Addressed segmentation fault error * fix: pushed missing changes * fix: corrected the race condition with SSE events * fix: Addressed review comments * fix: addressed review comments * fix: Addressed review comments * fix: addressed review comments * Update pkgs/node/src/chain.zig Co-authored-by: g11tech <gajinder@zeam.in> * fix: addressed issue with cached finalised state * chore: Added testing scenario for checkpoint sync * Update pkgs/cli/src/node.zig Co-authored-by: g11tech <gajinder@zeam.in> * fixed comments * chore renamed function --------- Co-authored-by: Anshal Shukla <53994948+anshalshukla@users.noreply.github.com> Co-authored-by: g11tech <gajinder@zeam.in>
1 parent 12b2267 commit 059de39

File tree

9 files changed

+591
-67
lines changed

9 files changed

+591
-67
lines changed

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,14 @@ To run a local devnet with multiple nodes for testing and development, see the [
105105

106106
or check out the [lean-quickstart](https://github.com/blockblaz/lean-quickstart) submodule (`git submodule update --init lean-quickstart`) and use its handy command-line tool to spin up two nodes for local interop.
107107

108+
#### Checkpoint Sync
109+
110+
Zeam supports checkpoint sync for faster initial synchronization. You can start a node from a trusted finalized checkpoint state by using the `--checkpoint-sync-url` parameter. See the [Local Devnet Setup Guide](./pkgs/cli/test/fixtures/README.md#checkpoint-sync) for detailed documentation and examples.
111+
108112
### Testing Scenarios
109113

110114
1. Test blocks by root [parent-sync](./resources/parent-sync.md)
111-
115+
2. Test checkpoint sync [checkpoint-sync](./resources/checkpoint-sync.md)
112116

113117
### Reporting Issues
114118

pkgs/cli/src/api_server.zig

Lines changed: 140 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -2,47 +2,119 @@ const std = @import("std");
22
const api = @import("@zeam/api");
33
const constants = @import("constants.zig");
44
const event_broadcaster = api.event_broadcaster;
5-
6-
/// Simple metrics server that runs in a background thread
7-
pub fn startAPIServer(allocator: std.mem.Allocator, port: u16) !void {
8-
// Initialize the global event broadcaster
5+
const types = @import("@zeam/types");
6+
const ssz = @import("ssz");
7+
const utils_lib = @import("@zeam/utils");
8+
const LoggerConfig = utils_lib.ZeamLoggerConfig;
9+
const ModuleLogger = utils_lib.ModuleLogger;
10+
const node_lib = @import("@zeam/node");
11+
const BeamChain = node_lib.chainFactory.BeamChain;
12+
13+
/// API server that runs in a background thread
14+
/// Handles metrics, SSE events, health checks, and checkpoint state endpoints
15+
/// chain is optional - if null, the finalized state endpoint will return 503
16+
/// (API server starts before chain initialization, so chain may not be available yet)
17+
pub fn startAPIServer(allocator: std.mem.Allocator, port: u16, logger_config: *LoggerConfig, chain: ?*BeamChain) !void {
18+
// Initialize the global event broadcaster for SSE events
19+
// This is idempotent - safe to call even if already initialized elsewhere (e.g., node.zig)
920
try event_broadcaster.initGlobalBroadcaster(allocator);
1021

11-
// Create a simple HTTP server context
12-
const ctx = try allocator.create(SimpleMetricsServer);
22+
// Create a logger instance for the API server
23+
const logger = logger_config.logger(.api_server);
24+
25+
// Create the API server context
26+
const ctx = try allocator.create(ApiServer);
1327
errdefer allocator.destroy(ctx);
1428
ctx.* = .{
1529
.allocator = allocator,
1630
.port = port,
31+
.logger = logger,
32+
.chain = chain,
1733
};
1834

1935
// Start server in background thread
20-
const thread = try std.Thread.spawn(.{}, SimpleMetricsServer.run, .{ctx});
36+
const thread = try std.Thread.spawn(.{}, ApiServer.run, .{ctx});
2137
thread.detach();
2238

23-
std.log.info("Metrics server started on port {d}", .{port});
39+
logger.info("API server thread spawned for port {d}", .{port});
2440
}
2541

26-
/// Handle individual HTTP connections in a separate thread
27-
fn handleConnection(connection: std.net.Server.Connection, allocator: std.mem.Allocator) void {
28-
defer connection.stream.close();
42+
/// API server context
43+
const ApiServer = struct {
44+
allocator: std.mem.Allocator,
45+
port: u16,
46+
logger: ModuleLogger,
47+
chain: ?*BeamChain,
48+
49+
const Self = @This();
2950

30-
var buffer: [4096]u8 = undefined;
31-
var http_server = std.http.Server.init(connection, &buffer);
32-
var request = http_server.receiveHead() catch |err| {
33-
std.log.warn("Failed to receive HTTP head: {}", .{err});
34-
return;
35-
};
51+
fn run(self: *Self) void {
52+
// `startAPIServer` creates this, so we need to free it here
53+
defer self.allocator.destroy(self);
54+
55+
const address = std.net.Address.parseIp4("0.0.0.0", self.port) catch |err| {
56+
self.logger.err("failed to parse server address 0.0.0.0:{d}: {}", .{ self.port, err });
57+
return;
58+
};
59+
60+
var server = address.listen(.{ .reuse_address = true }) catch |err| {
61+
self.logger.err("failed to listen on port {d}: {}", .{ self.port, err });
62+
return;
63+
};
64+
defer server.deinit();
65+
66+
self.logger.info("HTTP server listening on http://0.0.0.0:{d}", .{self.port});
3667

37-
// Route handling
38-
if (std.mem.eql(u8, request.head.target, "/events")) {
39-
// Handle SSE connection - this will keep the connection alive
40-
SimpleMetricsServer.handleSSEEvents(connection.stream, allocator) catch |err| {
41-
std.log.warn("SSE connection failed: {}", .{err});
68+
while (true) {
69+
const connection = server.accept() catch continue;
70+
71+
// For SSE connections, we need to handle them differently
72+
// We'll spawn a new thread for each connection to handle persistence
73+
_ = std.Thread.spawn(.{}, Self.handleConnection, .{ self, connection }) catch |err| {
74+
self.logger.warn("failed to spawn connection handler: {}", .{err});
75+
connection.stream.close();
76+
continue;
77+
};
78+
}
79+
}
80+
81+
/// Handle individual HTTP connections in a separate thread
82+
fn handleConnection(self: *const Self, connection: std.net.Server.Connection) void {
83+
defer connection.stream.close();
84+
85+
var buffer: [4096]u8 = undefined;
86+
var http_server = std.http.Server.init(connection, &buffer);
87+
var request = http_server.receiveHead() catch |err| {
88+
self.logger.warn("failed to receive HTTP head: {}", .{err});
89+
return;
4290
};
43-
} else if (std.mem.eql(u8, request.head.target, "/metrics")) {
44-
// Handle metrics request
45-
var metrics_output = std.ArrayList(u8).init(allocator);
91+
92+
// Route handling
93+
if (std.mem.eql(u8, request.head.target, "/events")) {
94+
// Handle SSE connection - this will keep the connection alive
95+
self.handleSSEEvents(connection.stream) catch |err| {
96+
self.logger.warn("SSE connection failed: {}", .{err});
97+
};
98+
} else if (std.mem.eql(u8, request.head.target, "/metrics")) {
99+
// Handle metrics request
100+
self.handleMetrics(&request);
101+
} else if (std.mem.eql(u8, request.head.target, "/health")) {
102+
// Handle health check
103+
self.handleHealth(&request);
104+
} else if (std.mem.eql(u8, request.head.target, "/lean/states/finalized")) {
105+
// Handle finalized checkpoint state endpoint
106+
self.handleFinalizedCheckpointState(&request) catch |err| {
107+
self.logger.warn("failed to handle finalized checkpoint state request: {}", .{err});
108+
_ = request.respond("Internal Server Error\n", .{ .status = .internal_server_error }) catch {};
109+
};
110+
} else {
111+
_ = request.respond("Not Found\n", .{ .status = .not_found }) catch {};
112+
}
113+
}
114+
115+
/// Handle metrics endpoint
116+
fn handleMetrics(self: *const Self, request: *std.http.Server.Request) void {
117+
var metrics_output = std.ArrayList(u8).init(self.allocator);
46118
defer metrics_output.deinit();
47119

48120
api.writeMetrics(metrics_output.writer()) catch {
@@ -55,48 +127,61 @@ fn handleConnection(connection: std.net.Server.Connection, allocator: std.mem.Al
55127
.{ .name = "content-type", .value = "text/plain; version=0.0.4; charset=utf-8" },
56128
},
57129
}) catch {};
58-
} else if (std.mem.eql(u8, request.head.target, "/health")) {
59-
// Handle health check
60-
const response = "{\"status\":\"healthy\",\"service\":\"zeam-metrics\"}";
130+
}
131+
132+
/// Handle health check endpoint
133+
fn handleHealth(_: *const Self, request: *std.http.Server.Request) void {
134+
const response = "{\"status\":\"healthy\",\"service\":\"zeam-api\"}";
61135
_ = request.respond(response, .{
62136
.extra_headers = &.{
63137
.{ .name = "content-type", .value = "application/json; charset=utf-8" },
64138
},
65139
}) catch {};
66-
} else {
67-
_ = request.respond("Not Found\n", .{ .status = .not_found }) catch {};
68140
}
69-
}
70141

71-
/// Simple metrics server context
72-
const SimpleMetricsServer = struct {
73-
allocator: std.mem.Allocator,
74-
port: u16,
142+
/// Handle finalized checkpoint state endpoint
143+
/// Serves the finalized checkpoint lean state (BeamState) as SSZ octet-stream at /lean/states/finalized
144+
fn handleFinalizedCheckpointState(self: *const Self, request: *std.http.Server.Request) !void {
145+
// Get the chain (may be null if API server started before chain initialization)
146+
const chain = self.chain orelse {
147+
_ = request.respond("Service Unavailable: Chain not initialized\n", .{ .status = .service_unavailable }) catch {};
148+
return;
149+
};
75150

76-
fn run(self: *SimpleMetricsServer) !void {
77-
// `startMetricsServer` creates this, so we need to free it here
78-
defer self.allocator.destroy(self);
79-
const address = try std.net.Address.parseIp4("0.0.0.0", self.port);
80-
var server = try address.listen(.{ .reuse_address = true });
81-
defer server.deinit();
151+
// Get finalized state from chain (chain handles its own locking internally)
152+
const finalized_lean_state = chain.getFinalizedState() orelse {
153+
_ = request.respond("Not Found: Finalized checkpoint lean state not available\n", .{ .status = .not_found }) catch {};
154+
return;
155+
};
82156

83-
std.log.info("HTTP server listening on http://0.0.0.0:{d}", .{self.port});
157+
// Serialize lean state (BeamState) to SSZ
158+
var ssz_output = std.ArrayList(u8).init(self.allocator);
159+
defer ssz_output.deinit();
84160

85-
while (true) {
86-
const connection = server.accept() catch continue;
161+
ssz.serialize(types.BeamState, finalized_lean_state.*, &ssz_output) catch |err| {
162+
self.logger.err("failed to serialize finalized lean state to SSZ: {}", .{err});
163+
_ = request.respond("Internal Server Error: Serialization failed\n", .{ .status = .internal_server_error }) catch {};
164+
return;
165+
};
87166

88-
// For SSE connections, we need to handle them differently
89-
// We'll spawn a new thread for each connection to handle persistence
90-
_ = std.Thread.spawn(.{}, handleConnection, .{ connection, self.allocator }) catch |err| {
91-
std.log.warn("Failed to spawn connection handler: {}", .{err});
92-
connection.stream.close();
93-
continue;
94-
};
95-
}
167+
// Format content-length header value
168+
var content_length_buf: [32]u8 = undefined;
169+
const content_length_str = try std.fmt.bufPrint(&content_length_buf, "{d}", .{ssz_output.items.len});
170+
171+
// Respond with lean state (BeamState) as SSZ octet-stream
172+
_ = request.respond(ssz_output.items, .{
173+
.extra_headers = &.{
174+
.{ .name = "content-type", .value = "application/octet-stream" },
175+
.{ .name = "content-length", .value = content_length_str },
176+
},
177+
}) catch |err| {
178+
self.logger.warn("failed to respond with finalized lean state: {}", .{err});
179+
return err;
180+
};
96181
}
97182

98-
fn handleSSEEvents(stream: std.net.Stream, allocator: std.mem.Allocator) !void {
99-
_ = allocator;
183+
/// Handle SSE events endpoint
184+
fn handleSSEEvents(self: *const Self, stream: std.net.Stream) !void {
100185
// Set SSE headers manually by writing HTTP response
101186
const sse_headers = "HTTP/1.1 200 OK\r\n" ++
102187
"Content-Type: text/event-stream\r\n" ++
@@ -122,7 +207,7 @@ const SimpleMetricsServer = struct {
122207
// Send periodic heartbeat to keep connection alive
123208
const heartbeat = ": heartbeat\n\n";
124209
stream.writeAll(heartbeat) catch |err| {
125-
std.log.warn("SSE connection closed: {}", .{err});
210+
self.logger.warn("SSE connection closed: {}", .{err});
126211
break;
127212
};
128213

pkgs/cli/src/main.zig

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ pub const NodeCommand = struct {
5656
@"sig-keys-dir": []const u8 = "hash-sig-keys",
5757
@"network-dir": []const u8 = "./network",
5858
@"data-dir": []const u8 = constants.DEFAULT_DATA_DIR,
59+
@"checkpoint-sync-url": ?[]const u8 = null,
5960

6061
pub const __shorts__ = .{
6162
.help = .h,
@@ -73,6 +74,7 @@ pub const NodeCommand = struct {
7374
.override_genesis_time = "Override genesis time in the config.yaml",
7475
.@"sig-keys-dir" = "Relative path of custom genesis to signature key directory",
7576
.@"data-dir" = "Path to the data directory",
77+
.@"checkpoint-sync-url" = "URL to fetch finalized checkpoint state from for checkpoint sync (e.g., http://localhost:5052/lean/states/finalized)",
7678
.help = "Show help information for the node command",
7779
};
7880
};
@@ -303,8 +305,12 @@ fn mainInner() !void {
303305
return err;
304306
};
305307

308+
// Create logger config for API server
309+
var api_logger_config = utils_lib.getLoggerConfig(console_log_level, utils_lib.FileBehaviourParams{ .fileActiveLevel = log_file_active_level, .filePath = beamcmd.data_dir, .fileName = log_filename, .monocolorFile = monocolor_file_log });
310+
306311
// Start metrics HTTP server
307-
api_server.startAPIServer(allocator, beamcmd.metricsPort) catch |err| {
312+
// Pass null for chain - in .beam command mode, chains are created later and the checkpoint sync endpoint won't be available
313+
api_server.startAPIServer(allocator, beamcmd.metricsPort, &api_logger_config, null) catch |err| {
308314
ErrorHandler.logErrorWithDetails(err, "start API server", .{ .port = beamcmd.metricsPort });
309315
return err;
310316
};

0 commit comments

Comments
 (0)