Build your own Redis (3/5) - Redis Replication

Following our exploration of Redis's persistence mechanisms, we now dive into one of Redis's most critical architectural features: replication. Replication enables data redundancy, read scaling, and high availability by maintaining copies of data across multiple Redis instances. Let's explore how Redis achieves this through sophisticated master-replica synchronization.

Imagine you're building a distributed system that needs to serve thousands of read requests while maintaining data consistency across multiple servers. You face a fundamental challenge: how do you ensure that multiple copies of your data stay synchronized without sacrificing performance? This is where Redis's elegant replication architecture comes into play.

Redis uses a master-replica replication model in which one master instance can have multiple replica instances. Replicas connect to the master and receive a continuous stream of its write commands. They apply those commands in the same order, so their data converges to the master's. This provides both read scaling (clients can read from replicas, accepting that a replica may lag slightly behind) and data redundancy (a replica can be promoted to master if the master fails, either manually or through a failover system such as Redis Sentinel).

Replication Handshake

The replication process begins with a handshake between replica and master. The replica checks that the master is reachable, announces how it can be reached and which protocol features it supports, and finally asks for the data.

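The handshake flow involves these key steps:

  • PING: the replica checks that the master is reachable and expects +PONG.
  • REPLCONF listening-port <port>: the replica announces the port it listens on; the master replies +OK.
  • REPLCONF capa psync2: the replica announces that it understands the PSYNC2 protocol; the master replies +OK.
  • PSYNC ? -1: the replica asks for a full resynchronization; the master replies +FULLRESYNC and then sends an RDB snapshot.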

Here's the handshake implementation:

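async def __ping_master_node(self, master_address: str) -> bool:
    # master_address has the form "<host> <port>", e.g. "localhost 6379"
    master_host, master_port = master_address.split(" ")

    try:
        reader, writer = await asyncio.open_connection(
            master_host, int(master_port)
        )

        async def send_command(command: RESPArray) -> bytes:
            # Write one command and read the master's one-line reply
            writer.write(command.serialize())
            await writer.drain()
            return await reader.readline()

        # Step 1: PING, expect +PONG
        response = await send_command(
            RESPArray(value=[RESPBulkString(value="PING")])
        )
        if response != b"+PONG\r\n":
            return False

        # Step 2: REPLCONF listening-port <port>, expect +OK
        replconf_listening_port = RESPArray(value=[
            RESPBulkString(value="REPLCONF"),
            RESPBulkString(value="listening-port"),
            RESPBulkString(value=str(self.port)),
        ])
        response = await send_command(replconf_listening_port)
        if response != b"+OK\r\n":
            return False

        # Step 3: REPLCONF capa psync2, expect +OK
        replconf_capa_psync2 = RESPArray(value=[
            RESPBulkString(value="REPLCONF"),
            RESPBulkString(value="capa"),
            RESPBulkString(value="psync2"),
        ])
        response = await send_command(replconf_capa_psync2)
        if response != b"+OK\r\n":
            return False

        # Step 4: PSYNC ? -1 asks for a full resynchronization. The
        # +FULLRESYNC reply and the RDB payload are read by the
        # replication loop, so the open connection is handed over to it.
        psync_command = RESPArray(value=[
            RESPBulkString(value="PSYNC"),
            RESPBulkString(value="?"),
            RESPBulkString(value="-1"),
        ])
        writer.write(psync_command.serialize())
        await writer.drain()

        # Keep a reference so the task is not garbage-collected
        # (the attribute name is illustrative)
        self.__master_link_task = asyncio.create_task(
            self.handle_master_message(reader, writer)
        )
        return True
    except Exception as e:
        print(f"Connection error: {e}")
        return False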
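To see what actually crosses the wire during the handshake, here is a minimal standalone sketch that encodes each step as a RESP array of bulk strings. The encode_command helper and the port 6380 are illustrative assumptions, not part of the server above.

```python
def encode_command(*args: str) -> bytes:
    """Encode a command as a RESP array of bulk strings."""
    out = [f"*{len(args)}\r\n".encode()]
    for arg in args:
        data = arg.encode()
        # The bulk string header carries the byte length, not the character count
        out.append(f"${len(data)}\r\n".encode() + data + b"\r\n")
    return b"".join(out)


# The four handshake steps in order, as a replica listening on 6380 would send them
HANDSHAKE = [
    ("PING",),
    ("REPLCONF", "listening-port", "6380"),
    ("REPLCONF", "capa", "psync2"),
    ("PSYNC", "?", "-1"),
]

if __name__ == "__main__":
    for cmd in HANDSHAKE:
        print(encode_command(*cmd))
```

The master's replies are +PONG, +OK, +OK and finally +FULLRESYNC followed by the RDB payload.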

PSYNC: Full and Partial Synchronization

Redis supports two synchronization modes: full synchronization and partial synchronization. The PSYNC command is the key to this flexibility.

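  • Full Synchronization (PSYNC ? -1): The replica has no usable history, so it asks for everything. The master replies +FULLRESYNC <replid> <offset> and then sends an RDB snapshot of its whole dataset. The replica loads the snapshot and then applies the command stream that follows.
  • Partial Synchronization (PSYNC <replid> <offset>): A replica that was briefly disconnected names the replication ID it was following and how far it got. If the ID matches and the missing bytes are still in the master's replication backlog, the master replies +CONTINUE and resends only those bytes; otherwise it falls back to a full resynchronization.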
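The master's choice between the two modes can be sketched as a small pure function. This is a simplified model: BacklogState and psync_reply are hypothetical names, and here an offset means "bytes the replica has already processed", a convention chosen for clarity rather than copied from Redis's source.

```python
from dataclasses import dataclass


@dataclass
class BacklogState:
    """Hypothetical snapshot of the master's replication state."""
    replid: str
    master_offset: int         # total bytes propagated so far
    backlog_first_offset: int  # stream offset of the oldest byte still buffered


def psync_reply(state: BacklogState, req_replid: str, req_offset: int) -> str:
    """Choose partial or full resync for PSYNC <req_replid> <req_offset>.

    Convention in this sketch: the replica needs bytes [req_offset, master_offset).
    """
    same_history = req_replid == state.replid
    in_backlog = state.backlog_first_offset <= req_offset <= state.master_offset
    if same_history and in_backlog:
        # Missing bytes can be replayed from the backlog
        # (PSYNC2-aware masters also append their replid to this line)
        return "+CONTINUE"
    # Unknown history ("?") or bytes already evicted: send a fresh snapshot
    return f"+FULLRESYNC {state.replid} {state.master_offset}"
```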

Here's how the master handles PSYNC. This implementation supports only full synchronization:

case RedisCommand.PSYNC:
    # PSYNC <replid> <offset>; data.value[0] holds the command name
    repl_id = data.value[1].value
    repl_offset = int(data.value[2].value)

    # Only full resynchronization is implemented
    if repl_id != "?" or repl_offset != -1:
        raise NotImplementedError(
            "Only PSYNC with ? and -1 is supported"
        )

    # Snapshot the current dataset as RDB bytes
    rdb_content = self.__data_store.dump_to_rdb()
    # Reply line: +FULLRESYNC <replid> <offset>\r\n
    await self.__send_data(
        writer,
        RESPSimpleString(
            value=f"FULLRESYNC {self.__repl_info.master_replid} {self.__repl_info.master_repl_offset}"
        ),
    )
    # Payload header: $<length>\r\n
    await self.__send_data(
        writer, RESPBytesLength(value=len(rdb_content))
    )
    # Raw RDB bytes; unlike a bulk string, no trailing \r\n follows
    await self.__send_data(writer, rdb_content)
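On the replica side, the reply to PSYNC ? -1 has to be read in two parts: a +FULLRESYNC line, then a $<length> header followed by exactly that many RDB bytes. The sketch below (read_fullresync is a hypothetical helper) uses asyncio.StreamReader.readexactly so that commands arriving right after the snapshot are not swallowed.

```python
import asyncio


async def read_fullresync(reader: asyncio.StreamReader) -> tuple[str, int, bytes]:
    """Read '+FULLRESYNC <replid> <offset>' and the RDB payload that follows."""
    line = await reader.readline()  # b"+FULLRESYNC <replid> <offset>\r\n"
    parts = line.decode().strip().split(" ")
    if len(parts) != 3 or parts[0] != "+FULLRESYNC":
        raise ValueError(f"unexpected PSYNC reply: {line!r}")
    replid, offset = parts[1], int(parts[2])

    header = await reader.readline()  # b"$<length>\r\n"
    if not header.startswith(b"$"):
        raise ValueError(f"unexpected RDB header: {header!r}")
    length = int(header[1:].strip())
    # The RDB payload has no trailing \r\n, so read exactly `length` bytes
    rdb = await reader.readexactly(length)
    return replid, offset, rdb
```

Anything left in the reader afterwards is the start of the propagated command stream.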

Command Propagation

Once the initial synchronization is complete, the master continuously propagates write commands to all connected replicas. This ensures that replicas stay up-to-date with the master's data changes.

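The propagation mechanism follows this flow:

  • The master executes a write command from a client.
  • It forwards the same command, encoded as a RESP array, to every registered replica.
  • Replicas whose connection is closing, or whose send fails, are dropped and their connections closed.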

The propagation implementation:

async def __propagate_to_slaves(self, data: RESPObject):
    failed_replicas = []

    for replica_writer in list(self.__replica_acks.keys()):
        if not replica_writer.is_closing():
            success = await self.__send_data(replica_writer, data)
            if not success:
                failed_replicas.append(replica_writer)
        else:
            failed_replicas.append(replica_writer)

    # Clean up failed replicas
    for failed_writer in failed_replicas:
        self.__replica_acks.pop(failed_writer, None)
        try:
            failed_writer.close()
            await failed_writer.wait_closed()
        except Exception as e:
            print(f"Error closing failed replica connection: {e}")
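Because replication offsets count bytes rather than commands, propagation and offset accounting go together. The following in-memory sketch replaces sockets with byte buffers; ToyMaster and encode_command are illustrative names, not the server's API.

```python
from dataclasses import dataclass, field


def encode_command(*args: str) -> bytes:
    """Encode a command as a RESP array of bulk strings."""
    parts = [f"*{len(args)}\r\n".encode()]
    for arg in args:
        data = arg.encode()
        parts.append(f"${len(data)}\r\n".encode() + data + b"\r\n")
    return b"".join(parts)


@dataclass
class ToyMaster:
    """Hypothetical in-memory master: each replica is a byte buffer, not a socket."""
    master_repl_offset: int = 0
    replicas: dict[str, bytearray] = field(default_factory=dict)

    def propagate(self, *args: str) -> None:
        payload = encode_command(*args)
        for buf in self.replicas.values():
            buf.extend(payload)
        # The offset counts bytes of the replication stream, not commands
        self.master_repl_offset += len(payload)
```

A single SET foo bar is 31 bytes on the wire, so it advances master_repl_offset by 31, not by 1.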

Replication Offsets and Acknowledgments

Redis tracks replication progress with byte offsets. The master's offset (master_repl_offset) counts every byte it has written to the replication stream, so each propagated command advances it by its encoded length. Each replica counts the bytes it has processed and reports that number back with REPLCONF ACK <offset>. Comparing the two enables:

  • Replication Lag Detection: The master can see how far behind each replica is
  • Partial Resynchronization: A reconnecting replica can request only the bytes it missed
  • WAIT Command: Clients can block until a given number of replicas have acknowledged their writes

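The offset tracking system works like this:

  • The master adds the byte length of every command it propagates to master_repl_offset.
  • Each replica adds the byte length of every command it receives from the master, including PING and REPLCONF GETACK, to its own offset.
  • When the master sends REPLCONF GETACK *, the replica answers REPLCONF ACK <offset>, where the offset covers everything processed before that GETACK.
  • A replica has caught up with a write once its reported offset is at least the master's offset right after that write.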
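The replica's half of these rules can be sketched as a small class. ReplicaOffset is hypothetical, and it measures each command by re-encoding it, assuming the master sent the canonical RESP encoding.

```python
from typing import Optional


def encode_command(*args: str) -> bytes:
    """Encode a command as a RESP array of bulk strings."""
    parts = [f"*{len(args)}\r\n".encode()]
    for arg in args:
        data = arg.encode()
        parts.append(f"${len(data)}\r\n".encode() + data + b"\r\n")
    return b"".join(parts)


class ReplicaOffset:
    """Hypothetical replica-side counter of processed replication bytes."""

    def __init__(self) -> None:
        self.offset = 0

    def process(self, args: list[str]) -> Optional[bytes]:
        """Handle one propagated command; return a reply only for GETACK."""
        reply = None
        if [a.lower() for a in args[:2]] == ["replconf", "getack"]:
            # The reported offset excludes the GETACK being answered
            reply = encode_command("REPLCONF", "ACK", str(self.offset))
        # (A real replica would apply SET, DEL, ... to its data store here.)
        # Every command from the master counts, PING and GETACK included.
        self.offset += len(encode_command(*args))
        return reply
```

REPLCONF GETACK * is itself 37 bytes, so the first GETACK is answered with offset 0, and a GETACK that follows it and a 14-byte PING is answered with 51.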

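WAIT numreplicas timeout lets a client trade latency for durability: it blocks until at least numreplicas replicas have acknowledged the writes propagated so far, or until timeout milliseconds have passed (0 means wait forever), and then returns how many replicas acknowledged: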

case RedisCommand.WAIT:
    # WAIT <numreplicas> <timeout-ms>
    num_replicas = int(data.value[1].value)
    timeout_ms = int(data.value[2].value)

    # Replicas must reach the offset of everything propagated so far
    client_offset = self.__repl_info.master_repl_offset
    start_time = time.monotonic()
    getack_sent = False

    while True:
        # Count open replicas whose acknowledged offset has caught up
        active_replicas = [
            (w, r, o) for w, (r, o) in self.__replica_acks.items()
            if not w.is_closing()
        ]
        acked_replicas = sum(
            1 for _, _, offset in active_replicas
            if offset >= client_offset
        )

        if acked_replicas >= num_replicas:
            return RESPInteger(value=acked_replicas)

        # A timeout of 0 means block until enough replicas acknowledge
        if timeout_ms > 0:
            elapsed_ms = (time.monotonic() - start_time) * 1000
            if elapsed_ms >= timeout_ms:
                return RESPInteger(value=acked_replicas)

        # Ask lagging replicas for their offset once. GETACK travels on
        # the same connection after the earlier writes, so each reply
        # already covers them; re-sending every 100 ms only adds traffic.
        if not getack_sent:
            for replica_writer, _, current_offset in active_replicas:
                if current_offset < client_offset:
                    ack_cmd = RESPArray(value=[
                        RESPBulkString(value="REPLCONF"),
                        RESPBulkString(value="GETACK"),
                        RESPBulkString(value="*"),
                    ])
                    await self.__send_data(replica_writer, ack_cmd)
            getack_sent = True

        # Yield so REPLCONF ACK replies can update self.__replica_acks
        await asyncio.sleep(0.1)
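The loop above polls every 100 ms. An alternative design, shown here as a sketch rather than the article's implementation, wakes WAIT callers only when an ACK arrives by using asyncio.Condition, and delegates the timeout to asyncio.wait_for. AckTracker and its methods are hypothetical names, and replicas are identified by plain strings instead of stream writers.

```python
import asyncio


class AckTracker:
    """Hypothetical helper: stores REPLCONF ACK offsets and wakes WAIT callers."""

    def __init__(self) -> None:
        self.offsets: dict[str, int] = {}
        self._cond = asyncio.Condition()

    async def record_ack(self, replica_id: str, offset: int) -> None:
        # Called when a replica sends REPLCONF ACK <offset>
        async with self._cond:
            self.offsets[replica_id] = offset
            self._cond.notify_all()

    def count_acked(self, target: int) -> int:
        return sum(1 for off in self.offsets.values() if off >= target)

    async def wait(self, num_replicas: int, target: int, timeout_ms: int) -> int:
        """Block until num_replicas replicas reach target or the timeout expires."""

        async def enough_acks() -> None:
            async with self._cond:
                await self._cond.wait_for(
                    lambda: self.count_acked(target) >= num_replicas
                )

        try:
            if timeout_ms > 0:
                await asyncio.wait_for(enough_acks(), timeout_ms / 1000)
            else:
                await enough_acks()  # 0 means wait forever, as with WAIT
        except asyncio.TimeoutError:
            pass
        # Like WAIT, report how many replicas acknowledged, even on timeout
        return self.count_acked(target)
```

The REPLCONF ACK handler would call record_ack, and WAIT would call wait with master_repl_offset as the target.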

REPLCONF: Replication Configuration

The REPLCONF command manages various replication configuration options:

  • listening-port: Replica announces its listening port
  • capa: Replica announces capabilities (like psync2)
  • ack <offset>: Replica reports how many bytes of the replication stream it has processed
  • getack: Master asks the replica to report its current offset

Here's the REPLCONF implementation:

case RedisCommand.REPLCONF:
    attr = data.value[1].value.lower()

    if self.__repl_info.role == RedisReplicationRole.MASTER:
        if attr == "listening-port":
            # Register the new replica with an acknowledged offset of 0
            self.__replica_acks[writer] = (reader, 0)
            print(f"New connected replica: {writer.get_extra_info('peername')}")
        elif attr == "ack":
            # REPLCONF ACK <offset>: record how far this replica has processed
            self.__replica_acks[writer] = (
                reader,
                int(data.value[2].value),
            )
        elif attr == "capa":
            # REPLCONF capa psync2: capability announcement, nothing to store
            pass

        # Redis sends no reply to REPLCONF ACK; every other option gets +OK
        if attr != "ack":
            await self.__send_data(writer, RESPSimpleString(value="OK"))
    else:
        # Replica side: answer REPLCONF GETACK * with the number of
        # replication-stream bytes processed so far
        if attr == "getack":
            await self.__send_data(
                writer,
                RESPArray(value=[
                    RESPBulkString(value="REPLCONF"),
                    RESPBulkString(value="ACK"),
                    RESPBulkString(value=str(self.__repl_ack_offset)),
                ]),
            )
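Keeping the REPLCONF protocol rules separate from socket I/O makes them easy to test. The function below is a hypothetical master-side version of the branch above: it returns the bytes to send, or None for REPLCONF ACK, which gets no reply. The error text for unknown options is made up for this sketch.

```python
from typing import Optional


def replconf_on_master(
    args: list[str], acks: dict[str, int], conn_id: str
) -> Optional[bytes]:
    """Handle REPLCONF on the master; return the RESP reply or None."""
    option = args[1].lower()
    if option == "listening-port":
        acks.setdefault(conn_id, 0)  # register the replica with offset 0
        return b"+OK\r\n"
    if option == "capa":
        return b"+OK\r\n"  # capability accepted, nothing to store
    if option == "ack":
        acks[conn_id] = int(args[2])  # record the offset, send nothing back
        return None
    return b"-ERR unknown REPLCONF option\r\n"
```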

Master-Replica Roles

Conceptually, a Redis instance can be in one of three situations:

  • Master: Accepts writes, propagates to replicas
  • Replica/Slave: Receives updates from master, serves reads
  • Standalone: No replication configured

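INFO, however, reports only two roles, master and slave: a standalone instance is simply a master with no connected replicas. The server therefore decides its role at startup from the replicaof setting: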

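from enum import StrEnum  # requires Python 3.11+

class RedisReplicationRole(StrEnum):
    MASTER = "master"
    SLAVE = "slave"      # the value INFO reports for replicas
    REPLICA = "replica"  # unused below; INFO still reports "slave"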

# In RedisServer.__init__
self.__repl_info = RedisReplicationInformation(
    role=RedisReplicationRole.MASTER
    if replicaof is None
    else RedisReplicationRole.SLAVE,
    connected_slaves=0,
    master_replid=self.__generate_master_replid(),
    master_repl_offset=0,
    # ... other fields
)
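The startup decision boils down to parsing the replicaof value. A small sketch, assuming replicaof arrives as a single "host port" string as in __ping_master_node; Role and role_from_replicaof are illustrative names, and a str-based Enum stands in for StrEnum so it also runs before Python 3.11.

```python
from enum import Enum
from typing import Optional


class Role(str, Enum):
    MASTER = "master"
    SLAVE = "slave"


def role_from_replicaof(
    replicaof: Optional[str],
) -> tuple[Role, Optional[tuple[str, int]]]:
    """Return the role and, for a replica, the (host, port) of its master."""
    if replicaof is None:
        # Standalone: reported as a master that simply has no replicas
        return Role.MASTER, None
    # Expected form: "<host> <port>", e.g. "localhost 6379"
    host, port = replicaof.split()
    return Role.SLAVE, (host, int(port))
```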

Continuous Synchronization

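After the initial synchronization, the master keeps streaming write commands to each replica over the same connection. Replicas apply them in the order the master executed them, so once a replica has processed the stream up to a given offset, its data matches the master's state at that offset. Because the stream is asynchronous, a replica may briefly lag behind.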

The replica's message handler processes incoming commands:

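async def handle_master_message(
    self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
):
    try:
        while True:
            data = await reader.read(1024)
            if not data:
                break  # master closed the connection

            # A 1024-byte read can end mid-command; this assumes resp_parser
            # keeps incomplete frames buffered until the next call
            commands = self.resp_parser.parse(data)

            for cmd in commands:
                if cmd.type == RESPObjectType.ARRAY:
                    # Propagated commands (SET, PING, REPLCONF GETACK, ...)
                    # all arrive as arrays; True marks them as from the master
                    await self.handle_command(reader, writer, cmd, True)
                    # Count this command's bytes after handling it, so a
                    # GETACK reply reports the offset before the GETACK
                    self.__repl_ack_offset += len(cmd.serialize())
                elif cmd.type == RESPObjectType.BULK_BYTES:
                    # RDB payload from the initial full sync; it is not
                    # part of the replication offset
                    pass
    except Exception as e:
        print(f"Error handling master message: {e}")
    finally:
        print("Closing connection to master")
        writer.close()
        await writer.wait_closed()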
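Because reader.read(1024) returns whatever bytes happen to be available, a command can be split across two reads, and one read can contain several commands. A parser used in this loop therefore has to buffer partial frames. Here is a minimal sketch of such an incremental parser; RespArrayStream is a hypothetical class that handles only arrays of bulk strings (the shape of propagated commands) and also reports each command's byte length for offset tracking.

```python
from typing import Optional


class RespArrayStream:
    """Hypothetical incremental parser for RESP arrays of bulk strings.

    feed() accepts arbitrary chunks and returns every complete command as
    (arguments, size_in_bytes), keeping any partial frame buffered.
    """

    def __init__(self) -> None:
        self._buf = bytearray()

    def feed(self, chunk: bytes) -> list[tuple[list[bytes], int]]:
        self._buf.extend(chunk)
        commands = []
        while (parsed := self._try_parse()) is not None:
            commands.append(parsed)
        return commands

    def _line(self, pos: int) -> Optional[tuple[bytes, int]]:
        # Return the line starting at pos (without \r\n) and the next position
        end = self._buf.find(b"\r\n", pos)
        if end == -1:
            return None
        return bytes(self._buf[pos:end]), end + 2

    def _try_parse(self) -> Optional[tuple[list[bytes], int]]:
        line = self._line(0)
        if line is None:
            return None
        header, pos = line
        if not header.startswith(b"*"):
            raise ValueError(f"expected a RESP array, got {header!r}")
        args = []
        for _ in range(int(header[1:])):
            line = self._line(pos)
            if line is None:
                return None  # bulk string header not complete yet
            size_line, pos = line
            if not size_line.startswith(b"$"):
                raise ValueError(f"expected a bulk string, got {size_line!r}")
            size = int(size_line[1:])
            if len(self._buf) < pos + size + 2:
                return None  # bulk string body not complete yet
            args.append(bytes(self._buf[pos:pos + size]))
            pos += size + 2  # skip the body and its trailing \r\n
        del self._buf[:pos]  # drop the complete frame from the buffer
        return args, pos
```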

Replication Information

Redis provides detailed replication information through the INFO command, showing the current replication state, connected replicas, and synchronization offsets:

from dataclasses import dataclass

@dataclass
class RedisReplicationInformation:
    role: RedisReplicationRole
    connected_slaves: int
    master_replid: str
    master_repl_offset: int
    second_repl_offset: int
    repl_backlog_active: bool
    repl_backlog_size: int
    repl_backlog_first_byte_offset: int
    repl_backlog_histlen: int

    def serialize(self) -> RESPBulkString:
        return RESPBulkString(
            value=(
                f"role:{self.role}\n"
                f"connected_slaves:{self.connected_slaves}\n"
                f"master_replid:{self.master_replid}\n"
                f"master_repl_offset:{self.master_repl_offset}\n"
                f"second_repl_offset:{self.second_repl_offset}\n"
                f"repl_backlog_active:{1 if self.repl_backlog_active else 0}\n"
                f"repl_backlog_size:{self.repl_backlog_size}\n"
                f"repl_backlog_first_byte_offset:{self.repl_backlog_first_byte_offset}\n"
                f"repl_backlog_histlen:{self.repl_backlog_histlen}\n"
            )
        )
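Real Redis separates INFO lines with \r\n and starts the section with a # Replication header, while the serializer above uses \n and no header. A client-side parser that accepts both is short; parse_info is an illustrative helper.

```python
def parse_info(text: str) -> dict[str, str]:
    """Parse INFO output ('key:value' lines, '# Section' headers) into a dict."""
    fields = {}
    for line in text.splitlines():  # handles both \r\n and \n
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and section headers
        key, _, value = line.partition(":")  # split on the first colon only
        fields[key] = value
    return fields
```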

Fault Tolerance and Recovery

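Redis itself includes several fault-handling mechanisms for replication (the server built in this article supports only full resynchronization with PSYNC ? -1):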

  • Automatic Reconnection: Replicas automatically reconnect to masters
  • Partial Resynchronization: Efficiently sync only missing data
  • Replica Promotion: Replicas can be promoted to master during failover
  • Backlog Management: Masters maintain replication backlogs for partial resync

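The fault tolerance flow:

  • The replica notices that its connection to the master is gone and reconnects.
  • It sends PSYNC with the replication ID it was following and its current offset.
  • If the master still has the missing bytes in its backlog, it replies +CONTINUE and streams just those bytes.
  • Otherwise it replies +FULLRESYNC and sends a new RDB snapshot, as during the initial handshake.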
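The backlog that makes partial resynchronization possible can be modeled as a bounded buffer that remembers the stream offset of its oldest byte. This ReplicationBacklog class is a simplified, hypothetical stand-in (it trims a bytearray instead of managing memory the way Redis does); first_byte_offset plays the role of the repl_backlog_first_byte_offset field reported by INFO, though Redis may count its offsets differently.

```python
from typing import Optional


class ReplicationBacklog:
    """Hypothetical fixed-size backlog of the most recent replication bytes."""

    def __init__(self, size: int) -> None:
        self.size = size
        self.buf = bytearray()
        self.first_byte_offset = 0  # stream offset of buf[0]

    @property
    def end_offset(self) -> int:
        # Stream offset just past the newest byte (the master's offset)
        return self.first_byte_offset + len(self.buf)

    def append(self, data: bytes) -> None:
        self.buf.extend(data)
        overflow = len(self.buf) - self.size
        if overflow > 0:
            # Evict the oldest bytes; replicas that still need them must full-sync
            del self.buf[:overflow]
            self.first_byte_offset += overflow

    def read_from(self, offset: int) -> Optional[bytes]:
        """Bytes a replica at `offset` is missing, or None if already evicted."""
        if not self.first_byte_offset <= offset <= self.end_offset:
            return None
        return bytes(self.buf[offset - self.first_byte_offset:])
```

A None result is exactly the case where the master has to answer +FULLRESYNC instead of +CONTINUE.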

Performance Considerations

Redis replication is designed for high performance:

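  • Asynchronous Replication: The master replies to clients without waiting for replicas to apply the write
  • Non-blocking Operations: The master keeps serving clients while it transfers the RDB snapshot and streams commands
  • Efficient Serialization: Uses the RESP protocol for command transmission
  • Selective Replication: Only commands that change data are replicated, plus a few control messages such as PING and REPLCONF GETACK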

Lessons

Redis's replication architecture offers valuable insights into distributed systems design:

  • Handshake Protocol: The multi-stage handshake ensures compatibility and capability negotiation between master and replicas.
  • Offset-Based Tracking: Using offsets instead of timestamps provides precise tracking of replication progress and enables efficient partial resynchronization.
  • Asynchronous Design: Non-blocking replication ensures that master performance remains high even with many replicas.
  • Flexible Durability: The WAIT command lets applications trade latency for stronger durability on a per-write basis, although it does not make Redis strongly consistent: acknowledged writes can still be lost during a failover.

In the next part, we'll explore Redis's data structures, examining how Redis implements efficient in-memory data structures like strings, lists, sets, and streams, and how these structures enable the rich command set that makes Redis so versatile.
