Designed to replace a third-party data warehouse (like a Greenplum or Vertica)
for their Ads data
They were experiencing long load times
Tenzing can use data on GFS, BigTable or even MySQL
The execution engine is MapReduce
They keep the latency low by keeping workers active and ready
Architecture
Distributed Worker Pool - Execution system
Master
Small pool of masters (“a few dozen”), they coordinate the work for the
query across the worker nodes
Master Watcher
Worker Nodes
Workers poll the master for work
Query Server
Parses and optimizes the query, transforming it into a physical plan
Query server finds a master with the help of the master watcher to submit
a query
Monitors the result area for new data and reads it as it arrives, streaming
it to the client
Metadata Server
Holds table details / schemas, ACL
Uses BigTable
SQL - There’s a lot here, but I’m just focusing on parts that I found
interesting
Projection and Filtering
If the source is indexed and the query uses a constant range, the
compiler can just use the index for the particular range
Great for date scans and point queries
Can use ColumnIO headers to determine if there is any relevant data in
the file. If not, they can stop reading the file after the header.
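The header-based skip can be sketched as follows. This is a hedged sketch, not Tenzing's actual ColumnIO format: it assumes a hypothetical header that records the min/max value of a column per file.

```python
# Hedged sketch of header-based file skipping. Assumes a hypothetical
# header dict mapping column name -> (min_value, max_value) for the file.

def should_read_file(header, column, lo, hi):
    """Return False when the file's min/max range for `column` cannot
    overlap the query's constant range [lo, hi], so the reader can stop
    after the header."""
    col_min, col_max = header[column]
    return col_max >= lo and col_min <= hi

# A file whose `date` column spans January has no rows for a February scan:
header = {"date": ("2010-01-01", "2010-01-31")}
should_read_file(header, "date", "2010-02-01", "2010-02-28")  # → False
should_read_file(header, "date", "2010-01-15", "2010-02-28")  # → True
```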
Aggregation
Hash Based
For a hash aggregation, there is no need to sort the data before it
reaches the reducer; they customized the MapReduce code to allow them
to skip the sorting step
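The idea can be sketched as below: the reducer keeps a hash map of group key to running aggregate, so rows can arrive in any order and the shuffle's sort step becomes unnecessary. This is a minimal illustration, not Tenzing's code.

```python
from collections import defaultdict

# Hedged sketch of hash-based aggregation: a hash map of group -> running
# sum means the input never needs to be sorted by key.

def hash_aggregate(rows, key_fn, value_fn):
    sums = defaultdict(int)
    for row in rows:  # rows may arrive in any order
        sums[key_fn(row)] += value_fn(row)
    return dict(sums)

rows = [("us", 3), ("de", 1), ("us", 2)]
hash_aggregate(rows, key_fn=lambda r: r[0], value_fn=lambda r: r[1])
# → {"us": 5, "de": 1}
```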
Joins
Broadcast Join - Used if one of the tables can fit in memory on each node
The data-structure is determined at run time, can be an integer array or
sparse integer map or a specific hash table implementation
Filters run on the join data while loading, and only the required
columns are kept
They use a single copy of the data in memory shared between
multi-threaded workers
If a table is used often and does not change, they’ll cache the data on
local disk
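A broadcast join can be sketched like this: the small table is loaded into an in-memory map (filtering rows and projecting columns during the load), and the large table then streams past it. A plain dict stands in here for the run-time-chosen data structure the notes mention; the function and column names are illustrative.

```python
# Hedged sketch of a broadcast join. A dict stands in for the data
# structure Tenzing picks at run time (integer array, sparse map, etc.).

def load_small_table(rows, key_col, keep_cols, predicate):
    """Build the in-memory side, applying filters and keeping only the
    required columns while loading."""
    table = {}
    for row in rows:
        if predicate(row):
            table[row[key_col]] = {c: row[c] for c in keep_cols}
    return table

def broadcast_join(big_rows, small_table, key_col):
    """Stream the large table past the in-memory small table."""
    for row in big_rows:
        match = small_table.get(row[key_col])
        if match is not None:
            yield {**row, **match}
```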
Remote Lookup Joins
If the source is indexed on the join key, Tenzing will look it up
remotely and asynchronously in a batch, with an LRU cache
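A cached lookup join can be sketched as below. `REMOTE_INDEX` and `fetch_remote` are hypothetical stand-ins for an indexed remote source (e.g. a BigTable read); the real system's asynchronous batching is elided here, leaving just the per-key LRU caching.

```python
from functools import lru_cache

# Hypothetical stand-in for a remote source indexed on the join key.
REMOTE_INDEX = {"k1": "v1", "k2": "v2"}

@lru_cache(maxsize=10_000)
def fetch_remote(join_key):
    """One (cached) remote lookup per distinct key."""
    return REMOTE_INDEX.get(join_key)

def lookup_join(rows, key_fn):
    for row in rows:
        match = fetch_remote(key_fn(row))
        if match is not None:
            yield row, match
```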
Distributed Merge-Sort Joins
Used when the tables are roughly the same size
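Within one partition, the merge side of this join looks like a classic sorted-merge. This sketch assumes both inputs are already sorted on the join key and that keys are unique on each side (the duplicate-key cross product is elided).

```python
# Hedged sketch of the merge step of a merge-sort join, assuming both
# inputs are sorted by join key and keys are unique per side.

def merge_join(left, right):
    """left/right: lists of (key, value) sorted by key; returns matches."""
    i = j = 0
    out = []
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            out.append((lk, left[i][1], right[j][1]))
            i += 1
            j += 1
    return out
```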
Distributed Hash Joins
Used when one table is much larger than the other
Like aggregation, sorting is not required so it’s turned off for
performance
Uses the partitioning phase on the join key
The mappers for both tables can be run in parallel
The join MapReduce can finish when one source MR finishes and the other
starts creating data (does not have to wait for them to both finish)
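The partitioning step can be sketched as below: rows from both tables are routed to a bucket by hashing the join key, so each reducer sees every row for its keys and can build a hash table without any sorting. The partition count and routing function are illustrative.

```python
# Hedged sketch of the partitioning phase of a distributed hash join:
# hash the join key so matching rows from both tables land in the same
# bucket, with no sorting required.

NUM_PARTITIONS = 4  # illustrative; not Tenzing's actual configuration

def partition_for(join_key):
    return hash(join_key) % NUM_PARTITIONS

def shuffle(rows, key_fn):
    buckets = {p: [] for p in range(NUM_PARTITIONS)}
    for row in rows:
        buckets[partition_for(key_fn(row))].append(row)
    return buckets
```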
Nested Queries
If multiple MR jobs are needed, Tenzing will put the following mapper in
the same process as the preceding reducer!
Structured Data
Can only deal with flat relational data, protocol buffer fields must be
flattened if they are used (unlike Dremel)
Views
Logical, used for security
DDL
They have tools that can look at a MySQL DB and import the table schema
automatically
They can determine the structure of Protocol Buffers
Optimization
Tenzing streams results between MapReduce phases and uses GFS as a backup
Chaining phases in memory, co-locating a reduce and subsequent map task in
the same process
In traditional MapReduce jobs, row based serialization is used for
intermediate data because it needs to be sorted
However, for some tasks that do not require sorting, they can use a
block-based shuffle combining many rows into a compressed block of
about 1 MB
They found this led to about 3X faster shuffling of data
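The block-based shuffle can be sketched as follows: rows are packed into roughly 1 MB blocks and compressed as a unit, instead of serializing and tagging each row individually. The block size and newline-delimited encoding are illustrative, not Tenzing's actual format.

```python
import zlib

# Hedged sketch of a block-based shuffle: pack many rows into ~1 MB
# blocks and compress each block as a unit.

BLOCK_SIZE = 1 << 20  # ~1 MB before compression; illustrative

def pack_blocks(rows):
    buf, blocks, size = [], [], 0
    for row in rows:
        encoded = row.encode("utf-8") + b"\n"
        buf.append(encoded)
        size += len(encoded)
        if size >= BLOCK_SIZE:
            blocks.append(zlib.compress(b"".join(buf)))
            buf, size = [], 0
    if buf:  # flush the final partial block
        blocks.append(zlib.compress(b"".join(buf)))
    return blocks

def unpack_block(block):
    return zlib.decompress(block).decode("utf-8").splitlines()
```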
Tenzing can detect if it’s faster to just run the query locally
Query Engine
Originally translated SQL to Sawzall then used Sawzall’s JIT
There was a high serialization / deserialization overhead to convert to
Sawzall’s type system
The current implementation uses Dremel’s SQL expression
evaluation engine
Faster than Sawzall but still slow due to its interpreted nature
They are working on an LLVM implementation
So far they found it to be very fast for many queries