找回密码
 立即注册
收起左侧

KS13-04 使用QtConcurrent处理并发-MapReduce

0
回复
5412
查看
[复制链接]
累计签到:41 天
连续签到:1 天
来源: 原创 2019-7-5 10:01:47 显示全部楼层 |阅读模式

马上注册,查看详细内容!注册请先查看:注册须知

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
本帖最后由 baizy77 于 2019-7-16 20:43 编辑

版权声明
---------------------------------------------------------------------------------------------------------------------
该文章原创于Qter开源社区(www.qter.org
作者: 女儿叫老白
转载请注明出处!
---------------------------------------------------------------------------------------------------------------------

网页版课程源码 提取码:1uy7
引言
------------------------------------------------------------------------------------
在上一节中,我们使用QtConcurrent的Map处理了并发操作:计算指定目录下所有文件的md5码。在那个示例中,这些操作的结果之间没有关联,我们只是利用CPU的多核执行并发操作。在本节的示例中,我们将演示通过将任务拆分并最终将计算结果进行合并,也就是通常所说的MapReduce(Map:把任务拆分并用多线程并发执行,Reduce:把各个线程计算结果汇总得到最终结果),我们将利用QtConcurrent的MapReduce进行开发。

正文
------------------------------------------------------------------------------------
上一节我们使用QtConcurrent::blockingMappedReduced()完成了并发操作,本节使用QtConcurrent::mappedReduced()接口,其定义如下:

代码清单13-04-01
qtconcurrentmap.h
  1. template <typename Sequence, typename MapFunctor, typename ReduceFunctor>
  2. QFuture<typename QtPrivate::ReduceResultType<ReduceFunctor>::ResultType> mappedReduced(const Sequence &sequence,
  3.               MapFunctor map,
  4.               ReduceFunctor reduce,
  5.               ReduceOptions options = ReduceOptions(UnorderedReduce | SequentialReduce));
复制代码
代码清单13-04-01摘自qtconcurrentmap.h。该清单所示的接口展示了模板接口mappedReduced()的定义。该接口提供了4个参数:
参数1为输入序列,比如一个文件名列表或者一个待处理的图片数组等;
参数2是功能函数,该函数接受sequence中一个成员作为参数;
参数3是reduce函数;
参数4是MapReduce选项集,该选项取值及含义如下:

表13-04-01
  
枚举
  
  
取值
  
  
含义
  
  UnorderedReduce
  
  0x1
  
  返回值不会按照输入序列中的顺序组织返回值,即乱序。
  
  OrderedReduce
  
  0x2
  
  返回值为按照输入序列中的顺序依次返回。
  
  SequentialReduce
  
  0x4
  
  一次仅有一个线程调用reduce函数。将来Qt会支持并发调用。
  
mappedReduced()接口返回值与参数3 reduce函数返回值类型一样。QtConcurrent会自动遍历sequence,把每一个成员作为参数传入MapFunctor接口map,然后调用reduce接口把map()函数调用后的返回值组织到一起并返回。
在本节课的示例中,我们将实现如下功能:
遍历指定目录中的所有文件,挨个计算MD5码,最后把文件名、该文件对应的MD5码组成一个映射:
    QMap<QString,QByteArray>;
我们介绍一下怎样使用QtConcurrent实现该功能。

首先,我们看一下单线程计算批量文件md5码

代码清单13-04-02
main.cpp
  1. #include <QApplication>
  2. #include <QTranslator>        // 国际化
  3. #include "qglobal.h"
  4. #include <QLibraryInfo> // 国际化

  5. #include <QDir>
  6. #include <QFile>
  7. #include <iostream>

  8. #include "baseapi.h"

  9. using std::cout;
  10. using std::endl;
  11. typedef QMap<QString, QByteArray> MD5Map;
  12. int main(int argc, char * argv[])
  13. {
  14.         // ......

  15. QStringList strFilters;
  16. // 需要注意避免把本demo产生的md5_a.txt、md5_b.txt统计在内。
  17. strFilters << "*.h" << "*.cpp";
  18.     // 得到待计算MD5码的文件列表.
  19.     QString strScanPath = ns_train::getPath("$TRAINDEVHOME");
  20.     qDebug() << strScanPath;
  21. QStringList files =
  22.              ns_train::getFileList(strScanPath, strFilters, true);
  23.     QStringList::iterator ite = files.begin();   
  24.     int singleThreadTime = 0; // 单线程计算md5码所需的时间。
  25.     QTime time;
  26.     time.start();
  27.     MD5Map md5_a;
  28.     { // 串行操作
  29.         QString strFileName;
  30.         for (; ite != files.end(); ite++) {
  31.             md5_a[*ite] = ns_train::getMd5(*ite);
  32.         }
  33.         singleThreadTime = time.elapsed(); // ms
  34.         qDebug() << "single thread" << singleThreadTime;
  35. }
  36. }
复制代码
在代码清单13-04-02中,
第14行,定义了MapReduce的返回值类型,也就是最终我们需要保存的处理结果,一个从文件名到该文件md5码的QMap。
第19~26行,遍历得到"$TRAINDEVHOME"目录下的所有文件(含所有子目录)并将文件列表保存到files变量;
第29~30行,为了对比使用单线程计算md5码与QtConcurrent计算md5码的速度,定义了time变量并启动计时;
第31~39行,遍历变量files文件列表,并挨个文件计算md5码,将得到的md5码保存到映射md5_a;
第37~38行,统计计算用时并输出;

为了对两种方法的计算结果进行对比,确保两种方法计算的到的md5码一致,我们将得到的md5码列表保存到文件,先保存单线程方案的结果:
代码清单13-04-03

main.cpp
  1. int main(int argc, char * argv[])
  2. {
  3.         // ......
  4.    // 将结果a写入文件
  5.     {
  6.         QString strFileName;
  7.         strFileName = ns_train::getPath("$TRAINDEVHOME/test/chapter13/ks13_04/md5_a.txt");
  8.         QString strDir = ns_train::getDirectory(strFileName);
  9.         QDir dir;
  10.         dir.mkpath(strDir);

  11.         QFile file(strFileName);
  12.         // 打开方式:只读、文本方式
  13.         if (!file.open(QFile::Truncate | QFile::WriteOnly | QFile::Text)) {
  14.             qDebug("open failed! file name is:%s",  
  15.                                     strFileName.toLocal8Bit().data());
  16.         }
  17.         else {
  18.             QMapIterator<QString, QByteArray> iteMd5(md5_a);
  19.             while (iteMd5.hasNext()) {
  20.                 iteMd5.next();
  21.                 file.write(iteMd5.key().toLocal8Bit());
  22.                 file.write(iteMd5.value());
  23.             }
  24.         }
  25.         file.close();
  26. }
  27. // ......
  28. }
复制代码
代码清单13-04-03实现的功能是将md5_a中的文件名、md5码保存到文件:
    "$TRAINDEVHOME/test/chapter13/ks13_03/md5_a.txt"
       请注意第20行的语法,定义了一个QMap的迭代器iteMd5,构造该迭代器时传入了map对象md5_a。
然后,我们看一下使用QtCocurrent的实现方案
       代码清单13-04-04

main.cpp
  1. // ......
  2. #include <qtconcurrentmap.h>
  3. // ......
  4. // map函数
  5. MD5Map getMd5OfFile(const QString& strFileName)
  6. {
  7.     qDebug() << "get md5 in thread" << QThread::currentThread();
  8.     MD5Map md5map;
  9.     md5map[strFileName] = ns_train::getMd5(strFileName);
  10.     return md5map;
  11. };
  12. // reduce函数
  13. void reduce(MD5Map &result, const MD5Map &w)
  14. {
  15.     QMapIterator<QString, QByteArray> i(w);
  16.     while (i.hasNext()) {
  17.         i.next();
  18.         result[i.key()] += i.value();
  19.     }
  20. }
  21. int main(int argc, char * argv[])
  22. {
  23. // ......
  24.     int mapReduceTime = 0;
  25.     MD5Map md5_b;
  26.     {
  27.         QTime time;
  28.         time.start();
  29.         md5_b = QtConcurrent::mappedReduced(files, getMd5OfFile, reduce);
  30.         mapReduceTime = time.elapsed();
  31.         qDebug() << "MapReduce" << mapReduceTime;
  32.     }
  33. }
  34. // ......
  35. }
