| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- var _ = require("./_");
- var genIterator = require("./genIterator");
- var Promise = _.Promise;
- /**
- * Create a composable observable object.
- * Promise can't resolve multiple times, this class makes it possible, so
- * that you can easily map, filter and even back pressure events in a promise way.
- * For live example: [Double Click Demo](https://jsbin.com/niwuti/edit?html,js,output).
- * @version_added v0.7.2
- * @param {Function} executor `(next) ->` It's optional.
- * @return {Observable}
- * @example
- * ```js
- * var Observable = require("yaku/lib/Observable");
- * var linear = new Observable();
- *
- * var x = 0;
- * setInterval(linear.next, 1000, x++);
- *
- * // Wait for 2 sec then emit the next value.
- * var quad = linear.subscribe(async x => {
- * await sleep(2000);
- * return x * x;
- * });
- *
- * var another = linear.subscribe(x => -x);
- *
- * quad.subscribe(
- * value => { console.log(value); },
- * reason => { console.error(reason); }
- * );
- *
- * // Emit error
- * linear.error(new Error("reason"));
- *
- * // Unsubscribe an observable.
- * quad.unsubscribe();
- *
- * // Unsubscribe all subscribers.
- * linear.subscribers = [];
- * ```
- * @example
- * Use it with DOM.
- * ```js
- * var filter = fn => v => fn(v) ? v : new Promise(() => {});
- *
- * var keyup = new Observable((next) => {
- * document.querySelector('input').onkeyup = next;
- * });
- *
- * var keyupText = keyup.subscribe(e => e.target.value);
- *
- * // Now we only get the input when the text length is greater than 3.
- * var keyupTextGT3 = keyupText.subscribe(filter(text => text.length > 3));
- *
- * keyupTextGT3.subscribe(v => console.log(v));
- * ```
- */
- var Observable = module.exports = function Observable (executor) {
- var self = this;
- genHandler(self);
- self.subscribers = [];
- executor && executor(self.next, self.error);
- };
- _.extendPrototype(Observable, {
- /**
- * Emit a value.
- * @param {Any} value
- * so that the event will go to `onError` callback.
- */
- next: null,
- /**
- * Emit an error.
- * @param {Any} value
- */
- error: null,
- /**
- * The publisher observable of this.
- * @type {Observable}
- */
- publisher: null,
- /**
- * All the subscribers subscribed this observable.
- * @type {Array}
- */
- subscribers: null,
- /**
- * It will create a new Observable, like promise.
- * @param {Function} onNext
- * @param {Function} onError
- * @return {Observable}
- */
- subscribe: function (onNext, onError) {
- var self = this, subscriber = new Observable();
- subscriber._onNext = onNext;
- subscriber._onError = onError;
- subscriber._nextErr = genNextErr(subscriber.next);
- subscriber.publisher = self;
- self.subscribers.push(subscriber);
- return subscriber;
- },
- /**
- * Unsubscribe this.
- */
- unsubscribe: function () {
- var publisher = this.publisher;
- publisher && publisher.subscribers.splice(publisher.subscribers.indexOf(this), 1);
- }
- });
- function genHandler (self) {
- self.next = function (val) {
- var i = 0, len = self.subscribers.length, subscriber;
- while (i < len) {
- subscriber = self.subscribers[i++];
- Promise.resolve(val).then(
- subscriber._onNext,
- subscriber._onError
- ).then(
- subscriber.next,
- subscriber._nextErr
- );
- }
- };
- self.error = function (err) {
- self.next(Promise.reject(err));
- };
- self.emit = function () {
- console.trace("Observable[[emit]] is deprecated, use [[next]] instead."); // eslint-disable-line
- self.next.apply(0, arguments);
- };
- }
- function genNextErr (next) {
- return function (reason) {
- next(Promise.reject(reason));
- };
- }
- /**
- * Merge multiple observables into one.
- * @version_added 0.9.6
- * @param {Iterable} iterable
- * @return {Observable}
- * @example
- * ```js
- * var Observable = require("yaku/lib/Observable");
- * var sleep = require("yaku/lib/sleep");
- *
- * var src = new Observable(next => setInterval(next, 1000, 0));
- *
- * var a = src.subscribe(v => v + 1; });
- * var b = src.subscribe((v) => sleep(10, v + 2));
- *
- * var out = Observable.merge([a, b]);
- *
- * out.subscribe((v) => {
- * console.log(v);
- * })
- * ```
- */
- Observable.merge = function merge (iterable) {
- var iter = genIterator(iterable);
- return new Observable(function (next) {
- var item;
- function onError (e) {
- next(Promise.reject(e));
- }
- while (!(item = iter.next()).done) {
- item.value.subscribe(next, onError);
- }
- });
- };
|