MR-MPI库

官网: MapReduce-MPI 库

下载:apt包管理器即可下载

  • libmrmpi1 下载运行环境
  • libmrmpi-dev 下载开发环境

使用时引入头文件

1
2
3
#include "mapreduce.h"
#include "keyvalue.h"
using namespace MAPREDUCE_NS

编译时把.h添加进环境变量

1
export CPLUS_INCLUDE_PATH=/usr/include/mrmpi:$CPLUS_INCLUDE_PATH

mpic++进行编译

1
mpic++ hello.cpp -o hello /usr/lib/x86_64-linux-gnu/libMapReduceMPI.so

MR-MPI中主要是对两个类进行操作mapreduce keyvalue

keyvalue储存键值对,后文的键值对基本指的是该类的对象

这里遇到个问题,待解决。。。。

我设置LD_LIBRARY_PATH为/usr/lib/x86_64-linux-gnu,然后直接编译mpic++ hello.cpp -o hello却不行,或mpic++ hello.cpp -o hello -L/usr/lib/x86_64-linux-gnu也不行,不知道为什么。这里真的卡我很久。。。。。

使用

MapReduce对象

创建

有三种构造函数

1
2
3
MapReduce(MPI_Comm);
MapReduce();
MapReduce(double);

MPI_Comm这个一般创建时直接为MapReduce *mr = new MapReduce(MPI_COMM_WORLD);

复制

1
MapReduce *MapReduce::copy()

复制一个对象

1
2
3
MapReduce *mr1 = new MapReduce(MPI_COMM_WORLD);
mr1->map(ntasks,&mymap,NULL);
MapReduce *mr2 = mr1->copy();

这样mr1复制到mr2中去了

销毁

和一般的对象一样,堆上的对象用delete销毁,栈上的对象函数结束后自己就销毁了

MapReduce map()

该函数有多组重载,用于生成键值对

1
2
3
Variant 1:
uint64_t MapReduce::map(int nmap, void (*mymap)(int, KeyValue *, void *), void *ptr)
uint64_t MapReduce::map(int nmap, void (*mymap)(int, KeyValue *, void *), void *ptr, int addflag)

改组用法为,nmap为执行mymap总述的处理器数量

1
2
3
Variant 2:
uint64_t MapReduce::map(int nstr, char **strings, int self, int recurse, int readfile, void (*mymap)(int, char *, KeyValue *, void *), void *ptr)
uint64_t MapReduce::map(int nstr, char **strings, int self, int recurse, int readfile, void (*mymap)(int, char *, KeyValue *, void *), void *ptr, int addflag)

这一组可以指定nmap和strings为文件名或目录,来生成一个文件名列表。列表中的每一个文件都会进入mymap函数然后来处理他。

这里如果要读取文件,self recurse readfile设置为010。其他的官方文档中似乎没有写

1
2
3
4
5
6
Variant 3:
uint64_t MapReduce::map(int nmap, int nstr, char **strings, int recurse, int readfile, char sepchar, int delta, void (*mymap)(int, char *, int, KeyValue *, void *), void *ptr)
uint64_t MapReduce::map(int nmap, int nstr, char **strings, int recurse, int readfile, char sepchar, int delta, void (*mymap)(int, char *, int, KeyValue *, void *), void *ptr, int addflag)
Variant 4:
uint64_t MapReduce::map(int nmap, int nstr, char **strings, int recurse, int readfile, char *sepstr, int delta, void (*mymap)(int, char *, int, KeyValue *, void *), void *ptr)
uint64_t MapReduce::map(int nmap, int nstr, char **strings, int recurse, int readfile, char *sepstr, int delta, void (*mymap)(int, char *, int, KeyValue *, void *), void *ptr, int addflag)

这两组是把一个大型文件转换为一个或多个部分来进行mymap函数的调用

variant3的sepchar来指定特定的分隔字符

