/*global process*/
/**
* @file Manages Salesforce Bulk API related operations
* @author Shinichi Tomita <shinichi.tomita@gmail.com>
*/
'use strict';
var inherits = require('inherits'),
stream = require('readable-stream'),
Duplex = stream.Duplex,
events = require('events'),
_ = require('lodash/core'),
jsforce = require('../core'),
RecordStream = require('../record-stream'),
CSV = require('../csv'),
Promise = require('../promise'),
HttpApi = require('../http-api');
/*--------------------------------------------*/
/**
 * Class for Bulk API Job
 *
 * A job groups batches that run one operation against one SObject type.
 * New jobs come from `Bulk#createJob`; jobs referenced by an existing ID come
 * from `Bulk#job`.
 *
 * @protected
 * @class Bulk~Job
 * @extends events.EventEmitter
 *
 * @param {Bulk} bulk - Bulk API object
 * @param {String} [type] - SObject type
 * @param {String} [operation] - Bulk load operation ('insert', 'update', 'upsert', 'delete', or 'hardDelete'), or 'query'
 * @param {Object} [options] - Options for bulk loading operation
 * @param {String} [options.extIdField] - External ID field name (used when upsert operation).
 * @param {String} [options.concurrencyMode] - 'Serial' or 'Parallel'. Defaults to Parallel.
 * @param {String} [options.assignmentRuleId] - Assignment rule ID included in the job creation request.
 * @param {String} [jobId] - Job ID (if already available)
 */
var Job = function(bulk, type, operation, options, jobId) {
  var hasId = Boolean(jobId);
  this._bulk = bulk;
  this.type = type;
  this.operation = operation;
  this.options = options || {};
  this.id = jobId;
  // A job built from a known ID is treated as already open; otherwise the
  // state stays unknown until the job is opened or checked.
  this.state = hasId ? 'Open' : 'Unknown';
  // Batches belonging to this job, keyed by batch ID.
  this._batches = {};
};
inherits(Job, events.EventEmitter);
/**
* @typedef {Object} Bulk~JobInfo
* @prop {String} id - Job ID
* @prop {String} object - Object type name
* @prop {String} operation - Operation type of the job
* @prop {String} state - Job status
*/
/**
 * Return the latest job info, using the cached value when available.
 *
 * When nothing is cached yet (no prior `open`, `check` or state change),
 * this delegates to `Job#check` to fetch the job info from the server.
 *
 * @method Bulk~Job#info
 * @param {Callback.<Bulk~JobInfo>} [callback] - Callback function
 * @returns {Promise.<Bulk~JobInfo>}
 */
Job.prototype.info = function(callback) {
  // if cache is not available, check the latest
  if (!this._jobInfo) {
    this._jobInfo = this.check();
  }
  return this._jobInfo.thenCall(callback);
};
/**
 * Open a new job on the server and get its job info.
 *
 * The create-job request is sent at most once. Its promise is cached in
 * `_jobInfo`, and later calls (and `Job#info`) reuse it.
 *
 * On success, `id` and `state` are updated from the response BEFORE the
 * "open" event is emitted, so "open" listeners observe the opened job.
 * On failure, an "error" event is emitted and the returned promise is
 * rejected.
 *
 * @method Bulk~Job#open
 * @param {Callback.<Bulk~JobInfo>} [callback] - Callback function
 * @returns {Promise.<Bulk~JobInfo>}
 */
Job.prototype.open = function(callback) {
  var self = this;
  var bulk = this._bulk;
  // if not requested opening job
  if (!this._jobInfo) {
    var operation = this.operation.toLowerCase();
    // restore the camel-cased operation name after lowercasing
    if (operation === 'harddelete') { operation = 'hardDelete'; }
    var body = [
      '<?xml version="1.0" encoding="UTF-8"?>',
      '<jobInfo xmlns="http://www.force.com/2009/06/asyncapi/dataload">',
      '<operation>' + operation + '</operation>',
      '<object>' + this.type + '</object>',
      (this.options.extIdField ?
        '<externalIdFieldName>' + this.options.extIdField + '</externalIdFieldName>' :
        ''),
      (this.options.concurrencyMode ?
        '<concurrencyMode>' + this.options.concurrencyMode + '</concurrencyMode>' :
        ''),
      (this.options.assignmentRuleId ?
        '<assignmentRuleId>' + this.options.assignmentRuleId + '</assignmentRuleId>' :
        ''),
      '<contentType>CSV</contentType>',
      '</jobInfo>'
    ].join('');
    this._jobInfo = bulk._request({
      method : 'POST',
      path : "/job",
      body : body,
      headers : {
        "Content-Type" : "application/xml; charset=utf-8"
      },
      responseType: "application/xml"
    }).then(function(res) {
      // Update the job first so "open" listeners see the assigned id/state
      self.id = res.jobInfo.id;
      self.state = res.jobInfo.state;
      self.emit("open", res.jobInfo);
      return res.jobInfo;
    }, function(err) {
      self.emit("error", err);
      throw err;
    });
  }
  return this._jobInfo.thenCall(callback);
};
/**
 * Create a new batch instance in the job.
 *
 * The batch is added to this job's batch map only when it emits "queue".
 * The batch ID is assigned from the server response at that point, so it
 * cannot be used as a key earlier.
 *
 * @method Bulk~Job#createBatch
 * @returns {Bulk~Batch}
 */
