Top K by Heap
Find the k largest or k smallest elements by maintaining a bounded heap.
Top K by Heap finds the largest or smallest $k$ elements without sorting the whole input. It keeps a heap of size $k$ and discards values that cannot belong to the final answer.
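For the $k$ largest elements, use a min heap. The heap root is the smallest value among the current candidates, so it is the only candidate a new value needs to beat. When a new value is larger than the root, it replaces the root and heap order is restored. For the $k$ smallest elements, the same scheme works with a max heap and the comparison reversed.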
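The mirrored case for the $k$ smallest elements can be sketched as follows. Python's `heapq` only provides a min heap, so this sketch assumes numeric input and stores negated values to emulate a max heap; the function name `bottom_k_by_heap` is illustrative, not part of the original text.

```python
import heapq

def bottom_k_by_heap(a, k):
    """Return the k smallest values of a, in no particular order."""
    if k <= 0:
        return []
    heap = []  # holds negated values, so -heap[0] is the largest candidate
    for x in a:
        if len(heap) < k:
            heapq.heappush(heap, -x)
        elif x < -heap[0]:
            # x is smaller than the largest candidate, so it takes its place.
            heapq.heapreplace(heap, -x)
    return [-v for v in heap]

print(sorted(bottom_k_by_heap([7, 2, 9, 4, 3], 2)))  # [2, 3]
```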
Problem
Given an array $A$ of length $n$ and an integer $k$, return the $k$ largest elements of $A$.
The output may be unordered unless sorted output is explicitly required.
Algorithm
Maintain a min heap of size at most $k$.
```
top_k_by_heap(A, k):
    H = empty min heap
    for x in A:
        if size(H) < k:
            push x into H
        else if x > minimum(H):
            pop minimum from H
            push x into H
    return elements of H
```
Example
Let
$$ A = [7, 2, 9, 4, 3] $$
and
$$ k = 3 $$
The three largest values are:
$$ [4, 7, 9] $$
The heap may return them in heap order rather than sorted order.
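To see how the heap evolves on this input, the short trace below prints the heap contents after each element. It is a sketch using Python's `heapq`; the helper name `trace_top_k` is made up for illustration.

```python
import heapq

def trace_top_k(a, k):
    """Yield (x, heap snapshot) after each element of a is processed."""
    heap = []
    for x in a:
        if len(heap) < k:
            heapq.heappush(heap, x)
        elif x > heap[0]:
            heapq.heapreplace(heap, x)
        yield x, list(heap)

for x, snapshot in trace_top_k([7, 2, 9, 4, 3], 3):
    print(x, snapshot)
# 7 [7]
# 2 [2, 7]
# 9 [2, 7, 9]
# 4 [4, 7, 9]   (4 beats the root 2 and replaces it)
# 3 [4, 7, 9]   (3 does not beat the root 4 and is discarded)
```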
Correctness
After each scanned prefix, the heap contains the $k$ largest values from that prefix, or all values if fewer than $k$ have been seen. When the heap is full, any new value less than or equal to the heap minimum cannot enter the top $k$, because there are already $k$ values at least as large as it.
At the end of the scan, the invariant applies to the whole array, so the heap contains exactly the top $k$ elements.
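The invariant can also be checked empirically. The sketch below compares the heap against a sorted copy of each prefix on random inputs; it is a sanity check under the assumption of integer data with $k \ge 1$, not a proof, and `check_prefix_invariant` is a hypothetical helper name.

```python
import heapq
import random

def check_prefix_invariant(a, k):
    """Assert that after each prefix the heap holds that prefix's k largest values."""
    heap = []
    for i, x in enumerate(a, start=1):
        if len(heap) < k:
            heapq.heappush(heap, x)
        elif x > heap[0]:
            heapq.heapreplace(heap, x)
        # k largest values of the prefix, or the whole prefix if i < k.
        expected = sorted(a[:i])[-k:]
        assert sorted(heap) == expected
    return True

random.seed(0)
for _ in range(200):
    data = [random.randint(0, 20) for _ in range(random.randint(0, 30))]
    check_prefix_invariant(data, random.randint(1, 5))
```

Duplicates are compared as multisets here: a value equal to the heap minimum is not swapped in, which leaves the same collection of values.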
Complexity
| quantity | bound |
|---|---|
| heap size | at most $k$ |
| each heap update | $O(\log k)$ |
| total time | $O(n \log k)$ |
| space | $O(k)$ |
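If the final result must be sorted, sorting the surviving elements adds:
$$ O(k \log k) $$
The heap never holds more than $\min(k, n)$ elements, so the total time remains $O(n \log k)$.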
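A minimal sketch of the sorted variant follows; `top_k_sorted` is an illustrative name. Python's standard library also offers `heapq.nlargest(k, iterable)`, which returns the $k$ largest values in descending order, and the last line uses it as a cross-check.

```python
import heapq

def top_k_sorted(a, k):
    """Return the k largest values of a in descending order."""
    if k <= 0:
        return []
    heap = []
    for x in a:
        if len(heap) < k:
            heapq.heappush(heap, x)
        elif x > heap[0]:
            heapq.heapreplace(heap, x)
    # Extra O(k log k) step: sort only the surviving elements.
    return sorted(heap, reverse=True)

data = [7, 2, 9, 4, 3]
print(top_k_sorted(data, 3))  # [9, 7, 4]
assert top_k_sorted(data, 3) == heapq.nlargest(3, data)
```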
When to Use
Use Top K by Heap when $k$ is much smaller than $n$, the input arrives as a stream, or memory must stay bounded.
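The streaming case might look like the sketch below. The function accepts any iterable and reads it once, so only the heap stays in memory. The generator `readings` is a hypothetical stand-in for a real data source such as a file or socket.

```python
import heapq

def top_k_stream(stream, k):
    """Consume an iterable once, keeping only the k largest values seen."""
    if k <= 0:
        return []
    heap = []
    for x in stream:
        if len(heap) < k:
            heapq.heappush(heap, x)
        elif x > heap[0]:
            heapq.heapreplace(heap, x)
    return heap

def readings(count):
    """Hypothetical data source: yields values lazily, one at a time."""
    for i in range(count):
        yield (i * i) % 1009

top = top_k_stream(readings(100_000), 5)
print(len(top))  # 5: memory use is bounded by k, not by the stream length
```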
For in-memory arrays where unordered top-k is enough, Quickselect or Nth Element can achieve expected $O(n)$ time.
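For comparison, here is one possible Quickselect sketch for the unordered top-$k$ case. It assumes the whole input fits in memory, works on a copy, and uses a random pivot with a Hoare-style partition; other partition schemes are equally valid, and the name `top_k_by_quickselect` is illustrative.

```python
import random

def top_k_by_quickselect(a, k):
    """Return the k largest values of a, unordered, in expected O(n) time."""
    if k <= 0:
        return []
    items = list(a)  # work on a copy so the caller's data is untouched
    if k >= len(items):
        return items
    target = len(items) - k  # the top-k region is items[target:]
    lo, hi = 0, len(items) - 1
    while lo < hi:
        pivot = items[random.randint(lo, hi)]
        i, j = lo, hi
        # Hoare-style partition: smaller values drift left, larger drift right.
        while i <= j:
            while items[i] < pivot:
                i += 1
            while items[j] > pivot:
                j -= 1
            if i <= j:
                items[i], items[j] = items[j], items[i]
                i += 1
                j -= 1
        # Keep only the side that contains the target index.
        if target <= j:
            hi = j
        elif target >= i:
            lo = i
        else:
            break  # items[target] equals the pivot and is already in place
    return items[target:]

print(sorted(top_k_by_quickselect([7, 2, 9, 4, 3], 3)))  # [4, 7, 9]
```

Unlike the heap version, this needs random access to all $n$ elements, so it does not apply to streams.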
Implementation
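```python
import heapq


def top_k_by_heap(a, k):
    """Return the k largest values of a in heap order (not sorted)."""
    if k <= 0:
        return []
    heap = []  # min heap: heap[0] is the smallest current candidate
    for x in a:
        if len(heap) < k:
            heapq.heappush(heap, x)
        elif x > heap[0]:
            # Pop the minimum and push x in a single O(log k) operation.
            heapq.heapreplace(heap, x)
    return heap
```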
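```go
package main

import (
	"container/heap"
	"fmt"
)

// MinHeap implements heap.Interface for ints, with the smallest value at index 0.
type MinHeap []int

func (h MinHeap) Len() int {
	return len(h)
}

func (h MinHeap) Less(i, j int) bool {
	return h[i] < h[j]
}

func (h MinHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
}

// Push appends x; heap.Push then restores heap order.
func (h *MinHeap) Push(x any) {
	*h = append(*h, x.(int))
}

// Pop removes and returns the last element; heap.Pop moves the minimum there first.
func (h *MinHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// TopKByHeap returns the k largest values of a in heap order (not sorted).
func TopKByHeap(a []int, k int) []int {
	if k <= 0 {
		return nil
	}
	h := &MinHeap{}
	for _, x := range a {
		if h.Len() < k {
			heap.Push(h, x)
		} else if x > (*h)[0] {
			// Overwrite the minimum and sift it down: one O(log k) pass
			// instead of a Pop followed by a Push.
			(*h)[0] = x
			heap.Fix(h, 0)
		}
	}
	return []int(*h)
}

func main() {
	fmt.Println(TopKByHeap([]int{7, 2, 9, 4, 3}, 3)) // prints [4 7 9]
}
```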