05.hadoopMapReduce
迪丽瓦拉
2025-06-01 06:17:07
0

1.介绍 是海量数据的计算问题(压缩数据去计算)

 1.序列化2.框架原理   1. 输入数据InputFormat2. Shuffle3.输出数据 OutputFormat4. Join5.ETL(数据清洗)3.压缩4.优点1.易于编程 我们不用考虑多线程直接去实现2.良好拓展性  可动态增加服务器3.高容错性,如果挂一台服务器,可转移任务给其他机器计算4.适合海量数据(TB/PB)5.缺点1.不擅长实时计算 mysql毫秒级 实时返回2.不擅长流式计算 只能是一个静态文件直接计算 sparkstreaming flink擅长3.不擅长DAG有向无环图计算,spark擅长, 任务相互依赖没有环状

2.mapReduce核心思想

    wordcount统计单词a-p一个文件,q-z一个文件如图hd7Map阶段(并行处理,按空格分行内单词,kv对(有这个单词v为1))使用MapTask先进行分区, Reduce阶段统计的单词的value相加,都为1,相加起来得到个数

请添加图片描述

3.MapReduce进程(2,3查看时是YarnChild)

 1.MrAppMaster过程的调度和状态2.MapTask  Map阶段数据处理3.ReduceTask Reduce数据处理

4.查看workcount代码

    cd /opt/module/hadoop3.3/share/hadoop/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar//下载到本地yum install -y lrzszsz //资料的反编译工具jd-gui.exe//拖拽过去  WordCount.java三个类继承//就是输入数据类型,输出数据类型 IntWritable是Int类型MapperJava的String类型对应Hadoop类型Text,其他的后面加Writable

5.Mapper阶段代码规范

    1.自定义Mapper继承自己的父类2.Mapper输入输出数据是kv对形式3.Mapper业务逻辑写在map方法中4.map方法对每个kv调用一次,全部多少个调用多少次  如k为0,v为hello 调用,0代表第0行

6.Reducer阶段
同上,d但是

   1. 继承Reducer这个父类2.业务逻辑在reduce方法
如图hd8,处理流程

请添加图片描述

7.Driver阶段

 相当于Yarn客户端,MapReduce运行job对象提交到yarn集群

8.WordCount案例过程
图hd8

//导入依赖

org.apache.hadoophadoop-client3.3.0junitjunit4.12org.slf4jslf4j-log4j121.7.30

//写3个类 WordCountDriver WordCountMapper WordCountReducer
WordCountMapper继承Mapper(mapreduce的包) 看输入和输出数据 Text是hadoopio包
LongWritable是第几行 Mapper
重写map方法(打map+enter自动写),处理数据
//下面是xxMapper.java

   String line=value.toString();String[] words=line.split(" ");//切割数据//输出数据for(String word:words){private Text outK=new Text();//写到类内部最外层提高效率private IntWritable outV=new IntWritable(1);//同上outK.set(word); context.write(outK,outV);}
> 
>   //下面是xxMapper.java//class WordCountReducer,extends Reducer(mapreduce)Reducerreduce回车//Iterable是一个List集合父父类int sum=0;for(IntWritable value: values){sum+=value.get();   }  IntWritable intWritable = new IntWritable();intWritable.set(sum);context.write(key,intWritable);
//WordCountDriver.java固定的套路,建main方法

这里是引用

  //1.获取jobnew Configuration();Configuration conf = new Configuration();Job job = Job.getInstance(conf);//mapreduce的//2.设置jar包路径job.setJarByClass(WordCountDriver.class);//3.关联mapper和reducerjob.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);//4.设置map输出的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);//5.设置最终输出和kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//6.设置输入路径和输出路径FileInputFormat.setInputPaths(job,new Path("D:\\input\\inputword"));FileOutputFormat.setOutputPaths(job,new Path("D:\\hadoop\\output1"));//7.提交job,true表示尽可能打印信息boolean result=job.waitForCompletion(true);System.exit(result?0:1);   //生成的文件part-r-0000 r代表压缩了 第一块分块数据 

9.我们的WordsCount程序是用windows安装的hadoop运行的,我们需要把他打包到linux hadoop集群上运行

   1.加上maven插件 jar只有6kb,org.apache.maven.pluginsmaven-compiler-plugin772.启动集群myhadoop.sh start3.java怎么写路径? 变成main方法传入的参数FileInputFormat.setInputPaths(job,new Path(args[0]));FileInputFormat.setInputPaths(job,new Path(args[1]));4.jar上传到hadoop3.3文件夹5.运行,(文件夹不要存在,存在就删除) 复制driver的reference 全限定名称hadoop jar hadoopcount.jar com.example.hadoopcount.WordCountDriver  /input /output//!!!在企业中先用win环境写代码,然后放linux测试,通常有生成多个jar文件,后面有工具进行自动化方便管理处理过程总结:先在mapper数据每一行取出来调用map方法一次, 全部处理完成然后到reducer 的list序列, 把相同的数据合并个数 为sum,然后写入

