Throughput Optimization: Maximizing System Capacity

Throughput is the number of operations your system can process per unit of time: requests per second, messages per second, or transactions per minute. While latency measures how long a single operation takes to complete, throughput measures how many operations the system completes in a given interval. Optimizing throughput is essential for systems that process large volumes of data or serve many concurrent users.

Throughput vs Latency

| Aspect | Latency | Throughput |
|---|---|---|
| Measures | Time per operation | Operations per unit of time |
| Optimize by | Making each operation faster | Processing more operations concurrently |
| Trade-off | Often conflicts with throughput (batching) | Often conflicts with latency |
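
One way to connect the two columns is Little's Law, which says that average concurrency equals throughput multiplied by average latency. The sketch below rearranges that into an upper bound on throughput. The function name `max_throughput` and the numbers are illustrative assumptions, not measurements from any real system.

```python
def max_throughput(concurrency: int, latency_seconds: float) -> float:
    """Upper bound on operations per second, from Little's Law.

    concurrency: how many operations can be in flight at once
    latency_seconds: average time one operation takes
    """
    if latency_seconds <= 0:
        raise ValueError("latency_seconds must be positive")
    return concurrency / latency_seconds


# Illustrative numbers only: 50 in-flight operations at 20 ms each
print(max_throughput(50, 0.020))   # 2500.0 ops/sec
# Halving latency or doubling concurrency both double the bound
print(max_throughput(50, 0.010))   # 5000.0 ops/sec
print(max_throughput(100, 0.020))  # 5000.0 ops/sec
```

This is why throughput work usually targets either the time per operation or the number of operations in flight.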

Batch Processing Patterns

Processing items in batches amortizes overhead (network round trips, disk seeks, connection setup) across many items, dramatically improving throughput at the cost of slightly higher per-item latency.
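
The amortization argument can be shown with simple arithmetic. The sketch below assumes a hypothetical fixed overhead of 5 ms per call (for example, one network round trip) and 0.05 ms of real work per item. Both figures are invented for illustration.

```python
def per_item_cost_ms(fixed_overhead_ms: float, item_cost_ms: float,
                     batch_size: int) -> float:
    """Average cost of one item when the fixed overhead is shared by a batch."""
    return fixed_overhead_ms / batch_size + item_cost_ms


# Assumed figures: 5 ms fixed overhead per call, 0.05 ms of work per item
for size in (1, 10, 100):
    cost = per_item_cost_ms(fixed_overhead_ms=5.0, item_cost_ms=0.05,
                            batch_size=size)
    print(f"batch_size={size:>3}: {cost:.2f} ms/item, "
          f"{1000 / cost:,.0f} items/sec")
```

Under these assumptions, going from one item per call to 100 items per call raises throughput from roughly 200 to roughly 10,000 items per second, while the first item in each batch may wait for the batch to fill.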

import threading
import time
from collections import deque

class BatchProcessor:
    def __init__(self, batch_size=100, flush_interval=1.0, process_fn=None):
        if process_fn is None:
            raise ValueError("process_fn is required")
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.process_fn = process_fn
        self.buffer = deque()
        self.lock = threading.Lock()

        # Background thread flushes partial batches every flush_interval seconds
        self.flush_thread = threading.Thread(target=self._flush_loop,
                                            daemon=True)
        self.flush_thread.start()

    def add(self, item):
        batch = None
        with self.lock:
            self.buffer.append(item)
            if len(self.buffer) >= self.batch_size:
                batch = self._drain()
        if batch:
            # Process outside the lock so other add() calls are not blocked
            self.process_fn(batch)

    def _flush_loop(self):
        while True:
            time.sleep(self.flush_interval)
            self.flush()

    def flush(self):
        # Also call this before shutdown: the daemon thread will not
        # flush a partial batch when the process exits
        with self.lock:
            batch = self._drain()
        if batch:
            self.process_fn(batch)

    def _drain(self):
        # Caller must hold self.lock
        batch = list(self.buffer)
        self.buffer.clear()
        return batch

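# Database batch insert: often 10-100x faster than one INSERT per row,
# because the round-trip overhead is shared across the batch.
# `db` is assumed to be a DB-API cursor created elsewhere. The actual
# speed-up depends on the driver: some execute executemany() row by row.
def batch_insert(records):
    values = [(r["name"], r["email"]) for r in records]
    db.executemany(
        "INSERT INTO users (name, email) VALUES (%s, %s)",
        values
    )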

processor = BatchProcessor(batch_size=100, flush_interval=0.5,
                          process_fn=batch_insert)

Async I/O and Event Loops

import asyncio
import aiohttp
import aiomysql

async def high_throughput_processor():
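    # Connection pools let many coroutines share a bounded set of connections.
    # autocommit=True so each UPDATE is committed (aiomysql defaults to False).
    db_pool = await aiomysql.create_pool(
        host="db.example.com", db="myapp",
        minsize=10, maxsize=50, autocommit=True
    )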
    http_session = aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(limit=100)
    )

    async def process_item(item):
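        # For a single item, the DB update and the webhook call run in sequence;
        # concurrency comes from many process_item() calls being in flight at once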
        async with db_pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute(
                    "UPDATE orders SET status = %s WHERE id = %s",
                    ["processed", item["order_id"]]
                )

        async with http_session.post(
            "https://webhook.example.com/notify",
            json={"order_id": item["order_id"]}
        ) as resp:
            return await resp.json()

    # Run all items concurrently, with at most 50 in flight at any time
    semaphore = asyncio.Semaphore(50)

    async def bounded_process(item):
        async with semaphore:
            return await process_item(item)

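    items = await fetch_pending_items()  # assumed to be defined elsewhere
    try:
        results = await asyncio.gather(
            *[bounded_process(item) for item in items]
        )
    finally:
        # Release sockets and DB connections even if a task fails
        await http_session.close()
        db_pool.close()
        await db_pool.wait_closed()
    return results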
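
The semaphore pattern above can be tried without a database or webhook. The following is a minimal sketch that replaces the real I/O with `asyncio.sleep`. The names `run_bounded` and `fake_io` are hypothetical, and the 10 ms delay is an arbitrary stand-in for a network call.

```python
import asyncio


async def run_bounded(items, limit):
    """Process items concurrently with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_io(item):
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for a DB or HTTP call
            in_flight -= 1
            return item * 2

    results = await asyncio.gather(*(fake_io(i) for i in items))
    return results, peak


results, peak = asyncio.run(run_bounded(range(20), limit=5))
print(peak)         # never exceeds the limit of 5
print(results[:3])  # gather preserves input order: [0, 2, 4]
```

Matching the semaphore limit to the smallest downstream pool (here, the database pool's maximum size) keeps coroutines from queuing up inside the pool instead.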

Connection Pooling Deep Dive

# PgBouncer configuration for high-throughput PostgreSQL
[databases]
myapp = host=db.internal port=5432 dbname=myapp

[pgbouncer]
listen_addr = 0.0.0.0
listen_port = 6432
auth_type = md5
# Return the server connection to the pool after each transaction
pool_mode = transaction
# Accept up to 1000 client (application) connections
max_client_conn = 1000
# Open at most 25 server connections per user/database pair
default_pool_size = 25
reserve_pool_size = 5
reserve_pool_timeout = 3
server_idle_timeout = 60
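
PgBouncer applies pooling at the proxy level, but the underlying idea can be shown in-process. The sketch below is a simplified illustration, not how PgBouncer is implemented: `SimplePool` and `fake_connect` are hypothetical names, and the "connections" are plain objects rather than real database connections.

```python
import queue
from contextlib import contextmanager


class SimplePool:
    """Fixed-size pool: callers borrow a connection and return it when done."""

    def __init__(self, connect, size):
        self._idle = queue.Queue(maxsize=size)
        for _ in range(size):
            self._idle.put(connect())  # pay the connection cost once, up front

    @contextmanager
    def acquire(self, timeout=None):
        conn = self._idle.get(timeout=timeout)  # blocks while all are in use
        try:
            yield conn
        finally:
            self._idle.put(conn)  # hand the connection back for reuse


# Hypothetical stand-in for an expensive connect() call
created = []

def fake_connect():
    created.append(object())
    return created[-1]


pool = SimplePool(fake_connect, size=2)
for _ in range(10):
    with pool.acquire() as conn:
        pass  # a query would run here

print(len(created))  # 2: ten borrows reused the same two connections
```

The same trade-off applies as in the configuration above: a small pool caps load on the database, and callers wait (or time out) when every connection is busy.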

Queue-Based Processing

import redis
import json
import multiprocessing

class WorkQueue:
    def __init__(self, queue_name, redis_url="redis://localhost"):
        self.queue_name = queue_name
        self.redis = redis.from_url(redis_url)

    def enqueue(self, task):
        self.redis.rpush(self.queue_name, json.dumps(task))

    def dequeue(self, timeout=5):
        result = self.redis.blpop(self.queue_name, timeout=timeout)
        if result:
            return json.loads(result[1])
        return None

    def size(self):
        return self.redis.llen(self.queue_name)

class WorkerPool:
    def __init__(self, queue, process_fn, num_workers=8):
        self.queue = queue
        self.process_fn = process_fn
        self.num_workers = num_workers

    def start(self):
        workers = []
        for i in range(self.num_workers):
            p = multiprocessing.Process(target=self._worker_loop,
                                       args=(i,))
            p.start()
            workers.append(p)
        return workers

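    def _worker_loop(self, worker_id, max_retries=3):
        while True:
            task = self.queue.dequeue(timeout=5)
            if task is None:
                continue  # queue was empty; poll again
            try:
                self.process_fn(task)
            except Exception as e:
                retries = task.get("retry_count", 0) + 1
                if retries <= max_retries:
                    # Re-enqueue with an incremented retry counter
                    self.queue.enqueue({**task, "retry_count": retries})
                else:
                    # Cap retries so a poison message cannot loop forever;
                    # a real system would move it to a dead-letter queue
                    print(f"worker {worker_id}: giving up on task: {e!r}")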
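
For experimenting without Redis, the same producer/consumer shape can be sketched with the standard library. This is an in-process stand-in under stated assumptions, not a replacement for the Redis-backed version: it uses threads instead of processes, and `run_workers` is a hypothetical helper that shuts workers down with a sentinel value instead of looping forever.

```python
import queue
import threading


def run_workers(tasks, process_fn, num_workers=4):
    """Drain `tasks` with a pool of threads; results are in completion order."""
    work = queue.Queue()
    results = []
    results_lock = threading.Lock()
    stop = object()  # sentinel telling a worker to exit

    def worker():
        while True:
            task = work.get()
            if task is stop:
                break
            value = process_fn(task)
            with results_lock:
                results.append(value)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for task in tasks:
        work.put(task)
    for _ in threads:
        work.put(stop)  # one sentinel per worker
    for t in threads:
        t.join()
    return results


squares = run_workers(range(10), lambda n: n * n)
print(sorted(squares))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Threads suit I/O-bound tasks. For CPU-bound work in CPython, separate processes, as in the `WorkerPool` above, avoid contention on the interpreter lock.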

Fan-Out Pattern

# Fan-out: One message triggers processing on multiple workers
class FanOutProcessor:
    def __init__(self, queues):
        self.queues = queues

    def fan_out(self, message):
        for queue in self.queues:
            queue.enqueue(message)

# Example: Image upload triggers multiple processing pipelines
image_queues = [
    WorkQueue("thumbnail-generation"),
    WorkQueue("content-moderation"),
    WorkQueue("metadata-extraction"),
    WorkQueue("search-indexing")
]

fan_out = FanOutProcessor(image_queues)
fan_out.fan_out({"image_id": "img123", "url": "s3://bucket/img123.jpg"})

Throughput Optimization Checklist

| Technique | Typical Throughput Improvement | Complexity |
|---|---|---|
| Batch Processing | 10-100x for DB operations | Low |
| Connection Pooling | 5-20x | Low |
| Async I/O | 5-50x for I/O-bound work | Medium |
| Horizontal Scaling | Near-linear (N workers ≈ Nx throughput) until a shared resource such as the database saturates | Medium |
| Queue-Based Architecture | Decouples producers/consumers | Medium |

Throughput optimization connects to horizontal scaling for adding capacity, performance optimization for individual request speed, and high traffic systems for handling extreme loads.

Frequently Asked Questions

Q: How do I measure throughput accurately?

Use load testing tools like k6 or Locust to measure sustained throughput under various conditions. Monitor requests per second (RPS) at the load balancer level for the true system throughput. Track throughput alongside latency percentiles — throughput gains that degrade p99 latency may not be worthwhile.
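
For a quick local check before reaching for a full load-testing tool, throughput and tail latency can be recorded together. The sketch below is a single-threaded micro-benchmark with a hypothetical `measure` helper. It does not model concurrent clients or network effects, so treat its numbers as a rough baseline only.

```python
import statistics
import time


def measure(fn, iterations=1000):
    """Run fn repeatedly; return (ops_per_second, p99_latency_seconds)."""
    latencies = []
    start = time.perf_counter()
    for _ in range(iterations):
        t0 = time.perf_counter()
        fn()
        latencies.append(time.perf_counter() - t0)
    elapsed = time.perf_counter() - start
    # quantiles(n=100) returns 99 cut points; index 98 is the 99th percentile
    p99 = statistics.quantiles(latencies, n=100)[98]
    return iterations / elapsed, p99


ops, p99 = measure(lambda: sum(range(1000)))
print(f"{ops:,.0f} ops/sec, p99 = {p99 * 1e6:.1f} µs")
```

Reporting both values side by side makes it easier to spot a change that raises operations per second while making the slowest requests slower.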

Q: When should I use batch processing vs real-time processing?

Use batch processing when latency tolerance is high (analytics, reports, bulk imports) and throughput matters most. Use real-time processing when users expect immediate results (API responses, chat messages). Many systems use both: real-time for user-facing operations and batch for background processing.

Q: How do I prevent queue backlog from growing unbounded?

Set maximum queue depth and reject new items when full (back-pressure). Implement dead-letter queues for failed items. Monitor queue depth and set alerts. Use auto-scaling to add consumers when queue depth grows. Consider message TTL to expire stale items automatically.
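
The first suggestion, rejecting new items when the queue is full, can be sketched with a bounded in-memory queue. `BoundedInbox` is a hypothetical class used for illustration. With a Redis list, an equivalent depth check would need extra care to stay atomic.

```python
import queue


class BoundedInbox:
    """Queue with a maximum depth that rejects items instead of growing."""

    def __init__(self, max_depth):
        self._q = queue.Queue(maxsize=max_depth)
        self.rejected = 0

    def offer(self, item) -> bool:
        """Return True if the item was accepted, False if the queue is full."""
        try:
            self._q.put_nowait(item)
            return True
        except queue.Full:
            self.rejected += 1  # caller should retry later or shed the load
            return False

    def depth(self):
        return self._q.qsize()


inbox = BoundedInbox(max_depth=3)
accepted = [inbox.offer(i) for i in range(5)]
print(accepted)        # [True, True, True, False, False]
print(inbox.depth())   # 3
print(inbox.rejected)  # 2
```

Returning a clear rejection lets the producer slow down or return an error such as HTTP 429, which is the back-pressure signal described above.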