Job.prototype.createBatch = function() {
  var job = this;
  var created = new Batch(job);
  var register = function() {
    job._batches[created.id] = created;
  };
  created.on('queue', register);
  return created;
};
/**
 * Get a batch instance specified by given batch ID.
 *
 * Returns the batch already registered under that ID. Otherwise it creates
 * a new batch bound to this job, registers it, and returns it.
 *
 * @method Bulk~Job#batch
 * @param {String} batchId - Batch ID
 * @returns {Bulk~Batch}
 */
Job.prototype.batch = function(batchId) {
  var registry = this._batches;
  if (!registry[batchId]) {
    registry[batchId] = new Batch(this, batchId);
  }
  return registry[batchId];
};
/**
 * Check the latest job status from server.
 *
 * The method runs in these steps:
 * 1. Wait until the job has an ID, opening it if needed.
 * 2. Fetch the job info.
 * 3. Refresh `id`, `type`, `operation` and `state` from the response.
 *
 * The resulting promise is cached in `_jobInfo`.
 *
 * @method Bulk~Job#check
 * @param {Callback.<Bulk~JobInfo>} [callback] - Callback function
 * @returns {Promise.<Bulk~JobInfo>}
 */
Job.prototype.check = function(callback) {
  var job = this;
  var bulk = this._bulk;
  var logger = bulk._logger;
  var fetchJobInfo = function() {
    return bulk._request({
      method : 'GET',
      path : "/job/" + job.id,
      responseType: "application/xml"
    });
  };
  var applyJobInfo = function(res) {
    var info = res.jobInfo;
    logger.debug(info);
    job.id = info.id;
    job.type = info.object;
    job.operation = info.operation;
    job.state = info.state;
    return info;
  };
  this._jobInfo = this._waitAssign().then(fetchJobInfo).then(applyJobInfo);
  return this._jobInfo.thenCall(callback);
};
/**
 * Wait till the job is assigned an ID on the server.
 *
 * If the ID is already known, this resolves immediately with `{ id: <job id> }`.
 * Otherwise it opens the job and resolves with the job info from `Job#open`.
 *
 * @private
 * @method Bulk~Job#_waitAssign
 * @param {Callback.<Bulk~JobInfo>} [callback] - Callback function
 * @returns {Promise.<Bulk~JobInfo>}
 */
Job.prototype._waitAssign = function(callback) {
  var assigned = this.id ? Promise.resolve({ id: this.id }) : this.open();
  return assigned.thenCall(callback);
};
/**
 * List all registered batch info in job.
 *
 * A single `batchInfo` entry is normalised into a one-element array. When
 * the response carries no `batchInfo` (job without batches), an empty array
 * is returned instead of `[undefined]`.
 *
 * @method Bulk~Job#list
 * @param {Callback.<Array.<Bulk~BatchInfo>>} [callback] - Callback function
 * @returns {Promise.<Array.<Bulk~BatchInfo>>}
 */
Job.prototype.list = function(callback) {
  var self = this;
  var bulk = this._bulk;
  var logger = bulk._logger;
  return this._waitAssign().then(function(){
    return bulk._request({
      method : 'GET',
      path : "/job/" + self.id + "/batch",
      responseType: "application/xml"
    });
  }).then(function(res) {
    // Guard against an empty/absent list element before reading batchInfo
    var batchInfo = (res.batchInfoList || {}).batchInfo;
    logger.debug(batchInfo);
    if (batchInfo === undefined || batchInfo === null) {
      return [];
    }
    return _.isArray(batchInfo) ? batchInfo : [ batchInfo ];
  }).thenCall(callback);
};
/**
 * Close opened job.
 *
 * On success, the job's `id` is cleared and a "close" event is emitted with
 * the returned job info. On failure, an "error" event is emitted and the
 * promise is rejected. The optional callback receives the same outcome.
 *
 * @method Bulk~Job#close
 * @param {Callback.<Bulk~JobInfo>} [callback] - Callback function
 * @returns {Promise.<Bulk~JobInfo>}
 */
