Source: api/bulk.js

  1. /*global process*/
  2. /**
  3. * @file Manages Salesforce Bulk API related operations
  4. * @author Shinichi Tomita <shinichi.tomita@gmail.com>
  5. */
  6. 'use strict';
  7. var inherits = require('inherits'),
  8. stream = require('readable-stream'),
  9. Duplex = stream.Duplex,
  10. events = require('events'),
  11. _ = require('lodash/core'),
  12. jsforce = require('../core'),
  13. RecordStream = require('../record-stream'),
  14. CSV = require('../csv'),
  15. Promise = require('../promise'),
  16. HttpApi = require('../http-api');
  17. /*--------------------------------------------*/
  /**
   * Represents one Bulk API job and keeps a registry of the batches created
   * under it. Emits events through the inherited EventEmitter interface.
   *
   * @protected
   * @class Bulk~Job
   * @extends events.EventEmitter
   *
   * @param {Bulk} bulk - Bulk API object
   * @param {String} [type] - SObject type
   * @param {String} [operation] - Bulk load operation ('insert', 'update', 'upsert', 'delete', or 'hardDelete')
   * @param {Object} [options] - Options for bulk loading operation
   * @param {String} [options.extIdField] - External ID field name (used when upsert operation).
   * @param {String} [options.concurrencyMode] - 'Serial' or 'Parallel'. Defaults to Parallel.
   * @param {String} [jobId] - Job ID (if already available)
   */
  var Job = function(bulk, type, operation, options, jobId) {
    this._bulk = bulk;
    this.type = type;
    this.operation = operation;
    // Fall back to an empty option bag so later lookups never hit undefined.
    this.options = options ? options : {};
    this.id = jobId;
    // A job constructed with an ID is treated as open; otherwise its state is not known yet.
    this.state = jobId ? 'Open' : 'Unknown';
    // Batch instances keyed by batch ID.
    this._batches = {};
  };

  inherits(Job, events.EventEmitter);
  43. /**
  44. * @typedef {Object} Bulk~JobInfo
  45. * @prop {String} id - Job ID
  46. * @prop {String} object - Object type name
  47. * @prop {String} operation - Operation type of the job
  48. * @prop {String} state - Job status
  49. */
  /**
   * Return the cached job info promise; when nothing is cached yet, ask the
   * server for the latest job info via check() and cache that instead.
   *
   * @method Bulk~Job#info
   * @param {Callback.<Bulk~JobInfo>} [callback] - Callback function
   * @returns {Promise.<Bulk~JobInfo>}
   */
  Job.prototype.info = function(callback) {
    var cached = this._jobInfo;
    if (!cached) {
      // No cached info: fetch the latest and remember the resulting promise.
      cached = this._jobInfo = this.check();
    }
    return cached.thenCall(callback);
  };
  /**
   * Open a new job on the server and get its job info.
   *
   * The request is only sent once: when a job info promise is already cached
   * on the instance, that promise is reused. On success the job's id and state
   * are updated and an "open" event is emitted; on failure an "error" event is
   * emitted and the error is re-thrown to the returned promise.
   *
   * @method Bulk~Job#open
   * @param {Callback.<Bulk~JobInfo>} [callback] - Callback function
   * @returns {Promise.<Bulk~JobInfo>}
   */
  Job.prototype.open = function(callback) {
    var self = this;
    var bulk = this._bulk;

    // Escape characters that are significant in XML text content, so that a
    // value containing '&' or '<' cannot produce a malformed request body.
    var escapeXml = function(value) {
      return String(value)
        .replace(/&/g, '&amp;')
        .replace(/</g, '&lt;')
        .replace(/>/g, '&gt;');
    };
    var element = function(name, value) {
      return '<' + name + '>' + escapeXml(value) + '</' + name + '>';
    };

    // if not requested opening job
    if (!this._jobInfo) {
      var operation = this.operation.toLowerCase();
      // 'hardDelete' is the only operation name sent in mixed case.
      if (operation === 'harddelete') { operation = 'hardDelete'; }
      var options = this.options;
      var body = [
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<jobInfo xmlns="http://www.force.com/2009/06/asyncapi/dataload">',
        element('operation', operation),
        element('object', this.type),
        options.extIdField ? element('externalIdFieldName', options.extIdField) : '',
        options.concurrencyMode ? element('concurrencyMode', options.concurrencyMode) : '',
        options.assignmentRuleId ? element('assignmentRuleId', options.assignmentRuleId) : '',
        '<contentType>CSV</contentType>',
        '</jobInfo>'
      ].join('');

      this._jobInfo = bulk._request({
        method : 'POST',
        path : "/job",
        body : body,
        headers : {
          "Content-Type" : "application/xml; charset=utf-8"
        },
        responseType: "application/xml"
      }).then(function(res) {
        // Update the instance first so "open" listeners observe the assigned id/state.
        self.id = res.jobInfo.id;
        self.state = res.jobInfo.state;
        self.emit("open", res.jobInfo);
        return res.jobInfo;
      }, function(err) {
        self.emit("error", err);
        throw err;
      });
    }
    return this._jobInfo.thenCall(callback);
  };
  /**
   * Create a new batch instance belonging to this job. The batch is added to
   * the job's batch registry once it emits its "queue" event.
   *
   * @method Bulk~Job#createBatch
   * @returns {Bulk~Batch}
   */
  Job.prototype.createBatch = function() {
    var job = this;
    var created = new Batch(job);
    created.on('queue', function registerBatch() {
      // The batch ID is read at event time, when it has been assigned.
      job._batches[created.id] = created;
    });
    return created;
  };
  /**
   * Look up the batch instance for the given batch ID, creating and
   * registering a new one when the job does not know it yet.
   *
   * @method Bulk~Job#batch
   * @param {String} batchId - Batch ID
   * @returns {Bulk~Batch}
   */
  Job.prototype.batch = function(batchId) {
    var known = this._batches[batchId];
    if (known) {
      return known;
    }
    var fresh = new Batch(this, batchId);
    this._batches[batchId] = fresh;
    return fresh;
  };
  /**
   * Fetch the latest job info from the server, copy its id, object type,
   * operation and state onto this instance, and cache the resulting promise.
   *
   * @method Bulk~Job#check
   * @param {Callback.<Bulk~JobInfo>} [callback] - Callback function
   * @returns {Promise.<Bulk~JobInfo>}
   */
  Job.prototype.check = function(callback) {
    var job = this;
    var bulk = job._bulk;
    var logger = bulk._logger;

    var fetchInfo = function() {
      return bulk._request({
        method : 'GET',
        path : "/job/" + job.id,
        responseType: "application/xml"
      });
    };

    var applyInfo = function(res) {
      var info = res.jobInfo;
      logger.debug(info);
      job.id = info.id;
      job.type = info.object;
      job.operation = info.operation;
      job.state = info.state;
      return info;
    };

    // The job must have an ID before it can be queried.
    job._jobInfo = job._waitAssign().then(fetchInfo).then(applyInfo);
    return job._jobInfo.thenCall(callback);
  };
  /**
   * Wait until the job has an ID. Resolves immediately with `{ id }` when the
   * ID is already known; otherwise opens the job first.
   *
   * @method Bulk~Job#_waitAssign
   * @param {Callback.<Bulk~JobInfo>} [callback] - Callback function
   * @returns {Promise.<Bulk~JobInfo>}
   */
  Job.prototype._waitAssign = function(callback) {
    var pending;
    if (this.id) {
      pending = Promise.resolve({ id: this.id });
    } else {
      pending = this.open();
    }
    return pending.thenCall(callback);
  };
  /**
   * List all registered batch info in job.
   *
   * Always resolves with an array: a single batch info entry is wrapped in an
   * array, and a job with no batch info entries yields an empty array.
   *
   * @method Bulk~Job#list
   * @param {Callback.<Array.<Bulk~BatchInfo>>} [callback] - Callback function
   * @returns {Promise.<Array.<Bulk~BatchInfo>>}
   */
  Job.prototype.list = function(callback) {
    var self = this;
    var bulk = this._bulk;
    var logger = bulk._logger;

    return this._waitAssign().then(function() {
      return bulk._request({
        method : 'GET',
        path : "/job/" + self.id + "/batch",
        responseType: "application/xml"
      });
    }).then(function(res) {
      var batchInfo = res.batchInfoList.batchInfo;
      logger.debug(batchInfo);
      // No entries at all: return an empty list rather than [ undefined ].
      if (batchInfo === undefined || batchInfo === null) {
        return [];
      }
      return _.isArray(batchInfo) ? batchInfo : [ batchInfo ];
    }).thenCall(callback);
  };
  /**
   * Close opened job.
   *
   * Requests the "Closed" state for the job. On success the local job ID is
   * cleared and a "close" event is emitted; on failure an "error" event is
   * emitted and the error is propagated to the returned promise.
   *
   * @method Bulk~Job#close
   * @param {Callback.<Bulk~JobInfo>} [callback] - Callback function
   * @returns {Promise.<Bulk~JobInfo>}
   */
  Job.prototype.close = function(callback) {
    var self = this;
    return this._changeState("Closed").then(function(jobInfo) {
      self.id = null;
      self.emit("close", jobInfo);
      return jobInfo;
    }, function(err) {
      self.emit("error", err);
      throw err;
    }).thenCall(callback); // honor the documented node-style callback
  };
  /**
   * Set the status to abort.
   *
   * Requests the "Aborted" state for the job. On success the local job ID is
   * cleared and an "abort" event is emitted; on failure an "error" event is
   * emitted and the error is propagated to the returned promise.
   *
   * @method Bulk~Job#abort
   * @param {Callback.<Bulk~JobInfo>} [callback] - Callback function
   * @returns {Promise.<Bulk~JobInfo>}
   */
  Job.prototype.abort = function(callback) {
    var self = this;
    return this._changeState("Aborted").then(function(jobInfo) {
      self.id = null;
      self.emit("abort", jobInfo);
      return jobInfo;
    }, function(err) {
      self.emit("error", err);
      throw err;
    }).thenCall(callback); // honor the documented node-style callback
  };
  /**
   * Post a state change for this job, record the state reported back by the
   * server on the instance, and cache the resulting job info promise.
   *
   * @private
   * @param {String} state - State value sent in the request body
   * @param {Callback.<Bulk~JobInfo>} [callback] - Callback function
   * @returns {Promise.<Bulk~JobInfo>}
   */
  Job.prototype._changeState = function(state, callback) {
    var job = this;
    var bulk = job._bulk;
    var logger = bulk._logger;

    var postState = function() {
      var xml =
        '<?xml version="1.0" encoding="UTF-8"?>' +
        '<jobInfo xmlns="http://www.force.com/2009/06/asyncapi/dataload">' +
        '<state>' + state + '</state>' +
        '</jobInfo>';
      return bulk._request({
        method : 'POST',
        path : "/job/" + job.id,
        body : xml,
        headers : {
          "Content-Type" : "application/xml; charset=utf-8"
        },
        responseType: "application/xml"
      });
    };

    var recordState = function(res) {
      logger.debug(res.jobInfo);
      job.state = res.jobInfo.state;
      return res.jobInfo;
    };

    // The job needs an ID before its state can be changed.
    job._jobInfo = job._waitAssign().then(postState).then(recordState);
    return job._jobInfo.thenCall(callback);
  };
  273. /*--------------------------------------------*/
  /**
   * A batch within a bulk job. It is an object-mode writable stream of
   * records and also exposes then()/thenCall() backed by an internal deferred.
   *
   * @protected
   * @class Bulk~Batch
   * @extends {stream.Writable}
   * @implements {Promise.<Array.<RecordResult>>}
   * @param {Bulk~Job} job - Bulk job object
   * @param {String} [batchId] - Batch ID (if already available)
   */
  var Batch = function(job, batchId) {
    var writableOptions = { objectMode: true };
    Batch.super_.call(this, writableOptions);

    this.job = job;
    this.id = batchId;
    this._bulk = job._bulk;
    // Settled from execute() with the batch result or error.
    this._deferred = Promise.defer();
    this._setupDataStreams();
  };

  inherits(Batch, stream.Writable);
  /**
   * Create the upload/download record streams with their CSV data streams,
   * and the duplex data stream returned by Batch#stream(), then wire them
   * together.
   *
   * @private
   */
  Batch.prototype._setupDataStreams = function() {
    var self = this;
    // The same converter options are passed to both CSV streams.
    var csvOptions = { nullValue : '#N/A' };

    this._uploadStream = new RecordStream.Serializable();
    this._uploadDataStream = this._uploadStream.stream('csv', csvOptions);
    this._downloadStream = new RecordStream.Parsable();
    this._downloadDataStream = this._downloadStream.stream('csv', csvOptions);

    // Once the batch (writable) is finished, finish the record upload stream too.
    this.on('finish', function onBatchFinish() {
      self._uploadStream.end();
    });

    // On the first readable upload data, open the job and then pipe the
    // upload data into the batch API request stream.
    this._uploadDataStream.once('readable', function onFirstUploadData() {
      self.job.open().then(function() {
        self._uploadDataStream.pipe(self._createRequestStream());
      });
    });

    // duplex data stream, opened access to API programmers by Batch#stream()
    var duplex = this._dataStream = new Duplex();

    // Writable side: forward written data to the upload data stream.
    duplex._write = function(data, enc, cb) {
      self._uploadDataStream.write(data, enc, cb);
    };
    duplex.on('finish', function onDuplexFinish() {
      self._uploadDataStream.end();
    });

    // Readable side: fed from the download data stream.
    this._downloadDataStream.on('readable', function onDownloadReadable() {
      duplex.read(0);
    });
    this._downloadDataStream.on('end', function onDownloadEnd() {
      duplex.push(null);
    });
    duplex._read = function(size) {
      // Drain everything currently buffered on the download side.
      var piece = self._downloadDataStream.read();
      while (piece !== null) {
        duplex.push(piece);
        piece = self._downloadDataStream.read();
      }
    };
  };
  /**
   * Issue the batch creation request for this batch's job and return the
   * request's stream. When the request completes, either an "error" event is
   * emitted, or the batch ID is stored and a "queue" event is emitted.
   *
   * @private
   * @returns {stream.Duplex}
   */
  Batch.prototype._createRequestStream = function() {
    var self = this;
    var bulk = self._bulk;
    var logger = bulk._logger;

    var request = {
      method : 'POST',
      path : "/job/" + self.job.id + "/batch",
      headers: {
        "Content-Type": "text/csv"
      },
      responseType: "application/xml"
    };

    var onResponse = function(err, res) {
      if (err) {
        self.emit('error', err);
        return;
      }
      logger.debug(res.batchInfo);
      self.id = res.batchInfo.id;
      self.emit('queue', res.batchInfo);
    };

    return bulk._request(request, onResponse).stream();
  };
  /**
   * Implementation of Writable.
   *
   * Works on a shallow copy of the given record, so the caller's object is
   * not modified, and forwards the shaped copy to the upload record stream:
   * for an insert job the `Id` property is dropped, for a delete job only
   * `Id` is kept; `type` and `attributes` are always removed.
   *
   * The job operation is compared case-insensitively, consistent with
   * Job#open, which lower-cases the operation before sending it.
   *
   * @override
   * @private
   * @param {Record} record - Record written to the batch
   * @param {String} enc - Encoding argument, passed through to the upload stream
   * @param {Function} cb - Write completion callback, passed through to the upload stream
   */
  Batch.prototype._write = function(record, enc, cb) {
    var operation = this.job.operation;
    if (typeof operation === 'string') {
      operation = operation.toLowerCase();
    }
    record = _.clone(record);
    if (operation === "insert") {
      delete record.Id;
    } else if (operation === "delete") {
      record = { Id: record.Id };
    }
    delete record.type;
    delete record.attributes;
    this._uploadStream.write(record, enc, cb);
  };
  /**
   * Get the batch's duplex data stream: CSV data is written into it, and the
   * batch result data is read from it.
   *
   * @returns {stream.Duplex}
   */
  Batch.prototype.stream = function() {
    var duplex = this._dataStream;
    return duplex;
  };
  /**
   * Execute batch operation. Also available as `run` and `exec`.
   *
   * The batch result promise is settled by the first "response" or "error"
   * event emitted on this batch. Throws when the batch was already executed.
   *
   * @method Bulk~Batch#execute
   * @param {Array.<Record>|stream.Stream|String} [input] - Input source for batch operation. Accepts array of records, CSV string, and CSV data input stream in insert/update/upsert/delete/hardDelete operation, SOQL string in query operation.
   * @param {Callback.<Array.<RecordResult>|Array.<BatchResultInfo>>} [callback] - Callback function
   * @returns {Bulk~Batch}
   */
  Batch.prototype.run =
  Batch.prototype.exec =
  Batch.prototype.execute = function(input, callback) {
    var batch = this;

    // if input argument is omitted
    if (typeof input === 'function') {
      callback = input;
      input = null;
    }

    // if batch is already executed
    if (batch._result) {
      throw new Error("Batch already executed.");
    }

    var outcome = Promise.defer();
    batch._result = outcome.promise;
    // Mirror the outcome onto the deferred that backs then()/thenCall().
    batch._result.then(function(res) {
      batch._deferred.resolve(res);
    }, function(err) {
      batch._deferred.reject(err);
    });

    batch.once('response', function(res) {
      outcome.resolve(res);
    });
    batch.once('error', function(err) {
      outcome.reject(err);
    });

    if (_.isObject(input) && _.isFunction(input.pipe)) {
      // input has stream.Readable interface
      input.pipe(batch._dataStream);
    } else if (_.isArray(input)) {
      // array of records: write each one through the record interface
      _.forEach(input, function writeRecord(record) {
        batch.write(record);
      });
      batch.end();
    } else if (_.isString(input)) {
      // raw string: feed it straight into the data stream
      batch._dataStream.write(input, 'utf8');
      batch._dataStream.end();
    }

    // return Batch instance for chaining
    return batch.thenCall(callback);
  };
  /**
   * Promise/A+ interface
   * http://promises-aplus.github.io/promises-spec/
   *
   * Forwards to the promise of the batch's internal deferred and returns the
   * promise produced by that call.
   *
   * @method Bulk~Batch#then
   */
  Batch.prototype.then = function(onResolved, onReject, onProgress) {
    var resultPromise = this._deferred.promise;
    return resultPromise.then(onResolved, onReject, onProgress);
  };
  /**
   * Promise/A+ extension
   * Attach a node-style callback to the batch result. The callback is invoked
   * on a later tick via process.nextTick. Non-function arguments are ignored.
   *
   * @method Bulk~Batch#thenCall
   * @param {Function} [callback] - Node-style callback `(err, res)`
   * @returns {Bulk~Batch} this batch
   */
  Batch.prototype.thenCall = function(callback) {
    if (!_.isFunction(callback)) {
      return this;
    }
    var onSuccess = function(res) {
      process.nextTick(function() {
        callback(null, res);
      });
    };
    var onFailure = function(err) {
      process.nextTick(function() {
        callback(err);
      });
    };
    this.then(onSuccess, onFailure);
    return this;
  };
  465. /**
  466. * @typedef {Object} Bulk~BatchInfo
  467. * @prop {String} id - Batch ID
  468. * @prop {String} jobId - Job ID
  469. * @prop {String} state - Batch state
  470. * @prop {String} stateMessage - Batch state message
  471. */
  /**
   * Fetch the latest info for this batch from the server.
   * Throws synchronously when the job ID or batch ID is not known yet.
   *
   * @method Bulk~Batch#check
   * @param {Callback.<Bulk~BatchInfo>} [callback] - Callback function
   * @returns {Promise.<Bulk~BatchInfo>}
   */
  Batch.prototype.check = function(callback) {
    var bulk = this._bulk;
    var logger = bulk._logger;
    var jobId = this.job.id;
    var batchId = this.id;

    if (!(jobId && batchId)) {
      throw new Error("Batch not started.");
    }

    var request = {
      method : 'GET',
      path : "/job/" + jobId + "/batch/" + batchId,
      responseType: "application/xml"
    };

    return bulk._request(request).then(function(res) {
      var info = res.batchInfo;
      logger.debug(info);
      return info;
    }).thenCall(callback);
  };
  /**
   * Poll the batch status until it completes or fails, then retrieve the
   * result. Emits "progress" with the batch info while still running, and
   * "error" on check failure, on a failed batch with no processed records,
   * or when the timeout has elapsed (error name 'PollingTimeout').
   * Throws synchronously when the job ID or batch ID is not known yet.
   *
   * @method Bulk~Batch#poll
   * @param {Number} interval - Polling interval in milliseconds
   * @param {Number} timeout - Polling timeout in milliseconds
   */
  Batch.prototype.poll = function(interval, timeout) {
    var batch = this;
    var jobId = this.job.id;
    var batchId = this.id;

    if (!(jobId && batchId)) {
      throw new Error("Batch not started.");
    }

    var startTime = new Date().getTime();

    var onChecked = function(err, res) {
      if (err) {
        batch.emit('error', err);
        return;
      }
      switch (res.state) {
        case "Failed":
          // A failed batch may still carry per-record results.
          if (parseInt(res.numberRecordsProcessed, 10) > 0) {
            batch.retrieve();
          } else {
            batch.emit('error', new Error(res.stateMessage));
          }
          break;
        case "Completed":
          batch.retrieve();
          break;
        default:
          batch.emit('progress', res);
          setTimeout(tick, interval);
      }
    };

    var tick = function() {
      var now = new Date().getTime();
      if (startTime + timeout < now) {
        var timeoutError = new Error("Polling time out. Job Id = " + jobId + " , batch Id = " + batchId);
        timeoutError.name = 'PollingTimeout';
        batch.emit('error', timeoutError);
        return;
      }
      batch.check(onChecked);
    };

    setTimeout(tick, interval);
  };
  541. /**
  542. * @typedef {Object} Bulk~BatchResultInfo
  543. * @prop {String} id - Batch result ID
  544. * @prop {String} batchId - Batch ID which includes this batch result.
  545. * @prop {String} jobId - Job ID which includes this batch result.
  546. */
  /**
   * Retrieve the batch result from the server and emit it as "response".
   *
   * For a 'query' job the result is a list of result references
   * (`{ id, batchId, jobId }`); for other operations each entry is mapped to
   * `{ id, success, errors }`. Failures are emitted as "error" and re-thrown.
   * Throws synchronously when the job ID or batch ID is not known yet.
   *
   * @method Bulk~Batch#retrieve
   * @param {Callback.<Array.<RecordResult>|Array.<Bulk~BatchResultInfo>>} [callback] - Callback function
   * @returns {Promise.<Array.<RecordResult>|Array.<Bulk~BatchResultInfo>>}
   */
  Batch.prototype.retrieve = function(callback) {
    var batch = this;
    var bulk = this._bulk;
    var job = this.job;
    var jobId = job.id;
    var batchId = this.id;

    if (!(jobId && batchId)) {
      throw new Error("Batch not started.");
    }

    var toResultInfo = function(id) {
      return {
        id: id,
        batchId: batchId,
        jobId: jobId
      };
    };

    var toRecordResult = function(ret) {
      return {
        id: ret.Id || null,
        success: ret.Success === "true",
        errors: ret.Error ? [ ret.Error ] : []
      };
    };

    // Job info is resolved first so job.operation is known before mapping.
    return job.info().then(function() {
      return bulk._request({
        method : 'GET',
        path : "/job/" + jobId + "/batch/" + batchId + "/result"
      });
    }).then(function(res) {
      var results;
      if (job.operation === 'query') {
        var ids = res['result-list'].result;
        results = _.map(_.isArray(ids) ? ids : [ ids ], toResultInfo);
      } else {
        results = _.map(res, toRecordResult);
      }
      batch.emit('response', results);
      return results;
    }).fail(function(err) {
      batch.emit('error', err);
      throw err;
    }).thenCall(callback);
  };
  /**
   * Fetch one query result as a record stream. The response of the result
   * request is piped into the CSV side of the returned record stream.
   * Throws synchronously when the job ID or batch ID is not known yet.
   *
   * @param {String} resultId - Result id
   * @returns {RecordStream} - Record stream, convertible to CSV data stream
   */
  Batch.prototype.result = function(resultId) {
    var jobId = this.job.id;
    var batchId = this.id;

    if (!(jobId && batchId)) {
      throw new Error("Batch not started.");
    }

    var records = new RecordStream.Parsable();
    var csvSink = records.stream('csv');
    var request = {
      method : 'GET',
      path : "/job/" + jobId + "/batch/" + batchId + "/result/" + resultId
    };

    this._bulk._request(request).stream().pipe(csvSink);
    return records;
  };
  616. /*--------------------------------------------*/
  /**
   * HttpApi subclass used for Bulk API requests: sets the session header and
   * defines how expired sessions and error bodies are recognized.
   *
   * @private
   */
  var BulkApi = function() {
    BulkApi.super_.apply(this, arguments);
  };

  inherits(BulkApi, HttpApi);

  /**
   * Put the connection's access token into the X-SFDC-SESSION request header.
   */
  BulkApi.prototype.beforeSend = function(request) {
    request.headers = request.headers ? request.headers : {};
    request.headers["X-SFDC-SESSION"] = this._conn.accessToken;
  };

  /**
   * A session counts as expired on a 400 response whose body contains the
   * InvalidSessionId exception code.
   */
  BulkApi.prototype.isSessionExpired = function(response) {
    if (response.statusCode !== 400) {
      return false;
    }
    return /<exceptionCode>InvalidSessionId<\/exceptionCode>/.test(response.body);
  };

  /**
   * Report whether the response body has a truthy `error` property.
   */
  BulkApi.prototype.hasErrorInResponseBody = function(body) {
    return Boolean(body.error);
  };

  /**
   * Map the body's `error` entry to `{ errorCode, message }`.
   */
  BulkApi.prototype.parseError = function(body) {
    var error = body.error;
    return {
      errorCode: error.exceptionCode,
      message: error.exceptionMessage
    };
  };
  641. /*--------------------------------------------*/
  /**
   * Entry point for Bulk API operations on a connection.
   *
   * @class
   * @param {Connection} conn - Connection object
   */
  var Bulk = function(conn) {
    // The connection's logger is reused for debug output.
    this._logger = conn._logger;
    this._conn = conn;
  };

  /**
   * Default polling interval in milliseconds, used by Bulk#load
   * @type {Number}
   */
  Bulk.prototype.pollInterval = 1000;

  /**
   * Default polling timeout in milliseconds, used by Bulk#load
   * @type {Number}
   */
  Bulk.prototype.pollTimeout = 10000;
  /**
   * Send a Bulk API request. Works on a copy of the given request: `path` is
   * turned into a full URL under `<instanceUrl>/services/async/<version>`, and
   * `responseType` is passed to BulkApi as an option instead of on the request.
   *
   * @private
   * @param {Object} request - Request with `path` and optional `responseType`
   * @param {Function} [callback] - Node-style callback
   */
  Bulk.prototype._request = function(request, callback) {
    var conn = this._conn;
    var outgoing = _.clone(request);
    var apiOptions = { responseType: outgoing.responseType };
    var root = [ conn.instanceUrl, "services/async", conn.version ].join('/');

    outgoing.url = root + outgoing.path;
    delete outgoing.path;
    delete outgoing.responseType;

    var api = new BulkApi(conn, apiOptions);
    return api.request(outgoing).thenCall(callback);
  };
  /**
   * Create a job with a single batch, execute the batch with the given input,
   * and poll for its result once the batch is queued. The job is closed after
   * a response, or after any error other than a polling timeout.
   *
   * @param {String} type - SObject type
   * @param {String} operation - Bulk load operation ('insert', 'update', 'upsert', 'delete', or 'hardDelete')
   * @param {Object} [options] - Options for bulk loading operation
   * @param {String} [options.extIdField] - External ID field name (used when upsert operation).
   * @param {String} [options.concurrencyMode] - 'Serial' or 'Parallel'. Defaults to Parallel.
   * @param {Array.<Record>|stream.Stream|String} [input] - Input source for bulkload. Accepts array of records, CSV string, and CSV data input stream in insert/update/upsert/delete/hardDelete operation, SOQL string in query operation.
   * @param {Callback.<Array.<RecordResult>|Array.<Bulk~BatchResultInfo>>} [callback] - Callback function
   * @returns {Bulk~Batch}
   */
  Bulk.prototype.load = function(type, operation, options, input, callback) {
    var bulk = this;
    var batch;

    if (!(type && operation)) {
      throw new Error("Insufficient arguments. At least, 'type' and 'operation' are required.");
    }

    // when options is not plain hash object, it is omitted
    var hasPlainOptions = _.isObject(options) && options.constructor === Object;
    if (!hasPlainOptions) {
      callback = input;
      input = options;
      options = null;
    }

    var job = bulk.createJob(type, operation, options);
    job.once('error', function(error) {
      // pass job error to batch, unless the batch was already released
      if (batch) {
        batch.emit('error', error);
      }
    });

    batch = job.createBatch();

    var cleanup = function() {
      batch = null;
      job.close();
    };

    batch.on('response', cleanup);
    batch.on('error', function(err) {
      // A polling timeout leaves the job open.
      if (err.name !== 'PollingTimeout') {
        cleanup();
      }
    });
    batch.on('queue', function() {
      batch.poll(bulk.pollInterval, bulk.pollTimeout);
    });

    return batch.execute(input, callback);
  };
  /**
   * Execute bulk query and get record stream. The SObject type is taken from
   * the first FROM clause found after parenthesized parts of the SOQL are
   * removed. Throws when no such clause is found.
   *
   * @param {String} soql - SOQL to execute in bulk job
   * @returns {RecordStream.Parsable} - Record stream, convertible to CSV data stream
   */
  Bulk.prototype.query = function(soql) {
    var bulk = this;
    var withoutParens = soql.replace(/\([\s\S]+\)/g, '');
    var found = withoutParens.match(/FROM\s+(\w+)/i);

    if (!found) {
      throw new Error("No sobject type found in query, maybe caused by invalid SOQL.");
    }

    var sobjectType = found[1];
    var recordStream = new RecordStream.Parsable();
    var dataStream = recordStream.stream('csv');

    bulk.load(sobjectType, "query", soql).then(function(results) {
      // Ideally, it should merge result files into one stream.
      // Currently only first batch result is the target (mostly enough).
      var first = results[0];
      var resultRecords = bulk.job(first.jobId).batch(first.batchId).result(first.id);
      resultRecords.stream().pipe(dataStream);
    }).fail(function(err) {
      recordStream.emit('error', err);
    });

    return recordStream;
  };
  /**
   * Create a new, not yet opened job instance bound to this Bulk object.
   *
   * @param {String} type - SObject type
   * @param {String} operation - Bulk load operation ('insert', 'update', 'upsert', 'delete', 'hardDelete', or 'query')
   * @param {Object} [options] - Options for bulk loading operation
   * @returns {Bulk~Job}
   */
  Bulk.prototype.createJob = function(type, operation, options) {
    var created = new Job(this, type, operation, options);
    return created;
  };
  /**
   * Get a job instance for an existing job ID. Type, operation and options
   * are left unset on the returned instance.
   *
   * @param {String} jobId - Job ID
   * @returns {Bulk~Job}
   */
  Bulk.prototype.job = function(jobId) {
    var existing = new Job(this, null, null, null, jobId);
    return existing;
  };
  762. /*--------------------------------------------*/
  /*
   * Whenever jsforce announces a new connection, attach a Bulk instance to it
   * as `conn.bulk`, so this API module is added dynamically.
   */
  jsforce.on('connection:new', function attachBulk(conn) {
    conn.bulk = new Bulk(conn);
  });

  module.exports = Bulk;