Expose streaming API #1013
Conversation
Hi @s0l0ist, thanks for cutting this PR. This change seems very reasonable to me.

I do have concerns about leaking internals. This is only naming return types that are already public, but we might want to tweak their composition or otherwise shift bounds in ways that break callers. I think it would be best to type-erase the returned service stack. The easiest way to do that would be via a `tower::util::BoxService` / `BoxLayer` (or the sync/clone-bounded variants, though I don't think we need them here; we could certainly add alternate APIs that do include the sync/clone bounds if it would be useful for how the returned struct is used by a runtime). That adds a small amount of performance overhead due to an extra allocation and a layer of dynamic dispatch. But I think the ergonomics would be much better compared to a more complex, composable builder-style API using generics and sealed inner layer types. Note that we would probably want some sort of …

How does that sound to you? It would also be great to get a small example showing usage of this with a non-tokio runtime, if you'd be open to it! That would both let us validate the API and make it more discoverable for users. I'd probably be OK with this landing without that, though, if you don't have cycles.
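For readers unfamiliar with the pattern being proposed, "type erasure" here means returning a boxed trait object instead of naming the concrete service stack in the public signature. A minimal std-only sketch of the idea, using a hypothetical `Handler` trait as a stand-in for tower's `Service` (not the crate's actual API):

```rust
// Type erasure: hide a concrete type behind a boxed trait object so the
// public API never names internal composition details.
// `Handler` is a hypothetical stand-in for tower's `Service` trait.
trait Handler {
    fn call(&self, input: &str) -> String;
}

// A private concrete implementation.
struct Upcase;
impl Handler for Upcase {
    fn call(&self, input: &str) -> String {
        input.to_uppercase()
    }
}

// Callers only ever see `Box<dyn Handler>`, so the internals (`Upcase`,
// or any future layered composition) can change without breaking them.
fn make_handler() -> Box<dyn Handler> {
    Box::new(Upcase)
}

fn main() {
    let h = make_handler();
    assert_eq!(h.call("hi"), "HI");
    println!("{}", h.call("hi")); // prints "HI"
}
```

`tower::util::BoxService` applies the same idea to async `Service`s, at the cost of one allocation and a vtable call per request.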
@jlizen, I think that's the right direction. I'm busy right now, but I can revisit this PR in a couple of weeks.
@jlizen - does this API work? I'll add an example or two if I have time, but wanted to make sure this was aligned beforehand.
General approach looks good, but I left a note around newtyping the `BoxService` so we don't lock ourselves into extra `Service: Send + 'static` bounds for all time. (For now, specifying them in our API ourselves is fine; I just want to leave the door open to removing them without type breakage.)

I'm seeing that we don't have ANY tests currently of the `run_with_streaming_response()` API... that is unfortunate. An example would be really great if you have the time (it would double as a test that things at least compile). And then you could trivially validate e2e with cargo-lambda at that point. (Or feel free to throw in an integration test if you want our CI/CD to probe the e2e, up to you.)
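The newtype suggestion above can be sketched in a few lines. The point is that the auxiliary bounds live inside the wrapper rather than in the type callers write down, so they could later be relaxed without an API break. A std-only illustration, with a hypothetical `ErasedService` name (not part of lambda_http):

```rust
// Newtype around a boxed callable: the `Send + 'static` bounds are an
// internal detail of the wrapper. If they are ever removed, the public
// type `ErasedService` that callers name stays the same.
// `ErasedService` is a hypothetical name, not the crate's actual type.
pub struct ErasedService(Box<dyn Fn(i32) -> i32 + Send + 'static>);

impl ErasedService {
    pub fn new<F>(f: F) -> Self
    where
        F: Fn(i32) -> i32 + Send + 'static,
    {
        ErasedService(Box::new(f))
    }

    pub fn call(&self, x: i32) -> i32 {
        (self.0)(x)
    }
}

fn main() {
    let svc = ErasedService::new(|x| x + 1);
    assert_eq!(svc.call(41), 42);
    println!("{}", svc.call(41)); // prints "42"
}
```

Had the API returned `Box<dyn Fn(i32) -> i32 + Send + 'static>` directly, dropping `Send` later would change the type every caller spells out; with the newtype, it would not.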
Hi @jlizen, I think I may have a better approach. I noticed that `Adapter` handles buffered responses by wrapping a …. This lets us remove the …. With this change, the function becomes:

```rust
pub async fn run_with_streaming_response<'a, S, B, E>(handler: S) -> Result<(), Error>
where
    S: Service<Request, Response = Response<B>, Error = E>,
    S::Future: Send + 'a,
    B: Body + Unpin + Send + 'static,
    B::Data: Into<Bytes> + Send,
    B::Error: Into<Error> + Send + Debug,
{
    lambda_runtime::run(StreamAdapter::from(handler)).await
}
```

And I can selectively choose which one I want like this:

```rust
/// The main function to run the AWS Lambda
pub async fn async_main(tracer_provider: sdktrace::SdkTracerProvider) -> Result<(), BoxError> {
    let (app, _) = build_service().await;
    // For a buffered response:
    // let handler = lambda_http::Adapter::from(app);
    // For a streamed response:
    let handler = lambda_http::StreamAdapter::from(app);
    let runtime = Runtime::new(handler).layer(
        // Create a tracing span for each Lambda invocation
        OpenTelemetryLayer::new(|| {
            if let Err(err) = tracer_provider.force_flush() {
                eprintln!("Error flushing traces: {err:#?}");
            }
        })
        .with_trigger(OpenTelemetryFaasTrigger::Http),
    );
    runtime.run().await
}
```
I like the direction you went with this for the new API.
But I would prefer to have somewhat duplicated code if that is what is needed to keep from adding a new allocation to the existing API, especially given that there is no additional benefit to the caller. I.e., let's leave the `run_with_streaming_response` API not using the `StreamAdapter`.

Presumably a decent chunk of it could be pulled out into a pure function to avoid too much duplication.
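The refactor being suggested can be sketched abstractly: both the existing entry point and the new adapter delegate to one shared helper, so behavior stays identical without routing the old path through an extra allocation. All names below are illustrative, not the crate's actual internals:

```rust
// Shared pure function: the common logic lives in exactly one place.
// `build_stream_prelude` is a hypothetical helper, not lambda_http code.
fn build_stream_prelude(content_type: &str) -> String {
    format!("{{\"headers\":{{\"Content-Type\":\"{content_type}\"}}}}")
}

// Existing API path: calls the helper directly, no boxing or adapter.
fn run_with_streaming_response(content_type: &str) -> String {
    build_stream_prelude(content_type)
}

// New adapter path: reuses the same helper, so the two cannot drift apart.
fn stream_adapter(content_type: &str) -> String {
    build_stream_prelude(content_type)
}

fn main() {
    // Both paths produce identical output by construction.
    assert_eq!(
        run_with_streaming_response("text/plain"),
        stream_adapter("text/plain")
    );
    println!("{}", stream_adapter("text/plain"));
}
```

The small amount of duplicated glue in each caller is the price for keeping the hot existing path allocation-free.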
Okay, I think I have it:

I added a parallel test to what …
📬 Issue #, if available:
The SDK supports streaming, but `lambda_http::Adapter` only handles buffered responses from Axum's `Router` (i.e., a `Service`). This change allows direct streaming from Axum handlers/services, while specifying a custom runtime (i.e., with OTel). For example, `lambda_http::run_with_streaming_response(app).await` doesn't allow you to specify a custom runtime. So this change helps to abstract out constructing the stream response while allowing a custom runtime.

Related (ish):
✍️ Description of changes:
This PR exposes:

`StreamAdapter`: convert a `Service` into an AWS Lambda streaming response.

This was originally internal to the `run_with_streaming_response` function. There are no functional changes.

There might be a more ergonomic way to expose an API like this, but I'm not aware of one. Happy to make changes as necessary so we're not exposing internals that may change in the future.
This is how you can use it with a custom runtime supporting OTel:
🔏 By submitting this pull request:

- I have run `cargo +nightly fmt`.
- I have run `cargo clippy --fix`.