RDD到底是什么?RDD的API_rdd bank-程序员宅基地

技术标签: Spark  分布式  大数据  

RDD到底是什么?RDD的API

大家好,我是W

今天给大家带来一篇关于Spark和RDD的博客,由于我也是初学者,所以没法带来那么深刻的东西,但是我希望用我的感性认知带给大家一点灵感,毕竟刚开始学习Spark的时候我对RDD概念、Spark流程是有很多困惑的,我觉得大家也可能存在这种问题。OK,接下来我将从以下几个角度来讲RDD和Spark:1、 Spark简介、对比hadoop、生态,2、 RDD概念

1、 Spark简介、对比hadoop、生态

1.1 Spark简介

Spark官网,可以看到官方对Spark的概述:

Spark Overview
Apache Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.
Apache的Spark是一个用于大规模数据处理的统一分析引擎。它提供了一系列Java、Scaala、Python的高级API以及优化引擎,所以支持统一的操作。它同样的提供了一系列丰富的高阶工具,包括用于SQL查询、结构化数据处理的Spark SQL,用于机器学习的MLlib库,用于图处理的GraphX库,以及用于增量计算和流处理的Streaming库。

可以看到官网对Spark的定义就是一个大一统的框架,其中存在做结构化数据处理的组件Spark SQL,有用于机器学习的MLlib组件等等。在我实际学习的过程中可以感觉到组件间的关系就好像积木一样,需要的时候插上即可。

1.2 Spark对比Hadoop

Spark对比hadoop最大的特点就是快,在官网上第一张图就摆出来Spark比hadoop快了百倍,Spark的运算是基于内存的,而hadoop则需要通过HDFS将数据持久化到磁盘,所以显然是快的,但是快多少还是要看实际生产环境吧。

可是除了这点就没了吗?其实还有的,在《大数据基础:Spark工作原理及基础概念》中给大家罗列出来了:

特点 说明
spark 计算速度快 spark将每个任务构建成DAG进行计算,内部的计算过程通过弹性式分布式数据集RDD在内存在进行计算,相比于hadoop的mapreduce效率提升了100倍。
易于使用 spark 提供了大量的算子,开发只需调用相关api进行实现无法关注底层的实现原理。相较于以前离线任务采用mapreduce实现,实时任务采用storm实现,目前这些都可以通过spark来实现,降低来开发的成本。同时spark 通过spark SQL降低了用户的学习使用门槛,还提供了机器学习,图计算引擎等。
支持多种的资源管理模式 学习使用中可以采用local 模型进行任务的调试,在正式环境中又提供了standalone,yarn等模式,方便用户选择合适的资源管理模式进行适配。
社区支持 spark 生态圈丰富,迭代更新快,成为大数据领域必备的计算引擎。

1.3 Spark生态圈

其实刚刚介绍Spark的时候已经讲了一点了,大家请看图:

在这里插入图片描述

这是我找到比较合理的一张图,它把不同的工作内容分层,结构比较清晰。

说明
资源调度层 因为我们的任务是要提交到集群上运行的,不同的结点有不同的工作,所以需要对计算资源进行调度,而在这一层的资源调度方式就有很多:local模式、StandAlone模式、yarn模式、mesos模式等等。
计算层 计算层主要使用的是spark-core这个spark的核心库,其面向的是离线的计算,而R、Python这些就是所支持的语言。
存储层 存储层包括一系列的存储组件,最常见的比如有hadoop-HDFS、MySQL、HBASE、MongoDB、Redis等等,这些均是spark生态可以对接的存储组件,而右边的sparkSQL显然是支持这些数据源的,而下方的MLlib等等显然需要数据的支持。
数据流 在做实时计算的时候streaming可以对接flume、kafka等组件。

2、 RDD的概念(RDD到底是什么)、Spark的工作流程

这两个话题涉及了很多因素,我感觉这一篇文章还是不可能讲的很清楚,但是我会用我能做到的最朴素的语言给大家感性的讲一讲。同时,我建议大家多做几个小案例来加深认识。

2.1 RDD的概念

