1、Hadoop数据收集与入库系统数据收集与入库系统 Flume与与Sqoop主要内容1.背景介绍背景介绍2.Hadoop数据收集系统数据收集系统3.传统数据库与传统数据库与Hadoop间数据同步间数据同步4.总结总结背景介绍 Hadoop提提供供了了一一个中个中央央化的存化的存储储系统系统:有利于进行集中式的数据分析有利于进行集中式的数据分析与与数据数据共共享享 Hadoop对存储对存储格格式没式没有有要求要求:用户访问日志;用户访问日志;产品信息产品信息 网页数据等网页数据等 如何将数据存如何将数据存入入Hadoop:数据分散在各个离散的设备上数据分散在各个离散的设备上 数据保存在传统的存储设
2、备和数据保存在传统的存储设备和系系统中统中常见的两种数据来源分散的数据源:分散的数据源:机器产生的数据;机器产生的数据;用户访问日用户访问日志志;用户购买日用户购买日志志;传统系统中的数据:传统系统中的数据:传统关系型数据传统关系型数据库库:MySQL、Oracle等等;磁盘阵磁盘阵列列;磁磁带带.Hadoop收集和入库基本要求分布式 数据源多样化数据源多样化 数据源分散数据源分散可靠性可靠性保证不丢数据保证不丢数据允许丢部分数据允许丢部分数据 可扩展数据源可能会不断增加数据源可能会不断增加通过并行提高性能常见的Hadoop收集与入库系统数据收集数据收集 Flume Kafka Scribe传
3、统数据库传统数据库与与Hadoop同步同步Sqoop主要内容1.背景介绍背景介绍2.Hadoop数据收集系统数据收集系统3.传统数据库与传统数据库与Hadoop间数据同步间数据同步4.总结总结Hadoop数据收集系统Flume Flume OG OG:“Original Generation”0.9.x或cdh3以及更早版本 由agent、collector、master等组件构成 Flume NG NG:“Next/New Generation”1.x或cdh4以及之后的版本 由Agent、Client等组件构成 为什么要推为什么要推出出NG版本版本 精简代码 架构简化Flume OG基本架
4、构Flume OG基本架构Agent 用于采集数据用于采集数据 数据流产生的地方数据流产生的地方 通常由通常由source和和sink两部分组成两部分组成 Source用于获取数据,可从文本文件用于获取数据,可从文本文件,syslog,HTTP等获等获 取数据;取数据;Sink将将Source获得的数据进一步传输给后面获得的数据进一步传输给后面的的Collector。Flume自带了很自带了很多多source和和sink实现实现 syslogTcp(5140)|agentSink(localhost,35853)tail(/etc/services)|agentSink(localhost,3
5、5853)Collector 汇总多个汇总多个Agent结果结果 将汇总结果导入后端存储系统将汇总结果导入后端存储系统,比比如如HDFS,HBase Flume自带了很自带了很多多collector实现实现collectorSource(35853)|consoleCollectorSource(35853)|collectorSink(file:/tmp/flume/collected,syslog);collectorSource(35853)|collectorSink(hdfs:/namenode/user/flume/,syslog);Agent与Collector对应关系Agent
6、与Collector对应关系 可手动指定,也可自动匹配 自动匹配的情况下,master会平衡collector之间的负载。问题:为什么引入Collector?对对Agent数据进行汇总,避免产生过多小文件;数据进行汇总,避免产生过多小文件;避免多个避免多个agent连接对连接对Hadoop造成过大压造成过大压力力 ;中间件,屏蔽中间件,屏蔽agent和和hadoop间的异构性。间的异构性。Master 管理协调 agent 和collector的配置信息;Flume集群的控制器;跟踪数据流的最后确认信息,并通知agent;通常需配置多个master以防止单点故障;借助zookeeper管理管理
7、多Master。容错机制三种可靠性级别 agentE2ESink(machine,port)agent收到确认消息才认为数据发送成功,否则重试.agentDFOSink(machine,port)当agent发现在collector操作失败的时候,agent写入到本地硬盘上,当collctor恢复后,再重新发送数据。agentBESink(machine,port)效率最好,agent不写入到本地任何数据,如果在collector 发现处理失败,直接删除消息。构建基于Flume的数据收集系统 Agent和Collector均可以动态配置 可通过命令行或Web界面配置 命令行配置在已经启动的ma
8、ster节点上,依次输入”flume shell”connectlocalhost”如执行 exec config a1 tailDir(“/data/logfile”)agentSink Web界面选中节点,填写source、sink等信息常用架构举例拓扑1agentA:tail(“/ngnix/logs”)|agentSink(collector,35853);agentB:tail(“/ngnix/logs”)|agentSink(collector,35853);agentC:tail(“/ngnix/logs”)|agentSink(collector,35853);agentD:t
9、ail(“/ngnix/logs”)|agentSink(collector,35853);agentE:tail(“/ngnix/logs”)|agentSink(collector,35853);agentF:tail(“/ngnix/logs”)|agentSink(collector,35853);collector:collectorSource(35853)|collectorSink(hdfs:/namenode/flume/,srcdata);常用架构举例拓扑2agentA:src|agentE2ESink(collectorA,35853);agentB:src|agentE
10、2ESink(collectorA,35853);agentC:src|agentE2ESink(collectorB,35853);agentD:src|agentE2ESink(collectorB,35853);agentE:src|agentE2ESink(collectorC,35853);agentF:src|agentE2ESink(collectorC,35853);collectorA:collectorSource(35853)|collectorSink(hdfs:/.,src);collectorB:collectorSource(35853)|collectorSin
11、k(hdfs:/.,src);collectorC:collectorSource(35853)|collectorSink(hdfs:/.,src);常用架构举例拓扑3agentA:src|agentE2EChain(collectorA:35853,collectorB:35853);agentB:src|agentE2EChain(collectorA:35853,collectorC:35853);agentC:src|agentE2EChain(collectorB:35853,collectorA:35853);agentD:src|agentE2EChain(collectorB
12、:35853,collectorC:35853);agentE:src|agentE2EChain(collectorC:35853,collectorA:35853);agentF:src|agentE2EChain(collectorC:35853,collectorB:35853);collectorA:collectorSource(35853)|collectorSink(hdfs:/.,src);collectorB:collectorSource(35853)|collectorSink(hdfs:/.,src);collectorC:collectorSource(35853)
13、|collectorSink(hdfs:/.,src);主要内容1.背景介绍背景介绍2.Hadoop数据收集系统数据收集系统3.传统数据库与传统数据库与Hadoop间数据同步间数据同步4.总结总结Sqoop是什么 Sqoop:SQL-to-Hadoop 连连接接 传统关系型数据传统关系型数据库库 和和 Hadoop 的桥梁的桥梁把关系型数据库的数据导入到 Hadoop 系统(如 HDFS HBase 和 Hive)中;把数据从 Hadoop 系统里抽取并导出到关系型数据库里。利用利用MapReduce加快数据传输速度加快数据传输速度 批处理方式进行数据传输批处理方式进行数据传输Sqoop优势
14、高效、可控地利用资源高效、可控地利用资源任务并行度,超时时间等任务并行度,超时时间等 数据类型映射与转换数据类型映射与转换可自动进行,用户也可自定义可自动进行,用户也可自定义 支持多种数据库支持多种数据库MySQLOraclePostgreSQLSqoop1架构Sqoop2架构Sqoop import 将数据从关系型数据库导将数据从关系型数据库导入入Hadoop中中步骤步骤1:Sqoop与数与数据据库库Server 通信,获取数据库表的元数据通信,获取数据库表的元数据 信息;信息;步骤步骤2:Sqoop启动启动一一个个Map-Only的的MR作业作业,利用元利用元数数据据信信息并行将数据写入息
15、并行将数据写入Hadoop。Sqoop import使用sqoop import-connect jdbc:mysql:/ sqoop-password sqoop-table cities-connnect:指定指定JDBC URL-username/password:mysql数数据库的据库的用用户户名名-table:要读:要读取取的数的数据据库库表表bin/hadoop fs-cat cities/part-m-*1,USA,Palo Alto 2,Czech Republic,Brno 3,USA,SunnyvaleSqoop import示例sqoop import-connect
16、jdbc:mysql:/ sqoop-password sqoop-table cities-target-dir/etl/input/citiessqoop import-connect jdbc:mysql:/ sqoop-password sqoop-table cities-where country=USASqoop import示例sqoop import-connect jdbc:mysql:/ sqoop-password sqoop-table cities-as-sequencefilesqoop import-connect jdbc:mysql:/ sqoop-pass
17、word sqoop-table cities-num-mappers 10Sqoop import导入多个表sqoop import-connect jdbc:mysql:/ sqoop-password sqoop-query SELECT normcities.id,countries.country,normcities.city FROM normcities JOIN countries USING(country_id)WHERE$CONDITIONS-split-by id-target-dir citiesSqoop import增量导入sqoop import-connec
18、t jdbc:mysql:/ sqoop-password sqoop-table visits-incremental append-check-column id-last-value 1Sqoop import增量导入(一)sqoop import-connect jdbc:mysql:/ sqoop-password sqoop-table visits-incremental append-check-column id-last-value 1 适用于数据每次被追加到数据库中,而已有数据不变的情况;适用于数据每次被追加到数据库中,而已有数据不变的情况;仅导入仅导入id这一列值大这一
19、列值大于于1的记录。的记录。Sqoop import增量导入(二)sqoop job-create visits-import-connect jdbc:mysql:/ sqoop-password sqoop-table visits-incremental append-check-column id-last-value 0运行运行sqoop作业作业:sqoop job-exec visits 每次成功运行后每次成功运行后,sqoop将最后一条记录的将最后一条记录的id值保存到值保存到metastore中,供下次使用。中,供下次使用。Sqoop import增量导入(三)sqoop im
20、port-connect jdbc:mysql:/ sqoop-password sqoop-table visits-incremental lastmodified-check-column last_update_date-last-value“2013-05-22 01:01:01”数据库中有一数据库中有一列列last_update_date,记录了上次修改时记录了上次修改时间间;Sqoop仅将某时刻后的数据导仅将某时刻后的数据导入入Hadoop。Sqoop Export 将数据从将数据从Hadoop导入关系型数据库导中导入关系型数据库导中步骤步骤1:Sqoop与数与数据据库库Serv
21、er 通信,获取数据库表的元数通信,获取数据库表的元数据据 信息;信息;步骤步骤2:并行导入数据:并行导入数据:将将Hadoop上上文文件划分件划分成若成若干个干个split;每个每个split由一个由一个Map Task进进行数据导入。行数据导入。Sqoop Export使用方法sqoop export-connect jdbc:mysql:/ sqoop-password sqoop-table cities-export-dir cities-connnect:指定指定JDBC URL-username/password:mysql数数据库的据库的用用户户名名-table:要导要导入入的
22、数的数据据库库表表export-dir:数:数据据在在HDFS上上存存放目放目录录Sqoop Export保证原子性sqoop export-connect jdbc:mysql:/ sqoop-password sqoop-table cities-staging-table staging_citiesSqoop Export更新已有数据sqoop export-connect jdbc:mysql:/ sqoop-password sqoop-table cities-update-key id sqoop export-connect jdbc:mysql:/ sqoop-passwo
23、rd sqoop-table cities-update-key id-update-mode allowinsertSqoop Export选择性插入sqoop export-connect jdbc:mysql:/ sqoop-password sqoop-table cities-columns country,citySqoop与其他系统结合 Sqoop可以与可以与Oozie、Hive、Hbase等系统结合;等系统结合;用户需要用户需要在在sqoop-env.sh中增加中增加HBASE_HOME、HIVE_HOME等环境变量。等环境变量。Sqoop与Hive结合sqoop import-connect jdbc:mysql:/ sqoop-password sqoop-table cities-hive-importSqoop与HBase结合sqoop import-connect jdbc:mysql:/ sqoop-password sqoop-table cities-hbase-table cities-column-family world