A Real-Time Data Quality Monitoring Platform Based on Kafka and Spark
邢国东 (Guodong Xing), Senior Product Manager, Microsoft

A changing Microsoft

The business we serve
- The shared big data team in AI&R

What problem are we solving?
- Kafka as a data bus: devices and services feed streaming processing, batch processing, and applications
- Scalable pub/sub for near-real-time (NRT) data streams
- Interactive analytics

Data flow: rapidly growing real-time data
- 1.3 million events per second ingress at peak
- 1 trillion events per day processed at peak
- 3.5 petabytes processed per day
- 100 thousand unique devices and machines
- 1,300 production Kafka brokers
- 1 second 99th-percentile latency

Data quality assurance upstream and downstream of Kafka
- Many producers write into Kafka + HLC clusters, which in turn feed multiple destinations
- 100K QPS, 300 Gb per hour
- Data = money; lost data = lost money

How it works: introduction

How it works
- Three audit granularities: file level, batch level, record level
- Each audit carries this metadata:

    {
      "Action":          "Produced or Uploaded",
      "ActionTimeStamp": "action date and time (UTC)",
      "Environment":     "environment (cluster) name",
      "Machine":         "computer name",
      "StreamID":        "type of data (sheeps, ducks, etc.)",
      "SourceID":        "e.g. file name",
      "BatchID":         "a hash of the data in this batch",
      "NumBytes":        "size in bytes",
      "NumRecords":      "number of records in the batch",
      "DestinationID":   "destination ID"
    }
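For concreteness, here is the same schema as a typed record. This is a Scala sketch; the slide gives only field names and descriptions, so the types are assumptions.

    // The audit record from the slide above, as a Scala case class.
    // Types are assumptions; the slide shows only names and descriptions.
    case class Audit(
      action: String,          // "Produced" or "Uploaded"
      actionTimeStamp: String, // action date and time (UTC)
      environment: String,     // environment (cluster) name
      machine: String,         // computer name
      streamId: String,        // type of data (sheeps, ducks, etc.)
      sourceId: String,        // e.g. a file name
      batchId: String,         // a hash of the data in this batch
      numBytes: Long,          // size in bytes
      numRecords: Long,        // number of records in the batch
      destinationId: String    // destination ID
    )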
How it works: data and audit flow
- [Diagram] A producer writes File 1 into the Kafka + HLC cluster under audit and, in parallel, reports "Produced" audits to the audit system: one batch of 3 records (24 bytes, BatchID=abc123) and one batch of 5 records (40 bytes, BatchID=def456), each with a timestamp and "File 1" as the source.
- As data lands, Destination 1 reports matching "Uploaded" audits (here, 24 bytes / 3 records so far). Comparing "Produced" and "Uploaded" audits per batch tells the audit system exactly which records have not yet reached the destination.

Kibana chart of data latency
- [Chart placeholder]

Kibana chart of data completeness
- [Chart] Three lines: green shows how many records were produced, blue how many reached destination #1, and a third line how many reached destination #2.

Richer charts with Power BI

Monitoring a 4-stage real-time data processing pipeline

Code for sending an audit
- Create a client object
- Prepare the audit object
- Lastly: client.SendBondObject(audit);
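The deck shows only the shape of the client call (client.SendBondObject(audit), a C#/Bond-style API). As a rough illustration of a producer-side sender talking to the front-end web service described later, here is a Scala sketch; the endpoint URL and the use of JSON over HTTP are assumptions, not the real Bond-serialized protocol.

    import java.net.{HttpURLConnection, URL}
    import java.nio.charset.StandardCharsets

    object AuditClient {
      // Hypothetical front-end endpoint; the production wire format
      // (Bond-serialized objects) is internal to the platform.
      val endpoint = new URL("https://audit-frontend.example.com/audits")

      // POST one audit, serialized as JSON, and return the HTTP status.
      def send(auditJson: String): Int = {
        val conn = endpoint.openConnection().asInstanceOf[HttpURLConnection]
        conn.setRequestMethod("POST")
        conn.setRequestProperty("Content-Type", "application/json")
        conn.setDoOutput(true)
        val out = conn.getOutputStream
        try out.write(auditJson.getBytes(StandardCharsets.UTF_8)) finally out.close()
        conn.getResponseCode
      }
    }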
An API for querying the statistics

Anomaly detection on top of audit data
- Audit data is really metadata about the data flowing through the pipeline, so it can drive all kinds of anomaly detection and monitoring on data volumes.

Anomaly detection algorithm 1
- The Holt-Winters algorithm is used to train a model and forecast (see the sketch after this list)
- Robustness improvements:
  - Use the Median Absolute Deviation (MAD) for better estimates
  - Handle missing points and noise (e.g., via data smoothing)
  - Pick up trend and seasonality automatically
  - Let users label anomalies and give feedback, to better track trend changes
- GLR (Generalized Likelihood Ratio): a floating-threshold GLR that adjusts the model dynamically as new data arrives, and removes outliers from noisy series
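A minimal sketch of this first detector, assuming an additive Holt-Winters model over a per-minute traffic series (with at least two full seasons of history for initialization) and a MAD-based threshold on the one-step-ahead residuals. The smoothing parameters and the threshold k are illustrative; the refinements above (smoothing, user feedback, GLR) are not shown.

    object HoltWintersAnomaly {
      def median(xs: Seq[Double]): Double = {
        val s = xs.sorted
        if (s.size % 2 == 1) s(s.size / 2) else (s(s.size / 2 - 1) + s(s.size / 2)) / 2.0
      }

      // One-step-ahead residuals of an additive Holt-Winters fit;
      // m is the season length (e.g. 1440 for daily seasonality over minutes).
      def residuals(y: Array[Double], m: Int, alpha: Double = 0.5,
                    beta: Double = 0.1, gamma: Double = 0.2): Array[Double] = {
        require(y.length >= 2 * m, "need two full seasons to initialize")
        var level = y.take(m).sum / m
        var trend = (y.slice(m, 2 * m).sum - y.take(m).sum) / (m.toDouble * m)
        val season = Array.tabulate(m)(i => y(i) - level)
        val res = new Array[Double](y.length)
        for (t <- y.indices) {
          val s = season(t % m)
          res(t) = y(t) - (level + trend + s)        // forecast error at t
          val prevLevel = level
          level = alpha * (y(t) - s) + (1 - alpha) * (level + trend)
          trend = beta * (level - prevLevel) + (1 - beta) * trend
          season(t % m) = gamma * (y(t) - level) + (1 - gamma) * s
        }
        res
      }

      // Flag points whose residual is more than k robust sigmas off.
      def anomalies(y: Array[Double], m: Int, k: Double = 5.0): Seq[Int] = {
        val res = residuals(y, m)
        val med = median(res)
        val sigma = 1.4826 * median(res.map(r => math.abs(r - med)))  // MAD -> sigma
        res.indices.filter(i => math.abs(res(i) - med) > k * sigma)
      }
    }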
Anomaly detection algorithm 2
- Online anomaly detection on time series, based on exchangeability martingales: has the distribution changed?
- Using the historical data, define a "new value strangeness". At time t, when a new value arrives:
  - Add it to the history. For each item i in the history, compute s_i = strangeness(value_i, history).
  - Let p_t = (#{i : s_i > s_t} + r * #{i : s_i = s_t}) / N, where r is uniform in (0, 1).
  - The uniform r makes sure p_t is itself uniformly distributed while the data stay exchangeable, so a run of small p-values signals a change in distribution (see the sketch below).

Anomaly detection algorithms
- [Chart placeholder: example detections on audit traffic]
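A sketch of the p-value computation above, plus a simple power martingale on top of it. The distance-to-median strangeness function and the epsilon parameter are illustrative assumptions; the deck itself only pins down the p-value formula.

    import scala.util.Random

    object MartingaleDetector {
      // Illustrative strangeness: distance to the median of the history.
      def strangeness(v: Double, history: Seq[Double]): Double =
        math.abs(v - history.sorted.apply(history.size / 2))

      // p_t = (#{i: s_i > s_t} + r * #{i: s_i = s_t}) / N, with r ~ U(0,1).
      def pValue(history: Seq[Double], v: Double, rnd: Random = new Random): Double = {
        val all = history :+ v
        val scores = all.map(strangeness(_, all))
        val st = scores.last
        (scores.count(_ > st) + rnd.nextDouble() * scores.count(_ == st)) / all.size
      }

      // Power martingale update M_t = M_{t-1} * eps * p_t^(eps - 1);
      // M_t grows when p-values are consistently small, i.e. on a change.
      def update(m: Double, p: Double, eps: Double = 0.92): Double =
        m * eps * math.pow(p, eps - 1)
    }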
Design overview

Design goals for the data monitoring system
- Monitor the completeness and latency of streaming data
- Cover multi-producer, multi-stage, multi-destination data flows in the pipeline
- Work in near real time
- Provide diagnostics: which DC, machine, or event/file has the problem
- Be extremely stable: 99.9% uptime
- Scale out
- Keep the audit data itself trustworthy

System design
- Producers and destinations report "Produced" and "Uploaded" audits to a Front End Web Service, which lands them in transient storage (Kafka)
- Kafka ensures high availability
- We don't want ingress/egress to ever get stuck sending audits
System design
- An audit data processing pipeline (Spark) consumes the "Produced" and "Uploaded" audits from transient storage (sketched below)
- It pre-aggregates the data into 1-minute chunks
- It keeps track of duplicates and increments, and handles late arrivals, out-of-order data, and faults
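A sketch of this stage using the DStream API of that Spark generation (the deck names Spark 1.6.1). For simplicity it assumes audits arrive on a Kafka topic as CSV lines (streamId,destinationId,action,timestampMs,numRecords); topic, broker, and field layout are illustrative assumptions, and the dedup/late-data logic is elided.

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object AuditAggregator {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(new SparkConf().setAppName("audit-agg"), Seconds(10))
        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, Map("metadata.broker.list" -> "broker1:9092"), Set("audits"))

        val perMinute = stream
          .map { case (_, line) =>
            val f = line.split(',')
            // key: (streamId, destinationId, action, 1-minute bucket); value: record count
            ((f(0), f(1), f(2), f(3).toLong / 60000 * 60000), f(4).toLong)
          }
          .reduceByKey(_ + _)  // per-batch rollup into 1-minute chunks

        perMinute.print()      // the production pipeline writes these to ElasticSearch
        ssc.start()
        ssc.awaitTermination()
      }
    }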
System design
- Aggregated data storage (ElasticSearch) stores the pre-aggregated data for reporting through DAS (queried as sketched below)
- It allows for NRT charts using Kibana
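For illustration, the 1-minute rollups can be read back out over ElasticSearch 1.x's HTTP search API (the same data the Kibana charts draw from). The index, host, and field names here are assumptions.

    import java.net.{HttpURLConnection, URL}
    import java.nio.charset.StandardCharsets
    import scala.io.Source

    object AggQuery {
      // Sum of records per minute for one stream in [fromMs, toMs).
      def recordsPerMinute(streamId: String, fromMs: Long, toMs: Long): String = {
        val query =
          s"""{
             |  "size": 0,
             |  "query": { "filtered": { "filter": { "and": [
             |    { "term":  { "streamId": "$streamId" } },
             |    { "range": { "minute": { "gte": $fromMs, "lt": $toMs } } }
             |  ] } } },
             |  "aggs": { "per_minute": {
             |    "date_histogram": { "field": "minute", "interval": "1m" },
             |    "aggs": { "records": { "sum": { "field": "numRecords" } } }
             |  } }
             |}""".stripMargin
        val conn = new URL("http://es-host:9200/audit-agg/_search")
          .openConnection().asInstanceOf[HttpURLConnection]
        conn.setRequestMethod("POST")
        conn.setDoOutput(true)
        val out = conn.getOutputStream
        try out.write(query.getBytes(StandardCharsets.UTF_8)) finally out.close()
        Source.fromInputStream(conn.getInputStream, "UTF-8").mkString  // raw JSON response
      }
    }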
System design
- A Data Analysis Web Service is the final reporting endpoint for consumers
- It answers questions like: does the destination have complete data for that time window? Which files are missing? (See the sketch below.)
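A sketch of the logical shape of such a completeness answer, derived from the audits of one time window and reusing the Audit type sketched earlier; the real service answers from the pre-aggregated data rather than raw audit lists, so the grouping here is an assumption.

    object Completeness {
      // For one time window's audits: is the destination complete,
      // and which produced files never arrived?
      def report(audits: Seq[Audit], destinationId: String): Unit = {
        val produced = audits.filter(_.action == "Produced")
        val uploaded = audits.filter(a => a.action == "Uploaded" && a.destinationId == destinationId)
        val missing  = produced.map(_.sourceId).toSet -- uploaded.map(_.sourceId).toSet
        val complete = missing.isEmpty &&
          uploaded.map(_.numRecords).sum >= produced.map(_.numRecords).sum
        println(s"complete=$complete")
        missing.foreach(f => println(s"missing file: $f"))
      }
    }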
High availability
- The full stack (Front End Web Service, transient Kafka storage, Spark pipeline, ElasticSearch, Data Analysis Web Service) is deployed in two data centers, DC1 and DC2
- Active-active disaster recovery
- Monitoring for each key component
Trustworthy quality monitoring
- [Diagram] The "Produced" and "Uploaded" audits also flow through a second, parallel path into its own ElasticSearch: an audit of the audit pipeline itself, so that losses inside the monitoring system would themselves be visible.
Problem diagnosis
- When loss happens, rich diagnostic information is needed for ad hoc queries
- Raw audits are therefore kept in a dedicated raw audit storage (Cosmos), alongside the aggregated path
- Joining audit data with traces enables interactive diagnostics (a sketch follows)
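As a sketch of that join, pairing raw audits with trace log lines by machine and batch in Spark, so an on-call engineer can see what happened around a loss. The Trace type and the join key are assumptions.

    import org.apache.spark.rdd.RDD

    // Hypothetical trace record; real traces are service logs.
    case class Trace(machine: String, batchId: String, message: String)

    object Diagnose {
      // Pair each raw audit with the trace lines from the same
      // machine and batch, for interactive drill-down.
      def lossContext(audits: RDD[Audit], traces: RDD[Trace]): RDD[(Audit, Trace)] =
        audits.map(a => ((a.machine, a.batchId), a))
          .join(traces.map(t => ((t.machine, t.batchId), t)))
          .values
    }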
Goals, revisited
- Monitor the completeness and latency of streaming data
- Cover multi-producer, multi-stage, multi-destination data flows in the pipeline
- Work in near real time
- Provide diagnostics: which DC, machine, or event/file has the problem
- Be extremely stable: 99.9% uptime
- Scale out
- Keep the audit data itself trustworthy

Versions
- Kafka under audit: 0.8.1.1
- Audit pipeline: Kafka 0.8.1.1, Spark 1.6.1, ElasticSearch 1.7.0
- To be open sourced!