Commit 4dab4ca

fix(stream): reads should emit the dataset number for each dataset (#2628)
* 🐛 streaming reads should emit the dataset number for each dataset
* 👌 attend to PR commentary
* 👌 rename test fixture file, as per PR request
* 🚨 run prettier -w on test-multi-result-streaming.test.cjs
* ⚗️ try latest mysql
* ⚗️ enable GHA for this branch, hopefully
* 🐛 no need to process.exit(0) in the test if the connection is properly destroyed
* ⏪ revert addition of this branch as a trigger
* ⏪ revert mysql version update
* 🚨 pacify the linter
* ci: debug multiple stream test order

---------

Co-authored-by: Davyd McColl <[email protected]>
1 parent b5df699 commit 4dab4ca

File tree: 2 files changed (+58, -3 lines)


lib/commands/query.js

Lines changed: 3 additions & 3 deletions
@@ -251,7 +251,7 @@ class Query extends Command {
     if (this.onResult) {
       this._rows[this._resultIndex].push(row);
     } else {
-      this.emit('result', row);
+      this.emit('result', row, this._resultIndex);
     }
     return Query.prototype.row;
   }

@@ -268,11 +268,11 @@ class Query extends Command {
     stream._read = () => {
       this._connection && this._connection.resume();
     };
-    this.on('result', row => {
+    this.on('result', (row, resultSetIndex) => {
       if (!stream.push(row)) {
         this._connection.pause();
       }
-      stream.emit('result', row); // replicate old emitter
+      stream.emit('result', row, resultSetIndex); // replicate old emitter
     });
     this.on('error', err => {
       stream.emit('error', err); // Pass on any errors
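
The two hunks above add a second argument to the query's 'result' event and forward it on the wrapping Readable stream. A minimal consumer sketch, assuming a local server with placeholder credentials and throwaway SQL; only the extra resultSetIndex argument comes from this commit:

const mysql = require('mysql2');

const conn = mysql.createConnection({
  host: 'localhost', // placeholder credentials, not from this commit
  user: 'root',
  multipleStatements: true, // required to send two statements in one query() call
});

const resultSets = [[], []];
const stream = conn.query('select 1 as a; select 2 as b').stream();

// Each replicated 'result' event now carries the index of the result set
// (i.e. the statement) that produced the row.
stream.on('result', (row, resultSetIndex) => {
  resultSets[resultSetIndex].push(row);
});

// Keep draining the stream so that 'end' is emitted (see the test below).
stream.on('readable', () => stream.read());

stream.on('end', () => {
  console.log(resultSets); // e.g. [ [ { a: 1 } ], [ { b: 2 } ] ]
  conn.end();
});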
test-multi-result-streaming.test.cjs

Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
+'use strict';
+
+const { assert } = require('poku');
+const { createConnection } = require('../common.test.cjs');
+
+(async () => {
+  const conn = createConnection({ multipleStatements: true });
+  const captured1 = [];
+  const captured2 = [];
+  const sql1 =
+    'select * from information_schema.columns order by table_schema, table_name, column_name limit 1;';
+  const sql2 =
+    'select * from information_schema.columns order by table_schema, table_name, ordinal_position limit 1;';
+
+  await conn.promise().query('set global max_allowed_packet=524288000');
+
+  const compare1 = await conn.promise().query(sql1);
+  const compare2 = await conn.promise().query(sql2);
+
+  if (!compare1 || compare1.length < 1) {
+    assert.fail('no results for comparison 1');
+  }
+  if (!compare2 || compare2.length < 1) {
+    assert.fail('no results for comparison 2');
+  }
+
+  const stream = conn.query(`${sql1}\n${sql2}`).stream();
+  stream.on('result', (row, datasetIndex) => {
+    if (datasetIndex === 0) {
+      captured1.push(row);
+    } else {
+      captured2.push(row);
+    }
+  });
+  // note: this is very important:
+  // after each result set is complete,
+  // the stream will emit "readable" and if we don't
+  // read then 'end' won't be emitted and the
+  // test will hang.
+  stream.on('readable', () => {
+    stream.read();
+  });
+
+  await new Promise((resolve, reject) => {
+    stream.on('error', (e) => reject(e));
+    stream.on('end', () => resolve());
+  });
+
+  assert.equal(captured1.length, 1);
+  assert.equal(captured2.length, 1);
+  assert.deepEqual(captured1[0], compare1[0][0]);
+  assert.deepEqual(captured2[0], compare2[0][0]);
+
+  conn.end();
+})();
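
For contrast, a sketch of the same kind of multi-statement query through the promise API, reusing the test's createConnection helper; this behaviour is not part of this commit. There the rows already come back grouped per statement, which is the grouping the streaming API previously had no way to express:

const { createConnection } = require('../common.test.cjs');

(async () => {
  const conn = createConnection({ multipleStatements: true });
  // The promise API resolves to [results, fields]; with multiple statements,
  // results holds one row array per statement, so no index is needed.
  const [resultSets] = await conn
    .promise()
    .query('select 1 as a; select 2 as b');
  console.log(resultSets[0]); // rows from the first statement
  console.log(resultSets[1]); // rows from the second statement
  conn.end();
})();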
