// operators/zip.js

import { Observable } from '../Observable';
import { onSubscriptionsComplete } from '../utilities/onSubscriptionsComplete';

/**
 * Default combiner for `zip`: gathers the matched values into an array.
 *
 * @param {...*} values - One value per source, in source order.
 * @returns {Array} A new array containing the values as given.
 */
const argsCallback = (...values) => values;

/**
 * Combines several observables positionally: the n-th output is produced
 * once every source has delivered its n-th value, and is the result of
 * calling `combineCallback` with those n-th values in source order.
 *
 * Values that arrive before their counterparts are buffered per source and
 * released as soon as they have been emitted.
 *
 * An error from any source is forwarded straight to the subscriber.
 * Completion of a source is delegated to `onSubscriptionsComplete`.
 *
 * @memberof operators
 *
 * @param {Observable[]} sources$ - Array of observables to zip together.
 * @param {Function} [combineCallback] - Receives one value per source and
 *   returns the value to emit. Defaults to collecting them into an array.
 * @returns {Observable} Observable of the combined values. Unsubscribing
 *   from it unsubscribes from every source.
 */
export const zip = function (sources$, combineCallback = argsCallback) {
  return new Observable(function ({ next, error, complete }) {
    // One FIFO queue per source holding values not yet emitted downstream.
    const pending = sources$.map(() => []);

    let subscriptions = [];

    // NOTE(review): `subscriptions` is only filled in after every source has
    // been subscribed, so a source that completes synchronously while being
    // subscribed hands the still-empty array to `onSubscriptionsComplete` —
    // confirm the helper copes with that.
    const onComplete = () => onSubscriptionsComplete(subscriptions, complete);

    const subscribeTo = (obs$, index) => obs$.subscribe({
      next(value) {
        pending[index].push(value);

        // Before this push at least one queue was empty, so a single push
        // can complete at most one tuple.
        if (pending.every((queue) => queue.length > 0)) {
          // Dequeue before emitting so the buffers never retain values that
          // have already been delivered.
          const values = pending.map((queue) => queue.shift());
          next(combineCallback(...values));
        }
      },
      error,
      complete: onComplete,
    });

    subscriptions = sources$.map((s$, index) => subscribeTo(s$, index));

    return () => subscriptions.forEach((s) => s.unsubscribe());
  });
};

// Make the operator reachable as a static helper on the Observable constructor.
Observable.zip = zip;

/**
 * Instance form of `zip`: zips this observable (as the first source) with
 * the given others.
 *
 * @param {Iterable<Observable>} otherSources$ - Further sources, placed after
 *   this observable in source order.
 * @param {Function} [combineCallback] - Passed through to `zip` unchanged.
 * @returns {Observable} Whatever `zip` returns for the combined source list.
 */
Observable.prototype.zip = function (otherSources$, combineCallback) {
  const everySource$ = [this, ...otherSources$];

  return zip(everySource$, combineCallback);
};