PyTorch on a supercomputer

Some notes about running PyTorch on a supercomputer.

Issue and solution

It is hard to get access to a supercomputer with several GPUs on each node, so once you have this kind of opportunity, you want to utilize the parallel power as much as possible.

This is the link that explains the related question; we also explain it here in detail.

We use PyTorch as an example to show how it works on the supercomputer. The good thing is that it provides a good abstraction for multi-node processing. For example, this is the code we want to test and run in the multi-GPU case:

# the source code comes from
# https://github.com/pytorch/examples/blob/master/distributed/ddp/example.py
import argparse
import os
import sys
import tempfile
from urllib.parse import urlparse

import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim

from torch.nn.parallel import DistributedDataParallel as DDP


class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


# this code is also ok for the one-process-one-GPU case
def demo_basic(local_world_size, local_rank):

    # setup devices for this process. For local_world_size = 2, num_gpus = 8,
    # rank 0 uses GPUs [0, 1, 2, 3] and
    # rank 1 uses GPUs [4, 5, 6, 7].
    n = torch.cuda.device_count() // local_world_size
    # these device ids are in the scope of the local (per-node) domain
    device_ids = list(range(local_rank * n, (local_rank + 1) * n))

    print(
        f"[{os.getpid()}] rank = {dist.get_rank()}, "
        + f"world_size = {dist.get_world_size()}, attachedDevice = {n}, device_ids = {device_ids} \n", end=''
    )

    print("use device", device_ids[0])

    # should we move the model to every device manually here
    # if we use multiple GPUs on one node?
    model = ToyModel().cuda(device_ids[0])
    ddp_model = DDP(model, device_ids)

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(device_ids[0])
    loss_fn(outputs, labels).backward()
    optimizer.step()


def spmd_main(local_world_size, local_rank):
    # These are the parameters used to initialize the process group;
    # RANK here is the global rank set by the launcher
    env_dict = {
        key: os.environ[key]
        for key in ("MASTER_ADDR", "MASTER_PORT", "RANK", "WORLD_SIZE")
    }

    if sys.platform == "win32":
        # Distributed package only covers collective communications with Gloo
        # backend and FileStore on Windows platform. Set init_method parameter
        # in init_process_group to a local file.
        if "INIT_METHOD" in os.environ.keys():
            print(f"init_method is {os.environ['INIT_METHOD']}")
            url_obj = urlparse(os.environ["INIT_METHOD"])
            if url_obj.scheme.lower() != "file":
                raise ValueError("Windows only supports FileStore")
            else:
                init_method = os.environ["INIT_METHOD"]
        else:
            # This is an example application; for convenience, we create a file in a temp dir.
            temp_dir = tempfile.gettempdir()
            init_method = f"file:///{os.path.join(temp_dir, 'ddp_example')}"
        dist.init_process_group(backend="gloo", init_method=init_method, rank=int(env_dict["RANK"]), world_size=int(env_dict["WORLD_SIZE"]))
    else:
        print(f"[{os.getpid()}] Initializing process group with: {env_dict}")
        # it looks like gloo works but nccl fails; we are not sure about the reason
        dist.init_process_group(backend="gloo")

    print(
        f"[{os.getpid()}]: world_size = {dist.get_world_size()}, "
        + f"rank = {dist.get_rank()}, backend={dist.get_backend()} \n", end=''
    )

    demo_basic(local_world_size, local_rank)

    # Tear down the process group
    dist.destroy_process_group()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    # This is passed in via launch.py
    parser.add_argument("--local_rank", type=int, default=0)
    # This needs to be explicitly passed in
    parser.add_argument("--local_world_size", type=int, default=1)
    args = parser.parse_args()
    # The main entry point is called directly without using subprocess
    spmd_main(args.local_world_size, args.local_rank)

When we want to start it, we just need to use a script like this:

#!/bin/bash
# $1 is the node rank id
# $2 is the master addr

python -m torch.distributed.launch \
    --nnode=2 --nproc_per_node=4 --node_rank=$1 \
    --master_addr="$2" ./distributed4.py --local_world_size=4

The issue is that we need to provide a node rank id and a master address. But on a supercomputer platform that uses a job manager such as Slurm, we do not know the node id before the resources are actually allocated (and if we do not specify the master address, it defaults to 127.0.0.1, in which case the worker nodes obviously cannot communicate with the master node).

The idea is to write an adaptor program based on MPI. This MPI program is in charge of providing the node-level information such as the node id and the master address, and then every MPI process starts one torch.distributed.launch, which is only responsible for the things on its own node; for example, --nproc_per_node=4 means that 4 processes will be started by torch.distributed.launch on that node. Here is the MPI program we wrote to start the bash script above (it might be easier to use Python and mpi4py here):

#include <mpi.h>

#include <array>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>

// run a shell command and capture its stdout
std::string exec(const char* cmd) {
    std::array<char, 128> buffer;
    std::string result;
    std::unique_ptr<FILE, decltype(&pclose)> pipe(popen(cmd, "r"), pclose);
    if (!pipe) {
        throw std::runtime_error("popen() failed!");
    }
    while (fgets(buffer.data(), buffer.size(), pipe.get()) != nullptr) {
        result += buffer.data();
    }
    return result;
}

int main(int argc, char* argv[]) {
    MPI_Init(&argc, &argv);
    int rank, procs, noderank = 0;

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &procs);

    if (argc != 2) {
        std::cout << "usage: initrank cori|plmt" << std::endl;
        MPI_Finalize();
        exit(0);
    }

    std::string machine(argv[1]);
    std::cout << "machine is " << machine << std::endl;

    // extract the ip address of the relevant network card for this machine
    std::string ipcommand;
    if (machine == "cori") {
        ipcommand = "ifconfig eth3 | egrep -o 'inet [0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}' | cut -d' ' -f2";
    } else if (machine == "plmt") {
        ipcommand = "ip address show nmn0 | egrep -o 'inet [0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}' | cut -d' ' -f2";
    } else {
        throw std::runtime_error("machine is cori or plmt");
    }

    std::string ipaddr = exec(ipcommand.c_str());
    // strip the trailing newline captured from the shell pipeline
    while (!ipaddr.empty() && (ipaddr.back() == '\n' || ipaddr.back() == '\r')) {
        ipaddr.pop_back();
    }

    // compute the node rank by comparing ip addresses
    std::vector<char> packed_addresses(procs * 256);
    char ipstr[256];

    strcpy(ipstr, ipaddr.c_str());

    MPI_Allgather(
        ipstr,
        256,
        MPI_CHAR,
        packed_addresses.data(),
        256,
        MPI_CHAR,
        MPI_COMM_WORLD);

    // assign a node id to every distinct ip address; the allgathered list
    // is in rank order, so every process derives the same mapping
    std::map<std::string, int> ipaddrMap;
    int tempid = 0;
    for (int i = 0; i < procs; i++) {
        char* addr = packed_addresses.data() + i * 256;
        std::string tempstr = std::string(addr);
        if (ipaddrMap.find(tempstr) == ipaddrMap.end()) {
            ipaddrMap[tempstr] = tempid;
            tempid++;
        }
    }

    for (auto it = ipaddrMap.begin(); it != ipaddrMap.end(); it++) {
        // get the node id for this process's own address
        if (ipaddr == it->first) {
            noderank = it->second;
        }
    }

    std::cout << "ip addr is " << ipaddr << " process rank is " << rank << " node rank is " << noderank << std::endl;

    // get the master ip (the address of global rank 0) and broadcast it
    char masterAddr[128];
    if (rank == 0) {
        strcpy(masterAddr, ipaddr.c_str());
    }

    MPI_Bcast(masterAddr, 128, MPI_CHAR, 0, MPI_COMM_WORLD);

    // std::cout << "rank is " << rank << " ip addr is " << ipaddr << " master ip is " << std::string(masterAddr) << std::endl;

    std::string pytorchcommand = "/bin/bash ./rundistributed.sh " + std::to_string(noderank) + " " + std::string(masterAddr);

    std::cout << "pytorchcommand: " << pytorchcommand << std::endl;

    system(pytorchcommand.c_str());

    MPI_Finalize();
    return 0;
}

We just parse the IP address of a specific network card and capture it. We then use MPI to allgather all the addresses and compute the node id based on a map (the allgather guarantees that the address list is in the same order on every rank).
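As mentioned above, the same bootstrap logic could also be written with Python and mpi4py. Here is a minimal sketch of that idea; the hostname-based IP lookup is a simplification compared with parsing a specific network card, and invoking ./rundistributed.sh this way is an illustration, not something we tested on the actual machines:

# a minimal mpi4py sketch of the MPI bootstrap above (assumptions: the
# hostname resolves to a usable address, and ./rundistributed.sh exists)
import socket
import subprocess
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# resolve this node's address instead of parsing ifconfig/ip output
ipaddr = socket.gethostbyname(socket.gethostname())

# allgather returns the addresses in rank order on every process,
# so the derived node ids are consistent across ranks
all_addrs = comm.allgather(ipaddr)
unique_addrs = []
for addr in all_addrs:
    if addr not in unique_addrs:
        unique_addrs.append(addr)
node_rank = unique_addrs.index(ipaddr)
master_addr = all_addrs[0]

print(f"rank {rank} ip {ipaddr} node_rank {node_rank} master {master_addr}")

# same role as the system() call in the C++ version
subprocess.run(["/bin/bash", "./rundistributed.sh", str(node_rank), master_addr], check=True)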

Details of the PyTorch distributed run

It is still good to go over how PyTorch manages the GPU resources. The common practice is to attach one GPU to one process, but there are other manners, as sketched below.
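Here is a minimal sketch of the one-process-one-GPU manner. It is only an illustration and assumes the process group has already been initialized, as in spmd_main above; the function name is ours:

# one-process-one-GPU sketch: each process binds to exactly one GPU,
# identified by its local rank (assumes dist.init_process_group was called)
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

def demo_one_gpu_per_process(local_rank):
    torch.cuda.set_device(local_rank)
    model = nn.Linear(10, 5).cuda(local_rank)
    ddp_model = DDP(model, device_ids=[local_rank])
    out = ddp_model(torch.randn(20, 10).cuda(local_rank))
    print(f"rank {dist.get_rank()} output shape {tuple(out.shape)}")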

This readme provides a good explanation of how PyTorch runs things in a distributed way.

The local domain is per node, and the global domain covers the whole job. In this example, the global rank can be computed from the node id and the local rank.
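For example, assuming the conventional layout used by torch.distributed.launch, the mapping is global_rank = node_rank * nproc_per_node + local_rank, as this small illustrative snippet shows:

# enumerate the global ranks for a 2-node job with 4 processes per node
nnodes = 2
nproc_per_node = 4

for node_rank in range(nnodes):
    for local_rank in range(nproc_per_node):
        global_rank = node_rank * nproc_per_node + local_rank
        print(f"node {node_rank} local_rank {local_rank} -> global rank {global_rank}")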

Other solutions

It seems that scontrol show hostname command can show all host names of a running job. If we put this command into a running job and print out hostnames into a file then we can not which host is allocated to the job during its execution. The etc file of the compute node of HPC plafrorm is ususlly well equipted about the mapping between the host name and the ip. Then we can give these hostnames to pytorch and they will know which one is the master which one is the worker accordingly. By this way, we do not need a separate MPI bootstrap program to identify the ip and send them to the pytorch.
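A rough sketch of this idea in Python, assuming standard Slurm environment variables such as SLURM_JOB_NODELIST and SLURMD_NODENAME are available inside the job (not tested on the actual platforms):

# derive the master address and node rank from the Slurm node list,
# instead of using an MPI bootstrap program
import os
import subprocess

# "scontrol show hostnames" expands the compact node list into one host per line
nodelist = subprocess.run(
    ["scontrol", "show", "hostnames", os.environ["SLURM_JOB_NODELIST"]],
    capture_output=True, text=True, check=True
).stdout.split()

master_addr = nodelist[0]                                  # first host acts as the master
node_rank = nodelist.index(os.environ["SLURMD_NODENAME"])  # this node's position in the list

os.environ["MASTER_ADDR"] = master_addr
print(f"master_addr={master_addr} node_rank={node_rank}")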

The DeepSpeed framework is a good candidate for making PyTorch run in a distributed way more easily.

Other thoughts

Although we can use multiple GPUs on the supercomputer, researchers who focus on AI still prefer to do things on a single node that contains multiple GPUs; at least they do not need to wait in a long queue to get available resources. Sometimes it takes more than one day to get the resources. AI researchers care more about the accuracy of their algorithms than about the training time. They would rather use a few GPU cards to run and debug their code conveniently instead of waiting in a long queue for one experiment result.
