Add CborReaderProxy to support process CBOR stream in chunks #3939

Merged
@@ -0,0 +1,295 @@
/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
using Amazon.Runtime.Internal.Util;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Formats.Cbor;
using System.IO;

namespace Amazon.Extensions.CborProtocol.Internal
{
    /// <summary>
    /// A streaming CBOR reader that processes large CBOR data streams in chunks without loading
    /// the entire payload into memory at once. This class wraps <see cref="CborReader"/>
    /// to provide streaming capabilities while maintaining the same reading interface.
    /// </summary>
    public class CborStreamReader : IDisposable
Contributor

class is missing documentation

    {
        /// <summary>
        /// Enum to track the type of CBOR container (map or array)
        /// for state management within the CborStreamReader.
        /// </summary>
        private enum CborContainerType
        {
            Map,
            Array
        }

        private static readonly ILogger _logger = Logger.GetLogger(typeof(CborStreamReader));
        private readonly Stack<CborContainerType> _nestingStack = new Stack<CborContainerType>();
        private readonly Stream _stream;
        private byte[] _buffer;
        private CborReader _internalCborReader;
        private int _currentChunkSize;

        /// <summary>
        /// Initializes a new instance of the <see cref="CborStreamReader"/> class that reads CBOR data
        /// from the specified stream.
        /// </summary>
        /// <param name="stream">The input stream containing CBOR-formatted data.
        /// The stream must be readable and remain open for the lifetime of the reader.</param>
        /// <exception cref="ArgumentNullException">Thrown when the stream parameter is null.</exception>
        /// <remarks>
        /// The reader uses a configurable initial buffer size (from <see cref="AWSConfigs.CborReaderInitialBufferSize"/>)
        /// and will automatically resize the buffer if needed to handle larger CBOR items.
        /// </remarks>
        public CborStreamReader(Stream stream)
Contributor

this class is missing documentation

        {
            _stream = stream ?? throw new ArgumentNullException(nameof(stream));
            _buffer = ArrayPool<byte>.Shared.Rent(AWSConfigs.CborReaderInitialBufferSize);

            _currentChunkSize = _stream.Read(_buffer, 0, _buffer.Length);
Contributor

If the intention is to fill the whole buffer, or to read until the stream has no data left, a single Stream.Read call doesn't guarantee that: per the docs, it can return fewer bytes than requested if the stream doesn't have that data available yet.

Member Author

That is why we use _currentChunkSize to track how many bytes were actually read, and we create the ReadOnlyMemory from the start of the buffer up to _currentChunkSize, not up to the buffer size.

From the Read documentation:
[image: excerpt from the Stream.Read documentation]
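
The concern raised in this thread can be illustrated with a minimal sketch. This is not the PR's code: if the goal were to fill the buffer completely (or until the stream ends), a loop over Stream.Read would be needed, because a single call may return fewer bytes than requested. The names StreamFill and ReadUntilFullOrEnd are hypothetical and introduced only for illustration.

```csharp
using System;
using System.IO;

public static class StreamFill
{
    // Hypothetical helper (not part of the PR): reads into `buffer` starting at
    // `offset` until the buffer is full or the stream reports end of data
    // (Stream.Read returns 0). Returns the number of bytes read by this call.
    public static int ReadUntilFullOrEnd(Stream stream, byte[] buffer, int offset)
    {
        if (stream == null) throw new ArgumentNullException(nameof(stream));
        if (buffer == null) throw new ArgumentNullException(nameof(buffer));
        if (offset < 0 || offset > buffer.Length)
            throw new ArgumentOutOfRangeException(nameof(offset));

        int total = 0;
        while (offset + total < buffer.Length)
        {
            int read = stream.Read(buffer, offset + total, buffer.Length - offset - total);
            if (read == 0)
                break; // end of stream
            total += read;
        }
        return total;
    }
}
```

The PR takes a different route: it records the actual byte count in _currentChunkSize and relies on RefillBuffer to fetch more data on demand, so a short read appears to cost an extra refill rather than lose data.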

            var memorySlice = new ReadOnlyMemory<byte>(_buffer, 0, _currentChunkSize);

            // We must allow multiple root values because when refilling the new chunk is just a fragment of the whole stream.
            _internalCborReader = new CborReader(memorySlice, allowMultipleRootLevelValues: true);
        }

        /// <summary>
        /// This method is called when a read operation fails because it needs more
        /// data than is currently available in the buffer. It handles stitching leftover
        /// data with a new chunk from the stream and, if necessary, resizing the buffer.
        /// </summary>
        /// <param name="bytesToSkip">Number of bytes to skip before reading new data (e.g., 1 to skip CBOR break byte 0xFF)</param>
        private void RefillBuffer(int bytesToSkip = 0)
        {
            int leftoverBytesCount = _internalCborReader.BytesRemaining;

            // Determine where the leftover bytes start
            int leftoverStartIndex = _currentChunkSize - leftoverBytesCount;

            // If we are skipping bytes, we need to move the start forward
            leftoverStartIndex += bytesToSkip;
            leftoverBytesCount = leftoverBytesCount - bytesToSkip;

            // If the leftover data completely fills the buffer, grow it
            if (leftoverBytesCount >= _buffer.Length)
Contributor

Just wondering: is there a reason you used >= here? Isn't it impossible for leftoverBytesCount to be greater than the buffer's length, since the reader is instantiated with at most the data in the buffer? Or am I missing something? Have you seen this case happen before?

Member Author

It didn't happen while testing, but I wanted to be safe in case we had an off-by-one error or similar. In that case I would prefer expanding the buffer rather than falling through to the next branch and losing data when copying to the start of the buffer.
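
The trade-off discussed in this thread can be sketched in isolation. The helper below is a simplified, hypothetical illustration (BufferCompaction.CompactOrGrow is not part of the PR): it moves unread bytes to the start of a pooled buffer, and rents a buffer of at least twice the size when the leftovers already fill the current one, which corresponds to the defensive >= branch. It assumes the buffer passed in was rented from ArrayPool<byte>.Shared.

```csharp
using System;
using System.Buffers;

public static class BufferCompaction
{
    // Hypothetical helper (not part of the PR). Returns a buffer whose first
    // `count` bytes are the leftovers that started at `start` in `buffer`.
    // Assumes `buffer` was rented from ArrayPool<byte>.Shared.
    public static byte[] CompactOrGrow(byte[] buffer, int start, int count)
    {
        if (buffer == null) throw new ArgumentNullException(nameof(buffer));
        if (start < 0 || count < 0 || start + count > buffer.Length)
            throw new ArgumentOutOfRangeException(nameof(count));

        // Leftovers fill the whole buffer: there is no room for new data, so grow.
        if (count >= buffer.Length)
        {
            byte[] grown = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, start, grown, 0, count);
            ArrayPool<byte>.Shared.Return(buffer);
            return grown;
        }

        // Otherwise shift the leftovers to the front of the same buffer.
        // Buffer.BlockCopy handles overlapping source and destination ranges.
        if (count > 0 && start > 0)
            Buffer.BlockCopy(buffer, start, buffer, 0, count);

        return buffer;
    }
}
```

With the bounds check in place, count can equal buffer.Length but never exceed it, which matches the reviewer's observation; in this sketch the >= comparison only guards against an unexpected off-by-one.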

            {
                int newSize = _buffer.Length * 2;
                var newBuffer = ArrayPool<byte>.Shared.Rent(newSize);

                Buffer.BlockCopy(_buffer, leftoverStartIndex, newBuffer, 0, leftoverBytesCount);
                ArrayPool<byte>.Shared.Return(_buffer);
                _buffer = newBuffer;
            }
            else if (leftoverBytesCount > 0) // Shift leftovers (after skipping) to the beginning of the buffer
            {
                Buffer.BlockCopy(_buffer, leftoverStartIndex, _buffer, 0, leftoverBytesCount);
            }

            // Read from stream into buffer after leftovers
            int bytesReadFromStream = _stream.Read(_buffer, leftoverBytesCount, _buffer.Length - leftoverBytesCount);

            // Update the total size of valid data in our buffer.
            _currentChunkSize = leftoverBytesCount + bytesReadFromStream;

            // Check for a malformed stream: if we have leftovers but the stream is empty,
            // it means the CBOR data was truncated.
            if (bytesReadFromStream == 0 && leftoverBytesCount > 0)
            {
                throw new CborContentException("Stream ended unexpectedly with an incomplete CBOR data item.");
            }

            var newMemorySlice = new ReadOnlyMemory<byte>(_buffer, 0, _currentChunkSize);
            _internalCborReader.Reset(newMemorySlice);

            _logger.DebugFormat("Buffer refilled: read {0} byte(s), total in buffer now: {1}.", bytesReadFromStream, _currentChunkSize);
        }

        /// <summary>
        /// Executes a CBOR read operation, refilling the buffer and retrying if a CborContentException is thrown.
        /// </summary>
        /// <typeparam name="T">The return type of the CBOR read operation.</typeparam>
        /// <param name="readOperation">A delegate representing the read operation to execute.</param>
        /// <returns>The result of the read operation.</returns>
        /// <exception cref="CborContentException">
        /// Thrown if too many retries are attempted or if the stream ends unexpectedly.
        /// </exception>
        private T ExecuteRead<T>(Func<CborReader, T> readOperation)
        {
            int maxRetries = 64;
            int retryCount = 0;

            while (true)
            {
                try
                {
                    return readOperation(_internalCborReader);
                }
                catch (CborContentException ex)
                {
                    if (_currentChunkSize == 0 && _internalCborReader.BytesRemaining == 0)
Contributor

I'm curious how we would hit a CborContentException if _currentChunkSize == 0 and BytesRemaining == 0. Wouldn't this indicate that we successfully read everything?

Member Author

Yes. After everything has been read, the code that uses CborStreamReader might still try to read more data, which means there is an issue with that code. In this case CborReader will throw this exception, and we rethrow it for the calling code to handle or fail.

                    {
                        // Fail fast if we’ve already consumed all input and nothing remains to refill.
                        throw;
                    }

                    if (++retryCount > maxRetries)
                    {
                        throw new CborContentException("Too many retries during CBOR stream parsing. Possible malformed or infinite data.", ex);
                    }

                    _logger.Debug(ex, "CborContentException caught (attempt #{0}), attempting to refill buffer.", retryCount);
                    // Attempt to refill and retry the operation.
                    RefillBuffer();
                }
            }
        }


        // Returns true if the next unread byte in the buffer is the CBOR "break" marker (0xFF).
        private bool IsNextByteEndOfContainer()
        {
            int unreadOffset = _currentChunkSize - _internalCborReader.BytesRemaining;
            if (unreadOffset < _currentChunkSize)
                return _buffer[unreadOffset] == 0xFF; // 0xFF indicates "break" in indefinite-length map/array

            return false;
        }

        // Shared implementation for ReadEndMap and ReadEndArray: validates the nesting stack,
        // then either reads the end marker or refills the buffer when the inner reader is finished.
        private void ReadEndContainer(CborContainerType expectedType, CborReaderState expectedEndState, Action<CborReader> readEndAction)
        {
            ExecuteRead(r =>
            {
                if (_nestingStack.Count == 0 || _nestingStack.Peek() != expectedType)
                    throw new CborContentException($"Unexpected end of {expectedType.ToString().ToLowerInvariant()}.");

                var state = CborReaderState.Finished;
                try
                {
                    state = r.PeekState();
                }
                catch (CborContentException)
                {
                    // This exception is expected in two cases:
                    // 1. When we've reached the end of the current CBOR buffer chunk and need to refill.
                    // 2. When the current buffer does not contain the start of the array/map we're trying to end.
                    // In both cases, `state` keeps its initial value of `Finished` and we will trigger a buffer refill.
                    // This is not a true error, but logging at Debug level can help trace how we arrived at a certain state.
                    _logger.DebugFormat("CborContentException caught during PeekState while expecting end of {0}", expectedType.ToString().ToLowerInvariant());
                }

                if (state == expectedEndState)
                {
                    readEndAction(r);
                    _nestingStack.Pop();
                    return true;
                }

                if (state == CborReaderState.Finished)
                {
                    if (IsNextByteEndOfContainer())
                    {
                        RefillBuffer(1); // Skip the break marker (0xFF)
                    }
                    else
                    {
                        RefillBuffer(0); // This means we are in a definite-length map/array which doesn't end with 0xFF.
                    }
                    _nestingStack.Pop();
                    return true;
                }

                throw new CborContentException($"Expected end of {expectedType.ToString().ToLowerInvariant()} but could not parse it.");
            });
        }


        public void ReadEndMap()
        {
            ReadEndContainer(CborContainerType.Map, CborReaderState.EndMap, (reader) => reader.ReadEndMap());
        }

        public void ReadEndArray()
        {
            ReadEndContainer(CborContainerType.Array, CborReaderState.EndArray, (r) => r.ReadEndArray());
        }


        public int? ReadStartMap() => ExecuteRead(reader =>
        {
            var result = reader.ReadStartMap();
            _nestingStack.Push(CborContainerType.Map);
            return result;
        });

        public int? ReadStartArray() => ExecuteRead(reader =>
        {
            var result = reader.ReadStartArray();
            _nestingStack.Push(CborContainerType.Array);
            return result;
        });


        public CborReaderState PeekState()
        {
            return ExecuteRead(r =>
            {
                try
                {
                    return r.PeekState();
                }
                catch (CborContentException ex)
                {
                    // Translate a Break code to the appropriate container end state
                    // based on our own nesting stack.
                    if (_nestingStack.Count > 0)
                    {
                        var inferredState = _nestingStack.Peek() == CborContainerType.Map
                            ? CborReaderState.EndMap
                            : CborReaderState.EndArray;

                        _logger.Debug(ex, "CborContentException during PeekState interpreted as {0} due to nesting stack.", inferredState);
                        return inferredState;
                    }
                    // If our stack is empty, it's a genuine error.
                    throw;
                }
            });
        }

        public string ReadTextString() => ExecuteRead(r => r.ReadTextString());
        public int ReadInt32() => ExecuteRead(r => r.ReadInt32());
        public long ReadInt64() => ExecuteRead(r => r.ReadInt64());
        public decimal ReadDecimal() => ExecuteRead(r => r.ReadDecimal());
        public double ReadDouble() => ExecuteRead(r => r.ReadDouble());
        public bool ReadBoolean() => ExecuteRead(r => r.ReadBoolean());
        public float ReadSingle() => ExecuteRead(r => r.ReadSingle());
        public CborTag ReadTag() => ExecuteRead(r => r.ReadTag());
        public byte[] ReadByteString() => ExecuteRead(r => r.ReadByteString());
        public void ReadNull() => ExecuteRead(r => { r.ReadNull(); return true; });
        public void SkipValue() => ExecuteRead(r => { r.SkipValue(); return true; });
        public int CurrentDepth => _internalCborReader.CurrentDepth;

        public void Dispose()
        {
            if (_buffer != null)
            {
                ArrayPool<byte>.Shared.Return(_buffer);
                _buffer = null;
            }
        }
    }
}