Skip to content

Commit 28ea04e

Browse files
committed
Reconnect on max_age GoAway
1 parent 95d6574 commit 28ea04e

File tree

4 files changed

+118
-54
lines changed

4 files changed

+118
-54
lines changed

linera-rpc/src/grpc/client.rs

Lines changed: 96 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright (c) Zefchain Labs, Inc.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use std::{fmt, future::Future, iter};
4+
use std::{fmt, future::Future, iter, sync::Arc};
55

66
use futures::{future, stream, StreamExt};
77
use linera_base::{
@@ -29,6 +29,7 @@ use tracing::{debug, info, instrument, warn, Level};
2929

3030
use super::{
3131
api::{self, validator_node_client::ValidatorNodeClient, SubscriptionRequest},
32+
pool::GrpcConnectionPool,
3233
transport, GRPC_MAX_MESSAGE_SIZE,
3334
};
3435
use crate::{
@@ -39,57 +40,74 @@ use crate::{
3940
#[derive(Clone)]
4041
pub struct GrpcClient {
4142
address: String,
42-
client: ValidatorNodeClient<transport::Channel>,
43+
pool: Arc<GrpcConnectionPool>,
4344
retry_delay: Duration,
4445
max_retries: u32,
4546
}
4647

4748
impl GrpcClient {
4849
pub fn new(
4950
address: String,
50-
channel: transport::Channel,
51+
pool: Arc<GrpcConnectionPool>,
5152
retry_delay: Duration,
5253
max_retries: u32,
53-
) -> Self {
54-
let client = ValidatorNodeClient::new(channel)
55-
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
56-
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
57-
Self {
54+
) -> Result<Self, super::GrpcError> {
55+
// Just verify we can get a channel to this address
56+
let _ = pool.channel(address.clone())?;
57+
Ok(Self {
5858
address,
59-
client,
59+
pool,
6060
retry_delay,
6161
max_retries,
62-
}
62+
})
6363
}
6464

6565
pub fn address(&self) -> &str {
6666
&self.address
6767
}
6868

69+
fn make_client(&self) -> Result<ValidatorNodeClient<transport::Channel>, super::GrpcError> {
70+
let channel = self.pool.channel(self.address.clone())?;
71+
Ok(ValidatorNodeClient::new(channel)
72+
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
73+
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE))
74+
}
75+
6976
/// Returns whether this gRPC status means the server stream should be reconnected to, or not.
7077
/// Logs a warning on unexpected status codes.
71-
fn is_retryable(status: &Status) -> bool {
78+
fn is_retryable_needs_reconnect(status: &Status) -> (bool, bool) {
7279
match status.code() {
7380
Code::DeadlineExceeded | Code::Aborted | Code::Unavailable | Code::Unknown => {
7481
info!("gRPC request interrupted: {}; retrying", status);
75-
true
82+
(true, false)
7683
}
7784
Code::Ok | Code::Cancelled | Code::ResourceExhausted => {
7885
info!("Unexpected gRPC status: {}; retrying", status);
79-
true
86+
(true, false)
87+
}
88+
Code::NotFound => (false, false), // This code is used if e.g. the validator is missing blobs.
89+
Code::Internal => {
90+
let error_string = status.to_string();
91+
if error_string.contains("GoAway") && error_string.contains("max_age") {
92+
info!(
93+
"gRPC connection hit max_age and got a GoAway: {}; reconnecting then retrying",
94+
status
95+
);
96+
return (true, true);
97+
}
98+
info!("Unexpected gRPC status: {}", status);
99+
(false, false)
80100
}
81-
Code::NotFound => false, // This code is used if e.g. the validator is missing blobs.
82101
Code::InvalidArgument
83102
| Code::AlreadyExists
84103
| Code::PermissionDenied
85104
| Code::FailedPrecondition
86105
| Code::OutOfRange
87106
| Code::Unimplemented
88-
| Code::Internal
89107
| Code::DataLoss
90108
| Code::Unauthenticated => {
91109
info!("Unexpected gRPC status: {}", status);
92-
false
110+
(false, false)
93111
}
94112
}
95113
}
@@ -109,15 +127,36 @@ impl GrpcClient {
109127
let request_inner = request.try_into().map_err(|_| NodeError::GrpcError {
110128
error: "could not convert request to proto".to_string(),
111129
})?;
130+
131+
let mut reconnected = false;
112132
loop {
113-
match f(self.client.clone(), Request::new(request_inner.clone())).await {
114-
Err(s) if Self::is_retryable(&s) && retry_count < self.max_retries => {
115-
let delay = self.retry_delay.saturating_mul(retry_count);
116-
retry_count += 1;
117-
linera_base::time::timer::sleep(delay).await;
118-
continue;
133+
// Create client on-demand for each attempt
134+
let client = match self.make_client() {
135+
Ok(client) => client,
136+
Err(e) => {
137+
return Err(NodeError::GrpcError {
138+
error: format!("Failed to create client: {}", e),
139+
});
119140
}
141+
};
142+
143+
match f(client, Request::new(request_inner.clone())).await {
120144
Err(s) => {
145+
let (is_retryable, needs_reconnect) = Self::is_retryable_needs_reconnect(&s);
146+
if is_retryable && retry_count < self.max_retries {
147+
// If this error indicates we need a connection refresh and we haven't already tried, do it
148+
if needs_reconnect && !reconnected {
149+
info!("Connection error detected, invalidating channel: {}", s);
150+
self.pool.invalidate_channel(&self.address);
151+
reconnected = true;
152+
}
153+
154+
let delay = self.retry_delay.saturating_mul(retry_count);
155+
retry_count += 1;
156+
linera_base::time::timer::sleep(delay).await;
157+
continue;
158+
}
159+
121160
return Err(NodeError::GrpcError {
122161
error: format!("remote request [{handler}] failed with status: {s:?}"),
123162
});
@@ -270,32 +309,56 @@ impl ValidatorNode for GrpcClient {
270309
let subscription_request = SubscriptionRequest {
271310
chain_ids: chains.into_iter().map(|chain| chain.into()).collect(),
272311
};
273-
let mut client = self.client.clone();
312+
let pool = self.pool.clone();
313+
let address = self.address.clone();
274314

275315
// Make the first connection attempt before returning from this method.
276-
let mut stream = Some(
316+
let mut stream = Some({
317+
let mut client = self
318+
.make_client()
319+
.map_err(|e| NodeError::SubscriptionFailed {
320+
status: format!("Failed to create client: {}", e),
321+
})?;
277322
client
278323
.subscribe(subscription_request.clone())
279324
.await
280325
.map_err(|status| NodeError::SubscriptionFailed {
281326
status: status.to_string(),
282327
})?
283-
.into_inner(),
284-
);
328+
.into_inner()
329+
});
285330

286331
// A stream of `Result<grpc::Notification, tonic::Status>` that keeps calling
287332
// `client.subscribe(request)` endlessly and without delay.
288333
let endlessly_retrying_notification_stream = stream::unfold((), move |()| {
289-
let mut client = client.clone();
334+
let pool = pool.clone();
335+
let address = address.clone();
290336
let subscription_request = subscription_request.clone();
291337
let mut stream = stream.take();
292338
async move {
293339
let stream = if let Some(stream) = stream.take() {
294340
future::Either::Right(stream)
295341
} else {
296-
match client.subscribe(subscription_request.clone()).await {
297-
Err(err) => future::Either::Left(stream::iter(iter::once(Err(err)))),
298-
Ok(response) => future::Either::Right(response.into_inner()),
342+
// Create a new client for each reconnection attempt
343+
match pool.channel(address.clone()) {
344+
Ok(channel) => {
345+
let mut client = ValidatorNodeClient::new(channel)
346+
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
347+
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
348+
match client.subscribe(subscription_request.clone()).await {
349+
Err(err) => {
350+
future::Either::Left(stream::iter(iter::once(Err(err))))
351+
}
352+
Ok(response) => future::Either::Right(response.into_inner()),
353+
}
354+
}
355+
Err(e) => {
356+
let status = tonic::Status::unavailable(format!(
357+
"Failed to create channel: {}",
358+
e
359+
));
360+
future::Either::Left(stream::iter(iter::once(Err(status))))
361+
}
299362
}
300363
};
301364
Some((stream, ()))
@@ -319,7 +382,9 @@ impl ValidatorNode for GrpcClient {
319382
return future::Either::Left(future::ready(true));
320383
};
321384

322-
if !span.in_scope(|| Self::is_retryable(status)) || retry_count >= max_retries {
385+
let (is_retryable, _) =
386+
span.in_scope(|| Self::is_retryable_needs_reconnect(status));
387+
if !is_retryable || retry_count >= max_retries {
323388
return future::Either::Left(future::ready(false));
324389
}
325390
let delay = retry_delay.saturating_mul(retry_count);

linera-rpc/src/grpc/node_provider.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright (c) Zefchain Labs, Inc.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use std::str::FromStr as _;
4+
use std::{str::FromStr as _, sync::Arc};
55

66
use linera_base::time::Duration;
77
use linera_core::node::{NodeError, ValidatorNodeProvider};
@@ -15,7 +15,7 @@ use crate::{
1515

1616
#[derive(Clone)]
1717
pub struct GrpcNodeProvider {
18-
pool: GrpcConnectionPool,
18+
pool: Arc<GrpcConnectionPool>,
1919
retry_delay: Duration,
2020
max_retries: u32,
2121
}
@@ -25,7 +25,7 @@ impl GrpcNodeProvider {
2525
let transport_options = transport::Options::from(&options);
2626
let retry_delay = options.retry_delay;
2727
let max_retries = options.max_retries;
28-
let pool = GrpcConnectionPool::new(transport_options);
28+
let pool = Arc::new(GrpcConnectionPool::new(transport_options));
2929
Self {
3030
pool,
3131
retry_delay,
@@ -44,18 +44,15 @@ impl ValidatorNodeProvider for GrpcNodeProvider {
4444
}
4545
})?;
4646
let http_address = network.http_address();
47-
let channel =
48-
self.pool
49-
.channel(http_address.clone())
50-
.map_err(|error| NodeError::GrpcError {
51-
error: format!("error creating channel: {}", error),
52-
})?;
53-
54-
Ok(GrpcClient::new(
47+
48+
GrpcClient::new(
5549
http_address,
56-
channel,
50+
self.pool.clone(),
5751
self.retry_delay,
5852
self.max_retries,
59-
))
53+
)
54+
.map_err(|error| NodeError::GrpcError {
55+
error: format!("error creating client: {}", error),
56+
})
6057
}
6158
}

linera-rpc/src/grpc/pool.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ impl GrpcConnectionPool {
3232

3333
/// Obtains a channel for the current address. Either clones an existing one (thereby
3434
/// reusing the connection), or creates one if needed. New channels do not create a
35-
/// connection immediately.
35+
/// connection immediately and will automatically reconnect when needed.
3636
pub fn channel(&self, address: String) -> Result<transport::Channel, GrpcError> {
3737
let pinned = self.channels.pin();
3838
if let Some(channel) = pinned.get(&address) {
@@ -41,4 +41,10 @@ impl GrpcConnectionPool {
4141
let channel = transport::create_channel(address.clone(), &self.options)?;
4242
Ok(pinned.get_or_insert(address, channel).clone())
4343
}
44+
45+
/// Removes a channel from the pool, forcing a new connection to be created on the next request.
46+
/// This should be called when a channel is known to be broken (e.g., received GOAWAY).
47+
pub fn invalidate_channel(&self, address: &str) {
48+
self.channels.pin().remove(address);
49+
}
4450
}

linera-rpc/tests/transport.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,7 @@ wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
1111
async fn client() {
1212
use linera_base::time::Duration;
1313
use linera_core::node::ValidatorNode as _;
14-
use linera_rpc::grpc::{
15-
transport::{create_channel, Options},
16-
GrpcClient,
17-
};
14+
use linera_rpc::grpc::{transport::Options, GrpcClient};
1815

1916
let retry_delay = Duration::from_millis(100);
2017
let max_retries = 5;
@@ -23,9 +20,8 @@ async fn client() {
2320
connect_timeout: Some(Duration::from_millis(100)),
2421
timeout: Some(Duration::from_millis(100)),
2522
};
26-
let channel = create_channel(address.clone(), &options).unwrap();
27-
let _ = GrpcClient::new(address, channel, retry_delay, max_retries)
28-
.get_version_info()
29-
.await
30-
.unwrap();
23+
24+
let pool = std::sync::Arc::new(linera_rpc::grpc::pool::GrpcConnectionPool::new(options));
25+
let client = GrpcClient::new(address, pool, retry_delay, max_retries).unwrap();
26+
let _ = client.get_version_info().await.unwrap();
3127
}

0 commit comments

Comments
 (0)