-
Notifications
You must be signed in to change notification settings - Fork 859
[API Proposal]: DataIngestion: Pipelines #6895
Description
Background and motivation
In the modern AI-driven world, organizations are increasingly leveraging large language models (LLMs) and retrieval-augmented generation (RAG) systems to extract insights from their vast document repositories. However, a fundamental challenge emerges: how to efficiently process, chunk, and retrieve information from diverse document formats while preserving semantic meaning and structural context.
API Proposal
Note: the following proposal contains both the abstractions and their implementations. It's expected that all abstractions (and option bags) will become part of the Microsoft.Extensions.DataIngestion.Abstractions package, which will have no dependencies.
The implementations will be part of the Microsoft.Extensions.DataIngestion project.
namespace Microsoft.Extensions.DataIngestion;
/// <summary>
/// Reads a source (a file or a stream) into an <see cref="IngestionDocument"/>.
/// </summary>
public abstract class IngestionDocumentReader
{
    /// <summary>Reads a file, using its full path as the document identifier.</summary>
    // Fixed: `cancellationToken` was passed positionally as the third argument, where it would
    // bind to the `string? mediaType` parameter of the overload below (a type mismatch);
    // it must be passed by name.
    public Task<IngestionDocument> ReadAsync(FileInfo source, CancellationToken cancellationToken = default)
        => ReadAsync(source, source.FullName, cancellationToken: cancellationToken);

    /// <summary>Reads a file using an explicit identifier and an optional media type.</summary>
    // Fixed: was declared `virtual` with no body, which is not valid C#; `abstract` matches
    // the sibling overload and the proposal's intent.
    public abstract Task<IngestionDocument> ReadAsync(FileInfo source, string identifier, string? mediaType = null, CancellationToken cancellationToken = default);

    /// <summary>Reads a stream; the media type must be supplied explicitly.</summary>
    public abstract Task<IngestionDocument> ReadAsync(Stream source, string mediaType, string identifier, CancellationToken cancellationToken = default);
}
/// <summary>
/// Transforms an <see cref="IngestionDocument"/>, producing the processed document.
/// Instances run over documents in <c>IngestionPipeline&lt;T&gt;.DocumentProcessors</c> before chunking.
/// </summary>
public abstract class IngestionDocumentProcessor
{
/// <summary>Processes the given document and returns the resulting document.</summary>
public abstract Task<IngestionDocument> ProcessAsync(IngestionDocument document, CancellationToken cancellationToken = default);
}
/// <summary>
/// Document processor that adds alternative text for images using an AI chat model.
/// NOTE(review): behavior inferred from the name — implementation not shown in this proposal.
/// </summary>
public sealed class ImageAlternativeTextEnricher : IngestionDocumentProcessor
{
/// <summary>Creates the enricher; <paramref name="chatOptions"/> optionally customizes the chat requests.</summary>
public ImageAlternativeTextEnricher(IChatClient chatClient, ChatOptions? chatOptions = null);
}
/// <summary>
/// A piece of an <see cref="IngestionDocument"/> produced by a chunker,
/// optionally enriched with metadata by chunk processors.
/// </summary>
public sealed class IngestionChunk<T>
{
    /// <summary>The chunk content (e.g. text for <c>IngestionChunk&lt;string&gt;</c>).</summary>
    public T Content { get; }

    /// <summary>The document this chunk was produced from.</summary>
    public IngestionDocument Document { get; }

    /// <summary>The token count of the content, when known.</summary>
    public int? TokenCount { get; }

    /// <summary>Optional contextual text associated with the chunk (e.g. preserved header context — TODO confirm).</summary>
    public string? Context { get; }

    /// <summary>Metadata attached to the chunk (e.g. by enrichers such as SummaryEnricher).</summary>
    public Dictionary<string, object> Metadata { get; }

    /// <summary>Whether any metadata has been attached (presumably lets callers skip touching <see cref="Metadata"/> — TODO confirm).</summary>
    public bool HasMetadata { get; }

