I thought a bit about how to categorize this paper. Initially I thought
streaming would be a good fit. While reading the paper I found distributed
database to be the correct category. Now I think the general category of
distributed computation is the most apt.
"A database is a materialized view of a write-ahead log."
Percolator is built on top of Bigtable to support
incremental updates to the search index at Google. Previously they used
MapReduce to rebuild the index as a batch process.
Percolator allows engineers to write a computation in the form of a transaction
that runs when a watched column is written to.
Encapsulating the computation as a transaction is an interesting way to simplify
updating the database, abstracting away the complications that come from a
parallel database undergoing concurrent updates.
These transactions have relaxed latency requirements. This allows the authors to
make optimizations such as lazy lock cleanup.
- Other Notable Features:
- ACID transactions
- Cross-row and cross-table
- Snapshot isolation
- Reads are from one consistent snapshot and writes are performed at
another
- I note that while snapshot isolation is not perfect (it permits write skew:
two transactions read the same cells and then each updates a different one,
leaving a combined state no serial execution would produce), it can
achieve great performance
- Architecture
- Three Binaries - Run on every machine
- Bigtable tablet server
- GFS chunkserver
- Percolator worker
- The computations, named observers, are linked into the worker
- Worker scans for updates to the columns
- Timestamp Oracle
- Strictly increasing timestamps
- Lightweight Lock Service
- Just an optimization, not required
- Design Notes
- A table is indexed by row and column
- Since they use MVCC for snapshot isolation, each cell is actually a series
of timestamped values (native support in Bigtable)
- No central transaction manager
- Thus no global deadlock detector
- Increases latency in the case of conflicting transactions
- Transactions are implemented in C++
- They use blocking calls but use a threadpool for parallelism
- They found this to be simpler and allow for some clever batching
- Transactions allow engineers to create consistent secondary indexes
- Transaction Isolation
- Each column `foo` is represented as a data column `foo:data`, and has a lock
column `foo:lock` and a write column `foo:write`
- A worker coordinates a 2PC protocol
- Prewrite: lock cells being written to
- If there is another lock at any timestamp the transaction aborts
- If there is a write record after the current transaction’s start
timestamp, abort the transaction (write-write conflict)
- Write the lock and data to each cell at the start timestamp
- Commit
- Get a commit timestamp from the timestamp oracle
- Release each lock and replace it with a write record
- A write record, stored in `foo:write`, really points to the timestamp
of the prewrite from the first phase.
- Bigtable supports atomic row operations
- If the transaction fails to complete at this stage, it will be rolled
forward by other workers lazily
- Get()
- If there is a lock in `[0, start_timestamp]`, wait until the lock is
released
- Otherwise return the data
- Notice how no transaction T1 could commit before T2’s `start_timestamp`
without first holding a prewrite lock on the column, because the
`commit_timestamp` is only obtained after the prewrite phase. So T2’s Get()
either sees T1’s write record or waits on T1’s lock. (A toy sketch of the
whole prewrite/commit/get flow follows.)
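
To make the flow above concrete, here is a toy sketch in Python. It is my own reconstruction, not the paper's C++ code: a dict keyed by (row, column) with timestamped versions stands in for Bigtable, a per-row mutex stands in for Bigtable's atomic row operations, and waiting and lock cleanup are omitted or crudely simulated.

```python
import threading

_ts_lock = threading.Lock()
_next_ts = 0

def next_timestamp():
    # Stand-in for the timestamp oracle: strictly increasing integers.
    global _next_ts
    with _ts_lock:
        _next_ts += 1
        return _next_ts

class Store:
    # Toy Bigtable: (row, column) -> {timestamp: value}, plus per-row mutexes
    # standing in for Bigtable's atomic row operations.
    def __init__(self):
        self.cells = {}
        self.row_locks = {}

    def row_lock(self, row):
        return self.row_locks.setdefault(row, threading.Lock())

    def put(self, row, col, ts, value):
        self.cells.setdefault((row, col), {})[ts] = value

    def erase(self, row, col, ts):
        self.cells.get((row, col), {}).pop(ts, None)

    def latest(self, row, col, max_ts):
        # Newest (ts, value) at or below max_ts, or (None, None).
        versions = self.cells.get((row, col), {})
        ok = [t for t in versions if t <= max_ts]
        if not ok:
            return None, None
        return max(ok), versions[max(ok)]

class Transaction:
    def __init__(self, store):
        self.store = store
        self.start_ts = next_timestamp()
        self.writes = []                      # buffered (row, col, value)

    def set(self, row, col, value):
        self.writes.append((row, col, value))

    def get(self, row, col):
        # A lock in [0, start_ts] means an in-flight writer; the real system
        # waits (or attempts lazy cleanup), this sketch just raises.
        lock_ts, _ = self.store.latest(row, col + ":lock", self.start_ts)
        if lock_ts is not None:
            raise RuntimeError("pending lock: wait or clean up")
        # The write record's value is the start_ts of the committed data.
        _, data_ts = self.store.latest(row, col + ":write", self.start_ts)
        if data_ts is None:
            return None
        return self.store.cells[(row, col + ":data")][data_ts]

    def _prewrite(self, row, col, value, primary):
        with self.store.row_lock(row):
            # Abort on a write record committed after our start_ts
            # (write-write conflict), or on a lock at any timestamp.
            w_ts, _ = self.store.latest(row, col + ":write", float("inf"))
            if w_ts is not None and w_ts >= self.start_ts:
                return False
            if self.store.latest(row, col + ":lock", float("inf"))[0] is not None:
                return False
            self.store.put(row, col + ":data", self.start_ts, value)
            self.store.put(row, col + ":lock", self.start_ts, primary)
            return True

    def commit(self):
        primary = self.writes[0][:2]          # first cell acts as the primary
        for row, col, value in self.writes:
            if not self._prewrite(row, col, value, primary):
                return False                  # real code would erase its locks
        commit_ts = next_timestamp()
        # The primary is converted first here; in the real protocol that
        # single atomic row mutation is the commit point.
        for row, col, _ in self.writes:
            with self.store.row_lock(row):
                self.store.put(row, col + ":write", commit_ts, self.start_ts)
                self.store.erase(row, col + ":lock", self.start_ts)
        return True

store = Store()
t1 = Transaction(store)
t1.set("com.example", "contents", "<html>")
assert t1.commit()
print(Transaction(store).get("com.example", "contents"))   # sees t1's write
```
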
- Lazy Lock Cleanup
- Each transaction has a primary lock, all other locks in the prewrite phase
point to this primary lock
- If T1 encounters a lock left by T2, it checks to see whether T2 has likely
failed
- Workers have an ephemeral path in Chubby
- There is also a timeout
- The wall time in the lock can be updated periodically for long running
transactions
- I’d like to see the implementation of this. My guess is the worker
has a thread that periodically checks and refreshes all the locks
held by the worker across all transactions
- If T1 feels T2 has failed, it attempts to clean up the primary lock first
(this is atomic), rolling back the transaction
- If the crash was in the commit phase, the primary will already have been
converted to a write record; T1 is then responsible for rolling the
transaction forward (both cases sketched below)
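
Continuing the toy `Store` from the earlier sketch (again my reconstruction, not the paper's code), the cleanup decision T1 makes when it trips over T2's lock might look roughly like this; `worker_alive` stands in for the Chubby ephemeral-path check plus the wall-time timeout:

```python
def cleanup_lock(store, row, col, lock_ts, worker_alive):
    # T1 found T2's lock on (row, col) at lock_ts; the lock's value is a
    # pointer to T2's primary cell.
    primary_row, primary_col = store.cells[(row, col + ":lock")][lock_ts]
    if worker_alive():
        return "wait"                     # owner looks healthy: back off, retry
    with store.row_lock(primary_row):
        primary_locks = store.cells.get((primary_row, primary_col + ":lock"), {})
        if lock_ts in primary_locks:
            # T2 never committed: erasing the primary lock is the atomic point
            # of no return, after which T2's transaction is rolled back.
            store.erase(primary_row, primary_col + ":lock", lock_ts)
            commit_ts = None
        else:
            # The primary was already replaced by a write record, so T2 did
            # commit; find its commit_ts so this cell can be rolled forward.
            writes = store.cells.get((primary_row, primary_col + ":write"), {})
            commit_ts = next(t for t, start in writes.items() if start == lock_ts)
    if (row, col) != (primary_row, primary_col):
        with store.row_lock(row):
            store.erase(row, col + ":lock", lock_ts)
            if commit_ts is not None:
                store.put(row, col + ":write", commit_ts, lock_ts)
    return "rolled forward" if commit_ts is not None else "rolled back"
```
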
- Timestamp Oracle
- Creates a batch of timestamps and logs the highest timestamp to stable
storage. Now the oracle can hand out timestamps from memory. If the oracle
crashes, it can simply skip ahead to beyond the highest logged timestamp
(allocation sketched below)
- It is not sharded, single machine (2 million timestamps/sec)
- Workers batch calls to the timestamp server for several transactions
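
A minimal sketch of that allocation scheme; the batch size and the `_log` stub are placeholders of mine, not values from the paper:

```python
import threading

class TimestampOracle:
    # Hands out strictly increasing timestamps from memory, logging only the
    # upper bound of each allocated range to stable storage.
    BATCH = 10_000                        # made-up batch size

    def __init__(self, logged_high_water=0):
        # After a crash, restart from beyond the last durably logged bound.
        self._lock = threading.Lock()
        self._next = logged_high_water + 1
        self._allocated_until = logged_high_water

    def _log(self, bound):
        pass                              # stand-in for a write to stable storage

    def get_timestamps(self, n=1):
        # Workers batch requests across transactions, so n is often > 1.
        with self._lock:
            if self._next + n > self._allocated_until:
                self._allocated_until = self._next + n + self.BATCH
                self._log(self._allocated_until)
            first = self._next
            self._next += n
            return range(first, first + n)
```
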
- Notifications
- Like a trigger, but does not run in the same transaction as the write
- This is not for data integrity but incremental processing
- At most one observer’s transaction will commit for a given change
- Multiple changes may be batched into one observer transaction, however
- The authors say batching allows them to deal with cells that change quite
a lot
- Each `foo:data` has an additional `foo:observer0` column for each observer
- Stores the timestamp of the latest start for the observer
- If two transactions start for the same change, only one of them will
commit because of a conflict on the ack column (a rough sketch of this
check follows below)
- Tables may have trillions of cells but only millions of updates
- During a write to a watched column, an additional column is written to
for each dirty cell
- These columns are in a separate locality group so that workers only need
to scan the notify columns, not the actual data
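
Roughly how I picture the ack-column check around an observer run, reusing the toy `Transaction` from the earlier sketch; the `:observer0` naming follows the notes above, while `maybe_run_observer` and the notify-bit handling are hypothetical:

```python
def maybe_run_observer(store, row, col, change_ts, observer):
    # Run `observer` for the change to (row, col) seen at change_ts, at most once.
    txn = Transaction(store)
    # The ack column stores the start timestamp of the latest observer run.
    ack_ts = txn.get(row, col + ":observer0")
    if ack_ts is not None and ack_ts >= change_ts:
        return False                      # someone already observed this change
    observer(txn, row, col)               # observer reads/writes through txn
    txn.set(row, col + ":observer0", txn.start_ts)
    # The real system also clears the notify bit in this same transaction.
    # If two workers race on the same change, both write the ack column, so
    # at most one of these transactions can commit.
    return txn.commit()
```
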
- Each worker scans with many threads, picking a random tablet and then random
start key
- Workers acquire locks in the lightweight lock service prior to
running observers on a row
- The authors make it sound like a lock is acquired for each row scanned
but I imagine it’s more a range of keys? Seeing as the authors are fans
of batching calls across transactions, they probably batch requests for
row level locks together.
- It’s OK if the lock service goes down, it’s just an optimization
- The authors noted that threads tended to pile up at a slow section of the
keyspace. Many threads reaching the same point compounded the problem
- If a thread notices it is scanning the same row as another thread, it jumps
to a randomly selected key (sketched below)
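
A sketch of the scan loop with that random jump; the sorted `notify_rows` list and the `claimed` set stand in for the notify-column scan and the lightweight lock service, and the jump-forward simplification is mine:

```python
import random

def scan_notifications(notify_rows, claimed, process_row):
    # notify_rows: sorted row keys with a pending notify bit (assumption).
    # claimed: shared set standing in for the lightweight lock service.
    if not notify_rows:
        return
    i = random.randrange(len(notify_rows))          # random start key
    while i < len(notify_rows):
        row = notify_rows[i]
        if row in claimed:
            # Another thread is already working here; jump ahead to a random
            # later key instead of piling up behind a slow section.
            i = random.randrange(i + 1, len(notify_rows) + 1)
            continue
        claimed.add(row)                            # best-effort row claim
        try:
            process_row(row)                        # run observers for this row
        finally:
            claimed.discard(row)
        i += 1
```
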
- Weak Notification
- Write only to notify column
- Batching
- A worker waits until it has collected several precommit operations
- It waits for several seconds to build up a batch before issuing it
(sketched below)
- Reads are also batched
- They offset this by prefetching columns in the same row
- Made based on past read patterns
- I wonder which algorithm they chose
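
The worker-side batching described here might look like this in miniature; the thresholds, names, and background flusher thread are all assumptions on my part:

```python
import threading
import time

class Batcher:
    # Collects operations from many transactions and flushes them together,
    # either once the batch is big enough or after a few seconds have passed.
    def __init__(self, flush_fn, max_delay_s=2.0, max_batch=64):
        # Thresholds here are made up for illustration.
        self._flush_fn = flush_fn         # e.g. one RPC to the timestamp oracle
        self._max_batch = max_batch
        self._pending = []
        self._lock = threading.Lock()
        self._ticker = threading.Thread(
            target=self._flush_periodically, args=(max_delay_s,), daemon=True)
        self._ticker.start()

    def submit(self, op):
        with self._lock:
            self._pending.append(op)
            if len(self._pending) >= self._max_batch:
                self._flush_locked()

    def _flush_locked(self):
        if self._pending:
            batch, self._pending = self._pending, []
            self._flush_fn(batch)         # one call for the whole batch

    def _flush_periodically(self, delay_s):
        while True:
            time.sleep(delay_s)
            with self._lock:
                self._flush_locked()
```
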
- Tradeoffs
- The authors make note of several interesting tradeoffs:
“How much of an efficiency loss is too much to pay for the ability to add
capacity endlessly simply by purchasing more machines?”
“How does one trade off the reduction in development time provided by a
layered system against the corresponding decrease in efficiency?”
- The authors originally thought about building the indexing system directly on
Bigtable but “the complexity of reasoning about concurrent state modification
without the aid of strong consistency was daunting.”