-
Notifications
You must be signed in to change notification settings - Fork 35
feat: implement snappy for gossip objects follow the spec #240
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,6 +10,7 @@ const zeam_utils = @import("@zeam/utils"); | |
|
|
||
| const interface = @import("./interface.zig"); | ||
| const NetworkInterface = interface.NetworkInterface; | ||
| const snappyz = @import("snappyz"); | ||
|
|
||
| /// Writes failed deserialization bytes to disk for debugging purposes | ||
| /// Returns the filename if the file was successfully created, null otherwise | ||
|
|
@@ -55,12 +56,23 @@ export fn handleMsgFromRustBridge(zigHandler: *EthLibp2p, topic_str: [*:0]const | |
| }; | ||
|
|
||
| const message_bytes: []const u8 = message_ptr[0..message_len]; | ||
|
|
||
| const uncompressed_message = snappyz.decode(zigHandler.allocator, message_bytes) catch |e| { | ||
| zigHandler.logger.err("Error in snappyz decoding the message for topic={s}: {any}", .{ std.mem.span(topic_str), e }); | ||
| if (writeFailedBytes(message_bytes, "snappyz_decode", zigHandler.allocator, null, zigHandler.logger)) |filename| { | ||
| zigHandler.logger.err("Snappyz decode failed - debug file created: {s}", .{filename}); | ||
| } else { | ||
| zigHandler.logger.err("Snappyz decode failed - could not create debug file", .{}); | ||
| } | ||
| return; | ||
| }; | ||
| defer zigHandler.allocator.free(uncompressed_message); | ||
| const message: interface.GossipMessage = switch (topic.gossip_topic) { | ||
| .block => blockmessage: { | ||
| var message_data: types.SignedBeamBlock = undefined; | ||
| ssz.deserialize(types.SignedBeamBlock, message_bytes, &message_data, zigHandler.allocator) catch |e| { | ||
| ssz.deserialize(types.SignedBeamBlock, uncompressed_message, &message_data, zigHandler.allocator) catch |e| { | ||
| zigHandler.logger.err("Error in deserializing the signed block message: {any}", .{e}); | ||
| if (writeFailedBytes(message_bytes, "block", zigHandler.allocator, null, zigHandler.logger)) |filename| { | ||
| if (writeFailedBytes(uncompressed_message, "block", zigHandler.allocator, null, zigHandler.logger)) |filename| { | ||
| zigHandler.logger.err("Block deserialization failed - debug file created: {s}", .{filename}); | ||
| } else { | ||
| zigHandler.logger.err("Block deserialization failed - could not create debug file", .{}); | ||
|
|
@@ -72,9 +84,9 @@ export fn handleMsgFromRustBridge(zigHandler: *EthLibp2p, topic_str: [*:0]const | |
| }, | ||
| .vote => votemessage: { | ||
| var message_data: types.SignedVote = undefined; | ||
| ssz.deserialize(types.SignedVote, message_bytes, &message_data, zigHandler.allocator) catch |e| { | ||
| ssz.deserialize(types.SignedVote, uncompressed_message, &message_data, zigHandler.allocator) catch |e| { | ||
| zigHandler.logger.err("Error in deserializing the signed vote message: {any}", .{e}); | ||
| if (writeFailedBytes(message_bytes, "vote", zigHandler.allocator, null, zigHandler.logger)) |filename| { | ||
| if (writeFailedBytes(uncompressed_message, "vote", zigHandler.allocator, null, zigHandler.logger)) |filename| { | ||
| zigHandler.logger.err("Vote deserialization failed - debug file created: {s}", .{filename}); | ||
| } else { | ||
| zigHandler.logger.err("Vote deserialization failed - could not create debug file", .{}); | ||
|
|
@@ -189,19 +201,25 @@ pub const EthLibp2p = struct { | |
| const message = switch (topic.gossip_topic) { | ||
| .block => blockbytes: { | ||
| var serialized = std.ArrayList(u8).init(self.allocator); | ||
| defer serialized.deinit(); | ||
| try ssz.serialize(types.SignedBeamBlock, data.block, &serialized); | ||
|
|
||
| break :blockbytes serialized.items; | ||
| break :blockbytes try serialized.toOwnedSlice(); | ||
| }, | ||
| .vote => votebytes: { | ||
| var serialized = std.ArrayList(u8).init(self.allocator); | ||
| defer serialized.deinit(); | ||
|
Comment on lines
+204
to
+211
|
||
| try ssz.serialize(types.SignedVote, data.vote, &serialized); | ||
|
|
||
| break :votebytes serialized.items; | ||
| break :votebytes try serialized.toOwnedSlice(); | ||
| }, | ||
| }; | ||
| self.logger.debug("network-{d}:: calling publish_msg_to_rust_bridge with message={any} for data={any}", .{ self.params.networkId, message, data }); | ||
| publish_msg_to_rust_bridge(self.params.networkId, topic_str.ptr, message.ptr, message.len); | ||
| defer self.allocator.free(message); | ||
|
|
||
| const compressed_message = try snappyz.encode(self.allocator, message); | ||
| defer self.allocator.free(compressed_message); | ||
| self.logger.debug("network-{d}:: calling publish_msg_to_rust_bridge with message={any} for data={any}", .{ self.params.networkId, compressed_message, data }); | ||
| publish_msg_to_rust_bridge(self.params.networkId, topic_str.ptr, compressed_message.ptr, compressed_message.len); | ||
| } | ||
|
|
||
| pub fn subscribe(ptr: *anyopaque, topics: []interface.GossipTopic, handler: interface.OnGossipCbHandler) anyerror!void { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using
defer serialized.deinit()followed byserialized.toOwnedSlice()is incorrect. ThetoOwnedSlice()method transfers ownership of the memory to the caller and clears the ArrayList, so the subsequentdeinit()call will attempt to free already-transferred memory. Remove thedefer serialized.deinit()line sincetoOwnedSlice()handles the cleanup.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that is wrong,
toOwnedSlice()allocated a new memoryThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually toOwnedSlice does clear the memory as well so deinit is unnecessary but not harmful either

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, got it. unnecessary to call