    // Fixed: the second parameter was typed `Document`, a type not declared anywhere in the
    // proposal; the property it backs is `IngestionDocument`.
    public IngestionChunk(T content, IngestionDocument document, int? tokenCount = null, string? context = null);
}
/// <summary>
/// Splits an <see cref="IngestionDocument"/> into a stream of <see cref="IngestionChunk{T}"/>.
/// </summary>
public abstract class DocumentChunker<T>
{
/// <summary>Produces the chunks for the given document as an asynchronous stream.</summary>
public abstract IAsyncEnumerable<IngestionChunk<T>> ProcessAsync(IngestionDocument document, CancellationToken cancellationToken = default);
}
/// <summary>
/// Common options for the document chunkers.
/// </summary>
/// <remarks>
/// Intentionally left unsealed so a chunker-specific derived options type can be created.
/// Default values come from
/// https://learn.microsoft.com/en-us/azure/search/vector-search-how-to-chunk-documents#text-split-skill-example
/// </remarks>
public class ChunkerOptions
{
    /// <summary>
    /// The maximum number of tokens allowed in each chunk. Default is 2000.
    /// </summary>
    public int MaxTokensPerChunk { get; set; } = 2000;

    /// <summary>
    /// The number of overlapping tokens between consecutive chunks. Default is 500.
    /// </summary>
    public int OverlapTokens { get; set; } = 500;
}
/// <summary>
/// Splits a document into chunks based on semantic similarity between its elements.
/// </summary>
public sealed class SemanticSimilarityChunker : DocumentChunker<string>
{
    // Fixed: the constructor was named `SemanticChunker`, which does not match the type name
    // and therefore would not compile.
    public SemanticSimilarityChunker(
        IEmbeddingGenerator<string, Embedding<float>> embeddingGenerator,
        Tokenizer tokenizer, ChunkerOptions? options = default,
        float thresholdPercentile = 95.0f);
}
/// <summary>
/// Splits documents into chunks based on headers and their corresponding levels,
/// preserving the header context.
/// </summary>
public sealed class HeaderChunker : DocumentChunker<string>
{
/// <summary>Creates the chunker; when <paramref name="options"/> is null, defaults apply.</summary>
public HeaderChunker(Tokenizer tokenizer, ChunkerOptions? options = default);
}
/// <summary>
/// Treats each section in a document as a separate chunk.
/// </summary>
public sealed class SectionChunker : DocumentChunker<string>
{
/// <summary>Creates the chunker; when <paramref name="options"/> is null, defaults apply.</summary>
public SectionChunker(Tokenizer tokenizer, ChunkerOptions? options = default);
}
/// <summary>
/// Transforms a stream of chunks (e.g. an enricher adding metadata).
/// Instances run in <c>IngestionPipeline&lt;T&gt;.ChunkProcessors</c> after chunking.
/// </summary>
public abstract class ChunkProcessor<T>
{
/// <summary>Processes the incoming chunk stream and yields the resulting chunks.</summary>
public abstract IAsyncEnumerable<IngestionChunk<T>> ProcessAsync(IAsyncEnumerable<IngestionChunk<T>> chunks, CancellationToken cancellationToken = default);
}
/// <summary>
/// Enriches chunks with summary text using an AI chat model.
/// </summary>
public sealed class SummaryEnricher : ChunkProcessor<string>
{
/// <summary>Creates the enricher; <paramref name="maxWordCount"/> bounds the summary length.</summary>
public SummaryEnricher(IChatClient chatClient, ChatOptions? chatOptions = null, int maxWordCount = 100);
/// <summary>The chunk-metadata key the summary is stored under (presumably in <c>IngestionChunk&lt;T&gt;.Metadata</c> — TODO confirm).</summary>
public static string MetadataKey => "summary";
}
/// <summary>
/// Enriches chunks with sentiment analysis using an AI chat model.
/// </summary>
public sealed class SentimentEnricher : ChunkProcessor<string>
{
/// <summary>Creates the enricher; results below <paramref name="confidenceThreshold"/> are presumably discarded — TODO confirm.</summary>
public SentimentEnricher(IChatClient chatClient, ChatOptions? chatOptions = null, double confidenceThreshold = 0.7);
/// <summary>The chunk-metadata key the sentiment result is stored under.</summary>
public static string MetadataKey => "sentiment";
}
/// <summary>
/// Enriches document chunks with a classification label based on their content.
/// </summary>
public sealed class ClassificationEnricher : ChunkProcessor<string>
{
/// <summary>Creates the enricher; chunks that match none of <paramref name="predefinedClasses"/> get <paramref name="fallbackClass"/>.</summary>
public ClassificationEnricher(IChatClient chatClient, string[] predefinedClasses,
ChatOptions? chatOptions = null, string fallbackClass = "Unknown");
/// <summary>The chunk-metadata key the classification label is stored under.</summary>
public static string MetadataKey => "classification";
}
/// <summary>
/// Enriches chunks with keywords extracted by an AI chat model.
/// </summary>
public sealed class KeywordEnricher : ChunkProcessor<string>
{
    /// <summary>
    /// Creates the enricher.
    /// <paramref name="predefinedKeywords"/> must be provided explicitly (it may be null) so the
    /// user is encouraged to think about it — for example, providing a closed set makes the
    /// results more predictable.
    /// </summary>
    // Fixed: the constructor declaration was missing its terminating semicolon.
    public KeywordEnricher(IChatClient chatClient, string[]? predefinedKeywords,
        ChatOptions? chatOptions = null, int maxKeywords = 5, double confidenceThreshold = 0.7);

