diff --git a/lambda-extension/src/extension.rs b/lambda-extension/src/extension.rs index 52fe5c31..4747b041 100644 --- a/lambda-extension/src/extension.rs +++ b/lambda-extension/src/extension.rs @@ -225,6 +225,9 @@ where if let Some(mut log_processor) = self.logs_processor { trace!("Log processor found"); + + validate_buffering_configuration(self.log_buffering)?; + // Spawn task to run processor let addr = SocketAddr::from(([0, 0, 0, 0], self.log_port_number)); let make_service = service_fn(move |_socket: &AddrStream| { @@ -261,6 +264,9 @@ where if let Some(mut telemetry_processor) = self.telemetry_processor { trace!("Telemetry processor found"); + + validate_buffering_configuration(self.telemetry_buffering)?; + // Spawn task to run processor let addr = SocketAddr::from(([0, 0, 0, 0], self.telemetry_port_number)); let make_service = service_fn(move |_socket: &AddrStream| { diff --git a/lambda-extension/src/logs.rs b/lambda-extension/src/logs.rs index cf894100..20a095c4 100644 --- a/lambda-extension/src/logs.rs +++ b/lambda-extension/src/logs.rs @@ -5,6 +5,8 @@ use tokio::sync::Mutex; use tower::Service; use tracing::{error, trace}; +use crate::{Error, ExtensionError}; + /// Payload received from the Lambda Logs API /// See: https://docs.aws.amazon.com/lambda/latest/dg/runtimes-logs-api.html#runtimes-logs-api-msg #[derive(Clone, Debug, Deserialize, PartialEq)] @@ -110,7 +112,7 @@ pub struct LogPlatformReportMetrics { /// Log buffering configuration. /// Allows Lambda to buffer logs before deliverying them to a subscriber. -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Copy, Clone)] #[serde(rename_all = "camelCase")] pub struct LogBuffering { /// The maximum time (in milliseconds) to buffer a batch. @@ -124,6 +126,40 @@ pub struct LogBuffering { pub max_items: usize, } +static LOG_BUFFERING_MIN_TIMEOUT_MS: usize = 25; +static LOG_BUFFERING_MAX_TIMEOUT_MS: usize = 30_000; +static LOG_BUFFERING_MIN_BYTES: usize = 262_144; +static LOG_BUFFERING_MAX_BYTES: usize = 1_048_576; +static LOG_BUFFERING_MIN_ITEMS: usize = 1_000; +static LOG_BUFFERING_MAX_ITEMS: usize = 10_000; + +impl LogBuffering { + fn validate(&self) -> Result<(), Error> { + if self.timeout_ms < LOG_BUFFERING_MIN_TIMEOUT_MS || self.timeout_ms > LOG_BUFFERING_MAX_TIMEOUT_MS { + let error = format!( + "LogBuffering validation error: Invalid timeout_ms: {}. Allowed values: Minumun: {}. Maximum: {}", + self.timeout_ms, LOG_BUFFERING_MIN_TIMEOUT_MS, LOG_BUFFERING_MAX_TIMEOUT_MS + ); + return Err(ExtensionError::boxed(error)); + } + if self.max_bytes < LOG_BUFFERING_MIN_BYTES || self.max_bytes > LOG_BUFFERING_MAX_BYTES { + let error = format!( + "LogBuffering validation error: Invalid max_bytes: {}. Allowed values: Minumun: {}. Maximum: {}", + self.max_bytes, LOG_BUFFERING_MIN_BYTES, LOG_BUFFERING_MAX_BYTES + ); + return Err(ExtensionError::boxed(error)); + } + if self.max_items < LOG_BUFFERING_MIN_ITEMS || self.max_items > LOG_BUFFERING_MAX_ITEMS { + let error = format!( + "LogBuffering validation error: Invalid max_items: {}. Allowed values: Minumun: {}. Maximum: {}", + self.max_items, LOG_BUFFERING_MIN_ITEMS, LOG_BUFFERING_MAX_ITEMS + ); + return Err(ExtensionError::boxed(error)); + } + Ok(()) + } +} + impl Default for LogBuffering { fn default() -> Self { LogBuffering { @@ -134,6 +170,18 @@ impl Default for LogBuffering { } } +/// Validate the `LogBuffering` configuration (if present) +/// +/// # Errors +/// +/// This function will return an error if `LogBuffering` is present and configured incorrectly +pub(crate) fn validate_buffering_configuration(log_buffering: Option) -> Result<(), Error> { + match log_buffering { + Some(log_buffering) => log_buffering.validate(), + None => Ok(()), + } +} + /// Wrapper function that sends logs to the subscriber Service /// /// This takes an `hyper::Request` and transforms it into `Vec` for the @@ -303,4 +351,67 @@ mod tests { }, ), } + + macro_rules! log_buffering_configuration_tests { + ($($name:ident: $value:expr,)*) => { + $( + #[test] + fn $name() { + let (input, expected) = $value; + let result = validate_buffering_configuration(input); + + if let Some(expected) = expected { + assert!(result.is_err()); + assert_eq!(result.unwrap_err().to_string(), expected.to_string()); + } else { + assert!(result.is_ok()); + } + + } + )* + } + } + + log_buffering_configuration_tests! { + log_buffer_configuration_none_success: ( + None, + None:: + ), + log_buffer_configuration_default_success: ( + Some(LogBuffering::default()), + None:: + ), + log_buffer_configuration_min_success: ( + Some(LogBuffering { timeout_ms: LOG_BUFFERING_MIN_TIMEOUT_MS, max_bytes: LOG_BUFFERING_MIN_BYTES, max_items: LOG_BUFFERING_MIN_ITEMS }), + None:: + ), + log_buffer_configuration_max_success: ( + Some(LogBuffering { timeout_ms: LOG_BUFFERING_MAX_TIMEOUT_MS, max_bytes: LOG_BUFFERING_MAX_BYTES, max_items: LOG_BUFFERING_MAX_ITEMS }), + None:: + ), + min_timeout_ms_error: ( + Some(LogBuffering { timeout_ms: LOG_BUFFERING_MIN_TIMEOUT_MS-1, max_bytes: LOG_BUFFERING_MAX_BYTES, max_items: LOG_BUFFERING_MAX_ITEMS }), + Some(ExtensionError::boxed("LogBuffering validation error: Invalid timeout_ms: 24. Allowed values: Minumun: 25. Maximum: 30000")) + ), + max_timeout_ms_error: ( + Some(LogBuffering { timeout_ms: LOG_BUFFERING_MAX_TIMEOUT_MS+1, max_bytes: LOG_BUFFERING_MAX_BYTES, max_items: LOG_BUFFERING_MAX_ITEMS }), + Some(ExtensionError::boxed("LogBuffering validation error: Invalid timeout_ms: 30001. Allowed values: Minumun: 25. Maximum: 30000")) + ), + min_max_bytes_error: ( + Some(LogBuffering { timeout_ms: LOG_BUFFERING_MAX_TIMEOUT_MS, max_bytes: LOG_BUFFERING_MIN_BYTES-1, max_items: LOG_BUFFERING_MAX_ITEMS }), + Some(ExtensionError::boxed("LogBuffering validation error: Invalid max_bytes: 262143. Allowed values: Minumun: 262144. Maximum: 1048576")) + ), + max_max_bytes_error: ( + Some(LogBuffering { timeout_ms: LOG_BUFFERING_MAX_TIMEOUT_MS, max_bytes: LOG_BUFFERING_MAX_BYTES+1, max_items: LOG_BUFFERING_MAX_ITEMS }), + Some(ExtensionError::boxed("LogBuffering validation error: Invalid max_bytes: 1048577. Allowed values: Minumun: 262144. Maximum: 1048576")) + ), + min_max_items_error: ( + Some(LogBuffering { timeout_ms: LOG_BUFFERING_MAX_TIMEOUT_MS, max_bytes: LOG_BUFFERING_MAX_BYTES, max_items: LOG_BUFFERING_MIN_ITEMS-1 }), + Some(ExtensionError::boxed("LogBuffering validation error: Invalid max_items: 999. Allowed values: Minumun: 1000. Maximum: 10000")) + ), + max_max_items_error: ( + Some(LogBuffering { timeout_ms: LOG_BUFFERING_MAX_TIMEOUT_MS, max_bytes: LOG_BUFFERING_MAX_BYTES, max_items: LOG_BUFFERING_MAX_ITEMS+1 }), + Some(ExtensionError::boxed("LogBuffering validation error: Invalid max_items: 10001. Allowed values: Minumun: 1000. Maximum: 10000")) + ), + } }