Skip to content
119 changes: 108 additions & 11 deletions lightrag/api/routers/document_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1052,27 +1052,124 @@ def _extract_pptx(file_bytes: bytes) -> str:


def _extract_xlsx(file_bytes: bytes) -> str:
"""Extract XLSX content (synchronous).
"""Extract XLSX content in tab-delimited format with clear sheet separation.

This function processes Excel workbooks and converts them to a structured text format
suitable for LLM prompts and RAG systems. Each sheet is clearly delimited with
separator lines, and special characters are escaped to preserve the tab-delimited structure.

Features:
- Each sheet is wrapped with '====================' separators for visual distinction
- Special characters (tabs, newlines, backslashes) are escaped to prevent structure corruption
- Column alignment is preserved across all rows to maintain tabular structure
- Empty rows are preserved as blank lines to maintain row structure
- Two-pass processing: determines max column width, then extracts with consistent alignment

Args:
file_bytes: XLSX file content as bytes

Returns:
str: Extracted text content
str: Extracted text content with all sheets in tab-delimited format.
Format: Sheet separators, sheet name, then tab-delimited rows.

Example output:
==================== Sheet: Data ====================
Name\tAge\tCity
Alice\t30\tNew York
Bob\t25\tLondon

==================== Sheet: Summary ====================
Total\t2
====================
"""
from openpyxl import load_workbook # type: ignore

xlsx_file = BytesIO(file_bytes)
wb = load_workbook(xlsx_file)
content = ""
for sheet in wb:
content += f"Sheet: {sheet.title}\n"
for row in sheet.iter_rows(values_only=True):
content += (
"\t".join(str(cell) if cell is not None else "" for cell in row) + "\n"
)
content += "\n"
return content

def escape_cell(cell_value: str | int | float | None) -> str:
"""Escape characters that would break tab-delimited layout.

Escape order is critical: backslashes first, then tabs/newlines.
This prevents double-escaping issues.

Args:
cell_value: The cell value to escape (can be None, str, int, or float)

Returns:
str: Escaped cell value safe for tab-delimited format
"""
if cell_value is None:
return ""
text = str(cell_value)
# CRITICAL: Escape backslash first to avoid double-escaping
return (
text.replace("\\", "\\\\") # Must be first: \ -> \\
.replace("\t", "\\t") # Tab -> \t (visible)
.replace("\r\n", "\\n") # Windows newline -> \n
.replace("\r", "\\n") # Mac newline -> \n
.replace("\n", "\\n") # Unix newline -> \n
)

def escape_sheet_title(title: str) -> str:
"""Escape sheet title to prevent formatting issues in separators.

Args:
title: Original sheet title

Returns:
str: Sanitized sheet title with tabs/newlines replaced
"""
return str(title).replace("\n", " ").replace("\t", " ").replace("\r", " ")

content_parts: list[str] = []
sheet_separator = "=" * 20

for idx, sheet in enumerate(wb):
if idx > 0:
content_parts.append("") # Blank line between sheets for readability

# Escape sheet title to handle edge cases with special characters
safe_title = escape_sheet_title(sheet.title)
content_parts.append(f"{sheet_separator} Sheet: {safe_title} {sheet_separator}")

# Two-pass approach to preserve column alignment:
# Pass 1: Determine the maximum column width for this sheet
max_columns = 0
all_rows = list(sheet.iter_rows(values_only=True))

for row in all_rows:
last_nonempty_idx = -1
for idx, cell in enumerate(row):
# Check if cell has meaningful content (not None or empty string)
if cell is not None and str(cell).strip():
last_nonempty_idx = idx

if last_nonempty_idx >= 0:
max_columns = max(max_columns, last_nonempty_idx + 1)

# Pass 2: Extract rows with consistent width to preserve column alignment
for row in all_rows:
row_parts = []

# Build row up to max_columns width
for idx in range(max_columns):
if idx < len(row):
row_parts.append(escape_cell(row[idx]))
else:
row_parts.append("") # Pad short rows

# Check if row is completely empty
if all(part == "" for part in row_parts):
# Preserve empty rows as blank lines (maintains row structure)
content_parts.append("")
else:
# Join all columns to maintain consistent column count
content_parts.append("\t".join(row_parts))

# Final separator for symmetry (makes parsing easier)
content_parts.append(sheet_separator)
return "\n".join(content_parts)


async def pipeline_enqueue_file(
Expand Down
Loading