基于 Spark 的数据分析实践

转载本文需注明出处:微信公众号EAWorld,违者必究。

引言:

Spark是在借鉴了MapReduce之上发展而来的,继承了其分布式并行计算的优点并改进了MapReduce明显的缺陷。Spark主要包含了Spark Core、Spark SQL、Spark Streaming、MLLib和GraphX等组件。

本文主要分析了 Spark RDD 以及 RDD 作为开发的不足之处,介绍了 SparkSQL 对已有的常见数据系统的操作方法,以及重点介绍了普元在众多数据开发项目中总结的基于 SparkSQL Flow 开发框架。

目录:

一、Spark RDD

二、基于Spark RDD数据开发的不足

三、SparkSQL

四、SparkSQL Flow一、Spark RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、元素可并行计算的集合。

RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。

//Scala 在内存中使用列表创建

val lines = List(“A”, “B”, “C”, “D” …)
val rdd:RDD = sc.parallelize(lines);

//以文本文件创建

val rdd:RDD[String] = sc.textFile(“hdfs://path/filename”)

Spark RDD Partition 分区划分

新版本的 Hadoop 已经把 BlockSize 改为 128M,也就是说每个分区处理的数据量更大。

Spark 读取文件分区的核心原理

本质上,Spark 是利用了 Hadoop 的底层对数据进行分区的 API(InputFormat):

public abstract class InputFormat{
public abstract List getSplits(JobContextcontext
) throwsIOException,InterruptedException;

public abstract RecordReader createRecordReader(InputSplitsplit,
TaskAttemptContextcontext
)throwsIOException,InterruptedException;
}

Spark 任务提交后通过对输入进行 Split,在 RDD 构造阶段,只是判断是否可 Split(如果参数异常一定在此阶段报出异常),并且 Split 后每个 InputSplit 都是一个分区。只有在Action 算子提交后,才真正用 getSplits 返回的 InputSplit 通过 createRecordReader 获得每个 Partition 的连接。

然后通过 RecordReader 的 next() 遍历分区内的数据。

Spark RDD 转换函数和提交函数

Spark RDD 的众多函数可分为两大类Transformation 与 Action。Transformation 与 Action 的区别在于,对 RDD 进行 Transformation 并不会触发计算:Transformation 方法所产生的 RDD 对象只会记录住该 RDD 所依赖的 RDD 以及计算产生该 RDD 的数据的方式;只有在用户进行 Action 操作时,Spark 才会调度 RDD 计算任务,依次为各个 RDD 计算数据。这就是 Spark RDD 内函数的“懒加载”特性。二、基于Spark RDD数据开发的不足

由于MapReduce的shuffle过程需写磁盘,比较影响性能;而Spark利用RDD技术,计算在内存中流式进行。另外 MapReduce计算框架(API)比较局限, 使用需要关注的参数众多,而Spark则是中间结果自动推断,通过对数据集上链式执行函数具备一定的灵活性。

即使 SparkRDD 相对于 MapReduce 提高很大的便利性,但在使用上仍然有许多问题。体现在一下几个方面:

  1. RDD 函数众多,开发者不容易掌握,部分函数使用不当 shuffle时造成数据倾斜影响性能;
  2. RDD 关注点仍然是Spark太底层的 API,基于 Spark RDD的开发是基于特定语言(Scala,Python,Java)的函数开发,无法以数据的视界来开发数据;
  3. 对 RDD 转换算子函数内部分常量、变量、广播变量使用不当,会造成不可控的异常;
  4. 对多种数据开发,需各自开发RDD的转换,样板代码较多,无法有效重利用;
  5. 其它在运行期可能发生的异常。如:对象无法序列化等运行期才能发现的异常。

三、SparkSQL

Spark 从 1.3 版本开始原有 SchemaRDD 的基础上提供了类似Pandas DataFrame API。新的DataFrame API不仅可以大幅度降低普通开发者的学习门槛,同时还支持Scala、Java与Python三种语言。更重要的是,由于脱胎自SchemaRDD,DataFrame天然适用于分布式大数据场景。

一般的数据处理步骤:读入数据 -> 对数据进行处理 -> 分析结果  -> 写入结果

SparkSQL 结构化数据

  • 处理结构化数据(如 CSV,JSON,Parquet 等);
  • 把已经结构化数据抽象成 DataFrame (HiveTable);
  • 非结构化数据通过 RDD.map.filter 转换成结构化进行处理;
  • 按照列式数据库,只加载非结构化中可结构化的部分列(Hbase,MongoDB);

处理非结构化数据,不能简单的用 DataFrame 装载。而是要用 SparkRDD 把数据读入,在通过一系列的 Transformer Method 把非结构化的数据加工为结构化,或者过滤到不合法的数据。

SparkSQL DataFrame

SparkSQL 中一切都是 DataFrame,all in DataFrame. DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。如果熟悉 Python Pandas 库中的 DataFrame 结构,则会对 SparkSQL DataFrame 概念非常熟悉。

TextFile DataFrame

import.org.apache.spark.sql._
//定义数据的列名称和类型
valdt=StructType(List(id:String,name:String,gender:String,age:Int))

//导入user_info.csv文件并指定分隔符
vallines = sc.textFile(“/path/user_info.csv”).map(_.split(“,”))

//将表结构和数据关联起来,把读入的数据user.csv映射成行,构成数据集
valrowRDD = lines.map(x=>Row(x(0),x(1),x(2),x(3).toInt))

//通过SparkSession.createDataFrame()创建表,并且数据表表头
val df= spark.createDataFrame(rowRDD, dt)

读取规则数据文件作为DataFrame

SparkSession.Builder builder = SparkSession.builder()
Builder.setMaster(“local”).setAppName(“TestSparkSQLApp”)
SparkSession spark = builder.getOrCreate();
SQLContext sqlContext = spark.sqlContext();

# 读取 JSON 数据,path 可为文件或者目录
valdf=sqlContext.read().json(path);

# 读取 HadoopParquet 文件
vardf=sqlContext.read().parquet(path);

# 读取 HadoopORC 文件
vardf=sqlContext.read().orc(path);

JSON 文件为每行一个 JSON 对象的文件类型,行尾无须逗号。文件头也无须[]指定为数组;SparkSQL 读取是只是按照每行一条 JSON Record序列化;

Parquet文件

Configurationconfig = new Configuration();
ParquetFileReaderreader = ParquetFileReader.open(
HadoopInputFile.fromPath(new Path(“hdfs:///path/file.parquet”),conf));
Mapschema = reader.getFileMetaData().getKeyValueMetaData();
String allFields= schema.get(“org.apache.spark.sql.parquet.row.metadata”);

allFiedls 的值就是各字段的名称和具体的类型,整体是一个json格式进行展示。

读取 Hive 表作为 DataFrame

Spark2 API 推荐通过 SparkSession.Builder 的 Builder 模式创建 SparkContext。 Builder.getOrCreate() 用于创建 SparkSession,SparkSession 是 SparkContext 的封装。

在Spark1.6中有两个核心组件SQLcontext和HiveContext。SQLContext 用于处理在 SparkSQL 中动态注册的表,HiveContext 用于处理 Hive 中的表。

从Spark2.0以上的版本开始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext。SQLContext.sql 即可执行 Hive 中的表,也可执行内部注册的表;

在需要执行 Hive 表时,只需要在 SparkSession.Builder 中开启 Hive 支持即可(enableHiveSupport())。

SparkSession.Builder builder = SparkSession.builder().enableHiveSupport();
SparkSession spark = builder.getOrCreate();
SQLContext sqlContext = spark.sqlContext();

// db 指 Hive 库中的数据库名,如果不写默认为 default

// tableName 指 hive 库的数据表名

sqlContext.sql(“select * from db.tableName”)

SparkSQL ThriftServer

//首先打开 Hive 的 Metastore服务

hive$bin/hive –-service metastore –p 8093

//把 Spark 的相关 jar 上传到hadoophdfs指定目录,用于指定sparkonyarn的依赖 jar

spark$hadoop fs –put jars/*.jar /lib/spark2

// 启动 spark thriftserver 服务

spark$ sbin/start-thriftserver.sh –master yarn-client –driver-memory 1G –conf
spark.yarn.jars=hdfs:///lib/spark2/*.jar

当hdfs 上传了spark 依赖 jar 时,通过spark.yarn.jars 可看到日志 spark 无须每个job 都上传jar,可节省启动时间

19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.0.5.jar
19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.1.7.3.jar

//通过 spark bin 下的 beeline 工具,可以连接到 spark ThriftServer(SparkOnHive)

bin/beeline -u jdbc:hive2://ip:10000/default -n hadoop

-u 是指定 beeline 的执行驱动地址;

-n 是指定登陆到 spark Session 上的用户名称;

Beeline 还支持传入-e 可传入一行 SQL, 

-e <query>                      query that should be executed

也可通过 –f 指定一个 SQL File,内部可用逗号分隔的多个 SQL(存储过程)

-f <exec file>                  script file that should be executed

SparkSQL Beeline 的执行效果展示

SparkSQL ThriftServer

对于 SparkSQL ThriftServer 服务,每个登陆的用户都有创建的 SparkSession,并且执行的对个 SQL 会通过时间顺序列表展示。

SparkSQL ThriftServer 服务可用于其他支持的数据库工具创建查询,也用于第三方的 BI 工具,如 tableau。四、SparkSQL Flow

SparkSQL Flow 是以 SparkSQL 为基础,开发的统一的基于 XML 配置化的可执行一连串的 SQL 操作,这一连串的 SQL 操作定义为一个 Flow。下文开始 SparkSQL Flow 的介绍:

SparkSQL Flow 是基于 SparkSQL 开发的一种基于 XML 配置化的 SQL 数据流转处理模型。该模型简化了 SparkSQL 、Spark RDD的开发,并且降低开发了难度,适合了解数据业务但无法驾驭大数据以及 Spark 技术的开发者。

  • 一个由普元技术部提供的基于 SparkSQL 的开发模型;
  • 一个可二次定制开发的大数据开发框架,提供了灵活的可扩展 API;
  • 一个提供了 对文件,数据库,NoSQL 等统一的数据开发视界语义;
  • 基于 SQL 的开发语言和 XML 的模板配置,支持 Spark UDF 的扩展管理;
  • 支持基于 Spark Standlone,Yarn,Mesos 资源管理平台;
  • 支持开源、华为、星环等平台统一认证。

SparkSQL Flow 适合的场景:

  1. 批量 ETL;
  2. 非实时分析服务;

SparkSQL Flow XML 概览

  1. Properties 内定义一组变量,可用于宏替换;
  2. Methods 内可注册 udf 和 udaf 两种函数;
  3. Prepare 内可定义前置 SQL,用于执行 source 前的 sql 操作;
  4. Sources 内定义一个到多个数据表视图;
  5. Transformer 内可定义 0 到多个基于 SQL 的数据转换操作(支持 join);
  6. Targets 用于定义 1 到多个数据输出;
  7. After 可定义 0到多个任务日志;

如你所见,source 的 type 参数用于区分 source 的类型,source 支持的种类直接决定SparkSQL Flow 的数据源加载广度;并且,根据 type 不同,source 也需要配置不同的参数,如数据库还需要 driver,url,user和 password 参数。

Transformer 是基于 source 定的数据视图可执行的一组转换 SQL,该 SQL 符合 SparkSQL 的语法(SQL99)。Transform 的 SQL 的执行结果被作为中间表命名为 table_name 指定的值。

Targets 为定义输出,table_name 的值需在 source 或者 Transformer 中定义。

SparkSQL Flow 支持的Sourse

  • 支持从 Hive 获得数据;
  • 支持文件:JSON,TextFile(CSV),ParquetFile,AvroFile
  • 支持RDBMS数据库:PostgreSQL, MySQL,Oracle
  • 支持 NOSQL 数据库:Hbase,MongoDB

SparkSQL Flow TextFile Source

textfile 为读取文本文件,把文本文件每行按照 delimiter 指定的字符进行切分,切分不够的列使用 null 填充。

  1. Tablename 为该文件映射的数据表名,可理解为数据的视图;
  2. Fields 为切分后的字段,使用逗号分隔,字段后可紧跟该字段的类型,使用冒号分隔;
  3. Delimiter 为每行的分隔符;
  4. Path 用于指定文件地址,可以是文件,也可是文件夹;
  5. Path 指定地址需要使用协议,如:file:// 、 hdfs://,否则跟 core-site.xml 配置密切相关;

SparkSQL Flow DB Source

RDBMS 是从数据库使用 JDBC读取 数据集。支持 type 为:db、mysql、oracle、postgres、mssql;

  1. tablename 为该数据表的抽象 table 名称(视图);
  2. url、driver、user,password 为数据库 JDBC 驱动信息,为必须字段;
  3. SparkSQL 会加载该表的全表数据,无法使用 where 条件。

SparkSQL Flow Transformer

SELECT c_phone,c_type,c_num, CONCAT_VAL(cust_id) as cust_ids
FROM user_concat_testx
group by c_phone,c_type,c_num

Transform 支持 cached 属性,默认为 false;如果设置为 true,相当于把该结果缓存到内存中,缓存到内存中的数据在后续其它 Transform 中使用能提高计算效率。但是需使用大量内存,开发者需要评估该数据集能否放到内存中,防止出现 OutofMemory 的异常。

SparkSQL Flow Targets

SparkSQL Flow Targets 支持输出数据到一个或者多个目标。这些目标,基本覆盖了 Source 包含的外部系统。下面以 Hive 举例说明:

  1. table_name 为 source 或者 Transform 定义的表名称;
  2. target_table_name 为 hive 中的表结果,Hive 表可不存在也可存在,sparksql 会根据 DataFrame 的数据类型自动创建表;
  3. savemode 默认为 overwrite 覆盖写入,当写入目标已存在时删除源表再写入;支持 append 模式, 可增量写入。

Target 有一个特殊的 show 类型的 target。用于直接在控制台输出一个 DataFrame 的结果到控制台(print),该 target 用于开发和测试。

Rows 用于控制输出多少行数据。

SparkSQL Around

After 用于 Flow 在运行结束后执行的一个环绕,用于记录日志和写入状态。类似 Java 的 try {} finally{ round.execute() }

多个 round 一定会执行,round 异常不会导致任务失败。 

Image placeholder
Secondyxx
未设置
  68人点赞

没有讨论,发表一下自己的看法吧

推荐文章
Python数据分析实战 | 爬遍拉勾网,带你看看数据分析师还吃香吗?

微信公众号:「Python读财」如有问题或建议,请公众号留言伴随着移动互联网的飞速发展,越来越多用户被互联网连接在一起,用户所积累下来的数据越来越多,市场对数据方面人才的需求也越来越大,由此也带火了如

最适合入门的Python数据分析实战项目

微信公众号:「Python读财」如有问题或建议,请公众号留言伴随着移动互联网的飞速发展,越来越多用户被互联网连接在一起,用户所积累下来的数据越来越多,市场对数据方面人才的需求也越来越大,由此也带火了如

AB test | 数据分析师面试必知 !

前言关于ABtest的重要性无需多言,数据、产品等从业人员几乎必知,好的数据科学家我想一定是知道理解业务比模型更为重要,而ABtest就是伴随着业务增长的利器。如果你心中的ABtest几乎都没有用到中

数据分析利器之Pandas

Pandas是一个python的开源库,它基于Numpy,提供了多种高性能且易于使用的数据结构。Pandas最初被用作金融数据分析工具而开发,由于它有着强大的功能,目前广泛应用于数据分析、机器学习以及

如何避免人类偏见对数据分析产生影响

随着越来越多的企业开始采用机器学习技术以实现流程的自动化,人们也逐渐开始质疑计算机决策中的伦理含义。我们如何处理计算机系统中潜在的偏见?相对较少被提及但同样重要的,是人类本身的偏见,它与分析和商业决策

实现人工智能落地 你还差一个“数据分析流水线”的距离

在智慧生产场景,生产制造商可以在生产线上利用深度学习,尤其是图像识别,将产品的质量检测自动化。比如自动检测产品表面有没有划伤、有没有零部件的缺失、有没有标签的错位。研究表明,相比人工检测,智慧检测可以

调查:2019年数据分析市场面临的挑战有哪些?

分析和数据科学行业对人才的需求急剧增长,为该行业求职者提供了很多工作机会。无论是500强还是初创企业,每个团队都在使用分析来从数据中获得有价值的信息。然而,在人才、客户定位和收集数据等方面,这个行业仍

零基础的小白怎么学习数据分析?

微信公众号:「Python读财」如有问题或建议,请公众号留言作为一个从超级菜鸟阶段过来的人,也曾迷茫,也曾面对一大堆资料不知所措,从无到有踩过太多的坑,在这里分享一下我总结出来的数据分析学习路径,为了

Pandas数据分析——超好用的Groupby详解

微信公众号:「Python读财」如有问题或建议,请公众号留言在日常的数据分析中,经常需要将数据根据某个(多个)字段划分为不同的群体(group)进行分析,如电商领域将全国的总销售额根据省份进行划分,分

做银行家里的数据专家:ING探索大数据时代下的金融最佳实践

大数据文摘出品记者:高延6月18-21日,O’ReillyAIConference在北京召开。大会上,来自荷兰的金融公司ING的IT主管BasGeerdink带来了《关于数字驱动企业》的主题分享。进入

腾讯基于全时态数据库技术的数据闪回

作者简介:李海翔,网名“那海蓝蓝”,腾讯金融云数据库技术专家。中国人民大学信息学院工程硕士企业导师。著有《数据库事务处理的艺术:事务管理和并发访问控制》、《数据库查询优化器的艺术:原理解析与SQL性能

数据结构与算法分析——开篇以及复杂度分析

开篇 你也许已经发现了,工作了几年,原以为已经是一只老鸟。但看到刚参加工作的同事,你发现,原来自己一直在原地踏步。跟新人相比,你的唯一优势就是对业务更熟悉而已,别的就没有什么优势了。 怎样才能够让自己

数据结构与算法分析——开篇以及复杂度分析

开篇你也许已经发现了,工作了几年,原以为已经是一只老鸟。但看到刚参加工作的同事,你发现,原来自己一直在原地踏步。跟新人相比,你的唯一优势就是对业务更熟悉而已,别的就没有什么优势了。怎样才能够让自己更上

Spring-SpringAOP原理,手写Spring事务框架

一、Spring核心知识Spring是一个开源框架,Spring是于2003年兴起的一个轻量级的Java开发框架,由RodJohnson在其著作ExpertOne-On-OneJ2EEDevelopm

云端的生存之道,第 2 单元:将 Spring Boot 应用程序连接到云托管的数据库

前提条件 本系列教程的第1部分,因为本教程直接以第1部分中的课程内容和完成的操作为基础。 一个IBMCloud帐户 云原生数据持久性 IBMCloud提供了许多可持久存储数据的选项。在本教程中,我

Oracle 之利用BBED修改数据块SCN—-没有备份数据文件的数据恢复

测试环境 OS:redhat6.6 oracle:12.1.0.2  BBED(OracleBlockBrowerandEDitorTool),用来直接查看和修改数据文件数据的一个工具,是Orac

workerman源码-workerman启动流程

前面我们跟着代码看了一遍workerman的初始化流程.但对于如何监听端口.等操作还没有具体的实现.我们这次就来看一下.workerman是如何监听端口并运行的.runAll在前面我们初始化方法过后,

基于中台实践的DevOps平台有何不同?

为了响应快速变化的市场需求,业务要快速迭代。IT正在向云原生架构转型,解放架构自由度,最大化业务敏捷性,解耦合、敏捷开发、快速部署是当下企业的追求,可以消除研发与运维之间鸿沟的DevOps(研发运维)

springDataJpa 最佳实践

springDataJpa最佳实践 前言 SpringDataJpa框架的目标是显著减少实现各种持久性存储的数据访问层所需的样板代码量。SpringDataJpa存储库抽象中的中央接口是Reposit

基于内存和文件存储的 queue worker, 不用 Redis 适合单进程使用没有外部依赖

因为最近要做一个简单的并发任务系统,在github上面找了一圈并没有简单可依赖的库,所以自己写了一个。欢迎大家Review贡献代码。项目地址https://github.com/iflamed/mfw

基于 Laravel6.x 构建的博客应用,支持 Markdown,支持 RBAC 权限管理

基于Laravel6.x构建的博客应用,支持Markdown,支持图片拖拽上传,基于RBAC权限管理系统首页基于RBAC的权限管理后台,Dashboard页面统计了用户总数、文章发布总数、评论率、评论

平安科技数据库总经理汪洋:开源数据库在平安的应用实践

本文转自| 平安科技数据库产品团队2019年5月9日,平安科技数据库产品及存储产品部总经理在第十届数据库技术大会DTCC上分享了《开源数据库在平安的应用实践》,本文根据演讲内容整理,围绕以下几个方面进

海量数据时代,金融行业数据库实践难题如何解决?

随着数字经济时代的到来,大数据、人工智能技术得到了快速发展与应用,可以说,各行各业都已全情投入到这一波数字化转型浪潮中,把握新的发展机遇,获取数字红利。其中,金融行业可以说是走在转型之路最前沿的行业之

Spark in action on Kubernetes – 存储篇

作者|阿里云智能事业群技术专家莫源前言在上篇文章中,SparkinactiononKubernetes–SparkOperator的原理解析我们分析了SparkOperator内部的机制,今天我们会讨

JS 中一定要了解的数据类型和数据转换

数据类型 前言 Js中的类型只有6种,其中基本数据类型有5种分别为string,number,boolen,null,undefined,引用类型有一种,就是object,object是一个大的综合