Skip to content

Feat/parallel exec nik #83

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Aug 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 50 additions & 13 deletions benchmarking/src/app/benchmarking-tests/dbm-benchmarking.spec.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import axios from 'axios';
import { ChildProcess, spawn } from 'child_process';
import * as puppeteer from 'puppeteer';

describe('Benchmarking DBMs', () => {
let page: puppeteer.Page;
let browser: puppeteer.Browser;
let appProcess: ChildProcess;
let appProcessRunner: ChildProcess;

let totalTimeForMemoryDB: number;

Expand All @@ -14,25 +14,40 @@ describe('Benchmarking DBMs', () => {
stdio: 'inherit',
});

// Wait for the server to start
appProcessRunner = spawn('npx', ['nx', 'serve', 'meerkat-browser-runner'], {
stdio: 'inherit',
});

browser = await puppeteer.launch({
headless: 'new',
});
page = await browser.newPage();

//Wait for the server to start by visiting the page
let serverStarted = false;
while (!serverStarted) {
console.info('Waiting for server to start');
try {
const response = await axios.get('http://localhost:4200');
if (response.status === 200) {
serverStarted = true;
} else {
await new Promise((resolve) => setTimeout(resolve, 1000)); // Wait for 1 second before retrying
}
await page.goto('http://localhost:4200');
serverStarted = true;
} catch (error) {
console.info('Server not started yet', error);
await new Promise((resolve) => setTimeout(resolve, 1000)); // Wait for 1 second before retrying
}
}

browser = await puppeteer.launch({
headless: 'new',
});
page = await browser.newPage();
let serverStartedRunner = false;
while (!serverStartedRunner) {
console.info('Waiting for server to start');

try {
await page.goto('http://localhost:4205');
serverStartedRunner = true;
} catch (error) {
console.info('Server not started yet', error);
await new Promise((resolve) => setTimeout(resolve, 1000)); // Wait for 1 second before retrying
}
}
}, 30000);

