Weighted Reservoir Sampling
Select a random sample from a stream where each item has a nonnegative weight.
Weighted Reservoir Sampling
Weighted Reservoir Sampling selects a random sample from a stream where each item has a weight. Higher weight items should be more likely to appear in the sample.
A common method assigns each item a random priority derived from its weight, then keeps the $k$ items with the largest priorities.
Problem
Given a stream of weighted items:
$$ (x_1, w_1), (x_2, w_2), \dots, (x_n, w_n) $$
where each weight satisfies:
$$ w_i > 0 $$
select $k$ items so that larger weights give larger sampling probability.
Algorithm
For each item, draw a random value $u$ uniformly from $(0, 1)$ and compute a key:
$$ key = u^{1/w} $$
Keep the $k$ items with the largest keys.
weighted_reservoir_sampling(stream, k):
H = empty min heap
for each (x, w) in stream:
u = random real number in (0, 1)
key = u ^ (1 / w)
if size(H) < k:
push (key, x) into H
else if key > minimum_key(H):
pop minimum from H
push (key, x) into H
return items in H
The min heap keeps the current sample. Its root is the weakest sampled priority.
Key Idea
The random key transforms weights into priorities. Larger weights make $u^{1/w}$ closer to $1$ on average, so high weight items are more likely to survive among the top $k$ keys.
This gives weighted sampling without replacement.
Example
Let the stream be:
$$ (a, 1), (b, 2), (c, 10) $$
and let:
$$ k = 1 $$
Item $c$ has the largest weight, so it has the highest chance of being selected, but selection remains randomized.
Correctness
The key formula assigns each item a random priority whose distribution depends on its weight. Selecting the largest $k$ priorities implements weighted sampling without replacement.
For $k = 1$, the probability that item $i$ has the largest key is proportional to $w_i$. For larger $k$, the top priority construction extends this idea to a weighted sample without replacement.
Complexity
| operation | cost |
|---|---|
| update per item | $O(\log k)$ |
| total time | $O(n \log k)$ |
| memory | $O(k)$ |
The algorithm is online and does not need to know the stream length.
When to Use
Use Weighted Reservoir Sampling when:
- items arrive as a stream
- the stream length is unknown
- items have unequal importance
- sampling must be done without storing all items
It is useful for weighted logs, telemetry, ranking samples, randomized load testing, and approximate analytics.
Implementation
import heapq
import random
def weighted_reservoir_sampling(stream, k):
heap = []
for x, w in stream:
if w <= 0:
continue
u = random.random()
while u == 0.0:
u = random.random()
key = u ** (1.0 / w)
if len(heap) < k:
heapq.heappush(heap, (key, x))
elif key > heap[0][0]:
heapq.heapreplace(heap, (key, x))
return [x for _, x in heap]
package main
import (
"container/heap"
"math"
"math/rand"
)
type WeightedSampleItem[T any] struct {
Value T
Weight float64
}
type WeightedKeyItem[T any] struct {
Key float64
Value T
}
type WeightedReservoirHeap[T any] []WeightedKeyItem[T]
func (h WeightedReservoirHeap[T]) Len() int {
return len(h)
}
func (h WeightedReservoirHeap[T]) Less(i, j int) bool {
return h[i].Key < h[j].Key
}
func (h WeightedReservoirHeap[T]) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}
func (h *WeightedReservoirHeap[T]) Push(x any) {
*h = append(*h, x.(WeightedKeyItem[T]))
}
func (h *WeightedReservoirHeap[T]) Pop() any {
old := *h
n := len(old)
x := old[n-1]
*h = old[:n-1]
return x
}
func WeightedReservoirSampling[T any](stream []WeightedSampleItem[T], k int) []T {
h := &WeightedReservoirHeap[T]{}
heap.Init(h)
for _, item := range stream {
if item.Weight <= 0 {
continue
}
u := rand.Float64()
for u == 0 {
u = rand.Float64()
}
key := math.Pow(u, 1.0/item.Weight)
if h.Len() < k {
heap.Push(h, WeightedKeyItem[T]{
Key: key,
Value: item.Value,
})
} else if key > (*h)[0].Key {
heap.Pop(h)
heap.Push(h, WeightedKeyItem[T]{
Key: key,
Value: item.Value,
})
}
}
out := make([]T, 0, h.Len())
for _, item := range *h {
out = append(out, item.Value)
}
return out
}