复制代码
代码清单13-04-04中:
第2行,包含“qtconcurrentmap.h”,因为本示例用到的接口在该文件中定义;
    第5~11行,定义了Map函数,函数名为:getMd5OfFile,函数提供一个const QString&类型的参数,返回值为QByteArray类型。该函数的实现与上节类似,通过调用ns_train::getMd5()计算指定文件的md5码,返回值类型为MD5Map。
    第13~20行,是reduce函数的定义。该函数提供两个参数:
        参数1(输入、输出参数),reduce函数的计算结果(返回值);
        参数2(输入参数), map函数的计算结果w。
    该函数中定义了一个QMapIterator<QString, QByteArray> 类型的迭代器i,该迭代器使用w进行构造。在reduce函数中,通过对map函数的计算结果进行遍历,把map的计算结果汇总到输出result对象中。
    最后,看一下main()函数:
    第27~28行,目的仍然是统计运行时间;
    第29行,该行代码是本节课的核心,通过调用QtConcurrent::mappedReduced()实现了MapReduce。files作为输入序列(代码清单13-04-01的参数1),getMd5OfFile是map函数名,reduce是Reduce函数名;
    第30~31行,统计MapReduce用时并输出;

代码清单13-04-05

main.cpp
  1. // 将结果b写入文件
  2. int main(int argc, char * argv[])
  3. {
  4.     // ......
  5.      qDebug() << "Map speedup x"
  6.               << ((double)singleThreadTime - (double)mapTime) / (double)mapTime + 1;
  7.     // 将结果b写入文件
  8.     {
  9.         QString strFileName;
  10.         strFileName = ns_train::getPath("$TRAINDEVHOME/test/chapter13/ks13_03/md5_b.txt");
  11.         QString strDir = ns_train::getDirectory(strFileName);
  12.         QDir dir;
  13.         dir.mkpath(strDir);

  14.         QFile file(strFileName);
  15.         // 打开方式:只读、文本方式
  16.         if (!file.open(QFile::Truncate | QFile::WriteOnly | QFile::Text)) {
  17.             qDebug("open failed! file name is:%s",
  18.                   strFileName.toLocal8Bit().data());
  19.         }
  20.         else {
  21.             QList<QByteArray>::iterator iteMd5 = md5_b.begin();
  22.             for (; iteMd5 != md5_b.end(); iteMd5++) {
  23.                 file.write(*iteMd5);
  24.             }
  25.         }
  26.         file.close();
  27. }
  28. }
复制代码
在代码清单13-04-05中,跟方案1一样,将方案2结果b保存到文件中。在第5~7行将两个方案的用时进行比较并计算出比例值并输出,以便比较这两个方案哪个用时更少。读者可以自行运行一下本示例附件源码,看看效果,如果不明显可以试试多准备些文件。

结语
------------------------------------------------------------------------------------
       本文介绍了使用QtConcurrent:: mappedReduced处理并发操作并将结果汇总的MapReduce模式的开发方法,关键点在于编写map、reduce函数。

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

公告
可以关注我们的微信公众号yafeilinux_friends获取最新动态,或者加入QQ会员群进行交流:190741849、186601429(已满) 我知道了