operators/combine.js

import { Observable } from '../Observable';
import { onSubscriptionsComplete } from '../utilities/onSubscriptionsComplete';

const nullHash = void(0);

const argsCallback = function () { return Array.from(arguments); };

/**
 * combines multiple observables at the same time.
 * it will only call the observer's next function when all observables have emitted at least one value
 * 
 * @memberof operators
 *
 * @param {Observable[]} sources$
 * @param {Function} combineCallback
 * @returns {Observable}
 */
export const combine = function (sources$, combineCallback = argsCallback) {
  return new Observable(function ({ next, error, complete }) {
    let subscriptions = [];
    
    let latest = sources$.map(s$ => nullHash);
    
    let allHasValue = false;
    const checkAllHasValue = () => latest.filter((l) => l == nullHash).length <= 0;
    
    const onComplete = () => onSubscriptionsComplete(subscriptions, complete);
    const subscribeTo = (obs$, index) => {
      return obs$.subscribe({
        next (value) {
          latest[index] = value;
  
          allHasValue = allHasValue || checkAllHasValue();
          
          if (allHasValue) {
            next(combineCallback(...latest));
          }
        },
        error,
        complete: onComplete,
      });
    };
  
    subscriptions = sources$.map((s$, index) => subscribeTo(s$, index));
    
    return () => subscriptions.forEach((s) => s.unsubscribe());
  });
};

Observable.combine = combine;
Observable.prototype.combine = function (otherSources$, combineCallback) {
  return combine([this, ...otherSources$], combineCallback);
};