216 lines
6.8 KiB
JavaScript
216 lines
6.8 KiB
JavaScript
'use strict';
|
|
|
|
var Pooling = require('generic-pool')
|
|
, Promise = require('../../promise')
|
|
, _ = require('lodash')
|
|
, defaultPoolingConfig = {
|
|
max: 5,
|
|
min: 0,
|
|
idle: 10000,
|
|
handleDisconnects: true
|
|
}
|
|
, ConnectionManager;
|
|
|
|
ConnectionManager = function(dialect, sequelize) {
|
|
var config = sequelize.config;
|
|
|
|
this.sequelize = sequelize;
|
|
this.config = config;
|
|
this.dialect = dialect;
|
|
|
|
if (config.pool) {
|
|
config.pool = _.clone(config.pool); // Make sure we don't modify the existing config object (user might re-use it)
|
|
config.pool =_.defaults(config.pool, defaultPoolingConfig, {
|
|
validate: this.$validate.bind(this)
|
|
}) ;
|
|
} else {
|
|
// If the user has turned off pooling we provide a 0/1 pool for backwards compat
|
|
config.pool = _.defaults({
|
|
max: 1,
|
|
min: 0
|
|
}, defaultPoolingConfig, {
|
|
validate: this.$validate.bind(this)
|
|
});
|
|
}
|
|
|
|
// Map old names
|
|
if (config.pool.maxIdleTime) config.pool.idle = config.pool.maxIdleTime;
|
|
if (config.pool.maxConnections) config.pool.max = config.pool.maxConnections;
|
|
if (config.pool.minConnections) config.pool.min = config.pool.minConnections;
|
|
|
|
this.onProcessExit = this.onProcessExit.bind(this); // Save a reference to the bound version so we can remove it with removeListener
|
|
process.on('exit', this.onProcessExit);
|
|
};
|
|
|
|
ConnectionManager.prototype.onProcessExit = function() {
|
|
var self = this;
|
|
|
|
if (this.pool) {
|
|
this.pool.drain(function() {
|
|
self.pool.destroyAllNow();
|
|
});
|
|
}
|
|
};
|
|
|
|
ConnectionManager.prototype.close = function () {
|
|
this.onProcessExit();
|
|
process.removeListener('exit', this.onProcessExit); // Remove the listener, so all references to this instance can be garbage collected.
|
|
|
|
this.getConnection = function () {
|
|
return Promise.reject(new Error('ConnectionManager.getConnection was called after the connection manager was closed!'));
|
|
};
|
|
};
|
|
|
|
// This cannot happen in the constructor because the user can specify a min. number of connections to have in the pool
|
|
// If he does this, generic-pool will try to call connect before the dialect-specific connection manager has been correctly set up
|
|
ConnectionManager.prototype.initPools = function () {
|
|
var self = this
|
|
, config = _.cloneDeep(this.config);
|
|
|
|
if (!config.replication) {
|
|
this.pool = Pooling.Pool({
|
|
name: 'sequelize-connection',
|
|
create: function(callback) {
|
|
self.$connect(config).nodeify(function (err, connection) {
|
|
callback(err, connection); // For some reason this is needed, else generic-pool things err is a connection or some shit
|
|
});
|
|
},
|
|
destroy: function(connection) {
|
|
self.$disconnect(connection);
|
|
},
|
|
max: config.pool.max,
|
|
min: config.pool.min,
|
|
validate: config.pool.validate,
|
|
idleTimeoutMillis: config.pool.idle
|
|
});
|
|
return;
|
|
}
|
|
|
|
var reads = 0;
|
|
|
|
if (!Array.isArray(config.replication.read)) {
|
|
config.replication.read = [config.replication.read];
|
|
}
|
|
|
|
// Map main connection config
|
|
config.replication.write = _.defaults(config.replication.write, {
|
|
host: config.host,
|
|
port: config.port,
|
|
username: config.username,
|
|
password: config.password,
|
|
database: config.database
|
|
});
|
|
|
|
// Apply defaults to each read config
|
|
config.replication.read = _.map(config.replication.read, function(config) {
|
|
return _.defaults(config, {
|
|
host: self.config.host,
|
|
port: self.config.port,
|
|
username: self.config.username,
|
|
password: self.config.password,
|
|
database: self.config.database
|
|
});
|
|
});
|
|
|
|
// I'll make my own pool, with blackjack and hookers! (original credit goes to @janzeh)
|
|
this.pool = {
|
|
release: function(client) {
|
|
if (client.queryType === 'read') {
|
|
return self.pool.read.release(client);
|
|
} else {
|
|
return self.pool.write.release(client);
|
|
}
|
|
},
|
|
acquire: function(callback, priority, queryType, useMaster) {
|
|
useMaster = _.isUndefined(useMaster) ? false : useMaster;
|
|
if (queryType === 'SELECT' && !useMaster) {
|
|
self.pool.read.acquire(callback, priority);
|
|
} else {
|
|
self.pool.write.acquire(callback, priority);
|
|
}
|
|
},
|
|
destroy: function(connection) {
|
|
return self.pool[connection.queryType].destroy(connection);
|
|
},
|
|
destroyAllNow: function() {
|
|
self.pool.read.destroyAllNow();
|
|
self.pool.write.destroyAllNow();
|
|
},
|
|
drain: function(cb) {
|
|
self.pool.write.drain(function() {
|
|
self.pool.read.drain(cb);
|
|
});
|
|
},
|
|
read: Pooling.Pool({
|
|
name: 'sequelize-connection-read',
|
|
create: function(callback) {
|
|
// Simple round robin config
|
|
var nextRead = reads++ % config.replication.read.length;
|
|
self.$connect(config.replication.read[nextRead]).tap(function (connection) {
|
|
connection.queryType = 'read';
|
|
}).nodeify(function (err, connection) {
|
|
callback(err, connection); // For some reason this is needed, else generic-pool things err is a connection or some shit
|
|
});
|
|
},
|
|
destroy: function(connection) {
|
|
self.$disconnect(connection);
|
|
},
|
|
validate: config.pool.validate,
|
|
max: config.pool.max,
|
|
min: config.pool.min,
|
|
idleTimeoutMillis: config.pool.idle
|
|
}),
|
|
write: Pooling.Pool({
|
|
name: 'sequelize-connection-write',
|
|
create: function(callback) {
|
|
self.$connect(config.replication.write).tap(function (connection) {
|
|
connection.queryType = 'write';
|
|
}).nodeify(function (err, connection) {
|
|
callback(err, connection); // For some reason this is needed, else generic-pool things err is a connection or some shit
|
|
});
|
|
},
|
|
destroy: function(connection) {
|
|
self.$disconnect(connection);
|
|
},
|
|
validate: config.pool.validate,
|
|
max: config.pool.max,
|
|
min: config.pool.min,
|
|
idleTimeoutMillis: config.pool.idle
|
|
})
|
|
};
|
|
};
|
|
|
|
ConnectionManager.prototype.getConnection = function(options) {
|
|
var self = this;
|
|
options = options || {};
|
|
|
|
return new Promise(function (resolve, reject) {
|
|
self.pool.acquire(function(err, connection) {
|
|
if (err) return reject(err);
|
|
resolve(connection);
|
|
}, options.priority, options.type, options.useMaster);
|
|
});
|
|
};
|
|
ConnectionManager.prototype.releaseConnection = function(connection) {
|
|
var self = this;
|
|
|
|
return new Promise(function (resolve, reject) {
|
|
self.pool.release(connection);
|
|
resolve();
|
|
});
|
|
};
|
|
|
|
ConnectionManager.prototype.$connect = function(config) {
|
|
return this.dialect.connectionManager.connect(config);
|
|
};
|
|
ConnectionManager.prototype.$disconnect = function(connection) {
|
|
return this.dialect.connectionManager.disconnect(connection);
|
|
};
|
|
|
|
ConnectionManager.prototype.$validate = function(connection) {
|
|
if (!this.dialect.connectionManager.validate) return Promise.resolve();
|
|
return this.dialect.connectionManager.validate(connection);
|
|
};
|
|
|
|
module.exports = ConnectionManager;
|