A Zoo Needs A Keeper!

Architecting a distributed system is not a walk in the park; in fact, there are plethora of challenges and potential debacles your system can suffer from if you could not cater for them beforehand. To give you a flavor:

  • One of the most prominent issues is synchronizing configurations or computations consisting of 100x or even 1000x of nodes.

  • Failure management, especially when partial hiccups occur ex. a component goes down due to a network failure before receiving a message, how would the sender know if the message has been delivered, processed, or failed?

  • To scale your system, usually you spin up new nodes horizontally into the cluster to boost performance. How would you announce such changes in the cluster membership to all nodes and clients too?

  • Race condition is common in a distributed system ex. when two threads access a shared variable at the same time, and they try to change it simultaneously. The nature of the thread management (ex. swapping scheduling…etc.) in operating system is not predicable, you won’t know for sure the order in which the threads will attempt to access the shared data.

  • Deadlock is one great issue in such system; two processes could be holding back for each other, waiting for each one to release a resource the other needs next.

  • A cluster where the master goes down and you do not have enough node to preserve the quorum; you usually end up with split-brain syndrome and you need some external voter to elect a leader.

Those were just a narrowed down example of the challenges a distributed system design should cater for, and there are plenty that would include ex. resource management, heterogeneity, communications, security / privacy, QoS, task allocation…. etc.

Thus, from what has been noted so far, making a reliable, fast, and scalable cluster coordination is full of thorns, leading to an overall inconsistency in the cluster. This warrants the need of a coordination service for the distributed systems.

Since distributed systems are like a zoo, in the sense they are chaotic and hard to manage, they should be kept under control. Hence the need for the ZooKeeper. It is an open-source centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services for distributed systems that was originally developed at Yahoo Research and is written in Java.

You probably have had a large database cluster with handful of nodes one too many times and realized the need for centralized management of the entire cluster in terms of name services, group services, synchronization services, configuration management…etc. You could do all that from scratch or you could simply use ZooKeeper.

ZooKeeper values in a distributed system

Naming service is a service that maps a name to some information associated with that name. A DNS service is a name service that maps a domain name to an IP address. With ZooKeeper, you identify each node in the cluster with an identity, to keep a track of which servers or services are up and running and look up their status by name.

ZooKeeper has the flexibility of updating every node. Hence, that feature permits it to store updated information about each node across the cluster. Moreover, in Zookeeper, the status of each node is maintained in real-time. That leaves lesser chances for errors as well as ambiguity, that’s how it manages the cluster.

While modifying, ZooKeeper locks the data, so, if a failure occurs in the database, that helps the cluster to recover it automatically.

By stamping each update, it keeps track with a number denoting its order. This guarantees sequential consistency. It also provides atomicity, where updates either succeed or fail, No partial results.

ZooKeeper can centrally store and manage the configuration of your distributed system. This means that any new nodes joining will pick up the up-to-date centralized configuration from ZooKeeper as soon as they join the system. This also allows you to centrally change the state of your distributed system by changing the centralized configuration through one of the ZooKeeper clients.

A distributed system may have to deal with the problem of nodes going down, and you may want to implement an automatic fail-over strategy. ZooKeeper provides off-the-shelf support for doing so via leader election.

Applications and Organizations using ZooKeeper

Talend ESB uses ZooKeeper as endpoint repository of both REST and SOAP Web services. By using ZooKeeper Talend ESB can provide failover and load balancing capabilities in a very light-weight manner. Redis Failover is a ZooKeeper-based automatic master/slave failover solution for Ruby. The Eclipse ECF project provides an implementation of its Abstract Discovery services using Zookeeper. Box uses ZooKeeper for service discovery, service coordination, Solr and Hadoop support…etc.

Kafka, Storm, HBase, Hadoop MapReduce, Accumulo, Solr…etc. are few distributed systems from Apache projects already use ZooKeeper as their coordination service.

