ZooKeeper

http://nil.lcs.mit.edu/6.824/2020/papers/zookeeper.pdf

1. Introduction

  • Does wait-free imply only the absence of thread-level waiting, or does it also cover waiting on another node?
    • According to this it’s just an allusion to the fact that the API is non-blocking.
  • Target workloads: read-optimized, 2-100x as many reads as writes, handles 10s-100s of thousands of ops/sec
  • A coordination kernel is exposed that allows clients to specify their own primitives

Our system, ZooKeeper, hence implements an API that manipulates simple wait-free data objects organized hierarchically as in file systems.

  • Guarantees
    • Linearizable writes, but not (necessarily) linearizable reads
      • What does it mean for only writes to be linearizable?
    • FIFO per-client ordering
      • Writes in this per-client ordering are ordered the same way in the global/total order of linearizable writes (^^^)
  • Clients submit requests asynchronously; a session is first registered and all requests use the same session handle
  • Sessions can be closed explicitly, but can time out as well

2. The ZK Service

  • Clients see a data tree of znodes
  • Each znode contains data, and each non-ephemeral znode can have children
  • Ephemeral znodes (explicitly created as such) are GCed when the creating session closes
  • The sequential flag allows creating uniquely named znodes by appending a number from a monotonically increasing counter
  • The watch flag allows clients to be notified when a given znode changes.
    • These are triggered only once, and don’t persist if the session closes
    • The notification doesn’t contain the actual change, to account for the fact that multiple changes may occur close together
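The one-shot watch behavior can be sketched with a toy in-memory watch table (hypothetical names, not the real client library):

```python
class WatchTable:
    """Tracks one-shot watches per path, mimicking ZooKeeper's watch semantics."""

    def __init__(self):
        self._watches = {}  # path -> list of callbacks

    def watch(self, path, callback):
        self._watches.setdefault(path, []).append(callback)

    def fire(self, path):
        # The notification carries only the path, not the new data: several
        # changes may have happened by the time the client re-reads.
        for cb in self._watches.pop(path, []):
            cb(path)


events = []
table = WatchTable()
table.watch("/config", events.append)
table.fire("/config")  # delivers the notification once
table.fire("/config")  # watch already consumed: no second delivery
```

A client that still cares about the znode must issue another read with the watch flag set after each notification.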

2.2 Client API

  • API (both sync & async versions)
    • create(path, data, flags): create a znode at path, containing data
    • delete(path, version): delete the znode at path if version matches the znode’s version
    • exists(path, watch): check if a znode exists, optionally set a watch regardless of existence
    • getData(path, watch): get znode data/metadata, optionally set a watch only if the znode exists
    • setData(path, data, version): set znode data to data if version is a match
    • getChildren(path, watch): get the names of all the znode’s children
    • sync(path): “Waits for all updates pending at the start of the operation to propagate to the server that the client is connected to. The path is currently ignored.”
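To pin down the version-check semantics of delete/setData, here's a minimal in-memory model of the data tree (sessions, watches, and the ephemeral/sequential flags are omitted; using -1 as an "any version" wildcard mirrors the real API, everything else is made up):

```python
class ZNodeTree:
    """Minimal model of the znode namespace: (data, version) per path."""

    def __init__(self):
        self.nodes = {"/": (b"", 0)}  # path -> (data, version)

    def create(self, path, data):
        if path in self.nodes:
            raise KeyError("node exists")
        self.nodes[path] = (data, 0)

    def set_data(self, path, data, version):
        _, cur_version = self.nodes[path]
        if version not in (-1, cur_version):  # -1 skips the version check
            raise ValueError("bad version")
        self.nodes[path] = (data, cur_version + 1)

    def delete(self, path, version):
        _, cur_version = self.nodes[path]
        if version not in (-1, cur_version):
            raise ValueError("bad version")
        del self.nodes[path]


tree = ZNodeTree()
tree.create("/cfg", b"a")
tree.set_data("/cfg", b"b", version=0)  # versions match: applied
rejected = False
try:
    tree.set_data("/cfg", b"c", version=0)  # stale version: rejected
except ValueError:
    rejected = True
```

The conditional version makes compare-and-swap-style updates possible without any blocking, which is what the wait-free claim is about.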

2.3 Guarantees

  • Example: leader election. A new leader is elected, deletes the ready znode (at a well-known path), updates many configuration values, and then recreates the ready znode. When clients see the ready znode, they know the new leader is ready to accept requests, and no client ever reads the configuration while it is being written, making the entire process effectively atomic *

    The above scheme still has a problem: what happens if a process sees that ready exists before the new leader starts to make a change and then starts reading the configuration while the change is in progress. This problem is solved by the ordering guarantee for the notifications: if a client is watching for a change, the client will see the notification event before it sees the new state of the system after the change is made. Consequently, if the process that reads the ready znode requests to be notified of changes to that znode, it will see a notification informing the client of the change before it can read any of the new configuration.

    • If I’m understanding that correctly, the assumption is that the client will have a watch set up on the ready znode, so that client will see that the znode has been deleted (and so not read any partially-written configuration) before it can see any configuration changes.
    • Do watch notifications have at-least-once delivery guarantees? What if the client misses the notification in some way?
  • What if two clients have a shared communication channel? If client A writes configuration and marks itself ready and then tells client B about this using a non-ZK channel, client B might attempt to read configuration from a different node that isn’t consistent yet. *

    ZooKeeper provides the sync request: when followed by a read, constitutes a slow read. sync causes a server to apply all pending write requests before processing the read without the overhead of a full write. This primitive is similar in idea to the flush primitive of ISIS [5].

    • sync may still allow a stale read if all of the following hold:
      • a partition has occurred
      • client B is reading from a node in the minority that believes the leader is also in the minority
      • the leader in the minority believes it is still the leader
      • a new leader has been elected on the majority side
      • client A has issued its writes to the majority leader
      • the minority leader doesn’t have A’s writes, but sync appears to succeed nonetheless
    • If client B was to synchronously issue a no-op write before reading, would that guarantee a truly linearizable read?
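The ready-znode scheme from this section can be sketched as a totally ordered change log: because writes are linearizable and a watch notification is delivered before any state that follows the change, a client watching /ready sees the deletion before any of the new configuration (schematic simulation, names assumed):

```python
# Totally ordered write log, as one client's server would deliver it.
log = []


def leader_update():
    """The new leader's sequence: tear down /ready, rewrite config, recreate /ready."""
    log.append(("delete", "/ready"))   # watchers of /ready are notified here
    log.append(("set", "/cfg/a"))
    log.append(("set", "/cfg/b"))
    log.append(("create", "/ready"))   # only now does config look complete


leader_update()

# A client that saw /ready (with a watch set) consumes the log in order:
# the deletion notification precedes every new configuration value, so it
# knows to discard its cached view before reading anything partial.
first_change = log[0]
```

The whole argument rests on clients observing changes in the single global write order; a client on a lagging server sees a prefix of this log, never a reordering.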

2.4 Primitives

Examples of locking/etc. primitives that can be built up from ZK’s znode tree.

  • Config Management: clients read a given znode with the watch flag set. They’re notified when an update occurs, at which point they issue another read with the watch flag set.
  • Rendezvous: workers don’t know how to connect to the coordinator. Workers read a given znode with the watch flag set; the coordinator starts up and populates that same znode with connection info. Workers either read the connection info straight away, or read it after the watch notification fires.
  • Group Membership: each member populates a unique ephemeral znode as a child of a well-known znode. If that member crashes or the session is terminated, the znode is removed. Listing the children of the well-known znode gives you up-to-date group membership information.
  • Simple Locks: try to create an ephemeral znode. If the create succeeds, you hold the lock; otherwise you wait. When the lock is released (the znode is deleted), all waiting clients vie for the lock at once (herd effect), which could be an issue.
  • Simple Locks without Herd Effect: more work for the client, but this allows for waiters to be woken up in order
  • Read-Write Locks: ditto
  • Double Barrier: wait for N workers to come alive before they all start work together. Each worker creates a unique znode that’s a child of a well-known znode. All workers start working when the number of children reaches N.
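The "without herd effect" recipe has each client create an ephemeral sequential znode under the lock znode and watch only its immediate predecessor, so exactly one waiter wakes per release. A sketch of just that ordering decision (plain Python over sequential names, no real ZooKeeper):

```python
def lock_decision(children, mine):
    """Given the lock znode's sequentially named children, decide whether
    `mine` holds the lock or which predecessor it should watch."""
    ordered = sorted(children, key=lambda name: int(name.rsplit("-", 1)[1]))
    idx = ordered.index(mine)
    if idx == 0:
        return ("acquired", None)
    # Watch only the immediate predecessor: when it is deleted, exactly
    # one waiter wakes up, avoiding the herd effect.
    return ("wait", ordered[idx - 1])


children = ["lock-0003", "lock-0001", "lock-0002"]
assert lock_decision(children, "lock-0001") == ("acquired", None)
assert lock_decision(children, "lock-0003") == ("wait", "lock-0002")
```

On a watch notification the client re-runs the decision against a fresh getChildren, since the predecessor may have crashed rather than released the lock.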

3. Applications

  • The Fetching Service is a distributed web crawler that uses ZK for consistent configuration and leader election.
  • Katta is a distributed indexer that uses ZK for leader election, group membership (list of workers), and configuration.
  • Yahoo! Message Broker is a distributed pub-sub system that uses ZK for configuration, failure detection, group membership, leader election, and for control-plane access. Is Kafka based on this? Some of the nomenclature (brokers, topics) is similar.

4. Implementation

Figure 4 shows the high-level components of the ZooKeeper service. Upon receiving a request, a server prepares it for execution (request processor). If such a request requires coordination among the servers (write requests), then they use an agreement protocol (an implementation of atomic broadcast), and finally servers commit changes to the ZooKeeper database fully replicated across all servers of the ensemble. In the case of read requests, a server simply reads the state of the local database and generates a response to the request.

  • Each node contains an in-memory “replicated database” that reads are served from.

  • Writes to this state require consensus, and are written to a WAL before being applied.

  • Periodic snapshots keep the size of the WAL in check.

  • All writes go through the quorum leader

  • The request processor converts incoming write requests into transactions that capture the final state of that znode were the transaction to be applied. This allows transactions to be idempotent, but also makes write skew impossible.
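A toy illustration of the idea: a conditional write is turned, at the leader, into a transaction that records the znode's final data and version, so reapplying it during recovery changes nothing (the representation here is assumed, not the actual wire format):

```python
def to_txn(node, new_data):
    """Leader-side conversion: a conditional setData becomes a transaction
    that records the znode's final data and version outright."""
    _, version = node
    return {"data": new_data, "version": version + 1}


def apply_txn(node, txn):
    # Unconditionally install the final state: applying twice is a no-op.
    return (txn["data"], txn["version"])


node = (b"old", 3)
txn = to_txn(node, b"new")
node = apply_txn(node, txn)
node = apply_txn(node, txn)  # redelivered during crash recovery: same result
assert node == (b"new", 4)
```

Because the version check happens once, at conversion time, replicas never have to re-evaluate conditions and can safely replay transactions.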

  • ZAB uses “simple majority quorums”, but the how seems to be out-of-scope for this paper

    • The leader broadcasts state changes (writes) to followers
    • All changes are delivered in the order they’re sent
    • Changes from previous leaders are delivered to a new leader before that new leader starts broadcasting its own changes
  • Messages may be re-delivered during crash recovery, but the fact that transactions are idempotent makes this ok, as long as they’re delivered without being reordered.

  • Snapshots are captured live

    • This naturally means that snapshots are not consistent
    • Recovery involves first hydrating a snapshot, then replaying all messages delivered since the snapshot process was started
    • Because transactions are idempotent, this brings the in-memory database back to a consistent state
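A small simulation of a fuzzy snapshot: the scan runs concurrently with writes, so the snapshot may mix old and new values, but replaying the idempotent transactions logged since the scan started converges on the correct state (toy key-value stand-in for the znode tree):

```python
db = {"a": 1, "b": 1, "c": 1}
replay_log = []


def write(key, value):
    db[key] = value
    replay_log.append((key, value))  # txns delivered since snapshot start


# Fuzzy snapshot: copy keys one at a time while writes keep landing.
snapshot = {}
snapshot["a"] = db["a"]
write("a", 2)            # missed by the already-copied "a"
snapshot["b"] = db["b"]
write("c", 2)
snapshot["c"] = db["c"]  # already sees the new "c": snapshot is inconsistent

# Recovery: hydrate the (inconsistent) snapshot, then replay the log.
recovered = dict(snapshot)
for key, value in replay_log:
    recovered[key] = value  # idempotent set-to-final-state

assert recovered == db
```

Replaying a transaction whose effect the snapshot already captured is harmless, which is exactly why the snapshot doesn't need to be a consistent cut.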
  • Servers process writes sequentially

    • Notifications are sent out + watches are cleared for each processed write
    • Notifications are only sent out by the specific server a client is connected to
    • So different clients could receive notifications at wildly different times
  • sync calls for a given server are appended to the queue of requests between the leader and that server *

    In order for this to work, the follower must be sure that the leader is still the leader. If there are pending transactions that commit, then the server does not suspect the leader. If the pending queue is empty, the leader needs to issue a null transaction to commit and orders the sync after that transaction.

    • So if I’m reading this correctly, the only way stale reads are possible here is if:
      • The server receives sync
      • An unrelated pending transaction commits, so the server assumes the leader is still the leader, and asks the leader to put a sync at the end of its queue of messages to that server
      • A partition occurs before the sync has been added to the queue. The leader loses its quorum, but it and the server that received sync both still believe that it’s the leader
      • Messages that the leader would’ve added to the queue preceding the sync if the partition were not to occur are now lost to the server
      • The server receives the sync and wrongly assumes that it is now consistent up to the time the sync was added to the queue
    • I can’t think of a way stale reads are possible if you always preface a sync with an explicit no-op write 🤔
  • Clients send heartbeats to keep sessions alive

5. Evaluation

Performance when reads are served locally: (throughput figure omitted)

Performance when reads are served through the leader: (throughput figure omitted)