Job.prototype.close = function(callback) {
  var self = this;
  return this._changeState("Closed").then(function(jobInfo) {
    self.id = null;
    self.emit("close", jobInfo);
    return jobInfo;
  }, function(err) {
    self.emit("error", err);
    throw err;
  }).thenCall(callback);
};
/**
 * Set the status to abort.
 *
 * On success, the job's `id` is cleared and an "abort" event is emitted with
 * the returned job info. On failure, an "error" event is emitted and the
 * promise is rejected. The optional callback receives the same outcome.
 *
 * @method Bulk~Job#abort
 * @param {Callback.<Bulk~JobInfo>} [callback] - Callback function
 * @returns {Promise.<Bulk~JobInfo>}
 */
Job.prototype.abort = function(callback) {
  var self = this;
  return this._changeState("Aborted").then(function(jobInfo) {
    self.id = null;
    self.emit("abort", jobInfo);
    return jobInfo;
  }, function(err) {
    self.emit("error", err);
    throw err;
  }).thenCall(callback);
};
/**
 * Change the job state on the server (used with "Closed" and "Aborted").
 *
 * The method runs in these steps:
 * 1. Wait until the job has an ID.
 * 2. POST the new state.
 * 3. Update `state` from the response.
 *
 * The resulting promise is cached in `_jobInfo`.
 *
 * @private
 * @param {String} state - New job state
 * @param {Callback.<Bulk~JobInfo>} [callback] - Callback function
 * @returns {Promise.<Bulk~JobInfo>}
 */
Job.prototype._changeState = function(state, callback) {
  var job = this;
  var bulk = this._bulk;
  var logger = bulk._logger;
  var sendStateChange = function() {
    var xml = '<?xml version="1.0" encoding="UTF-8"?>' +
      '<jobInfo xmlns="http://www.force.com/2009/06/asyncapi/dataload">' +
      '<state>' + state + '</state>' +
      '</jobInfo>';
    return bulk._request({
      method : 'POST',
      path : "/job/" + job.id,
      body : xml,
      headers : { "Content-Type" : "application/xml; charset=utf-8" },
      responseType: "application/xml"
    });
  };
  var applyResponse = function(res) {
    var info = res.jobInfo;
    logger.debug(info);
    job.state = info.state;
    return info;
  };
  this._jobInfo = this._waitAssign().then(sendStateChange).then(applyResponse);
  return this._jobInfo.thenCall(callback);
};
/*--------------------------------------------*/
/**
 * Batch: a writable stream of records that also behaves as a promise of the
 * batch result (see Batch#then / Batch#thenCall).
 *
 * @protected
 * @class Bulk~Batch
 * @extends {stream.Writable}
 * @implements {Promise.<Array.<RecordResult>>}
 * @param {Bulk~Job} job - Bulk job object
 * @param {String} [batchId] - Batch ID (if already available)
 */
var Batch = function(job, batchId) {
  // Records are written as objects, so the writable side runs in object mode
  Batch.super_.call(this, { objectMode: true });
  this.job = job;
  this.id = batchId;
  this._bulk = job._bulk;
  // Settled from Batch#execute once a "response" or "error" arrives
  this._deferred = Promise.defer();
  this._setupDataStreams();
};
inherits(Batch, stream.Writable);
/**
 * Wire up the internal streams behind a batch.
 *
 * - `_uploadStream` (RecordStream.Serializable) receives the records written
 *   to the batch. Its CSV side, `_uploadDataStream`, is piped into the batch
 *   creation request.
 * - `_downloadStream` (RecordStream.Parsable) has a CSV side,
 *   `_downloadDataStream`.
 * - `_dataStream` is the Duplex returned by `Batch#stream()`.
 *   - Its writable side forwards CSV data into `_uploadDataStream`.
 *   - Its readable side relays chunks read from `_downloadDataStream`.
 *
 * Both CSV converters get `{ nullValue: '#N/A' }`, presumably the token
 * written for null field values (TODO confirm against the CSV converter).
 *
 * NOTE(review): nothing in this file writes into `_downloadStream` /
 * `_downloadDataStream`. The readable side of `_dataStream` therefore only
 * yields data if other code feeds it; confirm.
 *
 * @private
 */