variant4的sepchar*来指定特定的分隔字符串

1
2
3
Variant 5:
uint64_t MapReduce::map(MapReduce *mr2, void (*mymap)(uint64_t, char *, int, char *, int, KeyValue *, void *), void *ptr)
uint64_t MapReduce::map(MapReduce *mr2, void (*mymap)(uint64_t, char *, int, char *, int, KeyValue *, void *), void *ptr, int addflag)

该组可以指定已有的对象来执行mymap

这些map函数都可以指定一个ptr指针,这个指针会被传入mymap函数,不需要的话就设置成NULL

addflag参数。对于除了最后一组重载之外的map,如果addflag省略或设置为0,则map将创建一个新的KeyValue,再删除存在的KeyValue,如果addflag不为0,则由mymap生成的KeyValue会被添加到已经存在的KeyValue中。

addflag参数对于最后一组来说,如果设置成1,则新生成的KeyValue会被添加到已存在的KeyValue中,如果设置成0,那么已存在的KeyValue会被新生成的替代,

关于mymap函数,有四种重载

1
2
3
4
void mymap(int itask, KeyValue *kv, void *ptr)
void mymap(int itask, char *file, KeyValue *kv, void *ptr)
void mymap(int itask, char *str, int size, KeyValue *kv, void *ptr)
void mymap(uint64_t itask, char *key, int keybytes, char *value, int valuebytes, KeyValue *kv, void *ptr)

这些情况传递给函数的最后两个参数都是指向MapReduce储存的Keyvalue对象以及map中的ptr

  • 第一个重载 $$0<=itask<nmap$$,nmap即map中指定的
  • 第二个重载$$0<=itask<nfiles$$,nfiles是文件列表中文件的数量,file指向的是个文件
  • 第三个重载,itask规则同第一个,nmap是map分割字符串后的数量,他会传递一个字符串和字符串长度(包括\x00)
  • 最后一个重载,$$0<=itask<nkey$$ nkey是键值对的数量

mymap函数即用来生成键值对的

MapReduce reduce()

1
uint64_t MapReduce::reduce(void (*myreduce)(char *, int, char *, int, int *, KeyValue *, void *), void *ptr) 

传入一个函数指针,他会对KeyMultiValue进行操作,对每个处理器均会去执行myreduce函数。myreduce会生成新的键值对。函数返回所有进程中生成键值对的数量

1
void myreduce(char *key, int keybytes, char *multivalue, int nvalues, int *valuebytes, KeyValue *kv, void *ptr) 

MapReduce add()

1
uint64_t MapReduce::add(MapReduce *mr2)

添加键值对,该函数有多个重载,可以添加不同种键值对,或直接对一个对象进行操作

1
mr1->add(mr2);

把mr2的键值附加到mr1中去

add把相同键值对会进行合并

MapReduce aggregate()

1
uint64_t MapReduce::aggregate(int (*myhash)(char *, int)) 

用于重新分配键值对,合并相同的键。一个key的不同value可能在多个进程中,在执行后,key的重复项由一个进程存储。

返回新的键值对的总数。

MapReduce broadcast()

1
uint64_t MapReduce::broadcast(int root) 

删去除了root进程以外所有进程的键值对,并把root进程的键值对给广播到所有进程。最终结果是所有进程都有一份root进程中键值对的拷贝

返回值官方文档中没写

MapReduce clone()

1
uint64_t MapReduce::clone() 

把keyvalue对象转化成keymultivalue对象

MapReduce open() MapReduce close()

1
2
3
void MapReduce::open()
void MapReduce::open(int addflag)
uint64_t MapReduce::close()

MapReduce collapse()

1
uint64_t MapReduce::collapse(char *key, int keybytes) 

将多个键值对折叠成一个键值对

("dog",3), ("me",45), ("parallel",1) =>>(key,["dog",3,"me",45,"parallel",1])

MapReduce compress()

