akazuko

Leader election

Adding leader election to your application using etcd.

Tags: tech, consensus


This was previously published by me on Medium.

In a distributed system, leader election can be critical for coordinating work. In this post, I will quickly show how you can add leader election to your distributed application using etcd, a consistent key-value store that uses the Raft consensus algorithm and is used by well-known projects such as Kubernetes. First, we need a C++ etcd v3 client. We will use etcd-cpp-apiv3/etcd-cpp-apiv3 from GitHub (follow the instructions in its README to build it).

Approach

We will use the lease, add and watch APIs provided by etcd.

Step 1

Each app instance obtains a lease with a keep-alive time (TTL) and, using the lease ID, tries to add a key-value pair whose key is the election key for our distributed application and whose value is the unique ID of the app instance. The add succeeds only if the key does not exist yet, and etcd applies writes atomically. Thus, the first instance whose write is applied becomes the leader. Any other instance that tried to add the key in parallel sees that the key already exists and takes the key's existing value as the leader ID.
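To make the "first writer wins" rule concrete, here is a minimal in-memory sketch, not the etcd API: a hypothetical `InMemoryStore` whose `addIfAbsent` imitates the create-if-absent behaviour of etcd-cpp-apiv3's `add` by doing the existence check and the write under one mutex. Several threads race to claim the election key; exactly one write succeeds, and every loser reads back the winner's ID, just as a failed `add` returns the existing value in the code later in this post.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for etcd's key-value store (illustration only).
class InMemoryStore {
public:
    // Atomically write key=value only if key is absent.
    // Returns {true, value} on success, {false, existingValue} otherwise.
    std::pair<bool, std::string> addIfAbsent(const std::string& key,
                                             const std::string& value) {
        std::lock_guard<std::mutex> lock(m_mutex);
        auto it = m_data.find(key);
        if (it != m_data.end()) {
            return {false, it->second};
        }
        m_data.emplace(key, value);
        return {true, value};
    }

private:
    std::mutex m_mutex;
    std::map<std::string, std::string> m_data;
};

struct ElectionOutcome {
    int winners = 0;
    std::vector<std::string> observedLeaders;  // leader ID seen by each instance
};

// Every "instance" (one thread each) tries to claim the election key at once.
ElectionOutcome runElection(InMemoryStore& store, int instances) {
    assert(instances > 0);
    std::vector<std::pair<bool, std::string>> results(instances);
    std::vector<std::thread> threads;
    for (int i = 0; i < instances; ++i) {
        // Each thread writes only its own slot, so no extra locking is needed.
        threads.emplace_back([&store, &results, i] {
            results[i] = store.addIfAbsent("MyApp/leader", "id" + std::to_string(i + 1));
        });
    }
    for (auto& t : threads) t.join();

    ElectionOutcome outcome;
    for (const auto& r : results) {
        if (r.first) ++outcome.winners;
        outcome.observedLeaders.push_back(r.second);
    }
    return outcome;
}
```

Which thread wins varies from run to run; what never varies is that there is one winner and everyone agrees on it.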

Now, if the leader app instance dies for any reason, it stops refreshing its lease; the lease expires and etcd deletes the election key. At that moment we want to start a new election, and this is where etcd's watch API comes in.
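The lease behaviour can be modelled with a logical clock. The sketch below is hypothetical and makes no etcd calls: `LeasedKey` holds a value that survives only while `keepAlive` is called within the TTL. Once a full TTL passes without a refresh, `advance` deletes the key and fires a delete callback, which plays the role of the watch event that should start a new election. The 10-second TTL used in the usage example matches the value this post later passes to `leasekeepalive`.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

// Hypothetical model of a key attached to a lease (not etcd's API).
class LeasedKey {
public:
    LeasedKey(int ttlSeconds, std::function<void(const std::string&)> onDelete)
        : m_ttl(ttlSeconds), m_onDelete(std::move(onDelete)) {
        assert(ttlSeconds > 0);
    }

    // Attach a value; the lease starts counting from the current time.
    void put(const std::string& value) {
        m_value = value;
        m_present = true;
        m_expiresAt = m_now + m_ttl;
    }

    // A keep-alive resets the remaining lifetime to the full TTL.
    void keepAlive() {
        if (m_present) m_expiresAt = m_now + m_ttl;
    }

    // Move the logical clock forward; expire the key if the TTL elapsed.
    void advance(int seconds) {
        m_now += seconds;
        if (m_present && m_now >= m_expiresAt) {
            m_present = false;
            std::string old = m_value;
            m_value.clear();
            m_onDelete(old);  // like a watcher receiving a "delete" event
        }
    }

    bool present() const { return m_present; }

private:
    int m_ttl;
    int m_now = 0;
    int m_expiresAt = 0;
    bool m_present = false;
    std::string m_value;
    std::function<void(const std::string&)> m_onDelete;
};
```

For example, with a TTL of 10, a key refreshed at t=6 survives until t=16; if no further refresh arrives, it is deleted at t=16 and the callback runs once. In this model the key disappears exactly one TTL after the last refresh; a real etcd cluster only approximates that.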

Step 2

Each app instance creates a watcher on the election key and reacts to a “delete” action on the key by triggering the election again. The leader may also be slow rather than dead: if it fails to send a keep-alive in time, its lease expires even though the process has not crashed. In that case the current leader must also learn the result of the new election. Thus, every instance also reacts to the write that re-creates the election key (reported as a “create” or “set” action) and reads the newly elected leader ID from it.
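The watcher's reaction boils down to a small decision per event. As a minimal sketch, here is a hypothetical `LeaderTracker` that is fed the action name and value of each watch event, with the election step injected as a callback so the logic can be exercised without etcd. The action strings ("delete", "create", "set") are assumed to match those reported by etcd-cpp-apiv3; check them against the version you build.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

// Hypothetical helper mirroring the watcher callback's logic without etcd.
class LeaderTracker {
public:
    LeaderTracker(std::string myId, std::function<void()> startElection)
        : m_id(std::move(myId)), m_startElection(std::move(startElection)) {
        assert(!m_id.empty());  // an empty leader ID means "no leader known"
    }

    void onEvent(const std::string& action, const std::string& value) {
        if (action == "delete") {
            // The leader's lease expired (or the key was removed): contest again.
            m_leaderId.clear();
            m_startElection();
        } else if (action == "create" || action == "set") {
            // Some instance (possibly this one) now holds the key.
            m_leaderId = value;
        }
        // Any other action is ignored.
    }

    bool isLeader() const { return m_leaderId == m_id; }
    const std::string& leaderId() const { return m_leaderId; }

private:
    std::string m_id;
    std::string m_leaderId;
    std::function<void()> m_startElection;
};
```

A slow ex-leader that sees "delete" followed by "create" with another instance's ID ends up with `isLeader() == false`, which is exactly the situation the paragraph above describes.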

Note: The following section assumes that etcd is already up and running.

Let’s write some code

class MyApp
{
public:
    MyApp(const char* etcdConnectionString, std::string id);

    ~MyApp();

    bool isLeader();

    std::string& GetID();

    std::shared_ptr<etcd::KeepAlive> GetKeepAlive();

    void StartElection();

    void WatchForLeaderChange();

private:
    void WatchForLeaderChangeCallback(etcd::Response response);

    std::string m_id;
    std::string m_leaderId;
    std::shared_ptr<etcd::Client> m_etcd;
    std::shared_ptr<etcd::KeepAlive> m_keepalive;
    std::unique_ptr<etcd::Watcher> m_watcher;
};

Our aim is to implement the StartElection and WatchForLeaderChange member functions. To do so, we need an etcd client, a keepalive instance that refreshes the lease according to our keep-alive time, and a watcher instance. Per the etcd client documentation, a watcher takes a callback; in our case, the WatchForLeaderChangeCallback member function serves that purpose. Let's fill in these functions in the class above.

using namespace std::chrono_literals;

const std::string ELECTIONKEY = "MyApp/leader";

class MyApp
{
    public:
        MyApp(const char* etcdConnectionString, std::string id)
        : m_id(std::move(id)), m_leaderId("")
        {
            m_etcd = std::make_shared<etcd::Client>(etcdConnectionString);
        }

        ~MyApp()
        {
            // we should cancel the watcher first so that deletion which happens
            // as part of keepalive cancel does not trigger the watcher callback
            if (m_watcher.get() != nullptr)
            {
                m_watcher->Cancel();
            }

            if (m_keepalive.get() != nullptr)
            {
                m_keepalive->Cancel();
            }
        }

        bool isLeader()
        {
            return m_id == m_leaderId;
        }

        std::string& GetID()
        {
            return m_id;
        }

        std::shared_ptr<etcd::KeepAlive> GetKeepAlive()
        {
            if (m_keepalive.get() == nullptr)
            {
                // 10 is the lease TTL (keep-alive time) in seconds
                m_keepalive = m_etcd->leasekeepalive(10).get();
            }
            return m_keepalive;
        }

        void StartElection()
        {
            int numberOfTries = 10;
            while (numberOfTries--)
            {
                pplx::task<etcd::Response> response_task = m_etcd->add(ELECTIONKEY, GetID(), GetKeepAlive()->Lease());
                try
                {
                    etcd::Response response = response_task.get();

                    if (response.is_ok())
                    {
                        // if able to add the key that means this is the new leader
                        std::cout << "I am the leader" << std::endl;
                    }
                    
                    // capture the current leader (stored as value of the ELECTIONKEY)
                    // in case of is_ok() -> False, this returns the existing key's value
                    m_leaderId = response.value().as_string();
                    std::cout << "Rx leader: " << m_leaderId << std::endl;
                    break;
                }
                catch (std::exception const& ex)
                {
                    std::cerr << ex.what() << std::endl;
                    if (numberOfTries == 0)
                        throw; // rethrow the original exception instead of a sliced copy
                }
            }
        }

        void WatchForLeaderChange()
        {
            auto callback = [this](etcd::Response response) { this->WatchForLeaderChangeCallback(response); };
            m_watcher = std::make_unique<etcd::Watcher>(*m_etcd, ELECTIONKEY, callback);
        }

    private:
        void WatchForLeaderChangeCallback(etcd::Response response)
        {
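            if (response.action() == "delete")
            {
                // the leader's lease expired (or the key was deleted): contest again
                m_leaderId = "";
                StartElection();
            }
            else if (response.action() == "create" || response.action() == "set")
            {
                // a put of a key that did not exist is reported as "create",
                // an overwrite as "set"; either way, record the new leader
                m_leaderId = response.value().as_string();
            }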
        }

        std::string m_id;
        std::string m_leaderId;
        std::shared_ptr<etcd::Client> m_etcd;
        std::shared_ptr<etcd::KeepAlive> m_keepalive;
        std::unique_ptr<etcd::Watcher> m_watcher;
};
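One caveat with the class above: as far as we can tell, etcd-cpp-apiv3's Watcher delivers events from a background task, so `WatchForLeaderChangeCallback` writes `m_leaderId` on a different thread from the one that typically calls `isLeader()`, with no synchronization. A minimal way to close that gap, sketched here as a hypothetical `LeaderState` helper rather than anything the library provides, is to guard the leader ID with a `std::mutex` and only ever hand out copies of it:

```cpp
#include <cassert>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical thread-safe holder for the current leader ID.
class LeaderState {
public:
    explicit LeaderState(std::string myId) : m_id(std::move(myId)) {
        assert(!m_id.empty());
    }

    void setLeader(std::string leaderId) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_leaderId = std::move(leaderId);
    }

    void clearLeader() { setLeader(""); }

    // Return a copy so callers never hold a reference into shared state.
    std::string leader() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_leaderId;
    }

    bool isLeader() const { return leader() == m_id; }

private:
    const std::string m_id;       // never changes, so it needs no lock
    mutable std::mutex m_mutex;   // guards m_leaderId
    std::string m_leaderId;
};
```

In MyApp, `m_leaderId` could become a `LeaderState` member, with `StartElection` and the watcher callback calling `setLeader` or `clearLeader`, and `isLeader()` forwarding to it.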

Here is the final code, including the main function.

#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include <thread>

#include "etcd/Client.hpp"
#include "etcd/KeepAlive.hpp"
#include "etcd/Watcher.hpp"

using namespace std::chrono_literals;

const std::string ELECTIONKEY = "MyApp/leader";

class MyApp { /* ...class from above snippet */ };

int main(int argc, char ** argv)
{
    if (argc < 2)
    {
        std::cerr << "usage: " << argv[0] << " <instance-id>" << std::endl;
        return 1;
    }
    std::string appId = argv[1];
    MyApp app("127.0.0.1:2379" /* etcdConnectionString */, appId);

    // Start election and begin watching for any leader change
    app.StartElection();
    app.WatchForLeaderChange();

    // Keep the instance alive for a while. Once it exits, the remaining
    // instances contest a new election as soon as the lease on the
    // election key set by the previous leader expires.
    // In a real application, request processing would usually start here.
    std::this_thread::sleep_for(120s);
    return 0;
}

Compile and run.

# compile
g++ -std=c++14 -I/usr/local/include MyApp.cpp -o MyApp -L/usr/local/lib -letcd-cpp-api

# run instance 1
./MyApp id1

# in another terminal, run instance 2
./MyApp id2

# in another terminal, run instance 3
./MyApp id3

Since we started instance 1 first, it becomes the leader. Try stopping the first instance: once its lease expires (within our 10-second TTL), etcd deletes the key, and the watcher in each remaining instance processes the delete action and triggers a new election.
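The walk-through above can be replayed without an etcd cluster. What follows is a deterministic, single-threaded model, not the real client: the hypothetical `SimulatedEtcd` class stands in for the election key with its lease and watchers, and the hypothetical `simulateFailover` starts id1, id2 and id3 in that order, stops id1, and advances a logical clock one second at a time while any live leader sends keep-alives. It assumes the create-if-absent add and the delete/create events described in Step 1 and Step 2.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical single-threaded model of one election key with a lease,
// a logical clock and watchers. It makes no etcd calls.
class SimulatedEtcd {
public:
    using Watcher = std::function<void(const std::string& action, const std::string& value)>;

    explicit SimulatedEtcd(int ttl) : m_ttl(ttl) { assert(ttl > 0); }

    void watch(Watcher w) { m_watchers.push_back(std::move(w)); }

    // Create-if-absent: returns the value stored under the key afterwards.
    std::string add(const std::string& value) {
        if (!m_present) {
            m_present = true;
            m_value = value;
            m_expiresAt = m_now + m_ttl;
            notify("create", value);
        }
        return m_value;
    }

    // Refresh the lease, but only for the instance that holds the key.
    void keepAlive(const std::string& owner) {
        if (m_present && m_value == owner) m_expiresAt = m_now + m_ttl;
    }

    // Advance the logical clock; delete the key once its lease expires.
    void advance(int seconds) {
        m_now += seconds;
        if (m_present && m_now >= m_expiresAt) {
            m_present = false;
            m_value.clear();
            notify("delete", "");
        }
    }

private:
    void notify(const std::string& action, const std::string& value) {
        // Iterate over a copy: watchers call add(), which notifies again.
        std::vector<Watcher> watchers = m_watchers;
        for (auto& w : watchers) w(action, value);
    }

    int m_ttl;
    int m_now = 0;
    int m_expiresAt = 0;
    bool m_present = false;
    std::string m_value;
    std::vector<Watcher> m_watchers;
};

struct Instance {
    std::string id;
    std::string leaderId;
    bool running;
};

// Starts id1, id2, id3 in order, stops id1, then runs the clock for two TTLs.
// Returns the leader ID seen by each instance that is still running.
std::map<std::string, std::string> simulateFailover(int ttl) {
    SimulatedEtcd etcd(ttl);
    std::vector<Instance> instances = {{"id1", "", true}, {"id2", "", true}, {"id3", "", true}};

    for (auto& inst : instances) {
        Instance* self = &inst;  // safe: the vector is never resized below
        etcd.watch([self, &etcd](const std::string& action, const std::string& value) {
            if (!self->running) return;
            if (action == "delete") {
                self->leaderId.clear();
                self->leaderId = etcd.add(self->id);  // contest the new election
            } else if (action == "create" || action == "set") {
                self->leaderId = value;
            }
        });
    }

    // Initial election in start-up order: id1's add succeeds.
    for (auto& inst : instances) inst.leaderId = etcd.add(inst.id);
    assert(instances[1].leaderId == "id1");

    // Stop instance 1: no more keep-alives, no more reactions to events.
    instances[0].running = false;

    for (int t = 0; t < 2 * ttl; ++t) {
        for (auto& inst : instances)
            if (inst.running && inst.leaderId == inst.id) etcd.keepAlive(inst.id);
        etcd.advance(1);
    }

    std::map<std::string, std::string> seen;
    for (const auto& inst : instances)
        if (inst.running) seen[inst.id] = inst.leaderId;
    return seen;
}
```

With a TTL of 10, the key expires at the 10-second mark; in this model id2's watcher happens to run first, so id2 wins the re-election and id3 reads id2 back from its failed add. In a real deployment the winner is simply whichever add etcd applies first.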

Result: Leader election is enabled for our distributed application.

Note: How long the application goes without a leader after the current leader crashes depends on the TTL of the lease on the election key. A shorter TTL gives faster failover but also a greater risk that a merely slow leader loses its lease. Tune it accordingly for your application.