Batch.prototype._setupDataStreams = function() {
var batch = this;
var converterOptions = { nullValue : '#N/A' };
this._uploadStream = new RecordStream.Serializable();
this._uploadDataStream = this._uploadStream.stream('csv', converterOptions);
this._downloadStream = new RecordStream.Parsable();
this._downloadDataStream = this._downloadStream.stream('csv', converterOptions);
// When writing to the batch ends ('finish'), end the record upload stream too
this.on('finish', function() {
batch._uploadStream.end();
});
// The job is opened and the batch request created lazily, only once the first
// CSV data becomes readable.
// NOTE(review): a rejection from job.open() is not handled on this chain.
this._uploadDataStream.once('readable', function() {
batch.job.open().then(function() {
// pipe upload data to batch API request stream
batch._uploadDataStream.pipe(batch._createRequestStream());
});
});
// Duplex data stream exposed to API users through Batch#stream()
var dataStream = this._dataStream = new Duplex();
// Writable side: forward CSV chunks (with their callback) into the upload stream
dataStream._write = function(data, enc, cb) {
batch._uploadDataStream.write(data, enc, cb);
};
dataStream.on('finish', function() {
batch._uploadDataStream.end();
});
// When download data becomes available, read(0) nudges the Duplex to call _read
this._downloadDataStream.on('readable', function() {
dataStream.read(0);
});
// Propagate end-of-data from the download stream to the Duplex readable side
this._downloadDataStream.on('end', function() {
dataStream.push(null);
});
// Readable side: drain every chunk currently buffered in the download stream
dataStream._read = function(size) {
var chunk;
while ((chunk = batch._downloadDataStream.read()) !== null) {
dataStream.push(chunk);
}
};
};
/**
 * Connect batch API and create stream instance of request/response.
 *
 * POSTs CSV data to the job's batch endpoint. When the response arrives, the
 * batch ID is recorded and "queue" is emitted with the batch info. A failed
 * request is emitted as "error" on the batch instead.
 *
 * @private
 * @returns {stream.Duplex}
 */
Batch.prototype._createRequestStream = function() {
  var batch = this;
  var bulk = batch._bulk;
  var logger = bulk._logger;
  var onResponse = function(err, res) {
    if (err) {
      batch.emit('error', err);
      return;
    }
    var info = res.batchInfo;
    logger.debug(info);
    batch.id = info.id;
    batch.emit('queue', info);
  };
  var request = {
    method : 'POST',
    path : "/job/" + batch.job.id + "/batch",
    headers: { "Content-Type": "text/csv" },
    responseType: "application/xml"
  };
  return bulk._request(request, onResponse).stream();
};
/**
 * Implementation of Writable: prepare a record for upload and pass it to the
 * record upload stream.
 *
 * The record is shallow-cloned, so the caller's object is not mutated.
 * The operation name is compared case-insensitively, because `Job#open`
 * lowercases it for the server while `job.operation` keeps the caller's
 * casing. Per operation:
 * - insert: any `Id` field is dropped.
 * - delete / hardDelete: only the `Id` field is kept.
 * - all operations: `type` and `attributes` fields are dropped.
 *
 * @override
 * @private
 */
Batch.prototype._write = function(record, enc, cb) {
  var operation = String(this.job.operation || '').toLowerCase();
  record = _.clone(record);
  if (operation === "insert") {
    delete record.Id;
  } else if (operation === "delete" || operation === "harddelete") {
    record = { Id: record.Id };
  }
  delete record.type;
  delete record.attributes;
  this._uploadStream.write(record, enc, cb);
};
/**
 * Returns the duplex stream that accepts CSV data input and yields batch
 * result output.
 *
 * @method Bulk~Batch#stream
 * @returns {stream.Duplex}
 */
Batch.prototype.stream = function() {
  var duplex = this._dataStream;
  return duplex;
};
/**
 * Execute batch operation.
 *
 * Accepted input:
 * - an object with a `pipe` function (readable stream): piped into the CSV
 *   data stream;
 * - an array of records: each record is written to this batch, then the
 *   batch is ended;
 * - a string (CSV data, or SOQL for a query job): written to the CSV data
 *   stream as UTF-8, then that stream is ended;
 * - anything else: nothing is written here.
 *
 * The result is settled by the first "response" (resolve) or "error"
 * (reject) event on this batch.
 *
 * @method Bulk~Batch#execute
 * @param {Array.<Record>|stream.Stream|String} [input] - Input source for batch operation. Accepts array of records, CSV string, and CSV data input stream in insert/update/upsert/delete/hardDelete operation, SOQL string in query operation.
 * @param {Callback.<Array.<RecordResult>|Array.<BatchResultInfo>>} [callback] - Callback function
 * @returns {Bulk~Batch}
 * @throws {Error} If the batch has already been executed
 */
