Expose streaming API #1013

Open: wants to merge 14 commits into base: main
15 changes: 15 additions & 0 deletions examples/http-axum-streaming/Cargo.toml
@@ -0,0 +1,15 @@
[package]
name = "http-axum-streaming"
version = "0.1.0"
edition = "2021"

[dependencies]
axum = "0.8"
bytes = "1"
futures-util = "0.3"
lambda_http = { path = "../../lambda-http", default-features = false, features = [
"apigw_rest", "apigw_http", "tracing"
] }
thiserror = "2.0"
tokio = { version = "1", features = ["macros"] }
tokio-stream = "0.1.2"
15 changes: 15 additions & 0 deletions examples/http-axum-streaming/README.md
@@ -0,0 +1,15 @@
# AWS Lambda Function example

This example demonstrates building a **streaming** HTTP response with Axum, deployed on AWS Lambda using a custom runtime.

## Build & Deploy

1. Install [cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation)
2. Build the function with `cargo lambda build --release`
3. Deploy the function to AWS Lambda with `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE`
4. Enable Lambda streaming responses in the Lambda console: change the function URL's invoke mode to `RESPONSE_STREAM`
5. Verify the function works: `curl -v -N <function-url>`. The words should stream back with a 0.5 second pause between each one.

## Build for ARM 64

Build the function with `cargo lambda build --release --arm64`
62 changes: 62 additions & 0 deletions examples/http-axum-streaming/src/main.rs
@@ -0,0 +1,62 @@
use axum::{
body::Body,
http,
http::{
header::{CACHE_CONTROL, CONTENT_TYPE},
StatusCode,
},
response::{IntoResponse, Response},
routing::get,
Router,
};
use bytes::Bytes;
use lambda_http::{lambda_runtime, tracing, Error, StreamAdapter};
use std::{convert::Infallible, time::Duration};
use thiserror::Error;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

#[derive(Debug, Error)]
pub enum AppError {
#[error("{0}")]
Http(#[from] http::Error),
}

impl IntoResponse for AppError {
fn into_response(self) -> Response {
(StatusCode::INTERNAL_SERVER_ERROR, self.to_string()).into_response()
}
}

type AppResult<T = Response> = Result<T, AppError>;

async fn stream_handler() -> AppResult {
let (tx, rx) = mpsc::channel::<Result<Bytes, Infallible>>(8);
let body = Body::from_stream(ReceiverStream::new(rx));

tokio::spawn(async move {
for msg in ["Hello", "world", "from", "Lambda!"] {
tokio::time::sleep(Duration::from_millis(500)).await;
if tx.send(Ok(Bytes::from(format!("{msg}\n")))).await.is_err() {
break;
}
}
});

Ok(Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, "text/plain; charset=utf-8")
.header(CACHE_CONTROL, "no-cache")
.body(body)?)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
tracing::init_default_subscriber();

let app = Router::new().route("/", get(stream_handler));

let runtime = lambda_runtime::Runtime::new(StreamAdapter::from(app));

runtime.run().await
}
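The channel-plus-stream shape in `stream_handler` — a producer task pushing chunks while the response body drains them — can be sketched with the standard library alone. This is a simplified, synchronous illustration (std `mpsc` and a thread standing in for the tokio channel and task; `spawn_word_stream` is a hypothetical helper, not part of the example or crate):

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns a producer thread that emits one chunk per word, mirroring the
/// tokio task in `stream_handler`: if the receiver is dropped, `send`
/// fails and the producer stops early instead of running to completion.
fn spawn_word_stream(words: Vec<&'static str>) -> mpsc::Receiver<String> {
    // Bounded channel, like the `mpsc::channel::<_>(8)` in the handler.
    let (tx, rx) = mpsc::sync_channel(8);
    thread::spawn(move || {
        for msg in words {
            if tx.send(format!("{msg}\n")).is_err() {
                break; // consumer went away; stop producing
            }
        }
    });
    rx
}

fn main() {
    // Drain the chunks in arrival order, as the streaming body would.
    let body: String = spawn_word_stream(vec!["Hello", "world"]).iter().collect();
    assert_eq!(body, "Hello\nworld\n");
}
```

The `break` on a failed `send` is the same backpressure-and-cancellation behavior the async version gets from the tokio channel.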
2 changes: 1 addition & 1 deletion lambda-http/src/lib.rs
@@ -102,7 +102,7 @@ use std::{
};

mod streaming;
pub use streaming::run_with_streaming_response;
pub use streaming::{run_with_streaming_response, StreamAdapter};

/// Type alias for `http::Request`s with a fixed [`Body`](enum.Body.html) type
pub type Request = http::Request<Body>;
210 changes: 175 additions & 35 deletions lambda-http/src/streaming.rs
@@ -1,22 +1,88 @@
use crate::{http::header::SET_COOKIE, request::LambdaRequest, tower::ServiceBuilder, Request, RequestExt};
use crate::{http::header::SET_COOKIE, request::LambdaRequest, Request, RequestExt};
use bytes::Bytes;
pub use http::{self, Response};
use http_body::Body;
use lambda_runtime::Diagnostic;
pub use lambda_runtime::{self, tower::ServiceExt, Error, LambdaEvent, MetadataPrelude, Service, StreamResponse};
use std::{
use core::{
fmt::Debug,
pin::Pin,
task::{Context, Poll},
};
use tokio_stream::Stream;
use futures_util::{Stream, TryFutureExt};
pub use http::{self, Response};
use http_body::Body;
use lambda_runtime::{
tower::{
util::{MapRequest, MapResponse},
ServiceBuilder, ServiceExt,
},
Diagnostic,
};
pub use lambda_runtime::{Error, LambdaEvent, MetadataPrelude, Service, StreamResponse};
use std::{future::Future, marker::PhantomData};

/// An adapter that lifts a standard [`Service<Request>`] into a
/// [`Service<LambdaEvent<LambdaRequest>>`] which produces streaming Lambda HTTP
/// responses.
pub struct StreamAdapter<'a, S, B> {
service: S,
_phantom_data: PhantomData<&'a B>,
}

impl<'a, S, B, E> From<S> for StreamAdapter<'a, S, B>
where
S: Service<Request, Response = Response<B>, Error = E>,
S::Future: Send + 'a,
B: Body + Unpin + Send + 'static,
B::Data: Into<Bytes> + Send,
B::Error: Into<Error> + Send + Debug,
{
fn from(service: S) -> Self {
StreamAdapter {
service,
_phantom_data: PhantomData,
}
}
}

/// Implements [`Service<LambdaEvent<LambdaRequest>>`] so the adapter can be passed
/// directly to the Lambda runtime and stream responses back. See [Configure Lambda
/// Streaming Response](https://docs.aws.amazon.com/lambda/latest/dg/configuration-response-streaming.html).
impl<'a, S, B, E> Service<LambdaEvent<LambdaRequest>> for StreamAdapter<'a, S, B>
where
S: Service<Request, Response = Response<B>, Error = E>,
S::Future: Send + 'a,
B: Body + Unpin + Send + 'static,
B::Data: Into<Bytes> + Send,
B::Error: Into<Error> + Send + Debug,
{
type Response = StreamResponse<BodyStream<B>>;
type Error = E;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'a>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}

fn call(&mut self, req: LambdaEvent<LambdaRequest>) -> Self::Future {
let event: Request = req.payload.into();
Box::pin(
self.service
.call(event.with_lambda_context(req.context))
.map_ok(into_stream_response),
)
}
}
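The lifting `StreamAdapter` performs — convert the event into a request, attach the context, delegate to the inner service — can be illustrated with a synchronous, std-only stand-in. Everything here (`Handler`, `Event`, `Req`, `EventAdapter`) is a hypothetical miniature of tower's `Service`, `LambdaEvent<LambdaRequest>`, and `Request`, not the crate's actual API:

```rust
/// A minimal stand-in for tower's `Service`: one input type, one output.
trait Handler<In> {
    type Out;
    fn call(&mut self, input: In) -> Self::Out;
}

/// Stand-ins for `LambdaEvent<LambdaRequest>` and `Request`.
struct Event { payload: String, context: String }
struct Req { uri: String, lambda_context: String }

/// Lifts a `Handler<Req>` into a `Handler<Event>`, the way `StreamAdapter`
/// lifts `Service<Request>` into `Service<LambdaEvent<LambdaRequest>>`:
/// turn the event payload into a request, carry the context along, delegate.
struct EventAdapter<S>(S);

impl<S: Handler<Req>> Handler<Event> for EventAdapter<S> {
    type Out = S::Out;
    fn call(&mut self, event: Event) -> Self::Out {
        let req = Req { uri: event.payload, lambda_context: event.context };
        self.0.call(req)
    }
}

/// An inner "application" service that only understands `Req`.
struct Echo;
impl Handler<Req> for Echo {
    type Out = String;
    fn call(&mut self, req: Req) -> String {
        format!("{} ({})", req.uri, req.lambda_context)
    }
}

fn main() {
    let mut svc = EventAdapter(Echo);
    let out = svc.call(Event { payload: "/".into(), context: "ctx-1".into() });
    assert_eq!(out, "/ (ctx-1)");
}
```

The real adapter additionally maps the output (`map_ok(into_stream_response)`) and is async, but the shape — event in, request through, response out — is the same.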

/// Builds a streaming-aware Tower service from a `Service<Request>` **without**
/// boxing its future (no heap allocation / vtable).
///
/// This takes care of transforming the LambdaEvent into a [`Request`] and
/// accepts [`http::Response<http_body::Body>`] as response.
pub async fn run_with_streaming_response<'a, S, B, E>(handler: S) -> Result<(), Error>
/// Transforms `LambdaEvent<LambdaRequest>` into `Request` with Lambda context
/// and wraps `Response<B>` into `StreamResponse<BodyStream<B>>`.
///
/// Used internally by [`run_with_streaming_response`]; not part of the public
/// API.
#[allow(clippy::type_complexity)]
fn into_stream_service<'a, S, B, E>(
handler: S,
) -> MapResponse<
MapRequest<S, impl FnMut(LambdaEvent<LambdaRequest>) -> Request>,
impl FnOnce(Response<B>) -> StreamResponse<BodyStream<B>> + Clone,
>
where
S: Service<Request, Response = Response<B>, Error = E>,
S::Future: Send + 'a,
@@ -25,38 +91,59 @@ where
B::Data: Into<Bytes> + Send,
B::Error: Into<Error> + Send + Debug,
{
let svc = ServiceBuilder::new()
ServiceBuilder::new()
.map_request(|req: LambdaEvent<LambdaRequest>| {
let event: Request = req.payload.into();
event.with_lambda_context(req.context)
})
.service(handler)
.map_response(|res| {
let (parts, body) = res.into_parts();

let mut prelude_headers = parts.headers;

let cookies = prelude_headers.get_all(SET_COOKIE);
let cookies = cookies
.iter()
.map(|c| String::from_utf8_lossy(c.as_bytes()).to_string())
.collect::<Vec<String>>();
.map_response(into_stream_response)
}

prelude_headers.remove(SET_COOKIE);
/// Converts an `http::Response<B>` into a streaming Lambda response.
fn into_stream_response<B>(res: Response<B>) -> StreamResponse<BodyStream<B>>
where
B: Body + Unpin + Send + 'static,
B::Data: Into<Bytes> + Send,
B::Error: Into<Error> + Send + Debug,
{
let (parts, body) = res.into_parts();

let metadata_prelude = MetadataPrelude {
headers: prelude_headers,
status_code: parts.status,
cookies,
};
let mut headers = parts.headers;
let cookies = headers
.get_all(SET_COOKIE)
.iter()
.map(|c| String::from_utf8_lossy(c.as_bytes()).to_string())
.collect::<Vec<_>>();
headers.remove(SET_COOKIE);

StreamResponse {
metadata_prelude,
stream: BodyStream { body },
}
});
StreamResponse {
metadata_prelude: MetadataPrelude {
headers,
status_code: parts.status,
cookies,
},
stream: BodyStream { body },
}
}
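The `Set-Cookie` handling above — cookies leave the header map and travel in the prelude's `cookies` field instead — can be sketched over a plain header list. This std-only sketch uses a `Vec<(String, String)>` in place of the real `http::HeaderMap`, and `split_cookies` is an illustrative helper, not part of the crate:

```rust
/// Splits `Set-Cookie` entries out of a flat header list, returning the
/// remaining headers and the extracted cookie strings -- the same move
/// `into_stream_response` makes on the real `HeaderMap` before building
/// the `MetadataPrelude`.
fn split_cookies(headers: Vec<(String, String)>) -> (Vec<(String, String)>, Vec<String>) {
    let mut rest = Vec::new();
    let mut cookies = Vec::new();
    for (name, value) in headers {
        if name.eq_ignore_ascii_case("set-cookie") {
            cookies.push(value); // ride in the prelude, not the header map
        } else {
            rest.push((name, value));
        }
    }
    (rest, cookies)
}

fn main() {
    let headers = vec![
        ("content-type".to_string(), "text/plain".to_string()),
        ("set-cookie".to_string(), "a=1".to_string()),
        ("set-cookie".to_string(), "b=2".to_string()),
    ];
    let (rest, cookies) = split_cookies(headers);
    assert_eq!(rest, vec![("content-type".to_string(), "text/plain".to_string())]);
    assert_eq!(cookies, vec!["a=1", "b=2"]);
}
```

Separating cookies this way matters because the streaming prelude serializes cookies as a dedicated JSON array rather than as ordinary headers.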

lambda_runtime::run(svc).await
/// Runs the Lambda runtime with a handler that returns **streaming** HTTP
/// responses.
///
/// See the [AWS docs for response streaming].
///
/// [AWS docs for response streaming]:
/// https://docs.aws.amazon.com/lambda/latest/dg/configuration-response-streaming.html
pub async fn run_with_streaming_response<'a, S, B, E>(handler: S) -> Result<(), Error>
where
S: Service<Request, Response = Response<B>, Error = E>,
S::Future: Send + 'a,
E: Debug + Into<Diagnostic>,
B: Body + Unpin + Send + 'static,
B::Data: Into<Bytes> + Send,
B::Error: Into<Error> + Send + Debug,
{
lambda_runtime::run(into_stream_service(handler)).await
}

pin_project_lite::pin_project! {
@@ -85,3 +172,56 @@ where
}
}
}

#[cfg(test)]
mod test_stream_adapter {
use super::*;

use crate::Body;
use http::StatusCode;

// A middleware that logs requests before forwarding them to another service
struct LogService<S> {
inner: S,
}

impl<S> Service<LambdaEvent<LambdaRequest>> for LogService<S>
where
S: Service<LambdaEvent<LambdaRequest>>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, event: LambdaEvent<LambdaRequest>) -> Self::Future {
// Log the request
println!("Lambda event: {event:#?}");

self.inner.call(event)
}
}

/// This tests that `StreamAdapter` can be used in a `tower::Service` where
/// the user may require additional middleware between `lambda_runtime::run`
/// and where the `LambdaEvent` is converted into a `Request`.
#[test]
fn stream_adapter_is_boxable() {
let _svc = ServiceBuilder::new()
> **@jlizen** (Contributor, Aug 17, 2025): Minor: Since this isn't making any assertions anyway, and is mostly testing that it compiles, would this perhaps be better suited as an example? Seems like a nice simple example of setting up custom middleware? Also would help capture any weirdness with private vs public API. Up to you, I'm happy to have it here too.
>
> **Author** (Contributor): Yeah, can do! I think I was mostly just trying to mimic the same test for the Adapter. I can move this out into the example, which would effectively test this.
.layer_fn(|service| {
// This could be any middleware that logs, inspects, or
// manipulates the `LambdaEvent` before it's converted to a
// `Request` by `Adapter`.

LogService { inner: service }
})
.layer_fn(StreamAdapter::from)
.service_fn(
|_req: Request| async move { http::Response::builder().status(StatusCode::OK).body(Body::Empty) },
)
.boxed();
}
}