2.1.1 官方的定义

RDD是Spark中最重要的概念,其全称叫做Resilient Distributed Dataset (RDD),即弹性分布式数据集,是一种可容错的、可以被并行操作元素集合,是Spark中处理所有数据的一种基本抽象。

光是看这一句还是不够的,我在源码中找来注释给大家看一下,我建议大家仔细看下源码的注释

/**
 * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
 * partitioned collection of elements that can be operated on in parallel. This class contains the
 * basic operations available on all RDDs, such as `map`, `filter`, and `persist`.
 * 一个弹性分布式数据集(RDD),是Spark里的基本抽象。
 * 它代表了可以被并行操作的不可变的分区元素集合。这个类包含了各种RDD都支持的基本操作,比如map、filter、persist等。
 * 
 * In addition,[[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value pairs, such as `groupByKey` and `join`;
 * 此外,org.apache.spark.rdd.PairRDDFunctions里还包含了只有键值对(key-value)类型RDD可用的操作,比如groupByKey、join等。
 * 
 * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of Doubles; 
 * org.apache.spark.rdd.DoubleRDDFunctions 里包含了只有Double数据类型的RDD可用的操作。
 * 
 * and [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that can be saved as SequenceFiles.
 * org.apache.spark.rdd.SequenceFileRDDFunctions 里包含了可以被序列化成文件的RDD所包含的操作。
 * 
 * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)] through implicit.
 * 所有的操作都可以通过implicit来赋予。
 * 
 * Internally, each RDD is characterized by five main properties:
 * 在RDD内部,每一个RDD都由这五个主要特征来描述:
 * 
 *  - A list of partitions
 *  - 一系列分区
 *  
 *  - A function for computing each split
 *  - 对每一个分片做计算的函数
 *  
 *  - A list of dependencies on other RDDs
 *  - 一系列对其他RDD的依赖
 *  
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 *  - 视情况而定,一个作用于键值对RDD的分区器
 *  
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
 *  - 视情况而定, 要计算每个分片的首选位置的列表
 *
 * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
 * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
 * reading data from a new storage system) by overriding these functions. Please refer to the
 * <a href="http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf">Spark paper</a>
 * for more details on RDD internals.
 * Spark里的所有scheduling和execution都是基于这些方法(通过赋予RDD操作的方式)来实现其自身的计算方式,当然用户可以通过重写方法自定义RDD。
 */

最后注释中还贴心的给出了RDD的提出的论文:《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》

RDD的操作分为两大类,Transformation、Action。

Transformation是对已有的RDD进行转换(记录下一步操作)然后生成新的RDD,采用的是lazy策略,不会立即计算出结果。

Action是让已有的RDD对数据执行它的操作。

表格来自:大数据之Spark简介及RDD说明

Transformation
方法(算子) 说明
map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子
union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD
intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD
distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD
groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 与sortByKey类似,但是更灵活
join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
cartesian(otherDataset) 笛卡尔积
pipe(command, [envVars])
coalesce(numPartitions)
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)
Action
方法(算子) 说明
reduce(func) 通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的
collect() 在驱动程序中,以数组的形式返回数据集的所有元素
count() 返回RDD的元素个数
first() 返回RDD的第一个元素(类似于take(1))
take(n) 返回一个由数据集的前n个元素组成的数组
takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
takeOrdered(n, [ordering])
saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
saveAsObjectFile(path)
countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
foreach(func) 在数据集的每一个元素上,运行函数func进行更新。
2.1.2 我的感性认识

刚开始我对RDD也是很迷惑,它是在哪里体现了并行化计算的?但是当我真正正正做一个完整的案例时,我才对他有那么一点理解。

大家可以想一个完整的离线计算案例,比如:

我们需要计算美团上外卖的标签,那么我们会有类似以下数据集:

商品ID 用户ID 评价(String)
109283 yyyyxxx 味道还不错,就是有点贵
109283 swssim 虽然有点贵,但是分量足
109284 swssim 好难吃!