Batch.prototype.run =
Batch.prototype.exec =
Batch.prototype.execute = function(input, callback) {
  var batch = this;
  // The input argument may be omitted: execute(callback)
  if (typeof input === 'function') {
    callback = input;
    input = null;
  }
  if (this._result) {
    throw new Error("Batch already executed.");
  }
  var resultDeferred = Promise.defer();
  this._result = resultDeferred.promise;
  // Forward the outcome into the promise exposed through Batch#then
  this._result.then(
    function(res) { batch._deferred.resolve(res); },
    function(err) { batch._deferred.reject(err); }
  );
  this.once('response', function(res) { resultDeferred.resolve(res); });
  this.once('error', function(err) { resultDeferred.reject(err); });

  var isReadable = _.isObject(input) && _.isFunction(input.pipe);
  if (isReadable) {
    input.pipe(this._dataStream);
  } else if (_.isArray(input)) {
    _.forEach(input, function(record) { batch.write(record); });
    batch.end();
  } else if (_.isString(input)) {
    this._dataStream.write(input, 'utf8');
    this._dataStream.end();
  }
  // return Batch instance for chaining
  return this.thenCall(callback);
};
/**
 * Promise/A+ interface
 * http://promises-aplus.github.io/promises-spec/
 *
 * Delegates to the internal deferred promise, so a batch can be chained like
 * a promise of its result.
 *
 * @method Bulk~Batch#then
 * @param {Function} [onResolved] - Fulfillment handler
 * @param {Function} [onReject] - Rejection handler
 * @param {Function} [onProgress] - Passed through to the underlying promise
 * @returns {Promise} Promise for the handler chain
 */
Batch.prototype.then = function(onResolved, onReject, onProgress) {
  var resultPromise = this._deferred.promise;
  return resultPromise.then(onResolved, onReject, onProgress);
};
/**
 * Promise/A+ extension: call "then" with a node-style callback.
 *
 * On success the callback receives `(null, result)`; on failure it receives
 * `(err)`. Both calls are deferred with `process.nextTick`. Non-function
 * callbacks are ignored. Always returns this batch for chaining.
 *
 * @method Bulk~Batch#thenCall
 * @param {Function} [callback] - Node-style callback
 * @returns {Bulk~Batch}
 */
Batch.prototype.thenCall = function(callback) {
  if (_.isFunction(callback)) {
    var onSuccess = function(res) {
      process.nextTick(function() { callback(null, res); });
    };
    var onFailure = function(err) {
      process.nextTick(function() { callback(err); });
    };
    this.then(onSuccess, onFailure);
  }
  return this;
};
/**
* @typedef {Object} Bulk~BatchInfo
* @prop {String} id - Batch ID
* @prop {String} jobId - Job ID
* @prop {String} state - Batch state
* @prop {String} stateMessage - Batch state message
*/
/**
 * Check the latest batch status in server.
 *
 * @method Bulk~Batch#check
 * @param {Callback.<Bulk~BatchInfo>} [callback] - Callback function
 * @returns {Promise.<Bulk~BatchInfo>}
 * @throws {Error} If the batch has not been started (no job ID or batch ID)
 */
Batch.prototype.check = function(callback) {
  var bulk = this._bulk;
  var logger = bulk._logger;
  var jobId = this.job.id;
  var batchId = this.id;
  if (!jobId || !batchId) {
    throw new Error("Batch not started.");
  }
  var unwrap = function(res) {
    logger.debug(res.batchInfo);
    return res.batchInfo;
  };
  return bulk._request({
    method : 'GET',
    path : "/job/" + jobId + "/batch/" + batchId,
    responseType: "application/xml"
  }).then(unwrap).thenCall(callback);
};
/**
 * Poll the batch status until it finishes, then retrieve the result.
 *
 * On each check, depending on the batch state:
 * - still running: emits "progress" with the batch info and polls again;
 * - "Completed", or "Failed" with some records processed: calls
 *   `Batch#retrieve`;
 * - "Failed" with no records processed: emits "error" with the state message.
 *
 * "error" is also emitted when a status check fails, and when `timeout`
 * elapses (error name "PollingTimeout").
 *
 * @method Bulk~Batch#poll
 * @param {Number} interval - Polling interval in milliseconds
 * @param {Number} timeout - Polling timeout in milliseconds
 * @throws {Error} If the batch has not been started (no job ID or batch ID)
 */
