baizy77 发表于 2019-7-5 10:01:47

KS13-04 使用QtConcurrent处理并发-MapReduce

本帖最后由 baizy77 于 2019-7-16 20:43 编辑

版权声明---------------------------------------------------------------------------------------------------------------------该文章原创于Qter开源社区(www.qter.org)作者: 女儿叫老白转载请注明出处!---------------------------------------------------------------------------------------------------------------------课程目录: 【独家连载】《Qt入门与提高-GUI产品开发》目录
网页版课程源码 提取码:1uy7引言------------------------------------------------------------------------------------在上一节中,我们使用QtConcurrent的Map处理了并发操作:计算指定目录下所有文件的md5码。在那个示例中,这些操作的结果之间没有关联,我们只是利用CPU的多核执行并发操作。在本节的示例中,我们将演示通过将任务拆分并最终将计算结果进行合并,也就是通常所说的MapReduce(Map:把任务拆分并用多线程并发执行,Reduce:把各个线程计算结果汇总得到最终结果),我们将利用QtConcurrent的MapReduce进行开发。
正文------------------------------------------------------------------------------------上一节我们使用QtConcurrent::blockingMappedReduced()完成了并发操作,本节使用QtConcurrent::mappedReduced()接口,其定义如下:
代码清单13-04-01qtconcurrentmap.htemplate <typename Sequence, typename MapFunctor, typename ReduceFunctor>
QFuture<typename QtPrivate::ReduceResultType<ReduceFunctor>::ResultType> mappedReduced(const Sequence &sequence,
            MapFunctor map,
            ReduceFunctor reduce,
            ReduceOptions options = ReduceOptions(UnorderedReduce | SequentialReduce));
代码清单13-04-01摘自qtconcurrentmap.h。该清单所示的接口展示了模板接口mappedReduced()的定义。该接口提供了4个参数:参数1为输入序列,比如一个文件名列表或者一个待处理的图片数组等;参数2是功能函数,该函数接受sequence中一个成员作为参数;参数3是reduce函数;参数4是MapReduce选项集,该选项取值及含义如下:
表13-04-01
枚举取值含义
UnorderedReduce
0x1
返回值不会按照输入序列中的顺序组织返回值,即乱序。

OrderedReduce
0x2
返回值为按照输入序列中的顺序依次返回。

SequentialReduce
0x4
一次仅有一个线程调用reduce函数。将来Qt会支持并发调用。

mappedReduced()接口返回值与参数3 reduce函数返回值类型一样。QtConcurrent会自动遍历sequence,把每一个成员作为参数传入MapFunctor接口map,然后调用reduce接口把map()函数调用后的返回值组织到一起并返回。在本节课的示例中,我们将实现如下功能:遍历指定目录中的所有文件,挨个计算MD5码,最后把文件名、该文件对应的MD5码组成一个映射:    QMap<QString,QByteArray>;我们介绍一下怎样使用QtConcurrent实现该功能。
首先,我们看一下单线程计算批量文件md5码
代码清单13-04-02main.cpp#include <QApplication>
#include <QTranslator>      // 国际化
#include "qglobal.h"
#include <QLibraryInfo> // 国际化

#include <QDir>
#include <QFile>
#include <iostream>

#include "baseapi.h"

