Streaming Top K
Maintain the k largest elements while values arrive one at a time.
Streaming Top K maintains the $k$ largest elements of a data stream. The input does not need to fit in memory, because only $k$ values are stored at any time, and the algorithm can update the answer after each new value.
The standard method keeps a min heap of size $k$. The smallest value in the heap is the current cutoff for membership in the top $k$.
Problem
Given a stream of values
$$ x_1, x_2, \dots, x_n $$
maintain the $k$ largest values seen so far.
Algorithm
Use a min heap of size at most $k$.
    streaming_top_k(k):
        H = empty min heap
        for each incoming value x:
            if size(H) < k:
                push x into H
            else if x > minimum(H):
                pop minimum from H
                push x into H
        return elements of H
The heap contains the current top $k$ values, possibly in heap order rather than sorted order.
Example
Let the stream be:
$$ 7, 2, 9, 4, 3 $$
and let:
$$ k = 3 $$
The final top $3$ values are:
$$ [4, 7, 9] $$
A min heap stores these values with $4$ at the root, because $4$ is the smallest value still inside the top $3$.
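To see how the heap evolves on this stream, the following sketch replays the example and prints the kept values after each step. The function name `trace_top_k` is illustrative and does not come from the text above, and the sketch assumes $k \ge 1$.

```python
import heapq


def trace_top_k(stream, k):
    """Yield (value, kept values in ascending order) after each update.

    Illustrative helper; assumes k >= 1.
    """
    heap = []
    for x in stream:
        if len(heap) < k:
            heapq.heappush(heap, x)
        elif x > heap[0]:
            # x beats the current cutoff, so it replaces the heap minimum
            heapq.heapreplace(heap, x)
        yield x, sorted(heap)


for value, kept in trace_top_k([7, 2, 9, 4, 3], k=3):
    print(f"after {value}: kept = {kept}, cutoff = {kept[0]}")

# after 7: kept = [7], cutoff = 7
# after 2: kept = [2, 7], cutoff = 2
# after 9: kept = [2, 7, 9], cutoff = 2
# after 4: kept = [4, 7, 9], cutoff = 4
# after 3: kept = [4, 7, 9], cutoff = 4
```

The value $4$ evicts $2$ because it exceeds the cutoff, while $3$ is rejected because it does not exceed the new cutoff $4$.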
Correctness
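After each input value is processed, the heap contains the largest $k$ values from the prefix seen so far (or the whole prefix, if it has fewer than $k$ values).
If the heap has fewer than $k$ elements, the prefix so far has fewer than $k$ values, so the new value must be kept. If the heap is full and the new value is no larger than the heap minimum, then the heap already holds $k$ values at least as large as it, so the new value can be discarded without changing the top $k$. If it is larger than the heap minimum, then the old minimum now has $k$ values at least as large as it (the other $k - 1$ heap elements and the new value), so replacing the minimum with the new value leaves exactly the $k$ largest values of the extended prefix.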
By induction over the stream length, the final heap contains the $k$ largest values of the whole stream.
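The inductive claim can also be spot-checked empirically. The sketch below replays random streams and compares the heap with a brute-force sort of the prefix after every update. The helper name `check_invariant` and the random test parameters are illustrative assumptions, and the check is a sanity test rather than a proof.

```python
import heapq
import random


def check_invariant(stream, k):
    """Return True if, after every update, the heap holds the k largest prefix values.

    Illustrative helper; assumes k >= 1.
    """
    heap = []
    prefix = []
    for x in stream:
        prefix.append(x)
        if len(heap) < k:
            heapq.heappush(heap, x)
        elif x > heap[0]:
            heapq.heapreplace(heap, x)
        # Brute force: the k largest values of the prefix (all of it if shorter than k)
        expected = sorted(prefix)[-k:]
        assert sorted(heap) == expected, (prefix, heap)
    return True


rng = random.Random(0)  # fixed seed so the run is repeatable
for _ in range(200):
    n = rng.randint(0, 30)
    k = rng.randint(1, 8)
    # A narrow value range forces many ties, which exercises the "no larger" case
    stream = [rng.randint(-5, 5) for _ in range(n)]
    check_invariant(stream, k)
```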
Complexity
| quantity | cost |
|---|---|
| memory | $O(k)$ |
| update per item | $O(\log k)$ worst case, $O(1)$ when the value is rejected |
| total for $n$ values | $O(n \log k)$ |
| sorting the final output | $O(k \log k)$ |
The algorithm is online because it can report the current top $k$ after each update.
When to Use
Use Streaming Top K when:
- values arrive incrementally
- the full input is too large to store
- $k$ is much smaller than the stream length
- an approximate or delayed batch method is unnecessary
For static in-memory arrays, Quickselect can find an unordered top $k$ in expected $O(n)$ time.
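For comparison, here is a minimal Quickselect-based sketch for the static case. It is one possible variant (random pivots with Hoare-style partitioning), not a reference implementation. The name `top_k_quickselect` is hypothetical, and the function copies its input so the caller's list is left untouched.

```python
import random


def top_k_quickselect(values, k):
    """Return the k largest values of an in-memory sequence, in no particular order."""
    if k <= 0:
        return []
    items = list(values)
    if k >= len(items):
        return items
    target = len(items) - k  # after selection, items[target:] is the top k
    lo, hi = 0, len(items) - 1
    while lo < hi:
        pivot = items[random.randint(lo, hi)]
        i, j = lo, hi
        # Hoare-style partition: smaller values move left, larger values move right
        while i <= j:
            while items[i] < pivot:
                i += 1
            while items[j] > pivot:
                j -= 1
            if i <= j:
                items[i], items[j] = items[j], items[i]
                i += 1
                j -= 1
        if target <= j:
            hi = j  # the boundary lies in the left part
        elif target >= i:
            lo = i  # the boundary lies in the right part
        else:
            break  # items[target] equals the pivot and is already in place
    return items[target:]


print(sorted(top_k_quickselect([7, 2, 9, 4, 3], 3)))  # [4, 7, 9]
```

Unlike the heap method, this approach needs the whole input in memory and does not maintain an answer between updates.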
Implementation
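Python:

    import heapq


    class StreamingTopK:
        """Keep the k largest values seen so far in a min heap."""

        def __init__(self, k):
            self.k = k
            self.heap = []  # min heap; heap[0] is the current cutoff

        def add(self, x):
            if self.k <= 0:
                return  # nothing can be kept
            if len(self.heap) < self.k:
                heapq.heappush(self.heap, x)
            elif x > self.heap[0]:
                # x beats the cutoff: pop the minimum and push x in one step
                heapq.heapreplace(self.heap, x)

        def values(self):
            """Return the kept values in heap order (not sorted)."""
            return list(self.heap)

        def sorted_values(self):
            """Return the kept values in ascending order."""
            return sorted(self.heap)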
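Go:

    package main

    import "container/heap"

    // MinHeap is not defined in the original listing. This minimal []int
    // implementation of heap.Interface is an assumption.
    type MinHeap []int

    func (h MinHeap) Len() int           { return len(h) }
    func (h MinHeap) Less(i, j int) bool { return h[i] < h[j] }
    func (h MinHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
    func (h *MinHeap) Push(x any)        { *h = append(*h, x.(int)) }

    // Pop removes and returns the last element, as heap.Interface requires.
    func (h *MinHeap) Pop() any {
        old := *h
        n := len(old)
        x := old[n-1]
        *h = old[:n-1]
        return x
    }

    // StreamingTopK keeps the k largest ints seen so far in a min heap.
    type StreamingTopK struct {
        k int
        h *MinHeap
    }

    func NewStreamingTopK(k int) *StreamingTopK {
        h := &MinHeap{}
        heap.Init(h)
        return &StreamingTopK{k: k, h: h}
    }

    // Add offers x to the structure; x is kept only if it belongs to the top k.
    func (s *StreamingTopK) Add(x int) {
        if s.k <= 0 {
            return
        }
        if s.h.Len() < s.k {
            heap.Push(s.h, x)
            return
        }
        // The root is the current cutoff; replace it only if x is larger.
        if x > (*s.h)[0] {
            heap.Pop(s.h)
            heap.Push(s.h, x)
        }
    }

    // Values returns a copy of the kept values in heap order (not sorted).
    func (s *StreamingTopK) Values() []int {
        out := make([]int, s.h.Len())
        copy(out, *s.h)
        return out
    }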