我们的目标是针对商品做标签,依据是商品出现最多的5个评价标签。

  • 1、 首先我们通过sparkContext读取数据
  • 2、 因为我们拿到的是评价String,所以做分词,这里假设分词调包成功,评价此时不再是一个长长的话,而是:评价1,评价2
  • 3、 接下来,提取出商品ID,评价
  • 4、 根据商品ID聚类,即groupByKey
  • 5、 对后面标签做操作…

请大家注意第3步,我们的程序放到集群中,而集群中显然不止一台worker,即显然不止一个executor,所以我们整个spark集群中每一个executor拿到的只是整个数据集的一部分(第一台拿0 - n-1行,第二台拿n - 2n-1行类似这样),但是我们的操作是写在一份程序里面,如何对不同机器中的数据集做统一的操作呢?

这显然就是RDD的作用,程序提交时会经过cluster manager分配资源、通过driver提交代码到executor,然后经过各种scheduler把程序进行分析,分成多个stage每一个stage代表了不需要跨机器执行的操作的集合(比如map、filter),而当出现要跨机器操作(比如collect、reduce)时,则会把数据集中到一台机器去操作。

说了那么多,RDD到底是什么呢?

解释1 : 因为每一台机器都知道哪几步本机器不需要依靠别人可以自己做(stage),所以可以先做,不需要看别人脸色,而遇到大家统一的操作时通过网络把数据合并由一台机器做。RDD就是定义这些操作的对象,RDD操作的对象就是分布在不同机器上的同一格式的数据集。

解释2 : 数据集分布在不同机器中,RDD定义了各个机器对这份数据的同一操作(先做什么再做什么)。就好像你安排你的小弟,去不同银行,插入银行卡,输入密码,取5000块钱,然后拿回来,最后给你汇总一样。

参考

总结

Spark毫无疑问是个非常优秀的框架,其中的组件就仿佛积木一般随时插拔。RDD作为Spark的最重要的概念,对Spark整个框架起着至关重要的作用。RDD的操作分为Transoformation和Action两种,其核心理念是定义一个抽象的数据操作,从而方便每个分区针对各自所管理的数据做统一的操作。今天这篇博客可能还有很多没法讲清楚的地方,接下来我会继续把Spark的其他概念、RDD涉及的相关概念更详细的给大家理清楚。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/Alian_W/article/details/109770644

智能推荐

python difflib 编辑距离_Python Edit_Distance包_程序模块 - PyPI - Python中文网-程序员宅基地

文章浏览阅读413次。编辑距离用于计算序列之间编辑距离和对齐的python模块。我需要一种方法来计算python中序列之间的编辑距离。我没有能够找到任何合适的库来实现这一点,所以我自己编写了一个。在那里似乎有许多可用于计算编辑的编辑距离库两个字符串之间的距离,但不是两个序列之间的距离。这完全是用python编写的。这种实现可能是在python中优化为更快。如果在C中实现。库API是根据difflib.sequencem..._edit distance python lib

antd upload组件 手动上传-程序员宅基地

文章浏览阅读3.8k次,点赞2次,收藏15次。antd 的upload组件是点开对话框后,按下确实就会上传,而且如果多选文件也会反复调用后端接口来完成上传。因为项目需要,所以要实现手动上传,和一次性上传多个文件(调用一次后端接口)在实现这个功能时,我翻阅了很多博客,可能是因为版本原因,很多代码都无用,最后还是通过翻阅官方文档,才最终实现。..._antd upload

sqlite3 环境搭建_sqlite 部署-程序员宅基地

文章浏览阅读246次。注意 第一步在一个文件下打开终端然后 sqlite3 student.db(创建一个数据库),然后再create stu。callback 回调函数 (只有sql为查询语句的时候,才会执行此语句)6--删除一列(sqlite3 不支持) 用下面方法。功能 :打开sqlite 数据库。功能 :关闭sqlite 数据库。基本sql命令,不以 . 夹头,db:指向sqlite句柄的指针。将新表的名字改为原来表的名字。sqlite3的基本命令。功能:执行一条sql语句。以 . 开头的命令。_sqlite 部署

