Percolator: Large-scale Incremental Processing Using Distributed Transactions and Notifications - Google, 2010

I thought a bit about how to categorize this paper. Initially I thought streaming would be a good fit; while reading the paper I decided distributed database was the correct category. Now I think the general category of distributed computation is the most apt.

"A database is a materialized view of a write-ahead log."

Neil Conway

Percolator is built on top of Bigtable to support incremental updates to the search index at Google. Previously they used MapReduce to rebuild the index as a batch process. Percolator allows engineers to write a computation in the form of a transaction that runs when a watched column is written to.

Encapsulating the computation as a transaction is an interesting way to simplify updating the database, abstracting away the complications that come from a parallel database undergoing concurrent updates.

These transactions have relaxed latency requirements. This allows the authors to make optimizations such as lazy lock cleanup.

  • Other Notable Features:
    • ACID transactions
      • Cross-row and cross-table
    • Snapshot isolation
      • Reads are from one consistent snapshot and writes are performed at another
      • I note that while snapshot isolation is not perfect (it permits write skew), it can achieve great performance; a toy write-skew example follows this list
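To make the write-skew caveat concrete, here is a tiny self-contained simulation (not Percolator code; the cells x and y and the invariant x + y >= 1 are made up for illustration). Both transactions read the same snapshot, write disjoint cells, pass the write-write conflict check, and together break the invariant:

```python
# Toy illustration of write skew under snapshot isolation.
# Both transactions read the same snapshot, write disjoint cells,
# and pass conflict detection -- yet together they violate x + y >= 1.

db = {"x": 1, "y": 1}  # committed state

def snapshot():
    return dict(db)  # each transaction reads a consistent snapshot

def commit(writes, snap):
    # Snapshot isolation only aborts on write-write conflicts:
    # a conflict exists if a written cell changed since the snapshot.
    for cell in writes:
        if db[cell] != snap[cell]:
            return False  # abort
    db.update(writes)
    return True

snap1 = snapshot()  # T1: "if x + y >= 1 I may set y = 0"
snap2 = snapshot()  # T2: "if x + y >= 1 I may set x = 0"

t1_writes = {"y": 0} if snap1["x"] + snap1["y"] >= 1 else {}
t2_writes = {"x": 0} if snap2["x"] + snap2["y"] >= 1 else {}

print(commit(t1_writes, snap1))  # True
print(commit(t2_writes, snap2))  # True -- disjoint cells, no conflict
print(db)                        # {'x': 0, 'y': 0}: invariant broken
```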
  • Architecture
    • Three Binaries - Run on every machine
      • Bigtable tablet server
      • GFS chunkserver
      • Percolator worker
        • The computations, named observers, are linked into the worker
        • The worker scans for updates to the watched columns
    • Timestamp Oracle
      • Strictly increasing timestamps
    • Lightweight Lock Service
      • Just an optimization, not required
  • Design Notes
    • A table is indexed by row and column
      • Since they use MVCC for snapshot isolation, each cell is actually a series of timestamped values (native support in Bigtable)
    • No central transaction manager
      • Thus no global deadlock detector
      • Increases latency in the case of conflicting transactions
    • Transactions are implemented in C++
    • They use blocking calls, with a thread pool for parallelism
      • They found this to be simpler and allow for some clever batching
    • Transactions allow engineers to create consistent secondary indexes (see the sketch below)
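As a toy illustration of the secondary-index point above: if the document row and its index rows are written in one cross-row transaction, readers never observe one without the other. The Transaction class below is a stand-in I made up, loosely shaped like a Get/Set/Commit interface, not the real Percolator API:

```python
# Hypothetical sketch of maintaining a consistent secondary index
# with a cross-row transaction. The store is a plain dict keyed by (row, col).

class Transaction:
    def __init__(self, store):
        self.store, self.writes = store, {}

    def get(self, row, col):
        return self.writes.get((row, col), self.store.get((row, col)))

    def set(self, row, col, value):
        self.writes[(row, col)] = value

    def commit(self):
        # The real system would run the prewrite/commit protocol here;
        # this toy version just applies the buffered writes at once.
        self.store.update(self.writes)

store = {}
tx = Transaction(store)
tx.set("doc:42", "contents:data", "hello world")
# Update the inverted-index rows in the same transaction, so the
# document and its index entries become visible together.
tx.set("term:hello", "docs:doc:42", "1")
tx.set("term:world", "docs:doc:42", "1")
tx.commit()
print(store)
```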
  • Transaction Isolation
    • Each column foo is represented as three Bigtable columns: the data column foo:data, a lock column foo:lock, and a write column foo:write
    • A worker coordinates a 2PC protocol (a condensed code sketch follows this section)
      • Prewrite: lock cells being written to
        • If there is another lock at any timestamp the transaction aborts
        • If there is a write record after the current transaction’s start timestamp, abort the transaction (write-write conflict)
        • Write the lock and data to each cell at the start timestamp
      • Commit
        • Get a commit timestamp from the timestamp oracle
        • Release each lock and replace it with a write record
          • A write record, stored in foo:write at the commit timestamp, points to the start timestamp at which the data was written during the prewrite phase
          • Bigtable supports atomic row operations
        • If the transaction fails to complete at this stage, it will be rolled forward by other workers lazily
    • Get()
      • If there is a lock in [0, start_timestamp], wait until the lock is released
      • Otherwise, find the latest write record at or before start_timestamp and return the data it points to
      • Notice how no transaction (T1) could commit before T2’s start_timestamp without its prewrite lock already being visible on the column, since commit_timestamp is only obtained after the prewrite phase; T2 therefore sees either T1’s lock or T1’s completed write record
    • Lazy Lock Cleanup
      • Each transaction has a primary lock, all other locks in the prewrite phase point to this primary lock
      • If T1 encounters a lock left by T2, it checks whether T2 has likely failed
        • Workers have an ephemeral path in Chubby
        • There is also a timeout
          • The wall time in the lock can be updated periodically for long running transactions
            • I’d like to see the implementation of this. My guess is the worker has a thread that periodically checks and refreshes all the locks held by the worker across all transactions
      • If T1 concludes that T2 has failed, it attempts to clean up the primary lock first (this is atomic), rolling back the transaction
      • If the crash was in the commit phase, the primary will have been converted to a write record. T1 is responsible for rolling the transaction forward
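Below is a condensed sketch of the prewrite/commit/Get flow described above, transcribed loosely from my reading of the paper onto an in-memory dict standing in for Bigtable. The lock-wait/cleanup path, rollback of partial prewrites, and the primary-first commit ordering are reduced to comments, so treat it as a reading aid rather than a faithful implementation:

```python
# Condensed, in-memory sketch of Percolator's transaction protocol.
# cells[(row, col, kind)] maps timestamp -> value, where kind is one of
# "data", "lock", "write" (mirroring foo:data, foo:lock, foo:write).
import itertools
from collections import defaultdict

cells = defaultdict(dict)
oracle = itertools.count(1)          # stand-in for the timestamp oracle

def latest(row, col, kind, max_ts):
    versions = [(ts, v) for ts, v in cells[(row, col, kind)].items() if ts <= max_ts]
    return max(versions) if versions else None

class Txn:
    def __init__(self):
        self.start_ts = next(oracle)
        self.writes = []             # buffered (row, col, value) until commit

    def get(self, row, col):
        # Real Percolator waits (or lazily cleans up) if a lock exists in
        # [0, start_ts]; this sketch just raises.
        if latest(row, col, "lock", self.start_ts):
            raise RuntimeError("locked; would wait or clean up")
        rec = latest(row, col, "write", self.start_ts)
        if rec is None:
            return None
        data_ts = rec[1]             # write record points at the writer's start_ts
        return cells[(row, col, "data")][data_ts]

    def set(self, row, col, value):
        self.writes.append((row, col, value))

    def prewrite(self, row, col, value, primary):
        newest_write = latest(row, col, "write", float("inf"))
        if newest_write and newest_write[0] >= self.start_ts:
            return False             # write-write conflict: newer committed write
        if cells[(row, col, "lock")]:
            return False             # someone holds a lock at any timestamp
        cells[(row, col, "data")][self.start_ts] = value
        cells[(row, col, "lock")][self.start_ts] = primary  # points at primary lock
        return True

    def commit(self):
        primary = self.writes[0][:2]  # first write's cell acts as the primary
        for row, col, value in self.writes:
            if not self.prewrite(row, col, value, primary):
                return False         # real code also rolls back earlier prewrites
        commit_ts = next(oracle)
        # Real Percolator commits the primary first (the commit point); if the
        # worker dies afterwards, others roll the rest forward lazily.
        for row, col, _ in self.writes:
            # Erasing the lock and adding the write record is one atomic
            # Bigtable row mutation in the real system.
            del cells[(row, col, "lock")][self.start_ts]
            cells[(row, col, "write")][commit_ts] = self.start_ts
        return True

t = Txn(); t.set("bob", "balance", 7); print(t.commit())   # True
r = Txn(); print(r.get("bob", "balance"))                  # 7
```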
  • Timestamp Oracle
    • Creates a batch of timestamps and logs the highest one to stable storage; the oracle can then hand out timestamps from memory. If the oracle crashes, it simply skips ahead to beyond the highest logged timestamp (see the sketch after this section)
    • It is not sharded; a single machine serves roughly 2 million timestamps/sec
    • Workers batch calls to the timestamp server for several transactions
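A toy version of the oracle's batching trick, assuming the only durable state is the highest allocated timestamp (the file name and batch size are invented):

```python
import os

BATCH = 10_000                      # timestamps allocated per disk write (assumed)
LOG = "oracle_high_water.txt"       # invented name for the durable high-water mark

class TimestampOracle:
    def __init__(self):
        # On restart, skip past everything that may already have been handed out.
        logged = int(open(LOG).read()) if os.path.exists(LOG) else 0
        self.next_ts = logged + 1
        self.limit = logged
        self._extend()

    def _extend(self):
        # Durably log the new high-water mark *before* handing out timestamps,
        # so a crash can never cause a timestamp to be issued twice.
        self.limit += BATCH
        with open(LOG, "w") as f:
            f.write(str(self.limit))
            f.flush()
            os.fsync(f.fileno())

    def get_timestamp(self):
        if self.next_ts > self.limit:
            self._extend()
        ts = self.next_ts
        self.next_ts += 1
        return ts

oracle = TimestampOracle()
print([oracle.get_timestamp() for _ in range(3)])  # strictly increasing
```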
  • Notifications
    • Like a trigger, but does not run in the same transaction as the write
      • This is not for data integrity but incremental processing
    • At most one observer’s transaction will commit for a given change
    • Multiple changes may be batched into one observer transaction, however
      • The authors say batching allows them to deal with cells that change quite a lot
    • Each observed foo:data has an additional acknowledgment column, e.g. foo:observer0, for each observer
      • It stores the start timestamp of the latest observer transaction that ran for that cell
      • If two transactions start for the same change, only one of them will commit because they conflict on this ack column (a sketch follows this section)
    • Tables may have trillions of cells but only millions of updates
      • During a write to a watched column, an additional notify column is written for each dirty cell
      • These columns are in a separate locality group so that workers only need to scan the notify columns, not the actual data
    • Each worker scans with many threads, picking a random tablet and then random start key
      • Workers acquire locks in the lightweight lock service prior to running observers on a row
        • The authors make it sound like a lock is acquired for each row scanned but I imagine it’s more a range of keys? Seeing as the authors are fans of batching calls across transactions, they probably batch requests for row level locks together.
        • It’s OK if the lock service goes down, it’s just an optimization
    • The authors noted that threads tended to pile up at a slow section of the keyspace. Many threads reaching the same point compounded the problem
      • If a thread notices it is scanning the same row as another thread it jumps to a randomly selected key
    • Weak Notification
      • Write only to notify column
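A sketch of the ack-column deduplication mentioned above. The transactional commit is reduced to a compare-and-swap on the ack cell, which plays the role of the write-write conflict a real Percolator transaction would hit; column names follow the notes' foo:observer0 convention and are otherwise made up:

```python
# Toy model of observer deduplication via an ack column. Two workers notice
# the same dirty cell; at most one observer run "commits" because both try to
# advance the same ack cell from the same old value.
import threading

table = {"foo:data": "new contents", "foo:observer0_ack": 0, "foo:notify": 10}
mutex = threading.Lock()
runs = []

def cas(col, expected, new):
    # Stand-in for a transactional commit: succeeds only if nobody else
    # committed a newer ack since we read it (a write-write conflict otherwise).
    with mutex:
        if table[col] == expected:
            table[col] = new
            return True
        return False

def try_run_observer(worker, start_ts):
    notify_ts = table["foo:notify"]          # change we were notified about
    acked_ts = table["foo:observer0_ack"]
    if acked_ts >= notify_ts:
        return                               # change already processed
    # "Run" the observer, then record our start_ts in the ack column.
    if cas("foo:observer0_ack", acked_ts, start_ts):
        runs.append(worker)                  # this observer transaction committed
    # else: conflict on the ack column, abort; someone else handled the change

threads = [threading.Thread(target=try_run_observer, args=(w, ts))
           for w, ts in [("worker-A", 11), ("worker-B", 12)]]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(runs)  # at most one observer run commits for the change at timestamp 10
```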
  • Batching
    • A worker waits until it has collected several prewrite operations, then issues them as one batch (a rough sketch follows this section)
      • They wait for several seconds to create a batch
    • Reads are also batched
      • They offset the added latency by prefetching other columns in the same row
        • The prefetch decision is made based on past read patterns
        • I wonder which algorithm they chose
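A rough sketch of the cross-transaction RPC batching idea (the one-second delay and the flush interface are my assumptions, not figures from the paper):

```python
import threading, time

class RpcBatcher:
    """Collects operations from many transaction threads and issues them as a
    single large RPC after a short delay, trading latency for throughput."""

    def __init__(self, send_batch, delay_s=1.0):
        self.send_batch = send_batch     # callable performing the real RPC
        self.delay_s = delay_s
        self.pending = []
        self.cond = threading.Condition()
        threading.Thread(target=self._flusher, daemon=True).start()

    def submit(self, op):
        with self.cond:
            self.pending.append(op)
            self.cond.notify()

    def _flusher(self):
        while True:
            with self.cond:
                while not self.pending:
                    self.cond.wait()
            time.sleep(self.delay_s)     # let more operations accumulate
            with self.cond:
                batch, self.pending = self.pending, []
            self.send_batch(batch)

batcher = RpcBatcher(lambda ops: print(f"one RPC carrying {len(ops)} ops"))
for i in range(5):                       # five "transactions" submit prewrites
    batcher.submit(("prewrite", f"row{i}"))
time.sleep(1.5)                          # give the demo time to flush
```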
  • Tradeoffs
    • The authors make note of several interesting tradeoffs:

    “How much of an efficiency loss is too much to pay for the ability to add capacity endlessly simply by purchasing more machines?”

    “How does one trade off the reduction in development time provided by a layered system against the corresponding decrease in efficiency?”

  • The authors originally thought about building the indexing system directly on Bigtable but “the complexity of reasoning about concurrent state modification without the aid of strong consistency was daunting.”