Skip to content

Commit 3aa512b

Browse files
committed
Merge branch 'main' into sfleen/opentelemetry-vendor
2 parents bd4d4a3 + 4d8dd60 commit 3aa512b

File tree

2 files changed

+96
-93
lines changed

2 files changed

+96
-93
lines changed

linkerd/app/inbound/src/lib.rs

Lines changed: 3 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,18 @@ pub use self::{
2424
detect::MetricsFamilies as DetectMetrics, metrics::InboundMetrics, policy::DefaultPolicy,
2525
};
2626
use linkerd_app_core::{
27-
config::{ConnectConfig, ProxyConfig, QueueConfig},
27+
config::{ProxyConfig, QueueConfig},
2828
drain,
2929
http_tracing::SpanSink,
30-
identity, io,
30+
identity,
3131
metrics::prom,
32-
proxy::{tap, tcp},
32+
proxy::tap,
3333
svc,
3434
transport::{self, Remote, ServerAddr},
3535
Error, NameAddr, NameMatch, ProxyRuntime,
3636
};
3737
use std::{fmt::Debug, time::Duration};
3838
use thiserror::Error;
39-
use tracing::debug_span;
4039

4140
#[derive(Clone, Debug)]
4241
pub struct Config {
@@ -183,99 +182,12 @@ impl Inbound<()> {
183182
pub fn with_stack<S>(self, stack: S) -> Inbound<S> {
184183
self.map_stack(move |_, _, _| svc::stack(stack))
185184
}
186-
187-
/// Readies the inbound stack to make TCP connections (for both TCP
188-
// forwarding and HTTP proxying).
189-
pub fn into_tcp_connect<T>(
190-
self,
191-
proxy_port: u16,
192-
) -> Inbound<
193-
impl svc::MakeConnection<
194-
T,
195-
Connection = impl Send + Unpin,
196-
Metadata = impl Send + Unpin,
197-
Error = Error,
198-
Future = impl Send,
199-
> + Clone,
200-
>
201-
where
202-
T: svc::Param<Remote<ServerAddr>> + 'static,
203-
{
204-
self.map_stack(|config, _, _| {
205-
// Establishes connections to remote peers (for both TCP
206-
// forwarding and HTTP proxying).
207-
let ConnectConfig {
208-
ref keepalive,
209-
ref user_timeout,
210-
ref timeout,
211-
..
212-
} = config.proxy.connect;
213-
214-
#[derive(Debug, thiserror::Error)]
215-
#[error("inbound connection must not target port {0}")]
216-
struct Loop(u16);
217-
218-
svc::stack(transport::ConnectTcp::new(*keepalive, *user_timeout))
219-
// Limits the time we wait for a connection to be established.
220-
.push_connect_timeout(*timeout)
221-
// Prevent connections that would target the inbound proxy port from looping.
222-
.push_filter(move |t: T| {
223-
let addr = t.param();
224-
let port = addr.port();
225-
if port == proxy_port {
226-
return Err(Loop(port));
227-
}
228-
Ok(addr)
229-
})
230-
})
231-
}
232185
}
233186

234187
impl<S> Inbound<S> {
235188
pub fn push<L: svc::layer::Layer<S>>(self, layer: L) -> Inbound<L::Service> {
236189
self.map_stack(|_, _, stack| stack.push(layer))
237190
}
238-
239-
// Forwards TCP streams that cannot be decoded as HTTP.
240-
//
241-
// Looping is always prevented.
242-
pub fn push_tcp_forward<T, I>(
243-
self,
244-
) -> Inbound<
245-
svc::ArcNewService<
246-
T,
247-
impl svc::Service<I, Response = (), Error = ForwardError, Future = impl Send> + Clone,
248-
>,
249-
>
250-
where
251-
T: svc::Param<transport::labels::Key>
252-
+ svc::Param<Remote<ServerAddr>>
253-
+ Clone
254-
+ Send
255-
+ Sync
256-
+ 'static,
257-
I: io::AsyncRead + io::AsyncWrite,
258-
I: Debug + Send + Unpin + 'static,
259-
S: svc::MakeConnection<T> + Clone + Send + Sync + Unpin + 'static,
260-
S::Connection: Send + Unpin,
261-
S::Metadata: Send + Unpin,
262-
S::Future: Send,
263-
{
264-
self.map_stack(|_, rt, connect| {
265-
connect
266-
.push(transport::metrics::Client::layer(
267-
rt.metrics.proxy.transport.clone(),
268-
))
269-
.push(svc::stack::WithoutConnectionMetadata::layer())
270-
.push_new_thunk()
271-
.push_on_service(tcp::Forward::layer())
272-
.push_on_service(drain::Retain::layer(rt.drain.clone()))
273-
.instrument(|_: &_| debug_span!("tcp"))
274-
.push(svc::NewMapErr::layer_from_target::<ForwardError, _>())
275-
.push(svc::ArcNewService::layer())
276-
.check_new::<T>()
277-
})
278-
}
279191
}
280192

281193
fn stack_labels(proto: &'static str, name: &'static str) -> metrics::StackLabels {

linkerd/app/inbound/src/server.rs

Lines changed: 93 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1-
use crate::{direct, policy, Inbound};
1+
use crate::{direct, policy, ForwardError, Inbound};
22
use linkerd_app_core::{
3+
config::ConnectConfig,
4+
drain,
35
exp_backoff::ExponentialBackoff,
46
io, profiles,
5-
proxy::http,
7+
proxy::{http, tcp},
68
svc,
79
transport::{self, addrs::*},
810
Error,
@@ -103,6 +105,95 @@ impl Inbound<()> {
103105
.push_accept(addr.port(), policies, direct)
104106
.into_inner()
105107
}
108+
109+
/// Readies the inbound stack to make TCP connections (for both TCP
110+
/// forwarding and HTTP proxying).
111+
fn into_tcp_connect<T>(
112+
self,
113+
proxy_port: u16,
114+
) -> Inbound<
115+
impl svc::MakeConnection<
116+
T,
117+
Connection = impl Send + Unpin,
118+
Metadata = impl Send + Unpin,
119+
Error = Error,
120+
Future = impl Send,
121+
> + Clone,
122+
>
123+
where
124+
T: svc::Param<Remote<ServerAddr>> + 'static,
125+
{
126+
self.map_stack(|config, _, _| {
127+
// Establishes connections to remote peers (for both TCP
128+
// forwarding and HTTP proxying).
129+
let ConnectConfig {
130+
ref keepalive,
131+
ref user_timeout,
132+
ref timeout,
133+
..
134+
} = config.proxy.connect;
135+
136+
#[derive(Debug, thiserror::Error)]
137+
#[error("inbound connection must not target port {0}")]
138+
struct Loop(u16);
139+
140+
svc::stack(transport::ConnectTcp::new(*keepalive, *user_timeout))
141+
// Limits the time we wait for a connection to be established.
142+
.push_connect_timeout(*timeout)
143+
// Prevent connections that would target the inbound proxy port from looping.
144+
.push_filter(move |t: T| {
145+
let addr = t.param();
146+
let port = addr.port();
147+
if port == proxy_port {
148+
return Err(Loop(port));
149+
}
150+
Ok(addr)
151+
})
152+
})
153+
}
154+
}
155+
156+
impl<S> Inbound<S> {
157+
// Forwards TCP streams that cannot be decoded as HTTP.
158+
//
159+
// Looping is always prevented.
160+
fn push_tcp_forward<T, I>(
161+
self,
162+
) -> Inbound<
163+
svc::ArcNewService<
164+
T,
165+
impl svc::Service<I, Response = (), Error = ForwardError, Future = impl Send> + Clone,
166+
>,
167+
>
168+
where
169+
T: svc::Param<transport::labels::Key>
170+
+ svc::Param<Remote<ServerAddr>>
171+
+ Clone
172+
+ Send
173+
+ Sync
174+
+ 'static,
175+
I: io::AsyncRead + io::AsyncWrite,
176+
I: Debug + Send + Unpin + 'static,
177+
S: svc::MakeConnection<T> + Clone + Send + Sync + Unpin + 'static,
178+
S::Connection: Send + Unpin,
179+
S::Metadata: Send + Unpin,
180+
S::Future: Send,
181+
{
182+
self.map_stack(|_, rt, connect| {
183+
connect
184+
.push(transport::metrics::Client::layer(
185+
rt.metrics.proxy.transport.clone(),
186+
))
187+
.push(svc::stack::WithoutConnectionMetadata::layer())
188+
.push_new_thunk()
189+
.push_on_service(tcp::Forward::layer())
190+
.push_on_service(drain::Retain::layer(rt.drain.clone()))
191+
.instrument(|_: &_| debug_span!("tcp"))
192+
.push(svc::NewMapErr::layer_from_target::<ForwardError, _>())
193+
.push(svc::ArcNewService::layer())
194+
.check_new::<T>()
195+
})
196+
}
106197
}
107198

108199
// === impl TcpEndpoint ===

0 commit comments

Comments
 (0)