Deferred Execution Models

As a Google intern in Hyderabad, I was introduced to a couple of really cool libraries (TensorFlow and FlumeJava) that use the so-called deferred execution model. I found some very interesting similarities between the two libraries, and in this blog post I'll discuss how deferred execution works by taking instances from both. I'll also discuss the pros and cons of deferred execution.

What is Deferred Execution?

The term "defer" simply means to postpone, and that is exactly what happens in deferred execution.
Every variable and operation is deferred onto a dependency graph: a directed acyclic graph in which each node represents either a variable or an operation, and each edge represents the dependency of one node on another. This graph can be executed starting from any node you specify. Execution happens in such a way that all dependency nodes (the nodes your selected node depends on) execute first, and so on recursively, until your chosen node finally executes. On the dependency graph, the child nodes are executed recursively in a bottom-up fashion. Algorithmically, this is just a depth-first search starting from the node you specify, in which each node executes on the post-visit.

For example, a binary add operation depends on two numbers (variables). In this case there would be three nodes: one variable representing the first number, one variable representing the second number, and one for the add operation. There would be two edges, one from each variable to the add operation; each edge simply means that the add operation depends on that variable.
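To make that execution order concrete, here's a minimal, library-independent Python sketch of deferred nodes being run by a post-order depth-first search (a real engine would also cache the results of nodes shared by multiple paths):

    class Node:
        def __init__(self, compute, deps=()):
            self.compute = compute  # maps dependency values to this node's value
            self.deps = deps        # the nodes this node depends on

    def run(node):
        # Post-order DFS: execute all dependencies first, then the node
        # itself on the post-visit.
        dep_values = [run(dep) for dep in node.deps]
        return node.compute(*dep_values)

    # The three-node graph from the add example (operands chosen arbitrarily):
    first = Node(lambda: 4)
    second = Node(lambda: 7)
    add = Node(lambda a, b: a + b, deps=(first, second))

    print(run(add))  # 11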


Let's dive deeper by taking examples from TensorFlow and FlumeJava.


Deferred Execution in TensorFlow

TensorFlow Basics

In TensorFlow, the dependency graph is formed by Operations and Tensors. A Tensor encapsulates a value that is either explicitly specified or returned by an operation. An Operation takes input tensors and, after some computation, returns output tensors. These elements are simply deferred onto a dependency graph and are executed only when the developer asks for them.

Let's run through some examples.

I'll start with something as trivial as a binary add operation. I'll juxtapose immediate execution (essentially the opposite of deferred execution) with deferred execution, so that you get some clarity. In Python3, a simple binary add operation boils down to just two or three lines of code, so let's look at immediate execution first.
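Something like this minimal snippet (I'm assuming the operands 4 and 7 here, so the sum matches the output of the deferred version below):

    a = 4
    b = 7
    print(a + b)  # prints 11 the moment this line runs; nothing is deferred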




Then let's see the case for deferred execution.

Here's the TensorFlow code for a simple add operation:
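A minimal TensorFlow 1.x sketch along those lines (again assuming the operands 4 and 7):

    import tensorflow as tf  # TensorFlow 1.x API

    a = tf.Variable(4)    # deferred: creates a graph node, computes nothing yet
    b = tf.Variable(7)
    add = tf.add(a, b)    # deferred: just another node on the graph

    sess = tf.Session()
    sess.run(tf.global_variables_initializer())
    writer = tf.summary.FileWriter("/tmp/tf_logs", sess.graph)  # logs the graph (more on this below)
    print(sess.run(add))  # only now does the graph actually execute; prints 11
    writer.close()
    sess.close()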




As you can see, when executed, that Python3 code snippet just prints "11".

Here's the dependency graph for the above code:


The explanation for the above graph is the same as what we saw earlier: the binary add operation depends on two variables, represented here as two different Tensors.

The dependency graph for that example is quite simple, but while working on large-scale machine learning projects, it helps to have a tool for visualizing the dependency graph. TensorFlow has exactly such a built-in tool, named TensorBoard, which ships as a command-line tool with the pip installation of TensorFlow. To visualize a graph, all you need to do is use tf.summary.FileWriter() to log the graph of the current session. In the code above, the two lines involving the writer identifier log the graph of the current session to the specified path; pointing the tensorboard command-line tool's --logdir flag at that path then serves the visualization in your browser. Here is the graph as rendered by TensorBoard:

(The nodes Variable and Variable_1 were just expanded further in the above graph)

Here's a more complex example, in which a neural network with two hidden layers is trained on a bunch of images from the MNIST training set, and one test image is then retrieved from the dataset to check the accuracy.
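Here's a rough TensorFlow 1.x sketch of that structure (the layer sizes and hyperparameters are my own arbitrary choices; tf.contrib.layers.fully_connected is what produces the fully_connected_* node names you'll see in the graph):

    import tensorflow as tf
    from tensorflow.examples.tutorials.mnist import input_data

    mnist = input_data.read_data_sets("MNIST_data", one_hot=True)

    x = tf.placeholder(tf.float32, [None, 784])   # flattened 28x28 images
    y = tf.placeholder(tf.float32, [None, 10])    # one-hot digit labels

    # Two hidden layers plus an output layer; contrib names these graph
    # nodes fully_connected, fully_connected_1 and fully_connected_2.
    h1 = tf.contrib.layers.fully_connected(x, 256)
    h2 = tf.contrib.layers.fully_connected(h1, 256)
    logits = tf.contrib.layers.fully_connected(h2, 10, activation_fn=None)

    loss = tf.reduce_mean(
        tf.nn.softmax_cross_entropy_with_logits(labels=y, logits=logits))
    step = tf.train.GradientDescentOptimizer(0.5).minimize(loss)  # adds the implicit "gradients" node
    pred = tf.argmax(logits, 1)  # prediction depends only on the layers, not on the loss

    def train(sess, steps=1000):
        for _ in range(steps):
            images, labels = mnist.train.next_batch(100)
            sess.run(step, {x: images, y: labels})  # repeatedly executes the deferred training ops

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        tf.summary.FileWriter("/tmp/tf_logs", sess.graph).close()  # log the graph for TensorBoard
        train(sess)
        print(sess.run(pred, {x: mnist.test.images[:1]}))  # runs only the prediction subgraph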




And here's your TensorBoard graph:



The "gradients" node in the graph just computes the gradients and exchanges the required Tensor with the operation after evaluation. This node is implicitly created by Tensorflow.

Notice that in the above code, to run the neural network on test data I've used just sess.run(pred). This means execution skips the part of the graph above fully_connected_2 in the diagram: the loss and gradient nodes never run. This is a prime advantage of deferred execution; without it, building such selective execution into your code would certainly be painstaking.

More insight into execution of nodes in TensorFlow

A line like sess.run([node1, ...]) in TensorFlow executes the dependency graph up to the specified nodes, as we saw earlier. Only the nodes that are actually required execute, namely those on the paths formed by the specified nodes and their dependencies; everything else is skipped. On an ordinary single-CPU/GPU machine, this execution is sequential. To get parallelism, some infrastructure setup is certainly required: TensorFlow won't implicitly build strategies to parallelize graph execution, so the developer has to explicitly allocate workers or GPUs to distribute the work, as in the sketch below.
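For instance, device placement is explicit. Here's a minimal sketch (the device names assume a machine with two GPUs):

    import tensorflow as tf

    # Pin parts of the graph to devices by hand; TensorFlow won't
    # spread the graph across devices for you automatically.
    with tf.device("/gpu:0"):
        a = tf.Variable(4)
    with tf.device("/gpu:1"):
        b = tf.Variable(7)
    add = tf.add(a, b)

    config = tf.ConfigProto(allow_soft_placement=True,  # fall back if a device is missing
                            log_device_placement=True)  # print where each op actually ran
    with tf.Session(config=config) as sess:
        sess.run(tf.global_variables_initializer())
        print(sess.run(add))

Here are some useful links on running TensorFlow with multiple devices: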




Deferred Execution in FlumeJava

FlumeJava Basics

FlumeJava (let's call it "Flume" from now on) runs on Google's infrastructure and was developed by Google researchers in 2010. Flume (not to be confused with Apache Flume) was primarily built for developing efficient data-processing pipelines. A pipeline can consist of several MapReduces, and Flume gives you very high-level abstractions for managing such pipelines. You can think of Flume as a wrapper over Google's MapReduce tool, with additional functionality. Flume is comprehensively documented on cloud.google.com.

Similar to TensorFlow, elements are deferred here as well. An individual deferred element in Flume is called a PObject. For example, if you want a Long value to be deferred, you represent it as PObject<Long> in Java (PObject<T> in general, where T is the type of the deferred value). Most use cases of Flume involve deferring a Java collection; the deferred collections are called PCollections and are represented as PCollection<T>, where T is the type of each element in the collection. Unlike regular Java collections, PCollections aren't held in memory; they're distributed, and are usually read from sources like files or databases.

The simplest way to batch-process data in Flume is with a DoFn. A DoFn is simply a class with an overridable void method named process, where you write the computation for one single element of the collection. Flume creates multiple workers and has them apply the DoFn to different elements simultaneously. DoFns are usually invoked through a method called parallelDo(), which is built into every PCollection, and they can be instantiated anonymously inside the parallelDo() call.

Flume also provides higher-level abstractions on top of DoFn, like MapFn and FilterFn, which are just special cases of DoFn.

Here's some sample Java code that uses Flume:
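Since I haven't used Flume outside Google, the snippet below is a sketch modeled on the API shown in the public FlumeJava paper rather than real production code (the file path and the naive word-splitting are placeholders):

    PCollection<String> getWordsFromFileSystem() {
      // Deferred read: this just adds a source node to the dependency graph.
      PCollection<String> lines =
          readTextFileCollection("/gfs/data/shakes/hamlet.txt");

      // Deferred parallelDo: workers will call process() once per line.
      PCollection<String> words = lines.parallelDo(
          new DoFn<String, String>() {
            public void process(String line, EmitFn<String> emitFn) {
              for (String word : line.split(" ")) {
                emitFn.emit(word);  // emit one output element per word
              }
            }
          }, collectionOf(strings()));

      return words;  // still deferred; nothing has been read or split yet
    }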



Here's the dependency graph for the above code (only for the part inside the getWordsFromFileSystem() function, as that's the only part that composes graph nodes):





[I don't have a lot of code to showcase here as I haven't used this technology outside Google]

For more information about the API, please refer to the following links:


More insight into execution of nodes in FlumeJava

In Flume, the analogue of sess.run([node1, ...]) is:

FlumeJava.run(PCollection1, PObject1, ...).

Unlike TensorFlow, FlumeJava.run() doesn't explicitly return the value of the collection or object. Instead, Flume just materializes them: the computation is done, and the values can be brought down to an in-memory variable or collection if required.
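Concretely, a sketch (again following the paper's API, combined with the argument-taking form of run() described above):

    // Ask for the words to be materialized as one sequential collection.
    PCollection<String> words = getWordsFromFileSystem();
    PObject<Collection<String>> wordsObj = words.asSequentialCollection();

    FlumeJava.run(wordsObj);  // executes the pipeline and materializes wordsObj

    // After run(), the value can be brought down into ordinary memory.
    Collection<String> inMemory = wordsObj.getValue();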

Execution in Flume happens in a much more sophisticated way than in TensorFlow, mainly because Flume uses Google's infrastructure for computation. When you execute a PObject or PCollection using FlumeJava.run(PCollection, PObject, ...), Flume automagically figures out the best possible strategy for running all the nodes in the dependency graph. Its strategies are versatile enough that it even fuses multiple operations (DoFns) into one for optimality.

If we take a deeper look at Flume's strategizing, we find that it is essentially solving the problem of DAG scheduling, which, for a variable number of processors, is NP-complete. Hence, Flume technically won't give you the most optimal strategy for executing a given graph. Still, its strategies are far more efficient than plain sequential execution.



Analogy between the two libraries


TensorFlow                                     FlumeJava
---------------------------------------------  ---------------------------------------------
tensorflow.Session.run(...)                    FlumeJava.run(...)
Tensor                                         PObject / PCollection
tensorflow.Operation(...)                      DoFn.class
[any operation in TensorFlow]                  [any MapReduce operation in Flume]




By now, you should have a decent idea of how deferred execution works. Let's discuss some of its merits and demerits.


Why use Deferred Execution?

  • We saw that all dependency nodes are executed before your specified node in the dependency graph. This means only the graph paths containing the required dependency nodes need to be executed, and the rest can be skipped. This saves a lot of computation and has a positive impact on efficiency.
  • Deferred execution prevents a lot of messy implementations. Your main focus is the dependency graph; the rest is just playing around with the output of a graph run.
  • Deferred execution also gives a larger scope for optimizing execution and inducing parallelism. If you're a big organization with a decent infrastructure setup and you're using TensorFlow, you can abstract out the allocation of workers and the parallel-execution optimizations, and reuse that abstraction. Flume exemplifies this advantage by automatically strategizing before execution.

Cons of Deferred Execution

  • When you're just starting to learn about deferred execution, you might find it a little difficult to get acquainted with, at least initially. It certainly takes some effort to get used to developing things this way.
  • Logical errors can be harder to debug. With immediate execution, you simply insert debug statements to trace the control flow; that isn't as straightforward with deferred execution, and debug statements aren't even possible where deferred elements are involved.
    • For example, in the neural network example above, there's no point inserting debug statements in the train() function. It isn't that function that's called repeatedly; rather, the deferred operations inside it, like the loss minimization, are what execute repeatedly.
