Mesa: Geo-Replicated, Near Real-Time, Scalable Data Warehousing - Google, 2014

Like all my notes in this series, I’ve only included portions I’ve found interesting.

  • Notable Features:
    • Atomic updates across tables (including materialized views and indexes)
    • Strong consistency guarantees (repeatable queries at a version number)
    • Geo-replicated (Using redundant work)
      • Data is replicated asynchronously, metadata synchronously
      • I think this idea is really interesting
      • They use Paxos for metadata but don’t mention PaxosDB. Perhaps they used something else.
      • Separating the data from the metadata lets replicas quickly tell whether they are out of date
    • Uses batched updates (near real-time)
      • They tune it for consistency on the order of minutes
    • Availability
      • The authors claim that there is no downtime during a datacenter event
      • They cheat by providing a lot of redundancy
      • You still need a majority in Paxos to commit an update. They should have specified that they allow consistent reads at an old (committed) version
    • They aim for 99th percentile latencies to be “in the hundreds of milliseconds” for point queries
      • This seems slow if they wish to serve web clients
    • Schema Changes
      • Like F1, they support online schema changes
  • Why not datastore x?

    I was asking myself this question when I first heard of this paper.

    • Bigtable does not support multi-row (let alone multi-table) transactions
    • Megastore, Spanner, and F1 could not stand up to the update throughput requirements
      • Hence the batching
  • Data Model
    • They recognize that a single fact may be part of many other aggregations (materialized views)
    • Each table defines a key space, K, and value space, V.
    • The keys and values can be made up of multiple columns

      For example: (Date, Publisher, Country) -> (ClickCount, Cost)

    • Each value column has a defined aggregation function
      • Must be associative, often commutative
      • A sum for example
    • Each table can have multiple indexes. These are just total orderings of the keys
    • Materialized views must use the same aggregation functions
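To make the data model concrete for myself, here is a minimal sketch using the (Date, Publisher, Country) -> (ClickCount, Cost) example. The names `aggregate`, `apply_rows`, and `rollup` are my own and not from the paper, and I assume both value columns use a sum.

```python
# One aggregation function per value column (ClickCount, Cost); both are sums here.
AGGREGATORS = (lambda a, b: a + b, lambda a, b: a + b)

def aggregate(old, new):
    """Combine two value tuples column by column."""
    return tuple(f(o, n) for f, o, n in zip(AGGREGATORS, old, new))

def apply_rows(table, rows):
    """Fold (key, value) rows into a table keyed by (Date, Publisher, Country)."""
    for key, value in rows:
        table[key] = aggregate(table[key], value) if key in table else value
    return table

def rollup(table, key_columns):
    """Build a view over a subset of key columns, reusing the same aggregators."""
    view = {}
    for key, value in table.items():
        view_key = tuple(key[i] for i in key_columns)
        view[view_key] = aggregate(view[view_key], value) if view_key in view else value
    return view

table = apply_rows({}, [
    (("2014-01-01", "p1", "US"), (10, 1.5)),
    (("2014-01-01", "p1", "US"), (5, 0.5)),
    (("2014-01-01", "p2", "UK"), (3, 1.0)),
])
by_date = rollup(table, (0,))  # view keyed by Date only
```

The view reuses the table's aggregators, which is why the same fact can feed several aggregations without extra bookkeeping.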
  • Updates
    • Updates are batched together by an external system
      • They don’t mention it, but it seems a perfect job for MillWheel
      • They created a library to construct and verify well-formed batches
      • These batches are assigned a sequential version number via Paxos
      • The batch contains a single aggregated value for each updated key
    • Each instance of Mesa applies update batches in order
      • By maintaining order they do not require commutative aggregations
      • Especially useful when they wish to revert a change by adding the negative
      • “A negative fact can never be incorporated before its positive counterpart”
    • Mesa will compute the updates to views and indexes (in each DC)
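The in-order rule above can be sketched as follows. This is my own toy model, not Mesa's code: the `Instance` class, the `pending` buffer, and the single summed integer column are all assumptions made for illustration.

```python
class Instance:
    """Toy Mesa instance that applies update batches strictly in version order."""

    def __init__(self):
        self.version = 0    # last applied version
        self.table = {}     # key -> aggregated value (a sum in this sketch)
        self.pending = {}   # version -> batch waiting for its predecessors

    def receive(self, version, batch):
        self.pending[version] = batch
        # Apply every batch whose predecessor has already been applied.
        while self.version + 1 in self.pending:
            rows = self.pending.pop(self.version + 1)
            for key, value in rows.items():
                self.table[key] = self.table.get(key, 0) + value
            self.version += 1

inst = Instance()
inst.receive(2, {"k": -5})  # the correction arrives first and is held back
inst.receive(1, {"k": 5})   # now versions 1 and 2 are applied, in order
```

Because version 2 waits for version 1, the negative fact is never visible without its positive counterpart.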
  • Version Management
    • The ordered collection of updates is enough to serve queries, but is not efficient
    • They define a delta as the aggregation of the versions in the range [Vx, Vy] (where x < y)
    • With a delta, they can compress many batches together. They keep the individual versions for a period of time to serve queries at a particular version
    • Deltas can be aggregated together as well (as long as they are applied in order)
      • I note that we’ve created an LSM with aggregation functions
      • Deltas are stored sorted by key (linear merge)
    • They periodically compact old versions into a base [V0, B]
      • They give the example of 24 hours
      • Then they delete old deltas and versions covered by the base
    • The delta compaction policy determines how many deltas and individual versions to keep around and when to create new deltas to subsume smaller deltas

      Specifically:

      • What deltas must be created before a version can be queried
        • For performance, I assume
      • When deltas should be created outside of the update path
      • When deltas can be deleted
    • The authors note that even though a new base may have been generated, new deltas relative to the new base must also be created
      • Otherwise performance would be abysmal
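Since deltas are sorted by key, combining them is a linear merge. Below is a minimal sketch of how I picture it, assuming each delta is a list of (key, value) pairs with unique keys and that deltas are passed oldest first. `merge_deltas` is a hypothetical name, not something from the paper.

```python
import heapq
from itertools import groupby
from operator import itemgetter

def merge_deltas(deltas, aggregate):
    """Merge key-sorted deltas (oldest first) into one key-sorted delta."""
    # Tag each row with its delta's position so equal keys come out oldest first.
    tagged = [[(key, i, value) for key, value in delta]
              for i, delta in enumerate(deltas)]
    merged = []
    for key, group in groupby(heapq.merge(*tagged), key=itemgetter(0)):
        values = [value for _, _, value in group]
        total = values[0]
        for value in values[1:]:
            total = aggregate(total, value)  # applied in version order
        merged.append((key, total))
    return merged

base = [("a", 1), ("c", 3)]
d1 = [("a", 10), ("b", 2)]
d2 = [("c", 5)]
combined = merge_deltas([base, d1, d2], lambda old, new: old + new)
```

Tagging rows with the delta position keeps the aggregation order stable, so the function does not have to be commutative.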
  • Physical Data
    • They use an RCFile style file format
      • (Partition the data into row-groups and then store data column wise for compression and I/O efficiency)
    • Indexes are also materialized; each stores a complete copy of the data
      • I’m surprised they didn’t opt to make the index a materialized view instead of a special case. I’d be interested to hear the reasoning behind this.
      • To be fair, it’s not a special case: each data file belongs to a specific index. There’s no notion of the main data order.
    • Each data file has an index file that stores fixed size prefixes of the first row key for each row group along with the offset
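Here is a small sketch of how a lookup through such an index file could work. The prefix length, the in-memory list layout, and the names `build_index` and `candidate_offset` are my assumptions. The paper only says that the index holds fixed-size prefixes of each row group's first key plus an offset.

```python
import bisect

PREFIX_LEN = 4  # assumed prefix size in bytes

def build_index(row_groups):
    """row_groups: list of (first_row_key, offset) pairs in key order."""
    return [(key[:PREFIX_LEN], offset) for key, offset in row_groups]

def candidate_offset(index, key):
    """Offset of the first row group that could contain `key`."""
    prefixes = [prefix for prefix, _ in index]
    # Row groups with an equal prefix may start before or after `key`, so step
    # back to the group just before the first prefix >= the lookup prefix.
    i = bisect.bisect_left(prefixes, key[:PREFIX_LEN])
    return index[max(i - 1, 0)][1]

index = build_index([(b"aaaa1", 0), (b"bbbb1", 100), (b"bbbb9", 200), (b"dddd0", 300)])
start = candidate_offset(index, b"cccc0")  # scan forward from this offset
```

Truncating keys makes the index small but ambiguous, so the lookup returns a conservative starting point and the reader scans forward from there.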
  • Single DC Architecture
    • Each DC instance can scale independently and each subsystem can scale independently
    • Update and Maintenance Subsystem
      • Uses a controller (master) / worker architecture
      • Controller maintains the metadata in Bigtable
        • It’s the only writer of the metadata in Bigtable
      • Controller listens to a metadata feed of table updates
      • Controller schedules the work to be done in different queues
        • There is a pool of workers for each job type
          • Allows them to scale independently
          • They use their cluster scheduling system to dynamically adjust the number of workers based on the fraction of idle workers available
        • Each worker polls the controller for work to be done
        • Workers get a lease on the work and can request more time
        • Controller will only accept work from the last worker it assigned the work to (the tasks are made idempotent)
        • GC is done to remove abandoned work results
    • Query Subsystem
      • Limited query language
        • Includes conditional filtering and group-by
        • The authors say that other engines use these primitives to build a richer query language
      • Each query is labeled
        • Queries that drive dashboards are labeled as having low-latency requirements
        • Bulk query workloads are labeled differently
      • Query servers for an instance of Mesa are grouped into sets
        • Each set can serve all tables
        • Similar to TAO, Unicorn, and PNUTS
        • They try to send queries for the same data to the same set for optimal caching
        • Each query server registers the data it actively caches with a locator service
          • I wonder how the servers determine which data to cache
          • If a client cannot find a server that is caching a piece of data, how did the authors decide the client should proceed?
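Going back to the update and maintenance subsystem, the lease rule (only the last assigned worker may hand in a result) can be sketched like this. The `Controller` class, its method names, and the explicit `now` argument are all hypothetical. The paper does not describe the interface at this level.

```python
class Controller:
    """Hypothetical controller that leases tasks to polling workers."""

    def __init__(self, lease_seconds):
        self.lease_seconds = lease_seconds
        self.leases = {}    # task -> (worker, lease expiry time)
        self.results = {}   # task -> accepted result

    def poll(self, task, worker, now):
        """Grant or renew a lease unless another worker holds a live one."""
        if task in self.results:
            return False
        holder = self.leases.get(task)
        if holder and holder[0] != worker and holder[1] > now:
            return False
        self.leases[task] = (worker, now + self.lease_seconds)
        return True

    def submit(self, task, worker, result):
        """Accept a result only from the last worker the task was assigned to."""
        holder = self.leases.get(task)
        if holder is None or holder[0] != worker or task in self.results:
            return False
        self.results[task] = result
        return True

controller = Controller(lease_seconds=10)
controller.poll("compact-t1", "w1", now=0)        # w1 gets the lease
controller.poll("compact-t1", "w2", now=20)       # lease expired, w2 takes over
late = controller.submit("compact-t1", "w1", "stale")   # rejected
ok = controller.submit("compact-t1", "w2", "fresh")     # accepted
```

Rejecting the stale result is safe because the tasks are idempotent, and whatever w1 wrote is abandoned work for the GC to remove.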
  • Multi-DC
    • Update
      • A committer coordinates consistent updates across the instances of Mesa
        • Assigns each batch a version and writes the data into a datastore using Paxos
          • Again, they didn’t say PaxosDB as in Photon
        • Committer is stateless
      • The committer keeps checking to see if the update has propagated to a sufficient number of instances
        • This includes checking multiple tables influenced by the update
        • Once the criteria are met, the committer updates the committed version number to reflect the update. Queries will now show the results of the update
      • Note how there is no need to have locks between readers and writers
    • They use a peer-to-peer mechanism to load data onto a new instance and in the case of recovery
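The committer's check can be sketched as a pure function over the replicated state. This is my reading of the protocol, not the paper's code: `advance_committed_version`, the dictionary shapes, and the `quorum` parameter are all assumptions.

```python
def advance_committed_version(committed, applied, tables_for, quorum):
    """Return the highest version that may be exposed to queries.

    applied:    instance -> {table -> highest version applied there}
    tables_for: version  -> tables influenced by that update
    quorum:     how many instances must have the update (assumed policy)
    """
    version = committed
    while version + 1 in tables_for:
        candidate = version + 1
        # An instance counts only if every affected table has caught up.
        ready = sum(
            all(tables.get(t, 0) >= candidate for t in tables_for[candidate])
            for tables in applied.values()
        )
        if ready < quorum:
            break
        version = candidate
    return version

applied = {
    "dc1": {"A": 3, "B": 3},
    "dc2": {"A": 3, "B": 2},
    "dc3": {"A": 1, "B": 1},
}
tables_for = {1: ["A"], 2: ["A", "B"], 3: ["A", "B"]}
committed = advance_committed_version(0, applied, tables_for, quorum=2)
```

Readers only ever see versions at or below the committed number, which is why no locks are needed between readers and writers.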
  • Query Optimizations
    • Delta Pruning
      • Each delta’s metadata includes the key range affected
      • If a delta does not cover data required for a query, it will be skipped
      • Particularly effective on time series data
        • Since data is stored in key order, later updates will cover only a small portion of the total keyspace. Queries for old data can just check the base version.
    • Scan-to-seek
      • Even if the predicate is on a key column that is not at the front of the index, the authors point out that you can still use the index to skip many records
    • Resume Key
      • Since most clients stream results, and each block includes a resume key, clients can issue the same query to another server and resume processing where they left off. This is great for failover
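Of these optimizations, delta pruning is the easiest to sketch. I assume each delta's metadata holds a minimum and maximum key. The `DeltaMeta` class and the `prune` function are hypothetical names of my own.

```python
from dataclasses import dataclass

@dataclass
class DeltaMeta:
    name: str
    min_key: tuple  # smallest key present in the delta
    max_key: tuple  # largest key present in the delta

def prune(deltas, query_min, query_max):
    """Keep only deltas whose key range overlaps [query_min, query_max]."""
    return [d for d in deltas
            if d.min_key <= query_max and d.max_key >= query_min]

deltas = [
    DeltaMeta("base", ("2014-01-01",), ("2014-06-30",)),
    DeltaMeta("d1", ("2014-06-30",), ("2014-07-01",)),
    DeltaMeta("d2", ("2014-07-01",), ("2014-07-01",)),
]
# A query for February data only needs the base.
needed = [d.name for d in prune(deltas, ("2014-02-01",), ("2014-02-28",))]
```

With a date-leading key, recent deltas cover only a narrow slice of the key space, so most of them are skipped for historical queries.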
  • Worker Parallelization
    • MapReduce is used for many operations. However, it is a challenge to ensure reasonably equal distribution of load across the key space
    • When writing a delta, the worker samples a key every so often and records its offset
    • These samples can be used to determine a reasonable input split balance
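A minimal sketch of the sampling idea, assuming a fixed sampling interval and row offsets rather than byte offsets. `sample_keys` and `split_points` are names I made up, and the even split over samples is my guess at a reasonable balancing rule.

```python
def sample_keys(sorted_keys, every):
    """Record every `every`-th key and its row offset while writing a delta."""
    return [(key, offset) for offset, key in enumerate(sorted_keys)
            if offset % every == 0]

def split_points(samples, num_workers):
    """Pick num_workers - 1 boundary keys that divide the samples evenly."""
    step = len(samples) / num_workers
    return [samples[int(i * step)][0] for i in range(1, num_workers)]

keys = [f"k{i:03d}" for i in range(100)]
samples = sample_keys(keys, every=10)
boundaries = split_points(samples, num_workers=5)
```

Because the samples are evenly spaced in row count, equal numbers of samples per split approximate equal amounts of work even when keys are skewed.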
  • Schema Changes
    • Naive Method:
      • Create a copy of the table with the new schema at a specific version
      • Replay the updates on the copy until it catches up
      • Switch the version used for queries atomically
        • They note that queries can still run against the old schema for some time
    • Linked Schema Change
      • Makes the change visible immediately while translating to the new schema on the fly
      • All new deltas are written with the new schema version
      • It is not always possible. The authors give the example of a schema change reversing the sort order of a table
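To picture the on-the-fly translation, here is a sketch for the simplest case I can think of, adding a value column. The schema table, the default value, and the names `translate` and `read` are all my assumptions, and I assume every column is summed.

```python
SCHEMAS = {
    1: ("clicks", "cost"),
    2: ("clicks", "cost", "impressions"),  # hypothetical added column
}
DEFAULTS = {"impressions": 0}  # assumed default for the new column

def translate(row, from_version, to_version):
    """Re-express a stored value row in the target schema, filling defaults."""
    named = dict(zip(SCHEMAS[from_version], row))
    return tuple(named.get(col, DEFAULTS.get(col)) for col in SCHEMAS[to_version])

def read(deltas, key, version=2):
    """Aggregate `key` across deltas written under mixed schema versions."""
    total = None
    for schema_version, rows in deltas:
        if key in rows:
            row = translate(rows[key], schema_version, version)
            total = row if total is None else tuple(a + b for a, b in zip(total, row))
    return total

deltas = [
    (1, {"k": (10, 2.0)}),       # written before the schema change
    (2, {"k": (5, 1.0, 100)}),   # written with the new schema version
]
value = read(deltas, "k")
```

A change that reverses the sort order has no per-row translation like this, since the rows would have to be physically reordered.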