Pregel: A System for Large-Scale Graph Processing - Google, 2010

“Graph algorithms often exhibit poor locality of memory access, very little work per vertex, and a changing degree of parallelism over the course of execution.”

  • Overview of a Pregel Computation
    • A sequence of iterations called supersteps
      • Execute the user function on each vertex
        • This function can send messages to any other vertex (to be read in the next superstep),
        • read messages from other vertices (from the previous superstep),
        • add or remove out edges,
        • mutate its own vertex state,
        • and mutate the state of its out-edges
    • At the end of the user function, a vertex can vote to halt, which deactivates it
      • If a vertex receives a message it is reactivated
      • Once all vertices have voted to halt and there are no messages in transit, the computation halts (minimal sketch of this loop below)
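
A minimal, single-machine sketch of the loop described above, using the max-value example: each vertex forwards the largest value it has seen and votes to halt once it learns nothing new. Class and method names here are my own, not Pregel's C++ API.

```python
class Vertex:
    def __init__(self, vid, value, out_edges):
        self.id, self.value, self.out_edges = vid, value, out_edges
        self.active = True

    def vote_to_halt(self):
        self.active = False

    def compute(self, superstep, messages, send):
        new_max = max([self.value] + messages)
        if superstep == 0 or new_max > self.value:
            self.value = new_max
            for dst in self.out_edges:      # out-edges live with the source vertex
                send(dst, self.value)
        else:
            self.vote_to_halt()             # deactivate until a message arrives


def run(vertices):
    inbox = {vid: [] for vid in vertices}
    superstep = 0
    while True:
        outbox = {vid: [] for vid in vertices}

        def send(dst, msg):
            outbox[dst].append(msg)         # read by dst in the next superstep

        for v in vertices.values():
            if v.active or inbox[v.id]:     # an incoming message reactivates a vertex
                v.active = True
                v.compute(superstep, inbox[v.id], send)
        inbox, superstep = outbox, superstep + 1
        # Halt once every vertex has voted to halt and no messages are in transit.
        if all(not v.active for v in vertices.values()) and not any(inbox.values()):
            return


verts = {1: Vertex(1, 3, [2]), 2: Vertex(2, 6, [1, 3]), 3: Vertex(3, 2, [1])}
run(verts)
print({vid: v.value for vid, v in verts.items()})   # {1: 6, 2: 6, 3: 6}
```
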
  • The graph is directed; edges are stored with their source vertex
  • The output of a computation is the set of values explicitly output by the vertices
    • Often this is a smaller directed graph (e.g. a clustering or a minimum spanning tree)
    • Could also be a few aggregate values
  • Message Passing instead of Remote Reads
    • Sufficiently Expressive
    • Better Performance
    • They can asynchronously batch messages
      • I note that they can also stream out messages while still running vertex programs on other parts of the partition
  • API Notes
    • Combiners: like a MapReduce combiner, these can mitigate the overhead of message passing for some computations by merging messages bound for the same vertex (sketch after this list)
    • Aggregators
      • Useful for graph-wide statistics
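
A tiny illustration of the combiner idea (function and variable names are mine): given a commutative, associative function such as min for shortest paths or sum for PageRank contributions, the worker can collapse all pending messages for a destination vertex into one before anything crosses the network.

```python
def combine_outbox(outbox, combine):
    """outbox maps destination vertex id -> list of pending messages."""
    return {dst: [combine(msgs)] if msgs else [] for dst, msgs in outbox.items()}


pending = {7: [4.0, 9.0, 2.0], 8: [1.0]}
print(combine_outbox(pending, min))   # {7: [2.0], 8: [1.0]} -- one message per destination
```
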
  • Mutating the Graph
    • They need a well-defined way to resolve conflicting mutations that touch the same part of the graph
    • They apply mutations in a specific order (toy sketch after this list):
      1. Edge Removals
      2. Vertex Removals
      3. Vertex Additions
      4. Edge Additions
      5. Then Compute(), the vertex program
    • They provide mechanisms to override the behavior when there are multiple requests to create the same vertex
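
A toy sketch of that ordering applied to one superstep's batch of mutation requests. The graph representation, the names, and the duplicate-vertex handler are my own simplifications; the paper only says that users can supply handlers for such conflicts.

```python
def apply_mutations(graph, muts, on_duplicate_vertex=lambda old, new: old):
    """graph: dict of vertex id -> set of out-neighbor ids (vertex values elided)."""
    for src, dst in muts.get("edge_removals", []):       # 1. edge removals
        graph.get(src, set()).discard(dst)
    for vid in muts.get("vertex_removals", []):          # 2. vertex removals
        graph.pop(vid, None)
    for vid in muts.get("vertex_additions", []):         # 3. vertex additions
        graph[vid] = on_duplicate_vertex(graph[vid], set()) if vid in graph else set()
    for src, dst in muts.get("edge_additions", []):      # 4. edge additions
        graph.setdefault(src, set()).add(dst)
    # 5. ...and only then does Compute() run for the superstep.
    return graph


g = {1: {2}, 2: {1}}
apply_mutations(g, {"edge_removals": [(1, 2)], "vertex_additions": [3], "edge_additions": [(3, 1)]})
print(g)   # {1: set(), 2: {1}, 3: {1}}
```
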
  • Partitioning
    • Assignment is based on Vertex ID
    • Users can replace the basic hash(id) mod N partitioner (example below)
    • Some users partition the graph so that vertices from the same web site are located on the same partition
      • I wonder how they deal with large websites like Wikipedia
      • Perhaps they don’t all go to the same shard, or maybe they special-case a small set of large sites
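
The default assignment and a site-based override, sketched below. The hash-mod-N scheme is the one the notes above describe; the by-site variant is only my guess at what "same web site on the same partition" could look like.

```python
import zlib


def default_partition(vertex_id: str, num_partitions: int) -> int:
    # A stable hash (unlike Python's per-process randomized hash()) so the
    # assignment is consistent across workers.
    return zlib.crc32(vertex_id.encode()) % num_partitions


def by_site_partition(url: str, num_partitions: int) -> int:
    site = url.split("/")[2] if "://" in url else url    # crude hostname extraction
    return zlib.crc32(site.encode()) % num_partitions


print(default_partition("https://en.wikipedia.org/wiki/Pregel", 64))   # some index in [0, 64)
print(by_site_partition("https://en.wikipedia.org/wiki/Pregel", 64))   # same for every en.wikipedia.org URL
```
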
  • Cluster Architecture
    • They launch Pregel on a set of machines, one of which is designated as the master
      • Probably through Chubby- or ZooKeeper-style node ordering
    • The rest send registration messages to the master
    • Master assigns one or more partitions to each worker
    • Each worker knows about the partitioning layout
    • Master assigns a part of the input to each worker
      • The workers enqueue vertices that are meant to go to other workers
      • I wonder if it might be faster to prepartition the graph
    • During each superstep, messages are sent asynchronously to overlap computation and network usage
  • Fault Tolerance
    • Checkpointing is the main mechanism
    • At the start of a checkpointed superstep, the master tells all workers to save their partition state (vertex values, edge values, and incoming messages) to a persistent store (probably GFS)
    • Master actively pings workers (not the other way around)
      • If the worker does not receive a ping, it kills itself
      • The master reassigns the failed worker’s partitions to other nodes
        • All nodes restart their work from the last checkpoint
        • They can’t just resume the failed partitions, since the messages sent to them during the lost supersteps were never saved
      • If this were a common problem, I think all messages could be logged and then replayed on the restarted nodes. However, there is a high cost associated with that persistence, and you need to be careful about nondeterministic computations
        • Woah, I wrote that idea several days after reading the paper for the first time. Turns out that is exactly what Pregel does
        • Their implementation cleverly uses local disk. It’s fast, since we only need to read the local disks of the nodes that are still up!
        • The authors note that many algorithms can be made deterministic, even random ones (deterministic seed)
        • They call their technique confined recovery (rough sketch below)
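
A rough sketch of the message-logging idea behind confined recovery as I understand it: workers append their outgoing messages to local disk per superstep, and during recovery the surviving workers only replay the messages destined for the recomputing partitions. The file layout and function names are my own guesses, not the paper's.

```python
import json
import os


def log_outgoing(log_dir, superstep, messages):
    """messages: list of (dst_vertex_id, payload) tuples sent this superstep."""
    os.makedirs(log_dir, exist_ok=True)
    with open(os.path.join(log_dir, f"superstep_{superstep}.log"), "a") as f:
        for dst, payload in messages:
            f.write(json.dumps({"dst": dst, "msg": payload}) + "\n")


def replay_for(log_dir, superstep, recovering_vertices):
    """Yield logged messages destined for vertices on the recovering partitions."""
    path = os.path.join(log_dir, f"superstep_{superstep}.log")
    if not os.path.exists(path):
        return
    with open(path) as f:
        for line in f:
            rec = json.loads(line)
            if rec["dst"] in recovering_vertices:
                yield rec["dst"], rec["msg"]
```
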
  • Worker Notes
    • Messages destined for vertices on remote workers are buffered
      • Once the buffer reaches a specified size, it is flushed asynchronously as a batch (sketched below)
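
A small sketch of that per-destination buffering: messages bound for a remote worker accumulate and are handed off as one batch once a threshold is reached. The flush callback stands in for the asynchronous network send; the names are mine.

```python
class OutgoingBuffers:
    def __init__(self, flush_threshold, flush):
        self.flush_threshold = flush_threshold
        self.flush = flush               # callable(worker_id, batch_of_messages)
        self.buffers = {}                # worker_id -> pending messages

    def send(self, dest_worker, dest_vertex, payload):
        buf = self.buffers.setdefault(dest_worker, [])
        buf.append((dest_vertex, payload))
        if len(buf) >= self.flush_threshold:
            self.flush(dest_worker, buf)
            self.buffers[dest_worker] = []

    def flush_all(self):                 # e.g. at the end of a superstep
        for worker, buf in self.buffers.items():
            if buf:
                self.flush(worker, buf)
        self.buffers = {}


bufs = OutgoingBuffers(3, lambda w, batch: print(f"to worker {w}: {batch}"))
for v in range(7):
    bufs.send(v % 2, v, "hello")
bufs.flush_all()
```
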
  • Master Notes
    • The master’s data structures are proportional in size to the number of partitions, not to the size of the graph
  • Aggregators
    • Instead of bombarding the master with all of the intermediate values, the aggregation messages are sent in a tree structure
      • The authors note that this makes use of many CPUs for the reduction
    • The value is sent to the master at the start of the next superstep
    • They do not discuss how they create this tree or how failures within it are detected
      • I wonder if they just use the worker numbers to determine each node’s place in the tree (similar to how we can represent a binary tree in an array; sketched below)
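
Sketching that guess: if worker i's parent is worker (i - 1) // 2, the reduction tree is implicit in the worker indices, just like a binary heap laid out in an array. This is pure speculation; the paper doesn't say how the tree is formed.

```python
def parent(i):
    return (i - 1) // 2


def tree_reduce(local_values, reduce_fn):
    """local_values[i] is worker i's partial aggregate; returns the root's value."""
    acc = list(local_values)
    # Walk the workers bottom-up; each folds its value into its parent's.
    for i in range(len(acc) - 1, 0, -1):
        acc[parent(i)] = reduce_fn(acc[parent(i)], acc[i])
    return acc[0]   # worker 0 (the root) forwards this to the master


print(tree_reduce([3, 1, 4, 1, 5, 9, 2, 6], max))   # 9
```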