diff --git a/lib/connection.js b/lib/connection.js index af6b3d9d68..8448522c97 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -156,7 +156,7 @@ class Connection extends EventEmitter { } promise(promiseImpl) { - const PromiseConnection = require('../promise').PromiseConnection; + const PromiseConnection = require('./promise_connection.js'); return new PromiseConnection(this, promiseImpl); } diff --git a/lib/inherit_events.js b/lib/inherit_events.js new file mode 100644 index 0000000000..47122aa16a --- /dev/null +++ b/lib/inherit_events.js @@ -0,0 +1,27 @@ +'use strict'; + +function inheritEvents(source, target, events) { + const listeners = {}; + target + .on('newListener', eventName => { + if (events.indexOf(eventName) >= 0 && !target.listenerCount(eventName)) { + source.on( + eventName, + (listeners[eventName] = function () { + const args = [].slice.call(arguments); + args.unshift(eventName); + + target.emit.apply(target, args); + }) + ); + } + }) + .on('removeListener', eventName => { + if (events.indexOf(eventName) >= 0 && !target.listenerCount(eventName)) { + source.removeListener(eventName, listeners[eventName]); + delete listeners[eventName]; + } + }); +} + +module.exports = inheritEvents diff --git a/lib/make_done_cb.js b/lib/make_done_cb.js new file mode 100644 index 0000000000..71de9b3797 --- /dev/null +++ b/lib/make_done_cb.js @@ -0,0 +1,19 @@ +'use strict'; + +function makeDoneCb(resolve, reject, localErr) { + return function (err, rows, fields) { + if (err) { + localErr.message = err.message; + localErr.code = err.code; + localErr.errno = err.errno; + localErr.sql = err.sql; + localErr.sqlState = err.sqlState; + localErr.sqlMessage = err.sqlMessage; + reject(localErr); + } else { + resolve([rows, fields]); + } + }; +} + +module.exports = makeDoneCb diff --git a/lib/pool.js b/lib/pool.js index 7f73db8b00..534bb62aa2 100644 --- a/lib/pool.js +++ b/lib/pool.js @@ -33,7 +33,7 @@ class Pool extends EventEmitter { } promise(promiseImpl) { - const PromisePool = require('../promise').PromisePool; + const PromisePool = require('./promise_pool.js'); return new PromisePool(this, promiseImpl); } diff --git a/lib/pool_connection.js b/lib/pool_connection.js index 5760f9a468..f2e64f0755 100644 --- a/lib/pool_connection.js +++ b/lib/pool_connection.js @@ -30,7 +30,7 @@ class PoolConnection extends Connection { } promise(promiseImpl) { - const PromisePoolConnection = require('../promise').PromisePoolConnection; + const PromisePoolConnection = require('./promise_pool_connection.js'); return new PromisePoolConnection(this, promiseImpl); } diff --git a/lib/promise_connection.js b/lib/promise_connection.js new file mode 100644 index 0000000000..45f4a1af1e --- /dev/null +++ b/lib/promise_connection.js @@ -0,0 +1,221 @@ +'use strict'; + +const PromisePreparedStatementInfo = require('./promise_prepared_statement_info.js'); +const makeDoneCb = require('./make_done_cb.js'); +const inheritEvents = require('./inherit_events.js'); +const { Connection } = require('../index.js'); +const EventEmitter = require('events').EventEmitter; + +class PromiseConnection extends EventEmitter { + constructor(connection, promiseImpl) { + super(); + this.connection = connection; + this.Promise = promiseImpl || Promise; + inheritEvents(connection, this, [ + 'error', + 'drain', + 'connect', + 'end', + 'enqueue' + ]); + } + + release() { + this.connection.release(); + } + + query(query, params) { + const c = this.connection; + const localErr = new Error(); + if (typeof params === 'function') { + throw new Error( + 'Callback function is not available with promise clients.' + ); + } + return new this.Promise((resolve, reject) => { + const done = makeDoneCb(resolve, reject, localErr); + if (params !== undefined) { + c.query(query, params, done); + } else { + c.query(query, done); + } + }); + } + + execute(query, params) { + const c = this.connection; + const localErr = new Error(); + if (typeof params === 'function') { + throw new Error( + 'Callback function is not available with promise clients.' + ); + } + return new this.Promise((resolve, reject) => { + const done = makeDoneCb(resolve, reject, localErr); + if (params !== undefined) { + c.execute(query, params, done); + } else { + c.execute(query, done); + } + }); + } + + end() { + return new this.Promise(resolve => { + this.connection.end(resolve); + }); + } + + beginTransaction() { + const c = this.connection; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + const done = makeDoneCb(resolve, reject, localErr); + c.beginTransaction(done); + }); + } + + commit() { + const c = this.connection; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + const done = makeDoneCb(resolve, reject, localErr); + c.commit(done); + }); + } + + rollback() { + const c = this.connection; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + const done = makeDoneCb(resolve, reject, localErr); + c.rollback(done); + }); + } + + ping() { + const c = this.connection; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + c.ping(err => { + if (err) { + localErr.message = err.message; + localErr.code = err.code; + localErr.errno = err.errno; + localErr.sqlState = err.sqlState; + localErr.sqlMessage = err.sqlMessage; + reject(localErr); + } else { + resolve(true); + } + }); + }); + } + + connect() { + const c = this.connection; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + c.connect((err, param) => { + if (err) { + localErr.message = err.message; + localErr.code = err.code; + localErr.errno = err.errno; + localErr.sqlState = err.sqlState; + localErr.sqlMessage = err.sqlMessage; + reject(localErr); + } else { + resolve(param); + } + }); + }); + } + + prepare(options) { + const c = this.connection; + const promiseImpl = this.Promise; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + c.prepare(options, (err, statement) => { + if (err) { + localErr.message = err.message; + localErr.code = err.code; + localErr.errno = err.errno; + localErr.sqlState = err.sqlState; + localErr.sqlMessage = err.sqlMessage; + reject(localErr); + } else { + const wrappedStatement = new PromisePreparedStatementInfo( + statement, + promiseImpl + ); + resolve(wrappedStatement); + } + }); + }); + } + + changeUser(options) { + const c = this.connection; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + c.changeUser(options, err => { + if (err) { + localErr.message = err.message; + localErr.code = err.code; + localErr.errno = err.errno; + localErr.sqlState = err.sqlState; + localErr.sqlMessage = err.sqlMessage; + reject(localErr); + } else { + resolve(); + } + }); + }); + } + + get config() { + return this.connection.config; + } + + get threadId() { + return this.connection.threadId; + } +} +// patching PromiseConnection +// create facade functions for prototype functions on "Connection" that are not yet +// implemented with PromiseConnection + +// proxy synchronous functions only +(function (functionsToWrap) { + for (let i = 0; functionsToWrap && i < functionsToWrap.length; i++) { + const func = functionsToWrap[i]; + + if ( + typeof Connection.prototype[func] === 'function' && + PromiseConnection.prototype[func] === undefined + ) { + PromiseConnection.prototype[func] = (function factory(funcName) { + return function () { + return Connection.prototype[funcName].apply( + this.connection, + arguments + ); + }; + })(func); + } + } +})([ + // synchronous functions + 'close', + 'createBinlogStream', + 'destroy', + 'escape', + 'escapeId', + 'format', + 'pause', + 'pipe', + 'resume', + 'unprepare' +]); +module.exports = PromiseConnection diff --git a/lib/promise_pool.js b/lib/promise_pool.js new file mode 100644 index 0000000000..4858515f90 --- /dev/null +++ b/lib/promise_pool.js @@ -0,0 +1,112 @@ +'use strict'; + +const EventEmitter = require('events').EventEmitter; +const makeDoneCb = require('./make_done_cb.js'); +const PromisePoolConnection = require('./promise_pool_connection.js'); +const inheritEvents = require('./inherit_events.js'); +const Pool = require('./pool'); + +class PromisePool extends EventEmitter { + constructor(pool, thePromise) { + super(); + this.pool = pool; + this.Promise = thePromise || Promise; + inheritEvents(pool, this, ['acquire', 'connection', 'enqueue', 'release']); + } + + getConnection() { + const corePool = this.pool; + return new this.Promise((resolve, reject) => { + corePool.getConnection((err, coreConnection) => { + if (err) { + reject(err); + } else { + resolve(new PromisePoolConnection(coreConnection, this.Promise)); + } + }); + }); + } + + releaseConnection(connection) { + if (connection instanceof PromisePoolConnection) connection.release(); + } + + query(sql, args) { + const corePool = this.pool; + const localErr = new Error(); + if (typeof args === 'function') { + throw new Error( + 'Callback function is not available with promise clients.' + ); + } + return new this.Promise((resolve, reject) => { + const done = makeDoneCb(resolve, reject, localErr); + if (args !== undefined) { + corePool.query(sql, args, done); + } else { + corePool.query(sql, done); + } + }); + } + + execute(sql, args) { + const corePool = this.pool; + const localErr = new Error(); + if (typeof args === 'function') { + throw new Error( + 'Callback function is not available with promise clients.' + ); + } + return new this.Promise((resolve, reject) => { + const done = makeDoneCb(resolve, reject, localErr); + if (args) { + corePool.execute(sql, args, done); + } else { + corePool.execute(sql, done); + } + }); + } + + end() { + const corePool = this.pool; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + corePool.end(err => { + if (err) { + localErr.message = err.message; + localErr.code = err.code; + localErr.errno = err.errno; + localErr.sqlState = err.sqlState; + localErr.sqlMessage = err.sqlMessage; + reject(localErr); + } else { + resolve(); + } + }); + }); + } +} + +(function (functionsToWrap) { + for (let i = 0; functionsToWrap && i < functionsToWrap.length; i++) { + const func = functionsToWrap[i]; + + if ( + typeof Pool.prototype[func] === 'function' && + PromisePool.prototype[func] === undefined + ) { + PromisePool.prototype[func] = (function factory(funcName) { + return function () { + return Pool.prototype[funcName].apply(this.pool, arguments); + }; + })(func); + } + } +})([ + // synchronous functions + 'escape', + 'escapeId', + 'format' +]); + +module.exports = PromisePool diff --git a/lib/promise_pool_connection.js b/lib/promise_pool_connection.js new file mode 100644 index 0000000000..88fa0e0c1f --- /dev/null +++ b/lib/promise_pool_connection.js @@ -0,0 +1,19 @@ +'use strict'; + +const PromiseConnection = require('./promise_connection.js'); +const PoolConnection = require('./pool_connection.js'); + +class PromisePoolConnection extends PromiseConnection { + constructor(connection, promiseImpl) { + super(connection, promiseImpl); + } + + destroy() { + return PoolConnection.prototype.destroy.apply( + this.connection, + arguments + ); + } +} + +module.exports = PromisePoolConnection diff --git a/lib/promise_prepared_statement_info.js b/lib/promise_prepared_statement_info.js new file mode 100644 index 0000000000..410f999d3f --- /dev/null +++ b/lib/promise_prepared_statement_info.js @@ -0,0 +1,32 @@ +'use strict'; + +const makeDoneCb = require('./make_done_cb.js'); + +class PromisePreparedStatementInfo { + constructor(statement, promiseImpl) { + this.statement = statement; + this.Promise = promiseImpl; + } + + execute(parameters) { + const s = this.statement; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + const done = makeDoneCb(resolve, reject, localErr); + if (parameters) { + s.execute(parameters, done); + } else { + s.execute(done); + } + }); + } + + close() { + return new this.Promise(resolve => { + this.statement.close(); + resolve(); + }); + } +} + +module.exports = PromisePreparedStatementInfo diff --git a/promise.js b/promise.js index 32cd8dc4c2..9a29f8d20f 100644 --- a/promise.js +++ b/promise.js @@ -3,257 +3,15 @@ const SqlString = require('sqlstring'); const EventEmitter = require('events').EventEmitter; const parserCache = require('./lib/parsers/parser_cache.js'); -const PoolConnection = require('./lib/pool_connection.js'); -const Pool = require('./lib/pool.js'); const PoolCluster = require('./lib/pool_cluster.js'); const createConnection = require('./lib/create_connection.js'); -const Connection = require('./lib/connection.js'); const createPool = require('./lib/create_pool.js'); const createPoolCluster = require('./lib/create_pool_cluster.js'); - -function makeDoneCb(resolve, reject, localErr) { - return function (err, rows, fields) { - if (err) { - localErr.message = err.message; - localErr.code = err.code; - localErr.errno = err.errno; - localErr.sql = err.sql; - localErr.sqlState = err.sqlState; - localErr.sqlMessage = err.sqlMessage; - reject(localErr); - } else { - resolve([rows, fields]); - } - }; -} - -function inheritEvents(source, target, events) { - const listeners = {}; - target - .on('newListener', eventName => { - if (events.indexOf(eventName) >= 0 && !target.listenerCount(eventName)) { - source.on( - eventName, - (listeners[eventName] = function () { - const args = [].slice.call(arguments); - args.unshift(eventName); - - target.emit.apply(target, args); - }) - ); - } - }) - .on('removeListener', eventName => { - if (events.indexOf(eventName) >= 0 && !target.listenerCount(eventName)) { - source.removeListener(eventName, listeners[eventName]); - delete listeners[eventName]; - } - }); -} - -class PromisePreparedStatementInfo { - constructor(statement, promiseImpl) { - this.statement = statement; - this.Promise = promiseImpl; - } - - execute(parameters) { - const s = this.statement; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - const done = makeDoneCb(resolve, reject, localErr); - if (parameters) { - s.execute(parameters, done); - } else { - s.execute(done); - } - }); - } - - close() { - return new this.Promise(resolve => { - this.statement.close(); - resolve(); - }); - } -} - -class PromiseConnection extends EventEmitter { - constructor(connection, promiseImpl) { - super(); - this.connection = connection; - this.Promise = promiseImpl || Promise; - inheritEvents(connection, this, [ - 'error', - 'drain', - 'connect', - 'end', - 'enqueue' - ]); - } - - release() { - this.connection.release(); - } - - query(query, params) { - const c = this.connection; - const localErr = new Error(); - if (typeof params === 'function') { - throw new Error( - 'Callback function is not available with promise clients.' - ); - } - return new this.Promise((resolve, reject) => { - const done = makeDoneCb(resolve, reject, localErr); - if (params !== undefined) { - c.query(query, params, done); - } else { - c.query(query, done); - } - }); - } - - execute(query, params) { - const c = this.connection; - const localErr = new Error(); - if (typeof params === 'function') { - throw new Error( - 'Callback function is not available with promise clients.' - ); - } - return new this.Promise((resolve, reject) => { - const done = makeDoneCb(resolve, reject, localErr); - if (params !== undefined) { - c.execute(query, params, done); - } else { - c.execute(query, done); - } - }); - } - - end() { - return new this.Promise(resolve => { - this.connection.end(resolve); - }); - } - - beginTransaction() { - const c = this.connection; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - const done = makeDoneCb(resolve, reject, localErr); - c.beginTransaction(done); - }); - } - - commit() { - const c = this.connection; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - const done = makeDoneCb(resolve, reject, localErr); - c.commit(done); - }); - } - - rollback() { - const c = this.connection; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - const done = makeDoneCb(resolve, reject, localErr); - c.rollback(done); - }); - } - - ping() { - const c = this.connection; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - c.ping(err => { - if (err) { - localErr.message = err.message; - localErr.code = err.code; - localErr.errno = err.errno; - localErr.sqlState = err.sqlState; - localErr.sqlMessage = err.sqlMessage; - reject(localErr); - } else { - resolve(true); - } - }); - }); - } - - connect() { - const c = this.connection; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - c.connect((err, param) => { - if (err) { - localErr.message = err.message; - localErr.code = err.code; - localErr.errno = err.errno; - localErr.sqlState = err.sqlState; - localErr.sqlMessage = err.sqlMessage; - reject(localErr); - } else { - resolve(param); - } - }); - }); - } - - prepare(options) { - const c = this.connection; - const promiseImpl = this.Promise; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - c.prepare(options, (err, statement) => { - if (err) { - localErr.message = err.message; - localErr.code = err.code; - localErr.errno = err.errno; - localErr.sqlState = err.sqlState; - localErr.sqlMessage = err.sqlMessage; - reject(localErr); - } else { - const wrappedStatement = new PromisePreparedStatementInfo( - statement, - promiseImpl - ); - resolve(wrappedStatement); - } - }); - }); - } - - changeUser(options) { - const c = this.connection; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - c.changeUser(options, err => { - if (err) { - localErr.message = err.message; - localErr.code = err.code; - localErr.errno = err.errno; - localErr.sqlState = err.sqlState; - localErr.sqlMessage = err.sqlMessage; - reject(localErr); - } else { - resolve(); - } - }); - }); - } - - get config() { - return this.connection.config; - } - - get threadId() { - return this.connection.threadId; - } -} +const PromiseConnection = require('./lib/promise_connection.js'); +const PromisePool = require('./lib/promise_pool.js'); +const makeDoneCb = require('./lib/make_done_cb.js'); +const PromisePoolConnection = require('./lib/promise_pool_connection.js'); +const inheritEvents = require('./lib/inherit_events.js'); function createConnectionPromise(opts) { const coreConnection = createConnection(opts); @@ -283,136 +41,7 @@ function createConnectionPromise(opts) { // note: the callback of "changeUser" is not called on success // hence there is no possibility to call "resolve" -// patching PromiseConnection -// create facade functions for prototype functions on "Connection" that are not yet -// implemented with PromiseConnection - -// proxy synchronous functions only -(function (functionsToWrap) { - for (let i = 0; functionsToWrap && i < functionsToWrap.length; i++) { - const func = functionsToWrap[i]; - - if ( - typeof Connection.prototype[func] === 'function' && - PromiseConnection.prototype[func] === undefined - ) { - PromiseConnection.prototype[func] = (function factory(funcName) { - return function () { - return Connection.prototype[funcName].apply( - this.connection, - arguments - ); - }; - })(func); - } - } -})([ - // synchronous functions - 'close', - 'createBinlogStream', - 'destroy', - 'escape', - 'escapeId', - 'format', - 'pause', - 'pipe', - 'resume', - 'unprepare' -]); - -class PromisePoolConnection extends PromiseConnection { - constructor(connection, promiseImpl) { - super(connection, promiseImpl); - } - - destroy() { - return PoolConnection.prototype.destroy.apply( - this.connection, - arguments - ); - } -} - -class PromisePool extends EventEmitter { - constructor(pool, thePromise) { - super(); - this.pool = pool; - this.Promise = thePromise || Promise; - inheritEvents(pool, this, ['acquire', 'connection', 'enqueue', 'release']); - } - - getConnection() { - const corePool = this.pool; - return new this.Promise((resolve, reject) => { - corePool.getConnection((err, coreConnection) => { - if (err) { - reject(err); - } else { - resolve(new PromisePoolConnection(coreConnection, this.Promise)); - } - }); - }); - } - - releaseConnection(connection) { - if (connection instanceof PromisePoolConnection) connection.release(); - } - - query(sql, args) { - const corePool = this.pool; - const localErr = new Error(); - if (typeof args === 'function') { - throw new Error( - 'Callback function is not available with promise clients.' - ); - } - return new this.Promise((resolve, reject) => { - const done = makeDoneCb(resolve, reject, localErr); - if (args !== undefined) { - corePool.query(sql, args, done); - } else { - corePool.query(sql, done); - } - }); - } - execute(sql, args) { - const corePool = this.pool; - const localErr = new Error(); - if (typeof args === 'function') { - throw new Error( - 'Callback function is not available with promise clients.' - ); - } - return new this.Promise((resolve, reject) => { - const done = makeDoneCb(resolve, reject, localErr); - if (args) { - corePool.execute(sql, args, done); - } else { - corePool.execute(sql, done); - } - }); - } - - end() { - const corePool = this.pool; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - corePool.end(err => { - if (err) { - localErr.message = err.message; - localErr.code = err.code; - localErr.errno = err.errno; - localErr.sqlState = err.sqlState; - localErr.sqlMessage = err.sqlMessage; - reject(localErr); - } else { - resolve(); - } - }); - }); - } -} function createPromisePool(opts) { const corePool = createPool(opts); @@ -428,27 +57,7 @@ function createPromisePool(opts) { return new PromisePool(corePool, thePromise); } -(function (functionsToWrap) { - for (let i = 0; functionsToWrap && i < functionsToWrap.length; i++) { - const func = functionsToWrap[i]; - if ( - typeof Pool.prototype[func] === 'function' && - PromisePool.prototype[func] === undefined - ) { - PromisePool.prototype[func] = (function factory(funcName) { - return function () { - return Pool.prototype[funcName].apply(this.pool, arguments); - }; - })(func); - } - } -})([ - // synchronous functions - 'escape', - 'escapeId', - 'format' -]); class PromisePoolCluster extends EventEmitter { constructor(poolCluster, thePromise) {