/*global process*/
/**
* @file Manages query for records in Salesforce
* @author Shinichi Tomita <shinichi.tomita@gmail.com>
*/
'use strict';
var inherits = require('inherits'),
events = require('events'),
stream = require('readable-stream'),
_ = require('lodash/core'),
Promise = require('./promise'),
SfDate = require("./date"),
SOQLBuilder = require("./soql-builder"),
RecordStream = require("./record-stream");
/**
* Query
*
* @protected
* @class
* @extends {stream.Readable}
* @implements Promise.<T>
* @template T
* @param {Connection} conn - Connection object
* @param {Object|String} config - Query config object or SOQL string
* @param {String} [locator] - Locator string to fetch next record set
*/
var Query = module.exports = function(conn, config, locator) {
Query.super_.call(this, { objectMode: true });
this._conn = conn;
if (config) {
if (_.isString(config)) { // if query config is string, it is given in SOQL.
this._soql = config;
} else {
this._config = config;
this.select(config.fields);
if (config.includes) {
this.include(config.includes);
}
}
}
if (locator && locator.indexOf("/") >= 0) { // if locator given in url for next records
locator = locator.split("/").pop();
}
this._locator = locator;
this._executed = false;
this._finished = false;
this._chaining = false;
this._deferred = Promise.defer();
var self = this;
};
inherits(Query, stream.Readable);
/**
* Select fields to include in the returning result
*
* @param {Object|Array.<String>|String} fields - Fields to fetch. Format can be in JSON object (MongoDB-like), array of field names, or comma-separated field names.
* @returns {Query.<T>}
*/
Query.prototype.select = function(fields) {
if (this._soql) {
throw Error("Cannot set select fields for the query which has already built SOQL.");
}
fields = fields || '*';
if (_.isString(fields)) {
fields = fields.split(/\s*,\s*/);
} else if (_.isObject(fields) && !_.isArray(fields)) {
var _fields = [];
for (var k in fields) {
if (fields[k]) { _fields.push(k); }
}
fields = _fields;
}
this._config.fields = fields;
return this;
};
/**
* Set query conditions to filter the result records
*
* @param {Object|String} conditions - Conditions in JSON object (MongoDB-like), or raw SOQL WHERE clause string.
* @returns {Query.<T>}
*/
Query.prototype.where = function(conditions) {
if (this._soql) {
throw Error("Cannot set where conditions for the query which has already built SOQL.");
}
this._config.conditions = conditions;
return this;
};
/**
* Limit the returning result
*
* @param {Number} limit - Maximum number of records the query will return.
* @returns {Query.<T>}
*/
Query.prototype.limit = function(limit) {
if (this._soql) {
throw Error("Cannot set limit for the query which has already built SOQL.");
}
this._config.limit = limit;
return this;
};
/**
* Synonym of Query#skip()
*
* @method Query#offset
* @param {Number} offset - Offset number where begins returning results.
* @returns {Query.<T>}
*/
/**
* Skip records
*
* @method Query#offset
* @param {Number} offset - Offset number where begins returning results.
* @returns {Query.<T>}
*/
Query.prototype.skip =
Query.prototype.offset = function(offset) {
if (this._soql) {
throw Error("Cannot set skip/offset for the query which has already built SOQL.");
}
this._config.offset = offset;
return this;
};
/**
* Synonym of Query#sort()
*
* @memthod Query#orderby
* @param {String|Object} sort - Sorting field or hash object with field name and sord direction
* @param {String|Number} [dir] - Sorting direction (ASC|DESC|1|-1)
* @returns {Query.<T>}
*/
/**
* Set query sort with direction
*
* @method Query#sort
* @param {String|Object} sort - Sorting field or hash object with field name and sord direction
* @param {String|Number} [dir] - Sorting direction (ASC|DESC|1|-1)
* @returns {Query.<T>}
*/
Query.prototype.sort =
Query.prototype.orderby = function(sort, dir) {
if (this._soql) {
throw Error("Cannot set sort for the query which has already built SOQL.");
}
if (_.isString(sort) && _.isString(dir)) {
sort = [ [ sort, dir ] ];
}
this._config.sort = sort;
return this;
};
/**
* Include child relationship query
*
* @param {String} childRelName - Child relationship name to include in query result
* @param {Object|String} [conditions] - Conditions in JSON object (MongoDB-like), or raw SOQL WHERE clause string.
* @param {Object|Array.<String>|String} [fields] - Fields to fetch. Format can be in JSON object (MongoDB-like), array of field names, or comma-separated field names.
* @param {Object} [options] - Query options.
* @param {Number} [options.limit] - Maximum number of records the query will return.
* @param {Number} [options.offset] - Offset number where begins returning results.
* @param {Number} [options.skip] - Synonym of options.offset.
* @returns {Query~SubQuery}
*/
Query.prototype.include = function(childRelName, conditions, fields, options) {
if (this._soql) {
throw Error("Cannot include child relationship into the query which has already built SOQL.");
}
if (_.isObject(childRelName)) {
var includes = childRelName;
for (var crname in includes) {
var config = includes[crname];
this.include(crname, config.conditions, config.fields, config);
}
return;
}
var childConfig = {
table: childRelName,
conditions: conditions,
fields: fields,
limit: options && options.limit,
offset: options && (options.offset || options.skip)
};
this._config.includes = this._config.includes || [];
this._config.includes.push(childConfig);
var childQuery = new SubQuery(this._conn, this, childConfig);
this._children = this._children || [];
this._children.push(childQuery);
return childQuery;
};
/** @private **/
Query.prototype._maxFetch = 10000;
/**
* Setting maxFetch query option
*
* @param {Number} maxFetch - Max fetching records in auto fetch mode
* @returns {Query.<T>}
*/
Query.prototype.maxFetch = function(maxFetch) {
this._maxFetch = maxFetch;
return this;
};
/** @private **/
Query.prototype._autoFetch = false;
/**
* Switching auto fetch mode
*
* @param {Boolean} autoFetch - Using auto fetch mode or not
* @returns {Query.<T>}
*/
Query.prototype.autoFetch = function(autoFetch) {
this._autoFetch = autoFetch;
return this;
};
/** @private **/
Query.prototype._scanAll = false;
/**
* Set flag to scan all records including deleted and archived.
*
* @param {Boolean} scanAll - Flag whether include deleted/archived record or not. Default is false.
* @returns {Query.<T>}
*/
Query.prototype.scanAll = function(scanAll) {
this._scanAll = scanAll;
return this;
};
/**
* @private
*/
var ResponseTargets = Query.ResponseTargets = {};
[ "QueryResult", "Records", "SingleRecord", "Count" ].forEach(function(f) {
ResponseTargets[f] = f;
});
/** @private **/
Query.prototype._responseTarget = ResponseTargets.QueryResult;
/**
* @protected
* @param {String} responseTarget - Query response target
* @returns {Query.<S>}
*/
Query.prototype.setResponseTarget = function(responseTarget) {
if (responseTarget in ResponseTargets) {
this._responseTarget = responseTarget;
}
return this;
};
/**
* Synonym of Query#execute()
*
* @method Query#run
* @param {Object} [options] - Query options
* @param {Boolean} [options.autoFetch] - Using auto fetch mode or not
* @param {Number} [options.maxFetch] - Max fetching records in auto fetch mode
* @param {Boolean} [options.scanAll] - Including deleted records for query target or not
* @param {Callback.<T>} [callback] - Callback function
* @returns {Query.<T>}
*/
Query.prototype.run =
/**
* Synonym of Query#execute()
*
* @method Query#exec
* @param {Object} [options] - Query options
* @param {Boolean} [options.autoFetch] - Using auto fetch mode or not
* @param {Number} [options.maxFetch] - Max fetching records in auto fetch mode
* @param {Boolean} [options.scanAll] - Including deleted records for query target or not
* @param {Callback.<T>} [callback] - Callback function
* @returns {Query.<T>}
*/
Query.prototype.exec =
/**
* Execute query and fetch records from server.
*
* @method Query#execute
* @param {Object} [options] - Query options
* @param {Boolean} [options.autoFetch] - Using auto fetch mode or not
* @param {Number} [options.maxFetch] - Max fetching records in auto fetch mode
* @param {Boolean} [options.scanAll] - Including deleted records for query target or not
* @param {Callback.<T>} [callback] - Callback function
* @returns {Query.<T>}
*/
Query.prototype.execute = function(options, callback) {
var self = this;
var logger = this._conn._logger;
var deferred = this._deferred;
if (this._executed) {
deferred.reject(new Error("re-executing already executed query"));
return this;
}
if (this._finished) {
deferred.reject(new Error("executing already closed query"));
return this;
}
if (typeof options === "function") {
callback = options;
options = {};
}
options = options || {};
options = {
responseTarget: options.responseTarget || self._responseTarget,
autoFetch: options.autoFetch || self._autoFetch,
maxFetch: options.maxFetch || self._maxFetch,
scanAll: options.scanAll || self._scanAll
};
// callback and promise resolution;
var promiseCallback = function(err, res) {
if (_.isFunction(callback)) {
try {
res = callback(err, res);
err = null;
} catch(e) {
err = e;
}
}
if (err) {
deferred.reject(err);
} else {
deferred.resolve(res);
}
};
this.once('response', function(res) {
promiseCallback(null, res);
});
this.once('error', function(err) {
promiseCallback(err);
});
// collect fetched records in array
// only when response target is Records and
// either callback or chaining promises are available to this query.
this.once('fetch', function() {
if (options.responseTarget === ResponseTargets.Records && (self._chaining || callback)) {
logger.debug('--- collecting all fetched records ---');
var records = [];
var onRecord = function(record) {
records.push(record);
};
self.on('record', onRecord);
self.once('end', function() {
self.removeListener('record', onRecord);
self.emit('response', records, self);
});
}
});
// flag to prevent re-execution
this._executed = true;
// start actual query
logger.debug('>>> Query start >>>');
this._execute(options).then(function() {
logger.debug('*** Query finished ***');
}).fail(function(err) {
logger.debug('--- Query error ---');
self.emit('error', err);
});
// return Query instance for chaining
return this;
};
/**
* @private
*/
Query.prototype._execute = function(options) {
var self = this;
var logger = this._conn._logger;
var responseTarget = options.responseTarget;
var autoFetch = options.autoFetch;
var maxFetch = options.maxFetch;
var scanAll = options.scanAll;
return Promise.resolve(
self._locator ?
self._conn._baseUrl() + "/query/" + self._locator :
self.toSOQL().then(function(soql) {
self.totalFetched = 0;
logger.debug("SOQL = " + soql);
return self._conn._baseUrl() + "/" + (scanAll ? "queryAll" : "query") + "?q=" + encodeURIComponent(soql);
})
).then(function(url) {
return self._conn.request(url);
}).then(function(data) {
self.emit("fetch");
self.totalSize = data.totalSize;
var res;
switch(responseTarget) {
case ResponseTargets.SingleRecord:
res = data.records && data.records.length > 0 ? data.records[0] : null;
break;
case ResponseTargets.Records:
res = data.records;
break;
case ResponseTargets.Count:
res = data.totalSize;
break;
default:
res = data;
}
// only fire response event when it should be notified per fetch
if (responseTarget !== ResponseTargets.Records) {
self.emit("response", res, self);
}
// streaming record instances
for (var i=0, l=data.records.length; i<l; i++) {
if (self.totalFetched >= maxFetch) {
self._finished = true;
break;
}
var record = data.records[i];
self.push(record);
self.emit('record', record, self.totalFetched++, self);
}
if (data.nextRecordsUrl) {
self._locator = data.nextRecordsUrl.split('/').pop();
}
self._finished = self._finished || data.done || !autoFetch;
if (self._finished) {
self.push(null);
} else {
self._execute(options);
}
return res;
});
};
/**
* Readable stream implementation
*
* @override
* @private
*/
Query.prototype._read = function(size) {
if (!this._finished && !this._executed) {
this.execute({ autoFetch: true });
}
};
/** @override **/
Query.prototype.on = function(e, fn) {
if (e === 'record') {
var self = this;
this.on('readable', function() {
while(self.read() !== null) {} // discard buffered records
});
}
return Query.super_.prototype.on.call(this, e, fn);
};
/** @override **/
Query.prototype.addListener = Query.prototype.on;
/**
* @private
*/
Query.prototype._expandFields = function() {
if (this._soql) {
return Promise.reject(new Error("Cannot expand fields for the query which has already built SOQL."));
}
var self = this;
var logger = self._conn._logger;
var conn = this._conn;
var table = this._config.table;
var fields = this._config.fields || [];
logger.debug('_expandFields: table = ' + table + ', fields = ' + fields.join(', '));
return Promise.all([
Promise.resolve(self._parent ? findRelationTable(table) : table)
.then(function(table) {
return Promise.all(
_.map(fields, function(field) { return expandAsteriskField(table, field); })
).then(function(expandedFields) {
self._config.fields = _.flatten(expandedFields);
});
}),
Promise.all(
_.map(self._children || [], function(childQuery) {
return childQuery._expandFields();
})
)
]);
function findRelationTable(rname) {
var ptable = self._parent._config.table;
logger.debug('finding table for relation "' + rname + '" in "' + ptable + '"...');
return describeCache(ptable).then(function(sobject) {
var upperRname = rname.toUpperCase();
var childRelation = _.find(sobject.childRelationships, function(cr) {
return (cr.relationshipName || '').toUpperCase() === upperRname;
});
return childRelation ? childRelation.childSObject :
Promise.reject(new Error("No child relationship found: " + rname ));
});
}
function describeCache(table) {
logger.debug('describe cache: '+table);
var deferred = Promise.defer();
conn.describe$(table, function(err, sobject) {
logger.debug('... done.');
if (err) { deferred.reject(err); }
else { deferred.resolve(sobject); }
});
return deferred.promise;
}
function expandAsteriskField(table, field) {
logger.debug('expanding field "'+ field + '" in "' + table + '"...');
var fpath = field.split('.');
return fpath[fpath.length - 1] === '*' ?
describeCache(table).then(function(sobject) {
logger.debug('table '+table+'has been described');
if (fpath.length > 1) {
var rname = fpath.shift();
var rfield = _.find(sobject.fields, function(f) {
return f.relationshipName &&
f.relationshipName.toUpperCase() === rname.toUpperCase();
});
if (rfield) {
var rtable = rfield.referenceTo.length === 1 ? rfield.referenceTo[0] : 'Name';
return expandAsteriskField(rtable, fpath.join('.')).then(function(fpaths) {
return _.map(fpaths, function(fpath) { return rname + '.' + fpath; });
});
} else {
return [];
}
} else {
return _.map(sobject.fields, function(f) { return f.name; });
}
}) :
Promise.resolve([ field ]);
}
};
/**
* Explain plan for executing query
*
* @param {Callback.<ExplainInfo>} [callback] - Callback function
* @returns {Promise.<ExplainInfo>}
*/
Query.prototype.explain = function(callback) {
var self = this;
var logger = this._conn._logger;
return self.toSOQL().then(function(soql) {
logger.debug("SOQL = " + soql);
var url = "/query/?explain=" + encodeURIComponent(soql);
return self._conn.request(url);
}).thenCall(callback);
};
/**
* Return SOQL expression for the query
*
* @param {Callback.<String>} [callback] - Callback function
* @returns {Promise.<String>}
*/
Query.prototype.toSOQL = function(callback) {
var self = this;
return Promise.resolve(self._soql ||
self._expandFields().then(function() { return SOQLBuilder.createSOQL(self._config); })
).thenCall(callback);
};
/**
* Create data stream of queried records.
* Automatically resume query if paused.
*
* @param {String} [type] - Type of outgoing data format. Currently 'csv' is default value and the only supported.
* @param {Object} [options] - Options passed to converter
* @returns {stream.Readable}
*/
Query.prototype.stream = RecordStream.Serializable.prototype.stream;
/**
* Get record stream of queried records applying the given mapping function
*
* @param {RecordMapFunction} fn - Record mapping function
* @returns {RecordStream.Serializable}
*/
Query.prototype.map = RecordStream.prototype.map;
/**
* Get record stream of queried records, applying the given filter function
*
* @param {RecordFilterFunction} fn - Record filtering function
* @returns {RecordStream.Serializable}
*/
Query.prototype.filter = RecordStream.prototype.map;
/**
* Synonym of Query#destroy()
*
* @method Query#delete
* @param {String} [type] - SObject type. Required for SOQL based query object.
* @param {Callback.<Array.<RecordResult>>} [callback] - Callback function
* @returns {Bulk~Batch}
*/
/**
* Synonym of Query#destroy()
*
* @method Query#del
* @param {String} [type] - SObject type. Required for SOQL based query object.
* @param {Callback.<Array.<RecordResult>>} [callback] - Callback function
* @returns {Bulk~Batch}
*/
/**
* Bulk delete queried records
*
* @method Query#destroy
* @param {String} [type] - SObject type. Required for SOQL based query object.
* @param {Callback.<Array.<RecordResult>>} [callback] - Callback function
* @returns {Promise.<Array.<RecordResult>>}
*/
Query.prototype["delete"] =
Query.prototype.del =
Query.prototype.destroy = function(type, callback) {
if (typeof type === 'function') {
callback = type;
type = null;
}
type = type || (this._config && this._config.table);
if (!type) {
throw new Error("SOQL based query needs SObject type information to bulk delete.");
}
var batch = this._conn.sobject(type).deleteBulk();
var deferred = Promise.defer();
var handleError = function(err) {
if (err.name === 'ClientInputError') { deferred.resolve([]); } // if batch input receives no records
else { deferred.reject(err); }
};
this.on('error', handleError)
.pipe(batch)
.on('response', function(res) { deferred.resolve(res); })
.on('error', handleError);
return deferred.promise.thenCall(callback);
};
/**
* Bulk update queried records, using given mapping function/object
*
* @param {Record|RecordMapFunction} mapping - Mapping record or record mapping function
* @param {String} [type] - SObject type. Required for SOQL based query object.
* @param {Callback.<Array.<RecordResult>>} [callback] - Callback function
* @returns {Promise.<Array.<RecordResult>>}
*/
Query.prototype.update = function(mapping, type, callback) {
if (typeof type === 'function') {
callback = type;
type = null;
}
type = type || (this._config && this._config.table);
if (!type) {
throw new Error("SOQL based query needs SObject type information to bulk update.");
}
var updateStream = _.isFunction(mapping) ? RecordStream.map(mapping) : RecordStream.recordMapStream(mapping);
var batch = this._conn.sobject(type).updateBulk();
var deferred = Promise.defer();
var handleError = function(err) {
if (err.name === 'ClientInputError') { deferred.resolve([]); } // if batch input receives no records
else { deferred.reject(err); }
};
this.on('error', handleError)
.pipe(updateStream)
.on('error', handleError)
.pipe(batch)
.on('response', function(res) { deferred.resolve(res); })
.on('error', handleError);
return deferred.promise.thenCall(callback);
};
/**
* Promise/A+ interface
* http://promises-aplus.github.io/promises-spec/
*
* Delegate to deferred promise, return promise instance for query result
*
* @param {FulfilledCallback.<T, S1>} [onFulfilled]
* @param {RejectedCallback.<S2>} [onRejected]
* @returns {Promise.<S1|S2>}
*/
Query.prototype.then = function(onResolved, onReject) {
this._chaining = true;
if (!this._finished && !this._executed) { this.execute(); }
return this._deferred.promise.then.apply(this._deferred.promise, arguments);
};
/**
* Promise/A+ extension
* Call "then" using given node-style callback function
*
* @param {Callback.<T>} [callback] - Callback function
* @returns {Query}
*/
Query.prototype.thenCall = function(callback) {
if (_.isFunction(callback)) {
this.then(function(res) {
process.nextTick(function() {
callback(null, res);
});
}, function(err) {
process.nextTick(function() {
callback(err);
});
});
}
return this;
};
/*--------------------------------------------*/
/**
* SubQuery object for representing child relationship query
*
* @protected
* @class Query~SubQuery
* @extends Query
* @param {Connection} conn - Connection object
* @param {Query} parent - Parent query object
* @param {Object} config - Sub query configuration
*/
var SubQuery = function(conn, parent, config) {
SubQuery.super_.call(this, conn, config);
this._parent = parent;
};
inherits(SubQuery, Query);
/**
* @method Query~SubQuery#include
* @override
*/
SubQuery.prototype.include = function() {
throw new Error("Not allowed to include another subquery in subquery.");
};
/**
* Back the context to parent query object
*
* @method Query~SubQuery#end
* @returns {Query}
*/
SubQuery.prototype.end = function() {
return this._parent;
};
/**
* If execute is called in subquery context, delegate it to parent query object
*
* @method Query~SubQuery#execute
* @override
*/
SubQuery.prototype.run =
SubQuery.prototype.exec =
SubQuery.prototype.execute = function() {
return this._parent.execute.apply(this._parent, arguments);
};