Microsoft MPI (MS-MPI) 是微软基于 MPI(Message Passing Interface) 标准开发的高性能并行计算库,主要用于 Windows HPC(高性能计算) 环境。它支持 MPI-2 和部分 MPI-3 特性,适用于多节点集群计算,广泛应用于科学计算、金融建模、机器学习和大规模仿真等领域。
MS-MPI 主要特点
-
Windows 原生支持:专为 Windows Server 和 Windows HPC 环境优化。
-
跨节点并行计算:支持多台计算机组成的集群。
-
高性能通信:支持 TCP/IP、共享内存(节点内)和 RDMA(远程直接内存访问)。
-
与 Visual Studio 集成:方便在 Windows 平台上开发和调试 MPI 程序。
-
兼容性:支持 32 位和 64 位应用程序。
MS-MPI 核心组件
组件 | 功能 |
---|---|
mpiexec.exe | 用于启动 MPI 程序(类似 OpenMPI 的 mpirun ) |
smpd.exe | MS-MPI 守护进程,管理 MPI 进程通信 |
MPI 头文件(mpi.h) | 用于 C/C++ 开发 |
MS-MPI SDK | 开发工具包(编译 MPI 程序所需) |
MS-MPI 安装与配置
1 安装方式
-
独立安装 MS-MPI(推荐)
-
下载地址:Microsoft MPI 官方下载
-
安装
msmpisetup.exe
(运行时)和msmpisdk.msi
(开发包)。
-
-
通过 Microsoft HPC Pack 安装
-
适用于 Windows HPC 集群环境。
-
2 环境变量配置
安装后,MS-MPI 会自动设置以下环境变量:
-
MSMPI_BIN
:MPI 可执行文件路径(如mpiexec.exe
) -
MSMPI_INC
:MPI 头文件路径(如mpi.h
) -
MSMPI_LIB
:MPI 库文件路径
MS-MPI 编程
MS-MPI SDK(Software Development Kit)是微软提供的 MPI 开发工具包,用于在 Windows 平台上开发并行计算应用程序。它包含 头文件、库文件、编译器包装器 等工具,支持 C/C++ 和 Fortran 开发。
主要组件
组件 | 说明 |
---|---|
mpi.h | MPI 头文件(C/C++) |
mpif.h | MPI 头文件(Fortran) |
msmpi.lib | 静态链接库 |
mpicc/mpicxx | C/C++ MPI 编译器包装器 |
mpifort | Fortran MPI 编译器包装器 |
核心接口函数分类
1.1 环境管理函数
函数 | 功能 |
---|---|
MPI_Init(&argc, &argv) | 初始化MPI环境 |
MPI_Finalize() | 终止MPI环境 |
MPI_Comm_size(comm, &size) | 获取通信域中的进程总数 |
MPI_Comm_rank(comm, &rank) | 获取当前进程的编号(rank) |
MPI_Abort(comm, errorcode) | 异常终止所有MPI进程 |
1.2 点对点通信函数
函数 | 功能 |
---|---|
MPI_Send(buf, count, datatype, dest, tag, comm) | 阻塞发送 |
MPI_Recv(buf, count, datatype, source, tag, comm, &status) | 阻塞接收 |
MPI_Isend(buf, count, datatype, dest, tag, comm, &request) | 非阻塞发送 |
MPI_Irecv(buf, count, datatype, source, tag, comm, &request) | 非阻塞接收 |
MPI_Wait(&request, &status) | 等待非阻塞操作完成 |
1.3 集体通信函数
函数 | 功能 |
---|---|
MPI_Bcast(buf, count, datatype, root, comm) | 广播 |
MPI_Scatter(sendbuf,sendcount,sendtype,recvbuf,recvcount,recvtype,root,comm) | 数据分发 |
MPI_Gather(sendbuf,sendcount,sendtype,recvbuf,recvcount,recvtype,root,comm) | 数据收集 |
MPI_Reduce(sendbuf, recvbuf, count, datatype, op, root, comm) | 规约操作 |
MPI_Allreduce(sendbuf, recvbuf, count, datatype, op, comm) | 全局规约 |
MPI_Barrier(comm) | 同步屏障 |
1.4 数据类型操作
函数 | 功能 |
---|---|
MPI_Type_contiguous(count, oldtype, &newtype) | 创建连续数据类型 |
MPI_Type_vector(count, blocklength, stride, oldtype, &newtype) | 创建向量数据类型 |
MPI_Type_commit(&datatype) | 提交新数据类型 |
基础编程示例
1. MPI Hello World
#include <mpi.h>
#include <stdio.h>
int main(int argc, char** argv) {
MPI_Init(&argc, &argv); // 初始化 MPI 环境
int rank, size;
MPI_Comm_rank(MPI_COMM_WORLD, &rank); // 获取当前进程的 rank(编号)
MPI_Comm_size(MPI_COMM_WORLD, &size); // 获取总进程数
printf("进程 %d / %d: Hello, World!\n", rank, size);
MPI_Finalize(); // 结束 MPI 环境
return 0;
}
编译与运行
# 使用 MS-MPI 编译器(需安装 SDK)
mpicc hello.c -o hello.exe
# 运行(4 个进程)
mpiexec -n 4 hello.exe
输出示例:
进程 0 / 4: Hello, World!
进程 1 / 4: Hello, World!
进程 2 / 4: Hello, World!
进程 3 / 4: Hello, World!
2. MPI 点对点通信(Send/Recv)
#include <mpi.h>
#include <stdio.h>
int main(int argc, char** argv) {
MPI_Init(&argc, &argv);
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
if (rank == 0) {
int send_data = 123;
MPI_Send(&send_data, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
printf("Rank 0 发送数据: %d\n", send_data);
} else if (rank == 1) {
int recv_data;
MPI_Recv(&recv_data, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
printf("Rank 1 接收数据: %d\n", recv_data);
}
MPI_Finalize();
return 0;
}
运行方式:
mpiexec -n 2 send_recv.exe
输出:
Rank 0 发送数据: 123
Rank 1 接收数据: 123
3. MPI 集体通信(Broadcast)
#include <mpi.h>
#include <stdio.h>
int main(int argc, char** argv) {
MPI_Init(&argc, &argv);
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
int data;
if (rank == 0) {
data = 100; // 只有 Rank 0 初始化数据
}
MPI_Bcast(&data, 1, MPI_INT, 0, MPI_COMM_WORLD); // 广播数据
printf("Rank %d 接收广播数据: %d\n", rank, data);
MPI_Finalize();
return 0;
}
运行方式:
mpiexec -n 4 bcast.exe
输出:
Rank 0 接收广播数据: 100
Rank 1 接收广播数据: 100
Rank 2 接收广播数据: 100
Rank 3 接收广播数据: 100
高级应用技巧
1. 非阻塞通信优化
// 重叠计算和通信示例
MPI_Request req;
MPI_Isend(data, count, MPI_DOUBLE, dest, tag, comm, &req);
// 在等待通信完成的同时进行计算
do_some_computation();
// 确保通信完成
MPI_Wait(&req, MPI_STATUS_IGNORE);
2. 自定义规约操作
void my_max(void *in, void *inout, int *len, MPI_Datatype *dptr) {
double *invals = (double *)in;
double *inoutvals = (double *)inout;
for (int i = 0; i < *len; i++) {
if (invals[i] > inoutvals[i]) {
inoutvals[i] = invals[i];
}
}
}
// 注册自定义操作
MPI_Op my_op;
MPI_Op_create(my_max, 1, &my_op);
// 使用自定义规约
MPI_Allreduce(local_data, global_data, count, MPI_DOUBLE, my_op, comm);
常用应用场景开发示例
场景一:并行数值计算(矩阵运算)
使用多个进程并行计算矩阵乘法,主进程(Rank 0)负责分发数据和收集结果。
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#define N 1024 // 矩阵维度
int main(int argc, char** argv) {
MPI_Init(&argc, &argv);
int rank, size;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
// 每个进程负责的行数
int rows_per_proc = N / size;
double *A_part = malloc(rows_per_proc * N * sizeof(double));
double *B = malloc(N * N * sizeof(double));
double *C_part = malloc(rows_per_proc * N * sizeof(double));
// Rank 0初始化完整矩阵
if (rank == 0) {
double *A_full = malloc(N * N * sizeof(double));
// 初始化A_full和B矩阵...
}
// 分发A矩阵的行块
MPI_Scatter(A_full, rows_per_proc*N, MPI_DOUBLE,
A_part, rows_per_proc*N, MPI_DOUBLE,
0, MPI_COMM_WORLD);
// 广播B矩阵到所有进程
MPI_Bcast(B, N*N, MPI_DOUBLE, 0, MPI_COMM_WORLD);
// 并行计算部分结果
for (int i = 0; i < rows_per_proc; i++) {
for (int j = 0; j < N; j++) {
C_part[i*N + j] = 0.0;
for (int k = 0; k < N; k++) {
C_part[i*N + j] += A_part[i*N + k] * B[k*N + j];
}
}
}
// 收集结果到Rank 0
MPI_Gather(C_part, rows_per_proc*N, MPI_DOUBLE,
C_full, rows_per_proc*N, MPI_DOUBLE,
0, MPI_COMM_WORLD);
MPI_Finalize();
return 0;
}
关键点分析
-
使用
MPI_Scatter
分发数据 -
使用
MPI_Bcast
广播共享数据 -
使用
MPI_Gather
收集计算结果 -
计算任务均匀分配到各进程
场景二:分布式排序(桶排序)
将大数据集分布到多个进程进行局部排序,最后合并结果。
#include <mpi.h>
#include <algorithm>
#define DATA_SIZE 1000000
int main(int argc, char** argv) {
MPI_Init(&argc, &argv);
int rank, size;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
// 每个进程处理的数据量
int local_size = DATA_SIZE / size;
int *local_data = new int[local_size];
// Rank 0初始化并分发数据
if (rank == 0) {
int *all_data = new int[DATA_SIZE];
// 初始化数据...
MPI_Scatter(all_data, local_size, MPI_INT,
local_data, local_size, MPI_INT,
0, MPI_COMM_WORLD);
delete[] all_data;
} else {
MPI_Scatter(NULL, local_size, MPI_INT,
local_data, local_size, MPI_INT,
0, MPI_COMM_WORLD);
}
// 本地排序
std::sort(local_data, local_data + local_size);
// 收集排序后的数据
if (rank == 0) {
int *sorted_data = new int[DATA_SIZE];
MPI_Gather(local_data, local_size, MPI_INT,
sorted_data, local_size, MPI_INT,
0, MPI_COMM_WORLD);
// 处理最终排序结果...
delete[] sorted_data;
} else {
MPI_Gather(local_data, local_size, MPI_INT,
NULL, local_size, MPI_INT,
0, MPI_COMM_WORLD);
}
delete[] local_data;
MPI_Finalize();
return 0;
}
性能优化技巧
-
使用
MPI_Scatterv
和MPI_Gatherv
处理不均衡数据 -
在数据分发前进行预排序(Sample Sort)
-
使用非阻塞通信重叠计算和通信
场景三:主从模式任务分配
主进程(Rank 0)负责任务分配,从进程处理任务并返回结果。
#include <mpi.h>
#define TOTAL_TASKS 1000
void process_task(int task_id) {
// 模拟任务处理
}
int main(int argc, char** argv) {
MPI_Init(&argc, &argv);
int rank, size;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
if (rank == 0) {
// 主进程代码
int next_task = 0;
MPI_Status status;
// 初始任务分配
for (int worker = 1; worker < size && next_task < TOTAL_TASKS; worker++) {
MPI_Send(&next_task, 1, MPI_INT, worker, 0, MPI_COMM_WORLD);
next_task++;
}
// 处理返回结果并分配新任务
while (next_task < TOTAL_TASKS) {
int result;
MPI_Recv(&result, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
MPI_Send(&next_task, 1, MPI_INT, status.MPI_SOURCE, 0, MPI_COMM_WORLD);
next_task++;
}
// 发送终止信号
for (int worker = 1; worker < size; worker++) {
int terminate = -1;
MPI_Send(&terminate, 1, MPI_INT, worker, 0, MPI_COMM_WORLD);
}
} else {
// 从进程代码
while (1) {
int task_id;
MPI_Recv(&task_id, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
if (task_id == -1) break; // 终止信号
process_task(task_id);
// 返回处理完成信号
int done = 1;
MPI_Send(&done, 1, MPI_INT, 0, 0, MPI_COMM_WORLD);
}
}
MPI_Finalize();
return 0;
}
模式特点
-
动态负载均衡
-
适用于任务处理时间不均衡的场景
-
可扩展性强
典型应用场景总结
应用场景 | 适用MPI函数 | 特点 |
---|---|---|
科学计算 | MPI_Reduce , MPI_Allreduce | 大规模数值计算 |
数据分析 | MPI_Scatter , MPI_Gather | 数据并行处理 |
图像处理 | MPI_Sendrecv | 区域分解法 |
机器学习 | MPI_Allgather | 参数服务器模式 |
金融仿真 | MPI_Bcast | 蒙特卡洛模拟 |
通过合理选择MPI函数组合,可以构建高效的并行程序。建议根据具体问题特点选择点对点通信或集体通信方式,并注意负载均衡和通信开销的平衡。