1
uint64_t MapReduce::compress(void (*mycompress)(char *, int, char *, int, int *, KeyValue *, void *), void *ptr)  

compress将具有重复键的KeyValue压缩成新的KeyValue对象,其中每个键在一个处理器上仅出现一次

MapReduce collate()

1
uint64_t MapReduce::collate(int (*myhash)(char *, int))

将KeyValue对象转化成KeyMultiValue对象

这个函数和执行aggregate()后在执行convert()效果一样

MapReduce convert()

1
uint64_t MapReduce::convert() 

把keyvalue转化成KeyMultiValue:查找重复的键,将他们的值连接成一个值的列表

MapReduce gather()

1
uint64_t MapReduce::gather(int nprocs)

用于收集所有处理器中键值对来在nprocs个处理器中形成一个新的键值对

ID>=nprocs的处理器会得到一个空的KeyValue

函数返回新的KeyValue的总数。

MapReduce print()

1
2
void MapReduce::print(int proc, int nstride, int kflag, int vflag)
void MapReduce::print(char *file, int fflag, int proc, int nstride, int kflag, int vflag)

第一个将KeyValue或KeyMultiValue打印

proc<0,打印所有proc的信息

proc>=0 打印指定proc的信息

第二个打印一个或多个文件

如果fflag=0所有进程的信息都打印到该文件,

MapReduce scan()

1
2
uint64_t MapReduce::scan(void (*myscan)(char *, int, char *, int, void *), void *ptr)
uint64_t MapReduce::scan(void (*myscan)(char *, int, char *, int, int *, void *), void *ptr)

MapReduce sort_keys()

1
2
uint64_t MapReduce::sort_keys(int (*mycompare)(char *, int, char *, int))
uint64_t MapReduce::sort_keys(int flag)

以key为关键字排序,其中mycompare是比较器

MapReduce sort_values()

1
2
uint64_t MapReduce::sort_values(int (*mycompare)(char *, int, char *, int))
uint64_t MapReduce::sort_values(int flag)

以value为关键字排序

示例程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// MapReduce word frequency example in C++
// Syntax: wordfreq file1 dir1 file2 dir2 ...
// (1) read all files and files in dirs
// (2) parse into words separated by whitespace
// (3) count occurrence of each word in all files
// (4) print top 10 words

#include "mpi.h"
#include "stdio.h"
#include "stdlib.h"
#include "string.h"
#include "sys/stat.h"
#include "mapreduce.h"
#include "keyvalue.h"

using namespace MAPREDUCE_NS;

void fileread(int, char *, KeyValue *, void *);
void sum(char *, int, char *, int, int *, KeyValue *, void *);
int ncompare(char *, int, char *, int);
void output(uint64_t, char *, int, char *, int, KeyValue *, void *);

struct Count {
int n,limit,flag;
};

/* ---------------------------------------------------------------------- */

int main(int narg, char **args)
{
MPI_Init(&narg,&args);

int me,nprocs;
MPI_Comm_rank(MPI_COMM_WORLD,&me);
std::cout<<"MYPID:"<<me<<std::endl;
MPI_Comm_size(MPI_COMM_WORLD,&nprocs);

if (narg <= 1) {
if (me == 0) printf("Syntax: wordfreq file1 file2 ...\n");
MPI_Abort(MPI_COMM_WORLD,1);
}

MapReduce *mr = new MapReduce(MPI_COMM_WORLD);
mr->verbosity = 2;
mr->timer = 1;
//mr->memsize = 1;
//mr->outofcore = 1;

MPI_Barrier(MPI_COMM_WORLD);
double tstart = MPI_Wtime();

int nwords = mr->map(narg-1,&args[1],0,1,0,fileread,NULL);
int nfiles = mr->mapfilecount;
mr->collate(NULL);
int nunique = mr->reduce(sum,NULL);

MPI_Barrier(MPI_COMM_WORLD);
double tstop = MPI_Wtime();

mr->sort_values(&ncompare);

Count count;
count.n = 0;
count.limit = 10;
count.flag = 0;
mr->map(mr,output,&count);

mr->gather(1);
mr->sort_values(ncompare);

count.n = 0;
count.limit = 10;
count.flag = 1;
mr->map(mr,output,&count);

delete mr;

if (me == 0) {
printf("%d total words, %d unique words\n",nwords,nunique);
printf("Time to process %d files on %d procs = %g (secs)\n",
nfiles,nprocs,tstop-tstart);
}

MPI_Finalize();
}

