1.介绍 是海量数据的计算问题(压缩数据去计算)
1.序列化2.框架原理 1. 输入数据InputFormat2. Shuffle3.输出数据 OutputFormat4. Join5.ETL(数据清洗)3.压缩4.优点1.易于编程 我们不用考虑多线程直接去实现2.良好拓展性 可动态增加服务器3.高容错性,如果挂一台服务器,可转移任务给其他机器计算4.适合海量数据(TB/PB)5.缺点1.不擅长实时计算 mysql毫秒级 实时返回2.不擅长流式计算 只能是一个静态文件直接计算 sparkstreaming flink擅长3.不擅长DAG有向无环图计算,spark擅长, 任务相互依赖没有环状
2.mapReduce核心思想
wordcount统计单词a-p一个文件,q-z一个文件如图hd7Map阶段(并行处理,按空格分行内单词,kv对(有这个单词v为1))使用MapTask先进行分区, Reduce阶段统计的单词的value相加,都为1,相加起来得到个数
3.MapReduce进程(2,3查看时是YarnChild)
1.MrAppMaster过程的调度和状态2.MapTask Map阶段数据处理3.ReduceTask Reduce数据处理
4.查看workcount代码
cd /opt/module/hadoop3.3/share/hadoop/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar//下载到本地yum install -y lrzszsz //资料的反编译工具jd-gui.exe//拖拽过去 WordCount.java三个类继承//就是输入数据类型,输出数据类型 IntWritable是Int类型Mapper
5.Mapper阶段代码规范
1.自定义Mapper继承自己的父类2.Mapper输入输出数据是kv对形式3.Mapper业务逻辑写在map方法中4.map方法对每个kv调用一次,全部多少个调用多少次 如k为0,v为hello 调用,0代表第0行
6.Reducer阶段
同上,d但是
1. 继承Reducer这个父类2.业务逻辑在reduce方法
如图hd8,处理流程
7.Driver阶段
相当于Yarn客户端,MapReduce运行job对象提交到yarn集群
8.WordCount案例过程
图hd8
//导入依赖
org.apache.hadoop hadoop-client 3.3.0 junit junit 4.12 org.slf4j slf4j-log4j12 1.7.30
//写3个类 WordCountDriver WordCountMapper WordCountReducer
WordCountMapper继承Mapper(mapreduce的包) 看输入和输出数据 Text是hadoopio包
LongWritable是第几行 Mapper
重写map方法(打map+enter自动写),处理数据
//下面是xxMapper.java
String line=value.toString();String[] words=line.split(" ");//切割数据//输出数据for(String word:words){private Text outK=new Text();//写到类内部最外层提高效率private IntWritable outV=new IntWritable(1);//同上outK.set(word); context.write(outK,outV);}
>
> //下面是xxMapper.java//class WordCountReducer,extends Reducer(mapreduce)Reducerreduce回车//Iterable是一个List集合父父类int sum=0;for(IntWritable value: values){sum+=value.get(); } IntWritable intWritable = new IntWritable();intWritable.set(sum);context.write(key,intWritable);
//WordCountDriver.java固定的套路,建main方法
这里是引用
//1.获取jobnew Configuration();Configuration conf = new Configuration();Job job = Job.getInstance(conf);//mapreduce的//2.设置jar包路径job.setJarByClass(WordCountDriver.class);//3.关联mapper和reducerjob.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);//4.设置map输出的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);//5.设置最终输出和kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//6.设置输入路径和输出路径FileInputFormat.setInputPaths(job,new Path("D:\\input\\inputword"));FileOutputFormat.setOutputPaths(job,new Path("D:\\hadoop\\output1"));//7.提交job,true表示尽可能打印信息boolean result=job.waitForCompletion(true);System.exit(result?0:1); //生成的文件part-r-0000 r代表压缩了 第一块分块数据
9.我们的WordsCount程序是用windows安装的hadoop运行的,我们需要把他打包到linux hadoop集群上运行
1.加上maven插件 jar只有6kb,org.apache.maven.plugins maven-compiler-plugin 7 7 2.启动集群myhadoop.sh start3.java怎么写路径? 变成main方法传入的参数FileInputFormat.setInputPaths(job,new Path(args[0]));FileInputFormat.setInputPaths(job,new Path(args[1]));4.jar上传到hadoop3.3文件夹5.运行,(文件夹不要存在,存在就删除) 复制driver的reference 全限定名称hadoop jar hadoopcount.jar com.example.hadoopcount.WordCountDriver /input /output//!!!在企业中先用win环境写代码,然后放linux测试,通常有生成多个jar文件,后面有工具进行自动化方便管理处理过程总结:先在mapper数据每一行取出来调用map方法一次, 全部处理完成然后到reducer 的list序列, 把相同的数据合并个数 为sum,然后写入
10.hadoop序列化(类比java序列化) 反序列化就是把传输过来的信息解密
java序列化为了传输(包括了传输基本信息和 校验信息/头/继承体系) hadoop序列化 (基本信息和简单校验)
因为hadoop在内部传输
好处: 1.存储空间少紧凑 传输速度快快速. 互操作性:跨语言可以反序列化
11.hadoop实现序列化在 Writable类查看 write序列化 readFields反序列化
自定义序列化:(企业中不满足) !!!序列化反序列化顺序一致 如下
12.案例, 统计每一个手机号耗费的总上行流量,下行流量,总流量
1. key为手机号,value为封装的对象//定义一个class FlowBean implements Writable1.实现接口2.重新序列化和反序列化方法 3.重写无参数构造调用super() ,写 long 变量 upFlow上行 downFlow 下行 sumFlow 总流量生成get和set方法,setSum方法为上行+下行//顺序要相同
write( out){out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);}readFields(){this.upFlow=in.readLong();this.downFlow=in.readLong();this.sumFlow=in.readLong();}//可以定义你想输出的格式toString(){return upFlow+"\t"+downFlow+"\t"+sumFlow;}2.FlowMapperpublic class FlowMapper extends Mapper {private Text outK = new Text();private FlowBean outV = new FlowBean();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line=value.toString();String[] split = line.split("\t");String phone=split[1];String upFlow=split[split.length-3];//重后面往前取,以免前面数据是空数据String downFlow=split[split.length-2];outK.set(phone);outV.setUpFlow(Long.parseLong(upFlow));outV.setDownFlow(Long.parseLong(downFlow));outV.setSumFlow();context.write(outK,outV);}
}3.FlowReducerpublic class FlowReducer extends Reducer {private FlowBean outV = new FlowBean();@Overrideprotected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {long totalUp = 0;long totalDown = 0;for (FlowBean value : values) {totalUp+= value.getUpFlow();totalDown+= value.getDownFlow();}outV.setUpFlow(totalUp);outV.setDownFlow(totalDown);outV.setSumFlow();context.write(key,outV);}
}4.FlowDriver
public class FlowDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//1 获取job对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);//2 关联本Driver类job.setJarByClass(FlowDriver.class);//3 关联Mapper和Reducerjob.setMapperClass(FlowMapper.class);job.setReducerClass(FlowReducer.class);//4 设置Map端输出KV类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);//5 设置程序最终输出的KV类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//6 设置程序的输入输出路径FileInputFormat.setInputPaths(job, new Path("E:\\input1"));FileOutputFormat.setOutputPath(job, new Path("E:\\flowoutput"));//7 提交Jobboolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}
}//!!!总结 mapper负责格式处理(一行一行处理) , reducer负责数据计算(一个一个相同字段进行处理) driver负责调用
13.MapReduce框架原理
如图 hd9
shuffle可进行排序,压缩..inputFormat 进行输入outputFormat进行输出 可以输出到mysql es hbase....MapTask不是越多越好,如果1kb启动MapTask的时间比处理的时间长,因为还有提供网络传输本地maptask自己处理最快,当块大小适合时1.一个jobmap阶段并行度由客户端在提交job时的切片数决定(并行度,就是处理效率)2.每个split切片分配一个maptask并行实例处理(多个maptask一起处理切片)3.默认情况 切片大小等于块大小(默认为一块,不切开)4.切片时不考虑数据集整体,逐一对每个文件进行单独切片(不同文件,独立进行切片)
14.切片源码
//length是txt文件里面的内容大小//不是所有文件支持切片,不能切片需要压缩
//如果切片出来有一点点空间,那么纳为一个整体,默认1.1倍 32m 32.1m也切片
//调整块大小mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue
15.企业常用的TextInputFormat CombineTextInputFormat
TextInputFormat 是按一行处理
NLineInputFormat 按多行处理
CombineTextInputFormat 是处理多个超小文件处理(TextInputFormat 无论多小都切片,小文件效率低),规定全部小文件加起来小于多少才可以统一切片CombineTextInputFormat.setMaxInputSplitSize(job,41000000);///4m例如 先排序块大小,然后 我文件是5.1m 会判断 <4m为一片 , >4m && <4*2m切割2片文件2是1.7m ,为一片 和2.55m排序后的结果 合并为一片
16.CombineTextInputFormat实操 创建4个小文件
//普通 TextInputFormat的日志 number of splits : 4//在之前wordcount案例的driver修改job.setInputFormatClass(CombineTextInputFormat.class);CombineTextInputFormat.setMaxInputSplitSize(job,41000000);///4m
17.shuffle map方法之后 reduce方法之前的处理过程
//数据分区合并压缩
18.partition分区 如果要求把不同省份的数据 分类到不同的文件中
在wordcount案例中写
job.setNumReduceTask(2);//文件内容会分到2个文件
//自定义Partitioner类,原来的是根据hashcode取and来决定哪个分区在哪个文件
1. CumstomPartitioner extends Partition//重写方法getPartition(Text key,FlowBean value,int numPartitions){//控制分区的代码逻辑,由你决定String phone= text.toString();//取到前面三位String prePhone=phone.substring(0,3);int partition;if("136".equals(prePhone)){partition=0;}else 137 1 138 2 139 3 其他4 }2.在driver的job 自定义Partitionerjob.setPartitionerClass(CumstomPartitioner.class);3.设置reduceTaskjob.setNumReduceTask(5);//
19.分区总结 注意partition从0开始
1.reduceTask=1 只会产生1个文件2.reduceTask>getPartition数量 产生多几个空文件3.reduceTask
20.mapreduce为什么要排序? 为了提供系统性能,相同的key如果每次需要去对比是否相同,需要消耗大量的数据,排序后放在一起就可以轻松比较
reducetask 统一对内存和磁盘所有数据进行一次归并排序
1.部分(分区)排序(企业) 部分字段进行排序FlowBean 改为 implements WritableComparable实现compareTo(FlowBean o)方法if(this.sumFlow>o.sumFlow){return -1;//代表倒序}else if(this.sumFlowreturn 1;}else{return 0;}private FlowBean outK=new FlowBean();private Text outV=new Text();mapper类String line =value.toString();String[] split =line.split("\t");outV.set(split[0]);outK.setUpFlow(Long.parseLong(split[1]));outK.setDownFlow(Long.parseLong(split[2]));outK.setSumFlow();context.write(outK,outV);//reducer Reducer reduce方法 values是指得到重复的值protected void reduce(FlowBean key, Iterable values, Context context)for(Text value : values){context.write(value,key);}//driver job的key和value调换,使用上一个案例的输出数据//4 设置Map端输出KV类型(处理过程中的key和value)job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);//5 设置程序最终输出的KV类型,最终输出的key和valuejob.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);
2.全排序 输出结果只有一个文件,只有一个分区设置一个ReduceTask 3.辅助排序在多写一个reduce类,来排序(groupComparator)4.二次排序 compareTo 判断条件为两个(满足2个条件)