高级数据库课件:06-Hadoop.pptx

上传人(卖家):罗嗣辉 文档编号:2040843 上传时间:2022-01-19 格式:PPTX 页数:116 大小:6.13MB
下载 相关 举报
高级数据库课件:06-Hadoop.pptx_第1页
第1页 / 共116页
高级数据库课件:06-Hadoop.pptx_第2页
第2页 / 共116页
高级数据库课件:06-Hadoop.pptx_第3页
第3页 / 共116页
高级数据库课件:06-Hadoop.pptx_第4页
第4页 / 共116页
高级数据库课件:06-Hadoop.pptx_第5页
第5页 / 共116页
点击查看更多>>
资源描述

1、NoSQL的技术基础及其实践Hadoop Overview & ArchitectureHDFS Design Principles40,000 + machines in 20+ clustersLargest cluster is 4,000 machines170 Petabytes of storage1000+ users1,000,000+ jobs/month线性伸缩性,同一时间内节点越多,可以完成的工作越多数据规模的线性化 计算资源的线性化将计算移动到数据位置,降低大量数据传输开销可靠性和可用性:规模越大,失效越频繁简化计算模型,在高效的执行框架中隐藏复杂性顺序的数据处理 (避

2、免随机读) 一个可靠的、可伸缩的、高性能分布式计算系统The Hadoop Distributed File System (HDFS) 可靠存储层上层可以构建更复杂的层次MapReduce 分布式计算框架Hadoop通过增加货架服务器扩展计算能力、存储能力和I/O带宽分而治之(Divide-and-conquer)地使用货架硬件HDFS 分布式文件系统 NameNode 名字空间和块管理DataNodes 块副本容器 MapReduce 分布式计算框架JobTracker 作业调度,资源管理, 生命周期协调TaskTracker 任务执行 The Scale-out-Ability of D

3、istributed Storage 名字空间描述文件和目录的层次文件换分成块(典型的是128 MB) 名字空间(元数据) 与数据内容解耦对名字空间的快速操作不会受到数据流的拖累单个NameNode在RAM中保持整个的名字空间DataNode在本地磁盘中使用本地文件系统文件存储块副本出于冗余和可用性设计目的,块复制在3个DataNode中名字空间的持久性维持使用了提前写日志(write-ahead journal)和检查点(checkpoints)日志事务在回答客户之前持久化到edits文件Checkpoint由Checkpointer处理,周期性写入fsimage文件SecondaryNam

4、eNode DataNodes在启动时报告块位置,不存储在NameNode中 持久存储设备的类型 本地硬盘驱动器远程驱动器或者NFS filer 备份节点多级存储目录两个在本地驱动器,一个在远程服务器(如,NFS) DataNodes注册在 NameNode,周期性地报告持有的块副本列表报告包括每个副本的block id, generation stamp and lengthDataNodes每个3sec向 NameNode 发送心跳以确认存活如果10分钟内没有收到心跳消息,就假设节点丢失,节点持有的副本也就不可访问NameNode调度丢失副本的重新复制心跳响应表明正在管理副本复制块到其它节

5、点删除块副本重新注册或者关闭节点发送紧急的块报告支持传统的文件系统操作Create, read, write, delete files Create, delete directories Rename files and directories 权限文件的修改和访问时间配额: 1) namespace, 2) disk space 创建时,每个文件的复制因子块尺寸块副本位置暴露给外部应用客户从NameNode中请求副本位置从其中一个DataNode的副本中pull数据打开文件,返回DFSInputStream 当前块的DFSInputStream从NameNode中得到副本位置,客户缓存副

6、本位置副本位置根据与客户的邻近程度排序,一般选择第一个打开开一个socket流来选择 DataNode,从流中得到数据如果失败,添加到dead DataNodes;选择下一个DataNode重试2次MapReduce调度一个任务到一个DataNode来处理B块,则由这个DataNode提供B的副本的服务数据的局部访问客户从NameNode中请求候选DataNodes列表,组织一个写管道 在名字空间中创建文件调用addBlock() ,得到下一个块NameNode返回一个根据与客户邻近程度排序的预期副本位置列表客户创建一个流向DataNode的数据管道HDFS客户写入内部缓冲,并形成包队列Dat

