Skip to content

Expose streaming API #1013

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 21 commits into
base: main
Choose a base branch
from

Conversation

s0l0ist
Copy link
Contributor

@s0l0ist s0l0ist commented Jul 11, 2025

📬 Issue #, if available:

The SDK supports streaming, but lambda_http::Adapter only handles buffered responses from Axum's Router (i.e., a Service). The lambda_http::run_with_streaming_response is available, but it doesn't allow you to specify a custom runtime which is necessary if you want to support OpenTelemetry.

Related (ish):

✍️ Description of changes:

This PR exposes:

  • StreamAdapter: Converts a tower Service into an AWS Lambda streaming response.

I also have added two new examples which showcase how to use Axum with run_with_streaming_response and with StreamAdapter + OTeL

This is how you can use it with a custom runtime supporting OTeL:

use crate::{error::BoxError, service::build_service};
use axum::Router;
use lambda_runtime::{
    Runtime,
    layers::{OpenTelemetryFaasTrigger, OpenTelemetryLayer},
};
use opentelemetry_sdk::trace as sdktrace;

/// The main function to run the AWS Lambda
pub async fn async_main(tracer_provider: sdktrace::SdkTracerProvider) -> Result<(), BoxError> {
    let (app, _): (Router, _) = build_service().await;

    // For a buffered response:
    // let handler = lambda_http::Adapter::from(app);
    // For a streamed response:
    let handler = lambda_http::StreamAdapter::from(app);

    let runtime = Runtime::new(handler).layer(
        // Create a tracing span for each Lambda invocation
        OpenTelemetryLayer::new(|| {
            if let Err(err) = tracer_provider.force_flush() {
                eprintln!("Error flushing traces: {:#?}", err);
            }
        })
        .with_trigger(OpenTelemetryFaasTrigger::Http),
    );
    runtime.run().await
}

🔏 By submitting this pull request

  • I confirm that I've ran cargo +nightly fmt.
  • I confirm that I've ran cargo clippy --fix.
  • I confirm that I've made a best effort attempt to update all relevant documentation.
  • I confirm that my contribution is made under the terms of the Apache 2.0 license.

@jlizen
Copy link
Contributor

jlizen commented Jul 11, 2025

Hi @s0l0ist , thanks for cutting this PR. This change seems very reasonable to me.

I do have concerns about leaking internals. This is only naming return types that are already public, but we might want to tweak their composition or otherwise shift bounds in ways that break callers. I think it would be best to type erase the returned service stack.

