Leader Election in Distributed Systems

Leader election is a fundamental problem in distributed computing where a group of nodes must agree on a single node to act as the coordinator or leader. The leader takes responsibility for tasks that require centralized control, such as assigning work, coordinating writes, or managing cluster state. When the leader fails, the remaining nodes must elect a new leader quickly and correctly.

Why Leader Election Matters

  • Database replication: One node accepts writes and replicates to followers (e.g., MySQL primary, MongoDB primary)
  • Job scheduling: Only one scheduler should assign tasks to prevent duplicates
  • Distributed locks: A single lock manager grants and releases locks so that access to a shared resource is serialized
  • Consensus protocols: Raft and Paxos elect leaders to drive agreement
  • Message queuing: Kafka partition leaders handle writes and, by default, reads for their partitions

The Bully Algorithm

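The bully algorithm is one of the simplest leader election algorithms. Each node has a unique numeric ID, and the live node with the highest ID becomes the leader. When a node detects that the leader has failed, it sends an ELECTION message to every node with a higher ID. If none of them answers, it declares itself leader with a VICTORY broadcast; if any of them answers ALIVE, it stands down and the higher node runs its own election.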

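# send_message() and send_response() stand in for the transport layer
# (RPC, sockets, ...); they are assumed helpers and are not defined here.
class BullyElection: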
    def __init__(self, node_id, all_nodes):
        self.node_id = node_id
        self.all_nodes = all_nodes  # {id: address}
        self.leader = None
        self.is_election_running = False

    def start_election(self):
        self.is_election_running = True
        higher_nodes = [nid for nid in self.all_nodes
                       if nid > self.node_id]

        if not higher_nodes:
            self.declare_victory()
            return

        responses = []
        for nid in higher_nodes:
            try:
                resp = send_message(self.all_nodes[nid],
                                   {"type": "ELECTION",
                                    "from": self.node_id})
                if resp.get("type") == "ALIVE":
                    responses.append(nid)
            except ConnectionError:
                pass

        if not responses:
            self.declare_victory()
        # else: wait for a VICTORY message from a higher node

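    def declare_victory(self):
        self.leader = self.node_id
        self.is_election_running = False  # this election is over
        for nid, addr in self.all_nodes.items():
            if nid == self.node_id:
                continue
            try:
                send_message(addr, {"type": "VICTORY",
                                    "leader": self.node_id})
            except ConnectionError:
                pass  # a node that is down holds its own election on recovery

    def on_message(self, msg):
        if msg["type"] == "ELECTION":
            # Only lower-ID nodes send ELECTION: tell the sender we are alive,
            # then take over the election unless one is already running.
            send_response({"type": "ALIVE"})
            if not self.is_election_running:
                self.start_election()
        elif msg["type"] == "VICTORY":
            self.leader = msg["leader"]
            self.is_election_running = False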
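
The "wait for a VICTORY message" step leaves open what happens if the higher node crashes before announcing itself. One way to handle it is a bounded wait followed by a fresh election. The sketch below assumes a hypothetical `victory_event` that the message handler sets when a VICTORY message arrives, and a hypothetical `restart_election` callback; the timeout value is arbitrary.

```python
import threading


def await_victory(victory_event, restart_election, timeout_s=2.0):
    """Wait for a VICTORY announcement; restart the election on timeout.

    victory_event and restart_election are hypothetical hooks, not part of
    the class above. Returns True if a leader was announced in time.
    """
    if victory_event.wait(timeout=timeout_s):
        return True
    # The higher node answered ALIVE but never announced itself,
    # so assume it failed mid-election and try again.
    restart_election()
    return False
```

In a node like the one above, this would run after at least one ALIVE response has been collected, typically on a background thread so the message handler stays responsive.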

Raft Consensus Protocol

Raft is one of the most widely adopted consensus protocols. It was designed as a more understandable alternative to Paxos and decomposes consensus into three sub-problems: leader election, log replication, and safety. Raft is used by etcd, CockroachDB, and TiKV.

Raft Node States:

| State | Description | Transitions |
|---|---|---|
| Follower | Passive; responds to leader and candidates | Election timeout → Candidate |
| Candidate | Requests votes from other nodes | Majority votes → Leader; higher term seen → Follower; election timeout → new election |
| Leader | Handles all client requests, replicates log | Higher term seen → Follower |

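import random

# send_request_vote() and send_heartbeats() stand in for the RPC layer;
# they are assumed helpers and are not defined here.
class RaftNode:
    def __init__(self, node_id, peers):
        self.node_id = node_id
        self.peers = peers
        self.state = "follower"
        self.current_term = 0   # real nodes persist this before replying
        self.voted_for = None   # likewise persisted
        self.log = []           # assumed shape: list of {"term": int, ...}
        self.election_timeout = random.uniform(150, 300)  # ms, per node

    def last_log_index(self):
        return len(self.log)  # Raft indices start at 1; 0 means empty log

    def last_log_term(self):
        return self.log[-1]["term"] if self.log else 0

    def is_log_up_to_date(self, last_log_index, last_log_term):
        # Candidate's log must be at least as up to date as ours:
        # compare last terms first, then log length.
        return (last_log_term, last_log_index) >= (
            self.last_log_term(), self.last_log_index())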

    def start_election(self):
        self.current_term += 1
        self.state = "candidate"
        self.voted_for = self.node_id
        votes_received = 1  # vote for self

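        for peer in self.peers:
            try:
                response = send_request_vote(peer, {
                    "term": self.current_term,
                    "candidate_id": self.node_id,
                    "last_log_index": self.last_log_index(),
                    "last_log_term": self.last_log_term()
                })
            except ConnectionError:
                continue  # unreachable peers simply do not vote
            if response["term"] > self.current_term:
                # A peer is already in a newer term: step down immediately
                self.current_term = response["term"]
                self.state = "follower"
                self.voted_for = None
                return
            if response["vote_granted"]:
                votes_received += 1

        majority = (len(self.peers) + 1) // 2 + 1
        if votes_received >= majority:
            self.become_leader()
        else:
            # Simplification: fall back to follower and wait for the
            # next election timeout instead of retrying as a candidate
            self.state = "follower"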

    def become_leader(self):
        self.state = "leader"
        # Send heartbeats to prevent new elections
        self.send_heartbeats()

    def on_request_vote(self, candidate_term, candidate_id,
                        last_log_index, last_log_term):
        if candidate_term < self.current_term:
            return {"vote_granted": False, "term": self.current_term}
        if candidate_term > self.current_term:
            self.current_term = candidate_term
            self.state = "follower"
            self.voted_for = None
        if self.voted_for in (None, candidate_id):
            if self.is_log_up_to_date(last_log_index, last_log_term):
                self.voted_for = candidate_id
                return {"vote_granted": True, "term": self.current_term}
        return {"vote_granted": False, "term": self.current_term}
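
The `election_timeout` drawn in `__init__` only has an effect if something resets it whenever a heartbeat arrives or a vote is granted. The following is a minimal sketch of such a timer with an injectable clock so it can be tested without sleeping; `ElectionTimer` and its method names are hypothetical and not taken from any Raft library.

```python
import random
import time


class ElectionTimer:
    """Tracks when a follower should give up on the current leader.

    Hypothetical helper: call reset() on every heartbeat or granted vote,
    and start an election once expired() returns True.
    """

    def __init__(self, min_ms=150, max_ms=300, clock=time.monotonic):
        self.min_ms = min_ms
        self.max_ms = max_ms
        self.clock = clock
        self.reset()

    def reset(self):
        # Re-randomize on every reset so nodes do not stay in lockstep.
        timeout_ms = random.uniform(self.min_ms, self.max_ms)
        self.deadline = self.clock() + timeout_ms / 1000.0

    def expired(self):
        return self.clock() >= self.deadline
```

Using a monotonic clock rather than wall-clock time keeps the timer from misfiring when the system clock is adjusted.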

ZooKeeper Leader Election

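ZooKeeper provides a reliable leader election recipe using ephemeral sequential znodes. Each candidate creates an ephemeral sequential znode under a shared path, and the candidate holding the lowest sequence number is the leader. The kazoo client for Python wraps this pattern in its Election recipe: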

from kazoo.client import KazooClient
from kazoo.recipe.election import Election

zk = KazooClient(hosts="zk1:2181,zk2:2181,zk3:2181")
zk.start()

election = Election(zk, "/election/scheduler")

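def leader_func():
    # run_scheduler() is a placeholder for the work only the leader should do
    print("I am the leader! Starting scheduler...")
    run_scheduler()

# run() blocks until this node wins the election, then calls leader_func.
# Leadership is held only while leader_func is running; when it returns,
# the next candidate in line takes over.
election.run(leader_func)

# If the leader process dies, its ZooKeeper session expires, its ephemeral
# znode is deleted, and the next candidate automatically becomes leader.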
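
The ordering rule behind the recipe can be sketched without a ZooKeeper connection. In the usual formulation, every non-leader watches only its immediate predecessor rather than the leader, so a leader failure wakes one candidate instead of all of them. The znode names (such as `n_0000000003`) and function names below are illustrative assumptions, not kazoo APIs.

```python
def sequence_of(znode_name):
    """Extract the sequence number ZooKeeper appends to a sequential znode."""
    return int(znode_name.rsplit("_", 1)[1])


def election_role(my_znode, children):
    """Return ("leader", None) or ("follower", znode_to_watch).

    children is the list of candidate znodes under the election path,
    in any order. Names such as "n_0000000003" are illustrative.
    """
    ordered = sorted(children, key=sequence_of)
    position = ordered.index(my_znode)
    if position == 0:
        return ("leader", None)
    # Watch only the predecessor to avoid a thundering herd on failover.
    return ("follower", ordered[position - 1])
```

A candidate would re-run this check whenever its watched znode disappears, since the predecessor may have been a follower that left rather than the leader.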

The Split-Brain Problem

Split-brain occurs when a network partition causes two parts of a cluster to independently elect their own leader. Both leaders accept writes, causing data divergence and potential corruption.

Solutions:

| Solution | How It Works | Trade-off |
|---|---|---|
| Quorum-based | Leader needs majority of nodes to operate | Minority partition becomes unavailable |
| Fencing | Old leader is fenced off from shared resources | Requires fencing mechanism (STONITH) |
| Epoch/Term numbers | Higher epoch leader takes precedence | Requires all nodes to check epoch |
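
Fencing and epoch numbers are often combined: the shared resource remembers the highest epoch it has seen and refuses anything older. The sketch below is a minimal illustration of that check, assuming each leader attaches the epoch in which it was elected to every write; `FencedStore` is a hypothetical name.

```python
class FencedStore:
    """A shared resource that rejects writes from stale leaders.

    Hypothetical sketch: each write carries the epoch (term) of the leader
    that issued it, and the store tracks the highest epoch seen so far.
    """

    def __init__(self):
        self.highest_epoch = 0
        self.data = {}

    def write(self, epoch, key, value):
        if epoch < self.highest_epoch:
            # A newer leader has already written; refuse the old one.
            return False
        self.highest_epoch = epoch
        self.data[key] = value
        return True
```

The check has to live in the resource itself. A deposed leader that is paused or partitioned cannot be relied on to notice that it has been replaced.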

Real-World Examples

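  • Apache Kafka: Each partition has a leader broker. The cluster controller, itself elected through ZooKeeper (or KRaft in newer versions), normally picks partition leaders from the in-sync replicas. By default, producers and consumers talk only to the leader of a partition.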
  • etcd: Uses Raft for leader election. The leader processes all write requests and replicates them to followers.
  • Redis Sentinel: Monitors Redis masters and uses a voting mechanism to elect a new master when the current one fails.
  • Elasticsearch: Versions before 7.0 used Zen Discovery for master node election; 7.0 and later use a redesigned cluster coordination layer that draws on Raft-style consensus.

Algorithm Comparison

| Algorithm | Complexity | Fault Tolerance | Used By |
|---|---|---|---|
| Bully | Simple | Low | Educational, simple systems |
| Raft | Moderate | High | etcd, CockroachDB, TiKV, Consul |
| Paxos | High | High | Google Chubby, Spanner |
| ZAB | Moderate | High | Apache ZooKeeper |

Leader election connects closely to quorum systems for achieving agreement and consistent hashing for distributed routing. See our 2PC guide for coordinated commit protocols.

Frequently Asked Questions

Q: What happens if the leader fails during an operation?

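In Raft, an entry is committed only once it has been replicated to a majority of nodes, and committed entries are guaranteed to survive a leader failure. Entries that the failed leader had not yet replicated to a majority may be discarded. After the election, the new leader brings the followers' logs into line with its own. A client that never received an acknowledgement cannot tell whether its operation was committed, so it should retry, ideally with an idempotent request or a unique request ID so that a duplicate is harmless.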
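
"Replicated to a majority" can be computed from how far each node's log is known to match the leader's. The sketch below assumes a list with one entry per node, the leader included; the function name is hypothetical. It leaves out one rule from the Raft paper: a leader only advances its commit index this way for entries from its own term.

```python
def majority_replicated_index(match_indexes):
    """Highest log index stored on a majority of nodes.

    match_indexes holds one entry per node (leader included): the index of
    the last log entry known to be replicated on that node.
    """
    ranked = sorted(match_indexes, reverse=True)
    majority = len(ranked) // 2 + 1
    # The majority-th highest value is present on at least `majority` nodes.
    return ranked[majority - 1]
```

For example, with five nodes at indexes 7, 7, 5, 3, and 3, entries up to index 5 are on three nodes and therefore safe, while entries 6 and 7 could still be lost if the leader fails.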

Q: How fast is Raft leader election?

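The election timeout, commonly randomized in the 150-300 ms range suggested by the Raft paper, is how long a follower waits without hearing from the leader before it starts an election; the vote itself usually takes about one network round trip after that. Randomizing the timeout makes split votes, where no candidate gets a majority, unlikely rather than impossible. When one does happen, the candidates time out and retry in a new term. In practice, leader failover in etcd takes around 1-2 seconds including detection and election.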
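
The effect of the timeout range on split votes can be estimated with a rough simulation. The model below is a deliberate simplification with assumed numbers: it counts a "close call" whenever the two earliest timeouts fall within one round trip of each other, which is when two nodes could both become candidates before either has collected votes.

```python
import random


def split_vote_risk(nodes=5, min_ms=150, max_ms=300, rtt_ms=10,
                    trials=10_000, seed=0):
    """Estimate how often two nodes time out within one round trip.

    Rough model with assumed numbers; it ignores vote ordering and
    message loss, so treat the result as an upper-level intuition only.
    """
    rng = random.Random(seed)
    close_calls = 0
    for _ in range(trials):
        timeouts = sorted(rng.uniform(min_ms, max_ms) for _ in range(nodes))
        if timeouts[1] - timeouts[0] < rtt_ms:
            close_calls += 1
    return close_calls / trials
```

Widening the timeout range relative to the round-trip time lowers the estimate, at the cost of slower failure detection on average.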

Q: Can I have multiple leaders for different responsibilities?

Yes, this is called partitioned leadership. Kafka assigns a different leader per partition. This improves throughput since leadership work is distributed. However, each individual partition still has exactly one leader at a time.
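
As a minimal illustration of spreading leadership, the sketch below assigns one leader per partition round-robin across a list of brokers. The function name and the round-robin policy are assumptions for illustration; real systems such as Kafka also take replica placement and in-sync state into account.

```python
def assign_partition_leaders(partitions, brokers):
    """Spread partition leadership across brokers round-robin.

    Illustrative only: each partition gets exactly one leader, and
    leaders are distributed as evenly as the counts allow.
    """
    return {p: brokers[i % len(brokers)] for i, p in enumerate(partitions)}
```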

Q: How does Raft handle network partitions?

The partition with the majority of nodes continues to operate normally by electing a new leader if needed. The minority partition cannot elect a leader (cannot get majority votes) and becomes unavailable for writes. When the partition heals, the minority nodes rejoin and sync with the majority leader. This is the safe behavior that prevents split-brain.
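
The majority rule is simple arithmetic, sketched below with hypothetical function names. Note that the quorum is computed from the configured cluster size, not from the number of nodes a partition can currently see.

```python
def quorum_size(cluster_size):
    """Smallest number of nodes that forms a majority."""
    return cluster_size // 2 + 1


def can_elect_leader(partition_size, cluster_size):
    """A partition can elect a leader only if it holds a majority."""
    return partition_size >= quorum_size(cluster_size)
```

With five nodes split 3/2, only the side with three nodes can elect a leader. With four nodes split 2/2, neither side can, which is one reason odd cluster sizes are usually preferred.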
