Skip to content

Commit 6a80340

Browse files
Add CborReaderProxy to support process CBOR stream in chunks
1 parent fc01b7b commit 6a80340

File tree

5 files changed

+729
-73
lines changed

5 files changed

+729
-73
lines changed
Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
using Amazon.Runtime.Internal.Util;
16+
using System;
17+
using System.Buffers;
18+
using System.Collections.Generic;
19+
using System.Formats.Cbor;
20+
using System.IO;
21+
22+
namespace Amazon.Extensions.CborProtocol.Internal
23+
{
24+
public class CborReaderProxy : IDisposable
25+
{
26+
/// <summary>
27+
/// Enum to track the type of CBOR container (map or array)
28+
/// for state management within the CborReaderProxy.
29+
/// </summary>
30+
private enum CborContainerType
31+
{
32+
Map,
33+
Array
34+
}
35+
36+
private static readonly ILogger _logger = Logger.GetLogger(typeof(CborReaderProxy));
37+
private readonly Stack<CborContainerType> _nestingStack = new Stack<CborContainerType>();
38+
private readonly Stream _stream;
39+
private byte[] _buffer;
40+
private CborReader _internalCborReader;
41+
private int _currentChunkSize;
42+
43+
public CborReaderProxy(Stream stream)
44+
{
45+
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
46+
_buffer = ArrayPool<byte>.Shared.Rent(AWSConfigs.CborReaderInitialBufferSize);
47+
48+
_currentChunkSize = _stream.Read(_buffer, 0, AWSConfigs.CborReaderInitialBufferSize);
49+
var memorySlice = new ReadOnlyMemory<byte>(_buffer, 0, _currentChunkSize);
50+
51+
// We must allow multiple root values because when refilling the new chunk is just a fragment of the whole stream.
52+
_internalCborReader = new CborReader(memorySlice, allowMultipleRootLevelValues: true);
53+
}
54+
55+
/// <summary>
56+
/// This method is called when a read operation fails because it needs more
57+
/// data than is currently available in the buffer. It handles stitching leftover
58+
/// data with a new chunk from the stream and, if necessary, resizing the buffer.
59+
/// </summary>
60+
/// <param name="bytesToSkip">Number of bytes to skip before reading new data (e.g., 1 to skip CBOR break byte 0xFF)</param>
61+
private void RefillBuffer(int bytesToSkip = 0)
62+
{
63+
int leftoverBytesCount = _internalCborReader.BytesRemaining;
64+
65+
// Determine where the leftover bytes start
66+
int leftoverStartIndex = _currentChunkSize - leftoverBytesCount;
67+
68+
// If we are skipping bytes, we need to move the start forward
69+
leftoverStartIndex += bytesToSkip;
70+
leftoverBytesCount = leftoverBytesCount - bytesToSkip;
71+
72+
// If the leftover data completely fills the buffer, grow it
73+
if (leftoverBytesCount >= _buffer.Length)
74+
{
75+
int newSize = _buffer.Length * 2;
76+
var newBuffer = ArrayPool<byte>.Shared.Rent(newSize);
77+
78+
Buffer.BlockCopy(_buffer, leftoverStartIndex, newBuffer, 0, leftoverBytesCount);
79+
ArrayPool<byte>.Shared.Return(_buffer);
80+
_buffer = newBuffer;
81+
}
82+
else if (leftoverBytesCount > 0) // Shift leftovers (after skipping) to the beginning of the buffer
83+
{
84+
Buffer.BlockCopy(_buffer, leftoverStartIndex, _buffer, 0, leftoverBytesCount);
85+
}
86+
87+
// Read from stream into buffer after leftovers
88+
int bytesReadFromStream = _stream.Read(_buffer, leftoverBytesCount, _buffer.Length - leftoverBytesCount);
89+
90+
// Update the total size of valid data in our buffer.
91+
_currentChunkSize = leftoverBytesCount + bytesReadFromStream;
92+
93+
// Check for a malformed stream: if we have leftovers but the stream is empty,
94+
// it means the CBOR data was truncated.
95+
if (bytesReadFromStream == 0 && leftoverBytesCount > 0)
96+
{
97+
throw new CborContentException("Stream ended unexpectedly with an incomplete CBOR data item.");
98+
}
99+
100+
var newMemorySlice = new ReadOnlyMemory<byte>(_buffer, 0, _currentChunkSize);
101+
_internalCborReader.Reset(newMemorySlice);
102+
103+
_logger.InfoFormat("Buffer refilled: read {0} byte(s), total in buffer now: {1}.", bytesReadFromStream, _currentChunkSize);
104+
}
105+
106+
/// <summary>
107+
/// Executes a CBOR read operation, refilling the buffer and retrying if a CborContentException is thrown.
108+
/// </summary>
109+
/// <typeparam name="T">The return type of the CBOR read operation.</typeparam>
110+
/// <param name="readOperation">A delegate representing the read operation to execute.</param>
111+
/// <returns>The result of the read operation.</returns>
112+
/// <exception cref="CborContentException">
113+
/// Thrown if too many retries are attempted or if the stream ends unexpectedly.
114+
/// </exception>
115+
private T ExecuteRead<T>(Func<CborReader, T> readOperation)
116+
{
117+
int maxRetries = 64;
118+
int retryCount = 0;
119+
120+
while (true)
121+
{
122+
try
123+
{
124+
return readOperation(_internalCborReader);
125+
}
126+
catch (CborContentException ex)
127+
{
128+
if (_currentChunkSize == 0 && _internalCborReader.BytesRemaining == 0)
129+
{
130+
// Fail fast if we’ve already consumed all input and nothing remains to refill.
131+
throw;
132+
}
133+
134+
if (++retryCount > maxRetries)
135+
{
136+
throw new CborContentException("Too many retries during CBOR stream parsing. Possible malformed or infinite data.");
137+
}
138+
139+
_logger.Debug(ex, $"CborContentException caught (attempt #{retryCount}), attempting to refill buffer.");
140+
// Attempt to refill and retry the operation.
141+
RefillBuffer();
142+
}
143+
}
144+
}
145+
146+
147+
private bool IsNextByteEndOfContainer()
148+
{
149+
int unreadOffset = _currentChunkSize - _internalCborReader.BytesRemaining;
150+
if (unreadOffset < _currentChunkSize)
151+
return _buffer[unreadOffset] == 0xFF; // 0xFF indicates "break" in indefinite-length map/array
152+
153+
return false;
154+
}
155+
156+
private void ReadEndContainer(CborContainerType expectedType, CborReaderState expectedEndState, Action<CborReader> readEndAction)
157+
{
158+
ExecuteRead(r =>
159+
{
160+
if (_nestingStack.Count == 0 || _nestingStack.Peek() != expectedType)
161+
throw new CborContentException($"Unexpected end of {expectedType.ToString().ToLowerInvariant()}.");
162+
163+
var state = CborReaderState.Finished;
164+
try
165+
{
166+
state = r.PeekState();
167+
}
168+
catch (CborContentException)
169+
{
170+
_logger.InfoFormat($"CborContentException caught during PeekState while expecting end of {expectedType.ToString().ToLowerInvariant()}");
171+
}
172+
173+
if (state == expectedEndState)
174+
{
175+
readEndAction(r);
176+
_nestingStack.Pop();
177+
return true;
178+
}
179+
180+
if (state == CborReaderState.Finished)
181+
{
182+
if (IsNextByteEndOfContainer())
183+
{
184+
RefillBuffer(1); // Skip 0xFF
185+
}
186+
else
187+
{
188+
RefillBuffer(0); // This means we are in a definite-length map/array which doesn't end with 0xFF.
189+
}
190+
_nestingStack.Pop();
191+
return true;
192+
}
193+
194+
throw new CborContentException($"Expected end of {expectedType.ToString().ToLowerInvariant()} but could not parse it.");
195+
});
196+
}
197+
198+
199+
public void ReadEndMap()
200+
{
201+
ReadEndContainer(CborContainerType.Map, CborReaderState.EndMap, (reader) => reader.ReadEndMap());
202+
}
203+
204+
public void ReadEndArray()
205+
{
206+
ReadEndContainer(CborContainerType.Array, CborReaderState.EndArray, (r) => r.ReadEndArray());
207+
}
208+
209+
210+
public int? ReadStartMap() => ExecuteRead(reader =>
211+
{
212+
var result = reader.ReadStartMap();
213+
_nestingStack.Push(CborContainerType.Map);
214+
return result;
215+
});
216+
217+
public int? ReadStartArray() => ExecuteRead(reader =>
218+
{
219+
var result = reader.ReadStartArray();
220+
_nestingStack.Push(CborContainerType.Array);
221+
return result;
222+
});
223+
224+
225+
public CborReaderState PeekState()
226+
{
227+
return ExecuteRead(r =>
228+
{
229+
try
230+
{
231+
return r.PeekState();
232+
}
233+
catch (CborContentException ex)
234+
{
235+
// Translate a Break code to the appropriate container end state
236+
// based on our own nesting stack.
237+
if (_nestingStack.Count > 0)
238+
{
239+
var inferredState = _nestingStack.Peek() == CborContainerType.Map
240+
? CborReaderState.EndMap
241+
: CborReaderState.EndArray;
242+
243+
_logger.Debug(ex, $"CborContentException during PeekState interpreted as {inferredState} due to nesting stack.");
244+
return inferredState;
245+
}
246+
// If our stack is empty, it's a genuine error.
247+
throw;
248+
}
249+
});
250+
}
251+
252+
public string ReadTextString() => ExecuteRead(r => r.ReadTextString());
253+
public int ReadInt32() => ExecuteRead(r => r.ReadInt32());
254+
public long ReadInt64() => ExecuteRead(r => r.ReadInt64());
255+
public decimal ReadDecimal() => ExecuteRead(r => r.ReadDecimal());
256+
public double ReadDouble() => ExecuteRead(r => r.ReadDouble());
257+
public bool ReadBoolean() => ExecuteRead(r => r.ReadBoolean());
258+
public float ReadSingle() => ExecuteRead(r => r.ReadSingle());
259+
public CborTag ReadTag() => ExecuteRead(r => r.ReadTag());
260+
public byte[] ReadByteString() => ExecuteRead(r => r.ReadByteString());
261+
public void ReadNull() => ExecuteRead(r => { r.ReadNull(); return true; });
262+
public void SkipValue() => ExecuteRead(r => { r.SkipValue(); return true; });
263+
public int CurrentDepth => _internalCborReader.CurrentDepth;
264+
265+
public void Dispose()
266+
{
267+
if (_buffer != null)
268+
{
269+
ArrayPool<byte>.Shared.Return(_buffer);
270+
_buffer = null;
271+
}
272+
}
273+
}
274+
}