10.hadoop序列化(类比java序列化) 反序列化就是把传输过来的信息解密

java序列化为了传输(包括了传输基本信息和 校验信息/头/继承体系) hadoop序列化 (基本信息和简单校验)
因为hadoop在内部传输
好处: 1.存储空间少紧凑 传输速度快快速. 互操作性:跨语言可以反序列化

11.hadoop实现序列化在 Writable类查看 write序列化 readFields反序列化
自定义序列化:(企业中不满足) !!!序列化反序列化顺序一致 如下
12.案例, 统计每一个手机号耗费的总上行流量,下行流量,总流量

   1.        key为手机号,value为封装的对象//定义一个class FlowBean implements Writable1.实现接口2.重新序列化和反序列化方法 3.重写无参数构造调用super()	 ,写 long 变量 upFlow上行 downFlow 下行 sumFlow 总流量生成get和set方法,setSum方法为上行+下行//顺序要相同
      write( out){out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);}readFields(){this.upFlow=in.readLong();this.downFlow=in.readLong();this.sumFlow=in.readLong();}//可以定义你想输出的格式toString(){return upFlow+"\t"+downFlow+"\t"+sumFlow;}2.FlowMapperpublic class FlowMapper extends Mapper {private Text outK = new Text();private FlowBean outV = new FlowBean();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line=value.toString();String[] split = line.split("\t");String phone=split[1];String upFlow=split[split.length-3];//重后面往前取,以免前面数据是空数据String downFlow=split[split.length-2];outK.set(phone);outV.setUpFlow(Long.parseLong(upFlow));outV.setDownFlow(Long.parseLong(downFlow));outV.setSumFlow();context.write(outK,outV);}
}3.FlowReducerpublic class FlowReducer extends Reducer {private FlowBean outV = new FlowBean();@Overrideprotected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {long totalUp = 0;long totalDown = 0;for (FlowBean value : values) {totalUp+= value.getUpFlow();totalDown+= value.getDownFlow();}outV.setUpFlow(totalUp);outV.setDownFlow(totalDown);outV.setSumFlow();context.write(key,outV);}
}4.FlowDriver   
public class FlowDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//1 获取job对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);//2 关联本Driver类job.setJarByClass(FlowDriver.class);//3 关联Mapper和Reducerjob.setMapperClass(FlowMapper.class);job.setReducerClass(FlowReducer.class);//4 设置Map端输出KV类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);//5 设置程序最终输出的KV类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//6 设置程序的输入输出路径FileInputFormat.setInputPaths(job, new Path("E:\\input1"));FileOutputFormat.setOutputPath(job, new Path("E:\\flowoutput"));//7 提交Jobboolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}
}//!!!总结 mapper负责格式处理(一行一行处理) , reducer负责数据计算(一个一个相同字段进行处理) driver负责调用

13.MapReduce框架原理
如图 hd9
请添加图片描述

   shuffle可进行排序,压缩..inputFormat 进行输入outputFormat进行输出 可以输出到mysql es hbase....MapTask不是越多越好,如果1kb启动MapTask的时间比处理的时间长,因为还有提供网络传输本地maptask自己处理最快,当块大小适合时1.一个jobmap阶段并行度由客户端在提交job时的切片数决定(并行度,就是处理效率)2.每个split切片分配一个maptask并行实例处理(多个maptask一起处理切片)3.默认情况 切片大小等于块大小(默认为一块,不切开)4.切片时不考虑数据集整体,逐一对每个文件进行单独切片(不同文件,独立进行切片)

14.切片源码

 //length是txt文件里面的内容大小//不是所有文件支持切片,不能切片需要压缩
//如果切片出来有一点点空间,那么纳为一个整体,默认1.1倍  32m 32.1m也切片
//调整块大小mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue

15.企业常用的TextInputFormat CombineTextInputFormat

 TextInputFormat 是按一行处理
NLineInputFormat  按多行处理
CombineTextInputFormat 是处理多个超小文件处理(TextInputFormat 无论多小都切片,小文件效率低),规定全部小文件加起来小于多少才可以统一切片CombineTextInputFormat.setMaxInputSplitSize(job,41000000);///4m例如 先排序块大小,然后 我文件是5.1m 会判断 <4m为一片  , >4m && <4*2m切割2片文件2是1.7m ,为一片 和2.55m排序后的结果 合并为一片

16.CombineTextInputFormat实操 创建4个小文件

 //普通 TextInputFormat的日志 number of splits : 4//在之前wordcount案例的driver修改job.setInputFormatClass(CombineTextInputFormat.class);CombineTextInputFormat.setMaxInputSplitSize(job,41000000);///4m

17.shuffle map方法之后 reduce方法之前的处理过程
//数据分区合并压缩

