
(From ChatGPT)

This is the lower-level approach; the higher-level one is OpenMPI + OpenMP (see MPI + openMP).

Requirements for using OpenMPI and std::thread together

When using std::thread in an MPI application, only the main thread may call MPI functions.

Calling MPI functions from any other thread is undefined behavior: the program may crash or produce unpredictable results. If other threads need to perform MPI-related work, use an inter-thread communication mechanism to delegate those tasks to the main thread.
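One way to implement such delegation is a task queue: worker threads enqueue requests, and only the main thread drains the queue and performs the calls. A minimal sketch of the pattern, with the MPI call replaced by a placeholder (the function and variable names are illustrative, not from any library; a real program would typically use a condition variable so the main thread can drain tasks while workers are still running):

```cpp
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Worker threads enqueue tasks; only the calling ("main") thread executes
// them, so any MPI calls inside the tasks stay funneled to one thread.
std::vector<int> run_funneled(int num_workers) {
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::vector<int> results(num_workers, 0);

    std::vector<std::thread> workers;
    for (int i = 0; i < num_workers; i++) {
        workers.emplace_back([&, i] {
            // In a real program the task would wrap an MPI call
            // (e.g. MPI_Send); here it just records that it ran.
            std::lock_guard<std::mutex> lk(queue_mutex);
            tasks.push([&results, i] { results[i] = i + 1; });
        });
    }
    for (auto &t : workers) t.join(); // all requests are now queued

    // Main thread drains the queue: the "MPI calls" run here only.
    while (!tasks.empty()) {
        tasks.front()();
        tasks.pop();
    }
    return results;
}
```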

Note on process binding (from man mpirun and experiments)

Please note that mpirun automatically binds processes as of the start of the v1.8 series. Three binding patterns are used in the absence of any further directives:

  • Bind to core: when the number of processes is <= 2

  • Bind to socket: when the number of processes is > 2

  • Bind to none: when oversubscribed

If your application uses threads, then you probably want to ensure that you are either not bound at all (by specifying --bind-to none), or bound to multiple cores using an appropriate binding level or specific number of processing elements per application process.

(That is, when each process is multi-threaded, either specify --bind-to none, which leaves processes unbound (or bound to all available processors), or specify how many cores each process gets; for example, --map-by node:PE=n launches one process per node and binds each process to n cores.)

Related options

For launching:

  • --bind-to
    Bind processes to the specified object, defaults to core. Supported options include slot, hwthread, core, l1cache, l2cache, l3cache, socket, numa, board, cpu-list, and none.

  • --map-by
    Map processes to the specified object, defaults to socket. Supported options include slot, hwthread, core, L1cache, L2cache, L3cache, socket, numa, board, node, sequential, distance, and ppr. Any object can include modifiers by adding a : and any combination of PE=n (bind n processing elements to each proc), SPAN (load balance the processes across the allocation), OVERSUBSCRIBE (allow more processes on a node than processing elements), and NOOVERSUBSCRIBE. This includes PPR, where the pattern would be terminated by another colon to separate it from the modifiers.

    For example, --map-by node:PE=n load-balances the processes across the available nodes and binds each process to n processing elements.

  • --use-hwthread-cpus

    With this option, a processing element is a hardware thread rather than a physical core.

For monitoring:

  • -report-bindings, --report-bindings
    Report any bindings for launched processes.
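The options above combine naturally on the command line. Illustrative invocations (the PE count, process count, and ./example binary are placeholders, not from the original text):

```shell
# One process per node, each bound to 8 cores, printing the resulting bindings
mpirun --map-by node:PE=8 --report-bindings -n 3 ./example

# Alternatively, disable binding entirely and let the OS schedule the threads
mpirun --bind-to none -n 3 ./example
```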

Example using std::thread, OpenMPI, and a reader-writer lock

Since MPI is called only from the main thread, the MPI_THREAD_FUNNELED level suffices.

#include <iostream>
#include <thread>
#include <mutex>
#include <shared_mutex>
#include <mpi.h>

std::shared_mutex rw_mutex;
std::mutex cout_mutex; // only protects the output from interleaving

void reader_func(int rank, int thread_id)
{
    std::shared_lock<std::shared_mutex> lock(rw_mutex);
    std::lock_guard<std::mutex> cout_lock(cout_mutex);
    std::cout << "Reader " << thread_id << " in process " << rank << " starts reading" << std::endl;
    // Reading...
    std::cout << "Reader " << thread_id << " in process " << rank << " finishes reading" << std::endl;
}

void writer_func(int rank, int thread_id)
{
    std::unique_lock<std::shared_mutex> lock(rw_mutex);
    std::lock_guard<std::mutex> cout_lock(cout_mutex);
    std::cout << "Writer " << thread_id << " in process " << rank << " starts writing" << std::endl;
    // Writing...
    std::cout << "Writer " << thread_id << " in process " << rank << " finishes writing" << std::endl;
}

int main(int argc, char **argv)
{
    int provided_level;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided_level);
    if (provided_level < MPI_THREAD_FUNNELED)
    {
        std::cerr << "MPI implementation does not support MPI_THREAD_FUNNELED." << std::endl;
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // Create threads
    const int num_threads = 4;
    std::thread threads[num_threads];
    for (int i = 0; i < num_threads; i++)
    {
        if (i % 2 == 0)
        {
            threads[i] = std::thread(reader_func, rank, i);
        }
        else
        {
            threads[i] = std::thread(writer_func, rank, i);
        }
    }

    // Wait for threads to finish
    for (int i = 0; i < num_threads; i++)
    {
        threads[i].join();
    }

    MPI_Finalize();
    return 0;
}

In this example, each process launches 4 threads (5 counting the main thread).

We use a std::shared_mutex variable, rw_mutex, to implement the reader-writer lock; std::unique_lock and std::shared_lock acquire it for writing and reading respectively. In each process, the main thread creates 4 threads: 2 readers and 2 writers.

Reader threads call reader_func(), which acquires a shared lock and then prints a message marking the read. Writer threads call writer_func(), which acquires an exclusive lock and then prints a message marking the write. In a real application, the reader and writer threads would perform actual reads and writes.

When printing, we use a std::mutex variable, cout_mutex, to protect the output across threads. This is necessary because, while individual writes to std::cout are data-race-free, messages built from several << operations can interleave arbitrarily between threads.

Compile and run

Compile:

mpicxx -std=c++17 -g -Wall -pthread example.cpp -o example

Run (-n 3 requests 3 processes):

mpirun -n 3 ./example

Output (all three processes are visible):

Reader 0 in process 1 starts reading
Reader 0 in process 1 finishes reading
Reader 2 in process 1 starts reading
Reader 2 in process 1 finishes reading
Writer 1 in process 1 starts writing
Writer 1 in process 1 finishes writing
Writer 3 in process 1 starts writing
Writer 3 in process 1 finishes writing
Reader 0 in process 2 starts reading
Reader 0 in process 2 finishes reading
Writer 1 in process 2 starts writing
Writer 1 in process 2 finishes writing
Reader 0 in process 0 starts reading
Reader 0 in process 0 finishes reading
Reader 2 in process 0 starts reading
Reader 2 in process 0 finishes reading
Reader 2 in process 2 starts reading
Reader 2 in process 2 finishes reading
Writer 3 in process 2 starts writing
Writer 3 in process 2 finishes writing
Writer 1 in process 0 starts writing
Writer 1 in process 0 finishes writing
Writer 3 in process 0 starts writing
Writer 3 in process 0 finishes writing

The following is bad practice and should be avoided: at the MPI_THREAD_FUNNELED level, only the main thread is allowed to call MPI functions, and the MPI library does not enforce this for you.

For example, in this program every thread calls MPI_Barrier(). At the MPI_THREAD_FUNNELED level the standard permits MPI calls only from the main thread; the library does not detect or serialize calls from other threads, so the extra calls are undefined behavior and may deadlock, crash, or merely appear to work by accident. The fix is to join the worker threads first and then call MPI_Barrier() once from the main thread.

#include <iostream>
#include <thread>
#include <vector>
#include <mpi.h>

void worker_function(int thread_id, int rank) {
    std::cout << "Thread " << thread_id << " running on rank " << rank << std::endl;
    MPI_Barrier(MPI_COMM_WORLD);
    std::cout << "Thread " << thread_id << " finished on rank " << rank << std::endl;
}

int main(int argc, char **argv) {
    int num_threads = 4;
    int provided_level;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided_level);
    if (provided_level < MPI_THREAD_FUNNELED) {
        std::cerr << "MPI implementation does not support MPI_THREAD_FUNNELED." << std::endl;
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    std::vector<std::thread> threads;
    for (int i = 0; i < num_threads; i++) {
        threads.emplace_back(worker_function, i, rank);
    }

    for (int i = 0; i < num_threads; i++) {
        threads[i].join();
    }

    MPI_Finalize();
    return 0;
}