it('Benchmark raw duckdb with memory sequence duckdb', async () => {
Expand Down Expand Up @@ -93,8 +108,30 @@ describe('Benchmarking DBMs', () => {
expect(totalTimeForIndexedDBM).toBeLessThan(totalTimeForMemoryDB * 1.3);
}, 300000);

/**
 * Benchmarks the parallel DBM page against the in-memory baseline.
 *
 * Navigates to the `/parallel-dbm` route, waits for the `#total_time`
 * element to appear, reads its text content as a number and asserts that
 * it is below 1.3x `totalTimeForMemoryDB`.
 *
 * NOTE(review): `totalTimeForMemoryDB` is assigned outside this block —
 * presumably by the memory benchmark test that runs earlier; confirm the
 * ordering before running this test in isolation.
 */
it('Benchmark parallel dbm duckdb', async () => {
  await page.goto('http://localhost:4200/parallel-dbm');

  // Wait for the total time element to be rendered (same budget as the test timeout).
  await page.waitForSelector('#total_time', { timeout: 300000 });

  // Read the rendered total time and convert it to a number.
  const totalTimeForParallelDBM = await page.$eval('#total_time', (el) =>
    Number(el.textContent)
  );

  console.info('totalTimeForParallelDBM', totalTimeForParallelDBM);

  // The parallel DBM total time must be less than 130% of the memory DB total time.
  expect(totalTimeForParallelDBM).toBeLessThan(totalTimeForMemoryDB * 1.3);
}, 300000);

/**
 * Suite teardown: signals both spawned child processes to terminate and
 * then closes the Puppeteer browser.
 */
afterAll(async () => {
  // Send SIGTERM to the two processes spawned during setup.
  appProcess.kill('SIGTERM');
  appProcessRunner.kill('SIGTERM');

  // Close the browser exactly once, after the child processes have been signalled.
  await browser.close();
});
});
115 changes: 90 additions & 25 deletions benchmarking/src/app/constants.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,101 @@
export const TEST_QUERIES = [
`
WITH group_by_query AS (
SELECT
hvfhs_license_num,
COUNT(*)
FROM
taxi.parquet
GROUP BY
hvfhs_license_num
),

full_query AS (
SELECT
*
FROM
taxi.parquet
)

SELECT
COUNT(*)
FROM
group_by_query
LEFT JOIN
full_query
ON
group_by_query.hvfhs_license_num = full_query.hvfhs_license_num
LIMIT 1
`,
'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi.parquet',
"SELECT * FROM taxi.parquet WHERE originating_base_num='B03404' LIMIT 100",
'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi.parquet GROUP BY hvfhs_license_num',
'SELECT * as total_count FROM taxi.parquet ORDER BY bcf LIMIT 100',

`
WITH group_by_query AS (
SELECT
hvfhs_license_num,
COUNT(*)
FROM
taxi.parquet
GROUP BY
hvfhs_license_num
),
WITH group_by_query AS (
SELECT
hvfhs_license_num,
COUNT(*)
FROM
taxi.parquet
GROUP BY
hvfhs_license_num
),

full_query AS (
SELECT
*
FROM
taxi.parquet
)
full_query AS (
SELECT
*
FROM
taxi.parquet
)

SELECT
COUNT(*)
FROM
group_by_query
LEFT JOIN
full_query
ON
group_by_query.hvfhs_license_num = full_query.hvfhs_license_num
LIMIT 1
`,
SELECT
COUNT(*)
FROM
group_by_query
LEFT JOIN
full_query
ON
group_by_query.hvfhs_license_num = full_query.hvfhs_license_num
LIMIT 1
`,
'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi.parquet',
"SELECT * FROM taxi.parquet WHERE originating_base_num='B03404' LIMIT 100",
'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi.parquet GROUP BY hvfhs_license_num',
'SELECT * as total_count FROM taxi.parquet ORDER BY bcf LIMIT 100',
`
WITH group_by_query AS (
SELECT
hvfhs_license_num,
COUNT(*)
FROM
taxi.parquet
GROUP BY
hvfhs_license_num
),

full_query AS (
SELECT
*
FROM
taxi.parquet
)

SELECT
COUNT(*)
FROM
group_by_query
LEFT JOIN
full_query
ON
group_by_query.hvfhs_license_num = full_query.hvfhs_license_num
LIMIT 1
`,
'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi.parquet',
"SELECT * FROM taxi.parquet WHERE originating_base_num='B03404' LIMIT 100",
'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi.parquet GROUP BY hvfhs_license_num',
'SELECT * as total_count FROM taxi.parquet ORDER BY bcf LIMIT 100',
// 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxijson.parquet',
// 'SELECT * FROM taxijson.parquet WHERE price >= 1.0005812645 LIMIT 100',
// 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxijson.parquet GROUP BY order_count',
Expand Down
26 changes: 13 additions & 13 deletions benchmarking/src/app/dbm-context/parallel-dbm-context.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,10 @@ export const ParallelDBMProvider = ({
}: {
children: JSX.Element;
}) => {
const fileManagerRef = useRef<ParallelMemoryFileManager | null>(null);
const [dbm, setdbm] = useState<DBMParallel | null>(null);
const instanceManagerRef = useRef<InstanceManager>(new InstanceManager());

const dbState = useAsyncDuckDB();

useClassicEffect(() => {
if (!dbState) {
return;
}
fileManagerRef.current = new ParallelMemoryFileManager({
const fileManagerRef = useRef<ParallelMemoryFileManager>(
new ParallelMemoryFileManager({
instanceManager: instanceManagerRef.current,
fetchTableFileBuffers: async (table) => {
return [];
Expand All @@ -34,12 +27,19 @@ export const ParallelDBMProvider = ({
onEvent: (event) => {
console.info(event);
},
});
})
);

const dbState = useAsyncDuckDB();

useClassicEffect(() => {
if (!dbState) {
return;
}
const iframeManager = new IFrameRunnerManager({
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
//@ts-ignore
totalRunners: 4,
fetchTableFileBuffers: async (table) => {
return fileManagerRef.current?.getTableBufferData(table);
return fileManagerRef.current.getTableBufferData(table);
},
});

Expand Down
31 changes: 12 additions & 19 deletions benchmarking/src/app/file-loader/file-loader.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,29 @@ export const FileLoader = ({ children }: { children: JSX.Element }) => {
useClassicEffect(() => {
(async () => {
const file = await axios.get(
'http://localhost:4200/public/data-sets/fhvhv_tripdata_2023-01.parquet',
'http://localhost:4200/data-sets/fhvhv_tripdata_2023-01.parquet',
{ responseType: 'arraybuffer' }
);
const fileBuffer = file.data;
const fileBufferView = new Uint8Array(fileBuffer);

const jsonFile = await axios.get(
'http://localhost:4200/data-sets/taxi.json',
{ responseType: 'json' }
);
const TAXI_JSON_DATA = jsonFile.data;

await fileManager.registerFileBuffer({
tableName: 'taxi',
fileName: 'taxi.parquet',
buffer: fileBufferView,
});

// await fileManager.registerJSON({
// json: TAXI_JSON_DATA,
// tableName: 'taxijson',
// fileName: 'taxijson.parquet',
// });

// //Find all iframe and add fileBufferView & TAXI_JSON_DATA to the window
// const iframes = document.querySelectorAll('iframe');
// iframes.forEach((iframe) => {
// const win = iframe.contentWindow;
// // eslint-disable-next-line @typescript-eslint/ban-ts-comment
// //@ts-ignore
// win.fileBufferView = fileBufferView;
// console.log('fileBufferView', fileBufferView);
// // eslint-disable-next-line @typescript-eslint/ban-ts-comment
// //@ts-ignore
// win.TAXI_JSON_DATA = TAXI_JSON_DATA;
// });
await fileManager.registerJSON({
json: TAXI_JSON_DATA,
tableName: 'taxijson',
fileName: 'taxijson.parquet',
});

setIsFileLoader(true);
})();
Expand Down

This file was deleted.

1 change: 0 additions & 1 deletion benchmarking/src/assets/data-sets/taxi.json

This file was deleted.

Loading