Observer.js

import { noop } from './utilities';
/**
 * @param {{next: Function, error: Function, complete: Function}} observer
 */
export class Observer {
  constructor (observer = {}) {
    this.isComplete = false;
    this.dispose = noop;
    
    this.setupObserver(observer);
  }

  /**
   * Cleans up and disposes of all events in the stream
   *
   * @returns {Observer}
   */
  cleanup () {
    this.isComplete = true;

    this.dispose();
  
    this.setupObserverObject = {
      next: noop,
      error: noop,
      complete: noop,
    };
    this.dispose = noop;
    
    return this;
  }
  
  catchErrors (callback) {
    return (...args) => {
      try {
        return callback(...args);
      } catch (errors) {
        this.onError(errors);
      }
    };
  }

  /**
   * @param {Function} callback
   * @returns {Function}
   */
  use (callback) {
    const callbackCatch = this.catchErrors(callback);
    const response = callbackCatch({
      next: (value) => this.onNext(value),
      error: (...errors) => this.onError(...errors),
      complete: () => this.onComplete(),
    });
    
    if (typeof response === 'function') {
      this.dispose = this.catchErrors(response);
    } else {
      this.dispose = noop;
    }
    
    return this.dispose;
  }
  
  onNext (value) {
    if (this.isComplete) {
      return null;
    }
    
    return this.setupObserverObject.next(value);
  }
  
  onError (...errors) {
    if (this.isComplete) {
      return null;
    }
  
    return this.setupObserverObject.error(...errors);
  }
  
  onComplete () {
    if (this.isComplete) {
      return null;
    }
    
    this.cleanup();
  
    return this.setupObserverObject.complete();
  }
  
  setupObserver (setupObserverObject = { next: noop, error: noop, complete: noop }) {
    // assumes that an object was passed as first value to subscription
    if (typeof setupObserverObject.next !== 'function' && typeof setupObserverObject.next === 'object') {
      return this.setupObserver(setupObserverObject.next);
    }
    
    const { next = noop, error = noop, complete = noop } = setupObserverObject;
    
    this.setupObserverObject = {
      next: this.catchErrors(next),
      error,
      complete: this.catchErrors(complete),
    };
  
    return this;
  }
}