Rackspace’s Email & Apps team uses ZooKeeper to coordinate sharding and responsibility changes in a distributed e-mail client that pulls and indexes data for search. ZooKeeper also provides distributed locking for connections to prevent a cluster from overwhelming servers. ZooKeeper is used for a myriad of services inside Yahoo! for doing leader election, configuration management, sharding, locking, group membership…etc. ZooKeeper at Zynga is used for a variety of services including configuration management, leader election, sharding and more. Basically, for centralized configuration management, Cloudera search integrates search functionality with Hadoop by using ZooKeeper.

ZooKeeper Architecture: How it works?

Zookeeper follows a client-server architecture, where ZooKeeper all server nodes in its cluster store a copy of the data. ZooKeeper cluster leader is elected upon startup. A ZooKeeper service cluster called “Ensemble

A client is one of the nodes in your distributed system cluster. It accesses information from the server. Every client sends a message to the server at regular intervals that helps the server to know that the client is alive.

A server sends an acknowledgement when any client connects. In the case when there is no response from the connected server, the client automatically redirects the message to another server.

A leader is one of the servers’ nodes which gives all the information to the clients as well as an acknowledgment that the service is alive. It performs automatic recovery if any of the connected nodes failed.

A follower server node “follows” leader instructions. Clients read requests are processed by any of the servers it is connected to it. Write requests are forwarded to the leader node for processing.

Whenever a change is made, it is not considered successful until it has been written to a quorum successfully in the ensemble servers. Therefore, a write operation requires the agreement of (in general) at least half the nodes in an ensemble and hence the cost of a vote can increase significantly as more voters are added.

To resolve that, ZooKeeper introduced a node called “Observer” in the ensemble. Observers are non-voting members of an ensemble which only hear the results of votes, not the agreement protocol that leads up to them. Other than this simple distinction, Observers function the same as Followers - clients may connect to them and send read and write requests to them. Observers forward these requests to the Leader like Followers do, but they then simply wait to hear the result of the vote. Because of this, we can increase the number of Observers as much as we like without harming the performance of votes.

ZooKeeper Data Model - ZDM


Data nodes in ZooKeeper are organized in a hierarchical namespace ex data tree, which is like a standard file system. Names are a series of path elements separated by slashes (/). Each node in the ZooKeeper namespace is uniquely identified by a path. Unlike standard file systems, each node in the ZooKeeper namespace can have data and child nodes associated with it. Just as having a file system allows files to be directories. Each node is called a “zNode” and all kept in memory.

zNodes can store data, but it should be noted that the storage capacity is limited, no more than 1 MB and they maintain stat structure and version number for data changes. They are categorized into the following:

  • Persistence: This type of zNodes is alive even after the client which created it, is disconnected.

  • Ephemeral: This type of zNodes is alive as long its client is. Therefore, when the client gets a disconnect from ensemble, it will also be deleted. Moreover, they are not allowed to have children.

  • Sequential: They can be either ephemeral or persistent. Each time a sequential node is created, the ZooKeeper will automatically add 10 digits after the path, starting from 1. Each sequential node has a separate counter, which is monotonically increasing and maintained by the leader instance of ZooKeeper. Counter value of a new zNode under a parent is always larger than value of existing children.

ZooKeeper zNode Operations

Clients can work with ZooKeeper data nodes (read/write) through CLI or ZooKeeper API. ZooKeeper clients are your distributed system cluster nodes. There are two client API libraries maintained by the ZooKeeper project, one in Java and another in C. Regarding other programming languages, some libraries have been made that wrap either the Java or the C client. ZooKeeper’s data model and its API support the following nine basic operations:

  • create - It creates a zNode in a specified path of the ZooKeeper namespace.

  • delete - It deletes a zNode from a specified path of the ZooKeeper namespace.

  • exists - It checks if a zNode exists in the path.

  • getChildren - It gets a list of children of a zNode.

  • getData - It gets the data associated with a zNode.

  • setData - It sets / writes data into the data field of a zNode.

  • getACL - It gets the ACL (Access Control List) of a zNode. ZooKeeper uses ACL to control access to its zNodes. The ACL implementation is quite like UNIX file access permissions. ACL permissions supported are: CREATE, READ, WRITE, DELETE, ADMIN.

  • setACL - It sets the ACL of a zNode.

  • sync - It synchronizes a client’s view of a zNode with ZooKeeper.

