Source: api/streaming.js

  1. /**
  2. * @file Manages Streaming APIs
  3. * @author Shinichi Tomita <shinichi.tomita@gmail.com>
  4. */
  5. 'use strict';
  6. var events = require('events'),
  7. inherits = require('inherits'),
  8. _ = require('lodash/core'),
  9. Faye = require('faye'),
  10. jsforce = require('../core');
  11. /**
  12. * Streaming API topic class
  13. *
  14. * @class Streaming~Topic
  15. * @param {Streaming} steaming - Streaming API object
  16. * @param {String} name - Topic name
  17. */
  18. var Topic = function(streaming, name) {
  19. this._streaming = streaming;
  20. this.name = name;
  21. };
  22. /**
  23. * @typedef {Object} Streaming~StreamingMessage
  24. * @prop {Object} event
  25. * @prop {Object} event.type - Event type
  26. * @prop {Record} sobject - Record information
  27. */
  28. /**
  29. * Subscribe listener to topic
  30. *
  31. * @method Streaming~Topic#subscribe
  32. * @param {Callback.<Streaming~StreamingMesasge>} listener - Streaming message listener
  33. * @returns {Subscription} - Faye subscription object
  34. */
  35. Topic.prototype.subscribe = function(listener) {
  36. return this._streaming.subscribe(this.name, listener);
  37. };
  38. /**
  39. * Unsubscribe listener from topic
  40. *
  41. * @method Streaming~Topic#unsubscribe
  42. * @param {Callback.<Streaming~StreamingMesasge>} listener - Streaming message listener
  43. * @returns {Streaming~Topic}
  44. */
  45. Topic.prototype.unsubscribe = function(listener) {
  46. this._streaming.unsubscribe(this.name, listener);
  47. return this;
  48. };
  49. /*--------------------------------------------*/
  50. /**
  51. * Streaming API Generic Streaming Channel
  52. *
  53. * @class Streaming~Channel
  54. * @param {Streaming} steaming - Streaming API object
  55. * @param {String} name - Channel name (starts with "/u/")
  56. */
  57. var Channel = function(streaming, name) {
  58. this._streaming = streaming;
  59. this._name = name;
  60. };
  61. /**
  62. * Subscribe to hannel
  63. *
  64. * @param {Callback.<Streaming~StreamingMessage>} listener - Streaming message listener
  65. * @returns {Subscription} - Faye subscription object
  66. */
  67. Channel.prototype.subscribe = function(listener) {
  68. return this._streaming.subscribe(this._name, listener);
  69. };
  70. Channel.prototype.unsubscribe = function(listener) {
  71. this._streaming.unsubscribe(this._name, listener);
  72. return this;
  73. };
  74. Channel.prototype.push = function(events, callback) {
  75. var isArray = _.isArray(events);
  76. events = isArray ? events : [ events ];
  77. var conn = this._streaming._conn;
  78. if (!this._id) {
  79. this._id = conn.sobject('StreamingChannel').findOne({ Name: this._name }, 'Id')
  80. .then(function(rec) { return rec.Id });
  81. }
  82. return this._id.then(function(id) {
  83. var channelUrl = '/sobjects/StreamingChannel/' + id + '/push';
  84. return conn.requestPost(channelUrl, { pushEvents: events });
  85. }).then(function(rets) {
  86. return isArray ? rets : rets[0];
  87. }).thenCall(callback);
  88. };
  89. /*--------------------------------------------*/
  90. /**
  91. * Streaming API class
  92. *
  93. * @class
  94. * @extends events.EventEmitter
  95. * @param {Connection} conn - Connection object
  96. */
  97. var Streaming = function(conn) {
  98. this._conn = conn;
  99. };
  100. inherits(Streaming, events.EventEmitter);
  101. /** @private **/
  102. Streaming.prototype._createClient = function(replay) {
  103. var endpointUrl = [ this._conn.instanceUrl, "cometd" + (replay ? "/replay" : ""), this._conn.version ].join('/');
  104. var fayeClient = new Faye.Client(endpointUrl, {});
  105. fayeClient.setHeader('Authorization', 'OAuth '+this._conn.accessToken);
  106. return fayeClient;
  107. };
  108. /** @private **/
  109. Streaming.prototype._getFayeClient = function(channelName) {
  110. var isGeneric = channelName.indexOf('/u/') === 0;
  111. var clientType = isGeneric ? 'generic' : 'pushTopic';
  112. if (!this._fayeClients || !this._fayeClients[clientType]) {
  113. this._fayeClients = this._fayeClients || {};
  114. this._fayeClients[clientType] = this._createClient(isGeneric);
  115. if (Faye.Transport.NodeHttp) {
  116. Faye.Transport.NodeHttp.prototype.batching = false; // prevent streaming API server error
  117. }
  118. }
  119. return this._fayeClients[clientType];
  120. };
  121. /**
  122. * Get named topic
  123. *
  124. * @param {String} name - Topic name
  125. * @returns {Streaming~Topic}
  126. */
  127. Streaming.prototype.topic = function(name) {
  128. this._topics = this._topics || {};
  129. var topic = this._topics[name] =
  130. this._topics[name] || new Topic(this, name);
  131. return topic;
  132. };
  133. /**
  134. * Get Channel for Id
  135. * @param {String} channelId - Id of StreamingChannel object
  136. * @returns {Streaming~Channel}
  137. */
  138. Streaming.prototype.channel = function(channelId) {
  139. return new Channel(this, channelId);
  140. };
  141. /**
  142. * Subscribe topic/channel
  143. *
  144. * @param {String} name - Topic name
  145. * @param {Callback.<Streaming~StreamingMessage>} listener - Streaming message listener
  146. * @returns {Subscription} - Faye subscription object
  147. */
  148. Streaming.prototype.subscribe = function(name, listener) {
  149. var channelName = name.indexOf('/') === 0 ? name : '/topic/' + name;
  150. var fayeClient = this._getFayeClient(channelName);
  151. return fayeClient.subscribe(channelName, listener);
  152. };
  153. /**
  154. * Unsubscribe topic
  155. *
  156. * @param {String} name - Topic name
  157. * @param {Callback.<Streaming~StreamingMessage>} listener - Streaming message listener
  158. * @returns {Streaming}
  159. */
  160. Streaming.prototype.unsubscribe = function(name, listener) {
  161. var channelName = name.indexOf('/') === 0 ? name : '/topic/' + name;
  162. var fayeClient = this._getFayeClient(channelName);
  163. fayeClient.unsubscribe(channelName, listener);
  164. return this;
  165. };
  166. /*--------------------------------------------*/
  167. /*
  168. * Register hook in connection instantiation for dynamically adding this API module features
  169. */
  170. jsforce.on('connection:new', function(conn) {
  171. conn.streaming = new Streaming(conn);
  172. });
  173. module.exports = Streaming;