1、第5章基本统计5.1 相关性 计算两个数据系列之间的相关性是统计学中的常见操作。用 spark.ml 可灵活地计算多个系列两两之间的相关性。目前 Spark 支持的相关方法是 Pearson 方法和 Spearman 方法。Correlation 使用指定的方法计算输入数据集的相关矩阵。输出将是一个 DataFrame,它包含向量列的相关矩阵。【例 5-1】Correlation 的 Python API 代码。from _future_ import print_function#$example on$from pyspark.ml.linalg import Vectors from p
2、yspark.ml.stat import Correlation#$example off$5.1 相关性 from pyspark.sql import SparkSession if _name_=_main_:spark=SparkSession .builder .appName(CorrelationExample).getOrCreate()#$example on$data=(Vectors.sparse(4,(0,1.0),(3,-2.0),),(Vectors.dense(4.0,5.0,0.0,3.0),),(Vectors.dense(6.0,7.0,0.0,8.0),
3、),5.1 相关性(Vectors.sparse(4,(0,9.0),(3,1.0),)df=spark.createDataFrame(data,features)r1=Correlation.corr(df,features).head()print(Pearson correlation matrix:n+str(r10)r2=Correlation.corr(df,features,spearman).head()print(Spearman correlation matrix:n+str(r20)#$example off$spark.stop()5.1 相关性5.2 假设检验 假
4、设检验是统计学中一种强有力的工具,用于确定结果是否具有统计显著性,无论该结果是否偶然发生。spark.ml 目前支持 Pearson 的卡方(2)独立性测试。ChiSquareTest 针对标签的每个特征进行 Pearson 独立测试。对于每个特征,(特征,标签)对被转换为列联矩阵,对其计算卡方统计量。所有标签和特征值必须是可分类的。【例 5-2】Pearson 卡方独立性测试的 Python API 代码。5.2 假设检验 from _future_ import print_function from pyspark.sql import SparkSession#$example on$
5、from pyspark.ml.linalg import Vectors from pyspark.ml.stat import ChiSquareTest#$example off$if _name_=_main_:spark=SparkSession 5.2 假设检验 builder .appName(ChiSquareTestExample).getOrCreate()#$example on$data=(0.0,Vectors.dense(0.5,10.0),(0.0,Vectors.dense(1.5,20.0),(1.0,Vectors.dense(1.5,30.0),(0.0,
6、Vectors.dense(3.5,30.0),(0.0,Vectors.dense(3.5,40.0),(1.0,Vectors.dense(3.5,40.0)5.2 假设检验 df=spark.createDataFrame(data,label,features)r=ChiSquareTest.test(df,features,label).head()print(pValues:+str(r.pValues)print(degreesOfFreedom:+str(r.degreesOfFreedom)print(statistics:+str(r.statistics)#$exampl
7、e off$spark.stop()5.2 假设检验 将上述代码保存成 Pearsonx.py,然后用命令 spark-submit Pearsonx.py 运行,结果如图 5-2 所示。5.3 累积器 通过 Summarizer 为 Dataframe 提供向量列摘要统计。可用指标是列的 大值、小值、平均值、方差、非零数及总计数。【例 5-3】Summarizer 的 Python API 代码。from _future_ import print_function from pyspark.sql import SparkSession#$example on$from pyspark.m
8、l.stat import Summarizer from pyspark.sql import Row from pyspark.ml.linalg import Vectors 5.3 累积器#$example off$if _name_=_main_:spark=SparkSession .builder .appName(SummarizerExample).getOrCreate()sc=spark.sparkContext#$example on$5.3 累积器 df=sc.parallelize(Row(weight=1.0,features=Vectors.dense(1.0,
9、1.0,1.0),Row(weight=0.0,features=Vectors.dense(1.0,2.0,3.0).toDF()#create summarizer for multiple metrics mean and count summarizer=Summarizer.metrics(mean,count)#compute statistics for multiple metrics with weight df.select(summarizer.summary(df.features,df.weight).show(truncate=False)5.3 累积器#compu
10、te statistics for multiple metrics without weight df.select(summarizer.summary(df.features).show(truncate=False)#compute statistics for single metric mean with weight df.select(Summarizer.mean(df.features,df.weight).show(truncate=False)#compute statistics for single metric mean without weight df.sel
11、ect(Summarizer.mean(df.features).show(truncate=False)#$example off$spark.stop()5.3 累积器 将上述代码保存成 Summarizerx.py,然后用命令 spark-submit Summarizerx.py 运行,结果如图 5-3 所示。5.4 摘要统计 【例 5-4】通过函数 colStats 为 RDD Vector提供列摘要统计信息,colStats()返回一个 MultivariateStatisticalSummary 实例。import numpy as np from pyspark.mllib.s
12、tat import Statistics mat=sc.parallelize(np.array(1.0,10.0,100.0),np.array(2.0,20.0,200.0),np.array(3.0,30.0,300.0)#an RDD of Vectors#Compute column summary statistics.summary=Statistics.colStats(mat)5.4 摘要统计 print(summary.mean()#a dense vector containing the mean value for each column print(summary
13、.variance()#column-wise variance print(summary.numNonzeros()#number of nonzeros in each column 将上述代码保存成 RDDVectorx.py,然后用命令 spark-submit RDDVectorx.py 运行,结果如图 5-4 所示。5.4 摘要统计 5.4 摘要统计 与驻留在 spark.mllib 中的其他统计函数不同,可以对 RDD 的键值对执行分层抽样方法sampleByKey 和 sampleByKeyExact。对于分层抽样,可以将键视为标签,将值视为特定属性。例如,关键字可以是男人、
14、女人或文档 ID,并且相应的值可以是人的年龄列表或文档中的单词列表。sampleByKey 方法将翻转硬币以决定是否对样本进行采样,因此需要对数据进行一次传递,并提供预期的样本大小。sampleByKeyExact 比 sampleByKey 中使用的每层简单随机抽样需要更多的资源,但是会提供 99.99%置信度的精确抽样大小。5.4 摘要统计 【例 5-5】分层抽样。sampleByKey()允许用户近似采样。from pyspark import SparkContext if _name_=_main_:sc=SparkContext(appName=StratifiedSampling
15、Example)#SparkContext#$example on$#an RDD of any key value pairs data=sc.parallelize(1,a),(1,b),(2,c),(2,d),(2,e),(3,f)5.4 摘要统计#specify the exact fraction desired from each key as a dictionary fractions=1:0.1,2:0.6,3:0.3 approxSample=data.sampleByKey(False,fractions)#$example off$for each in approxS
16、ample.collect():print(each)sc.stop()5.4 摘要统计 将上述代码保存成 SparkContext.py,然后用命令 spark-submit SparkContext.py 运行,结果如图 5-5 所示。5.5 分层抽样 与驻留在 spark.mllib 中的其他统计函数不同,可以对 RDD 的键值对执行分层抽样方法sampleByKey 和 sampleByKeyExact。对于分层抽样,可以将键视为标签,将值视为特定属性。例如,关键字可以是男人、女人或文档 ID,并且相应的值可以是人的年龄列表或文档中的单词列表。sampleByKey 方法将翻转硬币以决
17、定是否对样本进行采样,因此需要对数据进行一次传递,并提供预期的样本大小。sampleByKeyExact 比 sampleByKey 中使用的每层简单随机抽样需要更多的资源,但是会提供 99.99%置信度的精确抽样大小。5.5 分层抽样 【例 5-5】分层抽样。sampleByKey()允许用户近似采样。from pyspark import SparkContext if _name_=_main_:sc=SparkContext(appName=StratifiedSamplingExample)#SparkContext#$example on$#an RDD of any key va
18、lue pairs data=sc.parallelize(1,a),(1,b),(2,c),(2,d),(2,e),(3,f)5.5 分层抽样#specify the exact fraction desired from each key as a dictionary fractions=1:0.1,2:0.6,3:0.3 approxSample=data.sampleByKey(False,fractions)#$example off$for each in approxSample.collect():print(each)sc.stop()5.5 分层抽样 将上述代码保存成 S
19、parkContext.py,然后用命令 spark-submit SparkContext.py 运行,结果如图 5-5 所示。5.6 流数据显著性检验 Spark 提供一些检验的在线实现,以支持 A/B 检验等用例。这些检验可以在 Spark Streaming DStream(Boolean,Double)上执行,其中每个元组的第一个元素表示对照组(false)或实验组(true),第二个元素是观测值。流显著性检验支持以下参数:peacePeriod:要忽略的流数据中,初始数据点的数量,用于减轻新异效应。windowSize:执行假设检验的先前批次数。设置为 0 将使用所有先前批次执行累
20、积处理。5.6 流数据显著性检验 【例 5-6】StreamingTest 提供的流数据假设检验。import org.apache.spark.SparkConf import org.apache.spark.mllib.stat.test.BinarySample,StreamingTest import org.apache.spark.streaming.Seconds,StreamingContext import org.apache.spark.util.Utils 5.6 流数据显著性检验 object StreamingTestExample def main(args:A
21、rrayString)if(args.length!=3)/scalastyle:off println System.err.println(Usage:StreamingTestExample +)5.6 流数据显著性检验 /scalastyle:on println System.exit(1)val dataDir=args(0)val batchDuration=Seconds(args(1).toLong)val numBatchesTimeout=args(2).toInt 5.6 流数据显著性检验 val conf=new SparkConf().setMaster(local
22、).setAppName(StreamingTestExample)val ssc=new StreamingContext(conf,batchDuration)ssc.checkpoint val dir=Utils.createTempDir()dir.toString 5.6 流数据显著性检验 /$example on$val data=ssc.textFileStream(dataDir).map(line=line.split(,)match case Array(label,value)=BinarySample(label.toBoolean,value.toDouble)va
23、l streamingTest=new StreamingTest().setPeacePeriod(0).setWindowSize(0).setTestMethod(welch)5.6 流数据显著性检验 val out=streamingTest.registerStream(data)out.print()/$example off$/Stop processing if test becomes significant or we time out var timeoutCounter=numBatchesTimeout out.foreachRDD rdd=timeoutCounte
24、r-=1 val anySignificant=rdd.map(_.pValue 0.05).fold(false)(_|_)if(timeoutCounter=0|anySignificant)rdd.context.stop()ssc.start()ssc.awaitTermination()5.6 流数据显著性检验 将上述代码保存成 StreamingTestExample.scala,然后使用命令 spark-submit-class org.apache.spark.examples.mllib.StreamingTestExample spark-examples_2.11-2.4
25、.0.jar 运行,结果如图 5-6 所示。5.6 流数据显著性检验 5.7 随机数据生成 随机数据生成对于随机化算法、原型设计和性能测试都非常有用。Spark 支持从给定分布(均匀分布、标准正态分布或泊松分布)抽取 id 值,并生成对应的随机 RDD。【例 5-7】生成随机双 RDD,其值遵循标准正态分布 N(0,1),然后将其映射到 N(1,4)。RandomRDDs 提供相应方法来生成随机双 RDD 或向量 RDD。5.7 随机数据生成 from pyspark.mllib.random import RandomRDDs from pyspark import SparkContext
26、,SparkConf sc=SparkContext(appName=PythonRandomnumberGeneration)#sc=.#SparkContext#Generate a random double RDD that contains 1 million i.i.d.values drawn from the#standard normal distribution N(0,1),evenly distributed in 10 partitions.5.7 随机数据生成 u=RandomRDDs.normalRDD(sc,1000000,10)#Apply a transfo
27、rm to get a random double RDD following N(1,4).v=u.map(lambda x:1.0+2.0*x)print(v)将上述代码保存成 RandomRDDs.py,然后用命令 spark-submit RandomRDDs.py 运行,结果如图 5-7 所示。5.7 随机数据生成 5.8 核密度估计 核密度估计是一种可用于可视化经验概率分布的技术,而无须对观察到样本的特定分布进行假设。它可以用来计算随机变量概率密度函数的估计值,在给定的一组点处进行评估。通过将特定点的经验分布 PDF,表示为以每个样本为中心的正态分布 PDF 的均值来实现该估计。【
28、例 5-8】用 KernelDensity 从样本的 RDD 计算核密度估计。from pyspark import SparkContext#$example on$from pyspark.mllib.stat import KernelDensity 5.8 核密度估计#$example off$if _name_=_main_:sc=SparkContext(appName=KernelDensityEstimationExample)#SparkContext#$example on$#an RDD of sample data data=sc.parallelize(1.0,1.0
29、,1.0,2.0,3.0,4.0,5.0,5.0,6.0,7.0,8.0,9.0,9.0)#Construct the density estimator with the sample data and a standard deviation for the Gaussian 5.8 核密度估计#kernels kd=KernelDensity()kd.setSample(data)kd.setBandwidth(3.0)#Find density estimates for the given values densities=kd.estimate(-1.0,2.0,5.0)#$example off$print(densities)sc.stop()将上述代码保存成 KernelDensity.py,然后用命令 spark-submit KernelDensity.py 运行,结果如图 5-8 所示。5.8 核密度估计 习 题 1.用 PySpark 编程生成随机双 RDD,其值遵循标准正态分布 N(0,1),并将其映射到 N(1,5)。2.通过 sampleByKey()进行近似分层采样编程。3.用 PySpark 编程实现 Pearson 的卡方(2)统计。