extensions/src/AWSSDK.Extensions.CborProtocol/Internal/Transform/CborUnmarshallerContext.cs

Lines changed: 7 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ namespace Amazon.Extensions.CborProtocol.Internal.Transform
2929
public class CborUnmarshallerContext : UnmarshallerContext
3030
{
3131
private bool disposed = false;
32-
private byte[] rentedBuffer = null;
3332
private readonly Stack<string> _pathStack = new Stack<string>();
3433

3534
// There isn't a direct way to check if the reader is at the start of the document,
@@ -39,7 +38,8 @@ public class CborUnmarshallerContext : UnmarshallerContext
3938
public override bool IsEndElement => Reader.PeekState() == CborReaderState.Finished;
4039
public override bool IsStartElement => Reader.PeekState() == CborReaderState.StartMap;
4140

42-
public CborReader Reader { get; set; }
41+
public Stream Stream { get; set; }
42+
public CborReaderProxy Reader { get; set; }
4343
public override string CurrentPath => string.Join("/", _pathStack.Reverse());
4444
public override int CurrentDepth => Reader.CurrentDepth;
4545

@@ -76,8 +76,6 @@ IRequestContext requestContext
7676
responseStream = WrappingStream;
7777
}
7878

79-
long? streamSize = null;
80-
8179
if (responseData != null)
8280
{
8381
bool parsedContentLengthHeader = long.TryParse(
@@ -105,79 +103,14 @@ out long contentLength
105103
requestContext
106104
);
107105
}
108-
if (parsedContentLengthHeader && responseData.ContentLength == contentLength)
109-
{
110-
streamSize = contentLength;
111-
}
112106
}
113107

