How we extended our Raft protocol and simplified its implementation.

ByMichal MaslankaonJune 16, 2022
Simplifying Redpanda Raft implementation

Introduction

Redpanda uses the well-known Raft distributed consensus algorithm as a foundation for the distributed log. In principle, the Raft protocol requires that the majority of participants agree on a decision before it is accepted as the final outcome. For Redpanda it means that an entry is successfully replicated if the majority of the nodes persisted that entry with the same term at the same place in the log.

Redpanda started with a basic implementation of Raft protocol. Initially it did not even support reconfigurations! Over the course of Redpanda’s platform development we encountered many challenges that ultimately forced us to extend the original Raft protocol so that we could make Redpanda scalable, reliable, and fast. Some of the custom extensions that exist in Redpanda Raft are:

  • priority based voting
  • prevotes
  • reconfiguration
  • learners support
  • out of band heartbeat generation
  • flush debouncing
  • support for Apache Kafka's ACKS

Any one of those mechanisms could be its own article, but in this one I’m going to focus on how we handle Kafka’s ACKS producer property.

Kafka ACKS Explained

Redpanda needs to deal with different consistency guarantees as specified by the Kafka protocol. Clients may use the ACKS property to allow users to trade consistency and durability for performance. The Kafka protocol specifies three settings for the ACKS property: 0,1, and -1(all). Those settings control when clients are sent an acknowledgement of the success of their produce requests:

  • ACKS=all means all in-sync replicas must store the request before returning to the client.
  • ACKS=1 means only a leader has to store the request before returning to the client.
  • ACKS=0 means the client considers the request successfully delivered immediately after the message is put onto the wire.

Kafka ACKS in Redpanda

To achieve compatibility with Kafka, Redpanda supports all Kafka’s ACKS levels (i.e. -1,1,0).

The ACKS=-1 or ACKS=all works the same as the original version of the Raft protocol. The replication call finishes when the requested entry is safely replicated to the majority of the replicas and is flushed (fsync’ed).

In the ACKS=1 or ACKS=0 replication finishes as soon as the write to the leader disk is finished but before it is flushed.

The entries' visibility for readers is also influenced by the replication consistency levels. For entries replicated with ACKS=-1, Redpanda makes them visible as soon as they are fsync’ed on the majority of nodes. On the other hand, entries with the ACKS=0,1 are made visible as soon as they are written to the follower's disk, but they are not required to be fsync’ed.

How Raft replication works in Redpanda

In the previous implementation, handling of ACKS=-1 and ACKS=0,1 used different approaches and code paths. ACKS=-1 replication requests went through the batching on the leader and then were dispatched to the followers. The Redpanda Raft implementation writes data to the leader disk and then dispatches append entries requests to the followers in parallel with the leader disk flush. In this case, the Raft implementation may have multiple requests in flight per follower (currently, the number of requests pending is limited by the per-follower semaphore).

ACKS=0,1 replication was implemented differently. Control was returned to the caller immediately after the data was written to the underlying log. On the next heartbeat, the follower was recognized to be behind the leader and was recovered. The recovery mechanism read a chunk of data and sent it to the follower in a loop (a single request at a time).

Why simplify Raft replication?

There were many reasons to simplify the Raft logic:

  • Simpler code and a unified approach makes the Raft implementation easier to maintain and optimize.
  • Without the simplification, mixing different consistency modes with high throughput may lead to spikes in latency.
  • The simplified and unified replication mechanism makes it easier to implement future Raft features like in-memory writes and remote leaders (where the leader for a partition is not co-located with data).

The simplification approach

Our simplification solution assumes that all supported ACKS values will be handled in the same way, with the use of replicate_batcher and replicate_entries_stm. This approach has several advantages over the previous approach to the ACKS=1,0 consistency level. Namely, this approach allows us to simplify and unify replication code, control back pressure and memory usage, and limit replication latency.

The simplified implementation of ACKS=-1 replication is robust and well-tested. No large code changes are required to use replicate_batcher and replicate_stm to handle ACKS=0,1 replication. The way that the replicate_stm handles append_entries requests, dispatching, and leader log writes will allow us to handle those requests in parallel in a future release.

Additionally, using replicate_stm prevents us from creating the append_entries requests twice. Batches coming from replicate_batcher are not written to the disk and then read again, but instead are shared to generate a leader disk write and follower append_entries requests. This approach significantly reduces the storage::batch_cache pressure and may also reduce disk usage. (In some clusters with limited memory cache, the hit ratio is not always 1.0, even if there are no reads other than the reads issued by replicate_stm.)