7、aStreamer将包发送个DataNode1DataNOde1以同样的形式流给 DataNode2如果一个节点失效,使用剩余节点重建管道直到最少一个节点存在复制将由NameNode处理HDFS实现了单写多读的模型HDFS客户在写打开文件时维持一个租约对于一个文件,只有一个客户可以持有租约客户通过向NameNode周期性发送心跳来续借租约租约过期在软期限(10m)过期之前,客户独占地访问文件软期限之后,任何客户都可以收回租约硬期限 (1h)之后,NameNode强制关闭文件,撤回租约写者租约不禁止其他客户的文件读开始时,支持一次写语义文件关闭之后,写入的字节不能更改或删除现在,文件可以修改,但

8、是只能将内容添加至末尾使用写时拷贝技术实现追加期间的块修改HDFS在文件关闭之前,向读者提供一致的数据可见性hflush 操作提供了可见性保证一旦hflush,当前包立即压入管道Hflush等待,直到所有DataNode成功接收到这个包hsync 也保证了数据持久化到DataNode的本地盘集群拓扑逻辑根据网络距离对节点进行层次分组缺省块布局策略权衡最小写开销 最大读可靠性、可用性和聚集读带宽1.第一个副本位于写节点的本地2.第二和第三个副本位于不同机架上的不同节点3.余下副本根据以下约束随机选择节点:同一块的副本不在一个节点上一个机架不会持有2个以上的副本HDFS提供了可配置的块布局策略接口

9、Namespace ID 集群组件(NameNode和DataNode)共有一个唯一cluster id在格式化时,赋予未见系统Namespace ID防止其它集群的DataNode加入这个集群Storage ID DataNode持有集群内唯一的Storage ID 即使DataNode更换了IP地址和端口,也可识别在第一次注册到NameNode时分配给DataNodeSoftware Version (build version) NameNode和DataNode的不同软件版本不相兼容从Hadoop-2开始,可以实现向前兼容通过块校验和来保证数据完整性NameNode startup R

10、ead image, replay journal, write new image and empty journal Enter SafeMode DataNode startup Handshake: check Namespace ID and Software Version Registration: NameNode records Storage ID and address of DN Send initial block report SafeMode read-only mode for NameNode Disallows modifications of the na

11、mespace Disallows block replication or deletion Prevents unnecessary block replications until the majority of blocks are reported Minimally replicated blocks SafeMode threshold, extension Manual SafeMode during unforeseen circumstances 保证每个块总是有期望数量的副本遵从块布局策略 接收到块报告时更新副本过量复制块:选择副本删除在不降低块可用性的前提下平衡节点间的

12、存储利用率不减少持有副本的机架数量删除最长心跳或者最少可用空间DataNode上的副本过低复制块:放入复制优先队列副本越少,越位于队列前部在遵从块布局策略前提下,最小化副本创建成本错误复制块:不遵循块布局策略缺失块:因为不知道怎么办,所以什么都不干坏块:尝试复制一个好副本,保留坏副本快照避免软件升级时的数据损坏或者丢失允许软件升级变坏时的回滚布局版本识别持久化存储在NameNode和DataNode存储目录上的数据表示格式NameNode镜像快照启动hadoop namenode upgrade 基于持久化布局版本,读取检查点镜像和日志新软件可以一直读老布局将当前存储目录重命名到以前的版本将镜

13、像保存到新的当前位置DataNode升级根据NameNode指令进行升级创建一个新的存储目录,将现有的块文件硬链接到这个新目录Fsck 检验缺失块、每个文件的块复制块布局策略报告工具退役从集群中安全剔除DataNode确保正在删除节点上的副本复制到其它节点均衡器当新节点加入集群时,重新均衡已使用盘空间的均衡DataNode间已使用空间的平坦分布块扫描器块检验和提供数据完整性保证读者检查检验和,并且向NameNode报告错误其它块由扫描器周期性检查Y! cluster 2010 70 million files, 80 million blocks 15 PB capacity 4000+ no

14、des. 24,000 clients 50 GB heap for NN Data warehouse Hadoop cluster at Facebook 2010 55 million files, 80 million blocks. Estimate 200 million objects (files + blocks) 2000 nodes. 21 PB capacity, 30,000 clients 108 GB heap for NN should allow for 400 million objects Analytics Cluster at eBay (Ares)

