// Source: api/bulk.js

/*global process*/
/**
 * @file Manages Salesforce Bulk API related operations
 * @author Shinichi Tomita
 */

'use strict';

var inherits     = require('inherits'),
    stream       = require('readable-stream'),
    Duplex       = stream.Duplex,
    events       = require('events'),
    _            = require('lodash/core'),
    jsforce      = require('../core'),
    RecordStream = require('../record-stream'),
    CSV          = require('../csv'),
    Promise      = require('../promise'),
    HttpApi      = require('../http-api');


 * Class for Bulk API Job
 * @protected
 * @class Bulk~Job
 * @extends events.EventEmitter
 * @param {Bulk} bulk - Bulk API object
 * @param {String} [type] - SObject type
 * @param {String} [operation] - Bulk load operation ('insert', 'update', 'upsert', 'delete', or 'hardDelete')
 * @param {Object} [options] - Options for bulk loading operation
 * @param {String} [options.extIdField] - External ID field name (used when upsert operation).
 * @param {String} [options.concurrencyMode] - 'Serial' or 'Parallel'. Defaults to Parallel.
 * @param {String} [jobId] - Job ID (if already available)
var Job = function(bulk, type, operation, options, jobId) {
  this._bulk = bulk;
  this.type = type;
  this.operation = operation;
  this.options = options || {}; = jobId;
  this.state = ? 'Open' : 'Unknown';
  this._batches = {};

inherits(Job, events.EventEmitter);

/**
 * @typedef {Object} Bulk~JobInfo
 * @prop {String} id - Job ID
 * @prop {String} object - Object type name
 * @prop {String} operation - Operation type of the job
 * @prop {String} state - Job status
 */

 * Return latest jobInfo from cache
 * @method Bulk~Job#open
 * @param {Callback.<Bulk~JobInfo>} [callback] - Callback function
 * @returns {Promise.<Bulk~JobInfo>}
 */ = function(callback) {
  var self = this;
  // if cache is not available, check the latest
  if (!this._jobInfo) {
    this._jobInfo = this.check();
  return this._jobInfo.thenCall(callback);

 * Open new job and get jobinfo
 * @method Bulk~Job#open
 * @param {Callback.<Bulk~JobInfo>} [callback] - Callback function
 * @returns {Promise.<Bulk~JobInfo>}
 */ = function(callback) {
  var self = this;
  var bulk = this._bulk;
  var logger = bulk._logger;

  // if not requested opening job
  if (!this._jobInfo) {
    var operation = this.operation.toLowerCase();
    if (operation === 'harddelete') { operation = 'hardDelete'; }
    var body = [
      '<?xml version="1.0" encoding="UTF-8"?>',
      '<jobInfo  xmlns="">',
        '<operation>' + operation + '</operation>',
        '<object>' + this.type + '</object>',
        (this.options.extIdField ?
         '<externalIdFieldName>'+this.options.extIdField+'</externalIdFieldName>' :
        (this.options.concurrencyMode ?
         '<concurrencyMode>'+this.options.concurrencyMode+'</concurrencyMode>' :
        (this.options.assignmentRuleId ?
          '<assignmentRuleId>' + this.options.assignmentRuleId + '</assignmentRuleId>' :

    this._jobInfo = bulk._request({
      method : 'POST',
      path : "/job",
      body : body,
      headers : {
        "Content-Type" : "application/xml; charset=utf-8"
      responseType: "application/xml"
    }).then(function(res) {
      self.emit("open", res.jobInfo); =;
      self.state = res.jobInfo.state;
      return res.jobInfo;
    }, function(err) {
      self.emit("error", err);
      throw err;
  return this._jobInfo.thenCall(callback);

 * Create a new batch instance in the job
 * @method Bulk~Job#createBatch
 * @returns {Bulk~Batch}
Job.prototype.createBatch = function() {
  var batch = new Batch(this);
  var self = this;
  batch.on('queue', function() {
    self._batches[] = batch;
  return batch;

 * Get a batch instance specified by given batch ID
 * @method Bulk~Job#batch
 * @param {String} batchId - Batch ID
 * @returns {Bulk~Batch}
Job.prototype.batch = function(batchId) {
  var batch = this._batches[batchId];
  if (!batch) {
    batch = new Batch(this, batchId);
    this._batches[batchId] = batch;
  return batch;

 * Check the latest job status from server
 * @method Bulk~Job#check
 * @param {Callback.<Bulk~JobInfo>} [callback] - Callback function
 * @returns {Promise.<Bulk~JobInfo>}
Job.prototype.check = function(callback) {
  var self = this;
  var bulk = this._bulk;
  var logger = bulk._logger;

  this._jobInfo = this._waitAssign().then(function() {
    return bulk._request({
      method : 'GET',
      path : "/job/" +,
      responseType: "application/xml"
  }).then(function(res) {
    logger.debug(res.jobInfo); =;
    self.type = res.jobInfo.object;
    self.operation = res.jobInfo.operation;
    self.state = res.jobInfo.state;
    return res.jobInfo;
  return this._jobInfo.thenCall(callback);

 * Wait till the job is assigned to server
 * @method Bulk~Job#info
 * @param {Callback.<Bulk~JobInfo>} [callback] - Callback function
 * @returns {Promise.<Bulk~JobInfo>}
Job.prototype._waitAssign = function(callback) {
  return ( ? Promise.resolve({ id: }) :;

 * List all registered batch info in job
 * @method Bulk~Job#list
 * @param {Callback.<Array.<Bulk~BatchInfo>>} [callback] - Callback function
 * @returns {Promise.<Array.<Bulk~BatchInfo>>}
Job.prototype.list = function(callback) {
  var self = this;
  var bulk = this._bulk;
  var logger = bulk._logger;

  return this._waitAssign().then(function() {
    return bulk._request({
      method : 'GET',
      path : "/job/" + + "/batch",
      responseType: "application/xml"
  }).then(function(res) {
    var batchInfoList = res.batchInfoList;
    batchInfoList = _.isArray(batchInfoList.batchInfo) ? batchInfoList.batchInfo : [ batchInfoList.batchInfo ];
    return batchInfoList;


 * Close opened job
 * @method Bulk~Job#close
 * @param {Callback.<Bulk~JobInfo>} [callback] - Callback function
 * @returns {Promise.<Bulk~JobInfo>}
Job.prototype.close = function() {
  var self = this;
  return this._changeState("Closed").then(function(jobInfo) { = null;
    self.emit("close", jobInfo);
    return jobInfo;
  }, function(err) {
    self.emit("error", err);
    throw err;

 * Set the status to abort
 * @method Bulk~Job#abort
 * @param {Callback.<Bulk~JobInfo>} [callback] - Callback function
 * @returns {Promise.<Bulk~JobInfo>}
Job.prototype.abort = function() {
  var self = this;
  return this._changeState("Aborted").then(function(jobInfo) { = null;
    self.emit("abort", jobInfo);
    return jobInfo;
  }, function(err) {
    self.emit("error", err);
    throw err;

 * @private
Job.prototype._changeState = function(state, callback) {
  var self = this;
  var bulk = this._bulk;
  var logger = bulk._logger;

  this._jobInfo = this._waitAssign().then(function() {
    var body = [
      '<?xml version="1.0" encoding="UTF-8"?>',
      '<jobInfo xmlns="">',
        '<state>' + state + '</state>',
    return bulk._request({
      method : 'POST',
      path : "/job/" +,
      body : body,
      headers : {
        "Content-Type" : "application/xml; charset=utf-8"
      responseType: "application/xml"
  }).then(function(res) {
    self.state = res.jobInfo.state;
    return res.jobInfo;
  return this._jobInfo.thenCall(callback);



 * Batch (extends RecordStream)
 * @protected
 * @class Bulk~Batch
 * @extends {stream.Writable}
 * @implements {Promise.<Array.<RecordResult>>}
 * @param {Bulk~Job} job - Bulk job object
 * @param {String} [batchId] - Batch ID (if already available)
var Batch = function(job, batchId) {, { objectMode: true });
  this.job = job; = batchId;
  this._bulk = job._bulk;
  this._deferred = Promise.defer();

inherits(Batch, stream.Writable);

 * @private
Batch.prototype._setupDataStreams = function() {
  var batch = this;
  var converterOptions = { nullValue : '#N/A' };
  this._uploadStream = new RecordStream.Serializable();
  this._uploadDataStream ='csv', converterOptions);
  this._downloadStream = new RecordStream.Parsable();
  this._downloadDataStream ='csv', converterOptions);

  this.on('finish', function() {
  this._uploadDataStream.once('readable', function() { {
      // pipe upload data to batch API request stream

  // duplex data stream, opened access to API programmers by Batch#stream()
  var dataStream = this._dataStream = new Duplex();
  dataStream._write = function(data, enc, cb) {
    batch._uploadDataStream.write(data, enc, cb);
  dataStream.on('finish', function() {

  this._downloadDataStream.on('readable', function() {;
  this._downloadDataStream.on('end', function() {
  dataStream._read = function(size) {
    var chunk;
    while ((chunk = !== null) {

 * Connect batch API and create stream instance of request/response
 * @private
 * @returns {stream.Duplex}
Batch.prototype._createRequestStream = function() {
  var batch = this;
  var bulk = batch._bulk;
  var logger = bulk._logger;

  return bulk._request({
    method : 'POST',
    path : "/job/" + + "/batch",
    headers: {
      "Content-Type": "text/csv"
    responseType: "application/xml"
  }, function(err, res) {
    if (err) {
      batch.emit('error', err);
    } else {
      logger.debug(res.batchInfo); =;
      batch.emit('queue', res.batchInfo);

 * Implementation of Writable
 * @override
 * @private
Batch.prototype._write = function(record, enc, cb) {
  record = _.clone(record);
  if (this.job.operation === "insert") {
    delete record.Id;
  } else if (this.job.operation === "delete") {
    record = { Id: record.Id };
  delete record.type;
  delete record.attributes;
  this._uploadStream.write(record, enc, cb);

 * Returns duplex stream which accepts CSV data input and batch result output
 * @returns {stream.Duplex}
 */ = function() {
  return this._dataStream;

 * Execute batch operation
 * @method Bulk~Batch#execute
 * @param {Array.<Record>|stream.Stream|String} [input] - Input source for batch operation. Accepts array of records, CSV string, and CSV data input stream in insert/update/upsert/delete/hardDelete operation, SOQL string in query operation.
 * @param {Callback.<Array.<RecordResult>|Array.<BatchResultInfo>>} [callback] - Callback function
 * @returns {Bulk~Batch}
 */ =
Batch.prototype.exec =
Batch.prototype.execute = function(input, callback) {
  var self = this;

  if (typeof input === 'function') { // if input argument is omitted
    callback = input;
    input = null;

  // if batch is already executed
  if (this._result) {
    throw new Error("Batch already executed.");

  var rdeferred = Promise.defer();
  this._result = rdeferred.promise;
  this._result.then(function(res) {
  }, function(err) {
  this.once('response', function(res) {
  this.once('error', function(err) {

  if (_.isObject(input) && _.isFunction(input.pipe)) { // if input has stream.Readable interface
  } else {
    var data;
    if (_.isArray(input)) {
      _.forEach(input, function(record) { self.write(record); });
    } else if (_.isString(input)){
      data = input;
      this._dataStream.write(data, 'utf8');

  // return Batch instance for chaining
  return this.thenCall(callback);

 * Promise/A+ interface
 * Delegate to deferred promise, return promise instance for batch result
 * @method Bulk~Batch#then
Batch.prototype.then = function(onResolved, onReject, onProgress) {
  return this._deferred.promise.then(onResolved, onReject, onProgress);

 * Promise/A+ extension
 * Call "then" using given node-style callback function
 * @method Bulk~Batch#thenCall
Batch.prototype.thenCall = function(callback) {
  if (_.isFunction(callback)) {
    this.then(function(res) {
      process.nextTick(function() {
        callback(null, res);
    }, function(err) {
      process.nextTick(function() {
  return this;

/**
 * @typedef {Object} Bulk~BatchInfo
 * @prop {String} id - Batch ID
 * @prop {String} jobId - Job ID
 * @prop {String} state - Batch state
 * @prop {String} stateMessage - Batch state message
 */

 * Check the latest batch status in server
 * @method Bulk~Batch#check
 * @param {Callback.<Bulk~BatchInfo>} [callback] - Callback function
 * @returns {Promise.<Bulk~BatchInfo>}
Batch.prototype.check = function(callback) {
  var self = this;
  var bulk = this._bulk;
  var logger = bulk._logger;
  var jobId =;
  var batchId =;

  if (!jobId || !batchId) {
    throw new Error("Batch not started.");
  return bulk._request({
    method : 'GET',
    path : "/job/" + jobId + "/batch/" + batchId,
    responseType: "application/xml"
  }).then(function(res) {
    return res.batchInfo;

 * Polling the batch result and retrieve
 * @method Bulk~Batch#poll
 * @param {Number} interval - Polling interval in milliseconds
 * @param {Number} timeout - Polling timeout in milliseconds
Batch.prototype.poll = function(interval, timeout) {
  var self = this;
  var jobId =;
  var batchId =;

  if (!jobId || !batchId) {
    throw new Error("Batch not started.");
  var startTime = new Date().getTime();
  var poll = function() {
    var now = new Date().getTime();
    if (startTime + timeout < now) {
      var err = new Error("Polling time out. Job Id = " + jobId + " , batch Id = " + batchId); = 'PollingTimeout';
      self.emit('error', err);
    self.check(function(err, res) {
      if (err) {
        self.emit('error', err);
      } else {
        if (res.state === "Failed") {
          if (parseInt(res.numberRecordsProcessed, 10) > 0) {
          } else {
            self.emit('error', new Error(res.stateMessage));
        } else if (res.state === "Completed") {
        } else {
          self.emit('progress', res);
          setTimeout(poll, interval);
  setTimeout(poll, interval);

/**
 * @typedef {Object} Bulk~BatchResultInfo
 * @prop {String} id - Batch result ID
 * @prop {String} batchId - Batch ID which includes this batch result.
 * @prop {String} jobId - Job ID which includes this batch result.
 */

 * Retrieve batch result
 * @method Bulk~Batch#retrieve
 * @param {Callback.<Array.<RecordResult>|Array.<Bulk~BatchResultInfo>>} [callback] - Callback function
 * @returns {Promise.<Array.<RecordResult>|Array.<Bulk~BatchResultInfo>>}
Batch.prototype.retrieve = function(callback) {
  var self = this;
  var bulk = this._bulk;
  var jobId =;
  var job = this.job;
  var batchId =;

  if (!jobId || !batchId) {
    throw new Error("Batch not started.");

  return {
    return bulk._request({
      method : 'GET',
      path : "/job/" + jobId + "/batch/" + batchId + "/result"
  }).then(function(res) {
    var results;
    if (job.operation === 'query') {
      var conn = bulk._conn;
      var resultIds = res['result-list'].result;
      results = res['result-list'].result;
      results = ? results : [ results ], function(id) {
        return {
          id: id,
          batchId: batchId,
          jobId: jobId
    } else {
      results =, function(ret) {
        return {
          id: ret.Id || null,
          success: ret.Success === "true",
          errors: ret.Error ? [ ret.Error ] : []
    self.emit('response', results);
    return results;
  }).fail(function(err) {
    self.emit('error', err);
    throw err;

 * Fetch query result as a record stream
 * @param {String} resultId - Result id
 * @returns {RecordStream} - Record stream, convertible to CSV data stream
Batch.prototype.result = function(resultId) {
  var jobId =;
  var batchId =;
  if (!jobId || !batchId) {
    throw new Error("Batch not started.");
  var resultStream = new RecordStream.Parsable();
  var resultDataStream ='csv');
  var reqStream = this._bulk._request({
    method : 'GET',
    path : "/job/" + jobId + "/batch/" + batchId + "/result/" + resultId
  return resultStream;

 * @private
var BulkApi = function() {
  BulkApi.super_.apply(this, arguments);

inherits(BulkApi, HttpApi);

BulkApi.prototype.beforeSend = function(request) {
  request.headers = request.headers || {};
  request.headers["X-SFDC-SESSION"] = this._conn.accessToken;

BulkApi.prototype.isSessionExpired = function(response) {
  return response.statusCode === 400 &&

BulkApi.prototype.hasErrorInResponseBody = function(body) {
  return !!body.error;

BulkApi.prototype.parseError = function(body) {
  return {
    errorCode: body.error.exceptionCode,
    message: body.error.exceptionMessage


 * Class for Bulk API
 * @class
 * @param {Connection} conn - Connection object
var Bulk = function(conn) {
  this._conn = conn;
  this._logger = conn._logger;

 * Polling interval in milliseconds
 * @type {Number}
Bulk.prototype.pollInterval = 1000;

 * Polling timeout in milliseconds
 * @type {Number}
Bulk.prototype.pollTimeout = 10000;

/**
 * Send a request to the Bulk API endpoint of the connection.
 *
 * `request.path` is resolved against `<instanceUrl>/services/async/<version>`,
 * and `request.responseType` is passed to the HTTP API as an option rather than
 * as part of the request. The given request object is not modified.
 *
 * @private
 * @param {Object} request - Request with `path` and optional `responseType`
 * @param {Callback.<Object>} [callback] - Callback function
 * @returns {Promise.<Object>}
 */
Bulk.prototype._request = function(request, callback) {
  var conn = this._conn;
  request = _.clone(request);
  var baseUrl = [ conn.instanceUrl, "services/async", conn.version ].join('/');
  request.url = baseUrl + request.path;
  var options = { responseType: request.responseType };
  delete request.path;
  delete request.responseType;
  return new BulkApi(this._conn, options).request(request).thenCall(callback);
};

 * Create and start bulkload job and batch
 * @param {String} type - SObject type
 * @param {String} operation - Bulk load operation ('insert', 'update', 'upsert', 'delete', or 'hardDelete')
 * @param {Object} [options] - Options for bulk loading operation
 * @param {String} [options.extIdField] - External ID field name (used when upsert operation).
 * @param {String} [options.concurrencyMode] - 'Serial' or 'Parallel'. Defaults to Parallel.
 * @param {Array.<Record>|stream.Stream|String} [input] - Input source for bulkload. Accepts array of records, CSV string, and CSV data input stream in insert/update/upsert/delete/hardDelete operation, SOQL string in query operation.
 * @param {Callback.<Array.<RecordResult>|Array.<Bulk~BatchResultInfo>>} [callback] - Callback function
 * @returns {Bulk~Batch}
Bulk.prototype.load = function(type, operation, options, input, callback) {
  var self = this;
  if (!type || !operation) {
    throw new Error("Insufficient arguments. At least, 'type' and 'operation' are required.");
  if (!_.isObject(options) || options.constructor !== Object) { // when options is not plain hash object, it is omitted
    callback = input;
    input = options;
    options = null;
  var job = this.createJob(type, operation, options);
  job.once('error', function (error) {
    if (batch) {
      batch.emit('error', error); // pass job error to batch
  var batch = job.createBatch();
  var cleanup = function() {
    batch = null;
  var cleanupOnError = function(err) {
    if ( !== 'PollingTimeout') {
  batch.on('response', cleanup);
  batch.on('error', cleanupOnError);
  batch.on('queue', function() { batch.poll(self.pollInterval, self.pollTimeout); });
  return batch.execute(input, callback);

 * Execute bulk query and get record stream
 * @param {String} soql - SOQL to execute in bulk job
 * @returns {RecordStream.Parsable} - Record stream, convertible to CSV data stream
Bulk.prototype.query = function(soql) {
  var m = soql.replace(/\([\s\S]+\)/g, '').match(/FROM\s+(\w+)/i);
  if (!m) {
    throw new Error("No sobject type found in query, maybe caused by invalid SOQL.");
  var type = m[1];
  var self = this;
  var recordStream = new RecordStream.Parsable();
  var dataStream ='csv');
  this.load(type, "query", soql).then(function(results) {
    // Ideally, it should merge result files into one stream.
    // Currently only first batch result is the target (mostly enough).
    var r = results[0];
    var result = self.job(r.jobId).batch(r.batchId).result(;;
  }).fail(function(err) {
    recordStream.emit('error', err);
  return recordStream;

 * Create a new job instance
 * @param {String} type - SObject type
 * @param {String} operation - Bulk load operation ('insert', 'update', 'upsert', 'delete', 'hardDelete', or 'query')
 * @param {Object} [options] - Options for bulk loading operation
 * @returns {Bulk~Job}
Bulk.prototype.createJob = function(type, operation, options) {
  return new Job(this, type, operation, options);

 * Get a job instance specified by given job ID
 * @param {String} jobId - Job ID
 * @returns {Bulk~Job}
Bulk.prototype.job = function(jobId) {
  return new Job(this, null, null, null, jobId);

 * Register hook in connection instantiation for dynamically adding this API module features
jsforce.on('connection:new', function(conn) {
  conn.bulk = new Bulk(conn);

module.exports = Bulk;