    /// <summary>The chunk-metadata key the extracted keywords are stored under.</summary>
    public static string MetadataKey => "keywords";
}
/// <summary>
/// Writes a stream of chunks to a destination store (e.g. a vector store).
/// </summary>
public abstract class IngestionChunkWriter<T> : IDisposable
{
/// <summary>Consumes the chunk stream and writes it to the destination.</summary>
public abstract Task WriteAsync(IAsyncEnumerable<IngestionChunk<T>> chunks, CancellationToken cancellationToken = default);
// Presumably the standard dispose pattern (Dispose() calling Dispose(true)) — TODO confirm.
public void Dispose();
protected virtual void Dispose(bool disposing);
}
/// <summary>
/// Options for <c>VectorStoreWriter&lt;T&gt;</c>.
/// </summary>
public sealed class VectorStoreWriterOptions
{
    /// <summary>The name of the collection the chunks are written to. Default is "chunks".</summary>
    public string CollectionName { get; set; } = "chunks";

    /// <summary>
    /// The distance function to use when creating the collection.
    /// When not provided, the default specific to the given database is used.
    /// </summary>
    public string? DistanceFunction { get; set; }

    /// <summary>
    /// When enabled, the writer deletes the chunks for a given document before inserting the
    /// new ones, so an ingestion run "replaces" the document's existing chunks. Off by default.
    /// </summary>
    public bool IncrementalIngestion { get; set; }
}
/// <summary>
/// Chunk writer that stores chunks in a <see cref="VectorStore"/> collection.
/// <paramref name="dimensionCount"/> is the embedding dimensionality (must match the embedding model used).
/// </summary>
public sealed class VectorStoreWriter<T> : IngestionChunkWriter<T>
{
public VectorStoreWriter(VectorStore vectorStore, int dimensionCount, VectorStoreWriterOptions? options = default);
/// <summary>The underlying collection the chunks are written to, exposed for direct querying.</summary>
public VectorStoreCollection<object, Dictionary<string, object?>> VectorStoreCollection { get; }
}
/// <summary>
/// Options for <c>IngestionPipeline&lt;T&gt;</c>.
/// </summary>
public sealed class IngestionPipelineOptions
{
    /// <summary>The name of the ActivitySource used by the pipeline for telemetry.</summary>
    public string ActivitySourceName { get; set; } = "Experimental.Microsoft.Extensions.DataIngestion";

