Redis with bunpy
Install
bunpy add redis
Make sure Redis is running locally:
# macOS
brew install redis && brew services start redis
# Docker
docker run -d -p 6379:6379 redis:7-alpine
Connect
import redis
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
print(r.ping()) # True
decode_responses=True tells redis-py to return Python strings instead of bytes. For binary values (images, pickled objects), leave it off.
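If you do need raw bytes, create a second client without decoding. A minimal sketch (the logo key is illustrative):
rb = redis.Redis(host="localhost", port=6379, db=0)  # no decode_responses: values come back as bytes
rb.set("logo", b"\x89PNG\r\n\x1a\n")  # binary payload stored as-is
print(rb.get("logo")) # b'\x89PNG\r\n\x1a\n'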
Use a connection URL when deploying to Redis Cloud or Heroku:
import redis
import os
r = redis.from_url(os.environ.get("REDIS_URL", "redis://localhost:6379/0"), decode_responses=True)
String set/get with TTL
import redis
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
# Set a key that expires after 60 seconds
r.set("greeting", "hello", ex=60)
value = r.get("greeting")
print(value) # hello
print(r.ttl("greeting")) # ~60
# setex is equivalent
r.setex("token:abc123", 3600, "user:42")
# check existence
if r.exists("token:abc123"):
print("token is valid")
# atomic increment
r.set("page_views", 0)
r.incr("page_views")
r.incr("page_views")
print(r.get("page_views")) # 2Hash operations
Hash operations
Hashes map string fields to string values under one key - ideal for storing records:
import redis
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
# store a user record
r.hset("user:1", mapping={
"username": "alice",
"email": "alice@example.com",
"role": "admin",
})
# read individual field
print(r.hget("user:1", "username")) # alice
# read all fields
user = r.hgetall("user:1")
print(user) # {'username': 'alice', 'email': '...', 'role': 'admin'}
# update one field
r.hset("user:1", "role", "editor")
# delete a field
r.hdel("user:1", "role")
# check field existence
print(r.hexists("user:1", "email")) # True
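Hash values are stored as strings, but numeric fields can still be updated atomically. A quick sketch (the login_count field is illustrative):
r.hset("user:1", "login_count", 0)
r.hincrby("user:1", "login_count", 1)
print(r.hget("user:1", "login_count")) # 1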
List as a queue
Redis lists support O(1) push and pop on both ends - a natural fit for task queues:
import redis
import json
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
QUEUE = "jobs:email"
def enqueue(job: dict) -> None:
    r.rpush(QUEUE, json.dumps(job))

def dequeue(timeout: int = 5) -> dict | None:
    result = r.blpop(QUEUE, timeout=timeout)
    if result:
        _, raw = result
        return json.loads(raw)
    return None
# producer
enqueue({"to": "alice@example.com", "subject": "Welcome"})
enqueue({"to": "bob@example.com", "subject": "Reset password"})
# consumer
while True:
    job = dequeue(timeout=2)
    if job is None:
        print("Queue empty.")
        break
    print(f"Sending email to {job['to']}: {job['subject']}")
blpop blocks until an item arrives or the timeout expires - more efficient than polling.
Pub/Sub pattern
Use pub/sub for broadcasting events to multiple subscribers:
# publisher.py
import redis
import json
import time
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
events = [
{"type": "order.created", "order_id": 101},
{"type": "order.shipped", "order_id": 101},
{"type": "order.delivered", "order_id": 101},
]
for event in events:
    r.publish("orders", json.dumps(event))
    print(f"Published: {event['type']}")
    time.sleep(1)
# subscriber.py
import redis
import json
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
pubsub = r.pubsub()
pubsub.subscribe("orders")
print("Listening for order events...")
for message in pubsub.listen():
if message["type"] == "message":
event = json.loads(message["data"])
print(f"Received: {event['type']} for order {event['order_id']}")Run subscriber in one terminal and publisher in another:
bunpy subscriber.py &
bunpy publisher.py
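Two things worth knowing: pub/sub is fire-and-forget, so subscribers that are offline when an event is published never see it; and redis-py can run the listener in a background thread so your main thread stays free. A minimal sketch of the latter (the handler name is illustrative):
def handle_order(message):
    event = json.loads(message["data"])
    print(f"Received: {event['type']}")
pubsub = r.pubsub()
pubsub.subscribe(**{"orders": handle_order}) # route this channel's messages to the handler
thread = pubsub.run_in_thread(sleep_time=0.1) # polls for messages in a worker thread
# ... main thread keeps working ...
thread.stop()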
Rate limiting
A sliding-window rate limiter using a sorted set. Each request logs a timestamp; expired entries are pruned on every check:
import redis
import time
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
def is_allowed(user_id: str, limit: int = 10, window_seconds: int = 60) -> bool:
key = f"ratelimit:{user_id}"
now = time.time()
window_start = now - window_seconds
pipe = r.pipeline()
pipe.zremrangebyscore(key, "-inf", window_start) # remove old timestamps
pipe.zadd(key, {str(now): now}) # add current request
pipe.zcard(key) # count requests in window
pipe.expire(key, window_seconds)
results = pipe.execute()
request_count = results[2]
return request_count <= limit
# simulate 15 requests from user "42"
for i in range(15):
    allowed = is_allowed("42", limit=10, window_seconds=60)
    print(f"Request {i+1}: {'OK' if allowed else 'RATE LIMITED'}")
The pipeline batches all four commands into one round trip, keeping the check atomic enough for most applications. For strict atomicity, use a Lua script with r.eval().
API response caching
Cache expensive API calls with a simple wrapper:
import redis
import httpx
import json
import hashlib
import os
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
def cached_get(
    url: str,
    params: dict | None = None,
    headers: dict | None = None,
    ttl: int = 300,
) -> dict:
    # note: the cache key ignores headers; include them if responses vary per header
    cache_key = "cache:" + hashlib.sha256(f"{url}:{params}".encode()).hexdigest()
    cached = r.get(cache_key)
    if cached:
        print(f"Cache HIT for {url}")
        return json.loads(cached)
    print(f"Cache MISS for {url}")
    response = httpx.get(url, params=params, headers=headers, timeout=10.0)
    response.raise_for_status()
    data = response.json()
    r.set(cache_key, json.dumps(data), ex=ttl)
    return data
# first call hits the network; second call returns from Redis
data1 = cached_get(
    "https://api.github.com/repos/python/cpython",
    headers={"Accept": "application/vnd.github+json"},
)
data2 = cached_get(
    "https://api.github.com/repos/python/cpython",
    headers={"Accept": "application/vnd.github+json"},
)
print(data1["stargazers_count"])
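To evict an entry before its TTL expires - say, after the upstream data changes - recompute the key and delete it. A small hypothetical helper:
def invalidate(url: str, params: dict | None = None) -> None:
    cache_key = "cache:" + hashlib.sha256(f"{url}:{params}".encode()).hexdigest()
    r.delete(cache_key)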
Session storage pattern
Store user sessions as hashes with an expiry. This mirrors the approach of session stores like Flask-Session and Django’s Redis cache backend: session data under a per-session key with a TTL:
import redis
import uuid
import time
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
SESSION_TTL = 3600 # 1 hour
def create_session(user_id: int, metadata: dict | None = None) -> str:
    session_id = str(uuid.uuid4())
    key = f"session:{session_id}"
    data = {
        "user_id": str(user_id),
        "created_at": str(time.time()),
        **(metadata or {}),
    }
    r.hset(key, mapping=data)
    r.expire(key, SESSION_TTL)
    return session_id

def get_session(session_id: str) -> dict | None:
    key = f"session:{session_id}"
    data = r.hgetall(key)
    if not data:
        return None
    r.expire(key, SESSION_TTL) # sliding expiry: refresh TTL on access
    return data

def delete_session(session_id: str) -> None:
    r.delete(f"session:{session_id}")
# create a session after login
sid = create_session(user_id=42, metadata={"ip": "192.168.1.1", "ua": "Mozilla/5.0"})
print(f"Session created: {sid}")
# retrieve on subsequent requests
session = get_session(sid)
print(f"User: {session['user_id']}, IP: {session['ip']}")
# destroy on logout
delete_session(sid)
print(f"Session {sid} deleted. Exists: {bool(get_session(sid))}")Using a connection pool
For long-running applications, create a pool once at startup rather than reconnecting on every request:
import redis
pool = redis.ConnectionPool(
host="localhost",
port=6379,
db=0,
decode_responses=True,
max_connections=20,
)
def get_redis() -> redis.Redis:
    return redis.Redis(connection_pool=pool)
r = get_redis()
r.set("app:status", "running")
print(r.get("app:status"))Run the examples
bunpy rate_limiter.py
bunpy cache.py
bunpy session.py
Redis pipelines batch multiple commands into one network round trip - use them whenever you fire several commands together. For operations that must be atomic, reach for Lua scripts via r.eval().
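To see the round-trip saving concretely, here is a minimal pipeline sketch - three commands, one network hop:
pipe = r.pipeline()
pipe.set("a", 1)
pipe.incr("a")
pipe.get("a")
print(pipe.execute()) # [True, 2, '2'] - results arrive in command order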