------------------------------------------------------------------------------------
在上一节中,我们使用QtConcurrent的Map处理了并发操作:计算指定目录下所有文件的md5码。在那个示例中,这些操作的结果之间没有关联,我们只是利用CPU的多核执行并发操作。在本节的示例中,我们将演示通过将任务拆分并最终将计算结果进行合并,也就是通常所说的MapReduce(Map:把任务拆分并用多线程并发执行,Reduce:把各个线程计算结果汇总得到最终结果),我们将利用QtConcurrent的MapReduce进行开发。
------------------------------------------------------------------------------------
上一节我们使用QtConcurrent::blockingMappedReduced()完成了并发操作,本节使用QtConcurrent::mappedReduced()接口,其定义如下:
mappedReduced()接口返回值与参数3 reduce函数返回值类型一样。QtConcurrent会自动遍历sequence,把每一个成员作为参数传入MapFunctor接口map,然后调用reduce接口把map()函数调用后的返回值组织到一起并返回。
在本节课的示例中,我们将实现如下功能:
遍历指定目录中的所有文件,挨个计算MD5码,最后把文件名、该文件对应的MD5码组成一个映射:
QMap<QString,QByteArray>;
我们介绍一下怎样使用QtConcurrent实现该功能。
首先,我们看一下单线程计算批量文件md5码
代码清单13-04-02
main.cpp
- #include <QApplication>
- #include <QTranslator> // 国际化
- #include "qglobal.h"
- #include <QLibraryInfo> // 国际化
- #include <QDir>
- #include <QFile>
- #include <iostream>
- #include "baseapi.h"
- using std::cout;
- using std::endl;
- typedef QMap<QString, QByteArray> MD5Map;
- int main(int argc, char * argv[])
- {
- // ......
- QStringList strFilters;
- // 需要注意避免把本demo产生的md5_a.txt、md5_b.txt统计在内。
- strFilters << "*.h" << "*.cpp";
- // 得到待计算MD5码的文件列表.
- QString strScanPath = ns_train::getPath("$TRAINDEVHOME");
- qDebug() << strScanPath;
- QStringList files =
- ns_train::getFileList(strScanPath, strFilters, true);
- QStringList::iterator ite = files.begin();
- int singleThreadTime = 0; // 单线程计算md5码所需的时间。
- QTime time;
- time.start();
- MD5Map md5_a;
- { // 串行操作
- QString strFileName;
- for (; ite != files.end(); ite++) {
- md5_a[*ite] = ns_train::getMd5(*ite);
- }
- singleThreadTime = time.elapsed(); // ms
- qDebug() << "single thread" << singleThreadTime;
- }
- }
复制代码在代码清单13-04-02中,
第14行,定义了MapReduce的返回值类型,也就是最终我们需要保存的处理结果,一个从文件名到该文件md5码的QMap。
第19~26行,遍历得到"$TRAINDEVHOME"目录下的所有文件(含所有子目录)并将文件列表保存到files变量;
第29~30行,为了对比使用单线程计算md5码与QtConcurrent计算md5码的速度,定义了time变量并启动计时;
第31~39行,遍历变量files文件列表,并挨个文件计算md5码,将得到的md5码保存到映射md5_a;
第37~38行,统计计算用时并输出;
为了对两种方法的计算结果进行对比,确保两种方法计算的到的md5码一致,我们将得到的md5码列表保存到文件,先保存单线程方案的结果:
代码清单13-04-03
main.cpp
- int main(int argc, char * argv[])
- {
- // ......
- // 将结果a写入文件
- {
- QString strFileName;
- strFileName = ns_train::getPath("$TRAINDEVHOME/test/chapter13/ks13_04/md5_a.txt");
- QString strDir = ns_train::getDirectory(strFileName);
- QDir dir;
- dir.mkpath(strDir);
- QFile file(strFileName);
- // 打开方式:只读、文本方式
- if (!file.open(QFile::Truncate | QFile::WriteOnly | QFile::Text)) {
- qDebug("open failed! file name is:%s",
- strFileName.toLocal8Bit().data());
- }
- else {
- QMapIterator<QString, QByteArray> iteMd5(md5_a);
- while (iteMd5.hasNext()) {
- iteMd5.next();
- file.write(iteMd5.key().toLocal8Bit());
- file.write(iteMd5.value());
- }
- }
- file.close();
- }
- // ......
- }
复制代码代码清单13-04-03实现的功能是将md5_a中的文件名、md5码保存到文件:
"$TRAINDEVHOME/test/chapter13/ks13_03/md5_a.txt"
请注意第20行的语法,定义了一个QMap的迭代器iteMd5,构造该迭代器时传入了map对象md5_a。
然后,我们看一下使用QtCocurrent的实现方案
代码清单13-04-04
main.cpp
- // ......
- #include <qtconcurrentmap.h>
- // ......
- // map函数
- MD5Map getMd5OfFile(const QString& strFileName)
- {
- qDebug() << "get md5 in thread" << QThread::currentThread();
- MD5Map md5map;
- md5map[strFileName] = ns_train::getMd5(strFileName);
- return md5map;
- };
- // reduce函数
- void reduce(MD5Map &result, const MD5Map &w)
- {
- QMapIterator<QString, QByteArray> i(w);
- while (i.hasNext()) {
- i.next();
- result[i.key()] += i.value();
- }
- }
- int main(int argc, char * argv[])
- {
- // ......
- int mapReduceTime = 0;
- MD5Map md5_b;
- {
- QTime time;
- time.start();
- md5_b = QtConcurrent::mappedReduced(files, getMd5OfFile, reduce);
- mapReduceTime = time.elapsed();
- qDebug() << "MapReduce" << mapReduceTime;
- }
- }
- // ......
- }
复制代码代码清单13-04-04中:
第2行,包含“qtconcurrentmap.h”,因为本示例用到的接口在该文件中定义;
第5~11行,定义了Map函数,函数名为:getMd5OfFile,函数提供一个const QString&类型的参数,返回值为QByteArray类型。该函数的实现与上节类似,通过调用ns_train::getMd5()计算指定文件的md5码,返回值类型为MD5Map。
第13~20行,是reduce函数的定义。该函数提供两个参数:
参数1(输入、输出参数),reduce函数的计算结果(返回值);
参数2(输入参数), map函数的计算结果w。
该函数中定义了一个QMapIterator<QString, QByteArray> 类型的迭代器i,该迭代器使用w进行构造。在reduce函数中,通过对map函数的计算结果进行遍历,把map的计算结果汇总到输出result对象中。
最后,看一下main()函数:
第27~28行,目的仍然是统计运行时间;
第29行,该行代码是本节课的核心,通过调用QtConcurrent::mappedReduced()实现了MapReduce。files作为输入序列(代码清单13-04-01的参数1),getMd5OfFile是map函数名,reduce是Reduce函数名;
第30~31行,统计MapReduce用时并输出;
代码清单13-04-05
main.cpp
- // 将结果b写入文件
- int main(int argc, char * argv[])
- {
- // ......
- qDebug() << "Map speedup x"
- << ((double)singleThreadTime - (double)mapTime) / (double)mapTime + 1;
- // 将结果b写入文件
- {
- QString strFileName;
- strFileName = ns_train::getPath("$TRAINDEVHOME/test/chapter13/ks13_03/md5_b.txt");
- QString strDir = ns_train::getDirectory(strFileName);
- QDir dir;
- dir.mkpath(strDir);
- QFile file(strFileName);
- // 打开方式:只读、文本方式
- if (!file.open(QFile::Truncate | QFile::WriteOnly | QFile::Text)) {
- qDebug("open failed! file name is:%s",
- strFileName.toLocal8Bit().data());
- }
- else {
- QList<QByteArray>::iterator iteMd5 = md5_b.begin();
- for (; iteMd5 != md5_b.end(); iteMd5++) {
- file.write(*iteMd5);
- }
- }
- file.close();
- }
- }
复制代码在代码清单13-04-05中,跟方案1一样,将方案2结果b保存到文件中。在第5~7行将两个方案的用时进行比较并计算出比例值并输出,以便比较这两个方案哪个用时更少。读者可以自行运行一下本示例附件源码,看看效果,如果不明显可以试试多准备些文件。
结语
------------------------------------------------------------------------------------
本文介绍了使用QtConcurrent:: mappedReduced处理并发操作并将结果汇总的MapReduce模式的开发方法,关键点在于编写map、reduce函数。