From Data Science at Scale with Python and Dask by Jesse C. Daniel
This article discusses Dask, how it compares to Apache Spark, and how to create and understand directed acyclic graphs using the example of the delicious Italian pasta dish bucatini all’Amatriciana.
There’s no argument that Apache Spark has become the most popular framework for scalable computing in recent years. Initially launched in 2010 as an in-memory alternative to the MapReduce processing engine for Apache Hadoop, it was able to capture a large base of users who were already using Hadoop for big data storage. Dask came along a few years later in 2014, and is maintained by Anaconda Inc. under the direction of Matthew Rocklin, the architect and lead maintainer of the project. While still relatively new, Dask has a rapidly growing user base and is gaining a lot of attention and support from the larger Python community. To that end, Dask offers some key advantages over Spark and other frameworks:
- Dask is written in Python and integrates well with other Python libraries
- Dask is designed to scale arbitrary code, leading to flexible support of complex algorithms
- Dask has a very low start-up and maintenance overhead
The first point, that Dask is a native Python library designed to live alongside other analytical libraries such as NumPy, Pandas, and Scikit-Learn, gives it an important advantage in adoptability. Because the skillset of a data scientist is expected to be very diverse, many data scientists in practice come from a wide variety of backgrounds and possess different levels of technical expertise. Python is widely recognized as both a famously easy language for beginners to learn and a very powerful, flexible language in the hands of experts. The low language barrier makes it possible for even novice programmers to get up and running quickly, and many data science courses taught both online and in universities include some level of Python instruction. One of the most popular Python libraries for data analysis and manipulation is Pandas. Pandas is very well suited for manipulating structured row-and-column data, and has an easy-to-use DataFrame API. In fact, as a reader of this book, you should already be quite familiar with the capabilities of Pandas. Dask also has a DataFrame API, which was cleverly designed to mimic much of the functionality of the Pandas API. Therefore, existing code that was developed against Pandas DataFrames can typically be ported into Dask with minimal effort.
The other major advantage of Dask’s Python origin is that there are no delays or dependencies when new functionality or enhancements are released. Because Spark’s primary development occurs in Java and Scala, new features and enhancements added to the Java and Scala APIs will typically take a few release cycles to be added to Spark’s Python API (PySpark). PySpark users may find that they eventually need to migrate their codebase to Scala or Java to get the most out of Spark. This can cause long-term issues with code maintenance if the data science team doesn’t have an available contingent of skilled Scala or Java developers to call on when code breaks or requires enhancements. With Dask, there are no other languages to support, so organizations that have built a large base of Python applications do not need to worry about fragmented infrastructure.
The next case for Dask is that it was designed not only to handle analysis of large structured datasets, but also enable relatively easy construction of custom workloads that don’t “fit” into the traditional paradigm of structured data analysis. This makes it possible to scale and distribute any arbitrary piece of Python code so long as it can be expressed as a task graph (see the next section for more on task graphs and Dask’s specific implementation of directed acyclic graphs). This also means that Dask is very well suited to scale complex algorithms and simulations used in scientific computing as well as custom machine learning algorithms. The tradeoff for this flexibility is that Dask does not come with as many out-of-the-box optimizations as Spark. Because Spark comes with many well-defined object classes and methods that were designed with structured data analysis in mind, common operations like sorts, joins, and window functions have been thoroughly optimized to deliver the best performance when scaled across many machines. But Dask is far more transparent and granular in the way it constructs and organizes task graphs, so finding bottlenecks in workloads is not terribly difficult if you’re already familiar with standard code profiling techniques in Python.
The final point, that Dask has a very low start-up and maintenance overhead, makes it a very powerful tool for prototyping and rapidly deploying those prototypes into production. Although the Apache community has been hard at work the last couple of years making Spark easier to get up and running thanks to deployment tools like Ambari, the popularization of Docker, and Amazon Elastic MapReduce, administering the Hadoop ecosystem that Spark needs to run on can still be a daunting task. For comparison, installing Dask and adding it to your Python projects is simple: it is installed like any other Python library using conda, pip, or your choice of Python package manager. It is also straightforward to scale Dask from a single machine to thousands, meaning Dask workloads can be developed and tested locally and shipped to large clusters with minimal effort. Paired with a containerization system like Docker, it is easy to expand and contract the size of clusters on the fly, and just as easy to set up parallel development and production environments as needed. Even in projects where Spark became the final solution (mostly due to project requirements or political reasons), I have still found Dask very useful for getting in front of the necessary analysis while I wait for the Spark infrastructure to be stood up and configured, and it continues to be my go-to tool for developing proofs of concept and prototypes.
Dask uses directed acyclic graphs (DAGs) to handle task scheduling and distributed computations. It wouldn’t be possible to fully comprehend how Dask works without having a solid understanding of directed acyclic graphs, so let’s learn what directed acyclic graphs are, how they work, and some important terminology to remember.
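To make the idea of a task graph concrete before the cooking analogy, here is a minimal sketch using only the standard library. It borrows the dictionary layout that Dask documents for its low-level graphs (a key maps either to a value or to a tuple whose first element is a callable), but the `get` function below is a toy recursive evaluator written for illustration, not Dask’s scheduler, and the key names are made up.

```python
from operator import add, mul

# A tiny task graph: each key names a result. A tuple whose first item is
# callable is a task; string arguments that match other keys are dependencies.
graph = {
    "x": 1,
    "y": 2,
    "sum": (add, "x", "y"),      # depends on x and y
    "result": (mul, "sum", 10),  # depends on sum
}


def get(graph, key, cache=None):
    """Toy evaluator (an illustration only): resolve dependencies, then run the task."""
    cache = {} if cache is None else cache
    if key in cache:
        return cache[key]
    task = graph[key]
    if isinstance(task, tuple) and task and callable(task[0]):
        func, *args = task
        # Arguments that name another key are computed first; others pass through.
        resolved = [
            get(graph, arg, cache) if isinstance(arg, str) and arg in graph else arg
            for arg in args
        ]
        value = func(*resolved)
    else:
        value = task  # plain data, nothing to compute
    cache[key] = value
    return value


print(get(graph, "result"))  # (1 + 2) * 10 = 30
```

Asking for "result" forces "sum" to be computed first, which in turn needs "x" and "y". That ordering falls out of the dependencies rather than out of the order in which the dictionary was written.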
Cooking with DAGs
Directed acyclic graphs come from a larger body of mathematics known as graph theory. Unlike what you may expect from the name, graph theory doesn’t have anything to do with pie charts or bar graphs. Instead, a graph, used in the context of graph theory, is defined as a representation of a set of objects which have some kind of relation to one another. Although this definition is vague and abstract, it means graphs are useful for representing a wide variety of information. Directed acyclic graphs have some special properties that give them a slightly narrower definition. But rather than continuing to talk about graphs in the abstract, let’s have a look at an example of using a directed acyclic graph to model a real process.
When I’m not busy writing, teaching, or analyzing data, I love cooking. To me, few things in this world can compare to a piping hot plate of pasta. And right up at the top of my all-time-favorite pasta dishes is bucatini all’Amatriciana. If you enjoy Italian cuisine, you’ll love the bite of thick bucatini noodles, the sharp saltiness of Pecorino Romano cheese, and the peppery richness of the tomato sauce cooked with guanciale and onion. But I digress! My intent here isn’t for you to drop the article and run to your kitchen. Rather, I want to explain how making a delicious plate of bucatini all’Amatriciana can be modeled using a directed acyclic graph. First, have a look at the instructions in Figure 1. We’ll work on converting those instructions to a DAG.
Cooking a recipe consists of following a series of sequential steps where raw ingredients are transformed into intermediate states until all the ingredients are ultimately combined into a single complete dish. For example, when you dice an onion, you start with a whole onion, cut it into pieces, and then you’re left with some amount of diced onion. In software engineering parlance, we’d describe the process of dicing onions as a function.
Dicing onions, although important, is a small part of the whole recipe. To complete the entire recipe, we must define many more steps (or functions). Each of these functions is called a node in a graph. Because most steps in a recipe follow a logical order (for example, you wouldn’t plate the noodles before cooking them), each node can take on dependencies, which means that a prior step (or steps) must be complete before starting the next node’s operation. Another step of the recipe is to sauté the diced onions in olive oil, which is represented by another node. It’s impossible to sauté diced onions if you haven’t diced any onions yet! Because sautéing the diced onion is directly dependent on and related to dicing the onion, these two nodes are connected with a line.
Figure 2 represents a graph of the process described thus far. Notice that the “Sauté Ingredients” node has three direct dependencies: the onion must be diced, the garlic must be minced, and the guanciale must be sautéed before the three ingredients can be sautéed together. Conversely, the “Dice Onion,” “Mince Garlic,” and “Heat Olive Oil” nodes don’t have any dependencies. The order in which you complete those steps doesn’t matter, but you must complete all of them before proceeding to the final sauté step. Also notice that the lines connecting the nodes have arrows as endpoints. This implies that there’s only one possible way to traverse the graph. It neither makes sense to sauté the onion before it’s diced, nor does it make sense to attempt to sauté the onion without a hot, oiled pan ready. This is what makes the graph a directed graph: there’s a logical, one-way traversal through the graph from nodes with no dependencies to a single terminal node.
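The dependencies described for figure 2 can be written down as a small sketch in Python. The node labels follow the prose (the exact labels in the figure are an assumption), and the standard library’s `graphlib.TopologicalSorter` (Python 3.9+) is used here only to produce one valid cooking order. It stands in for a scheduler and is not what Dask uses.

```python
from graphlib import TopologicalSorter

# Each node maps to the set of nodes that must finish before it can start.
recipe = {
    "Dice Onion": set(),
    "Mince Garlic": set(),
    "Heat Olive Oil": set(),
    "Sauté Guanciale": {"Heat Olive Oil"},
    "Sauté Ingredients": {"Dice Onion", "Mince Garlic", "Sauté Guanciale"},
}

# static_order() yields the nodes so that every dependency comes before
# the node that needs it.
order = list(TopologicalSorter(recipe).static_order())
print(order)
```

The three dependency-free nodes may appear in any relative order, but "Heat Olive Oil" always precedes "Sauté Guanciale", and "Sauté Ingredients" always comes last.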
Another thing you may notice about the graph in figure 2 is that there are no lines that connect later nodes back to earlier nodes. Once a node is complete, it’s never repeated or revisited. This is what makes the graph an acyclic graph. If the graph contains a feedback loop or some kind of continuous process, it’d instead be a cyclic graph. This wouldn’t be an appropriate representation of cooking, because recipes have a finite number of steps, a finite state (finished or unfinished), and deterministically resolve to a completed state, barring any kitchen catastrophes. Figure 3 demonstrates what a cyclic graph might look like.
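One way to see the difference in code is to ask whether a valid ordering exists at all. The sketch below again leans on `graphlib` from the standard library. The cyclic example is hypothetical and invented for illustration: each step claims to depend on the other, so neither can ever start.

```python
from graphlib import CycleError, TopologicalSorter


def is_acyclic(graph):
    """Return True if the dependency graph has no cycles."""
    try:
        # prepare() checks the graph and raises CycleError if it finds a cycle.
        TopologicalSorter(graph).prepare()
        return True
    except CycleError:
        return False


acyclic = {"Dice Onion": set(), "Sauté Onion": {"Dice Onion"}}

# Hypothetical feedback loop: each step waits on the other.
cyclic = {"Dice Onion": {"Sauté Onion"}, "Sauté Onion": {"Dice Onion"}}

print(is_acyclic(acyclic))  # True
print(is_acyclic(cyclic))   # False
```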
From a programming perspective, this might sound like directed acyclic graphs won’t allow looping operations, but this isn’t necessarily the case: a directed acyclic graph can be constructed from deterministic loops (such as for loops) by copying the nodes to be repeated and connecting them sequentially. In figure 1, notice that the guanciale is sautéed in two different steps — first alone, then together with the onions. If the ingredients needed to be sautéed a non-deterministic number of times, the process couldn’t be expressed as an acyclic graph.
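As a rough sketch of that unrolling, the helper below copies a repeated step a fixed number of times and chains the copies together. The function name `unroll` and the "Stir Sauce" step are hypothetical, chosen only for illustration. The key assumption is that the repeat count is known before the graph is built.

```python
from graphlib import TopologicalSorter


def unroll(step_name, times, first_dependency):
    """Copy one repeated step `times` times and connect the copies in sequence."""
    graph = {}
    previous = first_dependency
    for i in range(1, times + 1):
        node = f"{step_name} #{i}"
        graph[node] = {previous}  # each copy depends on the one before it
        previous = node
    return graph


graph = {"Heat Olive Oil": set()}
graph.update(unroll("Stir Sauce", 3, "Heat Olive Oil"))

order = list(TopologicalSorter(graph).static_order())
print(order)
# ['Heat Olive Oil', 'Stir Sauce #1', 'Stir Sauce #2', 'Stir Sauce #3']
```

Because the loop runs a fixed three times, the result is an ordinary chain with no edge pointing backward. A loop of the form "stir until it tastes right" has no such fixed length, so it can’t be written out this way in advance.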
The final thing to note about the graph in figure 2 is that it’s in a special form known as a transitive reduction. This means that any lines that express transitive dependencies are eliminated. A transitive dependency means a dependency which is met indirectly through completion of another node. Figure 4 shows figure 2 redrawn without transitive reduction. Notice that a line is drawn between the node containing the operation “Heat Olive Oil” and “Saute Ingredients (8 minutes)”. Heating the olive oil is a transitive dependency of sautéing the onion, garlic, and guanciale because the guanciale must be sautéed alone before adding the onion and garlic. In order to sauté the guanciale, you must heat up a pan with olive oil first, such that by the time you’re ready to sauté all three ingredients together, you already have a hot pan with oil — the dependency is already met!
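The reduction itself is mechanical, and a small sketch may help. The code below is an illustrative implementation for small acyclic graphs, with node labels taken from the prose: a direct dependency is dropped whenever another direct dependency already requires it.

```python
def ancestors(graph, node, seen=None):
    """All nodes reachable from `node` by following dependency edges."""
    seen = set() if seen is None else seen
    for dep in graph[node]:
        if dep not in seen:
            seen.add(dep)
            ancestors(graph, dep, seen)
    return seen


def transitive_reduction(graph):
    """Remove direct dependencies that are already implied by other dependencies."""
    reduced = {}
    for node, deps in graph.items():
        redundant = set()
        for dep in deps:
            # Anything `dep` itself requires is met indirectly through `dep`.
            redundant |= ancestors(graph, dep) & deps
        reduced[node] = deps - redundant
    return reduced


# The graph with the extra "Heat Olive Oil" edge drawn in explicitly.
unreduced = {
    "Dice Onion": set(),
    "Mince Garlic": set(),
    "Heat Olive Oil": set(),
    "Sauté Guanciale": {"Heat Olive Oil"},
    "Sauté Ingredients": {
        "Dice Onion", "Mince Garlic", "Sauté Guanciale", "Heat Olive Oil",
    },
}

reduced = transitive_reduction(unreduced)
print(sorted(reduced["Sauté Ingredients"]))
# ['Dice Onion', 'Mince Garlic', 'Sauté Guanciale']
```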
Figure 5 represents the full directed acyclic graph for the complete recipe. As you can see, the graph fully represents the process from start to finish. You can start at any of the red nodes because they don’t have dependencies, and you’ll eventually reach the terminal node labeled “Buon appetito!” As you look at this graph, it might be easy to spot some bottlenecks, and potentially reorder some nodes to produce a more optimal or time-efficient way of preparing the dish. For instance, if the pasta water takes twenty minutes to come to a rolling boil, perhaps you could draw a graph with a single starting node of putting the water on to boil. Then you wouldn’t have to wait for the water to heat up after already preparing the rest of the dish. These are great examples of optimizations that either an intelligent task scheduler or you, the designer of the workload, may come up with. And now that you have the foundational understanding of how directed acyclic graphs work, you should be able to read and understand any arbitrary graph — from cooking pasta to calculating descriptive statistics on a big data set.
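To illustrate how a graph exposes bottlenecks, the sketch below attaches a duration to each node and computes the earliest time each step could finish if independent steps run side by side. The graph is a simplified stand-in for the full recipe, and every duration other than the twenty-minute boil and eight-minute sauté mentioned in this article is an assumption.

```python
from graphlib import TopologicalSorter

# Simplified recipe: node -> dependencies.
deps = {
    "Boil Water": set(),
    "Dice Onion": set(),
    "Sauté Ingredients": {"Dice Onion"},
    "Simmer Sauce": {"Sauté Ingredients"},
    "Cook Pasta": {"Boil Water"},
    "Plate": {"Simmer Sauce", "Cook Pasta"},
}

# Minutes per step; most of these values are made up for illustration.
minutes = {
    "Boil Water": 20,
    "Dice Onion": 2,
    "Sauté Ingredients": 8,
    "Simmer Sauce": 15,
    "Cook Pasta": 10,
    "Plate": 1,
}


def earliest_finish(deps, minutes):
    """Earliest finish time per node when independent nodes overlap in time."""
    finish = {}
    for node in TopologicalSorter(deps).static_order():
        # A node can start only once its slowest dependency has finished.
        start = max((finish[d] for d in deps[node]), default=0)
        finish[node] = start + minutes[node]
    return finish


finish = earliest_finish(deps, minutes)
print(finish["Plate"])        # 31 minutes if the water goes on first
print(sum(minutes.values()))  # 56 minutes if every step is done one at a time
```

With these numbers the water-and-pasta branch (30 minutes) is the longest chain, so starting the water early matters more than speeding up the onion dicing.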
Scale out, concurrency, and recovery
Up to this point, our example of cooking bucatini all’Amatriciana assumed that you were the sole cook in the kitchen. This might be fine if you’re only cooking dinner for your family or a small get together with friends, but if you needed to cook hundreds of servings for a busy dinner service in midtown Manhattan, you would likely reach the limits of your abilities very quickly. It’s now time to search for some help!
First, you must decide how you will handle the resource problem: should you upgrade your equipment to help you be more efficient in the kitchen, or should you hire more cooks to help share the workload? In computing, these two approaches are called scaling up and scaling out respectively. Just like in our hypothetical kitchen, neither approach is as simple as it may sound. In the next section, I’ll discuss the limitations of scale up solutions and how scale out solutions overcome those limitations.
Since Dask is a framework for scaling out complex problems, we’ll assume that the best course of action for our hypothetical kitchen is to hire more workers and scale out. Given that assumption, it’ll be important to understand some of the challenges that come with orchestrating complex tasks across many different workers. Later on, I’ll discuss how workers share resources and how worker failures are handled.
Scale Up vs. Scale Out
Back to our hypothetical kitchen, you’re faced with the question of what to do now that you’re expected to feed a horde of hungry customers at dinner rush. The first thing you might notice is that as the volume of pasta you need to make increases, the amount of time that each step takes also increases. For example, the original recipe makes four servings and calls for ¾ cup of diced onions. This amount roughly equates to a single medium-sized yellow onion. If you were to make 400 servings of the dish, you would need to dice 100 onions. Assuming you can dice an onion in around two minutes, and it takes you 30 seconds to clear the cutting board and grab another onion, you would be chopping onions for 250 minutes, which we’ll generously round to roughly 5 hours! Forget the time it would take to prepare the rest of the recipe. By the time you merely finish dicing the onions, your angry customers would already have taken their business elsewhere. And to add insult to injury, you’d have cried your eyes dry from spending the last 5 hours cutting onions! There are two potential solutions to this problem: replace your existing kitchen equipment with faster, more efficient equipment (scale up) or hire more workers to work in parallel (scale out).
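The arithmetic in this paragraph is easy to check with a few lines of Python. The function name and parameter names below are made up for illustration, and the `cooks` parameter assumes the work divides perfectly evenly among equally fast cooks. Plugging in the stated numbers gives 250 minutes, a little over four hours, so the "roughly 5 hours" figure is a generous rounding.

```python
def chopping_minutes(servings, cooks=1, servings_per_onion=4,
                     dice_minutes=2.0, reset_minutes=0.5):
    """Minutes spent dicing onions, split evenly across `cooks`."""
    onions = servings / servings_per_onion
    return onions * (dice_minutes + reset_minutes) / cooks


alone = chopping_minutes(400)
print(alone)       # 250.0 minutes
print(alone / 60)  # a little over 4 hours

# Idealized scale out: ten equally skilled cooks sharing the onions.
print(chopping_minutes(400, cooks=10))  # 25.0 minutes
```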
At first it might be tempting to choose scaling up. You would still ultimately be in charge of the whole process from start to finish. You wouldn’t have to deal with others’ potential unreliability or variation in skills, and you wouldn’t have to worry about bumping into other people in the kitchen. But eventually you would run into some physical limitations. Perhaps there exists an onion chopping machine that can bulk chop onions in 1/10th of the time that it takes you to do it. This will suit your needs until you start scaling again. As your business expands and you start serving 800, 1,600, and 3,200 plates of pasta per day, you will start running into the same capacity problems you had before. There will come a time you will need to buy a new, faster onion chopping machine, but each time you buy a newer machine, it gets exponentially more expensive to develop and build. Eventually, your simple onion chopping machine will become highly specialized for your operation and require incredible feats of engineering to build and maintain. Ultimately, you will reach a point when further innovation is simply not tenable (at some point, the blades will have to rotate so quickly that the onion will just turn to mush!). By scaling out, you can avoid hitting these physical limitations.
Rather than attempt to improve on your own skills and abilities, you hire nine additional cooks to help share the workload. If all 10 of you focused 100% of your time and attention on the process of chopping onions, that five hours of work now comes down to a mere 30 minutes, assuming you all have equal skill levels. Of course, you would need to buy additional knives, cutting boards, and other tools, and you would need to provide adequate facilities and pay for your additional cooks, but in the long run this will be a more cost-effective solution than pouring money into development of specialized equipment. Not only can the additional cooks help you with reducing the time it takes to prepare the onions, but because they are non-specialized workers, they can also be trained to do all of the other necessary tasks. A special onion slicing machine, on the other hand, cannot be trained to boil pasta no matter how hard you may try!
Directed acyclic graphs are a great tool for planning and orchestrating complex tasks across a pool of workers. Most importantly, dependencies between nodes help ensure that the work will follow a certain order (remember that a node cannot begin work until all of its dependencies have completed), but there are no restrictions on how individual nodes are completed, whether by a single entity or many entities working in parallel. A node is a standalone unit of work, so it’s possible to subdivide the work and share it among many workers. This means that you could assign four cooks to chop the onions, while four other cooks sauté the guanciale and the remaining two cooks mince the garlic. As each cook completes their task, they can be assigned to work on the next available task. This also means that workers can work at different speeds without causing problems. Ultimately, so long as all 100 onions are chopped before moving on to the next step, it doesn’t matter whether each of the four cooks chops exactly 25 onions. Furthermore, if the cooks responsible for sautéing the guanciale finish before the other cooks are done chopping onions, they can help chop rather than standing idle. But because there are multiple workers available to work on different tasks, some form of supervision is necessary to ensure everything is completed in an orderly, efficient way. This is where intelligent task schedulers play an important role in scalable computing frameworks. Just like a sous chef manages the smooth operation of the kitchen and ensures the cooks aren’t slacking off during service, task schedulers distribute units of work to machines in an efficient manner and aim to minimize the worker pool’s idle time. Organizing execution of the graph between workers and assigning an appropriate number of workers to each task is crucial for minimizing the time it takes to complete the graph. Figure 7 depicts a possible way the original graph can be distributed to multiple workers.
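The sous chef’s job can be sketched with the standard library alone. The toy scheduler below hands every ready node to a thread pool and releases dependent nodes as their prerequisites finish. It is a simplified illustration of the idea and not how Dask’s scheduler is implemented. The `run_graph` and `cook` names are hypothetical, and the node labels follow the earlier prose.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter

recipe = {
    "Dice Onion": set(),
    "Mince Garlic": set(),
    "Heat Olive Oil": set(),
    "Sauté Guanciale": {"Heat Olive Oil"},
    "Sauté Ingredients": {"Dice Onion", "Mince Garlic", "Sauté Guanciale"},
}


def cook(step):
    """Stand-in for real work; a worker simply reports the step it finished."""
    return step


def run_graph(graph, work, max_workers=4):
    """Run each node as soon as its dependencies are done; return completion order."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    finished = []
    pending = {}  # future -> node currently being worked on
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            # Hand every node whose dependencies are met to an idle worker.
            for node in sorter.get_ready():
                pending[pool.submit(work, node)] = node
            # Block until at least one worker finishes, then unlock successors.
            done, _ = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                node = pending.pop(future)
                future.result()  # re-raise any error from the worker
                sorter.done(node)
                finished.append(node)
    return finished


finished = run_graph(recipe, cook)
print(finished)
```

The exact completion order varies from run to run because the dependency-free steps race each other, but "Sauté Ingredients" can only ever be submitted after everything it depends on has been marked done.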
Concurrency and resource management
More often than not, there are more constraints to consider than just the number of available workers. In scalable computing, these are called issues of concurrency. For example, if you hire more cooks to dice onions, but you only have five knives in the kitchen, only five operations that require a knife can be carried out simultaneously. Some other tasks may require sharing resources, such as the step that calls for minced garlic. Therefore, if all five knives are in use by cooks dicing onions, the garlic can’t be minced until at least one knife becomes available. Even if the remaining five cooks have completed all other possible nodes, the garlic mincing step becomes delayed due to resource starvation.
The other cooks are forced to remain idle until the onion dicing step is complete. When a shared resource is in use, a resource lock is placed on it, meaning other workers can’t “steal” the resource until the worker who locked the resource is finished using it. It would be quite rude (and dangerous) for one of your cooks to wrestle the knife out of the hands of another cook. If your cooks are constantly fighting over who gets to use the knife next, those disagreements consume time that could be spent working on completing the recipe. The sous chef is responsible for defusing these confrontations by laying the ground rules about who can use certain resources and what happens when a resource becomes available. Similarly, the task scheduler in a scalable computing framework must decide how to deal with resource contention and locking. If not handled properly, resource contention can be very detrimental to performance. But fortunately, most frameworks (including Dask) are pretty good at efficient task scheduling and don’t normally need to be hand-tuned.
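The knife shortage maps naturally onto a counting semaphore. In the sketch below, ten worker threads play the cooks and a `threading.BoundedSemaphore` plays the five knives. The counters exist only to show that no more than five cooks ever hold a knife at once. The names and the short sleep are illustrative stand-ins, and this is a generic concurrency pattern and not a description of Dask internals.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

KNIVES = 5
knives = threading.BoundedSemaphore(KNIVES)  # at most five holders at a time
stats = {"in_use": 0, "peak": 0}
stats_lock = threading.Lock()                # protects the counters above


def chop(item):
    with knives:  # blocks here until a knife is free
        with stats_lock:
            stats["in_use"] += 1
            stats["peak"] = max(stats["peak"], stats["in_use"])
        time.sleep(0.01)  # stand-in for the actual chopping
        with stats_lock:
            stats["in_use"] -= 1
    return f"chopped {item}"


# Ten cooks, twenty items to chop, only five knives.
with ThreadPoolExecutor(max_workers=10) as pool:
    results = list(pool.map(chop, range(20)))

print(len(results))   # 20
print(stats["peak"])  # never more than 5
```

Cooks who cannot get a knife simply wait inside `with knives:`, which is the idle time the paragraph describes. Nobody wrestles a knife away, because a knife is returned only when its holder leaves the `with` block.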
Recovering from failures
Finally, no discussion of scalable computing would be complete without mentioning recovery strategies. Just like it’s difficult for a sous chef to closely supervise all of her cooks at once, it gets increasingly difficult to orchestrate distribution of processing tasks as the number of machines in a cluster grows. Since the final result consists of the aggregate of all the individual operations, it’s important to ensure that all the pieces find their way to where they need to go. But machines, like people, are imperfect and fail at times. There are two types of failures that must be accounted for: worker failure and data loss. For example, if you’ve assigned one of your cooks to dice the onions, and going into the third hour straight of chopping he decides he can’t take the monotony anymore, he might put down his knife, take off his coat, and walk out the door. You’re now down a worker! One of your other cooks will need to take his place in order to finish dicing the onions, but thankfully you can still use the onions that the previous cook diced before he left. This is worker failure without data loss. The work that the failed worker completed does not need to be reproduced, so the impact to performance is not as severe.
When data loss occurs, a significant impact to performance is much more likely. For example, your kitchen staff has completed all of the initial prep steps and the sauce is simmering away on the stove. Unfortunately, the pot is accidentally knocked over and the sauce spills all over the floor. Knowing that scraping the sauce off the floor and attempting to recover would violate all of the health codes in the book, you’re forced to remake the sauce. This means going back to dicing more onions, sautéing more guanciale, etc. The dependencies for the “Simmer Sauce” node are no longer met, meaning you have to step all the way back to the first dependency-free node and work your way forward again from there.
While this is a fairly catastrophic example, the important thing to remember is that at any point in the graph, the complete lineage of operations up to a given node can be “replayed” in the event of a failure. The task scheduler is ultimately responsible for stopping work and redistributing the work to be replayed. And because the DAG is universally known between all workers, the specific workers that completed the tasks before don’t need to be present to redo the tasks. For example, if the cook who decided to quit earlier had taken some diced onions with him, you would not need to stop the whole kitchen and redo everything from the beginning. You would just need to determine how many additional onions need to be diced and assign a new cook to do that work.
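Replaying lineage can be sketched as a recursive lookup: if a result is missing, recompute it from its dependencies, recomputing those in turn only if they are missing too. The code below is a toy model under simplifying assumptions (a three-node graph, results kept in a plain dictionary, hypothetical helper names) and does not describe Dask’s actual recovery mechanism.

```python
# Node -> ordered list of dependencies.
graph = {
    "Dice Onion": [],
    "Sauté Guanciale": [],
    "Simmer Sauce": ["Dice Onion", "Sauté Guanciale"],
}

calls = []  # records which steps actually had to be (re)done


def make_step(name):
    def run(*inputs):
        calls.append(name)
        return name.lower()
    return run


funcs = {name: make_step(name) for name in graph}


def compute(graph, funcs, key, results):
    """Return results[key], replaying any missing part of its lineage."""
    if key not in results:
        inputs = [compute(graph, funcs, dep, results) for dep in graph[key]]
        results[key] = funcs[key](*inputs)
    return results[key]


results = {}
compute(graph, funcs, "Simmer Sauce", results)
first_run = list(calls)

# Only the final result is lost; the prepared ingredients survived.
del results["Simmer Sauce"]
calls.clear()
compute(graph, funcs, "Simmer Sauce", results)
partial_replay = list(calls)

# Everything is lost, so the whole lineage is replayed.
results.clear()
calls.clear()
compute(graph, funcs, "Simmer Sauce", results)
full_replay = list(calls)

print(partial_replay)  # ['Simmer Sauce']
print(full_replay)     # ['Dice Onion', 'Sauté Guanciale', 'Simmer Sauce']
```

Because the graph itself records how every result was produced, the amount of rework depends only on which results are missing, not on which worker originally produced them.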
Hopefully you now have a good understanding of the power of DAGs and how they relate to scalable computing frameworks. These concepts will certainly come up again throughout this book, since all of Dask’s task scheduling is based on the DAG concepts presented here. In the last section of this chapter, you will be introduced to a dataset that you will use throughout the book to learn about Dask’s operations and capabilities.
About the author:
Jesse Daniel has five years of experience writing applications in Python, including three years working with the PyData stack (Pandas, NumPy, SciPy, Scikit-Learn). Jesse joined the faculty of the University of Denver in 2016 as an adjunct professor of business information and analytics, where he currently teaches a Python for Data Science course.
Originally published at freecontent.manning.com.