34 changes: 34 additions & 0 deletions distributed/ddp-tutorial-series/README.md
@@ -4,6 +4,14 @@ Code for the DDP tutorial series at https://pytorch.org/tutorials/beginner/ddp_s

Each code file extends upon the previous one. The series starts with a non-distributed script that runs on a single GPU and incrementally updates to end with multinode training on a Slurm cluster.

## Dependencies

1. nccl
   1. https://github.com/NVIDIA/nccl
   2. https://github.com/NVIDIA/nccl-tests
2. torch>=1.11.0
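
A quick way to confirm both dependencies are visible from your Python environment (a sketch; the exact output format varies across PyTorch releases):

```shell
$ python -c "import torch; print(torch.__version__, torch.cuda.is_available(), torch.distributed.is_nccl_available())"
```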


## Files
* [single_gpu.py](single_gpu.py): Non-distributed training script

@@ -16,6 +24,32 @@ Each code file extends upon the previous one. The series starts with a non-distr
* [slurm/config.yaml.template](slurm/config.yaml.template): configuration to set up an AWS cluster
* [slurm/sbatch_run.sh](slurm/sbatch_run.sh): slurm script to launch the training job

## Create Virtual Environment

```shell
$ python -m venv </path/to/new/virtual/environment>
$ source </path/to/new/virtual/environment>/bin/activate
```
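
With the environment active, install the dependencies from the provided requirements file (assuming the commands are run from the `distributed/ddp-tutorial-series` directory):

```shell
$ pip install -r requirements.txt
```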

## Run commands

* [single_gpu.py](single_gpu.py):
```shell
$ python single_gpu.py 50 10
```

* [multigpu.py](multigpu.py):
```shell
$ python multigpu.py 50 10
```

* [multigpu_torchrun.py](multigpu_torchrun.py):
```shell
$ torchrun --standalone --nproc_per_node=gpu multigpu_torchrun.py 50 10
```

* [multinode.py](multinode.py): DDP on multiple nodes using Torchrun (and optionally Slurm)

TODO (see the tentative command below)
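
One possible per-node launch, run once on every participating node (a sketch; `--nnodes`, `--node_rank`, and the rendezvous endpoint are placeholders to adapt to your cluster or to the provided Slurm scripts):

```shell
$ torchrun --nnodes=2 --nproc_per_node=gpu --node_rank=<NODE_RANK> \
    --rdzv_id=123 --rdzv_backend=c10d --rdzv_endpoint=<MASTER_NODE_IP>:29500 \
    multinode.py 50 10
```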
3 changes: 2 additions & 1 deletion distributed/ddp-tutorial-series/datautils.py
@@ -1,6 +1,7 @@
import torch
from torch.utils.data import Dataset


class MyTrainDataset(Dataset):
def __init__(self, size):
self.size = size
@@ -10,4 +11,4 @@ def __len__(self):
return self.size

def __getitem__(self, index):
return self.data[index]
return self.data[index]
7 changes: 6 additions & 1 deletion distributed/ddp-tutorial-series/multigpu.py
@@ -2,12 +2,15 @@
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from datautils import MyTrainDataset
from icecream import ic

# --- Additional modules required for Distributed Training
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os
# ---


def ddp_setup(rank, world_size):
@@ -46,7 +49,7 @@ def _run_batch(self, source, targets):

def _run_epoch(self, epoch):
b_sz = len(next(iter(self.train_data))[0])
print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batch size: {b_sz} | Steps: {len(self.train_data)}")
self.train_data.sampler.set_epoch(epoch)
for source, targets in self.train_data:
source = source.to(self.gpu_id)
@@ -84,6 +87,8 @@ def prepare_dataloader(dataset: Dataset, batch_size: int):


def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):
ic(rank, world_size)

ddp_setup(rank, world_size)
dataset, model, optimizer = load_train_objs()
train_data = prepare_dataloader(dataset, batch_size)
14 changes: 11 additions & 3 deletions distributed/ddp-tutorial-series/multigpu_torchrun.py
@@ -3,17 +3,21 @@
from torch.utils.data import Dataset, DataLoader
from datautils import MyTrainDataset

import torch.multiprocessing as mp
# --- Additional modules required for Distributed Training
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os
# ---

from utils import print_nodes_info


def ddp_setup():
init_process_group(backend="nccl")
torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))


class Trainer:
def __init__(
self,
@@ -52,7 +56,7 @@ def _run_batch(self, source, targets):

def _run_epoch(self, epoch):
b_sz = len(next(iter(self.train_data))[0])
print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batch size: {b_sz} | Steps: {len(self.train_data)}")
self.train_data.sampler.set_epoch(epoch)
for source, targets in self.train_data:
source = source.to(self.gpu_id)
@@ -107,5 +111,9 @@ def main(save_every: int, total_epochs: int, batch_size: int, snapshot_path: str
parser.add_argument('save_every', type=int, help='How often to save a snapshot')
parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
args = parser.parse_args()

# --- Print the environment variables
print_nodes_info()
# ---

main(args.save_every, args.total_epochs, args.batch_size)
12 changes: 10 additions & 2 deletions distributed/ddp-tutorial-series/multinode.py
@@ -2,18 +2,22 @@
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from datautils import MyTrainDataset
from utils import print_nodes_info

# --- Additional modules required for Distributed Training
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os
# ---


def ddp_setup():
init_process_group(backend="nccl")
torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))


class Trainer:
def __init__(
self,
@@ -53,7 +57,7 @@ def _run_batch(self, source, targets):

def _run_epoch(self, epoch):
b_sz = len(next(iter(self.train_data))[0])
print(f"[GPU{self.global_rank}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
print(f"[GPU{self.global_rank}] Epoch {epoch} | Batch size: {b_sz} | Steps: {len(self.train_data)}")
self.train_data.sampler.set_epoch(epoch)
for source, targets in self.train_data:
source = source.to(self.local_rank)
@@ -108,5 +112,9 @@ def main(save_every: int, total_epochs: int, batch_size: int, snapshot_path: str
parser.add_argument('save_every', type=int, help='How often to save a snapshot')
parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
args = parser.parse_args()

# --- Print the environment variables
print_nodes_info()
# ---

main(args.save_every, args.total_epochs, args.batch_size)
4 changes: 3 additions & 1 deletion distributed/ddp-tutorial-series/requirements.txt
@@ -1 +1,3 @@
torch>=1.11.0
torch>=1.11.0
numpy
icecream
1 change: 1 addition & 0 deletions distributed/ddp-tutorial-series/run_multigpu.sh
@@ -0,0 +1 @@
python multigpu.py 50 10
1 change: 1 addition & 0 deletions distributed/ddp-tutorial-series/run_multigpu_torchrun.sh
@@ -0,0 +1 @@
torchrun --standalone --nproc_per_node=gpu multigpu_torchrun.py 50 10
1 change: 1 addition & 0 deletions distributed/ddp-tutorial-series/run_single_gpu.sh
@@ -0,0 +1 @@
python single_gpu.py 50 10
4 changes: 2 additions & 2 deletions distributed/ddp-tutorial-series/single_gpu.py
@@ -11,7 +11,7 @@ def __init__(
train_data: DataLoader,
optimizer: torch.optim.Optimizer,
gpu_id: int,
save_every: int,
save_every: int,
) -> None:
self.gpu_id = gpu_id
self.model = model.to(gpu_id)
@@ -28,7 +28,7 @@ def _run_batch(self, source, targets):

def _run_epoch(self, epoch):
b_sz = len(next(iter(self.train_data))[0])
print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batch size: {b_sz} | Steps: {len(self.train_data)}")
for source, targets in self.train_data:
source = source.to(self.gpu_id)
targets = targets.to(self.gpu_id)
10 changes: 10 additions & 0 deletions distributed/ddp-tutorial-series/utils.py
@@ -0,0 +1,10 @@
import os

from icecream import ic


def print_nodes_info():
    # Log the process-placement environment variables set by torchrun (or Slurm)
    # for this process: local rank, global rank, world size, and rendezvous address.
    ic(os.environ['LOCAL_RANK'], os.environ['RANK'], os.environ['WORLD_SIZE'],
       os.environ['MASTER_ADDR'], os.environ['MASTER_PORT'])
    print()