3.3.4 WritableComparable排序3.3.5 WritableComparable排序案例实操(全排序)3.3.6 WritableComparable排序案例实操(区内排序)3.3.7 Combiner合并3.3.8 Combiner合并案例实操3.3.9 GroupingComparator分组(辅助排序/分组排序)3.3.10 GroupingComparator分组案例实操3.4 MapTask工作机制3.5 ReduceTask工作机制3.6 OutputFormat数据输出3.6.1 OutputFormat接口实现类3.6.2 自定义OutputFormat3.6.3 自定义OutputFormat案例实操3.7 Join多种应用3.7.1 Reduce Join3.7.2 Reduce Join案例实操3.7.3 Map Join3.7.4 Map Join案例实操3.8 计数器应用3.9 数据清洗(ETL)3.9.1 数据清洗案例实操-简单解析版3.9.2 数据清洗案例实操-复杂解析版3.10 MapReduce开发总结第4章 Hadoop数据压缩4.1 概述4.2 MR支持的压缩编码4.3 压缩方式选择4.3.1 Gzip压缩4.3.2 Bzip2压缩4.3.3 Lzo压缩4.3.4 Snappy压缩4.4 压缩位置选择4.5 压缩参数配置4.6 压缩实操案例4.6.1 数据流的压缩和解压缩4.6.2 Map输出端采用压缩4.6.3 Reduce输出端采用压缩第5章 Yarn资源调度器5.1 Yarn基本架构5.3 Yarn工作机制5.4 作业提交全过程5.5 资源调度器5.6 任务的推测执行(秘籍)
0、排序概述
1、排序的分类
2、自定义排序WritableComparable(1)原理分析bean对象做为key传输,需要实现WritableComparable接口重写compareTo()方法,就可以实现排序。@Override public int compareTo(FlowBean o) { int result; // 按照总流量大小,倒序排列 if (sumFlow > bean.getSumFlow()) { result = -1; } else if (sumFlow < bean.getSumFlow()) { result = 1; } else { result = 0; } return result; }
1、需求根据
案例2.3产生的结果再次对总流量进行排序。(1)输入数据
(2)期望输出数据
13509468723 7335 110349 117684 13736230513 2481 24681 27162 13956435636 132 1512 1644 13846544121 264 0 264 ......
2、需求分析
3、代码实现(1)FlowBean对象在在需求1基础上增加了比较功能package com.atguigu.mr.sort; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class FlowBean implements WritableComparable
(2)编写Mapper类
package com.atguigu.mr.sort; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class FlowCountSortMapper extends Mapper
(3)编写Reducer类
package com.atguigu.mr.sort; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class FlowCountSortReducer extends Reducer
(4)编写Driver类
package com.atguigu.mr.sort; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FlowCountSortDriver { public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException { // 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[] { "d:/temp/atguigu/0529/output2", "d:/temp/atguigu/0529/output8" }; // 1、获取配置信息,或者job对象实例 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 2、指定本程序的jar包所在的本地路径 job.setJarByClass(FlowCountSortDriver.class); // 3、指定本业务job要使用的mapper/Reducer业务类 job.setMapperClass(FlowCountSortMapper.class); job.setReducerClass(FlowCountSortReducer.class); // 4、指定mapper输出数据的kv类型 job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(Text.class); // 5、指定最终输出的数据的kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // 6、指定job的输入原始文件所在目录 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7、将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
1、需求要求每个省份手机号输出的文件中按照总流量内部排序。2、需求分析基于前一个需求,增加自定义分区类,分区按照省份手机号设置。3、案例实操(1)增加自定义分区类package com.atguigu.mr.sort; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class ProvincePartitioner extends Partitioner
(2)在驱动类中添加加载分区类
// 加载自定义分区类(即关联分区) job.setPartitionerClass(ProvincePartitioner.class); // 设置Reducetask个数 job.setNumReduceTasks(5);
Combiner合并是Hadoop框架优化的一种手段,因为Combiner合并减少了数据的IO传输。(6)自定义Combiner实现步骤(a)自定义一个Combiner继承Reducer,重写reduce()方法package com.atguigu.mr.wordcount; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WordcountCombiner extends Reducer
(b)在Job驱动类中设置:
job.setCombinerClass(WordcountCombiner.class);
1、需求统计过程中对每一个MapTask的输出进行局部汇总,以减小网络传输量即采用Combiner功能。(1)数据输入banzhang ni hao xihuan hadoop banzhang banzhang ni hao xihuan hadoop banzhang
(2)期望输出数据期望:Combine输入数据多,输出时经过合并,输出数据降低。2、需求分析
3、案例实操-方案一1)增加一个WordcountCombiner类继承Reducer
package com.atguigu.mr.wordcount; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WordcountCombiner extends Reducer
2)在WordcountDriver驱动类中指定Combiner
// 指定需要使用Combiner,以及用哪个类作为Combiner的逻辑 job.setCombinerClass(WordcountCombiner.class);
4、案例实操-方案二1)将WordcountReducer作为Combiner在WordcountDriver驱动类中指定
// 指定需要使用Combiner,以及用哪个类作为Combiner的逻辑 job.setCombinerClass(WordcountReducer.class);
运行程序,如下图所示:
对Reduce阶段的数据根据某一个或几个字段进行分组。分组排序步骤:(1)自定义类继承WritableComparator(2)重写compare()方法@Override public int compare(WritableComparable a, WritableComparable b) { // 比较的业务逻辑 // ...... return result; }
(3)创建一个构造将比较对象的类传给父类
protected OrderGroupingComparator() { super(OrderBean.class, true); }
1、需求有如下订单数据现在需要求出每一个订单中最贵的商品。(1)输入数据GroupingComparator.txt0000001 Pdt01 222.8 0000002 Pdt05 722.4 0000001 Pdt02 33.8 0000003 Pdt06 232.8 0000003 Pdt02 33.8 0000002 Pdt03 522.8 0000002 Pdt_04 122.4
(2)期望输出数据
1 222.8 2 722.4 3 232.8
2、需求分析(1)利用“订单id和成交金额”作为key,可以将Map阶段读取到的所有订单数据按照id升序排序,如果id相同再按照金额降序排序,发送到Reduce。(2)在Reduce端利用GroupingComparator将订单id相同的kv聚合成组,然后取第一个即是该订单中最贵商品,如下图所示。
3、代码实现(1)定义订单信息OrderBean类
package com.atguigu.mr.order; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class OrderBean implements WritableComparable
(2)编写OrderSortMapper类
package com.atguigu.mr.order; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; // 0000001 Pdt01 222.8 public class OrderSortMapper extends Mapper
(3)编写OrderSortGroupingComparator类
package com.atguigu.mr.order; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class OrderSortGroupingComparator extends WritableComparator { /// / 创建一个构造将比较对象的类传给父类 // protected OrderSortGroupingComparator() { super(OrderBean.class, true); } @SuppressWarnings("rawtypes") @Override public int compare(WritableComparable a, WritableComparable b) { // 比较的业务逻辑 // 要求按照只要是id相同,就认为是相同的key OrderBean aBean = (OrderBean) a; OrderBean bBean = (OrderBean) b; int result; if (aBean.getOrderid() > bBean.getOrderid()) { result = 1; } else if (aBean.getOrderid() < bBean.getOrderid()) { result = -1; } else { result = 0; } return result; } }
(4)编写OrderSortReducer类
package com.atguigu.mr.order; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; public class OrderSortReducer extends Reducer
(5)编写OrderSortDriver类
package com.atguigu.mr.order; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class OrderSortDriver { public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException { // 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[] { "d:/temp/atguigu/0529/input/inputorder", "d:/temp/atguigu/0529/output11" }; // 1、获取配置信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2、设置jar包加载路径 job.setJarByClass(OrderSortDriver.class); // 3、加载map/reduce类 job.setMapperClass(OrderSortMapper.class); job.setReducerClass(OrderSortReducer.class); // 4、设置map输出数据key和value类型 job.setMapOutputKeyClass(OrderBean.class); job.setMapOutputValueClass(NullWritable.class); // 5、设置最终输出数据的key和value类型 job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(NullWritable.class); // 6、设置输入数据和输出数据路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 8、设置reduce端的分组 job.setGroupingComparatorClass(OrderSortGroupingComparator.class); // 7、提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
MapTask工作机制如下图所示。
Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。 溢写阶段详情:
快速排序算法
对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序
。这样,经过排序后,数据以分区为单位聚集在一起
,且同一分区内所有数据按照key有序
。
output/spillN.out
(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集
操作。
output/spillN.out.index
中。
output/file.out中,同时生成相应的索引文件
output/file.out.index。在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用
多轮递归合并的方式。每轮合并
io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的
随机读取带来的开销。
1、ReduceTask工作机制ReduceTask工作机制,如下图所示。
拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则溢写到磁盘上,否则直接放到内存中。
后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
将key相同的数据聚在一起,Hadoop采用了
基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了
局部排序,因此,ReduceTask只需对所有数据进行一次
归并排序即可。
2、设置ReduceTask并行度(个数)ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与
MapT