
How To Interleave Streams (with Backpressure)

Suppose I have two possibly infinite streams:

s1 = a..b..c..d..e...
s2 = 1.2.3.4.5.6.7...

I want to merge the streams and then map the merged stream with a slowish asynchronous operation.
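
For concreteness, the two streams could be sketched with Bacon.js like this (the timings are just for illustration):

var Bacon = require("baconjs");

// s1 = a..b..c..d..e...  (slower stream)
var s1 = Bacon.sequentially(300, ["a", "b", "c", "d", "e"]);

// s2 = 1.2.3.4.5.6.7...  (faster stream)
var s2 = Bacon.sequentially(100, [1, 2, 3, 4, 5, 6, 7]);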

Solution 1:

Here's a crazy chunk of code that might help.

It turns the input streams into a single stream of 'value' events, then merges them with 'send' events (and 'end' events for bookkeeping). Then, using a state machine, it builds up queues out of the 'value' events, and dispatches values on 'send' events.
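
For illustration, here's a sketch (plain JavaScript, not part of the gist) of the three shapes of objects the merged stream carries:

var exampleEvents = [
  { type: 'value', index: 0, value: 'a' }, // a value arrived on input stream 0
  { type: 'send' },                        // a promise was fulfilled; dispatch the next queued value
  { type: 'end' }                          // one of the input streams ended
];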

Originally I wrote a roundRobinThrottle, but I've moved it to a gist.

Here is a roundRobinPromiseMap that is very similar. The code in the gist is tested, but this is not.

# roundRobinPromiseMap :: (a -> Promise b) -> [EventStream] -> EventStream
roundRobinPromiseMap = (promiser, streams) ->
    # A bus to trigger new sends based on promise fulfillment
    promiseFulfilled = new Bacon.Bus()

    # Merge the input streams into a single, keyed stream
    theStream = Bacon.mergeAll(streams.map((s, idx) ->
        s.map((val) -> {
            type: 'value'
            index: idx
            value: val
        })
    ))
    # Merge in 'end' events
    .merge(Bacon.mergeAll(streams.map((s) ->
        s.mapEnd(-> {
            type: 'end'
        })
    )))
    # Merge in 'send' events that fire when the promise is fulfilled.
    .merge(promiseFulfilled.map({ type: 'send' }))
    # Feed into a state machine that keeps queues and only creates
    # output events on 'send' input events.
    .withStateMachine(
        {
            queues: streams.map(-> [])
            toPush: 0
            ended: 0
        },
        handleState
    )
    # Feed this output to the promiser
    theStream.onValue((value) ->
        Bacon.fromPromise(promiser(value)).onValue(->
            promiseFulfilled.push()
        )
    )
    # Return the output stream, matching the declared signature
    theStream

handleState = (state, baconEvent) ->
    outEvents = []

    if baconEvent.hasValue()
        # Handle a round robin event of 'value', 'send', or 'end'
        outEvents = handleRoundRobinEvent(state, baconEvent.value())
    else
        outEvents = [baconEvent]

    [state, outEvents]

handleRoundRobinEvent = (state, rrEvent) ->
    outEvents = []

    # 'value': push onto queue
    if rrEvent.type == 'value'
        state.queues[rrEvent.index].push(rrEvent.value)
    # 'send': send the next value by round-robin selection
    else if rrEvent.type == 'send'
        # Here's a sentinel for empty queues
        noValue = {}
        nextValue = noValue
        triedQueues = 0
        while nextValue == noValue && triedQueues < state.queues.length
            if state.queues[state.toPush].length > 0
                nextValue = state.queues[state.toPush].shift()
            state.toPush = (state.toPush + 1) % state.queues.length
            triedQueues++
        if nextValue != noValue
            outEvents.push(new Bacon.Next(nextValue))
    # 'end': keep track of ended streams
    else if rrEvent.type == 'end'
        state.ended++

    # End the round-robin stream if all inputs have ended
    if roundRobinEnded(state)
        outEvents.push(new Bacon.End())

    outEvents

roundRobinEnded = (state) ->
    emptyQueues = allEmpty(state.queues)
    emptyQueues && state.ended == state.queues.length

allEmpty = (arrays) ->
    for a in arrays
        return false if a.length > 0
    return true
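
A hypothetical usage sketch in plain JavaScript (assuming the CoffeeScript above is compiled and roundRobinPromiseMap and Bacon are in scope; the stream timings and the 500 ms delay are made up, and remember this version is untested):

var s1 = Bacon.sequentially(300, ["a", "b", "c"]);
var s2 = Bacon.sequentially(100, [1, 2, 3, 4, 5]);

// A "slowish" promise-returning operation: resolves after 500 ms.
function slowOp(value) {
  return new Promise(function (resolve) {
    setTimeout(function () { resolve(value); }, 500);
  });
}

// Values should be dispatched round-robin, one per fulfilled promise.
roundRobinPromiseMap(slowOp, [s1, s2]);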

Solution 2:

The core challenge here was understanding how to formalise fairness. In the question I already mentioned the worker analogy. It turned out that the obvious fairness criterion is to pick the stream that has generated fewer events than the others, or, taken even further, the one whose generated streams have waited the least time.
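
Roughly, the selection rule is this (an illustrative standalone sketch; the actual scheduler below tracks the costs inside a state machine and uses underscore chains):

// Among streams that still have queued events, pick the one whose
// accumulated cost (time spent on its events so far) is smallest.
function pickNext(queues, costs) {
  var best = -1;
  for (var i = 0; i < queues.length; i++) {
    if (queues[i].length === 0) continue;
    if (best === -1 || costs[i] < costs[best]) best = i;
  }
  return best; // -1 means every queue is empty
}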

After that it was quite trivial to formalise the desired output using denotational semantics: code is on GitHub

I didn't have time to develop the denotational combinators to include withStateMachine from Bacon.js, so the next step was to reimplement it in JavaScript with Bacon.js directly. The whole runnable solution is available as a gist.

The idea is to make a state machine with

  • per-stream costs and queues as the state (sketched below)
  • the input streams plus an additional feedback stream as inputs

As the output of the whole system is fed back, we can dequeue the next event when the previous flatMapped stream has ended.
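
Concretely, for two input streams the scheduler state looks roughly like this (an illustrative sketch of what initialFairState in the gist builds; the field names match the code below):

var exampleState = {
  costs:   [0, 0],   // accumulated processing time per input stream
  queues:  [[], []], // buffered, not yet dispatched events per input stream
  running: false     // whether the slow downstream operation is currently busy
};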

To feed the output back, I had to make a slightly ugly rec combinator:

function rec(f) {
  var bus = new Bacon.Bus();
  var result = f(bus);
  bus.plug(result);
  return result;
}

Its type is (EventStream a -> EventStream a) -> EventStream a; the type resembles other recursion combinators, e.g. fix.
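
A hypothetical usage sketch (not from the gist): a stream that feeds its own output back with a delay, producing a counter. Bacon.later is used for the seed so the Bus is already plugged before the first event fires:

var counter = rec(function (feedback) {
  // seed with 0, then re-emit each output value incremented after 100 ms
  return Bacon.later(0, 0).merge(
    feedback.delay(100).map(function (n) { return n + 1; })
  );
});

counter.take(5).log(); // 0, 1, 2, 3, 4 (one value roughly every 100 ms)

Note that the internal feedback loop keeps running even after take(5) unsubscribes, which is exactly the issue mentioned next.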

It could be given better system-wide behaviour, as Bus breaks unsubscription propagation. We have to work on that.

The second helper function is stateMachine, which takes an array of streams and turns them into a single state machine. Essentially it's .withStateMachine ∘ mergeAll ∘ zipWithIndex.

function stateMachine(inputs, initState, f) {
  var mapped = inputs.map(function (input, i) {
    return input.map(function (x) {
      return [i, x];
    })
  });
  return Bacon.mergeAll(mapped).withStateMachine(initState, function (state, p) {
    if (p.hasValue()) {
      p = p.value();
      return f(state, p[0], p[1]);
    } else {
      return [state, p];
    }
  });
}
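
A hypothetical usage sketch of stateMachine (not from the gist; the intervals are made up): count events per input stream and emit running totals as [streamIndex, countSoFar] pairs:

var s1 = Bacon.interval(100, "a");
var s2 = Bacon.interval(250, "b");

var counts = stateMachine([s1, s2], [0, 0], function (state, i, x) {
  var next = state.slice();  // copy the per-stream counters
  next[i] = next[i] + 1;     // bump the counter for stream i
  return [next, [new Bacon.Next([i, next[i]])]];
});

counts.take(6).log(); // e.g. [0,1], [0,2], [1,1], [0,3], ...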

Using these two helpers we can write a not-so-complex fair scheduler:

function fairScheduler(streams, fn) {
  var streamsCount = streams.length;
  return rec(function (res) {
    return stateMachine(append(streams, res), initialFairState(streamsCount), function (state, i, x) {
      // console.log("FAIR: " + JSON.stringify(state), i, x);

      // END event
      if (i == streamsCount && x.end) {
        var additionalCost = new Date().getTime() - x.started;

        // add cost to input stream cost center
        var updatedState = _.extend({}, state, {
          costs: updateArray(
            state.costs,
            x.idx, function (cost) { return cost + additionalCost; }),
        });

        if (state.queues.every(function (q) { return q.length === 0; })) {
          // if queues are empty, set running: false and don't emit any events
          return [_.extend({}, updatedState, { running: false }), []];
        } else {
          // otherwise pick a stream with
          // - non-empty queue
          // - minimal cost
          var minQueueIdx = _.chain(state.queues)
            .map(function (q, i) {
              return [q, i];
            })
            .filter(function (p) {
              return p[0].length !== 0;
            })
            .sortBy(function (p) {
              return state.costs[p[1]];
            })
            .value()[0][1];

          // emit an event from that stream
          return [
            _.extend({}, updatedState, {
              queues: updateArray(state.queues, minQueueIdx, function (q) { return q.slice(1); }),
              running: true,
            }),
            [new Bacon.Next({
              value: state.queues[minQueueIdx][0],
              idx: minQueueIdx,
            })],
          ];
        }
      } else if (i < streamsCount) {
        // event from input stream
        if (state.running) {
          // if the worker is running, just enqueue the event
          return [
            _.extend({}, state, {
              queues: updateArray(state.queues, i, function (q) { return q.concat([x]); }),
            }),
            [],
          ];
        } else {
          // if the worker isn't running, start it right away
          return [
            _.extend({}, state, {
              running: true,
            }),
            [new Bacon.Next({ value: x, idx: i })],
          ];
        }
      } else {
        return [state, []];
      }

    })
    .flatMapConcat(function (x) {
      // map passed-through events,
      // and append a special "end" event
      return fn(x).concat(Bacon.once({
        end: true,
        idx: x.idx,
        started: new Date().getTime(),
      }));
    });
  })
  .filter(function (x) {
    // filter out END events
    return !x.end;
  })
  .map(".value"); // and return only value field
}
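
A hypothetical usage sketch (assuming the append and updateArray helpers from the gist are loaded; the timings are made up): run two streams through a ~300 ms asynchronous step, one value at a time, letting the scheduler pick fairly between them:

var fast = Bacon.interval(100, "fast");
var slow = Bacon.interval(1000, "slow");

fairScheduler([fast, slow], function (ev) {
  // ev is { value, idx }; simulate a slow async step per value and
  // emit an object with a value field, which fairScheduler unwraps.
  return Bacon.later(300, { value: ev.value + "!" });
}).take(10).log(); // logs "fast!" / "slow!" strings, scheduled one at a time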

The rest of the code in the gist is quite straightforward.