15、77 million files, 85 million blocks 1000 nodes: 24 TB of local disk storage, 72 GB of RAM, and a 12-core CPU Cluster capacity 19 PB raw disk space Runs upto 38,000 MapReduce tasks simultaneously Hadoop使用SSH,保证节点间通信的安全性在Mater节点生成SSH秘钥串将公钥拷贝到Slave节点,实现授权通信ssh-keygen -t rsassh targethadoop-usermaster$

16、cd $HADOOP_HOMEhadoop-usermaster$ ls -l conf/-rw-rw-r- 1 hadoop-user hadoop 2065 Dec 1 10:07 capacity-scheduler.xml-rw-rw-r- 1 hadoop-user hadoop 535 Dec 1 10:07 configuration.xsl-rw-rw-r- 1 hadoop-user hadoop 49456 Dec 1 10:07 hadoop-default.xml-rwxrwxr-x 1 hadoop-user hadoop 2314 Jan 8 17:01 hadoo

17、p-env.sh-rw-rw-r- 1 hadoop-user hadoop 2234 Jan 2 15:29 hadoop-site.xml-rw-rw-r- 1 hadoop-user hadoop 2815 Dec 1 10:07 log4j.properties-rw-rw-r- 1 hadoop-user hadoop 28 Jan 2 15:29 masters-rw-rw-r- 1 hadoop-user hadoop 84 Jan 2 15:29 slaves-rw-rw-r- 1 hadoop-user hadoop 401 Dec 1 10:07 sslinfo.xml.e

18、xampleexport JAVA_HOME=/usr/share/jdkLocal (standalone) modePseudo-distributed modeFully distributed modeHDFS管理大型的数据集合HDFS提供数据输入和输出的接口hadoop fs -cmd hadoop fs lshadoop fs mkdir /user/chuckhadoop fs -put example.txt .hadoop fs -put example.txt /user/chuckhadoop fs -get example.txt .hadoop fs -cat exa

19、mple.txthadoop fs rm example.txthadoop fs help lspublic static void main(String args) throws IOException Configuration conf = new Configuration();FileSystem hdfs = FileSystem.get(conf);FileSystem local = FileSystem.getLocal(conf);Path inputDir = new Path(args0); Path hdfsFile = new Path(args1);try F

20、ileStatus inputFiles = local.listStatus(inputDir); FSDataOutputStream out = hdfs.create(hdfsFile); for (int i=0; i 0) out.write(buffer, 0, bytesRead);in.close();out.close(); catch (IOException e) e.printStackTrace();import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputS

21、tream;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;InputFormatMap functionPartitionerSorting & MergingCombinerShufflingMergingReduce functionOutputFormat 1:manyKey必须是WritableComparableVal

22、ue必须是Writable或者WriteableComparableBooleanWritableByteWritableDoubleWritableFloatWritableIntWritableLongWritableTextNullWritablevoid configure( JobConf job)void close ()MapperReducervoid map(K1 key, V1 value, OutputCollector output, Reporter reporter ) throws IOExceptionClassDescriptionIdentityMapper