Batch.prototype.poll = function(interval, timeout) {
  var self = this;
  var jobId = this.job.id;
  var batchId = this.id;
  if (!jobId || !batchId) {
    throw new Error("Batch not started.");
  }
  // Batch#retrieve emits "error" itself and then rethrows. Observe that
  // rejection here so it does not additionally surface as an unhandled
  // promise rejection.
  var retrieveResult = function() {
    self.retrieve().then(null, function() {});
  };
  var startTime = new Date().getTime();
  var poll = function() {
    var now = new Date().getTime();
    if (startTime + timeout < now) {
      var err = new Error("Polling time out. Job Id = " + jobId + " , batch Id = " + batchId);
      err.name = 'PollingTimeout';
      self.emit('error', err);
      return;
    }
    self.check(function(err, res) {
      if (err) {
        self.emit('error', err);
      } else {
        if (res.state === "Failed") {
          // A failed batch that still processed records has results to fetch
          if (parseInt(res.numberRecordsProcessed, 10) > 0) {
            retrieveResult();
          } else {
            self.emit('error', new Error(res.stateMessage));
          }
        } else if (res.state === "Completed") {
          retrieveResult();
        } else {
          self.emit('progress', res);
          setTimeout(poll, interval);
        }
      }
    });
  };
  setTimeout(poll, interval);
};
/**
* @typedef {Object} Bulk~BatchResultInfo
* @prop {String} id - Batch result ID
* @prop {String} batchId - Batch ID which includes this batch result.
* @prop {String} jobId - Job ID which includes this batch result.
*/
/**
 * Retrieve batch result.
 *
 * For query jobs, this resolves to the list of result IDs (as
 * Bulk~BatchResultInfo). For load jobs, it resolves to one RecordResult per
 * returned row, built from the row's `Id`, `Success` and `Error` columns.
 *
 * The operation is taken from the server-reported job info when available.
 * `job.operation` keeps the caller's casing (e.g. 'Query') and is not
 * refreshed by `Job#open`, so on its own it can misclassify a query job.
 *
 * Emits "response" with the results on success. On failure it emits
 * "error" and rejects.
 *
 * @method Bulk~Batch#retrieve
 * @param {Callback.<Array.<RecordResult>|Array.<Bulk~BatchResultInfo>>} [callback] - Callback function
 * @returns {Promise.<Array.<RecordResult>|Array.<Bulk~BatchResultInfo>>}
 * @throws {Error} If the batch has not been started (no job ID or batch ID)
 */
Batch.prototype.retrieve = function(callback) {
  var self = this;
  var bulk = this._bulk;
  var job = this.job;
  var jobId = job.id;
  var batchId = this.id;
  if (!jobId || !batchId) {
    throw new Error("Batch not started.");
  }
  var operation;
  return job.info().then(function(jobInfo) {
    operation = (jobInfo && jobInfo.operation) || job.operation;
    return bulk._request({
      method : 'GET',
      path : "/job/" + jobId + "/batch/" + batchId + "/result"
    });
  }).then(function(res) {
    var results;
    if (String(operation || '').toLowerCase() === 'query') {
      // A single result ID is not wrapped in an array; normalise it
      var resultIds = res['result-list'].result;
      results = _.map(_.isArray(resultIds) ? resultIds : [ resultIds ], function(id) {
        return {
          id: id,
          batchId: batchId,
          jobId: jobId
        };
      });
    } else {
      results = _.map(res, function(ret) {
        return {
          id: ret.Id || null,
          success: ret.Success === "true",
          errors: ret.Error ? [ ret.Error ] : []
        };
      });
    }
    self.emit('response', results);
    return results;
  }).fail(function(err) {
    self.emit('error', err);
    throw err;
  }).thenCall(callback);
};
/**
 * Fetch query result as a record stream.
 *
 * The CSV body of the result resource is piped into a parsable record stream.
 *
 * @param {String} resultId - Result id
 * @returns {RecordStream} - Record stream, convertible to CSV data stream
 * @throws {Error} If the batch has not been started (no job ID or batch ID)
 */
