Kth Largest in Stream
Maintain the k-th largest value while values arrive one at a time.
Kth Largest in Stream maintains the k-th largest value of all values seen so far. It uses a min heap that holds at most $k$ values.
Once at least $k$ values have arrived, the heap stores the current top $k$ values, and the smallest value in the heap is the k-th largest value overall.
Problem
Given a stream of values
$$ x_1, x_2, \dots, x_n $$
support two operations:
- add(x): insert a new value
- query(): return the k-th largest value seen so far
The query is valid only after at least $k$ values have been inserted.
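To pin down what query() should return, a brute-force reference can help. The sketch below is not the heap method: it stores every value and sorts on each query. It assumes that duplicates count as separate values, so with $k = 2$ and the values 5, 5, 3 the answer is 5. The class name NaiveKthLargest is hypothetical.

```python
class NaiveKthLargest:
    """Reference semantics only: stores every value, sorts on each query."""

    def __init__(self, k):
        self.k = k
        self.values = []

    def add(self, x):
        self.values.append(x)

    def query(self):
        # The query is valid only after at least k insertions.
        if len(self.values) < self.k:
            raise ValueError("fewer than k values inserted")
        # Duplicates are counted separately (an assumption of this sketch).
        return sorted(self.values, reverse=True)[self.k - 1]


ref = NaiveKthLargest(2)
for x in [5, 5, 3]:
    ref.add(x)
print(ref.query())  # 5
```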
Algorithm
Maintain a min heap $H$ of size at most $k$.

    add(x):
        if size(H) < k:
            push x into H
        else if x > minimum(H):
            pop minimum from H
            push x into H

    query():
        return minimum(H)

Example
Let $k = 3$ and let the stream be:
$$ 7, 2, 9, 4, 3 $$
After all values arrive, the three largest values are:
$$ [4, 7, 9] $$
The heap minimum is $4$, so the 3rd largest value is:
$$ 4 $$
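The trace below replays this example and records the heap contents after each arrival. It is a small sketch built on Python's heapq module; the helper name trace_top_k is made up for illustration.

```python
import heapq


def trace_top_k(stream, k):
    """Return the sorted heap contents after each arrival."""
    heap = []
    states = []
    for x in stream:
        if len(heap) < k:
            heapq.heappush(heap, x)
        elif x > heap[0]:
            # x displaces the current minimum of the top-k set.
            heapq.heapreplace(heap, x)
        states.append(sorted(heap))
    return states


for x, state in zip([7, 2, 9, 4, 3], trace_top_k([7, 2, 9, 4, 3], 3)):
    print(x, state)
# 7 [7]
# 2 [2, 7]
# 9 [2, 7, 9]
# 4 [4, 7, 9]
# 3 [4, 7, 9]
```

The value 4 evicts 2, and 3 is discarded because it does not exceed the heap minimum 4.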
Correctness
After each insertion, the heap contains the largest $k$ values seen so far, or all values if fewer than $k$ values have arrived.
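The argument is by induction on the number of insertions. If the heap has fewer than $k$ values, the new value must be kept. If the heap is full and the new value is less than or equal to the heap minimum, then there are already $k$ values at least as large as it, so it cannot affect the k-th largest value. If the new value is larger than the heap minimum, then the old minimum now has $k$ values at least as large as it (the other $k - 1$ heap values and the new value), so it drops out of the top $k$, and replacing it with the new value restores the top $k$ set.
Therefore, once at least $k$ values have arrived, the heap minimum is exactly the k-th largest value.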
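The invariant can also be checked empirically. The following sketch compares the heap against a sorted copy of everything seen so far on random inputs. It is a sanity check under assumed parameters (small integer values, a fixed seed), not a proof, and check_invariant is a hypothetical helper name.

```python
import heapq
import random


def check_invariant(stream, k):
    """Return True if the heap always equals the top-k of the prefix."""
    heap, seen = [], []
    for x in stream:
        seen.append(x)
        if len(heap) < k:
            heapq.heappush(heap, x)
        elif x > heap[0]:
            heapq.heapreplace(heap, x)
        # Largest min(k, len(seen)) values of the prefix, ascending.
        expected = sorted(seen)[-k:]
        if sorted(heap) != expected:
            return False
    return True


rng = random.Random(0)  # fixed seed so the run is repeatable
for _ in range(200):
    k = rng.randint(1, 5)
    stream = [rng.randint(-10, 10) for _ in range(30)]
    assert check_invariant(stream, k)
```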
Complexity
| operation | cost |
|---|---|
| add | $O(\log k)$ |
| query | $O(1)$ |
| memory | $O(k)$ |
The total cost for $n$ insertions is:
$$ O(n \log k) $$
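As a short derivation of this bound, note that the $i$-th insertion works on a heap holding at most $\min(i, k)$ values. Assuming $k \ge 2$, summing the per-insertion costs gives:
$$ \sum_{i=1}^{n} O(\log \min(i, k)) \le \sum_{i=1}^{n} O(\log k) = O(n \log k) $$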
When to Use
Use Kth Largest in Stream when:
- values arrive online
- the full input should not be stored
- only the k-th largest value is needed
- $k$ is much smaller than the number of values
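For a static array that is fully available in memory, Quickselect finds the k-th largest value in $O(n)$ expected time, which beats the heap's $O(n \log k)$.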
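For comparison, here is a minimal Quickselect sketch for the static case. It is one simple variant (random pivot, three-way partition into new lists rather than in place), chosen for readability over speed; the function name kth_largest_static is hypothetical. When the data is already a list and raw speed is not critical, heapq.nlargest(k, values)[-1] from the standard library gives the same answer.

```python
import random


def kth_largest_static(values, k):
    """Return the k-th largest element of a list via Quickselect."""
    if not 1 <= k <= len(values):
        raise ValueError("k must be between 1 and len(values)")
    items = list(values)
    while True:
        pivot = random.choice(items)
        larger = [v for v in items if v > pivot]
        equal = [v for v in items if v == pivot]
        if k <= len(larger):
            # The answer lies among values strictly above the pivot.
            items = larger
        elif k <= len(larger) + len(equal):
            # The pivot itself is the k-th largest.
            return pivot
        else:
            # Skip the larger and equal groups, keep searching below.
            k -= len(larger) + len(equal)
            items = [v for v in items if v < pivot]


print(kth_largest_static([7, 2, 9, 4, 3], 3))  # 4
```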
Implementation
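A Python version using the standard heapq module:

    import heapq


    class KthLargestInStream:
        def __init__(self, k):
            if k < 1:
                raise ValueError("k must be at least 1")
            self.k = k
            self.heap = []  # min heap holding the largest k values seen so far

        def add(self, x):
            if len(self.heap) < self.k:
                heapq.heappush(self.heap, x)
            elif x > self.heap[0]:
                # Pop the minimum and push x in a single heap operation.
                heapq.heapreplace(self.heap, x)

        def query(self):
            if len(self.heap) < self.k:
                raise ValueError("fewer than k values inserted")
            return self.heap[0]
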
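A Go version using container/heap, with MinHeap defined as an int slice that implements heap.Interface:

    package main

    import (
        "container/heap"
        "fmt"
    )

    // MinHeap is a min heap of ints that implements heap.Interface.
    type MinHeap []int

    func (h MinHeap) Len() int           { return len(h) }
    func (h MinHeap) Less(i, j int) bool { return h[i] < h[j] }
    func (h MinHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
    func (h *MinHeap) Push(x any)        { *h = append(*h, x.(int)) }

    func (h *MinHeap) Pop() any {
        old := *h
        n := len(old)
        x := old[n-1]
        *h = old[:n-1]
        return x
    }

    // KthLargestInStream keeps the k largest values seen so far; k must be at least 1.
    type KthLargestInStream struct {
        k int
        h *MinHeap
    }

    func NewKthLargestInStream(k int) *KthLargestInStream {
        h := &MinHeap{}
        heap.Init(h)
        return &KthLargestInStream{k: k, h: h}
    }

    // Add inserts x, evicting the heap minimum when x belongs in the top k.
    func (s *KthLargestInStream) Add(x int) {
        if s.h.Len() < s.k {
            heap.Push(s.h, x)
            return
        }
        if x > (*s.h)[0] {
            heap.Pop(s.h)
            heap.Push(s.h, x)
        }
    }

    // Query returns the k-th largest value, or false if fewer than k values have arrived.
    func (s *KthLargestInStream) Query() (int, bool) {
        if s.h.Len() < s.k {
            return 0, false
        }
        return (*s.h)[0], true
    }

    func main() {
        s := NewKthLargestInStream(3)
        for _, x := range []int{7, 2, 9, 4, 3} {
            s.Add(x)
        }
        fmt.Println(s.Query()) // 4 true
    }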