using std::cout;
using std::endl;
typedef QMap<QString, QByteArray> MD5Map;
int main(int argc, char * argv[])
{
      // ......

QStringList strFilters;
// 需要注意避免把本demo产生的md5_a.txt、md5_b.txt统计在内。
strFilters << "*.h" << "*.cpp";
    // 得到待计算MD5码的文件列表.
    QString strScanPath = ns_train::getPath("$TRAINDEVHOME");
    qDebug() << strScanPath;
QStringList files =
             ns_train::getFileList(strScanPath, strFilters, true);
    QStringList::iterator ite = files.begin();   
    int singleThreadTime = 0; // 单线程计算md5码所需的时间。
    QTime time;
    time.start();
    MD5Map md5_a;
    { // 串行操作
      QString strFileName;
      for (; ite != files.end(); ite++) {
            md5_a[*ite] = ns_train::getMd5(*ite);
      }
      singleThreadTime = time.elapsed(); // ms
      qDebug() << "single thread" << singleThreadTime;
}
}
在代码清单13-04-02中,第14行,定义了MapReduce的返回值类型,也就是最终我们需要保存的处理结果,一个从文件名到该文件md5码的QMap。第19~26行,遍历得到"$TRAINDEVHOME"目录下的所有文件(含所有子目录)并将文件列表保存到files变量;第29~30行,为了对比使用单线程计算md5码与QtConcurrent计算md5码的速度,定义了time变量并启动计时;第31~39行,遍历变量files文件列表,并挨个文件计算md5码,将得到的md5码保存到映射md5_a;第37~38行,统计计算用时并输出;
为了对两种方法的计算结果进行对比,确保两种方法计算的到的md5码一致,我们将得到的md5码列表保存到文件,先保存单线程方案的结果:代码清单13-04-03
main.cppint main(int argc, char * argv[])
{
      // ......
   // 将结果a写入文件
    {
      QString strFileName;
      strFileName = ns_train::getPath("$TRAINDEVHOME/test/chapter13/ks13_04/md5_a.txt");
      QString strDir = ns_train::getDirectory(strFileName);
      QDir dir;
      dir.mkpath(strDir);

      QFile file(strFileName);
      // 打开方式:只读、文本方式
      if (!file.open(QFile::Truncate | QFile::WriteOnly | QFile::Text)) {
            qDebug("open failed! file name is:%s",
                                    strFileName.toLocal8Bit().data());
      }
      else {
            QMapIterator<QString, QByteArray> iteMd5(md5_a);
            while (iteMd5.hasNext()) {
                iteMd5.next();
                file.write(iteMd5.key().toLocal8Bit());
                file.write(iteMd5.value());
            }
      }
      file.close();
}
// ......
}
代码清单13-04-03实现的功能是将md5_a中的文件名、md5码保存到文件:    "$TRAINDEVHOME/test/chapter13/ks13_03/md5_a.txt"       请注意第20行的语法,定义了一个QMap的迭代器iteMd5,构造该迭代器时传入了map对象md5_a。然后,我们看一下使用QtCocurrent的实现方案       代码清单13-04-04
main.cpp// ......
#include <qtconcurrentmap.h>
// ......
// map函数
MD5Map getMd5OfFile(const QString& strFileName)
{
    qDebug() << "get md5 in thread" << QThread::currentThread();
    MD5Map md5map;
    md5map = ns_train::getMd5(strFileName);
    return md5map;
};
// reduce函数
void reduce(MD5Map &result, const MD5Map &w)
{
    QMapIterator<QString, QByteArray> i(w);
    while (i.hasNext()) {
      i.next();
      result += i.value();
    }
}
int main(int argc, char * argv[])
{
// ......
    int mapReduceTime = 0;
    MD5Map md5_b;
    {
      QTime time;
      time.start();
      md5_b = QtConcurrent::mappedReduced(files, getMd5OfFile, reduce);
      mapReduceTime = time.elapsed();
      qDebug() << "MapReduce" << mapReduceTime;
    }
}
// ......
}
代码清单13-04-04中:第2行,包含“qtconcurrentmap.h”,因为本示例用到的接口在该文件中定义;    第5~11行,定义了Map函数,函数名为:getMd5OfFile,函数提供一个const QString&类型的参数,返回值为QByteArray类型。该函数的实现与上节类似,通过调用ns_train::getMd5()计算指定文件的md5码,返回值类型为MD5Map。    第13~20行,是reduce函数的定义。该函数提供两个参数:      参数1(输入、输出参数),reduce函数的计算结果(返回值);      参数2(输入参数), map函数的计算结果w。    该函数中定义了一个QMapIterator<QString, QByteArray> 类型的迭代器i,该迭代器使用w进行构造。在reduce函数中,通过对map函数的计算结果进行遍历,把map的计算结果汇总到输出result对象中。    最后,看一下main()函数:    第27~28行,目的仍然是统计运行时间;    第29行,该行代码是本节课的核心,通过调用QtConcurrent::mappedReduced()实现了MapReduce。files作为输入序列(代码清单13-04-01的参数1),getMd5OfFile是map函数名,reduce是Reduce函数名;    第30~31行,统计MapReduce用时并输出;
代码清单13-04-05
main.cpp// 将结果b写入文件
int main(int argc, char * argv[])
{
    // ......
   qDebug() << "Map speedup x"
            << ((double)singleThreadTime - (double)mapTime) / (double)mapTime + 1;
    // 将结果b写入文件
    {
      QString strFileName;
      strFileName = ns_train::getPath("$TRAINDEVHOME/test/chapter13/ks13_03/md5_b.txt");
      QString strDir = ns_train::getDirectory(strFileName);
      QDir dir;
      dir.mkpath(strDir);

      QFile file(strFileName);
      // 打开方式:只读、文本方式
      if (!file.open(QFile::Truncate | QFile::WriteOnly | QFile::Text)) {
            qDebug("open failed! file name is:%s",
                  strFileName.toLocal8Bit().data());
      }
      else {
            QList<QByteArray>::iterator iteMd5 = md5_b.begin();
            for (; iteMd5 != md5_b.end(); iteMd5++) {
                file.write(*iteMd5);
            }
      }
      file.close();
}
}
在代码清单13-04-05中,跟方案1一样,将方案2结果b保存到文件中。在第5~7行将两个方案的用时进行比较并计算出比例值并输出,以便比较这两个方案哪个用时更少。读者可以自行运行一下本示例附件源码,看看效果,如果不明显可以试试多准备些文件。
结语------------------------------------------------------------------------------------       本文介绍了使用QtConcurrent:: mappedReduced处理并发操作并将结果汇总的MapReduce模式的开发方法,关键点在于编写map、reduce函数。
页: [1]
查看完整版本: KS13-04 使用QtConcurrent处理并发-MapReduce