114108
this.WebResponseData = responseData;
115109
this.MaintainResponseBody = maintainResponseBody;
116110
this.IsException = isException;
117111

118-
Reader = CreateCborReaderFromStream(
119-
FlexibleChecksumStream ?? CrcStream ?? responseStream,
120-
streamSize
121-
);
122-
}
123-
124-
private CborReader CreateCborReaderFromStream(Stream stream, long? streamSize = null)
125-
{
126-
int totalRead = 0;
127-
if (streamSize.HasValue)
128-
{
129-
// TODO: update this method to read the stream in chunks incase the stream is larger than int.MaxValue
130-
// If we know the size, we can read directly into a buffer of exact size
131-
rentedBuffer = ArrayPool<byte>.Shared.Rent((int)streamSize.Value);
132-
133-
while (totalRead < streamSize.Value)
134-
{
135-
int bytesRead = stream.Read(rentedBuffer, totalRead, (int)(streamSize.Value - totalRead));
136-
if (bytesRead == 0)
137-
{
138-
// If no bytes are read, it means we've reached the end of the stream before reading the whole streamSize.
139-
throw new EndOfStreamException($"Expected {streamSize.Value} bytes but only read {totalRead}.");
140-
}
141-
142-
totalRead += bytesRead;
143-
}
144-
}
145-
else
146-
{
147-
const int InitialBufferSize = 1024 * 8; // 8kb
148-
rentedBuffer = ArrayPool<byte>.Shared.Rent(InitialBufferSize);
149-
150-
while (true)
151-
{
152-
int read = stream.Read(rentedBuffer, totalRead, rentedBuffer.Length - totalRead);
153-
if (read == 0)
154-
break;
155-
156-
totalRead += read;
157-
158-
if (totalRead == rentedBuffer.Length)
159-
{
160-
// Expand the buffer size by doubling it
161-
var newBuffer = ArrayPool<byte>.Shared.Rent(rentedBuffer.Length * 2);
162-
try
163-
{
164-
Buffer.BlockCopy(rentedBuffer, 0, newBuffer, 0, totalRead);
165-
}
166-
catch
167-
{
168-
ArrayPool<byte>.Shared.Return(newBuffer);
169-
ArrayPool<byte>.Shared.Return(rentedBuffer);
170-
rentedBuffer = null;
171-
throw;
172-
}
173-
ArrayPool<byte>.Shared.Return(rentedBuffer);
174-
rentedBuffer = newBuffer;
175-
}
176-
}
177-
}
178-
179-
var actualBytes = new ReadOnlyMemory<byte>(rentedBuffer, 0, totalRead);
180-
return new CborReader(actualBytes);
112+
Stream = FlexibleChecksumStream ?? CrcStream ?? responseStream;
113+
Reader = new CborReaderProxy(Stream);
181114
}
182115

183116
/// <summary>
@@ -204,9 +137,10 @@ protected override void Dispose(bool disposing)
204137
{
205138
if (!this.disposed)
206139
{
207-
if (disposing && rentedBuffer != null)
140+
if (disposing && Reader != null)
208141
{
209-
ArrayPool<byte>.Shared.Return(rentedBuffer);
142+
Reader.Dispose();
143+
Reader = null;
210144
}
211145
disposed = true;
212146
}

0 commit comments

Comments
 (0)