Message Exchanging Example

Before we work on training a machine learning model across multiple nodes, I will first give you an idea of how to use PyTorch to communicate between nodes. As an introduction to writing distributed PyTorch applications across multiple nodes, let’s walk through an example of message exchange between nodes using PyTorch’s torch.distributed package. The code example (found in your downloaded files or on GitHub) is based on this PyTorch tutorial.

If we launch the Python script on 2 Frontera rtx nodes, each equipped with 4 GPUs, then:

WORLD_SIZE, the total number of GPUs in your job, is 8.

GLOBAL_RANK, the unique ID of each GPU across the whole job, takes the values [0, 1, 2, 3, 4, 5, 6, 7].

LOCAL_RANK, the ID of each GPU within its own node, takes the values [0, 1, 2, 3].

Image source: [2]

Let’s assume we have these variables set for us already.
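For concreteness, here is a minimal sketch (not the tutorial’s exact code) of how a worker script can read these values once a launcher has set them; note that torchrun exposes the global rank through the RANK environment variable:

import os

# Set by the launcher (e.g., torchrun) for every worker it starts.
world_size = int(os.environ["WORLD_SIZE"])   # total number of workers (GPUs) in the job
global_rank = int(os.environ["RANK"])        # unique ID of this worker across all nodes
local_rank = int(os.environ["LOCAL_RANK"])   # ID of this worker within its own node

print(f"world_size={world_size}, global_rank={global_rank}, local_rank={local_rank}")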

Communication Backend for torch.distributed

torch.distributed supports several built-in backends.

In our code, we specify the communication backend to be NCCL: dist.init_process_group(backend="nccl").

The NVIDIA Collective Communication Library (NCCL) implements multi-GPU and multi-node communication primitives optimized for NVIDIA GPUs and Networking. NCCL provides routines such as all-gather, all-reduce, broadcast, reduce, reduce-scatter as well as point-to-point send and receive that are optimized to achieve high bandwidth and low latency over PCIe and NVLink high-speed interconnects within a node and over NVIDIA Mellanox Network across nodes.

If we were running our application on CPU-only hardware, the NCCL backend, which is developed exclusively for NVIDIA GPUs, would not work. We would need to switch to the ‘gloo’ backend for communication between CPUs: dist.init_process_group(backend="gloo").
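As a minimal sketch (not the tutorial’s exact code), and assuming the script is launched by torchrun so that the rendezvous environment variables are already set, the backend choice could look like this:

import torch
import torch.distributed as dist

# Use NCCL when NVIDIA GPUs are available; fall back to Gloo on CPU-only hardware.
backend = "nccl" if torch.cuda.is_available() else "gloo"
dist.init_process_group(backend=backend)

print(f"Rank {dist.get_rank()} of {dist.get_world_size()} initialized with the {backend} backend")
dist.destroy_process_group()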

Launching Multi-node Distributed Training: Setting the Environment Variables

To set the environment variables including WORLD_SIZE, GLOBAL_RANK and LOCAL_RANK, we need to use mpirun and torchrun together.

Torchrun

To run multi-node jobs, we need to run torchrun on each node. A torchrun command needs these arguments to be set:

nproc-per-node: Number of workers per node

nnodes: Number of nodes

node-rank: Rank of the node for multi-node distributed training

master-addr: Address of the master node (rank 0). Here it is the hostname of rank 0.

master-port: Port on the master node (rank 0) to be used for communication during distributed training.

torchrun will then set WORLD_SIZE, GLOBAL_RANK (exposed as the RANK environment variable), and LOCAL_RANK for each worker.

If we want to launch our job on two nodes (c000-001 and c000-002), we need to ssh into the two nodes and run the following commands:
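On node c000-001:

torchrun --nproc_per_node 4 --nnodes 2 --node_rank=0 --master_addr=c000-001 --master_port=1234 message_passing.py

On node c000-002:

torchrun --nproc_per_node 4 --nnodes 2 --node_rank=1 --master_addr=c000-001 --master_port=1234 message_passing.py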

The above example is suitable for the TACC Frontera machine, where each rtx node is equipped with 4 GPUs (nproc_per_node is 4).

Set Environment Variables with mpirun and Slurm

The torchrun launcher is all we need to launch a script on multiple nodes, so why would we want to use it in combination with mpirun and Slurm? With torchrun alone, we have to ssh into each machine and manually modify the launch command on each node, which is error-prone. With the help of mpirun and Slurm, we can launch the same job with a single command.

Message Passing Interface (MPI)

The Message Passing Interface (MPI) is a standardized and portable message-passing standard designed to function on parallel computing architectures [1]. Here we use MPI as a front end for launching the job and for communicating node information.

mpirun command

Open MPI is an MPI library project. The mpirun command is used here to execute parallel jobs with Open MPI.

We use an mpirun command line of the following form:

% mpirun [-np number-of-processes] [-ppn processes-per-node] <program>

This command will run number-of-processes copies of <program>.

In the command line above:

-np sets the number of MPI processes to launch.

-ppn sets the number of processes to launch on each node.

As a concrete example, to run message_passing.sh on 2 nodes, we would use this command:
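mpirun -np 2 -ppn 1 message_passing.sh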

We run our application on 2 nodes through the Jupyter Notebook application on TAP. Each compute node is equipped with 4 GPUs.

After we start the MPI application with the mpirun command, mpirun sets a number of built-in environment variables for us. One of these is PMI_RANK: the rank of the current process among all of the processes created by the mpirun command. We will set torchrun’s node-rank argument, which is the rank of the node for multi-node distributed training, to PMI_RANK.

Slurm

Slurm, or the Simple Linux Utility for Resource Management, is a High Performance Computing (HPC) job scheduler that helps manage and allocate compute resources to make sure access is distributed fairly among users.

We use the Slurm command scontrol show hostnames to print hostnames for the job. With a list of host names, we can obtain the number of nodes for the job.

Finally, we have our script that can be launched with the mpirun command from the master node. The “PRELOAD” part of the script launches a container with a Python environment set up for this tutorial. Containers allow you to package an application along with all of its dependencies, so they are very handy when we run code on HPC platforms like TACC. This tutorial will not go into detail about containers, but TACC has tutorials available if you are interested in Containers @ TACC.

Our mpirun command
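mpirun -np 2 -ppn 1 message_passing.sh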

launches 2 MPI processes, one on each of the 2 nodes. Let’s say process 0 runs on node A, and process 1 runs on node B. Each process runs the message_passing.sh script.

In the message_passing.sh script, when PMI_RANK is accessed, node A gets the value 0 and node B gets the value 1.

With the scontrol show hostnames command, we get a list of host names. A concrete example of the output from this command is:
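c000-001
c000-002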

This tells us that we have a total of 2 nodes in this job; their host names are c000-001 and c000-002. We set the master address for the torchrun command to c000-001.

Let’s assume node A, with rank 0, has host name c000-001, and node B, with rank 1, has host name c000-002. The full command launched on node A is:

/opt/apps/tacc-apptainer/1.3.3/bin/apptainer exec --nv --bind /run/user:/run/user /scratch1/07980/sli4/containers/cnn_course_latest.sif python3 -m torch.distributed.run --nproc_per_node 4 --nnodes 2 --node_rank=0 --master_addr=c000-001 --master_port=1234 message_passing.py

And the full command launched on node B is:

/opt/apps/tacc-apptainer/1.3.3/bin/apptainer exec --nv --bind /run/user:/run/user /scratch1/07980/sli4/containers/cnn_course_latest.sif python3 -m torch.distributed.run --nproc_per_node 4 --nnodes 2 --node_rank=1 --master_addr=c000-001 --master_port=1234 message_passing.py

Launching the code with the mpirun -np 2 -ppn 1 message_passing.sh command, we get output like this:

The above message exchange example with torch.distributed showed us how to write a distributed PyTorch application across multiple nodes. With this knowledge of torchrun and environment variables, we are almost ready for the goal of this tutorial: training a machine learning model across multiple nodes. Before we start working on the code for training our model, I would like to mention that the point-to-point communication used in the example is different from the collective communication used in PyTorch DDP.

Point-to-Point Communication vs. Collective Communication

The above example uses point-to-point communication through the send and recv functions. Point-to-point communication is useful when we want fine-grained control over the communication between our processes.

Image source: [3]
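To make this concrete, here is a minimal point-to-point sketch in the spirit of message_passing.py (not its exact contents): rank 0 sends a tensor to rank 1, and any other ranks simply keep their initial value. The gloo backend is used so the sketch also runs on CPU-only machines.

import torch
import torch.distributed as dist

def run():
    rank = dist.get_rank()
    tensor = torch.zeros(1)
    if rank == 0:
        tensor += 1
        # Blocking point-to-point send from rank 0 to rank 1.
        dist.send(tensor=tensor, dst=1)
    elif rank == 1:
        # Blocking receive of the tensor sent by rank 0.
        dist.recv(tensor=tensor, src=0)
    print(f"Rank {rank} has data {tensor[0]}")

if __name__ == "__main__":
    # Launched with torchrun, which sets RANK, WORLD_SIZE, MASTER_ADDR, and MASTER_PORT.
    dist.init_process_group(backend="gloo")
    run()
    dist.destroy_process_group()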

Remember that PyTorch’s DistributedDataParallel uses collective communication instead of point-to-point communication: gradients are synchronized by averaging the gradients from each GPU and sending the result back to every GPU via an Allreduce operation. The Allreduce collective operates on every gradient tensor and stores the result in all processes in the process group.

Image source: [4]
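As a minimal sketch of the Allreduce pattern that DDP relies on (the averaging is written out explicitly here; DDP performs it internally on buckets of gradients):

import torch
import torch.distributed as dist

# Launched with torchrun; use "nccl" instead of "gloo" on GPU nodes.
dist.init_process_group(backend="gloo")
rank = dist.get_rank()
world_size = dist.get_world_size()

# Each process contributes its own tensor; after all_reduce every process holds the sum.
tensor = torch.ones(3) * rank
dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
tensor /= world_size  # average, as DDP does for gradients

print(f"Rank {rank} sees the averaged tensor {tensor}")
dist.destroy_process_group()

In real DDP training you do not call all_reduce yourself; wrapping the model in DistributedDataParallel triggers these collective operations automatically during the backward pass.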
 