Observable.js 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. var _ = require("./_");
  2. var genIterator = require("./genIterator");
  3. var Promise = _.Promise;
  /**
   * Create a composable observable object.
   * A promise settles only once; an Observable can emit any number of values
   * and errors over time, so events can be mapped, filtered and even
   * back-pressured in a promise-like way.
   * For a live example see the
   * [Double Click Demo](https://jsbin.com/niwuti/edit?html,js,output).
   * @version_added v0.7.2
   * @param {Function} executor `(next, error) ->` It's optional. When given, it
   * is called synchronously with this observable's `next` and `error` emitters.
   * @return {Observable}
   * @example
   * ```js
   * var Observable = require("yaku/lib/Observable");
   * var sleep = require("yaku/lib/sleep");
   * var linear = new Observable();
   *
   * var x = 0;
   * setInterval(() => linear.next(x++), 1000);
   *
   * // Wait for 2 sec then emit the next value.
   * var quad = linear.subscribe(async x => {
   *     await sleep(2000);
   *     return x * x;
   * });
   *
   * var another = linear.subscribe(x => -x);
   *
   * quad.subscribe(
   *     value => { console.log(value); },
   *     reason => { console.error(reason); }
   * );
   *
   * // Emit error
   * linear.error(new Error("reason"));
   *
   * // Unsubscribe an observable.
   * quad.unsubscribe();
   *
   * // Unsubscribe all subscribers.
   * linear.subscribers = [];
   * ```
   * @example
   * Use it with DOM.
   * ```js
   * // Returning a promise that never settles drops the value.
   * var filter = fn => v => fn(v) ? v : new Promise(() => {});
   *
   * var keyup = new Observable((next) => {
   *     document.querySelector('input').onkeyup = next;
   * });
   *
   * var keyupText = keyup.subscribe(e => e.target.value);
   *
   * // Now we only get the input when the text length is greater than 3.
   * var keyupTextGT3 = keyupText.subscribe(filter(text => text.length > 3));
   *
   * keyupTextGT3.subscribe(v => console.log(v));
   * ```
   */
  var Observable = module.exports = function Observable (executor) {
      // Install the per-instance `next` / `error` / `emit` closures first so
      // that they already exist when the executor receives them.
      genHandler(this);
      this.subscribers = [];

      if (executor) {
          executor(this.next, this.error);
      }
  };
  // Prototype-level members of Observable. `next` and `error` are placeholders
  // here; the real functions are installed per instance by `genHandler`.
  // NOTE(review): `_.extendPrototype` is defined in "./_" — presumably it copies
  // these properties onto `Observable.prototype`; confirm against that module.
  _.extendPrototype(Observable, {
      /**
       * Emit a value.
       * @param {Any} value
       */
      next: null,

      /**
       * Emit an error, so that the event will go to the `onError` callback
       * of each subscriber.
       * @param {Any} value
       */
      error: null,

      /**
       * The publisher observable of this.
       * @type {Observable}
       */
      publisher: null,

      /**
       * All the subscribers subscribed this observable.
       * @type {Array}
       */
      subscribers: null,

      /**
       * It will create a new Observable, like promise.
       * Whatever `onNext` / `onError` return is emitted by the returned
       * observable to its own subscribers.
       * @param {Function} onNext
       * @param {Function} onError
       * @return {Observable}
       */
      subscribe: function (onNext, onError) {
          var self = this, subscriber = new Observable();
          subscriber._onNext = onNext;
          subscriber._onError = onError;
          // Used by the publisher to forward a failure of the handlers above
          // to the subscriber's own subscribers as a rejection.
          subscriber._nextErr = genNextErr(subscriber.next);
          subscriber.publisher = self;
          self.subscribers.push(subscriber);
          return subscriber;
      },

      /**
       * Unsubscribe this.
       * Does nothing when this observable has no publisher or is no longer
       * in the publisher's subscriber list.
       */
      unsubscribe: function () {
          var publisher = this.publisher;
          if (!publisher) {
              return;
          }

          // `indexOf` yields -1 when this was already removed (or the list was
          // replaced); `splice(-1, 1)` would then drop the LAST subscriber,
          // which is somebody else's subscription.
          var index = publisher.subscribers.indexOf(this);
          if (index !== -1) {
              publisher.subscribers.splice(index, 1);
          }
      }
  });
  /**
   * Install the emitter functions (`next`, `error`, `emit`) on an observable.
   * They are closures over `self`, so they keep working when passed around
   * detached from the object (e.g. as an event handler).
   * @param {Observable} self The observable to equip.
   */
  function genHandler (self) {
      // Deliver `val` to every subscriber present when the call starts.
      self.next = function (val) {
          var count = self.subscribers.length;
          var target, handled;

          for (var idx = 0; idx < count; idx++) {
              target = self.subscribers[idx];

              // A rejected `val` is routed to `_onError`, anything else to `_onNext`.
              handled = Promise.resolve(val).then(target._onNext, target._onError);

              // The handler's outcome becomes the subscriber's own emission;
              // a failure is forwarded as a rejection through `_nextErr`.
              handled.then(target.next, target._nextErr);
          }
      };

      // An error is just a rejected promise pushed through `next`.
      self.error = function (err) {
          self.next(Promise.reject(err));
      };

      // Deprecated alias of `next`, kept for backward compatibility.
      self.emit = function () {
          console.trace("Observable[[emit]] is deprecated, use [[next]] instead."); // eslint-disable-line
          self.next.apply(0, arguments);
      };
  }
  /**
   * Build a rejection forwarder for the given emitter.
   * @param {Function} next The emitter to forward to.
   * @return {Function} `(reason) ->` Calls `next` with a promise rejected
   * with `reason`.
   */
  function genNextErr (next) {
      function forwardRejection (reason) {
          var rejected = Promise.reject(reason);
          next(rejected);
      }

      return forwardRejection;
  }
  /**
   * Merge multiple observables into one.
   * The returned observable emits every value and every error emitted by any
   * of the given observables.
   * @version_added 0.9.6
   * @param {Iterable} iterable The observables to merge.
   * @return {Observable}
   * @example
   * ```js
   * var Observable = require("yaku/lib/Observable");
   * var sleep = require("yaku/lib/sleep");
   *
   * var src = new Observable(next => setInterval(next, 1000, 0));
   *
   * var a = src.subscribe(v => v + 1);
   * var b = src.subscribe((v) => sleep(10, v + 2));
   *
   * var out = Observable.merge([a, b]);
   *
   * out.subscribe((v) => {
   *     console.log(v);
   * })
   * ```
   */
  Observable.merge = function merge (iterable) {
      var sources = genIterator(iterable);

      return new Observable(function (next) {
          // Errors from a source are re-emitted as rejections of the merged one.
          var forwardError = function (e) {
              next(Promise.reject(e));
          };

          for (var step = sources.next(); !step.done; step = sources.next()) {
              step.value.subscribe(next, forwardError);
          }
      });
  };
  170. };