Directed Acyclic Graphs and Elixir

Intro

A few years ago (2016), I was talking to a friend who was describing the difficulty he was having managing background job pipelines in his Rails application. The biggest issue was tracking and acting on success/failure states from outside Sidekiq while preserving the dependency structure of the pipeline, a directed acyclic graph (DAG).

He described the work he'd done simplifying the graph into an algebra to help with priority and status. I had been doing some work with Elixir at the time and couldn't help but start thinking about the problem from that perspective. This led me to build a small Elixir library called Assembly Line.

In this post I'm going to attempt to describe the algorithmic logic while using concrete examples from the Assembly Line implementation.

Note: this post consolidates and rewrites two older posts on Svbtle: "DAG Management in Elixir" and "Orchestrating Dependent Tasks with Elixir".

Approach

As I started to think about the problem, I looked for a good way to translate a generic DAG into a structure that could represent both order and state. The full DAG could be large and complex, but while processing it we are really only working with levels, where a level is a set of tasks that block some other task. This led me to look at the DAG as a stack, which we can represent with a simple list whose elements are either single tasks or lists of single tasks.

The list structure gives two benefits. First, it clearly defines the scope of parallel execution: all tasks in a sublist can be performed simultaneously. Second, it makes determining what work needs to be done simple, since we only ever need to work off the head of the list.
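The minimal Elixir sketch below makes "work off the head" concrete. The module and function names (`StackSketch`, `next_ready/1`) are hypothetical and not part of Assembly Line. Atoms stand in for tasks, and a stage is assumed to be either a single task or a list of tasks that may run in parallel.

```elixir
defmodule StackSketch do
  # Hypothetical helper: a stage is either a single task or a list of tasks
  # that may run in parallel. Only the head of the stack is ever ready.

  # No stages left: everything has run.
  def next_ready([]), do: :done

  # A sublist at the head: all of its tasks can start at once.
  def next_ready([group | rest]) when is_list(group), do: {:ok, group, rest}

  # A single task at the head: it runs on its own.
  def next_ready([task | rest]), do: {:ok, [task], rest}
end
```

For `[[:a, :b], :c, :d]` it returns `{:ok, [:a, :b], [:c, :d]}`. A caller runs the ready tasks and then calls it again on the rest. It never has to scan deeper into the stack.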

Simple DAGs

A ----------|
            |----------> C ----------> D
B ----------|

Using the stack representation, the graph above can be written as [[A, B], C, D]. The group [A, B] must complete before C executes, but neither A nor B depends on the other.
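Here is a minimal sketch of executing this flat representation. It assumes each task is just a term handed to a worker function. `FlatRunner` is a hypothetical name, not Assembly Line's API. Stages run one after another, and the members of a sublist run concurrently via `Task.async_stream/2`.

```elixir
defmodule FlatRunner do
  # Sketch only: runs a flat stack such as [[:a, :b], :c, :d].
  # Each stage finishes before the next starts; tasks in a sublist run concurrently.
  def run(stack, fun) do
    Enum.flat_map(stack, fn
      group when is_list(group) ->
        group
        |> Task.async_stream(fun)
        # async_stream yields {:ok, result} per task, in input order by default.
        |> Enum.map(fn {:ok, result} -> result end)

      task ->
        [fun.(task)]
    end)
  end
end
```

Because `Task.async_stream/2` returns results in input order by default, the returned list mirrors the stack even though A and B may finish in either order.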

Complex DAGs

A ---------> B -----------|
                          |------------> D
             C -----------|

Let's say we have a slightly more complex graph that isn't balanced: the branch through A and B is longer than the branch through C. Our original list representation alone doesn't handle this well. It offers no elegant way to express the dependency in the longer branch while keeping processing efficient. We could use [A, [B, C], D] to ensure the proper order. However, if A is slow, C cannot start until A finishes, even though C does not depend on A.

However, if we make the data structure recursive, we can maintain efficiency while preserving ordering.

inner = [A, B]
outer = [[C, inner], D]
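The recursive reading can be sketched the same way. The sketch assumes the convention the examples imply:

- A list read as a sequence runs its stages in order.
- A list appearing as a stage is a parallel group.
- A list inside a group (like `inner`) is again a sequence.

`RecursiveRunner` is hypothetical. It uses `Task.await_many/1`, which requires Elixir 1.11 or later and applies a default 5-second timeout.

```elixir
defmodule RecursiveRunner do
  # Sketch: sequence and group levels alternate.
  # A sequence runs its stages in order (each stage returns a list of results).
  def run_sequence(stages, fun) when is_list(stages) do
    Enum.flat_map(stages, &run_stage(&1, fun))
  end

  # A list used as a stage is a parallel group: one Task per member.
  defp run_stage(group, fun) when is_list(group) do
    group
    |> Enum.map(fn member -> Task.async(fn -> run_member(member, fun) end) end)
    |> Task.await_many()
    |> Enum.concat()
  end

  defp run_stage(task, fun), do: [fun.(task)]

  # A list inside a group is a nested sequence (e.g. `inner`).
  defp run_member(seq, fun) when is_list(seq), do: run_sequence(seq, fun)
  defp run_member(task, fun), do: [fun.(task)]
end
```

With `inner = [:a, :b]` and `outer = [[:c, inner], :d]`, `inner` runs in its own task. A slow A therefore delays B (and D, which waits for the whole group) but not C.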

This pattern also holds for more complex subgraphs:

A ------> B ------|
                  |----> F -----|
          C ------|             |
                                |------> H
          D ------|             |
                  |----> G -----|
          E ------|

Given the graph above, we can still use the recursive strategy to process it efficiently:

g1_inner = [A, B]
g1 = [[C, g1_inner], F]
g2 = [[D, E], G]
pipeline = [[g1, g2], H]
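One way to check that a nested encoding really matches its graph is to derive the edges back out of it. The sketch below uses a hypothetical `DagEdges` module, with atoms standing in for tasks. It applies the same alternating sequence/group reading and connects every task that ends one stage to every task that starts the next.

```elixir
defmodule DagEdges do
  # Sketch: recover {from, to} edges from the nested-list encoding.

  # Edges of a sequence: edges inside each stage, plus an edge from every
  # sink of one stage to every source of the following stage.
  def edges(stages) when is_list(stages) do
    within = Enum.flat_map(stages, &stage_edges/1)

    between =
      stages
      |> Enum.chunk_every(2, 1, :discard)
      |> Enum.flat_map(fn [prev, next] ->
        for from <- sinks(prev), to <- sources(next), do: {from, to}
      end)

    within ++ between
  end

  # A list used as a stage is a parallel group; its list members are sequences.
  defp stage_edges(group) when is_list(group),
    do: Enum.flat_map(group, fn m -> if is_list(m), do: edges(m), else: [] end)

  defp stage_edges(_task), do: []

  # Tasks that start a stage (sources) and tasks that finish it (sinks).
  defp sources(group) when is_list(group), do: Enum.flat_map(group, &member_ends(&1, :first))
  defp sources(task), do: [task]

  defp sinks(group) when is_list(group), do: Enum.flat_map(group, &member_ends(&1, :last))
  defp sinks(task), do: [task]

  # A nested sequence starts at its first stage and ends at its last.
  defp member_ends(seq, :first) when is_list(seq), do: sources(hd(seq))
  defp member_ends(seq, :last) when is_list(seq), do: sinks(List.last(seq))
  defp member_ends(task, _which), do: [task]
end
```

Applied to `pipeline` (with atoms for the letters), this yields exactly the seven edges in the diagram: A to B, B and C to F, D and E to G, and F and G to H.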

Implementation

The Assembly Line implementation has three main abstractions which map to the above approach:

Job

An AssemblyLine.Job represents a task that needs to be performed, its parameters, and, once it has finished, its outcome.

%Job{task: term, args: list, worker: term, result: term, queue: String.t}
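For readers less familiar with Elixir structs, the fields above could be declared roughly as follows. This is an illustrative sketch, not Assembly Line's source. The module name `Sketch.Job`, the `args: []` default, and the helpers `finished?/1` and `complete/2` are assumptions. The sketch also treats a `nil` result as "not finished yet".

```elixir
defmodule Sketch.Job do
  # Hypothetical mirror of the Job fields; not Assembly Line's actual module.
  defstruct [:task, :worker, :result, :queue, args: []]

  @type t :: %__MODULE__{
          task: term(),
          args: list(),
          worker: term(),
          result: term(),
          queue: String.t() | nil
        }

  # Assumption: a job counts as finished once a non-nil result is recorded.
  @spec finished?(t()) :: boolean()
  def finished?(%__MODULE__{result: nil}), do: false
  def finished?(%__MODULE__{}), do: true

  # Records the outcome of running the job.
  @spec complete(t(), term()) :: t()
  def complete(%__MODULE__{} = job, result), do: %{job | result: result}
end
```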

JobQueue

A JobQueue is simply a list of AssemblyLine.Job structs and/or AssemblyLine.JobQueue.Server instances, which are referenced by name (as "inner" is here):

[[%Job{}, "inner"], %Job{}, %Job{}]
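In the list above, `"inner"` is the name of another queue rather than a job. The hypothetical sketch below resolves such names against a map of queues to recover the plain nested-list encoding from the Approach section. The `queues` map and the `QueueExpand` module are assumptions, and atoms stand in for `%Job{}` structs.

```elixir
defmodule QueueExpand do
  # Sketch: replace string references to other queues with the stages of the
  # referenced queue. `queues` is a hypothetical map of name => list of stages.
  def expand(stages, queues) when is_list(stages) do
    Enum.map(stages, &expand_item(&1, queues))
  end

  # A string is a queue name: expand that queue's stages recursively.
  defp expand_item(name, queues) when is_binary(name),
    do: expand(Map.fetch!(queues, name), queues)

  # A sublist may itself contain references.
  defp expand_item(items, queues) when is_list(items), do: expand(items, queues)

  # Anything else is a job and is kept as-is.
  defp expand_item(job, _queues), do: job
end
```

`Map.fetch!/2` raises a `KeyError` for an unknown name. Because the graph is acyclic, references are assumed never to form a cycle; a cycle would recurse forever.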

Server

Each AssemblyLine.JobQueue.Server corresponds to a single list and is the basic building block of the recursive structure when more complex workflows are needed.

Server instances can be nested inside other server instances to build more complex workflows:

{:ok, inner} = AssemblyLine.Supervisor.start_queue("inner", [%Job{}, %Job{}])
{:ok, outer} = AssemblyLine.Supervisor.start_queue("outer", [[%Job{}, "inner"], %Job{}, %Job{}])
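To show why a process per queue fits this design, here is a minimal `GenServer` that holds one queue and exposes its head. It is a hypothetical stand-in for `AssemblyLine.JobQueue.Server`, not its real implementation. The following are all assumptions:

- the module name
- the `next/1` and `complete/1` calls
- the use of `:global` name registration, so that plain strings like `"inner"` can serve as names

```elixir
defmodule Sketch.QueueServer do
  use GenServer

  # Hypothetical: one process per queue, registered globally under its name.
  def start_link(name, stages) do
    GenServer.start_link(__MODULE__, stages, name: {:global, {__MODULE__, name}})
  end

  # Returns {:ok, head} for the stage that is ready, or :done when empty.
  def next(name), do: GenServer.call({:global, {__MODULE__, name}}, :next)

  # Marks the head stage as finished and drops it from the queue.
  def complete(name), do: GenServer.call({:global, {__MODULE__, name}}, :complete)

  @impl true
  def init(stages), do: {:ok, stages}

  @impl true
  def handle_call(:next, _from, []), do: {:reply, :done, []}
  def handle_call(:next, _from, [head | _] = stages), do: {:reply, {:ok, head}, stages}
  def handle_call(:complete, _from, []), do: {:reply, :done, []}
  def handle_call(:complete, _from, [_head | rest]), do: {:reply, :ok, rest}
end
```

Suppose a caller gets `[:c, "inner"]` from the outer queue. It would run `:c` directly and walk the `"inner"` queue through that queue's own server, so nesting servers mirrors nesting lists.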

Processing a Queue

The AssemblyLine.JobQueue.Handler module holds the logic for processing queues. It does not work directly on a job queue structure; instead, it manages a Server struct. BEAM processes are lightweight, so each nested queue can live in its own process. That makes a recursive data structure like this natural to model and cheap to run. The Handler distributes the parallel workloads and tracks the status of individual jobs.
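The status-tracking side of this can be sketched independently of the servers. The hypothetical `Sketch.Handler` below runs a flat stack stage by stage and runs each parallel group with `Task.async_stream/2`. It records `{:ok, result}` or `{:error, message}` per task. It is not Assembly Line's Handler and does not handle the nested case. Its policy of not starting later stages after a failure is an assumption.

```elixir
defmodule Sketch.Handler do
  # Hypothetical sketch for a flat stack such as [[:a, :b], :c].

  # Runs stages in order and returns %{task => {:ok, result} | {:error, message}}.
  # Stages after the first one containing a failure are never started.
  def run(stages, fun) do
    Enum.reduce_while(stages, %{}, fn stage, acc ->
      statuses = run_stage(stage, fun)
      acc = Map.merge(acc, statuses)

      if Enum.all?(Map.values(statuses), &match?({:ok, _}, &1)) do
        {:cont, acc}
      else
        {:halt, acc}
      end
    end)
  end

  # Runs every task in a stage concurrently and records one status per task.
  defp run_stage(tasks, fun) when is_list(tasks) do
    tasks
    |> Task.async_stream(fn task -> {task, safe_apply(fun, task)} end)
    |> Map.new(fn {:ok, {task, status}} -> {task, status} end)
  end

  defp run_stage(task, fun), do: run_stage([task], fun)

  # Turns a raised exception into an {:error, message} status instead of a crash.
  defp safe_apply(fun, task) do
    {:ok, fun.(task)}
  rescue
    e -> {:error, Exception.message(e)}
  end
end
```

Keeping the outcome per task in a plain map makes it easy to inspect success and failure from outside the workers, which was the original pain point with the Rails pipeline.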