Backpressure

In the previous ACKS=-1 replication implementation, the backpressure was effectively propagated from the followers to the raft::replicate caller (the Kafka produce request handler). The backpressure propagation was based on the per-follower semaphore since the RPC layer lacked a backpressure propagation mechanism.

When replicate_entries_stm was unable to acquire follower dispatch units, it would wait, holding the replicate_batcher memory reservation lock so that it did not accept writes. When the replicate_batcher memory reservation semaphore was exhausted, it prevented the first stage of replicate_stages from finishing, preventing more requests from being processed.

The ACKS=0,1 mechanism lacked backpressure propagation. The leader could be a large and arbitrary number of entries ahead of followers. This could present a problem since the user had an impression that data was replicated when, in reality, the data was only stored on the leader.

In the simplified solution, we handle backpressure in the same way for all ACKS settings. This way, there will be a guarantee that followers can always keep up with the leader.

Memory usage

Memory usage control is handled in the same way it was in the previous implementation, where we will not finish the first stage of replication when the replicate_batcher memory semaphore is exhausted. This will prevent the Kafka layer from buffering an indefinite number of batches in pending replicate requests.

Replicate batcher memory units are released after the request is handed off to the socket, so that all the user space buffers can be freed. In the future, we may consider a per-shard semaphore that controls the total amount of memory used by all the replicate batchers.

Recovery STM coordination - finishing recovery

recovery_stm is started every time the leader identifies the follower as being behind. Since recovery_stm was used for both recovering the follower (when it was actually behind) and delivering follower data replicated with the ACKS=0,1, the recovery_stm implementation contains some optimization to minimize the ACKS=0,1 workload latency.

There are two main optimizations:

  • not stopping the recovery when the last replicate request ACKS are set to 0 or 1
  • triggering the next dispatch loop immediately after the leader disk append

These optimizations make the handling of the recovering follower more complex in other scenarios, such as leadership_transfer.

Since there was no coordination between the two paths that the follower append entry requests were sent from (i.e. replicate_stm and recovery_stm), those requests could be reordered and cause log truncations and redeliveries, which led to increased latency.

In the simplified solution, since we are no longer going to rely on the recovery_stm to deliver messages to the follower in normal conditions, the complicated handling of recovery finishing logic is no longer needed. When the follower is up to date with the leader, recovery_stm is simply not running.

The only situation that will require coordination is when the follower is no longer behind the leader, but replicate_stm skips dispatching requests to the follower as it is still recovering. This may lead to a situation in which the next leader append will happen before the recovery_stm receives the follower response for the last request (i.e. it cannot finish recovery because the log end has already advanced). This case may be handled by coordinating dispatching follower requests from recovery_stm and replicate_stm. We can introduce the last-sent offset to raft::follower_index_metadata to check whether recovery should finish, instead of the check being based only on the follower response. This way, the replicate_stm will not skip sending a request to the follower if it is the next request the follower is supposed to receive.

On the other hand, the recovery_stm will not try to send the same request again, but will finish, assuming the response will be successful.

Implementing Raft simplification

To implement the simplified solution, we pass the consistency_level parameter to the raft::replicate_batcher::replicate method. The raft::replicate_entries_stm::apply_on_leader method will return immediately after the successful leader_append. The full quorum replication round will be finished after the raft::replicate_entries_stm::wait_for_majority() returns. The replicate batcher, based on the cached request consistency_level, will decide when to release the request, either after apply_on_leader finishes, or wait_for_majority finishes.

As in the previous implementation, the raft::replicate_entries_stm will release batcher memory units after dispatching the follower request. This will allow propagating backpressure for both consistency levels.

Conclusion

Redpanda Raft implementation is optimized to achieve the best results in all scenarios. We introduced unique solutions (flush coalescing, follower back pressure propagation, and out-of-band heartbeats) to achieve the best possible performance in all the circumstances. Unifying code responsible for Raft replication allows us to further optimize replication, as all the optimizations are automatically applied to all supported consistency levels.

If you have feedback on the simplification process, or would like to contribute to ongoing improvements in Redpanda, please chime in on our Github discussions here. We also welcome you to join our Slack Community to talk directly with me!

Let's keep in touch

Subscribe and never miss another blog post, announcement, or community event. We hate spam and will never sell your contact information.