Source: record-stream.js

  1. /**
  2. * @file Represents stream that handles Salesforce record as stream data
  3. * @author Shinichi Tomita <shinichi.tomita@gmail.com>
  4. */
  5. 'use strict';
  6. var events = require('events'),
  7. stream = require('readable-stream'),
  8. Duplex = stream.Duplex,
  9. Transform = stream.Transform,
  10. PassThrough = stream.PassThrough,
  11. through2 = require('through2'),
  12. inherits = require('inherits'),
  13. _ = require('lodash/core'),
  14. CSV = require('./csv');
  /**
   * Object-mode transform stream whose chunks are Salesforce records.
   *
   * @class
   * @constructor
   * @extends stream.Transform
   */
  var RecordStream = module.exports = function() {
    // Run the parent (Transform) constructor in object mode so that each chunk
    // travelling through the stream is a record object rather than bytes.
    var streamOptions = { objectMode: true };
    Transform.call(this, streamOptions);
  };
  inherits(RecordStream, Transform);
  /**
   * Announce every incoming record through a 'record' event, then forward the
   * same record unchanged to the readable side.
   *
   * @override
   * @param {Record} record - Record written to this stream
   * @param {String} enc - Encoding argument supplied by the stream machinery (unused)
   * @param {Function} callback - Invoked once the record has been forwarded
   */
  RecordStream.prototype._transform = function(record, enc, callback) {
    var self = this;
    self.emit('record', record);
    self.push(record);
    callback();
  };
  /**
   * Pipe this stream into a new mapping stream and return that stream, so the
   * records read downstream are the results of the mapping function.
   *
   * @param {RecordMapFunction} fn - Record mapping function
   * @returns {RecordStream} The mapping stream this stream was piped into
   */
  RecordStream.prototype.map = function(fn) {
    var mapper = RecordStream.map(fn);
    return this.pipe(mapper);
  };
  /**
   * Pipe this stream into a new filtering stream and return that stream, so
   * only the records accepted by the filter function are read downstream.
   *
   * @param {RecordFilterFunction} fn - Record filtering function
   * @returns {RecordStream} The filtering stream this stream was piped into
   */
  RecordStream.prototype.filter = function(fn) {
    var filtered = RecordStream.filter(fn);
    return this.pipe(filtered);
  };
  /**
   * Record stream whose records can additionally be read out as a serialized
   * data stream (see Serializable#stream).
   *
   * @class RecordStream.Serializable
   * @extends {RecordStream}
   */
  var Serializable = RecordStream.Serializable = function() {
    // Parent constructor sets the stream up in object mode.
    RecordStream.call(this);
    // Serialized output stream; created on the first stream() call.
    this._dataStream = null;
  };
  inherits(Serializable, RecordStream);
  /**
   * Create readable data stream which emits serialized record data
   *
   * The output stream for a given format is created on the first call for that
   * format and reused afterwards, so repeated calls with the same type return
   * the same stream. Streams are cached per type: asking for a second format
   * returns a stream in that format instead of the one created earlier.
   *
   * @param {String} [type] - Type of outgoing data format. Defaults to 'csv'; must be a key of DataStreamConverters.
   * @param {Object} [options] - Options passed to the converter's serialize(); only used by the call that creates the stream for a type
   * @returns {stream.Readable}
   * @throws {Error} If no converter is registered for the given type
   */
  Serializable.prototype.stream = function(type, options) {
    type = type || 'csv';
    // Own-property lookup: names inherited from Object.prototype (for example
    // 'constructor' or 'toString') must be reported as unsupported instead of
    // being mistaken for a converter object.
    var hasConverter = Object.prototype.hasOwnProperty.call(DataStreamConverters, type);
    var converter = hasConverter ? DataStreamConverters[type] : null;
    if (!converter) {
      throw new Error('Converting [' + type + '] data stream is not supported.');
    }
    if (!this._dataStreamsByType) {
      // Prototype-less map so any type name is a safe key.
      this._dataStreamsByType = Object.create(null);
    }
    var dataStream = this._dataStreamsByType[type];
    if (!dataStream) {
      dataStream = new PassThrough();
      this.pipe(converter.serialize(options))
        .pipe(dataStream);
      this._dataStreamsByType[type] = dataStream;
      if (!this._dataStream) {
        // Keep the original field pointing at the first stream created.
        this._dataStream = dataStream;
      }
    }
    return dataStream;
  };
  /**
   * Record stream that is fed by a serialized data stream (see
   * Parsable#stream) and emits the parsed records.
   *
   * @class RecordStream.Parsable
   * @extends {RecordStream}
   */
  var Parsable = RecordStream.Parsable = function() {
    // Parent constructor sets the stream up in object mode.
    RecordStream.call(this);
    // Serialized input stream; created on the first stream() call.
    this._dataStream = null;
  };
  inherits(Parsable, RecordStream);
  /**
   * Create writable data stream which accepts serialized record data
   *
   * The stream is created on the first call and the same instance is returned
   * afterwards. The first call also builds the parser and pipes it into this
   * record stream. The returned data stream itself is connected to the parser
   * by Parsable#on, when a 'readable' or 'record' listener is attached.
   *
   * @param {String} [type] - Type of incoming data format. Currently 'csv' is default value and the only supported.
   * @param {Object} [options] - Options passed to the converter's parse()
   * @returns {stream.Writable} Stream to write the serialized data into
   * @throws {Error} If no converter is registered for the given type
   */
  Parsable.prototype.stream = function(type, options) {
    var format = type || 'csv';
    var converter = DataStreamConverters[format];
    if (!converter) {
      throw new Error('Converting [' + format + '] data stream is not supported.');
    }
    if (this._dataStream) {
      return this._dataStream;
    }
    this._dataStream = new PassThrough();
    this._parserStream = converter.parse(options);
    var records = this._parserStream.pipe(this);
    // Trailing object-mode sink with a large buffer; presumably it keeps this
    // stream draining (so 'record' events fire) without an explicit consumer.
    records.pipe(new PassThrough({ objectMode: true, highWaterMark: ( 500 * 1000 ) }));
    return this._dataStream;
  };
  /**
   * Register an event listener. Attaching the first 'readable' or 'record'
   * listener also starts parsing by piping the incoming data stream into the
   * parser stream.
   *
   * The pipe is established only once: piping the same source into the same
   * destination again would deliver every chunk to the parser twice. If no
   * data stream exists yet, one is created through stream() with its default
   * type, so a listener may be attached before stream() is called.
   *
   * @override
   * @param {String} ev - Event name
   * @param {Function} fn - Listener function
   * @returns {Parsable} Whatever the parent on() returns (the stream itself for event emitters)
   */
  Parsable.prototype.on = function(ev, fn) {
    if ((ev === 'readable' || ev === 'record') && !this._execParse) {
      if (!this._dataStream) {
        // Listener attached before stream(): build the default pipeline now.
        this.stream();
      }
      this._execParse = true;
      this._dataStream.pipe(this._parserStream);
    }
    return Parsable.super_.prototype.on.call(this, ev, fn);
  };
  /* @override */
  Parsable.prototype.addListener = Parsable.prototype.on;
  119. /* --------------------------------------------------- */
  120. /**
  121. * @callback RecordMapFunction
  122. * @param {Record} record - Source record to map
  123. * @returns {Record}
  124. */
  /**
   * Build a serializable record stream that passes each incoming record
   * through the mapping function and pushes the result downstream.
   *
   * When the function returns a falsy value, the original record is pushed
   * instead. The stream's own _transform is replaced, so it does not emit
   * 'record' events itself.
   *
   * @param {RecordMapFunction} fn - Record mapping function
   * @returns {RecordStream.Serializable}
   */
  RecordStream.map = function(fn) {
    var mapper = new RecordStream.Serializable();
    mapper._transform = function(record, enc, callback) {
      var mapped = fn(record);
      // Fall back to the source record when nothing usable was returned.
      this.push(mapped || record);
      callback();
    };
    return mapper;
  };
  /**
   * Create mapping stream using given record template
   *
   * Every output record starts with the source record's Id and then receives
   * one field per property of the template record. Unless noeval is true,
   * string values are treated as templates: a value that is exactly
   * "${Field}" is replaced by the source field value as-is, while "${Field}"
   * placeholders embedded in longer text are replaced by the stringified
   * field value (empty string for null or undefined).
   *
   * @param {Record} record - Mapping record object. In mapping field values, template notation can be used to refer to field values in the source record, if noeval param is not true.
   * @param {Boolean} [noeval] - Disable template evaluation in mapping record.
   * @returns {RecordStream.Serializable}
   */
  RecordStream.recordMapStream = function(record, noeval) {
    // Resolve one template value against the fields of a source record.
    function resolveTemplate(template, source) {
      if (!_.isString(template)) {
        return template;
      }
      var whole = /^\$\{(\w+)\}$/.exec(template);
      if (whole) {
        // Sole placeholder: hand back the raw field value, type preserved.
        return source[whole[1]];
      }
      return template.replace(/\$\{(\w+)\}/g, function(match, field) {
        var fieldValue = source[field];
        if (_.isNull(fieldValue) || _.isUndefined(fieldValue)) {
          return "";
        }
        return String(fieldValue);
      });
    }

    return RecordStream.map(function(source) {
      var result = { Id: source.Id };
      for (var key in record) {
        if (noeval) {
          result[key] = record[key];
        } else {
          result[key] = resolveTemplate(record[key], source);
        }
      }
      return result;
    });
  };
  168. /**
  169. * @callback RecordFilterFunction
  170. * @param {Record} record - Source record to filter
  171. * @returns {Boolean}
  172. */
  /**
   * Build a serializable record stream that forwards only the records for
   * which the filter function returns a truthy value.
   *
   * The stream's own _transform is replaced, so it does not emit 'record'
   * events itself.
   *
   * @param {RecordFilterFunction} fn - Record filtering function
   * @returns {RecordStream.Serializable}
   */
  RecordStream.filter = function(fn) {
    var filtered = new RecordStream.Serializable();
    filtered._transform = function(record, enc, callback) {
      var keep = fn(record);
      if (keep) {
        this.push(record);
      }
      callback();
    };
    return filtered;
  };
  187. /** ---------------------------------------------------------------------- **/
  /**
   * Converter between record objects and CSV text, built on through2 streams.
   *
   * @private
   */
  var CSVStreamConverter = {
    /**
     * Create a stream that accepts record objects and emits CSV text: a header
     * line before the first record, then one line per record.
     *
     * @param {Object} [options] - Serialization options
     * @param {Array} [options.headers] - Column headers; when omitted they are extracted from the first record
     * @param {*} [options.nullValue] - Passed through to CSV.recordToCSV as its nullValue option
     * @returns {stream.Transform}
     */
    serialize: function(options) {
      options = options || {};
      var wroteHeaders = false;
      var headers = options.headers;
      return through2({ writableObjectMode: true },
        function transform(record, enc, callback) {
          if (!wroteHeaders) {
            if (!headers) {
              // No explicit headers: derive the columns from the first record.
              headers = CSV.extractHeaders([ record ]);
            }
            this.push(CSV.arrayToCSV(headers) + '\n', 'utf8');
            wroteHeaders = true;
          }
          this.push(CSV.recordToCSV(record, headers, { nullValue: options.nullValue }) + '\n', 'utf8');
          callback();
        }
      );
    },
    /**
     * Create a stream that accepts CSV data and emits the parsed records.
     * All input is buffered and parsed in one go when the input ends. Takes no
     * parameters; any arguments passed by callers are ignored.
     *
     * @returns {stream.Transform}
     */
    parse: function() {
      var chunks = [];
      return through2({ readableObjectMode: true },
        function transform(data, enc, callback) {
          // Keep raw bytes; a string chunk is converted so it can be concatenated.
          chunks.push(typeof data === 'string' ? Buffer.from(data, enc) : data);
          callback();
        },
        function flush(callback) {
          // Decode once over the concatenated bytes. Decoding chunk by chunk
          // would corrupt a multibyte UTF-8 character split across two chunks.
          var text = Buffer.concat(chunks).toString('utf8');
          var records = CSV.parseCSV(text);
          var _this = this;
          records.forEach(function(record) {
            _this.push(record);
          });
          this.push(null);
          callback();
        }
      );
    }
  };
  /**
   * Registry of data stream converters keyed by format name; the same object
   * is also exposed as RecordStream.DataStreamConverters.
   *
   * @private
   */
  var DataStreamConverters = { csv: CSVStreamConverter };
  RecordStream.DataStreamConverters = DataStreamConverters;