Batch.prototype.result = function(resultId) {
  var jobId = this.job.id;
  var batchId = this.id;
  if (!jobId || !batchId) {
    throw new Error("Batch not started.");
  }
  var records = new RecordStream.Parsable();
  var csvSink = records.stream('csv');
  var resultPath = "/job/" + jobId + "/batch/" + batchId + "/result/" + resultId;
  this._bulk._request({ method : 'GET', path : resultPath }).stream().pipe(csvSink);
  return records;
};
/*--------------------------------------------*/
/**
 * HttpApi variant used for the Bulk API (services/async) endpoints.
 *
 * It changes request and response handling as follows:
 * - sends the session token in the `X-SFDC-SESSION` header;
 * - treats a 400 response mentioning `InvalidSessionId` as an expired session;
 * - reads errors from `error.exceptionCode` / `error.exceptionMessage` in
 *   parsed response bodies.
 *
 * @private
 */
var BulkApi = function() {
  BulkApi.super_.apply(this, arguments);
};
inherits(BulkApi, HttpApi);
/** Attach the session token header (`this._conn` presumably set by HttpApi). */
BulkApi.prototype.beforeSend = function(request) {
  request.headers = request.headers || {};
  request.headers["X-SFDC-SESSION"] = this._conn.accessToken;
};
/** A 400 response whose body carries the InvalidSessionId exception code. */
BulkApi.prototype.isSessionExpired = function(response) {
  return response.statusCode === 400 &&
    /<exceptionCode>InvalidSessionId<\/exceptionCode>/.test(response.body);
};
/** True when the parsed body has an `error` element; empty (null/undefined) bodies have none. */
BulkApi.prototype.hasErrorInResponseBody = function(body) {
  return !!(body && body.error);
};
/** Map the parsed `error` element to `{ errorCode, message }`. */
BulkApi.prototype.parseError = function(body) {
  return {
    errorCode: body.error.exceptionCode,
    message: body.error.exceptionMessage
  };
};
/*--------------------------------------------*/
/**
 * Class for Bulk API.
 *
 * One instance is attached to each new connection as `conn.bulk`. It keeps
 * the connection and reuses the connection's logger.
 *
 * @class
 * @param {Connection} conn - Connection object
 */
var Bulk = function(conn) {
  var logger = conn._logger;
  this._conn = conn;
  this._logger = logger;
};
/**
 * Polling interval in milliseconds (1 second) used by `Bulk#load` when
 * polling batch status.
 * @type {Number}
 */
Bulk.prototype.pollInterval = 1 * 1000;
/**
 * Polling timeout in milliseconds (10 seconds) used by `Bulk#load`. Once it
 * elapses, polling stops with a "PollingTimeout" error.
 * @type {Number}
 */
Bulk.prototype.pollTimeout = 10 * 1000;
/**
 * Send a request to the Bulk API endpoint.
 *
 * `request.path` is resolved against `<instanceUrl>/services/async/<version>`.
 * `request.responseType` becomes a BulkApi option. Neither is forwarded in
 * the request itself, and the caller's object is not modified (a shallow
 * copy is sent).
 *
 * @private
 * @param {Object} request - Request descriptor with `path` and optional `responseType`
 * @param {Callback} [callback] - Callback function
 * @returns {Promise}
 */
Bulk.prototype._request = function(request, callback) {
  var conn = this._conn;
  var baseUrl = [ conn.instanceUrl, "services/async", conn.version ].join('/');
  var apiOptions = { responseType: request.responseType };
  var outgoing = _.clone(request);
  outgoing.url = baseUrl + request.path;
  delete outgoing.path;
  delete outgoing.responseType;
  return new BulkApi(conn, apiOptions).request(outgoing).thenCall(callback);
};
/**
 * Create and start bulkload job and batch.
 *
 * The method does the following:
 * - creates a job with a single batch;
 * - starts polling once the batch is queued;
 * - closes the job when the batch responds or fails. After a polling
 *   timeout the job is left open.
 *
 * If `options` is not a plain object (including `null`), it is treated as
 * omitted and the remaining arguments shift left. This means
 * `load(type, operation, input, callback)` is also accepted.
 *
 * @param {String} type - SObject type
 * @param {String} operation - Bulk load operation ('insert', 'update', 'upsert', 'delete', or 'hardDelete')
 * @param {Object} [options] - Options for bulk loading operation
 * @param {String} [options.extIdField] - External ID field name (used when upsert operation).
 * @param {String} [options.concurrencyMode] - 'Serial' or 'Parallel'. Defaults to Parallel.
 * @param {Array.<Record>|stream.Stream|String} [input] - Input source for bulkload. Accepts array of records, CSV string, and CSV data input stream in insert/update/upsert/delete/hardDelete operation, SOQL string in query operation.
 * @param {Callback.<Array.<RecordResult>|Array.<Bulk~BatchResultInfo>>} [callback] - Callback function
 * @returns {Bulk~Batch}
 * @throws {Error} If `type` or `operation` is missing
 */
