NoSQL technical foundations and practice

Data pre-processing and post-processing tools: Hive, Pig, Flume, Oozie, Sqoop.
Data serialization formats: Protocol Buffers, Thrift, Avro, XML, JSON.
Properties to look for in a format: support for compressed data, code generation, compatible multi-version (schema) evolution, multi-language support, transparent compression, splittability, and native support in MapReduce.

Codec characteristics:
- Deflate: similar to zlib; uses the same compression algorithm as gzip, but without the gzip header.
- gzip: a gzip file consists of a header and a body; the body is a Deflate-compressed payload.
- bzip2: high space efficiency.
- LZO: a block-based compression algorithm; the compressed data can be split.
- LZOP: LZO with an additional header; not bundled with Hadoop for licensing reasons.
- Snappy: Google's open-source compression algorithm, used by MapReduce and BigTable. Its biggest drawback is that it is not splittable.

Codec    | Extension    | Licensing | Splittable | Java-only compression support | Native compression support
Deflate  | .deflate     | zlib      | N          | Y                             | Y
gzip     | .gz          | GNU GPL   | N          | Y                             | Y
bzip2    | .bz2         | BSD       | Y          | Y                             | N
LZO      | .lzo_deflate | GNU GPL   | N          | N                             | Y
LZOP     | .lzo         | GNU GPL   | Y          | N                             | Y
Snappy   | .snappy      | New BSD   | N          | N                             | Y

Codec classes:
- Deflate: org.apache.hadoop.io.compress.DeflateCodec
- gzip:    org.apache.hadoop.io.compress.GzipCodec
- bzip2:   org.apache.hadoop.io.compress.BZip2Codec
- LZO:     com.hadoop.compression.lzo.LzoCodec
- LZOP:    com.hadoop.compression.lzo.LzopCodec
- Snappy:  org.apache.hadoop.io.compress.SnappyCodec
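These codec classes are wired into a job through its configuration. A minimal sketch using the old org.apache.hadoop.mapred API (the same API used in the listing further below), with the built-in gzip codec and placeholder input/output paths:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class CompressedOutputJob {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(CompressedOutputJob.class);
        // Compress the intermediate map output that is shuffled to the reducers.
        job.setCompressMapOutput(true);
        // Compress the final job output with the gzip codec (.gz files).
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        JobClient.runJob(job);  // default identity mapper/reducer; output is gzip-compressed
      }
    }

Swapping GzipCodec.class for another codec class from the list above selects that codec instead, subject to the splittability trade-offs in the table.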
Small files in HDFS:
- Large numbers of small files can be stored on a high-capacity system.
- Large numbers of small files produce large amounts of (NameNode) metadata.
- Large amounts of metadata cause file-lookup performance problems.
- The remedy is to pack small files into large container files and provide a dedicated index over them.

Avro, TECHNIQUE 24: Using Avro to store multiple small files (Hadoop in Practice, p. 175)

At this point you have Avro files in HDFS. Even though this chapter is about HDFS, the next thing you'll likely want to do is to process the files that you wrote in MapReduce. Let's segue and look at how to do that. You'll write a map-only MapReduce job that can read as input the Avro records and write out a text file containing the filenames and MD5 hashes of the file contents, as shown in figure 5.2. The next listing shows the code for this MapReduce job.

    import org.apache.avro.mapred.AvroInputFormat;
    import org.apache.avro.mapred.AvroJob;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextOutputFormat;

    public class SmallFilesMapReduce {
      public static void main(String... args) throws Exception {
        JobConf job = new JobConf();
        job.setJarByClass(SmallFilesMapReduce.class);
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);
        output.getFileSystem(job).delete(output, true);
        AvroJob.setInputSchema(job, SmallFilesWrite.SCHEMA);
        job.setInputFormat(AvroInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setMapperClass(Map.class);
        FileInputFormat.setInputPaths(job, input);
        // (remainder of the listing is not included in this excerpt)
      }
    }

Figure 5.2 (caption): a MapReduce job that takes as input Avro files containing the small files. Each mapper reads the Avro records in its input split and writes a text file in HDFS mapping each filename to the MD5 hash of its contents (e.g. a -> 8e9591a43c..., b -> c00f7cd895..., c -> 93987f1eb3...).
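The nested Map class referenced by the driver is not part of the excerpt. A minimal sketch of what such a map-only mapper could look like, assuming the Avro records expose the file name and file contents as fields named "filename" and "contents" (the field names and the MD5 helper are illustrative, not the book's code):

    // Nested inside SmallFilesMapReduce; needs imports for java.nio.ByteBuffer,
    // java.security.MessageDigest, org.apache.avro.generic.GenericRecord,
    // org.apache.avro.mapred.AvroWrapper, org.apache.hadoop.io.NullWritable,
    // org.apache.hadoop.io.Text, and org.apache.hadoop.mapred.*.
    public static class Map extends MapReduceBase
        implements Mapper<AvroWrapper<GenericRecord>, NullWritable, Text, Text> {

      @Override
      public void map(AvroWrapper<GenericRecord> key, NullWritable value,
                      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        GenericRecord record = key.datum();
        String filename = record.get("filename").toString();        // assumed field name
        ByteBuffer contents = (ByteBuffer) record.get("contents");  // assumed field name
        output.collect(new Text(filename), new Text(md5Hex(contents)));
      }

      // Hex-encoded MD5 digest of the file contents.
      private static String md5Hex(ByteBuffer buf) throws IOException {
        try {
          MessageDigest md = MessageDigest.getInstance("MD5");
          md.update(buf.duplicate());
          StringBuilder hex = new StringBuilder();
          for (byte b : md.digest()) {
            hex.append(String.format("%02x", b));
          }
          return hex.toString();
        } catch (Exception e) {
          throw new IOException(e);
        }
      }
    }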
Comparison of serialization options:

Library          | Code generation | Versioning | Language support                 | Transparent compression | Splittable | Native MapReduce support
SequenceFile     | N               | N          | Java, Python                     | Y                       | Y          | Y
Protocol Buffers | Y (optional)    | Y          | C++, Java, Python, Perl, Ruby    | N                       | N          | N
Thrift           | Y (mandatory)   | Y          | C, C++, Java, Python, Ruby, Perl | N                       | N          | N
Avro             | Y (optional)    | Y          | C, C++, Java, Python, Ruby, C#   | Y                       | Y          | Y
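Avro is the only row above that combines optional code generation, transparent compression, and splittability. A minimal sketch of writing a compressed Avro container file without any generated classes (the schema, values, and file name are illustrative):

    import java.io.File;
    import org.apache.avro.Schema;
    import org.apache.avro.file.CodecFactory;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;

    public class AvroWriteExample {
      public static void main(String[] args) throws Exception {
        // Schemas can be parsed at runtime; no generated classes are required.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Stock\",\"fields\":["
            + "{\"name\":\"symbol\",\"type\":\"string\"},"
            + "{\"name\":\"price\",\"type\":\"double\"}]}");

        GenericRecord record = new GenericData.Record(schema);
        record.put("symbol", "AAPL");
        record.put("price", 100.0);

        DataFileWriter<GenericRecord> writer =
            new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema));
        writer.setCodec(CodecFactory.deflateCodec(5));   // transparent block compression
        writer.create(schema, new File("stocks.avro"));  // the container file remains splittable
        writer.append(record);
        writer.close();
      }
    }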
Sqoop:

    % sqoop help
    usage: sqoop COMMAND [ARGS]

    Available commands:
      codegen            Generate code to interact with database records
      create-hive-table  Import a table definition into Hive
      eval               Evaluate a SQL statement and display the results
      export             Export an HDFS directory to a database table
      help               List available commands
      import             Import a table from a database to HDFS
      import-all-tables  Import tables from a database to HDFS
      list-databases     List available databases on a server
      list-tables        List available tables in a database
      version            Display version information

    % sqoop help import
    usage: sqoop import [GENERIC-ARGS] [TOOL-ARGS]

    Common arguments:
      --connect      Specify JDBC connect string
      --driver       Manually specify JDBC driver class to use
      --hadoop-home  Override $HADOOP_HOME
      --help         Print usage instructions
      -P             Read password from console
      --password     Set authentication password
      --username     Set authentication username
      --verbose      Print more information while working

Importing a MySQL table with a single map task:

    % sqoop import --connect jdbc:mysql://localhost/hadoopguide --table widgets -m 1
    10/06/23 14:44:18 INFO tool.CodeGenTool: Beginning code generation
    ...
    10/06/23 14:44:20 INFO mapred.JobClient: Running job: job_201006231439_0002
    10/06/23 14:44:21 INFO mapred.JobClient:  map 0% reduce 0%
    10/06/23 14:44:32 INFO mapred.JobClient:  map 100% reduce 0%
    10/06/23 14:44:34 INFO mapred.JobClient: Job complete: job_201006231439_0002
    ...
    10/06/23 14:44:34 INFO mapreduce.ImportJobBase: Retrieved 3 records.

Frequently used import arguments: --table, --target-dir, --where, --as-sequencefile, --as-avrodatafile, --compress, --map-column-java, --num-mappers, --exclude-tables.

Incremental imports:

    --incremental append --check-column id --last-value 1
    --incremental lastmodified --check-column last_update_date --last-value "2013-05-22 01:01:01"

Exports:

    sqoop export --connect jdbc:mysql://... --username sqoop --password sqoop \
      --table cities --export-dir cities --batch
    sqoop export --connect jdbc:mysql://... --username sqoop --password sqoop \
      --table cities --update-key id
    sqoop export --connect jdbc:mysql://... --username sqoop --password sqoop \
      --table cities --update-key id --update-mode allowinsert
Pig:
Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs. Pig generates and compiles MapReduce program(s) on the fly.
Ease of programming: it is trivial to achieve parallel execution of simple, embarrassingly parallel data analysis tasks. Complex tasks comprised of multiple interrelated data transformations are explicitly encoded as data flow sequences, making them easy to write, understand, and maintain.

Ways to run Pig:
- Grunt shell: enter Pig commands manually using Pig's interactive shell, Grunt.
- Script file: place Pig commands in a script file and run the script.
- Embedded program: embed Pig commands in a host language and run the program (a Java sketch of this option follows the example below).

Execution modes:
- Local mode: to run Pig in local mode, you need access to a single machine.
- Hadoop (MapReduce) mode: to run Pig in Hadoop (MapReduce) mode, you need access to a Hadoop cluster and an HDFS installation.

A first Pig Latin script:

    A = load 'passwd' using PigStorage(':');
    B = foreach A generate $0 as id;
    store B into 'id.out';
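A minimal sketch of the embedded-program option mentioned above, driving the same passwd script from Java through Pig's PigServer API (local mode assumed):

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class IdExtractor {
      public static void main(String[] args) throws Exception {
        // Local mode runs on a single machine; use ExecType.MAPREDUCE against a cluster.
        PigServer pig = new PigServer(ExecType.LOCAL);
        pig.registerQuery("A = load 'passwd' using PigStorage(':');");
        pig.registerQuery("B = foreach A generate $0 as id;");
        // Materialize relation B into the 'id.out' output directory.
        pig.store("B", "id.out");
      }
    }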
Calling a user-defined function from a script:

    A = LOAD 'student_data' AS (name: chararray, age: int, gpa: float);
    B = FOREACH A GENERATE myudfs.UPPER(name);

Built-in eval functions: AVG, CONCAT, COUNT, COUNT_STAR, DIFF, IsEmpty, MAX, MIN, SIZE, SUM, TOKENIZE.
Built-in math functions: ABS, ACOS, ASIN, ATAN, CBRT, CEIL, COS, COSH, EXP, FLOOR, LOG, LOG10, RANDOM, ROUND, SIN, SINH, SQRT, TAN, TANH.

Pig type  | Java class
bytearray | DataByteArray
chararray | String
int       | Integer
long      | Long
float     | Float
double    | Double
tuple     | Tuple
bag       | DataBag
map       | Map
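The Java classes in the right-hand column are what UDF code receives and returns. A minimal sketch of building tuples and bags programmatically, as a UDF producing complex output would (the values are illustrative):

    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class PigTypesExample {
      public static void main(String[] args) throws Exception {
        // A tuple corresponds to one row: (name: chararray, age: int)
        Tuple t = TupleFactory.getInstance().newTuple(2);
        t.set(0, "alice");
        t.set(1, 30);

        // A bag is a collection of tuples.
        DataBag bag = BagFactory.getInstance().newDefaultBag();
        bag.add(t);
        System.out.println(bag); // {(alice,30)}
      }
    }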
An aggregation script (ContextWeb example):

    RawInput = LOAD '$INPUT' USING com.contextweb.pig.CWHeaderLoader('$RESOURCES/schema/wide.xml');
    input = foreach RawInput GENERATE ContextCategoryId as Category, TagId, URL, Impressions;
    GroupedInput = GROUP input BY (Category, TagId, URL);
    result = FOREACH GroupedInput GENERATE group, SUM(input.Impressions) as Impressions;
    STORE result INTO '$OUTPUT' USING com.contextweb.pig.CWHeaderStore();

The same aggregation with an added filter:

    RawInput = LOAD '$INPUT' USING com.contextweb.pig.CWHeaderLoader('$RESOURCES/schema/wide.xml');
    input = foreach RawInput GENERATE ContextCategoryId as Category, DefLevelId, TagId, URL, Impressions;
    defFilter = FILTER input BY (DefLevelId == 8) or (DefLevelId == 12);
    GroupedInput = GROUP defFilter BY (Category, TagId, URL);
    result = FOREACH GroupedInput GENERATE group, SUM(defFilter.Impressions) as Impressions;
    STORE result INTO '$OUTPUT' USING com.contextweb.pig.CWHeaderStore();

UDF - User Defined Function
Types of UDFs:
- Eval functions (extend EvalFunc)
- Aggregate functions (extend EvalFunc and implement Algebraic)
- Filter functions (extend FilterFunc)

UDFContext:
- Allows UDFs to get access to the JobConf object.
- Allows UDFs to pass configuration information between instantiations of the UDF on the front end and back end.
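A minimal sketch of both UDFContext uses, assuming a hypothetical property name "myudf.suffix" that is stored when the UDF is constructed on the front end and read again when exec runs on the back end:

    import java.io.IOException;
    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.util.UDFContext;

    public class SuffixAppender extends EvalFunc<String> {

      public SuffixAppender(String suffix) {
        // Properties stored here on the front end are shipped with the job
        // and become visible to the UDF instances on the back end.
        Properties props = UDFContext.getUDFContext().getUDFProperties(getClass());
        props.setProperty("myudf.suffix", suffix);  // hypothetical property name
      }

      @Override
      public String exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0) {
          return null;
        }
        UDFContext context = UDFContext.getUDFContext();
        Configuration conf = context.getJobConf();  // job configuration; null on the front end
        String suffix = context.getUDFProperties(getClass()).getProperty("myudf.suffix", "");
        return input.get(0).toString() + suffix;
      }
    }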
An eval UDF that extracts the top-level domain:

    public class TopLevelDomain extends EvalFunc<String> {
      @Override
      public String exec(Tuple tuple) throws IOException {
        Object o = tuple.get(0);
        if (o == null) {
          return null;
        }
        return Validator.getTLD(o.toString());
      }
    }

Registering and using the UDF:

    REGISTER '$WORK_DIR/pig-support.jar';
    DEFINE getTopLevelDomain com.contextweb.pig.udf.TopLevelDomain();
    AA = foreach input GENERATE TagId, getTopLevelDomain(PublisherDomain) as RootDomain;

A custom load function overrides getNext() to build one tuple per input record (fragment):

    @Override
    public Tuple getNext() throws IOException {
      tuple = new ArrayList(11);
      for (int i = 0; i ...   // excerpt ends here

Loading Apache access logs with a custom load function:

    grunt> REGISTER target/hadoop-book-1.0.0-SNAPSHOT-jar-with-dependencies.jar;
    grunt> DEFINE LogLoader com.manning.hip.ch11.TypedCommonLogLoader();
    grunt> logs = LOAD 'apachelog.txt' USING LogLoader;
    grunt> describe logs;
    logs: {remoteAddr: chararray, remoteLogname: chararray, userid: chararray, time: chararray,
           requestLine: chararray, statusCode: long, objSize: long, method: chararray,
           resource: chararray, protocol: chararray, epoch: long}
    grunt> dump logs;
    (240.12.0.2,23/Jun/2009:10:40:54 +0300,GET / HTTP/1.1,500,612,...)
    (240.12.0.2,23/Jun/2009:10:40:54 +0300,GET /favicon.ico HTTP/...)
    (242.0.22.2,23/Jun/2009:10:54:51 +0300,GET / HTTP/1.1,200,34,...)
    (242.0.22.2,23/Jun/2009:10:54:51 +0300,GET /favicon.ico HTTP/...)
    ...

Counting requests per status code:

    grunt> grpd = GROUP logs BY statusCode;
    grunt> cntd = FOREACH grpd GENERATE group, COUNT(logs);
    grunt> dump cntd;
    (200,10)
    (404,9)
    (500,2)

Keeping only error responses from non-private addresses, using a regular expression:

    projected_logs = FOREACH logs GENERATE remoteAddr, statusCode, resource;
    filtered_logs = FILTER projected_logs BY
      (NOT (remoteAddr MATCHES '10.*' OR
            remoteAddr MATCHES '192.168.*' OR
            remoteAddr MATCHES '(172.1[6-9].*)|(172.2[0-9].*)|(172.3[0-1].*)'))
      AND statusCode >= 400;

A filter UDF that recognizes the private IP ranges:

    public class IsPrivateIP extends FilterFunc {
      protected List<Range> ipRanges;

      public IsPrivateIP() {
        ipRanges = new ArrayList<Range>();
        ipRanges.add(getRange("10.0.0.0", "10.255.255.255"));
        ipRanges.add(getRange("172.16.0.0", "172.31.255.255"));
        ipRanges.add(getRange("192.168.0.0", "192.168.255.255"));
      }
    }
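The excerpt omits exec and the getRange helper. A minimal sketch of how the class could be completed, assuming each Range holds the two endpoints of an address range packed into longs (org.apache.commons.lang.math Range/LongRange are assumed; this is illustrative, not the book's implementation):

    // Methods to add inside IsPrivateIP; FilterFunc requires a Boolean-returning exec(Tuple).
    @Override
    public Boolean exec(Tuple input) throws IOException {
      if (input == null || input.size() == 0 || input.get(0) == null) {
        return false;
      }
      long addr = toLong(input.get(0).toString());
      for (Range range : ipRanges) {
        if (range.containsLong(addr)) {
          return true;
        }
      }
      return false;
    }

    private static Range getRange(String low, String high) {
      return new LongRange(toLong(low), toLong(high));
    }

    // Pack dotted-quad notation into a single comparable long.
    private static long toLong(String ip) {
      long value = 0;
      for (String octet : ip.split("\\.")) {
        value = (value << 8) | Integer.parseInt(octet);
      }
      return value;
    }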
Using the filter UDF instead of the regular expression:

    REGISTER target/hadoop-book-1.0.0-SNAPSHOT-jar-with-dependencies.jar;
    DEFINE LogLoader com.manning.hip.ch11.TypedCommonLogLoader();
    DEFINE IsPrivateIP com.manning.hip.ch11.IsPrivateIP();
    logs = LOAD 'apachelog.txt' USING LogLoader;
    filtered_logs = FILTER logs BY NOT IsPrivateIP(remoteAddr);
    DUMP filtered_logs;

Grouping by address and status code:

    ip_group = GROUP filtered_logs BY (remoteAddr, statusCode);
    DESCRIBE ip_group;
    ip_group: {group: (remoteAddr: chararray, statusCode: long),
               filtered_logs: {remoteAddr: chararray, statusCode: long, resource: chararray}}

    addrstatus_counts = FOREACH ip_group GENERATE FLATTEN(group), COUNT(filtered_logs);
    DESCRIBE addrstatus_counts;
    addrstatus_counts: {group::remoteAddr: chararray, group::statusCode: long, long}

An eval UDF that geolocates addresses with the MaxMind GeoIP database:

    public class PigGeolocationUDF extends EvalFunc<String> {
      private LookupService geoloc;
      private static final String COUNTRY = "country";
      private final static String DIST_CACHE_GEOIP_NAME = "geoip";
      private final List<String> distributedCacheFiles;  // initialized in a constructor not shown here

      public String exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0) {
          return null;
        }
        Object object = input.get(0);
        if (object == null) {
          return null;
        }
        String ip = (String) object;
        return lookup(ip);
      }

      protected String lookup(String ip) throws IOException {
        if (geoloc == null) {
          geoloc = new LookupService("./" + DIST_CACHE_GEOIP_NAME,
                                     LookupService.GEOIP_MEMORY_CACHE);
        }
        String country = geoloc.getCountry(ip).getName();
        if ("N/A".equals(country)) {
          return null;
        }
        return country;
      }
    }

Shipping the GeoIP database to the tasks through the distributed cache and invoking the UDF:

    SET mapred.cache.files file:/tmp/GeoIP.dat#geoip;
    SET mapred.create.symlink yes;
    DEFINE GeoIP com.manning.hip.ch11.PigGeolocationUDF();
    countries = FOREACH addrstatus_counts GENERATE *, GeoIP(remoteAddr);
    DUMP countries;
Streaming the relation through an external Python script (ip_security.py), which appends a random severity column:

    import sys, random

    for line in sys.stdin:
        fields = line.rstrip("\n\r")
        sys.stdout.write("%s,%s\n" % (fields, random.randint(1, 10)))

The Pig side defines the command and streams the relation through it:

    DEFINE cmd `python ip_security.py`
      INPUT(stdin USING PigStreaming(','))
      OUTPUT(stdout USING PigStreaming(','))
      SHIP('/tmp/ip_security.py');
    ip_metadata = STREAM countries THROUGH cmd
      AS (remoteAddr: chararray, statusCode: long, count: long, country: chararray, severity: int);

Joining with a history of known bad IP addresses:

    fs -put test-data/ch11/bad-ip-history.txt .
    bad_ips = LOAD 'bad-ip-history.txt' USING PigStorage(' ') AS (ip: chararray, bad_instances: int);
    joined_ips = JOIN ip_metadata BY remoteAddr, bad_ips BY ip;
    DESCRIBE joined_ips;
    joined_ips: {ip_metadata::remoteAddr: chararray, ip_metadata::statusCode: long,
                 ip_metadata::count: long, ip_metadata::country: chararray,
                 ip_metadata::severity: int, bad_ips::ip: chararray, bad_ips::bad_instances: int}
    sorted_ips = ORDER joined_ips BY count DESC;
Diagnostic and sampling operators:
- LIMIT: reduce the number of tuples in the result to a fixed size.
- SAMPLE: select a random selection of tuples from a relation.
- EXPLAIN: evaluate the execution plan for inefficiencies.
- ILLUSTRATE: show the results of each statement in a given data flow.

LIMIT:

    grunt> logs = LOAD 'logs-simple-large.txt' AS (ip, date, status);
    grunt> limited_logs = LIMIT logs 10;
    grunt> status_group = GROUP limited_logs BY status;
    grunt> status_counts = FOREACH status_group GENERATE group, COUNT(limited_logs);
    grunt> STORE status_counts INTO 'status-counts.txt';

SAMPLE:

    grunt> logs = LOAD 'logs-simple-large.txt' AS (ip, date, status);
    grunt> sampled_logs = SAMPLE logs 0.15;
    ...
    grunt> logs = LOAD 'logs-simple-large.txt' USING RandomSampleLoader('PigStorage', '50')
             AS (ip, date, status);

ILLUSTRATE:

    grunt> logs = LOAD 'logs-simple-large.txt' AS (ip, date, status);
    grunt> bad_request = filter logs by status == 400;
    grunt> dump bad_request;
    Input(s): Successfully read 200 records (5867 bytes) from: hdfs://...
    Output(s): Successfully stored 0 records in: hdfs://...
    grunt> illustrate bad_request;
    ---------------------------------------------------------------------
    | logs        | ip: bytearray | date: bytearray | status: bytearray |
    ---------------------------------------------------------------------
    |             | 127.0.0.1     | 10/Apr/2007     | 400               |
    |             | 127.0.0.1     | 10/Apr/2007     | 404               |
    ---------------------------------------------------------------------
    ---------------------------------------------------------------------
    | bad_request | ip: bytearray | date: bytearray | status: bytearray |
    ---------------------------------------------------------------------
    |             | 127.0.0.1     | 10/Apr/2007     | 400               |
    ---------------------------------------------------------------------

EXPLAIN:

    grunt> logs = LOAD 'logs-simple-large.txt' AS (ip, date, status);
    grunt> not_found = filter logs by status == 404;
    grunt> explain not_found;
    #-----------------------------------------------
    # Logical Plan:
    #-----------------------------------------------
    fake: Store 1-186 Schema: {ip: bytearray,date: bytearray,status: bytearray} Type: Unknown
    |
    |---not_found: Filter 1-185 Schema: {ip: bytearray,date: bytearray,status: bytearray} Type: bag
        |   |
        |   Equal 1-184 FieldSchema: boolean Type: boolean
        |   |
        |   |---Const 1-183( 404 ) FieldSchema: int Type: int
        |   |
        |   |---Cast 1-187 FieldSchema: int Type: int
        |
        |---logs: Load 1-177 Schema: {ip: bytearray,date: bytearray,status: bytearray} Type: bag
Hive:
- Logical data partitioning
- Metastore (command line and web interfaces)
- Query language (HiveQL)
- Libraries to handle different serialization formats (SerDes)
- JDBC interface

How Hive entities map onto HDFS locations:

Hive entity      | Sample metastore entity | Sample HDFS location
Table            | T                       | /wh/T
Partition        | date=d1                 | /wh/T/date=d1
Bucketing column | userid                  | /wh/T/date=d1/part-0000 ... /wh/T/date=d1/part-1000 (hashed on userid)
External table   | extT                    | /wh2/existing/dir (arbitrary location)

Types:
- Primitive types: integer types, float, string, date, boolean
- Nestable collections: array, map
- User-defined types: structures with attributes, which can be of any type

DDL: create/alter/drop table/view/partition; create table as select.
DML: insert overwrite.
QL: sub-queries in the FROM clause, equi-joins (including outer joins), multi-table insert, sampling, lateral views.
Interfaces: JDBC/ODBC/Thrift.
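A minimal sketch of the JDBC interface listed above, assuming a HiveServer2 instance on localhost:10000 and the status_updates table used in the example below (host, port, credentials, and query are placeholders; older HiveServer1 deployments use the org.apache.hadoop.hive.jdbc.HiveDriver class and jdbc:hive:// URLs instead):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class HiveJdbcExample {
      public static void main(String[] args) throws Exception {
        // HiveServer2 JDBC driver.
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        Connection conn = DriverManager.getConnection(
            "jdbc:hive2://localhost:10000/default", "hive", "");
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery(
            "SELECT ds, COUNT(1) FROM status_updates GROUP BY ds");
        while (rs.next()) {
          System.out.println(rs.getString(1) + "\t" + rs.getLong(2));
        }
        rs.close();
        stmt.close();
        conn.close();
      }
    }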
Example: a status updates table

    status_updates(userid int, status string, ds string)

Load the data from log files:

    LOAD DATA LOCAL INPATH '/logs/status_updates'
    INTO TABLE status_updates PARTITION (ds='2009-03-20')

User profile table:

    profiles(userid int, school string, gender int)

Filter status updates containing 'michael jackson':

    SELECT * FROM status_updates WHERE status LIKE 'michael jackson'

Figure out the total number of status_updates in a given day:

    SELECT COUNT(1) FROM status_updates WHERE ds = '2009-08-01'

Multi-table insert: join status updates with profiles once and write both a gender summary and a school summary:

    FROM (SELECT a.status, b.school, b.gender
          FROM status_updates a JOIN profiles b
          ON (a.userid = b.userid AND a.ds='2009-03-20')) subq1
    INSERT OVERWRITE TABLE gender_summary PARTITION(ds='2009-03-20')
      SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender
    INSERT OVERWRITE TABLE school_summary PARTITION(ds='2009-03-20')
      SELECT subq1.school, COUNT(1) GROUP BY subq1.school
Hive compared with a traditional RDBMS (Oracle):
- Data definition: DDL in both.
- Queries: HiveQL with built-in functions, UDFs and UDAFs, extended via HQL+UDF, Pig, or MapReduce, versus SQL with statistical and analytic functions.
- Data organization: Table / Partition / Sampling versus Table / Partition / Bucket.
- Type mapping: NUMBER(n) -> TINYINT/INT/BIGINT; NUMBER(n,m) -> FLOAT/DOUBLE; VARCHAR2 -> STRING; DATE -> STRING ("yyyy-MM-dd").