/* ----------------------------------------------------------------------
read a file
for each word in file, emit key = word, value = NULL
------------------------------------------------------------------------- */

void fileread(int itask, char *fname, KeyValue *kv, void *ptr)
{
// filesize = # of bytes in file

struct stat stbuf;
int flag = stat(fname,&stbuf);
if (flag < 0) {
printf("ERROR: Could not query file size\n");
MPI_Abort(MPI_COMM_WORLD,1);
}
int filesize = stbuf.st_size;

FILE *fp = fopen(fname,"r");
char *text = new char[filesize+1];
int nchar = fread(text,1,filesize,fp);
text[nchar] = '\0';
fclose(fp);

char *whitespace = " \t\n\f\r\0";
char *word = strtok(text,whitespace);
while (word) {
kv->add(word,strlen(word)+1,NULL,0);
word = strtok(NULL,whitespace);
}

delete [] text;
}

/* ----------------------------------------------------------------------
count word occurrence
emit key = word, value = # of multi-values
------------------------------------------------------------------------- */

void sum(char *key, int keybytes, char *multivalue,
int nvalues, int *valuebytes, KeyValue *kv, void *ptr)
{
kv->add(key,keybytes,(char *) &nvalues,sizeof(int));
}

/* ----------------------------------------------------------------------
compare two counts
order values by count, largest first
------------------------------------------------------------------------- */

int ncompare(char *p1, int len1, char *p2, int len2)
{
int i1 = *(int *) p1;
int i2 = *(int *) p2;
if (i1 > i2) return -1;
else if (i1 < i2) return 1;
else return 0;
}

/* ----------------------------------------------------------------------
process a word and its count
depending on flag, emit KV or print it, up to limit
------------------------------------------------------------------------- */

void output(uint64_t itask, char *key, int keybytes, char *value,
int valuebytes, KeyValue *kv, void *ptr)
{
Count *count = (Count *) ptr;
count->n++;
if (count->n > count->limit) return;

int n = *(int *) value;
if (count->flag) printf("%d %s\n",n,key);
else kv->add(key,keybytes,(char *) &n,sizeof(int));
}

该程序来源于官方github项目里词频统计的例子

然后再建立两个文件file1,file2来存放些许数据,这里我直接存放的是两个程序的原码。

让该程序统计file1和file2中的词频

执行

1
mpirun -np4 ./test file1 file2

该程序执行过程

  • 先利用map调用fileread函数,来调用两个进程同时读两个文件,并对文件进行划分。键值对结果保存在mr中即MapReduce的对象

  • 再用collate把keyvalue转化成keymultivalue。否则没法被reduce处理。

  • 用reduce统计词频进行sum求和

  • 求和结果排序后gather到0进程在次进行排序

  • 最后在次用map进行输出打印结果

总结

对数据处理成键值对进行操作,其中添加键值对主要通过add进行完成

MR-MPI主要是通过map和reduce进行操作。map和reduce中的参数涉及到函数指针的调用,map和reduce会多次重复的调用这个函数,而不是每个进程只调用一次。有点类似于map和reduce是帮助用来遍历的函数,map是帮助遍历数据转化成键值对,reduce则是遍历键值对。在遍历的过程中具体如何对数据进行操作就是调用的自定义函数所来实现的。