文章目录
- 零、本节学习目标
- 一、Spark的概述
- (一)Spark的组件
- 1、Spark Core
- 2、Spark SQL
- 3、Spark Streaming
- 4、MLlib
- 5、Graph X
- 6、独立调度器、Yarn、Mesos
- (二)Spark的发展史
- 二、Spark的特点
- (一)速度快
- (二)易用性
- (三)通用性
- (四)兼容性
- (五)代码简洁
- 1、采用MR实现词频统计
- 2、采用Spark实现词频统计
- 三、Spark的应用场景
- 四、Spark与Hadoop的对比
零、本节学习目标
- 了解什么是Spark计算框架
- 了解Spark计算框架的特点
- 了解Spark计算框架的应用场景
- 理解Spark框架与Hadoop框架的对比
一、Spark的概述
(一)Spark的组件
- Spark在
2013
年加入Apache孵化器项目,之后获得迅猛的发展,并于2014
年正式成为Apache软件基金会的顶级项目。Spark生态系统已经发展成为一个可应用于大规模数据处理的统一分析引擎,它是基于内存计算的大数据并行计算框架,适用于各种各样的分布式平台的系统。在Spark生态圈中包含了Spark SQL、Spark Streaming、GraphX、MLlib等组件。


1、Spark Core
- Spark核心组件,实现了Spark的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。Spark Core中还包含对弹性分布式数据集的API定义。
2、Spark SQL
- 用来操作结构化数据的核心组件,通过Spark SQL可直接查询Hive、HBase等多种外部数据源中的数据。Spark SQL的重要特点是能够统一处理关系表和RDD。
3、Spark Streaming
- Spark提供的流式计算框架,支持高吞吐量、可容错处理的实时流式数据处理,其核心原理是将流数据分解成一系列短小的批处理作业。
4、MLlib
- Spark提供的关于机器学习功能的算法程序库,包括分类、回归、聚类、协同过滤算法等,还提供了模型评估、数据导入等额外的功能。
5、Graph X
- Spark提供的分布式图处理框架,拥有对图计算和图挖掘算法的API接口及丰富的功能和运算符,便于对分布式图处理的需求,能在海量数据上运行复杂的图算法。
6、独立调度器、Yarn、Mesos
- 集群管理器,负责Spark框架高效地在一个到数千个节点之间进行伸缩计算的资源管理。
(二)Spark的发展史
- 对于一个具有相当技术门槛与复杂度的平台,Spark从诞生到正式版本的成熟,经历的时间如此之短,让人感到惊诧。2009年,Spark诞生于伯克利大学AMPLab,最开初属于伯克利大学的研究性项目。它于2010年正式开源,并于2013年成为了Aparch基金项目,并于2014年成为Aparch基金的顶级项目,整个过程不到五年时间。

- Spark目前最新版本是2023年2月17日发布的Spark3.3.2

二、Spark的特点
- Spark计算框架在处理数据时,所有的中间数据都保存在内存中,从而减少磁盘读写操作,提高框架计算效率。同时Spark还兼容HDFS、Hive,可以很好地与Hadoop系统融合,从而弥补MapReduce高延迟的性能缺点。所以说,Spark是一个更加快速、高效的大数据计算平台。
- Spark官网上给出Spark的特点

(一)速度快
- 与MapReduce相比,Spark可以支持包括Map和Reduce在内的更多操作,这些操作相互连接形成一个有向无环图(Directed Acyclic Graph,简称DAG),各个操作的中间数据则会被保存在内存中。因此处理速度比MapReduce更加快。Spark通过使用先进的DAG调度器、查询优化器和物理执行引擎,从而能够高性能的实现批处理和流数据处理。


(二)易用性
- Spark支持使用Scala、Python、Java及R语言快速编写应用。同时Spark提供超过80个高级运算符,使得编写并行应用程序变得容易并且可以在Scala、Python或R的交互模式下使用Spark。