The easiest way to do that would be via a tower::util::BoxService / BoxLayer (or the sync/clone-bounded variants, though I don't think we need them here - but we could certainly add alternate APIs that do include the sync/clone bounds if it would be useful for how the returned struct is used by a runtime).

That adds a small amount of performance overhead due to an extra allocation and layer of dynamic dispatch. But, I think the ergonomics would be much better compared to a more complex, composable builder-style API using generics and sealed inner layer types. Note that we would probably want some sort of into_streaming_streaming_response_inner() that is what into_streaming_response() is currently, so that our run_with_streaming_response can skip the minor type erasure overhead.

How does that sound to you?

It would also be great to get a small example showing usage of this with a non-tokio runtime, if you'd be open to it! That would both let us validate the API, and make it more discoverable for users. I'd probably be ok with this landing without that, though, if you don't have cycles.

@s0l0ist
Copy link
Contributor Author

s0l0ist commented Jul 19, 2025

@jlizen, I think that's the right direction. Currently busy right now, but can revisit this PR in a couple of weeks from now.

@s0l0ist
Copy link
Contributor Author

s0l0ist commented Aug 10, 2025

@jlizen - does this API work? I'll add an example or two if I have time, but wanted to make sure this was aligned beforehand.

Copy link
Contributor

@jlizen jlizen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General approach looks good, but left a note around newtyping the BoxService so we don't lock ourselves into extra Service: Send + 'static bounds for all time. (For now specifying them in our API ourselves is fine, just want to leave the door open to removing them without type breakage).

I'm seeing that we don't have ANY tests currently of the run_with_streaming_response() API... that is unfortunate... An example would be really great if you have the time (which would double as a test that things compile at least). And then you could trivially validate e2e with cargo-lambda at that point. (Or feel free to throw in an integration test if you want our CI/CD to probe the e2e, up to you).

@s0l0ist
Copy link
Contributor Author

s0l0ist commented Aug 16, 2025

Hi @jlizen, I think I may have a better approach.

I noticed that Adapter, handles buffered responses by wrapping a Service<Request> and producing a LambdaResponse from anything that implements IntoResponse. Taking inspiration from that, I built a StreamAdapter that wraps a Service<Request, Response = Response<B>> and produces a StreamResponse<BodyStream<B>>.

This lets us remove the BoxService entirely while keeping the same ergonomics. The trade-off is a heap-allocated future per request (same as Adapter), but we avoid exposing BoxService in the public API.

With this change, run_with_streaming_response just mirrors the non-streaming run:

pub async fn run_with_streaming_response<'a, S, B, E>(handler: S) -> Result<(), Error>
where
    S: Service<Request, Response = Response<B>, Error = E>,
    S::Future: Send + 'a,
    B: Body + Unpin + Send + 'static,
    B::Data: Into<Bytes> + Send,
    B::Error: Into<Error> + Send + Debug,
{
    lambda_runtime::run(StreamAdapter::from(handler)).await
}

And I can selectively choose which one I want like this:

/// The main function to run the AWS Lambda
pub async fn async_main(tracer_provider: sdktrace::SdkTracerProvider) -> Result<(), BoxError> {
    let (app, _) = build_service().await;

    // For a buffered response:
    // let handler = lambda_http::Adapter::from(app);
    // For a streamed response:
    let handler = lambda_http::StreamAdapter::from(app);

    let runtime = Runtime::new(handler).layer(
        // Create a tracing span for each Lambda invocation
        OpenTelemetryLayer::new(|| {
            if let Err(err) = tracer_provider.force_flush() {
                eprintln!("Error flushing traces: {err:#?}");
            }
        })
        .with_trigger(OpenTelemetryFaasTrigger::Http),
    );
    runtime.run().await
}

@s0l0ist s0l0ist requested a review from jlizen August 16, 2025 12:01
Copy link
Contributor

@jlizen jlizen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the direction you went with this for the new API.

But, I would prefer to have somewhat duplicated code if that is what is needed to keep from adding a new allocation to the existing API. Especially given that there is no additional benefit to the caller. IE, let's leave the run_with_streaming_response API not using the StreamAdapter.

Presumably a decent chunk of it could be pulled out into a pure function to avoid too much duplication.

@s0l0ist
Copy link
Contributor Author

s0l0ist commented Aug 17, 2025

Okay I think I have it:

  • Add into_stream_service(...): zero-alloc builder (no boxed future/vtable) that is used by run_with_streaming_response so that it continues to have the same exact behavior.
  • Keep StreamAdapter for ergonomic stacking; it still returns a boxed future for API stability.
  • Factor common logic into into_stream_response(...) which is used by both StreamAdapter and into_stream_service

I added a parallel test to what Adapter already had - ensuring we can compile-check a BoxService built the same way. Once we’re happy with the shape, I’ll follow up more examples. I've already added the http-axum-streaming example.

@s0l0ist s0l0ist requested a review from jlizen August 17, 2025 11:26
Copy link
Contributor

@jlizen jlizen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I think you nailed the API! Thanks for all the iterations.

Looks like clippy/rustfmt runs are needed, lint CI is failing.

I'd be happy to merge it after those go green and cut a release, or I can wait if you're planning on adding more examples to this PR. Let me know your preference!

@s0l0ist
Copy link
Contributor Author

s0l0ist commented Aug 17, 2025

Yes I see the CI checks failing - they're unrelated to this PR, but happy to clean them up.

@jlizen
Copy link
Contributor

jlizen commented Aug 17, 2025

Ah, ok, I didn't actually look at them - probably we got hit by some new clippy/rustfmt lints across toolchain versions. If you have time to clean them up, feel free / much appreciated (separate commit please).

It sounded like you did plan to hoist that test into an example, so I can sit tight on merging since it sounds like a new commit is coming. But just ping me if you want to ship it without another round of changes.

@s0l0ist
Copy link
Contributor Author

s0l0ist commented Aug 18, 2025

@jlizen , fixed CI here.

And yes, I plan on moving the test into the new example and that should wrap this PR up.

@s0l0ist
Copy link
Contributor Author

s0l0ist commented Aug 18, 2025

@jlizen - Okay! I think we're ready :)

Added two examples, one that showcases run_with_streaming_response and another that demonstrates configuring a custom runtime with OTeL using StreamAdapter. Both examples type-erase the service into a BoxService before converting it into a streaming service. I didn’t have a good example of type-erasing after conversion, so I kept the compile test since it directly validates Tower::Service.

I've tested manually e2e to verify everything works.

@s0l0ist s0l0ist requested a review from jlizen August 18, 2025 17:53
@jlizen
Copy link
Contributor

jlizen commented Aug 19, 2025

Sorry to miss this today @s0l0ist - near the top of my list for tomorrow. Thanks for the CI fix.

@jlizen
Copy link
Contributor

jlizen commented Aug 19, 2025

Sorry for the back and forth on examples @s0l0ist - I'm good to merge with that one requested change (remove the vestigial BoxService). Given that users will probably copy-paste these examples, it would be better to show the right general practice.

@s0l0ist s0l0ist requested a review from jlizen August 20, 2025 09:03
@s0l0ist
Copy link
Contributor Author

s0l0ist commented Aug 20, 2025

Sorry for the back and forth on examples @s0l0ist - I'm good to merge with that one requested change (remove the vestigial BoxService). Given that users will probably copy-paste these examples, it would be better to show the right general practice.

No need to apologize! I'm happy to iterate and collaborate on the best approach moving forward.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants