Skip to content

inspector: add protocol method Network.dataReceived #58001

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions doc/api/inspector.md
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,19 @@ inspector.Network.requestWillBeSent({
});
```

### `inspector.Network.dataReceived([params])`

<!-- YAML
added: REPLACEME
-->

* `params` {Object}

This feature is only available with the `--experimental-network-inspection` flag enabled.

Broadcasts the `Network.dataReceived` event to connected frontends, or buffers the data if
`Network.streamResourceContent` command was not invoked for the given request yet.

### `inspector.Network.requestWillBeSent([params])`

<!-- YAML
Expand Down
1 change: 1 addition & 0 deletions lib/inspector.js
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ const Network = {
responseReceived: (params) => broadcastToFrontend('Network.responseReceived', params),
loadingFinished: (params) => broadcastToFrontend('Network.loadingFinished', params),
loadingFailed: (params) => broadcastToFrontend('Network.loadingFailed', params),
dataReceived: (params) => broadcastToFrontend('Network.dataReceived', params),
};

module.exports = {
Expand Down
91 changes: 91 additions & 0 deletions src/inspector/network_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ using v8::Maybe;
using v8::MaybeLocal;
using v8::Nothing;
using v8::Object;
using v8::Uint8Array;
using v8::Value;

// Get a protocol string property from the object.
Expand Down Expand Up @@ -183,6 +184,7 @@ NetworkAgent::NetworkAgent(NetworkInspector* inspector,
event_notifier_map_["responseReceived"] = &NetworkAgent::responseReceived;
event_notifier_map_["loadingFailed"] = &NetworkAgent::loadingFailed;
event_notifier_map_["loadingFinished"] = &NetworkAgent::loadingFinished;
event_notifier_map_["dataReceived"] = &NetworkAgent::dataReceived;
}

void NetworkAgent::emitNotification(v8::Local<v8::Context> context,
Expand Down Expand Up @@ -211,6 +213,30 @@ protocol::DispatchResponse NetworkAgent::disable() {
return protocol::DispatchResponse::Success();
}

protocol::DispatchResponse NetworkAgent::streamResourceContent(
const protocol::String& in_requestId, protocol::Binary* out_bufferedData) {
if (!requests_.contains(in_requestId)) {
// Request not found, ignore it.
return protocol::DispatchResponse::InvalidParams("Request not found");
}

auto& it = requests_[in_requestId];

it.is_streaming = true;

// Concat response bodies.
*out_bufferedData = protocol::Binary::concat(it.response_data_blobs);
// Clear buffered data.
it.response_data_blobs.clear();

if (it.is_finished) {
// If the request is finished, remove the entry.
requests_.erase(in_requestId);
}

return protocol::DispatchResponse::Success();
}

void NetworkAgent::requestWillBeSent(v8::Local<v8::Context> context,
v8::Local<v8::Object> params) {
protocol::String request_id;
Expand Down Expand Up @@ -247,6 +273,12 @@ void NetworkAgent::requestWillBeSent(v8::Local<v8::Context> context,
std::move(initiator),
timestamp,
wall_time);

if (requests_.contains(request_id)) {
// Duplicate entry, ignore it.
return;
}
requests_.emplace(request_id, RequestEntry{timestamp, false, false, {}});
}

void NetworkAgent::responseReceived(v8::Local<v8::Context> context,
Expand Down Expand Up @@ -295,6 +327,8 @@ void NetworkAgent::loadingFailed(v8::Local<v8::Context> context,
}

frontend_->loadingFailed(request_id, timestamp, type, error_text);

requests_.erase(request_id);
}

void NetworkAgent::loadingFinished(v8::Local<v8::Context> context,
Expand All @@ -309,6 +343,63 @@ void NetworkAgent::loadingFinished(v8::Local<v8::Context> context,
}

frontend_->loadingFinished(request_id, timestamp);

auto request_entry = requests_.find(request_id);
if (request_entry == requests_.end()) {
// No entry found. Ignore it.
return;
}

if (request_entry->second.is_streaming) {
// Streaming finished, remove the entry.
requests_.erase(request_id);
} else {
request_entry->second.is_finished = true;
}
}

void NetworkAgent::dataReceived(v8::Local<v8::Context> context,
v8::Local<v8::Object> params) {
protocol::String request_id;
if (!ObjectGetProtocolString(context, params, "requestId").To(&request_id)) {
return;
}

auto request_entry = requests_.find(request_id);
if (request_entry == requests_.end()) {
// No entry found. Ignore it.
return;
}

double timestamp;
if (!ObjectGetDouble(context, params, "timestamp").To(&timestamp)) {
return;
}
int data_length;
if (!ObjectGetInt(context, params, "dataLength").To(&data_length)) {
return;
}
int encoded_data_length;
if (!ObjectGetInt(context, params, "encodedDataLength")
.To(&encoded_data_length)) {
return;
}
Local<Object> data_obj;
if (!ObjectGetObject(context, params, "data").ToLocal(&data_obj)) {
return;
}
if (!data_obj->IsUint8Array()) {
return;
}
Local<Uint8Array> data = data_obj.As<Uint8Array>();
auto data_bin = protocol::Binary::fromUint8Array(data);

if (request_entry->second.is_streaming) {
frontend_->dataReceived(
request_id, timestamp, data_length, encoded_data_length, data_bin);
} else {
requests_[request_id].response_data_blobs.push_back(data_bin);
}
}

} // namespace inspector
Expand Down
16 changes: 16 additions & 0 deletions src/inspector/network_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,21 @@

#include "node/inspector/protocol/Network.h"

#include <map>
#include <unordered_map>

namespace node {
namespace inspector {

class NetworkInspector;

struct RequestEntry {
double timestamp;
bool is_finished;
bool is_streaming;
std::vector<protocol::Binary> response_data_blobs;
};

class NetworkAgent : public protocol::Network::Backend {
public:
explicit NetworkAgent(NetworkInspector* inspector,
Expand All @@ -21,6 +29,10 @@ class NetworkAgent : public protocol::Network::Backend {

protocol::DispatchResponse disable() override;

protocol::DispatchResponse streamResourceContent(
const protocol::String& in_requestId,
protocol::Binary* out_bufferedData) override;

void emitNotification(v8::Local<v8::Context> context,
const protocol::String& event,
v8::Local<v8::Object> params);
Expand All @@ -37,13 +49,17 @@ class NetworkAgent : public protocol::Network::Backend {
void loadingFinished(v8::Local<v8::Context> context,
v8::Local<v8::Object> params);

void dataReceived(v8::Local<v8::Context> context,
v8::Local<v8::Object> params);

private:
NetworkInspector* inspector_;
v8_inspector::V8Inspector* v8_inspector_;
std::shared_ptr<protocol::Network::Frontend> frontend_;
using EventNotifier = void (NetworkAgent::*)(v8::Local<v8::Context> context,
v8::Local<v8::Object>);
std::unordered_map<protocol::String, EventNotifier> event_notifier_map_;
std::map<protocol::String, RequestEntry> requests_;
};

} // namespace inspector
Expand Down
24 changes: 24 additions & 0 deletions src/inspector/node_protocol.pdl
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,16 @@ experimental domain Network
# Enables network tracking, network events will now be delivered to the client.
command enable

# Enables streaming of the response for the given requestId.
# If enabled, the dataReceived event contains the data that was received during streaming.
experimental command streamResourceContent
parameters
# Identifier of the request to stream.
RequestId requestId
returns
# Data that has been buffered until streaming is enabled.
binary bufferedData

# Fired when page is about to send HTTP request.
event requestWillBeSent
parameters
Expand Down Expand Up @@ -227,6 +237,20 @@ experimental domain Network
# Timestamp.
MonotonicTime timestamp

# Fired when data chunk was received over the network.
event dataReceived
parameters
# Request identifier.
RequestId requestId
# Timestamp.
MonotonicTime timestamp
# Data chunk length.
integer dataLength
# Actual bytes received (might be less than dataLength for compressed encodings).
integer encodedDataLength
# Data that was received.
experimental optional binary data

# Support for inspecting node process state.
experimental domain NodeRuntime
# Enable the NodeRuntime events except by `NodeRuntime.waitingForDisconnect`.
Expand Down
66 changes: 66 additions & 0 deletions src/inspector/node_string.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ void ProtocolTypeTraits<std::string>::Serialize(const std::string& value,
cbor::EncodeString8(SpanFrom(value), bytes);
}

bool ProtocolTypeTraits<node::inspector::protocol::Binary>::Deserialize(
DeserializerState* state, node::inspector::protocol::Binary* value) {
CHECK(state->tokenizer()->TokenTag() == cbor::CBORTokenTag::BINARY);
span<uint8_t> cbor_span = state->tokenizer()->GetBinary();
*value = node::inspector::protocol::Binary::fromSpan(cbor_span);
return true;
}

void ProtocolTypeTraits<node::inspector::protocol::Binary>::Serialize(
const node::inspector::protocol::Binary& value,
std::vector<uint8_t>* bytes) {
cbor::EncodeString8(SpanFrom(value.toBase64()), bytes);
}

} // namespace crdtp

namespace node {
Expand Down Expand Up @@ -93,6 +107,58 @@ size_t StringUtil::CharacterCount(const std::string_view s) {
return s.length();
}

String Binary::toBase64() const {
MaybeStackBuffer<char> buffer;
size_t str_len = simdutf::base64_length_from_binary(bytes_->size());
buffer.SetLength(str_len);

size_t len =
simdutf::binary_to_base64(reinterpret_cast<const char*>(bytes_->data()),
bytes_->size(),
buffer.out());
CHECK_EQ(len, str_len);
return buffer.ToString();
}

// static
Binary Binary::concat(const std::vector<Binary>& binaries) {
size_t total_size = 0;
for (const auto& binary : binaries) {
total_size += binary.size();
}
auto bytes = std::make_shared<std::vector<uint8_t>>(total_size);
uint8_t* data_ptr = bytes->data();
for (const auto& binary : binaries) {
memcpy(data_ptr, binary.data(), binary.size());
data_ptr += binary.size();
}
return Binary(bytes);
}

// static
Binary Binary::fromBase64(const String& base64, bool* success) {
Binary binary{};
size_t base64_len = simdutf::maximal_binary_length_from_base64(
base64.data(), base64.length());
binary.bytes_->resize(base64_len);

simdutf::result result;
result =
simdutf::base64_to_binary(base64.data(),
base64.length(),
reinterpret_cast<char*>(binary.bytes_->data()));
CHECK_EQ(result.error, simdutf::error_code::SUCCESS);
return binary;
}

// static
Binary Binary::fromUint8Array(v8::Local<v8::Uint8Array> data) {
auto bytes = std::make_shared<std::vector<uint8_t>>(data->ByteLength());
size_t size = data->CopyContents(bytes->data(), data->ByteLength());
CHECK_EQ(size, data->ByteLength());
return Binary(bytes);
}

} // namespace protocol
} // namespace inspector
} // namespace node
Loading
Loading