1、 大数据导论第十二章CONTENTS目录PART 01 SPARK SQL简介PART 02 SPARK SQL执行流程PART 03 基础数据模型DATAFRAMEPART 04 使用Spark SQL的方式PART 05 SPARK SQL数据源PART 06 SPARK SQL CLI介绍PART 07在Pyspark中使用Spark SQLPART 08 在Java中连接Spark SQLPART 09 习题PART 01 Spark SQL简介Spark SQL是一个用来处理结构化数据的Spark组件,为Spark提供了查询结构化数据的能力。Spark SQL可被视为一个分布式的SQ
2、L查询引擎,可以实现对多种数据格式和数据源进行SQL操作,包括Parquet,Hive,MongoDB,JSON、HDFS、JDBC、S3和RDD等。Spark SQL简介Spark SQL介绍:1.Spark SQL是为了处理结构化数据的一个Spark 模块。2.不同于Spark RDD的基本API,Spark SQL接口拥有更多关于数据结构本身与执行计划等更多信息。3.在Spark内部,Spark SQL可以利用这些信息更好地对操作进行优化。4.Spark SQL提供了三种访问接口:SQL,DataFrame API和Dataset API。5.当计算引擎被用来执行一个计算时,有不同的AP
3、I和语言种类可供选择。6.这种统一性意味着开发人员可以来回轻松切换各种最熟悉的API来完成同一个计算工作。Spark SQL简介Spark SQL具有如下特点p 数据兼容方面:能加载和查询来自各种来源的数据。p 性能优化方面:除了采取内存列存储、代码生成等优化技术外,还引进成本模型对查询进行动态评估、获取最佳物理计划等;p 组件扩展方面:无论是SQL的语法解析器、分析器还是优化器都可以重新定义,进行扩展。p 标准连接:Spark SQL包括具有行业标准JDBC和ODBC连接的服务器模式。Spark SQL简介Spark SQL具有如下特点p 集成:无缝地将SQL查询与Spark程序混合。Spa
4、rk SQL允许将结构化数据作为Spark中的分布式数据集(RDD)进行查询,在Python,Scala和Java中集成了API。这种紧密的集成使得SQL查询以及复杂的分析算法可以轻松地运行。p 可扩展性:对于交互式查询和长查询使用相同的引擎。Spark SQL利用RDD模型来支持查询容错,使其能够扩展到大型作业,不需担心为历史数据使用不同的引擎。PART 02 Spark SQL执行流程Spark SQL执行流程类似于关系型数据库,Spark SQL语句也是由Projection(a1,a2,a3)、Data Source(tableA)、Filter(condition)三部分组成,分别对
5、应SQL查询过程中的Result、Data Source、Operation,也就是说SQL语句按Result-Data Source-Operation的次序来描述的。Spark SQL执行流程1.解析(Parse)对读入的SQL语句进行解析,分辨出SQL语句中哪些词是关键词(如SELECT、FROM、WHERE),哪些是表达式、哪些是 Projection、哪些是 Data Source 等,从而判断SQL语句是否规范;2.绑定(Bind)将SQL语句和数据库的数据字典(列、表、视图等)进行绑定,如果相关的Projection、Data Source等都存在,则这个SQL语句是可以执行的;
6、Spark SQL执行流程3.优化(Optimize)一般的数据库会提供几个执行计划,这些计划一般都有运行统计数据,数据库会在这些计划中选择一个最优计划;4.执行(Execute)按Operation-Data Source-Result 的次序来执行计划。在执行过程有时候甚至不需要读取物理表就可以返回结果,比如重新运行刚运行过的SQL语句,可能直接从数据库的缓冲池中获取返回结果。PART 03 基础数据模型DataFrameDataFrame是由“命名列”(类似关系表的字段定义)所组织起来的一个分布式数据集合,可以把它看成是一个关系型数据库的表。基础数据模型DataFrameDataFram
7、e是Spark SQL的核心,它将数据保存为行构成的集合,行对应列有相应的列名。DataFrame与RDD的主要区别在于,DataFrame带有Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL可以掌握更多的结构信息,从而能够对DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。基础数据模型DataFrameDataFrame与RDD的对比:PART 04 使用Spark SQL的方式使用Spark SQL的方式使用Spark SQL,首先利用sqlContext从
8、外部数据源加载数据为DataFrame;然后,利用DataFrame上丰富的API进行查询、转换;最后,将结果进行展现或存储为各种外部数据形式。Spark SQL 为Spark提供了查询结构化数据的能力,查询时既可以使用SQL也可以使用DataFrame API(RDD)。通过Thrift Server,Spark SQL支持多语言编程包括Java、Scala、Python及R。使用Spark SQL的方式使用Spark SQL的方式1.加载数据.从Hive中的users表构造DataFrame:users=sqlContext.table(users).加载S3上的JSON文件:logs=s
9、qlContext.load(s3n:/path/to/data.json,json).加载HDFS上的Parquet文件:clicks=sqlContext.load(hdfs:/path/to/data.parquet,parquet)使用Spark SQL的方式1.加载数据.通过JDBC访问MySQL:comments=sqlContext.jdbc(jdbc:mysql:/localhost/comments,user).将普通RDD转变为DataFrame:rdd=sparkContext.textFile(“article.txt”).flatMap(_.split().map(_
10、,1).reduceByKey(_+_)wordCounts=sqlContext.createDataFrame(rdd,word,count)使用Spark SQL的方式1.加载数据.将本地数据容器转变为DataFrame:data=(Alice,21),(Bob,24)people=sqlContext.createDataFrame(data,name,age).将Pandas DataFrame转变为Spark DataFrame(Python API特有功能):sparkDF=sqlContext.createDataFrame(pandasDF)使用Spark SQL的方式2.使
11、用DataFrame.创建一个只包含年轻用户的DataFrame:young=users.filter(users.age 21).也可以使用Pandas风格的语法:young=usersusers.age=13 AND age=19)teenagers.show()Parquet文件数据源JSON DataSets 数据源JSON DataSets 数据源Spark SQL可以自动根据JSON DataSet的格式把其上载为DataFrame。用路径指定JSON dataset;路径下可以是一个文件,也可以是多个文件:sc=spark.sparkContextpath=examples/sr
12、c/main/resources/people.jsonpeopleDF=spark.read.json(path)使用的结构可以调用printSchema()方法打印:peopleDF.printSchema()利用DataFrame创建一个临时表:使用Spark的sql方法进行SQL查询:peopleDF.createOrReplaceTempView(people)teenagerNamesDF=spark.sql(SELECT name FROM people WHERE age BETWEEN 13 AND 19)teenagerNamesDF.show()JSON DataSets
13、 数据源JSON dataset的DataFrame也可以是RDDString 格式,每个JSON对象为一个string:jsonStrings=name:Yin,address:city:Columbus,state:OhiootherPeopleRDD=sc.parallelize(jsonStrings)otherPeople=spark.read.json(otherPeopleRDD)otherPeople.show()JSON DataSets 数据源Hive表数据源Hive表数据源Spark SQL支持对Hive中的数据进行读写。首先创建一个支持Hive的SparkSession
14、对象,包括与Hive metastore的连接,支持Hive的序列化和反序列化操作,支持用户定义的Hive操作等。warehouse_location=abspath(spark-warehouse)spark=SparkSession.builder .appName(Python Spark SQL Hive integration example).config(spark.sql.warehouse.dir,warehouse_location).enableHiveSupport().getOrCreate()warehouse_location 指定数据库和表的缺省位置:Hive表
15、数据源spark.sql(CREATE TABLE IF NOT EXISTS src(key INT,value STRING)USING hive)spark.sql(LOAD DATA LOCAL INPATH examples/src/main/resources/kv1.txt INTO TABLE src)基于新创建的SparkSession创建表和上载数据到表中:spark.sql(SELECT*FROM src).show()spark.sql(SELECT COUNT(*)FROM src).show()使用HiveQL进行查询:Hive表数据源sqlDF=spark.sql
16、(SELECT key,value FROM src WHERE key val sqlContext=new org.apache.spark.sql.SQLContext(sc)sqlContext:org.apache.spark.sql.SQLContext=org.apache.spark.sql.SQLContext1943a343scala import sqlContext.implicits._import sqlContext.implicits._1.SQLContextSpark SQL CLI介绍下面的操作基于一个简单的数据文件people.json,文件的内容如下:
17、name:Michaelname:Andy,age:30name:Justin,age:192.数据文件下面语句从本地文件people.json读取数据创建DataFrame:val df=sqlContext.read.json(file:/data/people.json)df:org.apache.spark.sql.DataFrame=age:bigint,name:string3.创建DataFramesPyspark是针对Spark的Python API。Spark使用py4j来实现Python与Java的互操作,从而实现使用Python编写Spark程序。Spark也同样提供了P
18、yspark,一个Spark的Python Shell,可以以交互的方式使用Python编写Spark程序。PART 07 在Pyspark中使用Spark SQL在Pyspark中使用Spark SQL在终端上启动Python Spark Shell:./bin/pyspark使用JSON文件作为数据源,创建JSON文件/home/sparksql/courses.json,并输入下面的内容:1.实例描述name:Linux,type:basic,length:10name:TCPIP,type:project,length:15name:Python,type:project,length
19、:8name:GO,type:basic,length:2name:Ruby,type:basic,length:5在Pyspark中使用Spark SQL首先使用SQLContext模块,其作用是提供Spark SQL处理的功能。在Pyspark Shell中逐步输入下面步骤的内容:引入pyspark.sql中的SQLContext:from pyspark.sql import SQLContext2.创建SQLContext对象使用pyspark的SparkContext对象,创建SQLContext对象:sqlContext=SQLContext(sc)在Pyspark中使用Spark
20、 SQLDataFrame对象可以由RDD创建,也可以从Hive表或JSON文件等数据源创建。创建DataFrame,指明来源自JSON文件:df=sqlContext.read.json(/home/shiyanlou/courses.json)3.创建DataFrame对象在Pyspark中使用Spark SQL首先打印当前DataFrame里的内容和数据表的格式:df.select(name).show()#展示了所有的课程名df.select(name,length).show()#展示了所有的课程名及课程长度4.对DataFrame进行操作show()函数将打印出JSON文件中存储的
21、数据表;使用printSchema()函数打印数据表的格式。然后对DataFrame的数据进行各种操作:df.show()df.printSchema()在Pyspark中使用Spark SQLdf.filter(dftype=basic).select(name,type).show()#展示了课程类型为基础课(basic)的课程名和课程类型df.groupBy(type).count().show()#计算所有基础课和项目课的数量。首先需要将DataFrame注册为Table才可以在该表上执行SQL语句:df.registerTempTable(courses)coursesRDD=sql
22、Context.sql(SELECT name FROM courses WHERE length=5 and length=10)names=coursesRDD.rdd.map(lambda p:Name:+p.name)for name in names.collect():print name5.执行SQL语句在Pyspark中使用Spark SQLParquet是Spark SQL读取的默认数据文件格式,把从JSON中读取的DataFrame保存为Parquet格式,只保存课程名称和长度两项数据:df.select(name,length).write.save(/tmp/cours
23、es.parquet,format=parquet)6.保存 DataFrame为其他格式将创建hdfs:/master:9000/tmp/courses.parquet文件夹并存入课程名称和长度数据。Spark SQL实现了Thrift JDBC/ODBC server,所以Java程序可以通过JDBC远程连接Spark SQL发送SQL语句并执行。PART 08 在Java中连接Spark SQL在Java中连接Spark SQL首先将$HIVE_HOME/conf/hive-site.xml 拷贝到$SPARK_HOME/conf目录下。另外,因为Hive元数据信息存储在MySQL中,所
24、以Spark在访问这些元数据信息时需要MySQL连接驱动的支持。添加驱动的方式有三种:在$SPARK_HOME/conf目录下的spark-defaults.conf中添加:spark.jars/opt/lib2/mysql-connector-java-5.1.26-bin.jar;可以实现添加多个依赖jar比较方便:spark.driver.extraClassPath/opt/lib2/mysql-connector-java-5.1.26-bin.jar;1.设置配置 在运行时添加-jars/opt/lib2/mysql-connector-java-5.1.26-bin.jar做完上
25、面的准备工作后,Spark SQL和Hive就继承在一起了,Spark SQL可以读取Hive中的数据。1.设置配置2.启动Thrift在Spark根目录下执行:./sbin/start-thriftserver.sh开启thrift服务器,它可以接受所有spark-submit的参数,并且还可以接受-hiveconf 参数。不添加任何参数表示以local方式运行,默认的监听端口为10000 在Java中连接Spark SQL3.添加依赖打开Eclipse用JDBC连接Hive Server2。新建一个Maven项目,在pom.xml添加以下依赖:org.apache.hive hive-jd
26、bc 1.2.1 org.apache.hadoop hadoop-common 2.4.1 在Java中连接Spark SQL3.添加依赖 jdk.tools jdk.tools 1.6 system$JAVA_HOME/lib/tools.jar 在Java中连接Spark SQLJDBC连接Hive Server2的相关参数:l 驱动:org.apache.hive.jdbc.HiveDriverl url:jdbc:hive2:/192.168.1.131:10000/defaultl 用户名:hadoop(启动thriftserver的linux用户名)l 密码:“”(默认密码为空)
27、4.JDBC连接参数在Java中连接Spark SQLimport java.sql.Connection;import java.sql.DriverManager;import java.sql.ResultSet;import java.sql.SQLException;import java.sql.Statement;public class Test1 public static void main(String args)throws SQLException String url=jdbc:hive2:/192.168.1.131:10000/default;try Class
28、.forName(org.apache.hive.jdbc.HiveDriver);catch(ClassNotFoundException e)e.printStackTrace();Connection conn=DriverManager.getConnection(url,hadoop,);Statement stmt=conn.createStatement();String sql=SELECT*FROM doc1 limit 10;System.out.println(Running+sql);ResultSet res=stmt.executeQuery(sql);while(
29、res.next()System.out.println(id:+res.getInt(1)+ttype:+res.getString(2)+tauthors:+res.getString(3)+ttitle:+res.getString(4)+tyear:+res.getInt(5);5.JDBC连接参数在Java中连接Spark SQLPART 09 作业 作业作业:1.什么是Spark SQL?其主要目的是什么?2.Spark SQL的执行流程有哪几个步骤?3.在Spark SQL中,什么是DataFrame?使用DataFrame的优势是什么?DataFrame与RDD的主要区别是什么?4.使用Spark SQL的方式有哪几种?使用Spark SQL的步骤是什么?5.常用的Spark SQL的数据源有哪些?6.Parquet文件格式是什么?它的主要特点是什么?7.为了使Java程序可以通过JDBC远程连接Spark SQL,需要做哪些准备工作?连接数据库的语句是什么?有哪些参数?作业作业:8.请按下述要求写出相应的Spark SQL语句:从一个本地JSON文件创建DataFrame;打印DataFrame元数据;按照列属性过滤DataFrame的数据;返回某列满足条件的数据;把DataFrame注册成数据库表。谢谢FOR YOUR LISTENING