1、腾讯广告的数据分析系统腾讯社交广告 丰富的广告资源 精准的定向能力数据分析 查询时聚合 预先将数据处理成易 预先聚合 实时+离线于查询的格式 灵活性高 灵活性不够 查询速度快 查询时计算量大预先聚合的计算系统LAMDA架构实时离线接入消息队列yarnTDWstormHdfsTimerspoutspout spout spout spout计算存储MapReduceHdfsBoltBoltBoltHBase实时性 + 可靠性统一计算框架 问题:任务越来越多,计算资源消耗越来越大 合并原始数据聚合的工作,减少重复IO和数据解析的开销 多个pig, hive, mapreduce - 一个mapre
2、duce生成多个聚合结果 一份代码,易于性能优化 时间均匀分布,提高集群利用率任务解耦 流式计算111222333444日志1日志2日志3理想数据及时到达时间11223344计算任务日志1日志2日志31234现实总是有数据晚到如何减少MTBF?12 3 4时间123456计算任务平均故障次数= 每个日志源的故障次数 + 计算系统故障次数任务解耦:只计算已经就绪的数据。某个日志源故障不会影响其他数据。避免故障时机群空闲,恢复后机群压力过大。向流式计算系统演进一代系统:lambda架构快Storm纯内存计算,实时,不准确MapReduce离线计算,准确,慢实时离线两套系统,代码实现和环境部署都是两
3、套准二代系统:流式计算Spark Streaming同时做到准确实时Spark worker常驻进程,避免进程启停开销Spark分层调度,减少中央调度器的压力快+准Mapreduce - spark streamingSpark Streaming的优势常驻进程,避免任务分发的延迟和消耗分层调度,降低调度器负载内存加速计算MapReduce任务里面,yarn既要管理工作流各任务的依赖,又要管理每个任务在不同时间的实例。Spark Streaming里面,每个任务在不同时间的实例是在sparkdriver里面管理的计算流程日志切分:数据按最细时间粒度落地到hdfs计算:分粒度聚合,输入输出都在h
4、dfs,相当于用spark调度更小的mapreduce作业将hdfs目录当做checkpoint,不依赖spark的状态输出到hbase 需要保证操作幂等 采用put而不是increase 每次输出的时候,累加过去N个周期的计算结果 与上次周期的输出数据计算diff,减少对hbase压力 不采用spark window,避免输出任务batch之间依赖,在集群抖动时快速恢复 利用hbase version,避免新数据被老数据覆盖调度优化 推测执行,避免struggle 会有很多的推测备份任务被杀掉调度优化 优化思路 利用 streaming周期任务的特点,动态评估每个 executor 的吞吐量
5、 能力强的节点分配大任务,能力弱的节点分配小任务 实现 a) RDD 及 NewHadoopRDD:获取每个 task 对应的 HDFS 文件的 大小并上报 b) 在每个 batch 结束时,分析 executor 处理任务的情况,动态更新到 driver 端的记录里 c) 按照 executor 的 吞吐量排序在任务列表里选择合适的任务Executor1(4MB/s)Task(32M)Task(32M)Executor1(8MB/s)Task1(128M)Task2(120M)1234选择最适合executor 3执行的任务Executor2(1MB/s)Task(128M)Task(128
6、M)Executor2(4MB/s)Task3(110M)Task4(96M)Executor3(2MB/s)Task5(80M)Task6(78M)Executor1(4MB/s)Task(128M)Task(32M) * 5 = 3Executor2(1MB/s)Task(32M)Task(128M)Executor4(1MB/s)Task7(75M)Task8(70M)调度优化 性能提升 a) 推测任务减少 86.57%b) 计算延迟降低 20.96% c) 少用 1/10 资源 预先聚合的计算系统 统一计算框架 Lamda架构-spark streaming查询时聚合的计算系统要解决的
7、问题 - SQL举例 从万亿条数据中选择符合条件的数据,计算聚合结果 查询用户年龄性别的分布SELECT age, gender, COUNT(*) FROM log WHERE advertiser_id=123 group by age, gender; 查询不同曝光次数的用户的占比、点击率、消耗等SELECT exposure_num, COUNT(*) as user_num,SUM(sum_click) / SUM(exposure_num) as click_rate, SUM(sum_cost) AS total_costFROM(SELECT user_id, COUNT(*)
8、 AS exposure_num,SUM(click_count) AS sum_click, SUM(cost) AS sum_costFROM logGROUP BY user_id) temp_tableGROUP BY exposure_num;流程描述分组Groupby聚合Sum|Count|原始数据集过滤Where流程描述:结果 从万亿条数据中选择符合条件的数据,计算聚合结果 为了提高查询速度,就需要预先将数据整理成适合查询的格式两种数据模型1. 平铺结构:宽表,其中每个cell可以是term list,可以有正排,倒排2. 嵌套结构:proto buffer,正排存储类似于dre
9、mel/parquet,没法倒排Message pageview optional UserProfile user;repeated Position pos repeated Impression imps optional Advertisement ad;repeated Click clicks optional BillInfo bill;嵌套结构的列存储格式 dragon 同列数据连续存储,压缩效率高 读取数据只需读取需要的列,节省磁盘IO 读取数据省去了反序列化整个PB的过程 定义repetition level和definition level 只存储叶子节点的值和rleve
10、l,dlevelDragon vs parquet文件大小(单位:MB)写入时间(单位:秒)80060040020007005004003002001000383289Dragonparquetrecordio254Dragonparquet266271211161138153请求日志曝光日志请求日志曝光日志数据读取耗时(单位:秒)864206.76.73.40.56.74.5DragonParquetRecordio2.10.11.6读一列 读三列 读十列数据特点Message pageview 空节点很多optional UserProfile user;Pageview:3000+叶子节
11、点Pos : 3000+叶子节点Imps : 2000+ 叶子节点Click : 1000+ leavesrepeated Position pos repeated Impression imps optional Advertisement ad;repeated Ad candidate_ads;Candidated_ads : 200+ 叶子节点repeated Click clicks optional BillInfo bill;请求日志曝光日志EmptyColumns29262868EmptyGroups855831Dragon写性能优化:非递归实现column io: col
12、name, max d, max r, childrenfileld writer: traverse message in dfsmessagemessageconsumercolumn writercolumn writer全空子节点的rlevel/dlevel是0,1,2的序列,编码后浪费大量空间不输出完全为空的叶子节点column writer:chunk writer24Dragon写性能优化:空节点缓存Group : ListPairProto 模板message1message2message4message3遇到空节点,先不用遍历子树,缓存到有值或文件结束再遍历到子节点25Dr
13、agon写性能优化:Discard early 最后一次刷空节点缓存非常的耗时 Proto Tempate中每一个节点增加一个字段,标记该节点及其所有孩子节点是不是完全为空(默认为true) 最后一次刷空节点缓存只刷写不为空的groupDragon写性能优化:lazy flush 每次刷空节点缓存时,只刷到孩子节点,而不是叶子节点Prototemplate优化前优化后Totalempty27索引文件设计索引数据结构:检索查询流程:全局信息列值字典倒排数据正排数据列值ID对应的文档ID列式存储的压缩数据每一列(Column)的值编码为列值(Term ) ID列表查询条 Where age=20
14、and age=20 and age30GROUP BY gender;命中列 找到age满足条件的列值ID值倒排拉 拉出列值对应的倒排拉链链集合运 根据逻辑对结果进行集合运算算列值字典String压缩 前缀压缩节省空间 例如词表:“aa”, “abc”, “abcd”, “abd”, “abe” 实现采用更高效的Vector前缀压缩 词表排序之后,利用两两相邻的两个词的前缀压缩。仅保存后一个词相对前一个词的后缀部分内容 支持按词ID快速定位struct Node uint32_t suffix_offset; / 后缀在suffix block中的偏移uint8_t common_len;
15、/ 与前一个词的公共前缀长度倒排数据存储 倒排 列值对应的文档ID列表 文档ID在文件内编码,减少值域范围 稀疏数据保存array,稠密数据保存bitmap 采用roaring bitmap进行自适配 实现高效的bitmap与array之间交并差的计算 基于RoaringBitmap源码做了存储和性能的优化 对稀疏数据计算not,会产生非常稠密的数据,新增补集类型来优化存储 两个array求并集,需要临时数组来保存结果,可以在栈上分配这种小内存 对bit置1,先检查后写更快 etc正排数据存储60%的索引空间是正排数据减少磁盘IO访问-列存储最大程度节省磁盘空间-编码压缩通过采样进行比较,自动
16、确定最优编码方案也可以由用户进行配置指定对于不同 定长类型编码:定长数组编码、列值个数索引编码、定长列表编码、变长列表编码、run length encording的数据特String类型编码:单string长度索引编码、多string长度索引编码、单string列表编码、多string列表编码征,适用简单字典编码:适用列值重复较多的string类型于不同的 Huffman字典编码:列值分布不均匀情况下对简单字典编码的优化编码算法二进制String编码:较特殊的二进制数据类型 存储格式 平铺结构:倒排,正排 嵌套结构:正排,列存储,优化写入速度 编码,压缩 计算PIVOT系统架构 利用SSD进
17、行检索,而不是内存 利用hadoop集群构建索引系统架构存储部分离线计算部分本地节点PullerCleaner统一索引构建LocalMetaServiceTaskScannerRetrievalTaskDispatherTdw Clientpull fileMasterMetaServiceAggregator中心节点pivot/dragon file路由表版本表配置表明细表HDFSHBase34分组(Group By)实现分组Groupby聚合Sum|Count|原始数据集过滤Where流程描述:结果 基于排序 代价高, O(n*log(n) 数据库Group by未命中索引时 基于索引 同
18、一组的数据在此索引(有序) 连续排列 数据库Group by有序命中索引时遍历正排数据实现分组 Group by age, gender, city 如果直接排序: 每次比较age_gender_city,而前缀基本一致,浪费计算量 Map太大,log(N)的插入操作也很耗时 按照列的顺序分层计算 先group by age,然后在每个age里面group by gender,以此类推 每次查找和插入都只操作一个很小的map基于倒排数据实现分组 基于正排数据,计算量跟命中的doc数量成正比 基于倒排的roaring bitmap,进行集合求交计算量有上限,在命中doc很多时会更快 仅适用于gr
19、oup分组数量比较小的情况,否则需要求交的次数太多倒排分层Group By集合求交分组性能对比 6000万条数据,按照性别、年龄两列分组查询65543210.30基于排序的算法Pivot算法耗时(秒)求和实现算法(SUM) Group By后对各个组的某列进行SUM求和 借鉴Group by算法,实现了两种SUM算法基于倒排集合求交基于列值字典遍历正排累加变乘法,操作结果集数方法和Group By类似量级优化适用结果集合总数很大时适用结果集合总数较少时数据版本一致(MVCC) 用户希望看到每个计算周期的完整数据,而不是部分数据 索引的一个周期(Batch)是否可用所有的分片全部加载所有分片的版
20、本一致 LocalServer上报成功加载的索引列表到hbase Master周期扫描HBASE表,计算出全局索引版本信息实时查询表的数据版本表join 大表与小表join: 小的维表分发到所有节点 Pivot支持大表与小表之间进行lookup join 大表与大表join: 多个表按相同的rowkey进行hash分布,locality group配置 Pivot支持同一个locality group内部的大表join索引更新设计 Spark streaming实时生成增量索引 增量索引内对docid的编码,与全量索引保持一致 增量索引和检索独立部署 避免IO干扰性能数据 Pivot VS Druid:Druid索引小,Pivot查询快数量级性能数据性能数据 查询时聚合的计算系统 预先处理好数据 正排,倒排 计算聚合优化