// Source: query.js

/*global process*/
/**
 * @file Manages query for records in Salesforce
 * @author Shinichi Tomita <>
 */

'use strict';

var inherits = require('inherits'),
    events = require('events'),
    stream = require('readable-stream'),
    _      = require('lodash/core'),
    Promise = require('./promise'),
    SfDate = require("./date"),
    SOQLBuilder = require("./soql-builder"),
    RecordStream = require("./record-stream");

 * Query
 * @protected
 * @class
 * @extends {stream.Readable}
 * @implements Promise.<T>
 * @template T
 * @param {Connection} conn - Connection object
 * @param {Object|String} config - Query config object or SOQL string
 * @param {String} [locator] - Locator string to fetch next record set
var Query = module.exports = function(conn, config, locator) {, { objectMode: true });

  this._conn = conn;
  if (config) {
    if (_.isString(config)) { // if query config is string, it is given in SOQL.
      this._soql = config;
    } else {
      this._config = config;;
      if (config.includes) {
  if (locator && locator.indexOf("/") >= 0) { // if locator given in url for next records
    locator = locator.split("/").pop();
  this._locator = locator;

  this._executed = false;
  this._finished = false;
  this._chaining = false;

  this._deferred = Promise.defer();

  var self = this;

inherits(Query, stream.Readable);

 * Select fields to include in the returning result
 * @param {Object|Array.<String>|String} fields - Fields to fetch. Format can be in JSON object (MongoDB-like), array of field names, or comma-separated field names.
 * @returns {Query.<T>}
 */ = function(fields) {
  if (this._soql) {
    throw Error("Cannot set select fields for the query which has already built SOQL.");
  fields = fields || '*';
  if (_.isString(fields)) {
    fields = fields.split(/\s*,\s*/);
  } else if (_.isObject(fields) && !_.isArray(fields)) {
    var _fields =  [];
    for (var k in fields) {
      if (fields[k]) { _fields.push(k); }
    fields = _fields;
  this._config.fields = fields;
  return this;

 * Set query conditions to filter the result records
 * @param {Object|String} conditions - Conditions in JSON object (MongoDB-like), or raw SOQL WHERE clause string.
 * @returns {Query.<T>}
Query.prototype.where = function(conditions) {
  if (this._soql) {
    throw Error("Cannot set where conditions for the query which has already built SOQL.");
  this._config.conditions = conditions;
  return this;

 * Limit the returning result
 * @param {Number} limit - Maximum number of records the query will return.
 * @returns {Query.<T>}
Query.prototype.limit = function(limit) {
  if (this._soql) {
    throw Error("Cannot set limit for the query which has already built SOQL.");
  this._config.limit = limit;
  return this;

 * Synonym of Query#skip()
 * @method Query#offset
 * @param {Number} offset - Offset number where begins returning results.
 * @returns {Query.<T>}
 * Skip records
 * @method Query#offset
 * @param {Number} offset - Offset number where begins returning results.
 * @returns {Query.<T>}
Query.prototype.skip =
Query.prototype.offset = function(offset) {
  if (this._soql) {
    throw Error("Cannot set skip/offset for the query which has already built SOQL.");
  this._config.offset = offset;
  return this;

 * Synonym of Query#sort()
 * @memthod Query#orderby
 * @param {String|Object} sort - Sorting field or hash object with field name and sord direction
 * @param {String|Number} [dir] - Sorting direction (ASC|DESC|1|-1)
 * @returns {Query.<T>}
 * Set query sort with direction
 * @method Query#sort
 * @param {String|Object} sort - Sorting field or hash object with field name and sord direction
 * @param {String|Number} [dir] - Sorting direction (ASC|DESC|1|-1)
 * @returns {Query.<T>}
Query.prototype.sort =
Query.prototype.orderby = function(sort, dir) {
  if (this._soql) {
    throw Error("Cannot set sort for the query which has already built SOQL.");
  if (_.isString(sort) && _.isString(dir)) {
    sort = [ [ sort, dir ] ];
  this._config.sort = sort;
  return this;

 * Include child relationship query
 * @param {String} childRelName - Child relationship name to include in query result
 * @param {Object|String} [conditions] - Conditions in JSON object (MongoDB-like), or raw SOQL WHERE clause string.
 * @param {Object|Array.<String>|String} [fields] - Fields to fetch. Format can be in JSON object (MongoDB-like), array of field names, or comma-separated field names.
 * @param {Object} [options] - Query options.
 * @param {Number} [options.limit] - Maximum number of records the query will return.
 * @param {Number} [options.offset] - Offset number where begins returning results.
 * @param {Number} [options.skip] - Synonym of options.offset.
 * @returns {Query~SubQuery}
Query.prototype.include = function(childRelName, conditions, fields, options) {
  if (this._soql) {
    throw Error("Cannot include child relationship into the query which has already built SOQL.");
  if (_.isObject(childRelName)) {
    var includes = childRelName;
    for (var crname in includes) {
      var config = includes[crname];
      this.include(crname, config.conditions, config.fields, config);
  var childConfig = {
    table: childRelName,
    conditions: conditions,
    fields: fields,
    limit: options && options.limit,
    offset: options && (options.offset || options.skip)
  this._config.includes = this._config.includes || [];
  var childQuery = new SubQuery(this._conn, this, childConfig);
  this._children = this._children || [];
  return childQuery;

/** @private **/
Query.prototype._maxFetch = 10000;
/**
 * Setting maxFetch query option
 *
 * @param {Number} maxFetch - Max fetching records in auto fetch mode
 * @returns {Query.<T>}
 */
Query.prototype.maxFetch = function(maxFetch) {
  this._maxFetch = maxFetch;
  return this;
};

/** @private **/
Query.prototype._autoFetch = false;
/**
 * Switching auto fetch mode
 *
 * @param {Boolean} autoFetch - Using auto fetch mode or not
 * @returns {Query.<T>}
 */
Query.prototype.autoFetch = function(autoFetch) {
  this._autoFetch = autoFetch;
  return this;
};

/** @private **/
Query.prototype._scanAll = false;
/**
 * Set flag to scan all records including deleted and archived.
 *
 * @param {Boolean} scanAll - Flag whether include deleted/archived record or not. Default is false.
 * @returns {Query.<T>}
 */
Query.prototype.scanAll = function(scanAll) {
  this._scanAll = scanAll;
  return this;
};

 * @private
var ResponseTargets = Query.ResponseTargets = {};
[ "QueryResult", "Records", "SingleRecord", "Count" ].forEach(function(f) {
  ResponseTargets[f] = f;

/** @private **/
Query.prototype._responseTarget = ResponseTargets.QueryResult;
/**
 * Set the response target; names that are not keys of ResponseTargets are
 * ignored and the current target is kept.
 *
 * @protected
 * @param {String} responseTarget - Query response target
 * @returns {Query.<S>}
 */
Query.prototype.setResponseTarget = function(responseTarget) {
  // Own-property check: `in` would also accept inherited keys such as "toString".
  if (Object.prototype.hasOwnProperty.call(ResponseTargets, responseTarget)) {
    this._responseTarget = responseTarget;
  }
  return this;
};

 * Synonym of Query#execute()
 * @method Query#run
 * @param {Object} [options] - Query options
 * @param {Boolean} [options.autoFetch] - Using auto fetch mode or not
 * @param {Number} [options.maxFetch] - Max fetching records in auto fetch mode
 * @param {Boolean} [options.scanAll] - Including deleted records for query target or not
 * @param {Callback.<T>} [callback] - Callback function
 * @returns {Query.<T>}
 */ =
 * Synonym of Query#execute()
 * @method Query#exec
 * @param {Object} [options] - Query options
 * @param {Boolean} [options.autoFetch] - Using auto fetch mode or not
 * @param {Number} [options.maxFetch] - Max fetching records in auto fetch mode
 * @param {Boolean} [options.scanAll] - Including deleted records for query target or not
 * @param {Callback.<T>} [callback] - Callback function
 * @returns {Query.<T>}
Query.prototype.exec =
 * Execute query and fetch records from server.
 * @method Query#execute
 * @param {Object} [options] - Query options
 * @param {Boolean} [options.autoFetch] - Using auto fetch mode or not
 * @param {Number} [options.maxFetch] - Max fetching records in auto fetch mode
 * @param {Boolean} [options.scanAll] - Including deleted records for query target or not
 * @param {Callback.<T>} [callback] - Callback function
 * @returns {Query.<T>}
Query.prototype.execute = function(options, callback) {
  var self = this;
  var logger = this._conn._logger;
  var deferred = this._deferred;

  if (this._executed) {
    deferred.reject(new Error("re-executing already executed query"));
    return this;

  if (this._finished) {
    deferred.reject(new Error("executing already closed query"));
    return this;

  if (typeof options === "function") {
    callback = options;
    options = {};
  options = options || {};
  options = {
    responseTarget: options.responseTarget || self._responseTarget,
    autoFetch: options.autoFetch || self._autoFetch,
    maxFetch: options.maxFetch || self._maxFetch,
    scanAll: options.scanAll || self._scanAll

  // callback and promise resolution;
  var promiseCallback = function(err, res) {
    if (_.isFunction(callback)) {
      try {
        res = callback(err, res);
        err = null;
      } catch(e) {
        err = e;
    if (err) {
    } else {
  this.once('response', function(res) {
    promiseCallback(null, res);
  this.once('error', function(err) {

  // collect fetched records in array
  // only when response target is Records and
  // either callback or chaining promises are available to this query.
  this.once('fetch', function() {
    if (options.responseTarget === ResponseTargets.Records && (self._chaining || callback)) {
      logger.debug('--- collecting all fetched records ---');
      var records = [];
      var onRecord = function(record) {
      self.on('record', onRecord);
      self.once('end', function() {
        self.removeListener('record', onRecord);
        self.emit('response', records, self);

  // flag to prevent re-execution
  this._executed = true;

  // start actual query
  logger.debug('>>> Query start >>>');
  this._execute(options).then(function() {
    logger.debug('*** Query finished ***');
  }).fail(function(err) {
    logger.debug('--- Query error ---');
    self.emit('error', err);

  // return Query instance for chaining
  return this;

 * @private
Query.prototype._execute = function(options) {
  var self = this;
  var logger = this._conn._logger;
  var responseTarget = options.responseTarget;
  var autoFetch = options.autoFetch;
  var maxFetch = options.maxFetch;
  var scanAll = options.scanAll;

  return Promise.resolve(
    self._locator ?
    self._conn._baseUrl() + "/query/" + self._locator :
    self.toSOQL().then(function(soql) {
      self.totalFetched = 0;
      logger.debug("SOQL = " + soql);
      return self._conn._baseUrl() + "/" + (scanAll ? "queryAll" : "query") + "?q=" + encodeURIComponent(soql);
  ).then(function(url) {
    return self._conn.request(url);
  }).then(function(data) {
    self.totalSize = data.totalSize;
    var res;
    switch(responseTarget) {
      case ResponseTargets.SingleRecord:
        res = data.records && data.records.length > 0 ? data.records[0] : null;
      case ResponseTargets.Records:
        res = data.records;
      case ResponseTargets.Count:
        res = data.totalSize;
        res = data;
    // only fire response event when it should be notified per fetch
    if (responseTarget !== ResponseTargets.Records) {
      self.emit("response", res, self);

    // streaming record instances
    for (var i=0, l=data.records.length; i<l; i++) {
      if (self.totalFetched >= maxFetch) {
        self._finished = true;
      var record = data.records[i];
      self.emit('record', record, self.totalFetched++, self);
    if (data.nextRecordsUrl) {
      self._locator = data.nextRecordsUrl.split('/').pop();
    self._finished = self._finished || data.done || !autoFetch;
    if (self._finished) {
    } else {
    return res;

 * Readable stream implementation
 * @override
 * @private
Query.prototype._read = function(size) {
  if (!this._finished && !this._executed) {
    this.execute({ autoFetch: true });

/**
 * Register an event listener.
 *
 * Subscribing to "record" also attaches a "readable" listener that reads
 * the stream until it is empty, discarding what was buffered.
 *
 * @override
 * @param {String} e - Event name
 * @param {Function} fn - Listener
 * @returns {Query}
 */
Query.prototype.on = function(e, fn) {
  if (e === 'record') {
    var self = this;
    this.on('readable', function() {
      while(self.read() !== null) {} // discard buffered records
    });
  }
  return stream.Readable.prototype.on.call(this, e, fn);
};

/** @override **/
Query.prototype.addListener = Query.prototype.on;

 * @private
Query.prototype._expandFields = function() {
  if (this._soql) {
    return Promise.reject(new Error("Cannot expand fields for the query which has already built SOQL."));
  var self = this;
  var logger = self._conn._logger;
  var conn = this._conn;
  var table = this._config.table;
  var fields = this._config.fields || [];

  logger.debug('_expandFields: table = ' + table + ', fields = ' + fields.join(', '));

  return Promise.all([
    Promise.resolve(self._parent ? findRelationTable(table) : table)
      .then(function(table) {
        return Promise.all(
, function(field) { return expandAsteriskField(table, field); })
        ).then(function(expandedFields) {
          self._config.fields = _.flatten(expandedFields);
    Promise.all( || [], function(childQuery) {
        return childQuery._expandFields();

  function findRelationTable(rname) {
    var ptable = self._parent._config.table;
    logger.debug('finding table for relation "' + rname + '" in "' + ptable + '"...');
    return describeCache(ptable).then(function(sobject) {
      var upperRname = rname.toUpperCase();
      var childRelation = _.find(sobject.childRelationships, function(cr) {
        return (cr.relationshipName || '').toUpperCase() === upperRname;
      return childRelation ? childRelation.childSObject :
        Promise.reject(new Error("No child relationship found: " + rname ));

  function describeCache(table) {
    logger.debug('describe cache: '+table);
    var deferred = Promise.defer();
    conn.describe$(table, function(err, sobject) {
      logger.debug('... done.');
      if (err) { deferred.reject(err); }
      else { deferred.resolve(sobject); }
    return deferred.promise;

  function expandAsteriskField(table, field) {
    logger.debug('expanding field "'+ field + '" in "' + table + '"...');
    var fpath = field.split('.');
    return fpath[fpath.length - 1] === '*' ?
      describeCache(table).then(function(sobject) {
        logger.debug('table '+table+'has been described');
        if (fpath.length > 1) {
          var rname = fpath.shift();
          var rfield = _.find(sobject.fields, function(f) {
            return f.relationshipName &&
                   f.relationshipName.toUpperCase() === rname.toUpperCase();
          if (rfield) {
            var rtable = rfield.referenceTo.length === 1 ? rfield.referenceTo[0] : 'Name';
            return expandAsteriskField(rtable, fpath.join('.')).then(function(fpaths) {
              return, function(fpath) { return rname + '.' + fpath; });
          } else {
            return [];
        } else {
          return, function(f) { return; });
      }) :
      Promise.resolve([ field ]);

 * Explain plan for executing query
 * @param {Callback.<ExplainInfo>} [callback] - Callback function
 * @returns {Promise.<ExplainInfo>}
Query.prototype.explain = function(callback) {
  var self = this;
  var logger = this._conn._logger;
  return self.toSOQL().then(function(soql) {
    logger.debug("SOQL = " + soql);
    var url = "/query/?explain=" + encodeURIComponent(soql);
    return self._conn.request(url);

 * Return SOQL expression for the query
 * @param {Callback.<String>} [callback] - Callback function
 * @returns {Promise.<String>}
Query.prototype.toSOQL = function(callback) {
  var self = this;
  return Promise.resolve(self._soql ||
    self._expandFields().then(function() { return SOQLBuilder.createSOQL(self._config); })

 * Create data stream of queried records.
 * Automatically resume query if paused.
 * @param {String} [type] - Type of outgoing data format. Currently 'csv' is default value and the only supported.
 * @param {Object} [options] - Options passed to converter
 * @returns {stream.Readable}
 */ =;

 * Get record stream of queried records applying the given mapping function
 * @param {RecordMapFunction} fn - Record mapping function
 * @returns {RecordStream.Serializable}
 */ =;

 * Get record stream of queried records, applying the given filter function
 * @param {RecordFilterFunction} fn - Record filtering function
 * @returns {RecordStream.Serializable}
Query.prototype.filter =;

 * Synonym of Query#destroy()
 * @method Query#delete
 * @param {String} [type] - SObject type. Required for SOQL based query object.
 * @param {Callback.<Array.<RecordResult>>} [callback] - Callback function
 * @returns {Bulk~Batch}
 * Synonym of Query#destroy()
 * @method Query#del
 * @param {String} [type] - SObject type. Required for SOQL based query object.
 * @param {Callback.<Array.<RecordResult>>} [callback] - Callback function
 * @returns {Bulk~Batch}
 * Bulk delete queried records
 * @method Query#destroy
 * @param {String} [type] - SObject type. Required for SOQL based query object.
 * @param {Callback.<Array.<RecordResult>>} [callback] - Callback function
 * @returns {Promise.<Array.<RecordResult>>}
Query.prototype["delete"] =
Query.prototype.del =
Query.prototype.destroy = function(type, callback) {
  if (typeof type === 'function') {
    callback = type;
    type = null;
  type = type || (this._config && this._config.table);
  if (!type) {
    throw new Error("SOQL based query needs SObject type information to bulk delete.");
  var batch = this._conn.sobject(type).deleteBulk();
  var deferred = Promise.defer();
  var handleError = function(err) {
    if ( === 'ClientInputError') { deferred.resolve([]); } // if batch input receives no records
    else { deferred.reject(err); }
  this.on('error', handleError)
    .on('response', function(res) { deferred.resolve(res); })
    .on('error', handleError);
  return deferred.promise.thenCall(callback);

 * Bulk update queried records, using given mapping function/object
 * @param {Record|RecordMapFunction} mapping - Mapping record or record mapping function
 * @param {String} [type] - SObject type. Required for SOQL based query object.
 * @param {Callback.<Array.<RecordResult>>} [callback] - Callback function
 * @returns {Promise.<Array.<RecordResult>>}
Query.prototype.update = function(mapping, type, callback) {
  if (typeof type === 'function') {
    callback = type;
    type = null;
  type = type || (this._config && this._config.table);
  if (!type) {
    throw new Error("SOQL based query needs SObject type information to bulk update.");
  var updateStream = _.isFunction(mapping) ? : RecordStream.recordMapStream(mapping);
  var batch = this._conn.sobject(type).updateBulk();
  var deferred = Promise.defer();
  var handleError = function(err) {
    if ( === 'ClientInputError') { deferred.resolve([]); } // if batch input receives no records
    else { deferred.reject(err); }
  this.on('error', handleError)
    .on('error', handleError)
    .on('response', function(res) { deferred.resolve(res); })
    .on('error', handleError);
  return deferred.promise.thenCall(callback);

 * Promise/A+ interface
 * Delegate to deferred promise, return promise instance for query result
 * @param {FulfilledCallback.<T, S1>} [onFulfilled]
 * @param {RejectedCallback.<S2>} [onRejected]
 * @returns {Promise.<S1|S2>}
Query.prototype.then = function(onResolved, onReject) {
  this._chaining = true;
  if (!this._finished && !this._executed) { this.execute(); }
  return this._deferred.promise.then.apply(this._deferred.promise, arguments);

 * Promise/A+ extension
 * Call "then" using given node-style callback function
 * @param {Callback.<T>} [callback] - Callback function
 * @returns {Query}
Query.prototype.thenCall = function(callback) {
  if (_.isFunction(callback)) {
    this.then(function(res) {
      process.nextTick(function() {
        callback(null, res);
    }, function(err) {
      process.nextTick(function() {
  return this;


 * SubQuery object for representing child relationship query
 * @protected
 * @class Query~SubQuery
 * @extends Query
 * @param {Connection} conn - Connection object
 * @param {Query} parent - Parent query object
 * @param {Object} config - Sub query configuration
var SubQuery = function(conn, parent, config) {, conn, config);
  this._parent = parent;

inherits(SubQuery, Query);

 * @method Query~SubQuery#include
 * @override
SubQuery.prototype.include = function() {
  throw new Error("Not allowed to include another subquery in subquery.");

 * Back the context to parent query object
 * @method Query~SubQuery#end
 * @returns {Query}
SubQuery.prototype.end = function() {
  return this._parent;

 * If execute is called in subquery context, delegate it to parent query object
 * @method Query~SubQuery#execute
 * @override
 */ =
SubQuery.prototype.exec =
SubQuery.prototype.execute = function() {
  return this._parent.execute.apply(this._parent, arguments);