Moreover, there is the notion of “Watches”; a watch event is a one-time trigger which is sent to the client that set a “Watch”. It occurred when data from that 'Watch" changes. ZDM “Watch” allows clients to get notifications when zNode changes. ZDM read operations like getData(), getChidlren(), exists() have the option of setting a “Watch”.

“Watches” are ordered, the order of “Watch” events corresponds to the order of the updates. A client will be able to see a watch event for zNode before seeing the new data which corresponds to that zNode.

Lastly, at the heart of ZooKeeper is an atomic messaging system that keeps all the servers are in-sync. More details can be found here

Day-in-the-life scenarios & use cases

Leader Election Scenario

Leader election refers to the problem of selecting a leader among a group of nodes belonging to one logical cluster. This is a hard problem in the face of node crashes. Looking into using ZooKeeper for leader election, a straightforward way of doing that is to let every distributed system server (ZooKeeper client) publishes its information in a zNode that is both sequential and ephemeral. Then, whichever server has the lowest sequential zNode (old member) is the leader for your distributed cluster!

With Zookeeper, the problem can be solved using the following steps:

  1. Create a permanent zNode (/election/) in ZooKeeper.

  2. Each client in the group creates an ephemeral node with “sequence” flag set (/election/node_), this means node should be both ephemeral and sequential. The sequence flag ensures that Zookeeper appends a monotonically increasing number to the node name. For instance, the first client who creates the ephemeral node will have the node named /election/node_1 while the second client creates node /election/node_2

  3. After the ephemeral node creation, the client will fetch all the children nodes under /election. The client that created the node with smallest sequence is elected the leader. This is a straightforward way of electing the leader.

  4. Each client will “Watch” the presence of node with sequence value that is one less than its sequence. If the “Watched” node gets deleted, the client will again repeat step 3 to check if it has become the leader.

  5. If all clients need to know of the current leader, they can subscribe to group notifications for the node “/election” and determine the current leader on their own.

Group Membership Scenario

Group membership is a “popular” ready-made feature available in Zookeeper. This function is achieved by exploiting the child notification mechanism.

  1. A permanent node “/members” represents the logical group node.

  2. Clients create ephemeral nodes under the group node to indicate membership.

  3. All the members of the group will subscribe to the group “/members” thereby being aware of other members in the group.

  4. If the client shuts down for any reason, ZooKeeper guarantees that the ephemeral nodes corresponding to the client will automatically be removed and group members notified.

Global Locks Construction Scenario

Fully distributed locks are globally synchronous, meaning at any snapshot in time no two clients think they hold the same lock. These can be implemented using ZooKeeper.

  1. Create a permanent node “/lock” in ZooKeeper.

  2. Each client in the group creates an ephemeral node with “sequence” flag set (/lock/node_) by calling create() method. This means this node is type of both ephemeral and sequence.

  3. After the ephemeral node creation, the client will fetch all the children nodes under /lock directory by calling getChildren() method. The client that created the node with smallest sequence in the child list is said to be holding the lock.

  4. Each client will “Watch” the presence of node with sequence value that is one less than its sequence. If the “Watched” node gets deleted, the client will again repeat step 3 to check if it is holding the lock.

The unlocking protocol is simple: clients wishing to release a lock simply delete the node they created in step 2. The removal of a node will only cause one client (ex. the one created next higher sequence node) to wake up since each node is watched by exactly one client. In this way, you avoid the herd effect. If all clients need to know about the change of lock ownership, they can listen to group notifications for the node “/lock” and determine the current owner.

ZooKeeper in Kafka

Apache Kafka uses Apache ZooKeeper to store its metadata. Data such as the location of partitions and the configuration of topics are stored outside of Kafka itself, in a separate ZooKeeper cluster.

Kafka uses Zookeeper to do leadership election of Kafka Broker and Topic Partition pairs. Also, to manage service discovery for Kafka Brokers that form the cluster. Zookeeper sends changes of the topology to Kafka, so each node in the cluster knows when a new broker joined, a Broker died, a topic was removed or a topic was added, etc. Zookeeper provides an in-sync view of Kafka Cluster configuration.

1 Like