Sunday, February 28, 2021

Top "n" using a Priority Queue

If you ever need to capture the smallest or largest "n" from a stream of data, the approach more often than not is to use a simple data structure called the Priority Queue.

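Priority Queues do one thing very well - once a bunch of data is added, they can return the lowest value (or the highest value) in constant time. In a heap-based implementation such as Java's PriorityQueue, adding a value or removing the lowest one takes logarithmic time.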
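To make that concrete, here is a minimal Java sketch using java.util.PriorityQueue. The class and method names (PeekDemo, smallest, largest) are made up for illustration, and the sketch assumes the input list is not empty, since peek() returns null on an empty queue.

```java
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class PeekDemo {
    // Natural ordering: the head of the queue is the smallest value.
    static int smallest(List<Integer> values) {
        PriorityQueue<Integer> minPq = new PriorityQueue<>(values);
        return minPq.peek(); // constant-time look at the head
    }

    // Reversed ordering: the head of the queue is the largest value.
    static int largest(List<Integer> values) {
        PriorityQueue<Integer> maxPq = new PriorityQueue<>(Comparator.reverseOrder());
        maxPq.addAll(values);
        return maxPq.peek();
    }

    public static void main(String[] args) {
        List<Integer> values = List.of(4, 1, 3); // hypothetical data
        System.out.println(smallest(values)); // prints 1
        System.out.println(largest(values));  // prints 4
    }
}
```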

How is this useful for answering a top or bottom "n" type question? Let's see.

Consider this hypothetical stream of data:




Suppose you have to report the smallest 2 values at any point as this stream of data comes in. The trick is to use a Priority Queue

  • with a size limit of 2
  • which returns the largest of these 2 when asked for (sometimes referred to as a Max Priority Queue)

Two considerations as data streams in:
  • if the size of the priority queue is less than 2 then add the value to the priority queue
  • if the size of the priority queue is equal to 2, then compare the value from the stream with the largest in the queue
    • if less then remove the largest and add the new value
    • if more then do nothing

At the bottom is the state of the Priority Queue as each value in the stream is processed:



See how it always holds the bottom 2. 
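The same walk-through can be reproduced in code. The sketch below is in Java and uses a made-up stream (5, 3, 8, 1, 9, 2), not the one pictured in this post; BottomTwoTrace and trace are hypothetical names. It applies the two rules above and records a sorted snapshot of the queue after each value, assuming the limit is at least 1.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class BottomTwoTrace {
    // Returns a sorted snapshot of the queue after each stream value is processed.
    static List<List<Integer>> trace(List<Integer> stream, int limit) {
        // Max Priority Queue: peek() returns the largest value currently held.
        PriorityQueue<Integer> maxPq = new PriorityQueue<>(Comparator.reverseOrder());
        List<List<Integer>> states = new ArrayList<>();
        for (int value : stream) {
            if (maxPq.size() < limit) {
                maxPq.add(value);
            } else if (value < maxPq.peek()) {
                maxPq.poll();     // drop the largest of the kept values
                maxPq.add(value); // keep the smaller newcomer
            }
            List<Integer> snapshot = new ArrayList<>(maxPq);
            Collections.sort(snapshot); // sort only to make the trace readable
            states.add(snapshot);
        }
        return states;
    }

    public static void main(String[] args) {
        // Hypothetical stream, chosen only for illustration.
        trace(List.of(5, 3, 8, 1, 9, 2), 2).forEach(System.out::println);
        // prints [5], [3, 5], [3, 5], [1, 3], [1, 3], [1, 2] on separate lines
    }
}
```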

Similarly for the largest 3, a Priority Queue with a max capacity of 3 which returns the smallest (referred to as a Min Priority Queue) can be used the following way:
  • if the size of the priority queue is less than 3, then add to the priority queue
  • if the size is equal to 3, then compare the value from the stream with the smallest in the queue
    • if more then remove the smallest and add the value from the stream
    • otherwise do nothing
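As a rough Java sketch of the "largest n" variation, assuming nothing beyond java.util.PriorityQueue (TopN and largest are hypothetical names, and the input is made-up data):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

class TopN {
    // Keeps at most n values in a Min Priority Queue; the head is the
    // smallest of the values kept so far, so it is the one to evict.
    static List<Integer> largest(List<Integer> stream, int n) {
        PriorityQueue<Integer> minPq = new PriorityQueue<>();
        for (int value : stream) {
            if (minPq.size() < n) {
                minPq.add(value);
            } else if (!minPq.isEmpty() && value > minPq.peek()) {
                minPq.poll();
                minPq.add(value);
            }
        }
        List<Integer> result = new ArrayList<>(minPq);
        Collections.sort(result); // ascending, for a predictable result
        return result;
    }

    public static void main(String[] args) {
        System.out.println(largest(List.of(5, 3, 8, 1, 9, 2), 3)); // prints [5, 8, 9]
    }
}
```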



Implementation

Here is a simple Kotlin-based implementation that uses the built-in PriorityQueue from the Java standard library.


import java.util.Comparator
import java.util.PriorityQueue

fun findNSmallestAndLargest(nums: List<Int>, n: Int): Pair<List<Int>, List<Int>> {
    val minFirst: Comparator<Int> = Comparator.naturalOrder<Int>()
    val maxFirst: Comparator<Int> = minFirst.reversed()
    val minPq: PriorityQueue<Int> = PriorityQueue(minFirst)
    val maxPq: PriorityQueue<Int> = PriorityQueue(maxFirst)
    for (num in nums) {
        checkAndAddIfSmallest(maxPq, n, num)
        checkAndAddIfLargest(minPq, n, num)
    }
    return maxPq.toList() to minPq.toList()
}

private fun checkAndAddIfSmallest(maxPq: PriorityQueue<Int>, n: Int, num: Int) {
    if (maxPq.size < n) {
        maxPq.add(num)
    } else if (num < maxPq.peek()) {
        maxPq.poll()
        maxPq.add(num)
    }
}

private fun checkAndAddIfLargest(minPq: PriorityQueue<Int>, n: Int, num: Int) {
    if (minPq.size < n) {
        minPq.add(num)
    } else if (num > minPq.peek()) {
        minPq.poll()
        minPq.add(num)
    }
}

The implementation is very straightforward and follows the outlined algorithm to the letter.
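One detail to keep in mind about the returned lists: iterating over a java.util.PriorityQueue (which is what converting it to a list does) is not guaranteed to visit the elements in sorted order. If a sorted result is needed, one option is to poll the queue until it is empty. The Java sketch below shows the idea; HeapOrder and drainSorted are hypothetical helper names, not part of the implementation above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class HeapOrder {
    // Polls a copy of the queue until it is empty, so the result follows the
    // queue's own ordering and the original queue is left untouched.
    static <T> List<T> drainSorted(PriorityQueue<T> pq) {
        PriorityQueue<T> copy = new PriorityQueue<>(pq); // keeps the same comparator
        List<T> sorted = new ArrayList<>(pq.size());
        while (!copy.isEmpty()) {
            sorted.add(copy.poll());
        }
        return sorted;
    }

    public static void main(String[] args) {
        PriorityQueue<Integer> maxPq = new PriorityQueue<>(Comparator.reverseOrder());
        maxPq.addAll(List.of(4, 1, 3, 2)); // hypothetical data
        System.out.println(drainSorted(maxPq)); // prints [4, 3, 2, 1]
    }
}
```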

Monday, January 18, 2021

Generating a stream of Fibonacci numbers

A Java stream represents a potentially infinite sequence of data. This is a simple post that goes into the mechanics involved in generating a simple stream of Fibonacci numbers.


The simplest way to get this stream of data is to use the generate method of Stream.

As you can imagine, generating a specific Fibonacci number in this sequence requires the previous two numbers, which means the state of the previous two numbers needs to be maintained somewhere. The two solutions that I will be describing here both maintain this state, but they do it differently.



Mutable State

In the first approach, I am just going to maintain state this way:


class FibState {
    private long[] prev = new long[2];
    private int index = 0;

    public long nextFib() {
        long result = (index == 0) ? 1
                : ((index == 1) ? 1 : prev[0] + prev[1]);
        prev[0] = prev[1];
        prev[1] = result;
        index++;
        return result;
    }
}


Here index tracks the position of the current Fibonacci number in the sequence, and prev holds the two most recent values. The next number in the series is generated by incrementing the index and updating the array that holds the recent values. Given this state, the stream can be generated with code like this:

   
Stream<Long> streamOfFib() {
    FibState fibState = new FibState();
    return Stream.generate(() -> fibState.nextFib());
}

This uses a closure to capture fibState and mutates it repeatedly as the stream of numbers is generated. The approach works, though the thought of mutating shared state should induce a level of dread: is it thread safe (probably not), will it work for parallel streams (likely not)? It should suffice for cases where access is strictly sequential. A far better approach is to use a version of the state that is immutable.
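Since the generated stream is infinite, it has to be cut short before collecting. Here is a small self-contained Java sketch of sequential use; it repeats a compact version of the mutable state so it can run on its own, and the names MutableFibDemo and firstFib are made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class MutableFibDemo {
    // Compact copy of the mutable state, included only to keep the sketch runnable.
    static class FibState {
        private final long[] prev = new long[2];
        private int index = 0;

        long nextFib() {
            long result = (index < 2) ? 1 : prev[0] + prev[1];
            prev[0] = prev[1];
            prev[1] = result;
            index++;
            return result;
        }
    }

    // Takes the first `count` numbers; limit() is what stops the infinite stream.
    static List<Long> firstFib(int count) {
        FibState fibState = new FibState();
        Stream<Long> fib = Stream.generate(fibState::nextFib);
        return fib.limit(count).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstFib(10)); // prints [1, 1, 2, 3, 5, 8, 13, 21, 34, 55]
    }
}
```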

Immutable State


class FibState {
    private final long[] prev;
    private final int index;

    public FibState() {
        this(new long[]{-1, 1}, 0);
    }

    public FibState(long[] prev, int index) {
        this.prev = prev;
        this.index = index;
    }

    public FibState nextFib() {
        int nextIndex = index + 1;
        long result = (nextIndex == 1) ? 1 : prev[0] + prev[1];
        return new FibState(new long[]{prev[1], result}, nextIndex);
    }

    public long getValue() {
        return prev[1];
    }
}

Instead of mutating the state, nextFib returns the next immutable state. Now that this version of the state is available, it can be used with the "iterate" function of Stream, like this:

Stream<Long> streamOfFib() {
    return Stream
            .iterate(new FibState(), fibState -> fibState.nextFib())
            .map(fibState -> fibState.getValue());
}

The iterate function takes two parameters - the initial state and a function that generates the next state from the current one. To return numbers from the stream, the "map" operation converts each state to its value.
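The same idea also works without a dedicated state class. The variation below is my own sketch rather than the approach described above: it uses a two-element long array as the state, where each step produces a fresh array instead of modifying the old one. FibPairs is a hypothetical name.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FibPairs {
    static Stream<Long> streamOfFib() {
        // State is the pair {current, next}; every step builds a new pair.
        Stream<long[]> pairs =
                Stream.iterate(new long[]{1, 1}, p -> new long[]{p[1], p[0] + p[1]});
        return pairs.map(p -> p[0]);
    }

    public static void main(String[] args) {
        List<Long> first = streamOfFib().limit(8).collect(Collectors.toList());
        System.out.println(first); // prints [1, 1, 2, 3, 5, 8, 13, 21]
    }
}
```

Note that with long values the sequence silently overflows after the 92nd element, so a limit beyond that would need a wider type such as BigInteger.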


Conclusion

This post uses the Fibonacci sequence as an example of how to generate a stream. The approach will be fairly similar for any stream that you may need to generate. My personal preference is for the immutable state variation, as it delegates most of the mechanism of generating the stream to the excellent "iterate" helper function of Stream.