(三)通用性
- Spark可以与SQL、Streaming及复杂的分析良好结合。Spark还有一系列的高级工具,包括Spark SQL、MLlib(机器学习库)、GraphX(图计算)和Spark Streaming,并且支持在一个应用中同时使用这些组件。

(四)兼容性
- 用户可以使用Spark的独立集群模式运行Spark,也可以在EC2(亚马逊弹性计算云)、Hadoop YARN或者Apache Mesos上运行Spark。并且可以从HDFS、Cassandra、HBase、Hive、Tachyon和任何分布式文件系统读取数据。

(五)代码简洁
- 参看【经典案例【词频统计】十一种实现方式】

1、采用MR实现词频统计
package net.hw.wc;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class WordCountMapper extends Mapper {@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {// 获取行内容String line = value.toString();// 按空格拆分得到单词数组String[] words = line.split(" ");// 遍历单词数组,生成输出键值对for (int i = 0; i < words.length; i++) {context.write(new Text(words[i]), new IntWritable(1));}}
}
package net.hw.wc;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class WordCountReducer extends Reducer {@Overrideprotected void reduce(Text key, Iterable values, Context context)throws IOException, InterruptedException {// 定义键出现次数int count = 0;// 遍历输入值迭代器for (IntWritable value : values) {count += value.get(); // 其实针对此案例,可用count++来处理}// 输出新的键值对,注意要将java的int类型转换成hadoop的IntWritable类型context.write(key, new IntWritable(count));}
}
package net.hw.wc;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.net.URI;public class WordCountDriver {public static void main(String[] args) throws Exception {// 创建配置对象Configuration conf = new Configuration();// 设置数据节点主机名属性conf.set("dfs.client.use.datanode.hostname", "true");// 获取作业实例Job job = Job.getInstance(conf);// 设置作业启动类job.setJarByClass(WordCountDriver.class);// 设置Mapper类job.setMapperClass(WordCountMapper.class);// 设置map任务输出键类型job.setMapOutputKeyClass(Text.class);// 设置map任务输出值类型job.setMapOutputValueClass(IntWritable.class);// 设置Reducer类job.setReducerClass(WordCountReducer.class);// 设置reduce任务输出键类型job.setOutputKeyClass(Text.class);// 设置reduce任务输出值类型job.setOutputValueClass(IntWritable.class);// 设置分区数量(reduce任务的数量,结果文件的数量)job.setNumReduceTasks(3);// 定义uri字符串String uri = "hdfs://master:9000";// 创建输入目录Path inputPath = new Path(uri + "/word/input");// 创建输出目录Path outputPath = new Path(uri + "/word/result");// 获取文件系统FileSystem fs = FileSystem.get(new URI(uri), conf);// 删除输出目录(第二个参数设置是否递归)fs.delete(outputPath, true);// 给作业添加输入目录(允许多个)FileInputFormat.addInputPath(job, inputPath);// 给作业设置输出目录(只能一个)FileOutputFormat.setOutputPath(job, outputPath);// 等待作业完成job.waitForCompletion(true);// 输出统计结果System.out.println("======统计结果======");FileStatus[] fileStatuses = fs.listStatus(outputPath);for (int i = 1; i < fileStatuses.length; i++) {// 输出结果文件路径System.out.println(fileStatuses[i].getPath());// 获取文件系统数据字节输入流FSDataInputStream in = fs.open(fileStatuses[i].getPath());// 将结果文件显示在控制台IOUtils.copyBytes(in, System.out, 4096, false);}}
}
- 运行程序WordCountDriver,查看结果

2、采用Spark实现词频统计
package net.hw.spark.wcimport org.apache.spark.{SparkConf, SparkContext}object WordCount {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local").setAppName("wordcount")val sc = new SparkContext(conf)val rdd = sc.textFile("test.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)rdd.foreach(println)rdd.saveAsTextFile("result")}
}
- 启动WordCount,查看结果

- 大家可以看出,完成同样的词频统计任务,Spark代码比MapReduce代码简洁很多。
三、Spark的应用场景
四、Spark与Hadoop的对比