# Throughput Optimization: Maximizing System Capacity
Throughput is the number of operations your system can process per unit of time: requests per second, messages per second, or transactions per minute. While latency measures how long a single operation takes, throughput measures how many operations complete in a given window. Optimizing throughput is essential for systems that process large volumes of data or serve many concurrent users.
## Throughput vs Latency
| Aspect | Latency | Throughput |
|---|---|---|
| Measures | Time per operation | Operations per time |
| Optimize by | Making each op faster | Processing more ops concurrently |
| Trade-off | Often conflicts with throughput (e.g., batching adds wait time) | Often conflicts with latency |
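The two metrics are linked by Little's Law: the average number of operations in flight equals throughput times average latency. A quick sketch with illustrative numbers:

```python
# Little's Law: average concurrency = throughput x latency.
# To sustain 1,000 req/s when each request takes 0.2 s on average,
# the system must keep about 200 requests in flight at once.
target_rps = 1000        # desired operations per second (illustrative)
avg_latency_s = 0.2      # average time per operation, in seconds

required_concurrency = target_rps * avg_latency_s
print(required_concurrency)  # 200.0
```

This is why throughput work focuses on concurrency (batching, async I/O, more workers) rather than only on making each operation faster.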
## Batch Processing Patterns
Processing items in batches amortizes overhead (network round trips, disk seeks, connection setup) across many items, dramatically improving throughput at the cost of slightly higher per-item latency.
```python
import threading
import time
from collections import deque

class BatchProcessor:
    def __init__(self, batch_size=100, flush_interval=1.0, process_fn=None):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.process_fn = process_fn
        self.buffer = deque()
        self.lock = threading.Lock()
        # Background thread flushes partial batches on a timer
        self.flush_thread = threading.Thread(target=self._flush_loop, daemon=True)
        self.flush_thread.start()

    def add(self, item):
        batch = None
        with self.lock:
            self.buffer.append(item)
            if len(self.buffer) >= self.batch_size:
                batch = self._drain()
        if batch:
            # Process outside the lock so producers are never blocked
            self.process_fn(batch)

    def _flush_loop(self):
        while True:
            time.sleep(self.flush_interval)
            with self.lock:
                batch = self._drain()
            if batch:
                self.process_fn(batch)

    def _drain(self):
        # Caller must hold self.lock
        batch = list(self.buffer)
        self.buffer.clear()
        return batch

# Database batch insert: one round trip for N rows instead of N round
# trips is often 10-100x faster than inserting rows individually.
# `db` is assumed to be an open DB-API cursor.
def batch_insert(records):
    values = [(r["name"], r["email"]) for r in records]
    db.executemany(
        "INSERT INTO users (name, email) VALUES (%s, %s)",
        values
    )

processor = BatchProcessor(batch_size=100, flush_interval=0.5,
                           process_fn=batch_insert)
```
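The effect of batching is easy to reproduce with the stdlib's sqlite3 module (a rough sketch; absolute timings vary by machine):

```python
import sqlite3
import time

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, email TEXT)")
rows = [(f"user{i}", f"user{i}@example.com") for i in range(10_000)]

# One statement per row
start = time.perf_counter()
for name, email in rows:
    conn.execute("INSERT INTO users (name, email) VALUES (?, ?)", (name, email))
conn.commit()
per_row = time.perf_counter() - start

# One batched call for all rows
conn.execute("DELETE FROM users")
start = time.perf_counter()
conn.executemany("INSERT INTO users (name, email) VALUES (?, ?)", rows)
conn.commit()
batched = time.perf_counter() - start

print(f"per-row: {per_row:.3f}s  batched: {batched:.3f}s")
```

Against a networked database the gap widens further, since each individual statement also pays a network round trip.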
## Async I/O and Event Loops

Async I/O lets a single thread keep many I/O operations in flight at once, multiplying throughput for I/O-bound workloads without adding threads or processes.
```python
import asyncio
import aiohttp
import aiomysql

async def high_throughput_processor():
    # Connection pools amortize setup cost across many operations
    db_pool = await aiomysql.create_pool(
        host="db.example.com", db="myapp",
        minsize=10, maxsize=50, autocommit=True
    )
    http_session = aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(limit=100)
    )

    async def process_item(item):
        # Within one item the DB update and webhook run sequentially;
        # the throughput win comes from running many items at once below.
        async with db_pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute(
                    "UPDATE orders SET status = %s WHERE id = %s",
                    ("processed", item["order_id"])
                )
        async with http_session.post(
            "https://webhook.example.com/notify",
            json={"order_id": item["order_id"]}
        ) as resp:
            return await resp.json()

    # Bound concurrency so a burst of items can't exhaust the pools
    semaphore = asyncio.Semaphore(50)

    async def bounded_process(item):
        async with semaphore:
            return await process_item(item)

    try:
        items = await fetch_pending_items()  # assumed to return pending order dicts
        return await asyncio.gather(
            *(bounded_process(item) for item in items)
        )
    finally:
        await http_session.close()
        db_pool.close()
        await db_pool.wait_closed()
```
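The bounded-concurrency pattern can be demonstrated without any external services by simulating I/O with `asyncio.sleep` (a self-contained sketch; the 50 ms delay stands in for a real network call):

```python
import asyncio
import time

async def fake_io(i, semaphore):
    # The semaphore caps how many "calls" are in flight at once
    async with semaphore:
        await asyncio.sleep(0.05)  # simulate a 50 ms I/O operation
        return i

async def main():
    semaphore = asyncio.Semaphore(10)
    start = time.perf_counter()
    results = await asyncio.gather(*(fake_io(i, semaphore) for i in range(100)))
    elapsed = time.perf_counter() - start
    # 100 tasks at concurrency 10 ≈ 10 waves of 50 ms ≈ 0.5 s,
    # versus ~5 s if the same tasks ran one after another
    return results, elapsed

results, elapsed = asyncio.run(main())
print(f"{len(results)} tasks in {elapsed:.2f}s")
```

Raising the semaphore limit increases throughput until some downstream resource (database pool, file descriptors, remote rate limits) becomes the bottleneck.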
## Connection Pooling Deep Dive

A pooling proxy such as PgBouncer multiplexes many application connections onto a small number of real database connections, which keeps the database's per-connection overhead low at high request rates.
```ini
; PgBouncer configuration for high-throughput PostgreSQL
[databases]
myapp = host=db.internal port=5432 dbname=myapp

[pgbouncer]
listen_addr = 0.0.0.0
listen_port = 6432
auth_type = md5
; Return server connections to the pool after each transaction
pool_mode = transaction
; Accept up to 1000 application connections...
max_client_conn = 1000
; ...multiplexed onto only 25 actual database connections
default_pool_size = 25
reserve_pool_size = 5
reserve_pool_timeout = 3
server_idle_timeout = 60
```

Applications connect to PgBouncer on port 6432 instead of PostgreSQL directly. Note that in transaction mode, session state such as prepared statements and temporary tables does not survive across transactions.
## Queue-Based Processing

Queues decouple producers from consumers, so each side can run at its own rate and the consumer side can be scaled independently.
```python
import json
import multiprocessing

import redis

class WorkQueue:
    def __init__(self, queue_name, redis_url="redis://localhost"):
        self.queue_name = queue_name
        self.redis = redis.from_url(redis_url)

    def enqueue(self, task):
        self.redis.rpush(self.queue_name, json.dumps(task))

    def dequeue(self, timeout=5):
        # BLPOP blocks until an item arrives or the timeout expires
        result = self.redis.blpop(self.queue_name, timeout=timeout)
        if result:
            return json.loads(result[1])
        return None

    def size(self):
        return self.redis.llen(self.queue_name)

class WorkerPool:
    def __init__(self, queue, process_fn, num_workers=8, max_retries=3):
        self.queue = queue
        self.process_fn = process_fn
        self.num_workers = num_workers
        self.max_retries = max_retries

    def start(self):
        workers = []
        for i in range(self.num_workers):
            p = multiprocessing.Process(target=self._worker_loop, args=(i,))
            p.start()
            workers.append(p)
        return workers

    def _worker_loop(self, worker_id):
        while True:
            task = self.queue.dequeue(timeout=5)
            if task:
                try:
                    self.process_fn(task)
                except Exception:
                    retries = task.get("retry_count", 0) + 1
                    if retries <= self.max_retries:
                        self.queue.enqueue({**task, "retry_count": retries})
                    # After max_retries, drop the task (or route it to a
                    # dead-letter queue for inspection)
```
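The retry logic can be exercised without a running Redis by swapping in an in-memory stand-in with the same interface (a sketch; `LocalQueue` and `run_once` are hypothetical test helpers, not part of the worker code above):

```python
import json
from collections import deque

class LocalQueue:
    """In-memory stand-in for WorkQueue: same enqueue/dequeue/size interface."""
    def __init__(self):
        self._items = deque()

    def enqueue(self, task):
        self._items.append(json.dumps(task))

    def dequeue(self, timeout=None):
        return json.loads(self._items.popleft()) if self._items else None

    def size(self):
        return len(self._items)

MAX_RETRIES = 3

def run_once(queue, process_fn):
    # One iteration of the worker loop: process, or re-enqueue with a
    # bumped retry count until MAX_RETRIES is exceeded
    task = queue.dequeue()
    if task is None:
        return
    try:
        process_fn(task)
    except Exception:
        retries = task.get("retry_count", 0) + 1
        if retries <= MAX_RETRIES:
            queue.enqueue({**task, "retry_count": retries})

q = LocalQueue()
q.enqueue({"order_id": 1})

def always_fails(task):
    raise RuntimeError("downstream unavailable")

for _ in range(10):
    run_once(q, always_fails)

print(q.size())  # 0: the task was dropped after MAX_RETRIES re-enqueues
```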
## Fan-Out Pattern
```python
# Fan-out: one message triggers processing on multiple workers
class FanOutProcessor:
    def __init__(self, queues):
        self.queues = queues

    def fan_out(self, message):
        for queue in self.queues:
            queue.enqueue(message)

# Example: an image upload triggers multiple processing pipelines
image_queues = [
    WorkQueue("thumbnail-generation"),
    WorkQueue("content-moderation"),
    WorkQueue("metadata-extraction"),
    WorkQueue("search-indexing"),
]
fan_out = FanOutProcessor(image_queues)
fan_out.fan_out({"image_id": "img123", "url": "s3://bucket/img123.jpg"})
```
## Throughput Optimization Checklist
| Technique | Throughput Improvement | Complexity |
|---|---|---|
| Batch Processing | 10-100x for DB operations | Low |
| Connection Pooling | 5-20x | Low |
| Async I/O | 5-50x for I/O-bound work | Medium |
| Horizontal Scaling | Near-linear (N workers ≈ Nx throughput) | Medium |
| Queue-Based Architecture | Decouples producers/consumers | Medium |
Throughput optimization connects to horizontal scaling for adding capacity, performance optimization for individual request speed, and high traffic systems for handling extreme loads.
## Frequently Asked Questions
Q: How do I measure throughput accurately?
Use load testing tools like k6 or Locust to measure sustained throughput under various conditions. Monitor requests per second (RPS) at the load balancer level for the true system throughput. Track throughput alongside latency percentiles — throughput gains that degrade p99 latency may not be worthwhile.
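As a quick sanity check before reaching for a load-testing tool, a single-process loop gives a rough ops/sec figure (illustrative only; true system throughput must be measured under concurrent load at the load balancer):

```python
import time

def measure_throughput(fn, duration_s=1.0):
    # Run fn in a tight loop for duration_s and report operations per second
    count = 0
    start = time.perf_counter()
    while time.perf_counter() - start < duration_s:
        fn()
        count += 1
    elapsed = time.perf_counter() - start
    return count / elapsed

# Example: measure a trivial CPU-bound operation
ops_per_sec = measure_throughput(lambda: sum(range(100)), duration_s=0.2)
print(f"{ops_per_sec:,.0f} ops/sec")
```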
Q: When should I use batch processing vs real-time processing?
Use batch processing when latency tolerance is high (analytics, reports, bulk imports) and throughput matters most. Use real-time processing when users expect immediate results (API responses, chat messages). Many systems use both: real-time for user-facing operations and batch for background processing.
Q: How do I prevent queue backlog from growing unbounded?
Set maximum queue depth and reject new items when full (back-pressure). Implement dead-letter queues for failed items. Monitor queue depth and set alerts. Use auto-scaling to add consumers when queue depth grows. Consider message TTL to expire stale items automatically.
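The first of these, rejecting work when the queue is full, can be sketched as (assuming an in-memory queue for illustration):

```python
class BoundedQueue:
    # Back-pressure: refuse new work when the queue is full instead of
    # letting the backlog grow without bound
    def __init__(self, max_depth):
        self.max_depth = max_depth
        self.items = []

    def enqueue(self, task):
        if len(self.items) >= self.max_depth:
            return False  # caller should shed load or slow down
        self.items.append(task)
        return True

q = BoundedQueue(max_depth=2)
print(q.enqueue("a"), q.enqueue("b"), q.enqueue("c"))  # True True False
```

A producer that receives `False` can drop the item, retry with backoff, or surface an error upstream; all three are preferable to an unbounded backlog that eventually exhausts memory.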