分布式网络测试

https://discuss.pytorch.org/t/questions-on-underlying-port-restrictions-in-nccl-gloo-communication/171573/17

In a distributed training system where you have two nodes, each with 8 GPUs, and you are using a framework like TensorFlow with NVIDIA's NCCL library for inter-GPU communication, the communication setup typically involves each process (or rank) having its own unique port for communication. This is necessary to establish individual connections between each pair of GPUs across different nodes.

Here's how the port configuration generally works in this scenario:

  1. Unique Ports per Pair of GPUs:

    • Each GPU on a node communicates with a corresponding GPU on the other node. For example, GPU 0 on Node A communicates with GPU 0 on Node B, GPU 1 on Node A communicates with GPU 1 on Node B, and so on.
    • To facilitate this communication, each pair of GPUs (across nodes) will use a specific port number for NCCL communication. These port numbers are typically unique for each pair.
  2. Port Configuration:

    • When launching your distributed training job, you would specify the port numbers to be used for NCCL communication.
    • Each process (or rank) participating in the training job will be configured with its own NCCL port number. For example, if you have 8 processes (one for each GPU) on Node A and 8 processes on Node B, each process will have a unique NCCL port configured.
  3. NCCL Communication:

    • NCCL library handles the communication between GPUs efficiently using these configured port numbers.
    • The NCCL library ensures that each process can establish connections with the corresponding GPU processes on other nodes using the specified port numbers.

#allreduce

import torch
import torch.distributed as dist

def main():
    # Initialize the process group
    dist.init_process_group(backend='gloo', init_method='tcp://<IP_of_Node_1>:<Port>', rank=0, world_size=2)

    # Input tensor on Node 1
    tensor = torch.tensor([1.0, 2.0, 3.0])
    
    # 执行张量广播操作
    #dist.broadcast(tensor, src=0)  # 将排名为0的节点的张量广播到所有其他节点
    # 打印广播后的张量
    #print(f"Rank {rank} - Broadcasted Tensor: {tensor}")

    # All-reduce operation
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM)

    # Output the reduced tensor
    print(f"Node 1 - Reduced Tensor: {tensor}")

    # Clean up
    dist.destroy_process_group()

if __name__ == '__main__':
    main()


svc_ip=$MASTER_ADDR
pod_ip=172.20.0.45

torchrun --master_addr=$svc_ip --master_port=2222 --nnodes=2 --node_rank=$RANK --nproc_per_node=8 hly.py

import torch
import torch_npu
import os
import torch.distributed as dist
device = int(os.getenv('LOCAL_RANK'))

print(device)
torch.npu.set_device(device)

torch.distributed.init_process_group(backend='hccl',init_method='env://')

a=torch.tensor(1).npu()
torch.distributed.all_reduce(a)


dist.broadcast(a, src=0)
print(a,device)

打印默认的init_method

import torch.distributed as dist

def print_default_init_method():
    default_init_method = dist.default_pg_init_method()
    print(f"Default torch.distributed init_method: {default_init_method}")

if __name__ == '__main__':
    print_default_init_method()

获取group详细信息

def print_process_group_details(group):
    import torch.distributed as dist
    # Get the ranks in the process group
    ranks = dist.get_rank(group)
    print(f"Ranks in the Process Group: {ranks}")

    # Get the size of the process group
    group_size = dist.get_world_size(group)
    print(f"Size of the Process Group: {group_size}")

    # Print details about each rank in the process group
    print("Details for each rank in the Process Group:")
    global_rank = dist.get_rank()
    group_rank = dist.get_group_rank(group, global_rank)
    print(f"  Global Rank: {global_rank}")
    print(f"  Device ID: {group_rank}")