1、MapReduce进阶,Cloud Computing,主要内容(6学时),MPI(Message Passing Interface) 等并行编程方法缺少对高层并行编程模型和统一计算框架的支持,需要程序员处理许多底层细节, 为此MapReduce在三个层面上做了系统而巧妙的设计构思。在大数据处理的基本方法上,对相互计算依赖不大的数据采取“分而治之”的处理策略。借鉴了Lisp语言中的思想,用Map和Reduce两个函数提供了高层的并行编程抽象模型和接口。对于诸多的底层实现和处理细节MapReduce提供了一个统一的计算框架,大大减轻了程序员在编程是的负担。,简介,把小的键值对合并成大的键值对M
2、ap计算过程中所产生的中间结果键值对需要通过网络传输给Reduce节点,大规模的键值对可能会大幅增大网络通信开销,并且降低程序执行速度,为此开采用一个基本的优化方法,即把大量小的键值对合并为较大的键值对。例如在单词同现矩阵计算中,单词a可能会与多个其他单词共同出现,因而一个Map可能会产生很多个单词a与其他单词的键值对,如下:,复合键值对的使用, 1 3 a b:1,c:3, d:5, e:8, f:4 4 8 4,复合键值对的使用,巧用复合键让系统完成排序,Map计算过程中,系统自动按照Map的输出键进行排序,因此进入Reduce的键值对都是按照key值排序的,但有时希望value也按一定规
3、则排序。方法1:在Reduce过程中对value列表中的值进行 本地排序,但当value列表数据量巨大时 必须使用复杂的外排算法,会很耗时。方法2:将value中需要排序的部分加入到key中, 形成复合键,这样能利用MapReduce系统 的排序功能自动完成排序。,Hadoop内置的数据类型 BooleanWritable:标准布尔型数值 ByteWritable:单字节数值 DoubleWritable:双字节数 FloatWritable:浮点数 IntWritable:整型数 LongWritable:长整型数 Text:使用UTF8格式存储的文本 NullWritable:当中的key
4、或value为空时使用,用户定制数据类型,自定义数据类型的实现首先实现Writable接口,以便该数据能被序列化后完成网络传输或文件输入/输出;其次,如果该数据需要作为key使用,或者要比较数值大小时,则需要实现 WritableComparable接口。例如将一个三维坐标P(x,y,z)定制为一个数据类型 pubic class Point3D implements Writable private float x,y,z; public void readFields(DataInput in) throws IOException public void write(DataOutput
5、 out) throws IOException ,用户定制数据类型,如果Point3D还需要作为主键值使用,或者需要比较大小时,还应该实现WritableComparable接口 pubic class Point3D implements WritableComparable private float x,y,z; public void readFields(DataInput in) throws IOException public void write(DataOutput out) throws IOException public int compareTo(Point3D
6、p) /具体实现比较当前的this(x,y,z)与p(x,y,z)的位置/并输出-1,0,1 ,用户定制数据类型,Hadoop内置数据输入格式和RecordReaderTextInputFormat:是系统默认的数据输入格式,可以文本文件分块逐行读入,读入一行时,所产生的key为当前行在整个文件中的字节偏移位置,而value就是行内容。KeyValueInputFormat:是另一个常用的数据输入格式,可将一个安照格式逐行存放的文件逐行读出,并自动解析成相应的key和value。,用户定制输入/输出格式,RecordReader:对于一个数据输入格式,都需要有一个对应的RecordReader
7、。 RecordReader主要用于将一个文件中的数据记录分拆成具体的键值对,传给Map函数。例如: TextInputFormat的默认RecordReader为Line RecordReader, KeyValueInputFormat的默认RecordReader为KeyValueLine RecordReader。除此之外,系统还提供很多输入格式,例如: AutoInputFormat, CombineFileInputFormat等,用户定制输入/输出格式,假设需要定制 FileNameLocInputFormat 与 FileNameLocRecordReader, 以便产生Fil
8、eNameLineOffset主键值,则可定制如下代码pubic class FileNameLocInputFormat extends FileInputFormatOverride public RecordReader createRecordReader(InputSplit split,TaskAttemptContext context) FileNameLocRecordReader fnrr = new FileNameLocRecordReader(); fnrr.initialize(split, context); ,用户定制数据输入格式与RecordReader,pu
9、bic class FileNameLocRecordReader extends RecordReaderString FileName;LineRecordReader lrr = new LineRecordReader(); Overridepublic Text getCurrnetKey() return new Text(“+FileName+”+lrr.getCurrentKey()+”);Overridepublic void initialize(InputSplit arg0, TaskAttemptContext arg1) .,用户定制数据输入格式与RecordRea
10、der,Hadoop内置的数据输出格式与RecordWriterHadoop提供了丰富的内置数据输出格式。最常用的是TextOutputFormat,也是系统默认的输出格式,可以将计算结果以key + t +value的形式逐行输入到文件中。数据输出格式也提供一个对应的RecordWriter,以便系统明确输出结果写到文件的具体格式。 TextOutputFormat的默认RecordWriter是LineRecordWriter。,用户定制输入/输出格式,默认情况下,MapReduce将产生包含一至多个文件的单个输出数据文件集合。但有时候需要输出多个文件集合。比如,在处理专利数据是,希望根据
11、不同国家,将每个国家的专利数据记录输出到不同国家的文件目录中。Hadoop提供了MultipleOutputFormat类帮助完成这一处理功能。在Reduce进行数据输出前,需要定制MultioleOutputFormat的一个子类,实现其中的一个重要方法 protected String generateFileNameForKeyValue(K key, V value, String name) 通过该放过,程序可以根据输入的主键产生并返回一个所期望的输出数据文 件名和路径,通过定制数据输出格式实现多集合文件输出,Hadoop MapReduce 提供了默认的Partition来完成Ma
12、p节点数据的中间结果向Reduce节点的分区处理。大多数情况下,应用程序仅使用默认的HashPartitiner即可满足计算要求,但有时候,数据可能不会被均匀的分配到每个Reduce上,或者不符合应用程序的分区要求,这时,就需要定制自己的Partitioner。,用户定制Partitioner,定制一个Partitioner大致方法如下,继承HashPartitioner,并重载getPartition()方法 class NewPartitioner extends HashPartitioner /override the methodgetPartition(K key, V value
13、, int numReduceTasks). ,用户定制Partitioner,用户也可以根据需要定制自己的Combiner,以减少Map阶段输出中间结果的数量,降低数据的网络传输开销。 class NewCombiner extends Reducer public void reduce(Text key, Iterable values, Context context) throw IOException, InterruptedException ,用户定制Combiner,一些复杂任务难以用一趟MapReduce处理完成,需要将其拆分为多趟简单的MapReduce子任务进行处理。本
14、节将介绍多种不同形式的组合MapReduce任务,包括迭代方法, 顺序组合方法, 具有依赖关系的组合方法,以及链式方法,组合式MapReduce计算作业,当使用MapReduce进行这样的问题求解时,运行一趟MapReduce过程将无法完成整个求解结果,因此,需要采用迭代法方循环运行该MapReduce过程,直到达到一个逼近结果。例如页面排序算法PageRank就是这一类需要用循环迭代MapReduce计算进行求解的问题。,迭代MapReduce计算任务,多个MapReduce子任务可以注意手工执行,但更方便的做法是将这些子任务串起来,前面的输出作为后面的输入。例如 mapreduce1 ma
15、preduce2 mapreduce3,顺序组合式MapReduce计算任务,例如,一个MapReduce作业有x, y, z三个子任务,它们的依赖关系如下:,具有复杂依赖关系的组合式MapReduce作业,Job,Jobz,Joby,Jobx,则他们的配置程序如下 /配置jobx Configuration jobxconf = . /设置JobControl Job jobx = JobCOntrol jc = new JobControl(); /配置joby jc.addJob(jobx); Configuration jobyconf = . jc.addJob(joby); Job
16、 joby = jc.addJob(jobz); /配置jobz jc.run(); Configuration jobzconf = . Job jobz = /设置依赖关系 jobz.addDependingJob(jobx); jobz.addDependingJob(joby);,具有复杂依赖关系的组合式MapReduce作业,一个MapReduce作业可能会有一些前处理和后处理步骤,比如文档倒排索引处理前需要去除“停用词”,若将其设置为一个单独的MapReduce作业,则可能会影响核心作业的效率。为此,一个较好的方法是在核心的Map和Reduce过程之外,把这些前后处理步骤实现为一些
17、辅助的Map过程,将这些辅助过程与核心Map和Reduce过程合并为一个链式MapReduce任务。,MapReduce前处理和后处理步骤的链式执行,Hadoop为此专门提供了链式Mapper(ChainMapper) 和链式Reducer(ChainReducer)来完成这种处理。ChainMapper允许在单一Map任务中添加和使用多个Map子任务;而ChainReducer允许在一个单一Reduce任务执行了Reduce处理后,继续使用多个Map子任务完成后续处理。,MapReduce前处理和后处理步骤的链式执行,一个MapReduce任务可能需要访问多个数据集,在关系数据库中,这将是两
18、个或多个表的连接(join)。Hadoop 系统没有关系数据库那样强大的连接处理功能,大多数时候需要程序员自己实现,本节介绍基于DataJoin类库实现Reduce端连接的方法,用全局文件复制实现Map端连接的方法,带Map端过滤的Reduce端连接方法,以及MapRedude数据连接方法的限制,多数据源的连接,用DataJoin类库完成数据源连接的基本处理方法如下:首先需要为不同数据源下的每个数据记录定义一个数据源标签(Tag)。进一步,为了能准确的标识一个数据源下的每个数据记录并完成连接处理,需要为每个带连接的数据记录确定一个连接主键(GroupKey)最后, DataJoin类库分别在M
19、ap和Reduce阶段提供一个处理框架,并尽可能的帮助程序员完成一些处理工作,剩余部分必须自己完成。,用DataJoin类库实现Reduce端连接,根据GroupKey进行分区,Map处理过程,数据源Customers1,王二,025-1111-11112,张三,021-2222-22223,李四,025-3333-33334,孙五,010-4444-4444,数据源Orders3,订单1,90, 2011.8.1 1,订单2,130,2011.8.6 2,订单3,220,2011.8.10 3,订单4,160,2011.8.18,Map,Map,Customers1,王二,025-1111-
20、1111,Customers2,张三,021-2222-2222,Customers3,李四,025-3333-3333,Customers4,孙五,010-4444-4444,Orders3,订单1,90, 2011.8.1,Orders 1,订单2,130,2011.8.6,Orders2,订单3,220,2011.8.10,Orders3,订单4,160,2011.8.18,1,2,3,4,3,1,2,3,Reduce节点接收到这些带标签的数据记录后,Reduce过程将对不同数据源标签下具有相同GroupKey的记录进行笛卡尔叉积,自动生成所有不同的叉积组合。然后对每一个叉积组合,由程序
21、员实现一个combine()方法,将这些具有相同GroupKey的记录进行适当的处理,以完成连接。过程如图:,Reduce处理过程,Reduce过程,Customers3,李四,025-3333-3333,Orders3,订单1,90, 2011.8.1,Orders3,订单4,160,2011.8.18,3,Reduce,Customers3,李四,025-3333-3333,Orders3,订单1,90, 2011.8.1,Customers3,李四,025-3333-3333,Orders3,订单4,160,2011.8.18,Combine(),Combine(),3,李四,025-3
22、333-3333,订单1,90,2011.8.1,3,李四,025-3333-3333,订单4,160,2011.8.18,前述用DataJoin类实现的Reduce端连接方法中,连接操作直到Reduce端才能进行,因为很多无效的连接组合数据在Reduce阶段才能去除,所以大量的网络带宽被用来传输不用的数据,因此,这种方法效率不高。当数据源的数据量较小时,能够放在单节点的内存时,可以使用称为“复制连接”的全局文件复制方法,把较小的数据源复制到每个Map节点上,然后在Map阶段完成连接操作,用全局文件复制方法实现Map端连接,Hadoop提供了一个Distributed Cache机制用于将一个
23、或多个文件分布复制到所有节点上。要利用此机制,需要涉及到以下两部分的设置(1) Job类中 public void addCacheFile(URI uri): 将一个文件存放到Distributed Cache文件中(2) Mapper 或Reducer的context类中 public Path getLocalCacheFiles(): 获取设置在Distributed Cache中的文件路径,用全局文件复制方法实现Map端连接,当较小的数据源文件也无法放入内存时,可采用以下办法:(1) 可以首先过滤较小数据源文件中的记录,只保 留需要进行连接的记录(2) 可以将较小数据源文件分割为能n
24、个小文件,其 中每个小文件都能放入内存处理,然后分别对 这n个小文件用全局文件复制方法进行与较大源 文件的连接,最后把结果合并起来,用全局文件复制方法实现Map端连接,如果过滤后数据仍然无法放在内存中处理,可采用带Map端过滤的Reduce端连接处理。具体过程为在Map端先生成一个仅包含连接主键的过滤文件,由于这个文件的数据量大大降低,则可将这个文件存放在Distributed Cache文件中,然后在Map端过滤掉主键不在这个列表中的所有记录,然后再实现正常的Reduce端连接,带Map端过滤的Reduce端连接,为了能让用户灵活设置某些作业参数,一个MapReduce计算任务可能需要在执行
25、时从命令行输入这些作业参数,并将这个参数传递给各个节点Configuration类为此专门提供了用于保存和获取属性的方法例如: public void set(String name, String value);/设置字符串属性 public void get(String name, String defaultValue) /读取字符串属性 /将一个参数设置为name属性 jobconf.set(“name”, args0); /然后可在Mapper或Reducer类的初始化方法setup中从Configuration对象读出该属性值 jobconf.get(“name”, “”);,全
26、局参数的传递,这里同样也用到了Distributed Cache机制。要利用此机制,需要涉及到以下两部分的设置(1) Job类中 public void addCacheFile(URI uri): 将一个文件存放到Distributed Cache文件中(2) Mapper 或Reducer的context类中 public Path getLocalCacheFiles(): 获取设置在Distributed Cache中的文件路径,全局数据文件的传递,Hadoop提供了相应的从关系数据库查询和读取数据的接口。DBInputFormat:提供从数据库读取数据的格式DBRecordReade
27、r:提供读取数据记录的接口Hadoop查询和读取关系数据库的处理效率比较低因此DBInputFormat仅适合读取小量的数据,对于读取大量的数据,可以用数据库中的工具将数据输出为文本文件,并上载到HDFS中处理。,关系数据库的连接与访问,Hadoop提供了相应的向关系数据库直接输出计算结果的编程接口。DBOutputFormat:提供向数据库输出数据的格式DBRecordWriter:提供向数据库写入数据记录的接口DBConfiguration:提供数据库配置和创建连接的接口/连接DB的静态方法Public static void configureDB(Job job, String dri
28、verClass, String dbUrl, String userName, String password)Job 为当前准备执行的作业, driveClass为数据库厂商提供的访问数据库的驱动程序dbUrl 为运行数据库主机的地址,userName和password为访问数据库的用户名与密码,关系数据库的连接与访问,数据库连接完成后,即可完成从MapReduce程序向关系数据库写入数据的操作,DBOutputFormat提供了一个静态方法来指定需要写入的数据表和字段: public static void setOutput(Job job, String tableName, String fieldNames) 其中tableName指定即将写入数据的表名,后续参数指定哪些字段数据将写入该表,关系数据库的连接与访问,为了能完成向数据库中的写入操作,程序员还需要实现DBWritable: public class NewDBWritable implements Writable, DBWritable public void write(DataOutput out) . ,关系数据库的连接与访问,完,