Photon: Fault-tolerant and Scalable Joining of Continuous Data Streams - Google, 2013

  • [Paper] [Mirror]

  • Joining multiple streams with exactly once processing
  • Joins are distributed geographically for fault tolerance
  • Not for general computation (unlike MillWheel)

  • Originally designed for joining search query data with ad click data
    • These are two separate streams that do not have a guaranteed global order
      • It’s not unusual for one server’s clicks to be severely delayed
    • The streams come from many front end servers via logs
    • It’s also not uncommon for one whole stream to lag behind the other
    • They didn’t want to embed the query data in the click URL, just a query ID, because otherwise:
      • URLs would get giant
      • They’d have to trust data that round-trips through the user
  • They can never charge an advertiser twice, but they want to make sure every click is eventually joined, leading to:
    • at most-once at any time
    • near-exact in real-time
    • exactly-once eventually
  • They achieve fault tolerance in part by running the join in multiple DCs
    • They use a system called the IdRegistry, which prevents the same join from being output more than once via a read-modify-write transaction
    • Each cluster retries the join until there is a success
  • IdRegistry
    • KV store, PaxosDB
    • Before an output is written, the node attempts to commit in the IdRegistry using the join key
    • If the record already exists, it skips it, assuming it has been joined successfully
    • While the IdRegistry commit is atomic, the node could still fail after committing the value but before writing the output
      • There is a system that periodically checks and re-injects events that are present in the IdRegistry but missing from the output
    • For performance, the node checks the IdRegistry before performing the join lookup, but confirms at the end with the atomic transaction (see the commit sketch at the end of this section)
    • Since the IdRegistry replicas are located in geographically distant DCs, there can only be a few transactions per second (fewer than 10) as the latency between DCs is high
      • They batch many transactions into one Paxos transaction
        • They make sure two transactions don’t conflict in one batch at the application level
      • They also shard the IdRegistry
        • The partitioning can be adjusted dynamically
        • They can put a new shard map in PaxosDB which is effective from a specified timestamp onwards (see the shard-map sketch below)
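    • A minimal sketch of the commit-or-skip path, including the batching idea (many pending commits applied under one Paxos write, with duplicate keys rejected at the application level). The class and method names are mine; a dict plus a lock stands in for PaxosDB and the cross-DC Paxos round.

      ```python
      import threading

      class IdRegistryServer:
          """Toy stand-in for an IdRegistry shard: a key-value map in which a
          join key can only ever be committed once. In Photon the map lives in
          PaxosDB and each batch is a single Paxos commit replicated across
          DCs; here a dict plus a lock plays that role."""

          def __init__(self):
              self._committed = {}           # join_key -> joiner_id
              self._lock = threading.Lock()  # stands in for the Paxos round

          def commit_batch(self, requests):
              """requests: list of (join_key, joiner_id) pairs. Returns one bool
              per request: True means "you won, emit the joined event", False
              means "already joined, skip it". A duplicate key later in the same
              batch sees the earlier write, which is the application-level
              conflict check described above."""
              results = []
              with self._lock:               # one "Paxos" commit for the batch
                  for join_key, joiner_id in requests:
                      if join_key in self._committed:
                          results.append(False)
                      else:
                          self._committed[join_key] = joiner_id
                          results.append(True)
              return results
      ```

      A joiner only writes the joined record to the output log when its slot in the returned batch is True.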
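    • Dynamic resharding can be pictured as a list of shard maps, each tagged with the timestamp from which it takes effect; a key is routed with whichever map covers its event’s timestamp. A sketch under that reading (the constants and pick_shard are illustrative, not the paper’s API):

      ```python
      import bisect
      import hashlib

      # Each entry: (effective_from_timestamp, number_of_shards). In Photon the
      # current and future shard maps live in PaxosDB; a sorted list stands in here.
      SHARD_MAPS = [
          (0,          32),   # original configuration
          (1700000000, 64),   # doubled shard count, effective from this timestamp on
      ]

      def pick_shard(join_key: str, event_ts: int) -> int:
          """Route a join key to an IdRegistry shard using the map in effect at
          the event's timestamp."""
          effective = [ts for ts, _ in SHARD_MAPS]
          idx = max(bisect.bisect_right(effective, event_ts) - 1, 0)
          _, num_shards = SHARD_MAPS[idx]
          digest = hashlib.md5(join_key.encode()).hexdigest()
          return int(digest, 16) % num_shards

      # Events stamped before the changeover keep hashing into the old 32 shards,
      # so keys already in flight are not re-routed mid-join.
      ```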
  • Dispatcher and Joiner
    • The dispatcher reads the click logs and sends the data to the joiner to be joined (after checking the IdRegistry)
    • The joiner is stateless; it looks up the query data in the EventStore
    • Initially, it’s not clear why these need to be separate processes
      • I believe they did this so they could scale the components individually
    • The dispatcher stores the status of the log files it’s reading in GFS
      • This allows for worker restart
    • Sometimes the query data may not be in the EventStore yet, so the dispatcher is responsible for periodically retrying
      • Exponential backoff (a sketch follows at the end of this section)
    • Sometimes the joiner may send a transaction to the IdRegistry, but fail to receive the ack
      • The value stored for the join key in the IdRegistry is an ID for the joiner
      • If the joiner retries, the IdRegistry will see that it’s from the same joiner and just ack the request (without actually changing anything)
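    • A rough sketch of the dispatcher-side retry with exponential backoff described above, simplified to a synchronous loop (the real dispatcher persists unjoined events in GFS and retries them asynchronously); already_joined and try_join are hypothetical method names:

      ```python
      import time

      MAX_ATTEMPTS = 8   # illustrative bound; Photon keeps retrying from persisted state instead

      def dispatch_click(click, id_registry, joiner):
          """Send a click to the joiner, backing off exponentially while the
          matching query has not shown up in the EventStore yet."""
          if id_registry.already_joined(click.join_key):
              return True            # cheap pre-check; the joiner still confirms atomically
          delay = 1.0
          for _ in range(MAX_ATTEMPTS):
              if joiner.try_join(click):   # fails if the query isn't in the EventStore yet
                  return True
              time.sleep(delay)            # exponential backoff before the next attempt
              delay *= 2
          return False               # would be picked up again from the checkpointed logs
      ```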
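    • And a toy version of the lost-ack handling: because the stored value is the committing joiner’s ID, a retry from the same joiner is simply acked again, while any other joiner sees a duplicate. The commit function below is illustrative, not the paper’s API.

      ```python
      def commit(registry: dict, join_key: str, joiner_id: str) -> bool:
          """Toy IdRegistry write with idempotent retries. `registry` maps
          join_key -> the ID of the joiner that committed it."""
          existing = registry.get(join_key)
          if existing is None:
              registry[join_key] = joiner_id
              return True            # first commit wins
          if existing == joiner_id:
              return True            # same joiner retrying after a lost ack: just ack
          return False               # a different joiner already joined this event: skip

      reg = {}
      assert commit(reg, "click-123", "joiner-7")        # initial commit succeeds
      assert commit(reg, "click-123", "joiner-7")        # retry after a lost ack is acked
      assert not commit(reg, "click-123", "joiner-9")    # any other joiner sees a duplicate
      ```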
  • EventStore
    • Stores the query log side of the join
    • CacheEventStore
      • In-memory store
      • It’s best-effort; misses fall through to the LogsEventStore
    • LogsEventStore
      • While there is no global order for the data in the logs, we can approximate an event’s position
      • A reader runs through the logs, periodically saving the timestamp of an event to a map
        • This is done as part of filling the cache
      • This mapping is stored in BigTable with the key (ServerIP:ProcessID:Timestamp)
        • This assumes events are ordered per pid
      • When the store wants to read a query, it consults the mapping to find a place to start a range scan on BigTable
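    • A rough sketch of that lookup path on a cache miss, with an in-memory sorted index standing in for the BigTable rows keyed by ServerIP:ProcessID:Timestamp; log_reader.scan is a hypothetical interface for reading the query log forward from an offset:

      ```python
      import bisect

      class LogsEventStoreIndex:
          """Sparse checkpoints written while the cache is being filled: for each
          (server_ip, pid) keep a sorted list of (timestamp, log_offset)."""

          def __init__(self):
              self._checkpoints = {}   # (server_ip, pid) -> [(timestamp, offset), ...]

          def add_checkpoint(self, server_ip, pid, timestamp, offset):
              points = self._checkpoints.setdefault((server_ip, pid), [])
              bisect.insort(points, (timestamp, offset))

          def seek(self, server_ip, pid, event_timestamp):
              """Offset of the latest checkpoint at or before the event, i.e. a
              place to start scanning from. Relies on events being in timestamp
              order per process."""
              points = self._checkpoints.get((server_ip, pid), [])
              idx = bisect.bisect_right(points, (event_timestamp, float("inf"))) - 1
              return points[idx][1] if idx >= 0 else 0

      def find_query(index, log_reader, server_ip, pid, query_id, event_timestamp):
          """Scan forward from the approximate position until the query shows up."""
          offset = index.seek(server_ip, pid, event_timestamp)
          for event in log_reader.scan(server_ip, pid, start_offset=offset):
              if event.query_id == query_id:
                  return event
          return None                  # not in the logs yet; the dispatcher will retry
      ```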
  • Design Lessons
    • Minimizing critical state in PaxosDB was important
    • IdRegistry can easily become a bottleneck
      • I wonder if, instead of actively querying the IdRegistry for the first check, they could instead read a delayed log of the transactions
        • However, that means they’d basically be acting as read-only delayed replicas
        • My guess is most of the load is actually in the Paxos latency