Skip to content

Add async iterator on result #234

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions api/src/DuckDBResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,4 +239,35 @@ export class DuckDBResult {
public async getRowObjectsJson(): Promise<Record<string, Json>[]> {
return this.convertRowObjects(JsonDuckDBValueConverter);
}

/**
* Async iterator that yields rows one by one as objects
* Usage: for await (const row of result) { ... }
*/
public async *[Symbol.asyncIterator](): AsyncIterableIterator<Record<string, JS>> {
const columnNames = this.deduplicatedColumnNames();
while (true) {
const chunk = await this.fetchChunk();
if (!chunk || chunk.rowCount === 0) {
break;
}
// Pre-fetch all column vectors for this chunk for efficiency
const columnVectors = [];
for (let colIndex = 0; colIndex < this.columnCount; colIndex++) {
columnVectors.push(chunk.getColumnVector(colIndex));
}
// Yield each row in the chunk
for (let rowIndex = 0; rowIndex < chunk.rowCount; rowIndex++) {
const row: Record<string, JS> = {};

for (let colIndex = 0; colIndex < this.columnCount; colIndex++) {
const columnName = columnNames[colIndex];
const value = columnVectors[colIndex].getItem(rowIndex);
row[columnName] = JSDuckDBValueConverter(value, columnVectors[colIndex].type, JSDuckDBValueConverter);
}

yield row;
}
}
}
}
2 changes: 1 addition & 1 deletion api/test/api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ async function sleep(ms: number): Promise<void> {
});
}

async function withConnection(
export async function withConnection(
fn: (connection: DuckDBConnection) => Promise<void>
) {
const instance = await DuckDBInstance.create();
Expand Down
117 changes: 117 additions & 0 deletions api/test/async-iterator.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import { describe, test, assert } from 'vitest';
import { withConnection } from './api.test';

describe('DuckDBResult async iterator', () => {

test('should iterate over basic query results', async () => {
await withConnection(async (connection) => {
await connection.run('CREATE TABLE test_table (id INTEGER, name VARCHAR)');
await connection.run("INSERT INTO test_table VALUES (1, 'Alice'), (2, 'Bob')");

const result = await connection.run('SELECT * FROM test_table ORDER BY id');

const rows: any[] = [];
for await (const row of result) {
rows.push(row);
}

assert.equal(rows.length, 2);
assert.deepEqual(rows[0], { id: 1, name: 'Alice' });
assert.deepEqual(rows[1], { id: 2, name: 'Bob' });
});
});

test('should handle empty result sets', async () => {
await withConnection(async (connection) => {
await connection.run('CREATE TABLE empty_table (id INTEGER)');

const result = await connection.run('SELECT * FROM empty_table');

const rows: any[] = [];
for await (const row of result) {
rows.push(row);
}

assert.equal(rows.length, 0);
});
});

test('should allow early termination with break', async () => {
await withConnection(async (connection) => {
await connection.run('CREATE TABLE numbers (value INTEGER)');
await connection.run('INSERT INTO numbers SELECT range FROM range(10)');

const result = await connection.run('SELECT * FROM numbers ORDER BY value');

const rows: any[] = [];
for await (const row of result) {
rows.push(row);
if (rows.length >= 3) {
break;
}
}

assert.equal(rows.length, 3);
assert.equal(rows[0].value, 0);
assert.equal(rows[2].value, 2);
});
});

test('should handle null values', async () => {
await withConnection(async (connection) => {
await connection.run('CREATE TABLE nullable_table (id INTEGER, value VARCHAR)');
await connection.run("INSERT INTO nullable_table VALUES (1, 'test'), (2, null)");

const result = await connection.run('SELECT * FROM nullable_table ORDER BY id');

const rows: any[] = [];
for await (const row of result) {
rows.push(row);
}

assert.equal(rows.length, 2);
assert.equal(rows[0].value, 'test');
assert.isNull(rows[1].value);
});
});

test('should fetch chunks progressively', async () => {
await withConnection(async (connection) => {
const result = await connection.run('SELECT range as value FROM range(10000)');

let count = 0;
let chunkCount = 0;

// Track chunks
const originalFetchChunk = result.fetchChunk.bind(result);
result.fetchChunk = async function () {
const chunk = await originalFetchChunk();
if (chunk && chunk.rowCount > 0) {
chunkCount++;
}
return chunk;
};

assert.equal(chunkCount, 0);

// Use the SAME iterator instance
const iterator = result[Symbol.asyncIterator]();

// Get first row
const firstResult = await iterator.next();
assert.equal(chunkCount, 1);
assert.isFalse(firstResult.done);
count++;

// Continue with the same iterator
let nextResult = await iterator.next();
while (!nextResult.done) {
count++;
nextResult = await iterator.next();
}

assert.equal(count, 10000);
assert.isAbove(chunkCount, 1);
});
});
});