18.partition分区 如果要求把不同省份的数据 分类到不同的文件中
在wordcount案例中写
job.setNumReduceTask(2);//文件内容会分到2个文件
//自定义Partitioner类,原来的是根据hashcode取and来决定哪个分区在哪个文件

  1. CumstomPartitioner extends Partition//重写方法getPartition(Text key,FlowBean value,int numPartitions){//控制分区的代码逻辑,由你决定String phone= text.toString();//取到前面三位String prePhone=phone.substring(0,3);int partition;if("136".equals(prePhone)){partition=0;}else  137 1 138 2 139 3 其他4  }2.在driver的job 自定义Partitionerjob.setPartitionerClass(CumstomPartitioner.class);3.设置reduceTaskjob.setNumReduceTask(5);//

19.分区总结 注意partition从0开始

   1.reduceTask=1 只会产生1个文件2.reduceTask>getPartition数量 产生多几个空文件3.reduceTask

20.mapreduce为什么要排序? 为了提供系统性能,相同的key如果每次需要去对比是否相同,需要消耗大量的数据,排序后放在一起就可以轻松比较

reducetask 统一对内存和磁盘所有数据进行一次归并排序
     1.部分(分区)排序(企业)  部分字段进行排序FlowBean 改为 implements 	WritableComparable实现compareTo(FlowBean o)方法if(this.sumFlow>o.sumFlow){return -1;//代表倒序}else if(this.sumFlowreturn 1;}else{return 0;}private FlowBean outK=new FlowBean();private Text outV=new Text();mapper类String line =value.toString();String[] split =line.split("\t");outV.set(split[0]);outK.setUpFlow(Long.parseLong(split[1]));outK.setDownFlow(Long.parseLong(split[2]));outK.setSumFlow();context.write(outK,outV);//reducer  Reducer   reduce方法 values是指得到重复的值protected void reduce(FlowBean key, Iterable values, Context context)for(Text value : values){context.write(value,key);}//driver job的key和value调换,使用上一个案例的输出数据//4 设置Map端输出KV类型(处理过程中的key和value)job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);//5 设置程序最终输出的KV类型,最终输出的key和valuejob.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);
     2.全排序 输出结果只有一个文件,只有一个分区设置一个ReduceTask  3.辅助排序在多写一个reduce类,来排序(groupComparator)4.二次排序 compareTo 判断条件为两个(满足2个条件)

相关内容

热门资讯

linux入门---制作进度条 了解缓冲区 我们首先来看看下面的操作: 我们首先创建了一个文件并在这个文件里面添加了...
C++ 机房预约系统(六):学... 8、 学生模块 8.1 学生子菜单、登录和注销 实现步骤: 在Student.cpp的...
A.机器学习入门算法(三):基... 机器学习算法(三):K近邻(k-nearest neigh...
数字温湿度传感器DHT11模块... 模块实例https://blog.csdn.net/qq_38393591/article/deta...
有限元三角形单元的等效节点力 文章目录前言一、重新复习一下有限元三角形单元的理论1、三角形单元的形函数(Nÿ...
Redis 所有支持的数据结构... Redis 是一种开源的基于键值对存储的 NoSQL 数据库,支持多种数据结构。以下是...
win下pytorch安装—c... 安装目录一、cuda安装1.1、cuda版本选择1.2、下载安装二、cudnn安装三、pytorch...
MySQL基础-多表查询 文章目录MySQL基础-多表查询一、案例及引入1、基础概念2、笛卡尔积的理解二、多表查询的分类1、等...
keil调试专题篇 调试的前提是需要连接调试器比如STLINK。 然后点击菜单或者快捷图标均可进入调试模式。 如果前面...
MATLAB | 全网最详细网... 一篇超超超长,超超超全面网络图绘制教程,本篇基本能讲清楚所有绘制要点&#...
IHome主页 - 让你的浏览... 随着互联网的发展,人们越来越离不开浏览器了。每天上班、学习、娱乐,浏览器...
TCP 协议 一、TCP 协议概念 TCP即传输控制协议(Transmission Control ...
营业执照的经营范围有哪些 营业执照的经营范围有哪些 经营范围是指企业可以从事的生产经营与服务项目,是进行公司注册...
C++ 可变体(variant... 一、可变体(variant) 基础用法 Union的问题: 无法知道当前使用的类型是什...
血压计语音芯片,电子医疗设备声... 语音电子血压计是带有语音提示功能的电子血压计,测量前至测量结果全程语音播报࿰...
MySQL OCP888题解0... 文章目录1、原题1.1、英文原题1.2、答案2、题目解析2.1、题干解析2.2、选项解析3、知识点3...
【2023-Pytorch-检... (肆十二想说的一些话)Yolo这个系列我们已经更新了大概一年的时间,现在基本的流程也走走通了,包含数...
实战项目:保险行业用户分类 这里写目录标题1、项目介绍1.1 行业背景1.2 数据介绍2、代码实现导入数据探索数据处理列标签名异...
记录--我在前端干工地(thr... 这里给大家分享我在网上总结出来的一些知识,希望对大家有所帮助 前段时间接触了Th...
43 openEuler搭建A... 文章目录43 openEuler搭建Apache服务器-配置文件说明和管理模块43.1 配置文件说明...