23、 maps inputs directly to outputsInverseMapper reverses the key/value pairRegexMapper generates a(match, 1) pair for every regular expression matchTokenCountMapper generates a(token, 1) pair when the input value is tokenizedvoid reduce(K2 key, Iterator values, OutputCollector output, Reporter reporte

24、r ) throws IOExceptionClassDescriptionIdentityReducer maps inputs directly to outputsLongSumReducer determines the sum of all values corresponding to the given keypublic class WordCount2 public static void main(String args) JobClient client = new JobClient();JobConf conf = new JobConf(WordCount2.cla

25、ss);FileInputFormat.addInputPath(conf, new Path(args0);FileOutputFormat.setOutputPath(conf, new Path(args1);conf.setOutputKeyClass(Text.class);conf.setOutputValueClass(LongWritable.class);conf.setMapperClass(TokenCountMapper.class); conf.setCombinerClass(LongSumReducer.class);conf.setReducerClass(Lo

26、ngSumReducer.class); client.setConf(conf);try JobClient.runJob(conf); catch (Exception e) e.printStackTrace();InputFormat DescriptionTextInputFormat 文本文件的每一行是一条记录。Key是每一行的字节偏移,value是文本行的内容。key: LongWritable,value: TextKeyValueTextIn-putFormat 文本文件的每一行是一条记录。文本用分隔符分为两部分,第一个分隔符的前部为key,剩下的部分为value。key.v

27、alue.separator.in.input.line属性设定分隔符,缺省为 tab (t)字符。key: Text,value: TextSequenceFileIn-putFormat 顺序文件的输入格式。Key和value 都是自定义的。顺序文件是Hadoop特定的压缩二级制文件格式。特别适合于MapReduce 作业之间传递数据。key: K (user defined),value: V (user defined)NLineInput-Format 与TextInputFormat相同,但是保证每次抽取N行。mapred.line.input.format. Linesperma

28、p属性设定N值,缺省为1。key: LongWritable,value: TextOutputFormat DescriptionTextOutputFormat 每行写一条记录,Keys和 values 使用分隔符分开,分隔符在mapred.textoutputformat.separator中定义,缺省为tab.SequenceFileOutputFormat 将key/value对写入Hadoop顺序文件中,与SequenceFileInputFormat一起使用.NullOutputFormat 什么都不输出http:/www.nber.org/patents/The citatio

29、n data set cite75_99.txtThe patent description data set apat63_99.txtpublic static class MapClass extends MapReduceBaseimplements Mapper public void map(Text key, Text value,OutputCollector output,Reporter reporter) throws IOException output.collect(value, key);public static class Reduce extends Map

30、ReduceBaseimplements Reducer public void reduce(Text key, Iterator values,OutputCollector output,Reporter reporter ) throws IOException String csv = ;while (values.hasNext() if (csv.length() 0) csv += ,;csv += values.next().toString();output.collect(key, new Text(csv);public class MyJob extends Conf

31、igured implements Tool MapClassReducerpublic int run(String args) throws Exception Configuration conf = getConf();JobConf job = new JobConf(conf, MyJob.class);Path in = new Path(args0);Path out = new Path(args1);FileInputFormat.setInputPaths(job, in);FileOutputFormat.setOutputPath(job, out);job.setJ

32、obName(MyJob);job.setMapperClass(MapClass.class);job.setReducerClass(Reduce.class);job.setInputFormat(KeyValueTextInputFormat.class);job.setOutputFormat(TextOutputFormat.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.set(key.value.separator.in.input.line, ,);JobClie

33、nt.runJob(job);return 0;public static void main(String args) throws Exception int res = ToolRunner.run(new Configuration(), new MyJob(), args);System.exit(res);cat input_file | mapper | sort | reducer output_filebin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar-input input/cite75_99.txt -

34、output output -mapper cut -f 2 -d , -reducer uniq#!/usr/bin/env pythonimport sys, randomfor line in sys.stdin:if (random.randint(1,100) = int(sys.argv1):print line.strip()bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar-input input/cite75_99.txt-output output-mapper RandomSample.py 10-fi

35、le RandomSample.py-D mapred.reduce.tasks=1?phpwhile (!feof(STDIN) $line = fgets(STDIN);if (mt_rand(1,100) = $argv1) echo $line;bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar -input input/cite75_99.txt-output output-mapper php RandomSample.php 10-file RandomSample.php-D mapred.reduce.ta

36、sks=1public static class MapClass extends MapReduceBaseimplements Mapper public void map(LongWritable key, Text value,OutputCollector output,Reporter reporter) throws IOException String fields = value.toString().split(, -20);String country = fields4;String numClaims = fields8;if (numClaims.length()

37、0 & !numClaims.startsWith() output.collect(new Text(country),new Text(numClaims + ,1); public static class Reduce extends MapReduceBaseimplements Reducer public void reduce(Text key, Iterator values, OutputCollector output,Reporter reporter) throws IOException double sum = 0;int count = 0;while (val

38、ues.hasNext() String fields = values.next().toString().split(,);sum += Double.parseDouble(fields0);count += Integer.parseInt(fields1);output.collect(key, new DoubleWritable(sum/count);public static class Combine extends MapReduceBaseimplements Reducer public void reduce(Text key, Iterator values,Out

39、putCollector output,Reporter reporter) throws IOException double sum = 0;int count = 0;while (values.hasNext() String fields = values.next().toString().split(,);sum += Double.parseDouble(fields0);count += Integer.parseInt(fields1);output.collect(key, new Text(sum + , + count);mapreduce-1 | mapreduce

40、-2 | mapreduce-3 | .MAP | REDUCE+MAP+ | REDUCE | MAP*Configuration conf = getConf();JobConf job = new JobConf(conf);job.setJobName(ChainJob);job.setInputFormat(TextInputFormat.class);job.setOutputFormat(TextOutputFormat.class);FileInputFormat.setInputPaths(job, in);FileOutputFormat.setOutputPath(job

41、, out);JobConf map1Conf = new JobConf(false);ChainMapper.addMapper(job,Map1.class, LongWritable.class,Text.class,Text.class,Text.class,true,map1Conf);JobConf map2Conf = new JobConf(false);ChainMapper.addMapper(job,Map2.class,Text.class,Text.class,LongWritable.class,Text.class,true,map2Conf);JobConf

42、reduceConf = new JobConf(false);ChainReducer.setReducer(job,Reduce.class,LongWritable.class,Text.class,Text.class,Text.class,true,reduceConf);JobConf map3Conf = new JobConf(false);ChainReducer.addMapper(job,Map3.class,Text.class,Text.class,LongWritable.class,Text.class,true,map3Conf);JobConf map4Con

43、f = new JobConf(false);ChainReducer.addMapper(job,Map4.class,LongWritable.class,Text.class,LongWritable.class,Text.class,true,map4Conf);JobClient.runJob(job);DataJoinMapperBaseDataJoinReducerBaseTaggedMapOutputprotected abstract Text generateInputTag(String inputFile);protected abstract TaggedMapOut

44、put generateTaggedMapOutput(Object value);protected abstract Text generateGroupKey(TaggedMapOutput aRecord);protected Text generateInputTag(String inputFile) String datasource = inputFile.split(-)0;return new Text(datasource);protected TaggedMapOutput generateTaggedMapOutput(Object value) TaggedWrit

45、able retv = new TaggedWritable(Text) value);retv.setTag(this.inputTag);return retv;protected TaggedMapOutput generateTaggedMapOutput(Object value) TaggedWritable retv = new TaggedWritable(Text) value);retv.setTag(this.inputTag);return retv;public void map(Object key, Object value,OutputCollector out

46、put, Reporter reporter) throws IOExceptionTaggedMapOutput aRecord = generateTaggedMapOutput(value);Text groupKey = generateGroupKey(aRecord);output.collect(groupKey, aRecord);combine()实现inner join, left outer join等protected abstract TaggedMapOutputcombine(Object tags, Object values);tags = Customers

47、, Orders;values = 3,Jose Madriz,281-330-8004, A,12.95,02-Jun-2008;protected TaggedMapOutput combine(Object tags, Object values) if (tags.length 2) return null;String joinedStr = ;for (int i=0; i 0) joinedStr += ,;TaggedWritable tw = (TaggedWritable) valuesi;String line = (Text) tw.getData().toString

48、();String tokens = line.split(, 2);joinedStr += tokens1;TaggedWritable retv = new TaggedWritable(new Text(joinedStr);retv.setTag(Text) tags0);return retv;public static class TaggedWritable extends TaggedMapOutput private Writable data;public TaggedWritable(Writable data) this.tag = new Text();this.d

49、ata = data;public Writable getData() return data; public void write(DataOutput out) throws IOException this.tag.write(out);this.data.write(out);public void readFields(DataInput in) throws IOException this.tag.readFields(in);this.data.readFields(in);distributed cachepublic static class MapClass exten

50、ds MapReduceBaseimplements Mapper private Hashtable joinData =new Hashtable();Overridepublic void configure(JobConf conf) public void map(Text key, Text value,OutputCollector output,Reporter reporter) throws IOException public void configure(JobConf conf) try Path cacheFiles = DistributedCache.getLo

展开阅读全文
相关资源
猜你喜欢
相关搜索

当前位置:首页 > 大学
版权提示 | 免责声明

1,本文(高级数据库课件:06-Hadoop.pptx)为本站会员(罗嗣辉)主动上传,163文库仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。
2,用户下载本文档,所消耗的文币(积分)将全额增加到上传者的账号。
3, 若此文所含内容侵犯了您的版权或隐私,请立即通知163文库(发送邮件至3464097650@qq.com或直接QQ联系客服),我们立即给予删除!


侵权处理QQ:3464097650--上传资料QQ:3464097650

【声明】本站为“文档C2C交易模式”,即用户上传的文档直接卖给(下载)用户,本站只是网络空间服务平台,本站所有原创文档下载所得归上传人所有,如您发现上传作品侵犯了您的版权,请立刻联系我们并提供证据,我们将在3个工作日内予以改正。


163文库-Www.163Wenku.Com |网站地图|