Bulk.prototype.load = function(type, operation, options, input, callback) {
  var self = this;
  if (!type || !operation) {
    throw new Error("Insufficient arguments. At least, 'type' and 'operation' are required.");
  }
  if (!_.isObject(options) || options.constructor !== Object) { // when options is not plain hash object, it is omitted
    callback = input;
    input = options;
    options = null;
  }
  var job = this.createJob(type, operation, options);
  var batch;
  job.once('error', function (error) {
    if (batch) {
      batch.emit('error', error); // pass job error to batch
    }
  });
  batch = job.createBatch();
  var cleanup = function() {
    batch = null;
    // Job#close reports failures through the job's "error" event, but nothing
    // else observes the promise it returns. Handle the rejection here so it
    // does not become an unhandled promise rejection.
    job.close().then(null, function(err) {
      self._logger.debug(err);
    });
  };
  var cleanupOnError = function(err) {
    if (err.name !== 'PollingTimeout') {
      cleanup();
    }
  };
  batch.on('response', cleanup);
  batch.on('error', cleanupOnError);
  batch.on('queue', function() { batch.poll(self.pollInterval, self.pollTimeout); });
  return batch.execute(input, callback);
};
/**
 * Execute bulk query and get record stream.
 *
 * The SObject type is taken from the outer FROM clause. Parenthesised groups
 * (sub-queries, function arguments, IN lists) are removed innermost-first
 * before matching, so a nested FROM is not picked up and the outer FROM is
 * kept. A single greedy `\(.+\)` would cut from the first "(" to the last ")"
 * and could swallow the outer FROM clause.
 *
 * @param {String} soql - SOQL to execute in bulk job
 * @returns {RecordStream.Parsable} - Record stream, convertible to CSV data stream
 * @throws {Error} If no SObject type can be found in the query
 */
Bulk.prototype.query = function(soql) {
  var outer = soql;
  var previous;
  do {
    previous = outer;
    outer = outer.replace(/\([^()]*\)/g, '');
  } while (outer !== previous);
  // \b keeps field names ending in "From" (e.g. IsFrom) from matching
  var m = outer.match(/\bFROM\s+(\w+)/i);
  if (!m) {
    throw new Error("No sobject type found in query, maybe caused by invalid SOQL.");
  }
  var type = m[1];
  var self = this;
  var recordStream = new RecordStream.Parsable();
  var dataStream = recordStream.stream('csv');
  this.load(type, "query", soql).then(function(results) {
    // Ideally, it should merge result files into one stream.
    // Currently only first batch result is the target (mostly enough).
    var r = results[0];
    var result = self.job(r.jobId).batch(r.batchId).result(r.id);
    result.stream().pipe(dataStream);
  }).fail(function(err) {
    recordStream.emit('error', err);
  });
  return recordStream;
};
/**
 * Create a new job instance (not yet opened on the server).
 *
 * @param {String} type - SObject type
 * @param {String} operation - Bulk load operation ('insert', 'update', 'upsert', 'delete', 'hardDelete', or 'query')
 * @param {Object} [options] - Options for bulk loading operation
 * @returns {Bulk~Job}
 */
Bulk.prototype.createJob = function(type, operation, options) {
  var job = new Job(this, type, operation, options);
  return job;
};
/**
 * Get a job instance specified by given job ID.
 *
 * Type, operation and options are left empty (`null`). `Job#check` fills in
 * type and operation from the server.
 *
 * @param {String} jobId - Job ID
 * @returns {Bulk~Job}
 */
Bulk.prototype.job = function(jobId) {
  var existing = new Job(this, null, null, null, jobId);
  return existing;
};
/*--------------------------------------------*/
/*
 * Register a hook on connection instantiation so that every new connection
 * gets its own Bulk API instance as `conn.bulk`.
 */
jsforce.on('connection:new', function attachBulkApi(conn) {
  conn.bulk = new Bulk(conn);
});
module.exports = Bulk;