    // A possible future extension could look like this:
    // public int MaxDegreeOfParallelism { get; set; } = Environment.ProcessorCount;
}
/// <summary>
/// Connects a reader, document processors, a chunker, chunk processors and a writer
/// into a single document-ingestion pipeline.
/// </summary>
public sealed class IngestionPipeline<T> : IDisposable
{
    // Fixed: the chunker/processor parameters referenced `IngestionChunker<T>` and
    // `IngestionChunkProcessor<T>`, which are not declared anywhere in the proposal;
    // the declared base types are `DocumentChunker<T>` and `ChunkProcessor<T>`.
    public IngestionPipeline(
        IngestionDocumentReader reader,
        DocumentChunker<T> chunker,
        IngestionChunkWriter<T> writer,
        IngestionPipelineOptions? options = default,
        ILoggerFactory? loggerFactory = default);

    /// <summary>Processors applied to each document after reading, in order.</summary>
    public IList<IngestionDocumentProcessor> DocumentProcessors { get; }

    /// <summary>Processors applied to the chunk stream after chunking, in order.</summary>
    public IList<ChunkProcessor<T>> ChunkProcessors { get; }

    // Fixed: `async` removed — it is an implementation detail, not part of the API surface,
    // and is not valid on a bodiless declaration.
    /// <summary>Processes all files in the directory that match the search pattern.</summary>
    public Task ProcessAsync(DirectoryInfo directory, string searchPattern = "*.*", SearchOption searchOption = SearchOption.TopDirectoryOnly, CancellationToken cancellationToken = default);

    /// <summary>Processes the given files.</summary>
    public Task ProcessAsync(IEnumerable<FileInfo> files, CancellationToken cancellationToken = default);

    public void Dispose();
}
API Usage
Sample that leverages Azure Document Intelligence, OpenAI and SQL Server to process all PDF files from current directory:
// Reader backed by Azure Document Intelligence.
DocumentIntelligenceReader reader = new(new DocumentIntelligenceClient(
    new Uri(Environment.GetEnvironmentVariable("AZURE_DOCUMENT_INT_ENDPOINT")!),
    new AzureKeyCredential(Environment.GetEnvironmentVariable("AZURE_DOCUMENT_INT_KEY")!)));

AzureOpenAIClient llm = new(
    new Uri(Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")!),
    new AzureKeyCredential(Environment.GetEnvironmentVariable("AZURE_OPENAI_API_KEY")!));

var embeddingGenerator = llm.GetEmbeddingClient("text-embedding-3-small").AsIEmbeddingGenerator();
SemanticSimilarityChunker chunker = new(embeddingGenerator, TiktokenTokenizer.CreateForModel("gpt-4"));
IChatClient chatClient = llm.GetChatClient("gpt-4.1").AsIChatClient();

using SqlServerVectorStore sqlServerVectorStore = new(
    Environment.GetEnvironmentVariable("SQL_SERVER_CONNECTION_STRING")!,
    new() { EmbeddingGenerator = embeddingGenerator });
using VectorStoreWriter<string> writer = new(sqlServerVectorStore, 1536 /* text-embedding-3-small */);

// Fixed: the sample used `DocumentPipeline<string>` and `AlternativeTextEnricher`; the
// proposed type names are `IngestionPipeline<T>` and `ImageAlternativeTextEnricher`.
using IngestionPipeline<string> pipeline = new(reader, chunker, writer)
{
    DocumentProcessors = { new ImageAlternativeTextEnricher(chatClient) },
    ChunkProcessors = { new SummaryEnricher(chatClient), new SentimentEnricher(chatClient) }
};

await pipeline.ProcessAsync(new DirectoryInfo("."), "*.pdf");
Alternative Designs
No response
Risks
Not every IngestionDocumentReader implementation wraps a service that supports a Uri as input; this forces such implementations to, for example, download the content and store it in a temporary file for the duration of parsing.