I am trying to spawn a couple of processes using PyTorch's multiprocessing module within an OpenMPI distributed back-end. What I have is the following code:
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def run(rank_local, rank, world_size, maingp):
    print("I WAS SPAWNED ", rank_local, " OF ", rank)
    tensor = torch.zeros(1)
    tensor += 1
    if rank == 0:
        tensor += 100
        dist.send(tensor, dst=1)
    else:
        print("I am spawn: ", rank, "and my tensor value before receive: ", tensor[0])
        dist.recv(tensor, src=0)
        print("I am spawn: ", rank, "and my tensor value after receive: ", tensor[0])

if __name__ == '__main__':
    # Initialize Process Group
    dist.init_process_group(backend="mpi", group_name="main")
    maingp = None  # torch.distributed.new_group([0,1])
    mp.set_start_method('spawn')
    # get current process information
    world_size = dist.get_world_size()
    rank = dist.get_rank()
    # Establish Local Rank and set device on this node
    mp.spawn(run, args=(rank, world_size, maingp), nprocs=1)
I run this code with OpenMPI as follows:
mpirun -n 2 python code.py
So my understanding is that mpirun creates two processes with ranks [0, 1], and each of these processes spawns a new process with local rank 0. Now, when I try to communicate between these two sub-processes of the main processes, I get a traceback with the following error:
-- Process 0 terminated with the following error:
Traceback (most recent call last):
File "/home/usama/anaconda3/lib/python3.6/site-packages/torch/multiprocessing/spawn.py", line 19, in _wrap
fn(i, *args)
File "/home/usama/code/test/code.py", line 19, in run
dist.send(tensor, dst=1)
File "/home/usama/anaconda3/lib/python3.6/site-packages/torch/distributed/distributed_c10d.py", line 666, in send
_check_default_pg()
File "/home/usama/anaconda3/lib/python3.6/site-packages/torch/distributed/distributed_c10d.py", line 191, in _check_default_pg
"Default process group is not initialized"
AssertionError: Default process group is not initialized
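As far as I can tell, mp.spawn starts a fresh Python interpreter for each child, so the default process group initialized in the parent does not carry over. A minimal check (just a sketch, assuming the same MPI-enabled torch build and the same mpirun launch as above) makes this visible:
import torch.distributed as dist
import torch.multiprocessing as mp

def check(local_rank, parent_was_initialized):
    # mp.spawn creates a fresh Python process, so the default process group
    # set up in the parent is not inherited here.
    print("parent had a default process group:", parent_was_initialized)
    print("child has a default process group: ", dist.is_initialized())

if __name__ == '__main__':
    dist.init_process_group(backend="mpi", group_name="main")
    mp.set_start_method('spawn')
    mp.spawn(check, args=(dist.is_initialized(),), nprocs=1)
This prints True for the parent and False inside the spawned child.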
My question is: how do I make these sub-processes able to communicate, i.e. have the [0, 0] process send something to the [1, 0] process? Any ideas?
Sometimes our questions become too restrictive due to premature optimization, like the choice of the MPI backend in this case... it may actually be impossible, given that the popular distributed training framework Ray, which supports the other two backends, NCCL and Gloo, does not support MPI; see its code:
RuntimeError for Backend.MPI
An example of using Ray for distributed training of PyTorch models with backends other than MPI (source):
import pytorch_lightning as pl
from ray_lightning import RayPlugin
# Create your PyTorch Lightning model here.
ptl_model = MNISTClassifier(...)
plugin = RayPlugin(num_workers=4, num_cpus_per_worker=1, use_gpu=True)
# If using GPUs, set the ``gpus`` arg to a value > 0.
# The actual number of GPUs is determined by ``num_workers``.
trainer = pl.Trainer(..., gpus=1, plugins=[plugin])
trainer.fit(ptl_model)
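If MPI is not a hard requirement, the original point-to-point send/recv can also be done with the Gloo backend alone: let mp.spawn create both workers and have each worker initialize its own process group. Below is a minimal sketch under those assumptions (a CPU-only Gloo group over localhost; MASTER_ADDR and MASTER_PORT are placeholder rendezvous settings, and it assumes a PyTorch build whose Gloo backend supports CPU send/recv). It is not the original MPI setup from the question:
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def run(rank, world_size):
    # Each spawned worker joins its own Gloo process group over localhost,
    # so dist.send/dist.recv have a default process group to use.
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'  # any free port
    dist.init_process_group(backend="gloo", rank=rank, world_size=world_size)
    tensor = torch.zeros(1)
    if rank == 0:
        tensor += 100
        dist.send(tensor, dst=1)
    else:
        dist.recv(tensor, src=0)
        print("rank", rank, "received", tensor[0])
    dist.destroy_process_group()

if __name__ == '__main__':
    world_size = 2
    mp.spawn(run, args=(world_size,), nprocs=world_size)
Running this with plain python code.py (no mpirun needed) should have rank 1 print the tensor value it received from rank 0.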