canal-adapter趟坑实践:canal-server的kafka SASLPLAIN方式鉴权适配_canal adapter kafka sasl-程序员宅基地

文章浏览阅读1.4w次。前言canal-server同步到kafka本身是支持Kerberos方式的鉴权的,但是鉴于项目现在使用的kafka集群使用的是SASL/PLAIN的鉴权方式,所以需要对canal-server同步kafka做一下适配改造。准备kafka SASL/PLAIN鉴权的搭建我参考的这篇文章kafka SASL/PLAIN鉴权的搭建了解如何使用java向以SASL/PLAIN方式鉴权的kafk..._canal adapter kafka sasl

Android adb shell相关命令_android的shell命令工具:设备规范管理-程序员宅基地

文章浏览阅读711次。adb(调试桥):debug工具。adb作用:借助adb工具,可以管理设备或手机模拟器状态。adb相关操作命令如下: 1. 显示系统中全部Android平台: android list targets2. 显示系统中全部AVD(模拟器): android list avd3. 创建AVD(模拟器): android create avd_android的shell命令工具:设备规范管理

Centos 7.9 在线安装 VirtualBox 7.0_centos安装virtualbox-程序员宅基地

文章浏览阅读769次,点赞10次,收藏7次。Centos 7.9 在线安装 VirtualBox 7.0_centos安装virtualbox

随便推点

Autodesk官方卸载工具软件安装教程-程序员宅基地

文章浏览阅读1.4w次,点赞9次,收藏10次。Autodesk卸载工具是一个专门用于Autodesk软件的卸载工具,可以自动识别电脑中的所有Autodesk软件,只需一键点击就能将Autodesk的软件完美卸载,并且不保留任何痕迹,这款卸载工具就可以帮助用户全面卸载Autodesk软件。_autodesk官方卸载工具

JDBC报错:Cannot find class: com.mysql.jdbc.Driver-程序员宅基地

文章浏览阅读4.9k次。1.配置书写错误:配置文件value值引号内不能有空格,属性文件配置信息末尾不能有空格(1)打开属性文件中com.mysql.jdbc.Driver后发现多了一个空格(如下我标出了),所以写属性文件时一定别多输入多余的空格了。 jdbc.driverClassName=com.mysql.jdbc.Driver(此处有空格)(2)配置文件中的value值的" "号中前面或..._cannot find class: com.mysql.jdbc.driver

软件常用术语_软件术语-程序员宅基地

文章浏览阅读1.8k次。软件常用术语,免得你面对各种设计模式头发晕_软件术语

Machine Learning 2 - 非线性回归算法分析_非线性回归分析方法-程序员宅基地

文章浏览阅读2.8k次。2017-08-02@erixhao 技术极客TechBoosterAI 机器学习第二篇 - 非线形回归分析。我们上文深入本质了解了机器学习基础线性回归算法后,本文继续研究非线性回归。非线性回归在机器学习中并非热点,并且较为小众,且其应用范畴也不如其他广。鉴于此,我们本文也将较为简单的介绍,并不会深入展开。非线性回归之后,我们会继续经典机器学习算法包括决策_非线性回归分析方法

hive基本函数_josn mincol-程序员宅基地

文章浏览阅读164次。一、关系运算:1.等值比较: =语法:A=B操作类型:所有基本类型描述:如果表达式A与表达式B相等,则为TRUE;否则为FALSE举例:hive>select 1 from lxw_dual where 1=1;12.不等值比较: <>语法: A <> B操作类型:所有基本类型描述:如果表达式A为NULL,或者表..._josn mincol

FI 与SD MM相关接口配置_sd 和fi 接口产生什么凭证?-程序员宅基地

文章浏览阅读767次。1 FI/SD 借口配置FI/SD通过tcode VKOA为billing设置过帐科目,用户可以创建自己的科目定义数据表。 科目是做到COA级的,通过KOFI/KOFK这两个condition type确定分别过帐到FI和CO凭证中。 由于PricingProc.是同Sale_sd 和fi 接口产生什么凭证?

推荐文章

热门文章

相关标签