Tenzing: A SQL Implementation On The MapReduce Framework - Google, 2011

  • [Paper] [Mirror]

  • They reach a latency “as low as ten seconds”
  • Designed to replace a third-party data warehouse (like Greenplum or Vertica) for their Ads data
    • They were experiencing long load times
  • Tenzing can use data on GFS, BigTable or even MySQL
  • The execution engine is MapReduce
    • They keep the latency low by keeping workers active and ready
  • Architecture
    • Distributed Worker Pool - Execution system
      • Master
        • Small pool of masters (“a few dozen”), they coordinate the work for the query across the worker nodes
      • Master Watcher
      • Worker Nodes
        • Workers poll the master for work
    • Query Server
      • Parses and optimizes the query, transforming it into a physical plan
      • Query server finds a master with the help of the master watcher to submit a query
      • Monitors the result area for new data and reads it as it arrives, streaming it to the client
    • Metadata Server
      • Holds table details / schemas, ACL
      • Uses BigTable
  • SQL - There’s a lot here, but I’m just focusing on parts that I found interesting
    • Projection and Filtering
      • If the source is indexed and the query uses a constant range, the compiler can just use the index for that particular range
        • Great for date scans and point queries
      • Can use ColumnIO headers to determine whether there is any relevant data in the file; if not, they can stop reading the file after the header (sketched below)
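
A minimal sketch of that header check, assuming headers carry per-column min/max values (the real ColumnIO header layout is an assumption here, not taken from the paper):

```python
from dataclasses import dataclass

@dataclass
class ColumnHeader:
    column: str
    min_value: int   # smallest value present in this file's column
    max_value: int   # largest value present in this file's column

def file_may_match(headers: list[ColumnHeader], column: str,
                   lo: int, hi: int) -> bool:
    """Return False only if the header proves no row can satisfy
    lo <= column <= hi, so the rest of the file can be skipped."""
    for h in headers:
        if h.column == column:
            return h.min_value <= hi and lo <= h.max_value
    return True  # no stats for this column: must read the file

# Example: a date-range scan discards files whose date range falls
# entirely outside the query's constant range.
headers = [ColumnHeader("event_date", 20110101, 20110131)]
assert not file_may_match(headers, "event_date", 20110201, 20110228)
```
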
    • Aggregation
      • Hash Based
        • For a hash aggregation there is no need to sort the data before it reaches the reducer, so they customized the MapReduce code to skip the sorting step (sketched below)
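
A minimal sketch of why skipping the sort is safe here: a hash aggregation consumes rows in any order, so the reducer can accumulate into a hash table directly. Plain Python stands in for the modified MapReduce machinery:

```python
# With sorted input, a reducer sees each key's values contiguously;
# with sorting disabled, the reducer instead accumulates into a hash
# table as rows arrive in arbitrary order.
from collections import defaultdict

def hash_aggregate(rows):
    """rows: iterable of (group_key, value); computes SUM per key."""
    sums = defaultdict(int)
    for key, value in rows:    # arbitrary order is fine
        sums[key] += value     # O(1) hash update, no sort needed
    return dict(sums)

print(hash_aggregate([("us", 3), ("de", 1), ("us", 2)]))
# {'us': 5, 'de': 1}
```
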
    • Joins
      • Broadcast Join - Used if one of the tables can fit in memory on each node (sketched below)
        • The data structure is determined at run time; it can be an integer array, a sparse integer map, or a specific hash table implementation
        • Filters are applied to the join data while loading, and only the required columns are loaded
        • They use a single copy of the data in memory shared between multi-threaded workers
        • If a table is used often and does not change, they’ll cache the data on local disk
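
A rough sketch of the broadcast join described above: the small table is filtered and column-pruned while being loaded into an in-memory map on each worker, then the large table streams through it. The tables, field names, and `id` key column are invented for illustration:

```python
def load_small_table(rows, needed_cols, predicate):
    """Build the in-memory lookup, keeping only required columns and
    rows that pass the filter (both applied at load time)."""
    table = {}
    for row in rows:
        if predicate(row):
            table[row["id"]] = {c: row[c] for c in needed_cols}
    return table

def broadcast_join(big_rows, small_index, key):
    for row in big_rows:
        match = small_index.get(row[key])
        if match is not None:
            yield {**row, **match}

campaigns = load_small_table(
    [{"id": 1, "name": "spring", "active": True},
     {"id": 2, "name": "fall", "active": False}],
    needed_cols=["name"],
    predicate=lambda r: r["active"],   # filter while loading
)
clicks = [{"campaign_id": 1, "cost": 5}, {"campaign_id": 2, "cost": 7}]
print(list(broadcast_join(clicks, campaigns, "campaign_id")))
# [{'campaign_id': 1, 'cost': 5, 'name': 'spring'}]
```
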
      • Remote Lookup Joins
        • If the source is indexed on the join key, Tenzing looks it up remotely with asynchronous, batched requests and an LRU cache (sketched below)
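
A sketch of the batched, cached lookup; `remote_batch_get` is a hypothetical stand-in for the remote read API (the paper does not spell it out), and an `OrderedDict` plays the LRU cache. The real lookups are also issued asynchronously:

```python
from collections import OrderedDict

CACHE_SIZE = 10_000
cache = OrderedDict()  # LRU: oldest entries evicted first

def lookup_join_keys(keys, remote_batch_get):
    missing = [k for k in keys if k not in cache]
    if missing:
        # one batched request instead of one RPC per key
        for k, row in remote_batch_get(missing).items():
            cache[k] = row
            if len(cache) > CACHE_SIZE:
                cache.popitem(last=False)   # evict least recently used
    results = {}
    for k in keys:
        if k in cache:
            cache.move_to_end(k)            # mark as recently used
            results[k] = cache[k]
    return results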
      • Distributed Merge-Sort Joins
        • Used when the tables are roughly the same size
      • Distributed Hash Joins
        • Used when one table is much larger than the other
        • Like aggregation, sorting is not required, so it's turned off for performance
        • Uses the partitioning phase on the join key
        • The mappers for both tables can be run in parallel
        • The join MapReduce can start once one source MR finishes and the other starts producing data (it does not have to wait for both to finish; sketched below)
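
A sketch of the partition-then-probe core of a distributed hash join, with no sort anywhere; the partition count, row shapes, and function names are illustrative:

```python
NUM_PARTITIONS = 4

def partition(join_key):
    return hash(join_key) % NUM_PARTITIONS

def map_emit(row, key):
    # each mapper tags rows with their partition; both tables'
    # mappers can run in parallel since no ordering is required
    return partition(row[key]), row

def reduce_partition(small_rows, big_rows, key):
    """Runs independently on each partition's slice of both tables."""
    index = {}
    for row in small_rows:                 # build side
        index.setdefault(row[key], []).append(row)
    for row in big_rows:                   # probe side
        for match in index.get(row[key], []):
            yield {**row, **match}

users = [{"uid": 1, "name": "a"}]
events = [{"uid": 1, "ts": 10}, {"uid": 2, "ts": 11}]
# In a real run, each of NUM_PARTITIONS reducers sees only its slice:
print(list(reduce_partition(users, events, "uid")))
# [{'uid': 1, 'ts': 10, 'name': 'a'}]
```
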
    • Nested Queries
      • If multiple MR jobs are needed, Tenzing will put the following mapper in the same process as the preceding reducer! (sketched below)
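
A tiny sketch of that co-location: the upstream reducer calls the next stage's map function directly instead of materializing intermediate output to GFS. The function shapes are assumptions, not Tenzing's internal API:

```python
def chained_reduce(key, values, next_map, emit):
    total = sum(values)                      # stage 1's reduce logic
    for k2, v2 in next_map(key, total):      # stage 2's map, same process
        emit(k2, v2)

def next_map(key, total):
    yield key.upper(), total

out = []
chained_reduce("us", [1, 2, 3], next_map, lambda k, v: out.append((k, v)))
print(out)  # [('US', 6)]
```
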
    • Structured Data
      • Can only deal with flat relational data; protocol buffer fields must be flattened if they are used (unlike Dremel)
    • Views
      • Logical, used for security
    • DDL
      • They have tools that can inspect a MySQL DB and import the table schemas automatically
      • They can determine the structure of Protocol Buffers
  • Optimization
    • Tenzing streams results between MapReduce phases and uses GFS as a backup
    • Chaining phases in memory, co-locating a reduce and subsequent map task in the same process
    • In traditional MapReduce jobs, row based serialization is used for intermediate data because it needs to be sorted
      • However, for some tasks that do not require sorting, they can use a block-based shuffle combining many rows into a compressed block of about 1 MB
      • They found this led to about 3X faster shuffling of data (sketched below)
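
A sketch of the block-based shuffle idea: pack many rows into a compressed block of roughly 1 MB and shuffle blocks instead of rows. JSON and `zlib` are stand-ins; the notes don't say which codec or row encoding Tenzing used:

```python
import json, zlib

BLOCK_BYTES = 1 << 20  # ~1 MB target block size

def pack_blocks(rows):
    """Trade per-row overhead for bulk compression; valid only when
    the downstream phase does not need sorted input."""
    buf, size = [], 0
    for row in rows:
        encoded = json.dumps(row).encode()
        buf.append(encoded)
        size += len(encoded)
        if size >= BLOCK_BYTES:
            yield zlib.compress(b"\n".join(buf))
            buf, size = [], 0
    if buf:
        yield zlib.compress(b"\n".join(buf))

def unpack_block(block):
    for line in zlib.decompress(block).split(b"\n"):
        yield json.loads(line)
```
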
    • Tenzing can detect if it’s faster to just run the query locally
    • Query Engine
      • Originally translated SQL to Sawzall then used Sawzall’s JIT
        • There was high serialization / deserialization overhead converting to Sawzall's type system
      • The current implementation uses Dremel’s SQL expression evaluation engine
        • Faster than Sawzall but still slow due to its interpreted nature
      • They are working on an LLVM implementation
        • So far they have found it to be very fast for many queries