Leader Election in Distributed Systems
Leader election is a fundamental problem in distributed computing where a group of nodes must agree on a single node to act as the coordinator or leader. The leader takes responsibility for tasks that require centralized control, such as assigning work, coordinating writes, or managing cluster state. When the leader fails, the remaining nodes must elect a new leader quickly and correctly.
Why Leader Election Matters
- Database replication: One node accepts writes and replicates to followers (e.g., MySQL primary, MongoDB primary)
- Job scheduling: Only one scheduler should assign tasks to prevent duplicates
- Distributed locks: A single lock manager coordinates access to shared resources
- Consensus protocols: Raft and Paxos elect leaders to drive agreement
- Message queuing: Kafka partition leaders handle reads and writes for their partitions
The Bully Algorithm
The bully algorithm is one of the simplest leader election algorithms. Each node has a unique numeric ID, and the node with the highest ID becomes the leader. When a node detects that the leader has failed, it initiates an election.
```python
class BullyElection:
    def __init__(self, node_id, all_nodes):
        self.node_id = node_id
        self.all_nodes = all_nodes  # {id: address}
        self.leader = None
        self.is_election_running = False

    def start_election(self):
        self.is_election_running = True
        higher_nodes = [nid for nid in self.all_nodes
                        if nid > self.node_id]
        if not higher_nodes:
            # No node outranks us, so we win immediately
            self.declare_victory()
            return
        responses = []
        for nid in higher_nodes:
            try:
                # send_message is an assumed transport helper
                resp = send_message(self.all_nodes[nid],
                                    {"type": "ELECTION",
                                     "from": self.node_id})
                if resp.get("type") == "ALIVE":
                    responses.append(nid)
            except ConnectionError:
                pass  # treat unreachable nodes as failed
        if not responses:
            # No higher-ID node answered, so we take over
            self.declare_victory()
        # else: wait for a VICTORY message from a higher node

    def declare_victory(self):
        self.leader = self.node_id
        self.is_election_running = False
        for nid, addr in self.all_nodes.items():
            if nid != self.node_id:
                send_message(addr, {"type": "VICTORY",
                                    "leader": self.node_id})

    def on_message(self, msg):
        if msg["type"] == "ELECTION":
            # A lower-ID node is holding an election; answer, then
            # start our own election if one is not already running
            send_response({"type": "ALIVE"})
            if not self.is_election_running:
                self.start_election()
        elif msg["type"] == "VICTORY":
            self.leader = msg["leader"]
            self.is_election_running = False
```
Raft Consensus Protocol
Raft is among the most widely adopted consensus protocols. It was designed to be easier to understand than Paxos, decomposing consensus into three sub-problems: leader election, log replication, and safety. Raft is used by etcd, CockroachDB, TiKV, and Consul.
Raft Node States:
| State | Description | Transitions |
|---|---|---|
| Follower | Passive; responds to leader and candidates | Election timeout → Candidate |
| Candidate | Requests votes from other nodes | Majority votes → Leader; higher term seen → Follower |
| Leader | Handles all client requests, replicates log | Higher term seen → Follower |
```python
import random

class RaftNode:
    def __init__(self, node_id, peers):
        self.node_id = node_id
        self.peers = peers  # addresses of the other cluster nodes
        self.state = "follower"
        self.current_term = 0
        self.voted_for = None
        # Randomized timeout prevents all nodes from standing
        # for election at the same instant
        self.election_timeout = random.uniform(150, 300)  # ms

    def start_election(self):
        self.current_term += 1
        self.state = "candidate"
        self.voted_for = self.node_id
        votes_received = 1  # vote for self
        for peer in self.peers:
            try:
                # send_request_vote is an assumed RPC helper
                response = send_request_vote(peer, {
                    "term": self.current_term,
                    "candidate_id": self.node_id,
                    "last_log_index": self.last_log_index(),
                    "last_log_term": self.last_log_term()
                })
                if response["vote_granted"]:
                    votes_received += 1
            except ConnectionError:
                pass
        # Majority of the full cluster (peers plus this node)
        majority = (len(self.peers) + 1) // 2 + 1
        if votes_received >= majority:
            self.become_leader()
        else:
            self.state = "follower"

    def become_leader(self):
        self.state = "leader"
        # Send heartbeats immediately to prevent new elections
        self.send_heartbeats()

    def on_request_vote(self, candidate_term, candidate_id,
                        last_log_index, last_log_term):
        if candidate_term < self.current_term:
            # Reject candidates from stale terms
            return {"vote_granted": False, "term": self.current_term}
        if candidate_term > self.current_term:
            # A newer term always demotes us to follower
            self.current_term = candidate_term
            self.state = "follower"
            self.voted_for = None
        if self.voted_for in (None, candidate_id):
            # Grant at most one vote per term, and only to candidates
            # whose log is at least as up to date as ours
            if self.is_log_up_to_date(last_log_index, last_log_term):
                self.voted_for = candidate_id
                return {"vote_granted": True, "term": self.current_term}
        return {"vote_granted": False, "term": self.current_term}
```
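The voting rules above can be exercised entirely in memory. Below is a minimal, failure-free sketch: the `Node` class is a stripped-down stand-in (a hypothetical simplification with empty logs, so the log up-to-date check is omitted), showing a candidate collecting a majority of votes in a five-node cluster.

```python
# Simplified in-memory Raft voting round: empty logs, no failures.

class Node:
    def __init__(self, node_id):
        self.node_id = node_id
        self.current_term = 0
        self.voted_for = None

    def on_request_vote(self, term, candidate_id):
        if term < self.current_term:
            return False  # stale term: reject
        if term > self.current_term:
            self.current_term = term
            self.voted_for = None
        if self.voted_for in (None, candidate_id):
            self.voted_for = candidate_id
            return True
        return False

nodes = [Node(i) for i in range(5)]
candidate = nodes[0]
candidate.current_term += 1          # new term for the election
candidate.voted_for = candidate.node_id
votes = 1 + sum(n.on_request_vote(candidate.current_term,
                                  candidate.node_id)
                for n in nodes[1:])
majority = len(nodes) // 2 + 1
print(votes, ">=", majority, "->", votes >= majority)  # 5 >= 3 -> True
```

In a failure-free round every node grants its vote; with real failures or competing candidates, the one-vote-per-term rule is what guarantees at most one leader per term.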
ZooKeeper Leader Election
ZooKeeper provides a reliable leader election recipe using ephemeral sequential znodes:
```python
from kazoo.client import KazooClient
from kazoo.recipe.election import Election

zk = KazooClient(hosts="zk1:2181,zk2:2181,zk3:2181")
zk.start()

election = Election(zk, "/election/scheduler")

def leader_func():
    print("I am the leader! Starting scheduler...")
    run_scheduler()

# This blocks until this node becomes leader
election.run(leader_func)

# If the leader dies, its ephemeral znode is deleted
# and the next candidate automatically becomes leader
```
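Under the hood, each candidate creates an ephemeral sequential znode and the lowest sequence number leads. A toy in-memory sketch of that rule (no real ZooKeeper involved; names are made up for illustration):

```python
# In-memory model of the ephemeral-sequential recipe: each candidate
# gets a sequence number in join order, and the lowest number leads.

def elect(candidates):
    znodes = {name: seq for seq, name in enumerate(candidates)}
    return min(znodes, key=znodes.get)

members = ["scheduler-b", "scheduler-a", "scheduler-c"]
print(elect(members))       # scheduler-b joined first, so it leads
# When the leader's session dies its znode disappears, and the
# next-lowest sequence number takes over automatically:
print(elect(members[1:]))   # scheduler-a
```

This "watch the node ahead of you" design also avoids a thundering herd: only one candidate is notified when the leader dies, rather than all of them racing at once.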
The Split-Brain Problem
Split-brain occurs when a network partition causes two parts of a cluster to independently elect their own leader. Both leaders accept writes, causing data divergence and potential corruption.
Solutions:
| Solution | How It Works | Trade-off |
|---|---|---|
| Quorum-based | Leader needs majority of nodes to operate | Minority partition becomes unavailable |
| Fencing | Old leader is fenced off from shared resources | Requires fencing mechanism (STONITH) |
| Epoch/Term numbers | Higher epoch leader takes precedence | Requires all nodes to check epoch |
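The epoch/term approach can be sketched with a storage service that remembers the highest epoch it has seen and rejects stale writers (the class and method names here are hypothetical, for illustration only):

```python
# Sketch of epoch-based fencing: the storage side tracks the highest
# epoch observed and refuses writes from any leader behind it.

class FencedStorage:
    def __init__(self):
        self.highest_epoch = 0
        self.data = {}

    def write(self, epoch, key, value):
        if epoch < self.highest_epoch:
            # A newer leader has already written; this writer is stale
            raise PermissionError(f"stale epoch {epoch}")
        self.highest_epoch = epoch
        self.data[key] = value

storage = FencedStorage()
storage.write(epoch=2, key="job", value="assigned-by-new-leader")
try:
    # The deposed leader (epoch 1) tries to write after the partition
    storage.write(epoch=1, key="job", value="assigned-by-old-leader")
except PermissionError as e:
    print("rejected:", e)
```

The key property: even if the old leader never learns it was deposed, its writes are rejected at the resource, so split-brain cannot corrupt data.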
Real-World Examples
- Apache Kafka: Each partition has a leader broker elected via ZooKeeper (or KRaft in newer versions). Only the leader handles reads and writes for that partition.
- etcd: Uses Raft for leader election. The leader processes all write requests and replicates them to followers.
- Redis Sentinel: Monitors Redis masters and uses a voting mechanism to elect a new master when the current one fails.
- Elasticsearch: Used the Zen Discovery protocol for master node election; versions 7.x+ replaced it with a new built-in cluster coordination subsystem.
Algorithm Comparison
| Algorithm | Complexity | Fault Tolerance | Used By |
|---|---|---|---|
| Bully | Simple | Low | Educational, simple systems |
| Raft | Moderate | High | etcd, CockroachDB, TiKV, Consul |
| Paxos | High | High | Google Chubby, Spanner |
| ZAB | Moderate | High | Apache ZooKeeper |
Leader election connects closely to quorum systems for achieving agreement and consistent hashing for distributed routing. See our two-phase commit (2PC) guide for coordinated commit protocols.
Frequently Asked Questions
Q: What happens if the leader fails during an operation?
In Raft, uncommitted entries from the failed leader may be lost. Only entries replicated to a majority are committed and guaranteed to survive leader failure. The new leader replicates its log to bring followers up to date. Clients should retry operations that were never acknowledged as committed.
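The commit rule reduces to a simple majority check, sketched here for an assumed five-node cluster:

```python
# Toy check of Raft's commit rule: an entry is committed once a
# majority of the cluster holds it, and only then does it survive
# leader failure.

def is_committed(replicas_holding_entry, cluster_size=5):
    return replicas_holding_entry >= cluster_size // 2 + 1

print(is_committed(3))  # True: on a majority, survives failover
print(is_committed(2))  # False: may be discarded by the new leader
```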
Q: How fast is Raft leader election?
Raft election typically completes within 150-300ms (the election timeout). The randomized timeout prevents split votes, where no candidate gets a majority. In practice, leader failover in etcd takes around 1-2 seconds including detection and election.
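Why randomization helps can be seen in a toy draw (not real Raft): with timeouts spread over a 150 ms window, the candidates almost never fire at the same instant, so whichever node's timer expires first usually runs its election uncontested.

```python
import random

# Toy illustration of randomized election timeouts: each node draws
# its own timeout, and the earliest timer starts the election alone.

random.seed(42)  # fixed seed just to make the example deterministic
timeouts = {f"node-{i}": random.uniform(150, 300) for i in range(5)}
first_candidate = min(timeouts, key=timeouts.get)
print(first_candidate, "times out first and starts the election")
```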
Q: Can I have multiple leaders for different responsibilities?
Yes, this is called partitioned leadership. Kafka assigns a different leader per partition. This improves throughput since leadership work is distributed. However, each individual partition still has exactly one leader at a time.
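A Kafka-style layout can be sketched as a simple round-robin assignment (broker names and the round-robin policy here are illustrative, not Kafka's actual assignment logic):

```python
# Hypothetical per-partition leadership: spread leaders round-robin
# across brokers so no single broker carries all the write load.

brokers = ["broker-1", "broker-2", "broker-3"]
leaders = {partition: brokers[partition % len(brokers)]
           for partition in range(6)}

print(leaders[0], leaders[3])  # broker-1 broker-1
# Every partition has exactly one leader, but leadership is spread:
print(sorted(set(leaders.values())))
```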
Q: How does Raft handle network partitions?
The partition with the majority of nodes continues to operate normally by electing a new leader if needed. The minority partition cannot elect a leader (cannot get majority votes) and becomes unavailable for writes. When the partition heals, the minority nodes rejoin and sync with the majority leader. This is the safe behavior that prevents split-brain.
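The quorum rule behind this answer is a one-line check, shown for a five-node cluster split into a 3-node and a 2-node partition:

```python
# Only a partition holding a strict majority of the cluster can
# gather enough votes to elect a leader.

def has_quorum(partition_size, cluster_size):
    return partition_size > cluster_size // 2

print(has_quorum(3, 5))  # True: majority side keeps operating
print(has_quorum(2, 5))  # False: minority side cannot elect a leader
```

This is also why clusters are usually sized with an odd number of nodes: a 4-node cluster tolerates the same single failure as a 3-node cluster but costs one more machine.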