ARTICLE

Collective Communication Pattern: Improving Performance When Parameter Servers Become a Bottleneck

From Distributed Machine Learning Patterns by Yuan Tang

Manning Publications

--

In this article, we introduce the collective communication pattern, which is a great alternative to parameter servers when the machine learning model we are building is not too large, and which spares us from tuning the ratio between the number of workers and parameter servers.

Take 35% off Distributed Machine Learning Patterns by entering fcctang into the discount code box at checkout at manning.com.

You might be aware that DevOps engineers, who support the distributed machine learning infrastructure for data scientists or analysts, often have a hard time figuring out a good ratio between the number of parameter servers and the number of workers for different machine learning applications.

For example, let’s assume there are three parameter servers and three workers in the model training component of our machine learning system, as shown in Figure 1. All three workers perform intensive computations asynchronously and then send the calculated gradients to the parameter servers to update the different partitions of model parameters.

Figure 1 Distributed model training component consists of three parameter servers and three worker nodes.

In reality, worker nodes and parameter servers do not map one-to-one, particularly when the number of worker nodes differs from the number of parameter servers. In other words, multiple workers may send updates to the same subset of parameter servers.

Now imagine that two workers finish calculating the gradients at the same time and both want to update the model parameters stored in the same parameter server, as shown in Figure 2.

Figure 2 Two of the worker nodes finish calculating gradients and want to push updates to the first parameter server at the same time.

As a result, the two workers are blocking each other from sending the gradients to the parameter server. In other words, the gradients from both worker nodes cannot be accepted by the same parameter server at the same time.
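The blocking described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the `ParameterServer` class, its `push` method, the learning rate, and the gradient values are all hypothetical names and numbers chosen for the example, and a lock stands in for whatever mechanism a real system uses to accept one update at a time.

```python
import threading


class ParameterServer:
    """Hypothetical parameter server that holds one model partition."""

    def __init__(self, params, learning_rate=0.1):
        self.params = list(params)
        self.learning_rate = learning_rate
        self.version = 0                 # number of updates applied so far
        self._lock = threading.Lock()    # only one push is accepted at a time

    def push(self, gradients):
        # A second worker that arrives while the lock is held waits here
        # until the first worker's update has been applied.
        with self._lock:
            self.params = [
                p - self.learning_rate * g
                for p, g in zip(self.params, gradients)
            ]
            self.version += 1


server = ParameterServer([1.0, 1.0])

# Two workers finish at the same time and push to the same server.
worker_gradients = [[1.0, 2.0], [3.0, 4.0]]
threads = [
    threading.Thread(target=server.push, args=(grads,))
    for grads in worker_gradients
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(server.version, server.params)
```

Both updates are eventually applied, but only one after the other: the more workers that target the same server, the longer each of them waits.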

Problem

In this case, only two workers are blocking each other when sending gradients to the same parameter server, which already makes it hard to gather the calculated gradients on time and requires a strategy to resolve the blocking. Unfortunately, in real-world distributed training systems that incorporate parameter servers, many workers may send gradients at the same time, so there are many blocking communications to resolve.

When the ratio between the number of workers and the number of parameter servers is not optimal (for example, when many workers send gradients to the same parameter server at the same time), the problem gets even worse, and eventually the blocking communications among workers and parameter servers become a bottleneck. Is there a way to prevent this issue?

Solution

How would we approach this issue? In this situation, the two workers need to figure out an approach to continue. They would have to reconcile, decide which worker would take the next step first, and then take turns to send the calculated gradients to that particular parameter server one by one.

In addition, when one worker finishes sending gradients to update the model parameters in that parameter server, the parameter server starts sending the updated model partition back to that worker, so that the worker has the most up-to-date model at hand when it processes the incoming data. If another worker is sending calculated gradients to that parameter server at the same time, as shown in Figure 3, then another blocking communication occurs and the workers need to reconcile again.

Figure 3 One worker is pulling updates while another worker is pushing updates to the same parameter server.

Unfortunately, this time the conflict may not be so easy to resolve, because the worker that is trying to send the calculated gradients may not have used the most up-to-date model when calculating them. This may be acceptable when the differences between model versions are small, but otherwise it can eventually cause a large difference in the statistical performance of the trained model.

For instance, if the parameter servers store model partitions of uneven size, e.g. the first parameter server stores two thirds of the model parameters as shown in Figure 4, then gradients calculated using this outdated model partition will have a huge impact on the final trained model. In such cases, we may want to drop those gradients and let the other worker send its more up-to-date gradients to the parameter servers.

Figure 4 Example of imbalanced model partitions where the first parameter server contains two thirds of the entire set of model parameters.
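One way to picture the decision to drop or keep outdated gradients is a version check on the model partition. The sketch below is an assumption made for illustration, not a policy prescribed by the pattern: `VersionedPartition`, `max_staleness`, and the numeric values are hypothetical. A push is rejected when the gradients were calculated against a model that is more than `max_staleness` versions behind.

```python
class VersionedPartition:
    """Hypothetical model partition that counts the updates it has applied."""

    def __init__(self, params, max_staleness=0, learning_rate=0.1):
        self.params = list(params)
        self.max_staleness = max_staleness
        self.learning_rate = learning_rate
        self.version = 0

    def pull(self):
        # A worker records the version it pulled alongside the parameters.
        return self.version, list(self.params)

    def push(self, gradients, computed_at_version):
        # Staleness = how many updates landed since the worker pulled.
        staleness = self.version - computed_at_version
        if staleness > self.max_staleness:
            return False  # gradients are considered outdated and dropped
        self.params = [
            p - self.learning_rate * g
            for p, g in zip(self.params, gradients)
        ]
        self.version += 1
        return True


# Strict policy: any gradient computed on an older model is dropped.
strict = VersionedPartition([1.0, 1.0], max_staleness=0)
version_a, _ = strict.pull()
version_b, _ = strict.pull()
accepted_a = strict.push([0.5, 0.5], version_a)  # accepted, version becomes 1
accepted_b = strict.push([0.2, 0.2], version_b)  # rejected, one version behind

# Lenient policy: tolerate gradients that are one version behind.
lenient = VersionedPartition([1.0, 1.0], max_staleness=1)
lenient.push([0.5, 0.5], 0)
kept = lenient.push([0.2, 0.2], 0)               # accepted despite staleness

print(accepted_a, accepted_b, kept)
```

Raising `max_staleness` keeps more of the work that was already done, at the cost of applying gradients that were calculated against an older model.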

Now comes another challenge: what if the dropped gradients that we consider outdated were calculated from a larger portion of the entire training data, as illustrated in Figure 5, and would take a long time to re-calculate using the latest model partition? In this case, we’ll probably want to keep these gradients instead of dropping them so that we don’t waste too much time re-calculating them.

Figure 5 Example of training data where the second worker is trying to push gradients calculated from half of the training data.

In real-world distributed machine learning systems with parameter servers, we may encounter a lot of challenges and issues that cannot be resolved completely and when those happen we would have to reconcile and consider trade-offs among the approaches we take.

As the number of workers and parameter servers increases, the cost of reconciliation and of the communication required to pull and push model parameters between workers and parameter servers becomes non-trivial. The system ends up spending a lot of time communicating between nodes while spending only a small amount of time on the actual computations among neural network layers.

Even though we may have gained a lot of experience understanding the trade-offs and performance differences when applying different ratios and computational resources for parameter servers and workers to our system, it still seems counter-intuitive and time-consuming to tune towards a perfect system.

There are even circumstances where some of the workers or parameter servers fail during training, or where the network becomes unstable, which causes issues when nodes communicate with each other to push and pull updates. In other words, the parameter server pattern may not be suitable for our particular use case if we lack the expertise or time to work with the underlying distributed infrastructure.

Is there an alternative? The parameter server pattern may be one of the only good options for large models, but here, for simplicity and demonstration purposes, let’s assume that the model size does not change and that the whole model is small enough to fit on a single machine. In other words, each machine has enough disk space to store the model.

With that assumption in mind, what would be an alternative to parameter servers if we just want to improve the performance of distributed training?

Now without parameter servers, there are only worker nodes where each worker node stores a copy of the entire set of model parameters, as shown in Figure 6.

Figure 6 Distributed model training component with only worker nodes where every worker stores a copy of the entire set of model parameters and consumes partitions of data to calculate the gradients.

How do we perform model training in this case? Let’s recall that every worker consumes some portions of data and calculates the gradients needed to update the model parameters stored locally on this worker node.

Once all of the worker nodes have successfully completed the calculation of gradients, we will need to aggregate all the gradients and make sure every worker’s entire set of model parameters is updated based on the aggregated gradients. In other words, each worker should store a copy of the exact same updated model. How do we aggregate all the gradients?

We are already familiar with the process to send gradients from one node to another, for instance, sending the calculated gradients from a worker node to a parameter server to update the model parameters in a particular model partition. In general, that process to transfer data from one process to another is called a point-to-point communication, as shown in Figure 7. Note that there’s no other process involved other than these two processes.

Figure 7 Example of point-to-point communication where the data is being transferred between these two processes. Note that there’s no other process involved other than these two.

In our situation, point-to-point communication is not very efficient, since only worker nodes are involved and we need to perform some kind of aggregation on the results from all workers. Fortunately, we can leverage another type of communication called collective communication, which provides communication patterns across all processes in a group, where a group is a subset of all processes. Figure 8 is an example of collective communication between one process and a group that consists of three other processes.

Figure 8 Example of collective communication between one process and a group that consists of three other processes.

In this case, each worker node carries the gradients and wants to send them to a group that includes the rest of the worker nodes so that all worker nodes will obtain the results from every worker.

For our machine learning models, we usually perform some kind of aggregate operation on all the received gradients before sending the aggregated result to all the workers. This type of aggregation is called reduce, which turns a set of numbers into a smaller set of numbers via a function. Examples of such reduce functions are the sum, maximum, minimum, or average of the set of numbers, which in our case are the gradients we received from all the workers.

Figure 9 Example of a reduce operation with sum as the reduce function.

An example of a reduce operation is illustrated in Figure 9. Vectors v0, v1, and v2, one in each of the processes in the process group, are summed into the first process via a reduce operation.
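To make the reduce operation concrete, here is a minimal single-machine sketch in which each inner list plays the role of the vector held by one process. The function name `reduce_to_root` and the sample values are assumptions made for this illustration; a real system would run the reduction across processes through a communication library rather than in one Python process.

```python
from functools import reduce as fold


def reduce_to_root(vectors, op):
    """Combine one vector per process element-wise with the reduce function `op`.

    The returned vector is what the first (root) process would hold
    after the reduce operation; the other processes are left unchanged.
    """
    return [fold(op, column) for column in zip(*vectors)]


# One vector per process in the group.
vectors = [[1, 2], [3, 4], [5, 6]]

summed = reduce_to_root(vectors, lambda a, b: a + b)  # sum as the reduce function
largest = reduce_to_root(vectors, max)                # maximum as the reduce function

print(summed)   # [9, 12]
print(largest)  # [5, 6]
```

Swapping the function passed as `op` is all it takes to move between the sum, maximum, and minimum mentioned above.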

Once the gradients are reduced in a distributed fashion, we send the reduced gradients to all the workers so that they are on the same page and can update the model parameters in the same way, which ensures that they all have exactly the same model. This kind of operation is called a broadcast operation, and it is used very often when performing collective communications. Figure 10 is an example of a broadcast operation to send a value to every process in the process group.

Figure 10 Example of a broadcast operation to send a value to every process in the process group.

The combination of reduce and broadcast operations here is called allreduce. It reduces the results based on a specified reduce function and then distributes the reduced result to all processes, as shown in Figure 11. In our case, the result goes to all the workers, so that the model stored in each worker is exactly the same and is the most up-to-date.

Figure 11 Example of an allreduce operation to reduce the results on each process in the group and then send the result to every process in the group.
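The composition of the two operations can be written down directly. The sketch below simulates the processes as entries in a list; the function names `reduce_sum`, `broadcast`, and `allreduce_sum` are hypothetical and chosen only for this example.

```python
def reduce_sum(vectors):
    """Element-wise sum of one vector per process; only the root holds the result."""
    return [sum(column) for column in zip(*vectors)]


def broadcast(value, num_processes):
    """Give every process in the group its own copy of the root's value."""
    return [list(value) for _ in range(num_processes)]


def allreduce_sum(vectors):
    """allreduce expressed as a reduce followed by a broadcast."""
    return broadcast(reduce_sum(vectors), len(vectors))


# Gradients calculated by three workers.
gradients = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
results = allreduce_sum(gradients)

print(results)  # [[9.0, 12.0], [9.0, 12.0], [9.0, 12.0]]
```

Every worker ends up with the same reduced gradients, which is what keeps the model copies identical after the update.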

Once we’ve finished a round of the allreduce operation, we can start the next round by feeding new data to the updated model, calculating gradients, and then performing the allreduce operation again to gather all gradients from the workers and update the model.
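The rounds described above can be sketched as a small training loop. Everything here is an assumption made for illustration: the toy model y = w * x with a single parameter, the squared-error loss, the data shards, the learning rate, and the helper names. Averaging is used as the reduce function, and the workers are simulated as entries in a list.

```python
def local_gradient(w, shard):
    """Mean-squared-error gradient of the toy model y = w * x on one data shard."""
    return sum(2 * (w * x - y) * x for x, y in shard) / len(shard)


def allreduce_mean(values):
    """Average one value per worker and hand the same result back to every worker."""
    mean = sum(values) / len(values)
    return [mean] * len(values)


def train(shards, rounds=50, learning_rate=0.01):
    # Every worker starts from an identical copy of the model.
    replicas = [0.0] * len(shards)
    for _ in range(rounds):
        # Each worker calculates gradients on its own partition of the data.
        grads = [local_gradient(w, shard) for w, shard in zip(replicas, shards)]
        # One allreduce round: all workers receive the same averaged gradient.
        averaged = allreduce_mean(grads)
        # Each worker applies the same update to its local copy.
        replicas = [w - learning_rate * g for w, g in zip(replicas, averaged)]
    return replicas


# Three workers, each with its own shard of data generated from y = 2 * x.
shards = [
    [(1.0, 2.0), (2.0, 4.0)],
    [(3.0, 6.0), (4.0, 8.0)],
    [(5.0, 10.0), (6.0, 12.0)],
]
replicas = train(shards)
print(replicas)
```

Because every worker applies the same averaged gradient to the same starting value, the copies never diverge, and in this toy setup they all approach w = 2.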

Let’s take a break and see what we have just accomplished. We’ve successfully leveraged the collective communication pattern that takes advantage of the underlying network infrastructure to perform allreduce operations for communicating gradients among multiple workers and allows us to train a medium-sized machine learning model in a distributed fashion. As a result, we no longer need parameter servers and thus there is no communication overhead among parameter servers and workers as we previously discussed.

Discussion

The collective communication pattern is a great alternative to parameter servers when the machine learning model we are building is not too large. As a result, there is no communication overhead among parameter servers and workers, and it’s no longer necessary to spend a lot of effort on tuning the ratio between the number of workers and parameter servers.

In other words, we can easily add workers to speed up the model training process without having to worry about a performance regression.

There’s still one potential problem that’s worth mentioning here though. After incorporating the collective communication pattern by applying the allreduce operation, each worker will need to communicate with all of its peer workers, which may slow down the entire training process if the number of workers becomes really large.

In fact, collective communication relies on communication over the network infrastructure, and we still haven’t fully leveraged all of its benefits in the allreduce operation.

Fortunately, there are better collective communication algorithms that can update the model more efficiently. For example, the ring-allreduce algorithm produces the same result as the allreduce operation but transfers the data around a ring of workers instead of first reducing everything onto a single process. Each of the N workers only needs to communicate with two of its peer workers, 2 * (N − 1) times in total, to update all the model parameters completely. This algorithm is bandwidth-optimal, meaning that if the aggregated gradients are large enough, it will optimally utilize the underlying network infrastructure.
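The ring-allreduce idea can be simulated on a single machine to check the 2 * (N − 1) figure. The sketch below assumes one common formulation of the algorithm, with a scatter-reduce phase followed by an allgather phase, and it assumes that each worker's data is split into N chunks of one number each. The function name `ring_allreduce` is hypothetical, and in a real system each transfer would be a network message rather than a list update.

```python
def ring_allreduce(worker_data):
    """Simulate ring-allreduce (sum) for N workers that each hold N chunks.

    Worker i only ever sends to worker (i + 1) % N and receives from
    worker (i - 1) % N. Returns the final data on every worker and the
    number of send steps each worker performed.
    """
    n = len(worker_data)
    data = [list(v) for v in worker_data]
    assert all(len(v) == n for v in data), "this sketch needs N chunks per worker"
    sends_per_worker = 0

    # Phase 1, scatter-reduce: after N - 1 steps, worker i holds the
    # fully summed chunk (i + 1) % N.
    for step in range(n - 1):
        # Snapshot all outgoing messages first, since sends happen simultaneously.
        messages = [((i - step) % n, data[i][(i - step) % n]) for i in range(n)]
        for i, (chunk, value) in enumerate(messages):
            data[(i + 1) % n][chunk] += value
        sends_per_worker += 1

    # Phase 2, allgather: pass the completed chunks around the ring so
    # that every worker ends up with every summed chunk.
    for step in range(n - 1):
        messages = [((i + 1 - step) % n, data[i][(i + 1 - step) % n]) for i in range(n)]
        for i, (chunk, value) in enumerate(messages):
            data[(i + 1) % n][chunk] = value
        sends_per_worker += 1

    return data, sends_per_worker


gradients = [[1, 2, 3], [10, 20, 30], [100, 200, 300]]
reduced, sends = ring_allreduce(gradients)

print(reduced)  # [[111, 222, 333], [111, 222, 333], [111, 222, 333]]
print(sends)    # 4, which is 2 * (3 - 1)
```

Each step moves only one chunk per worker, so the amount of data any single worker sends stays roughly constant as more workers join the ring.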

Exercises

  1. Do those blocking communications only happen among the workers?
  2. Is the model parameter updating process asynchronous or synchronous? In other words, does one worker’s work depend on another worker’s?
  3. Can you represent an allreduce operation with a composition of other collective communication operations?

That’s all for this article. If you want to see more, check out the book on Manning’s liveBook platform here.
