NoSQL: Technical Foundations and Practice
yinjunwen
Submission subject: nosql_proj1_<student ID>_<name>_1/2/3

Outline: Introduction; Distribution; HBase architecture; Filters; Coprocessors; Schema design

Rows in a table are sorted lexicographically by rowkey:

hbase(main):001:0> scan 'table1'
ROW      COLUMN+CELL
row-1    column=cf1:, timestamp=1297073325971 ...
row-10   column=cf1:, timestamp=1297073337383 ...
row-11   column=cf1:, timestamp=1297073340493 ...
row-2    column=cf1:, timestamp=1297073329851 ...
row-22   column=cf1:, timestamp=1297073344482 ...
row-3    column=cf1:, timestamp=1297073333504 ...
row-abc  column=cf1:, timestamp=1297073349875 ...
7 row(s) in 0.1100 seconds

The logical model maps a coordinate to a value:

(Table, RowKey, Family, Column, Timestamp) -> Value

which can be read as nested sorted maps:

SortedMap<RowKey, List<SortedMap<Column, List<Value, Timestamp>>>>

The HFile block is the basic unit of storage, typically 64 KB. An HFile is a persistent, sorted, immutable map. KeyValue updates are first written to the write-ahead log (WAL).
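To make the nested-map view concrete, here is a self-contained Java sketch. It uses plain TreeMaps and is purely illustrative; HBase's real classes differ, but the newest-version-first ordering is the same:

import java.util.Collections;
import java.util.TreeMap;

public class LogicalModelSketch {
  public static void main(String[] args) {
    // timestamp -> value, newest first
    TreeMap<Long, String> versions = new TreeMap<Long, String>(Collections.reverseOrder());
    versions.put(1329088818321L, "abc123");
    versions.put(1329088321289L, "Langhorne");

    // column -> versions
    TreeMap<String, TreeMap<Long, String>> row = new TreeMap<String, TreeMap<Long, String>>();
    row.put("info:password", versions);

    // rowkey -> row
    TreeMap<String, TreeMap<String, TreeMap<Long, String>>> table =
        new TreeMap<String, TreeMap<String, TreeMap<Long, String>>>();
    table.put("TheRealMT", row);

    // Reading a cell walks the maps; the first entry is the newest version.
    System.out.println(table.get("TheRealMT").get("info:password").firstEntry());
  }
}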
Creating a table from the shell:

$ hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 0.92.0, r1231986, Mon Jan 16 13:16:35 UTC 2012

hbase(main):001:0> create 'users', 'info'
0 row(s) in 0.1200 seconds

hbase(main):002:0> list
TABLE
users
1 row(s) in 0.0220 seconds

hbase(main):003:0> describe 'users'
DESCRIPTION                                                ENABLED
 {NAME => 'users', FAMILIES => [{NAME => 'info',           true
 BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0',
 COMPRESSION => 'NONE', VERSIONS => '3',
 TTL => '2147483647', BLOCKSIZE => '65536',
 IN_MEMORY => 'false', BLOCKCACHE => 'true'}]}
1 row(s) in 0.0330 seconds

Opening a table from the Java client API:

HTableInterface usersTable = new HTable("users");

Configuration myConf = HBaseConfiguration.create();
HTableInterface usersTable = new HTable(myConf, "users");

HTablePool pool = new HTablePool();
HTableInterface usersTable = pool.getTable("users");
// ... work with the table
usersTable.close();
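A typical lifecycle for a pooled handle, as a sketch; it assumes that, as in the snippet above, close() on a pooled table returns the handle to the pool rather than tearing the connection down:

Configuration myConf = HBaseConfiguration.create();
HTablePool pool = new HTablePool(myConf, 10);    // cache at most 10 handles per table
HTableInterface users = pool.getTable("users");
try {
  // ... issue Gets and Puts against users ...
} finally {
  users.close();   // hand the table back to the pool
}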
Every row in HBase has an identifier, called the rowkey. The data-manipulation APIs are called commands: Get, Put, Delete, Scan, and Increment.

Put p = new Put(Bytes.toBytes("TheRealMT"));
p.add(Bytes.toBytes("info"),
      Bytes.toBytes("name"),
      Bytes.toBytes("Mark Twain"));
p.add(Bytes.toBytes("info"),
      Bytes.toBytes("email"),
      Bytes.toBytes("samuel@clemens.org"));
p.add(Bytes.toBytes("info"),
      Bytes.toBytes("password"),
      Bytes.toBytes("Langhorne"));

HTableInterface usersTable = pool.getTable("users");
Put p = new Put(Bytes.toBytes("TheRealMT"));
p.add(...);
usersTable.put(p);
usersTable.close();

Changing a value is just another Put:

Put p = new Put(Bytes.toBytes("TheRealMT"));
p.add(Bytes.toBytes("info"),
      Bytes.toBytes("password"),
      Bytes.toBytes("abc123"));
usersTable.put(p);
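A plain Put overwrites unconditionally. When an update must not race with another writer, the client API also offers checkAndPut, which applies the Put only if the current value matches an expected one. A sketch against the same usersTable:

// Only change the password if its current value is still "abc123".
Put p = new Put(Bytes.toBytes("TheRealMT"));
p.add(Bytes.toBytes("info"), Bytes.toBytes("password"), Bytes.toBytes("xyz789"));
boolean applied = usersTable.checkAndPut(
    Bytes.toBytes("TheRealMT"),   // row to check
    Bytes.toBytes("info"),        // family
    Bytes.toBytes("password"),    // qualifier
    Bytes.toBytes("abc123"),      // expected current value
    p);                           // mutation applied atomically on match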
Writes go first to the write-ahead log (WAL, implemented by HLog), then into the in-memory MemStore, which is flushed to immutable HFiles on disk. A column family can have many HFiles, but a single HFile never stores data from more than one column family.
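Because every edit passes through the WAL before the MemStore, durability can be traded for write latency. In this API version, Put exposes setWriteToWAL; a sketch (skipping the WAL risks losing the edit if the region server dies before the MemStore is flushed):

Put p = new Put(Bytes.toBytes("TheRealMT"));
p.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("Mark Twain"));
p.setWriteToWAL(false);   // faster, but the edit is lost if the server crashes before a flush
usersTable.put(p);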
Reading with Get:

Get g = new Get(Bytes.toBytes("TheRealMT"));
Result r = usersTable.get(g);

Get g = new Get(Bytes.toBytes("TheRealMT"));
g.addColumn(Bytes.toBytes("info"), Bytes.toBytes("password"));
Result r = usersTable.get(g);

Get g = new Get(Bytes.toBytes("TheRealMT"));
g.addFamily(Bytes.toBytes("info"));

byte[] b = r.getValue(Bytes.toBytes("info"), Bytes.toBytes("email"));
String email = Bytes.toString(b);

Deleting:

Delete d = new Delete(Bytes.toBytes("TheRealMT"));
usersTable.delete(d);

Delete d = new Delete(Bytes.toBytes("TheRealMT"));
d.deleteColumns(Bytes.toBytes("info"), Bytes.toBytes("email"));
usersTable.delete(d);

Delete only marks records with a "deleted" tombstone. Marked records are excluded from Get and Scan results, and their space is reclaimed only at the next major compaction. A minor compaction merges small HFiles into larger ones; a major compaction merges all the HFiles of a region into a single one.

Reading multiple versions of a cell:

List<KeyValue> passwords = r.getColumn(Bytes.toBytes("info"), Bytes.toBytes("password"));
b = passwords.get(0).getValue();
String currentPasswd = Bytes.toString(b);        // "abc123"
b = passwords.get(1).getValue();
String prevPasswd = Bytes.toString(b);           // "Langhorne"
long version = passwords.get(0).getTimestamp();  // 1329088818321
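A multi-version read like the one above only sees history if the Get asked for it; by default only the newest version is returned. A sketch of such a request:

Get g = new Get(Bytes.toBytes("TheRealMT"));
g.addColumn(Bytes.toBytes("info"), Bytes.toBytes("password"));
g.setMaxVersions(3);   // up to the family's VERSIONS setting (3 in the table created earlier)
Result r = usersTable.get(g);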
Data-model vocabulary:

- Table: HBase organizes data into tables.
- Row: within a table, data is stored according to its row.
- Column family: data within a row is grouped by column family.
- Column qualifier: data within a column family is addressed via its column qualifier, or column.
- Cell: a combination of rowkey, column family, and column qualifier uniquely identifies a cell.
- Version: values within a cell are versioned by timestamp.

A cell's coordinates plus a timestamp identify exactly one KeyValue:

TheRealMT, info, password, 1329088818321 -> "abc123"

Dropping coordinates one at a time yields progressively larger maps:

TheRealMT, info, password -> {1329088818321 : "abc123",
                              1329088321289 : "Langhorne"}

TheRealMT, info -> {"email"    : {1329088321289 : "samuel@clemens.org"},
                    "password" : {1329088818321 : "abc123",
                                  1329088321289 : "Langhorne"}}

TheRealMT -> {"info" : {"name"     : {1329088321289 : "Mark Twain"},
                        "email"    : {1329088321289 : "samuel@clemens.org"},
                        "password" : {1329088818321 : "abc123",
                                      1329088321289 : "Langhorne"}}}
Scanning:

Scan s = new Scan();                                        // full-table scan
Scan s = new Scan(Bytes.toBytes("T"), Bytes.toBytes("U"));  // rowkeys in ["T", "U")

Scanning a fixed-length MD5 key range:

byte[] userHash = Md5Utils.md5sum(user);
byte[] startRow = Bytes.padTail(userHash, longLength);  // 212d...866f00...
byte[] stopRow = Bytes.padTail(userHash, longLength);
stopRow[Md5Utils.MD5_LENGTH - 1]++;                     // 212d...867000...
Scan s = new Scan(startRow, stopRow);
ResultScanner rs = twits.getScanner(s);
for (Result r : rs) {
  byte[] b = r.getValue(Bytes.toBytes("twits"), Bytes.toBytes("user"));
  String user = Bytes.toString(b);
  b = r.getValue(Bytes.toBytes("twits"), Bytes.toBytes("twit"));
  String message = Bytes.toString(b);
  b = Arrays.copyOfRange(r.getRow(), Md5Utils.MD5_LENGTH, Md5Utils.MD5_LENGTH + longLength);
  DateTime dt = new DateTime(-1 * Bytes.toLong(b));
}

Filters can also be built from a string expression:

Scan s = new Scan();
s.addColumn(TWITS_FAM, TWIT_COL);
String expression = "ValueFilter(=, 'regexstring:.*TwitBase.*')";
ParseFilter p = new ParseFilter();
Filter f = p.parseSimpleFilterExpression(Bytes.toBytes(expression));
s.setFilter(f);

Atomic counters:

long ret = usersTable.incrementColumnValue(Bytes.toBytes("TheRealMT"),
    Bytes.toBytes("info"), Bytes.toBytes("tweet_count"), 1L);

MapReduce over HBase launches one map task per region: each task takes the key range of its region as its input split and scans over it, against the regions being served by the RegionServers.
public void map(ImmutableBytesWritable row, Result columns, Context context)
    throws IOException {
  context.getCounter(Counters.ROWS).increment(1);
  String value = null;
  try {
    for (KeyValue kv : columns.list()) {
      context.getCounter(Counters.COLS).increment(1);
      value = Bytes.toStringBinary(kv.getValue());
      JSONObject json = (JSONObject) parser.parse(value);
      String author = (String) json.get("author");
      context.write(new Text(author), ONE);
      context.getCounter(Counters.VALID).increment(1);
    }
  } catch (Exception e) {
    e.printStackTrace();
    System.err.println("Row: " + Bytes.toStringBinary(row.get()) + ", JSON: " + value);
    context.getCounter(Counters.ERROR).increment(1);
  }
}

The driver sets up the scan and wires the mapper into the job:

Scan scan = new Scan();
if (column != null) {
  byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
  if (colkey.length > 1) {
    scan.addColumn(colkey[0], colkey[1]);
  } else {
    scan.addFamily(colkey[0]);
  }
}
Job job = new Job(conf, "Analyze data in " + table);
job.setJarByClass(AnalyzeData.class);
TableMapReduceUtil.initTableMapperJob(table, scan, AnalyzeMapper.class,
    Text.class, IntWritable.class, job);
job.setReducerClass(AnalyzeReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setNumReduceTasks(1);
FileOutputFormat.setOutputPath(job, new Path(output));
System.exit(job.waitForCompletion(true) ? 0 : 1);

To write results back to HBase, wire in a table reducer instead:

TableMapReduceUtil.initTableReducerJob("users", IdentityTableReducer.class, job);
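The AnalyzeReducer wired in above is not reproduced in these notes; a minimal sketch of a matching implementation (assumed, not the original) simply sums the per-author ONEs emitted by the mapper:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AnalyzeReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  protected void reduce(Text author, Iterable<IntWritable> counts, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable one : counts) {
      sum += one.get();   // add up the 1s emitted per message
    }
    context.write(author, new IntWritable(sum));
  }
}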
Map-side join with HBase.

Storage internals: root-level files, table-level files, region-level files, region splits, compactions.

- HLogKey records not only the key but also the region, the table, and the write timestamp.
- Each client mutation produces several KeyValues; a WALEdit bundles all the KeyValues of one mutation so that it is logged atomically, guaranteeing consistency.
- LogSyncer flushes the log; LogRoller rotates log files for reuse.
- A single HLog file per region server stores all of its logs. It is implemented on HDFS, which reduces I/O and design complexity.
- On log replay, the log files are split and ZooKeeper coordinates the replay; region servers check their recovered.edits files and rebuild the regions.

Region states:

State          Description
Offline        The region is offline.
Pending Open   A request to open the region was sent to the server.
Opening        The server has started opening the region.
Open           The region is open and fully operational.
Pending Close  A request to close the region has been sent to the server.
Closing        The server has started closing the region.
Closed         The region is closed.
Splitting      The server started splitting the region.
Split          The region has been split.
Replication (HBase cluster to cluster) is master-push and asynchronous. It is based on WALEdits, which preserves the atomicity of each mutation; the HLogs from each region server are the basis of HBase replication.

- Put, Delete, and Increment are converted into WALEdits and written to the log.
- A replication thread reads the log and buffers the KeyValues to be replicated. When the buffer fills (or the end of the log is reached), it ships them to a randomly chosen sink region server on the peer cluster and records the WAL replication offset in ZooKeeper.
- If the sink region server does not respond, the WAL position is rolled back, the failure is recorded in ZooKeeper, and another sink region server is chosen.
- The receiving side buffers the edits until all mutations of the batch have arrived, then writes them to its tables.
CompareFilter operators (CompareOp):

LESS              Match values less than the provided one.
LESS_OR_EQUAL     Match values less than or equal to the provided one.
EQUAL             Do an exact match on the value and the provided one.
NOT_EQUAL         Include everything that does not match the provided value.
GREATER_OR_EQUAL  Match values that are equal to or greater than the provided one.
GREATER           Only include values greater than the provided one.
NO_OP             Exclude everything.

Comparators:

BinaryComparator        Uses Bytes.compareTo() to compare the current with the provided value.
BinaryPrefixComparator  Similar to the above, but does a left-hand, prefix-based match using Bytes.compareTo().
NullComparator          Does not compare against an actual value, but whether a given one is null or not null.
BitComparator           Performs a bitwise comparison, providing a BitwiseOp class with AND, OR, and XOR operators.
RegexStringComparator   Given a regular expression at instantiation, this comparator does a pattern match on the table data.
SubstringComparator     Given a substring at instantiation, this comparator matches values that contain it.

The CompareFilter constructor combines the two:

CompareFilter(CompareOp valueCompareOp, WritableByteArrayComparable valueComparator)

Comparison filters: RowFilter, FamilyFilter, QualifierFilter, ValueFilter, DependentColumnFilter.
Dedicated filters: SingleColumnValueFilter, SingleColumnValueExcludeFilter, PrefixFilter, PageFilter, KeyOnlyFilter, FirstKeyOnlyFilter, InclusiveStopFilter, TimestampsFilter, ColumnCountGetFilter, ColumnPaginationFilter, ColumnPrefixFilter, RandomRowFilter.
Decorating filters: SkipFilter, WhileMatchFilter.
FilterList combines multiple filters:

FilterList(List<Filter> rowFilters)
FilterList(Operator operator)
FilterList(Operator operator, List<Filter> rowFilters)

Operator: MUST_PASS_ALL, MUST_PASS_ONE
void addFilter(Filter filter)
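As a sketch of how these pieces compose (the filters and comparators come from the tables above; the family and qualifier names reuse the users example):

// Rows whose key starts with 'T' AND whose info:name contains "Twain".
List<Filter> list = new ArrayList<Filter>();
list.add(new RowFilter(CompareFilter.CompareOp.EQUAL,
    new RegexStringComparator("^T.*")));
list.add(new SingleColumnValueFilter(Bytes.toBytes("info"),
    Bytes.toBytes("name"), CompareFilter.CompareOp.EQUAL,
    new SubstringComparator("Twain")));
FilterList fl = new FilterList(FilterList.Operator.MUST_PASS_ALL, list);
Scan scan = new Scan();
scan.setFilter(fl);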
Custom filters implement the Filter interface (Writable-based in this version):

public interface Filter extends Writable {
  public enum ReturnCode {
    INCLUDE, SKIP, NEXT_COL, NEXT_ROW, SEEK_NEXT_USING_HINT
  }
  public void reset();
  public boolean filterRowKey(byte[] buffer, int offset, int length);
  public boolean filterAllRemaining();
  public ReturnCode filterKeyValue(KeyValue v);
  public void filterRow(List<KeyValue> kvs);
  public boolean hasFilterRow();
  public boolean filterRow();
  public KeyValue getNextKeyHint(KeyValue currentKV);
}
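A custom filter usually extends FilterBase so that only the interesting methods need overriding. A sketch (the class name is invented for the example; the jar must be deployed on the region servers' classpath):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.util.Bytes;

// Keep only cells whose value equals an expected byte pattern; skip everything else.
public class ValueEqualsFilter extends FilterBase {
  private byte[] expected;

  public ValueEqualsFilter() {}   // no-arg constructor required for deserialization
  public ValueEqualsFilter(byte[] expected) { this.expected = expected; }

  @Override
  public ReturnCode filterKeyValue(KeyValue kv) {
    return Bytes.equals(kv.getValue(), expected) ? ReturnCode.INCLUDE : ReturnCode.SKIP;
  }

  // Writable serialization, so the filter can be shipped to the region servers.
  public void write(DataOutput out) throws IOException { Bytes.writeByteArray(out, expected); }
  public void readFields(DataInput in) throws IOException { expected = Bytes.readByteArray(in); }
}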
How a RegionObserver coprocessor sees a request:

1. Client sends a Put request.
2. The request is dispatched to the appropriate RegionServer and region.
3. CoprocessorHost intercepts the request and invokes prePut() on each RegionObserver registered on the table.
4. Unless interrupted by a prePut(), the request continues to the region and is processed normally.
5. The result produced by the region is once again intercepted by the CoprocessorHost; this time postPut() is called on each registered RegionObserver.
6. Assuming no postPut() interrupts the response, the final result is returned to the client.
@Override
public void postPut(final ObserverContext<RegionCoprocessorEnvironment> e,
    final Put put, final WALEdit edit, final boolean writeToWAL)
    throws IOException {
  byte[] table = e.getEnvironment().getRegion().getRegionInfo().getTableName();
  if (!Bytes.equals(table, FOLLOWS_TABLE_NAME))
    return;
  KeyValue kv = put.get(RELATION_FAM, FROM).get(0);
  String from = Bytes.toString(kv.getValue());
  kv = put.get(RELATION_FAM, TO).get(0);
  String to = Bytes.toString(kv.getValue());
  RelationsDAO relations = new RelationsDAO(pool);
  relations.addFollowedBy(to, from);
}

Registering the observer from the shell:

$ hbase shell
hbase(main):001:0> disable 'follows'
hbase(main):002:0> alter 'follows', METHOD => 'table_att', 'coprocessor' =>
  'file:///Users/ndimiduk/repos/hbaseia-twitbase/target/twitbase-1.0.0.jar|HBaseIA.TwitBase.coprocessors.FollowsObserver|1001|'
hbase(main):004:0> describe 'follows'
An endpoint coprocessor, by contrast, runs custom code inside each region:

public class RelationCountImpl extends BaseEndpointCoprocessor
    implements RelationCountProtocol {

  @Override
  public long followedByCount(String userId) throws IOException {
    byte[] startkey = Md5Utils.md5sum(userId);
    Scan scan = new Scan(startkey);
    scan.setFilter(new PrefixFilter(startkey));
    scan.addColumn(RELATION_FAM, FROM);
    scan.setMaxVersions(1);

    RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) getEnvironment();
    InternalScanner scanner = env.getRegion().getScanner(scan);

    long sum = 0;
    List<KeyValue> results = new ArrayList<KeyValue>();
    boolean hasMore = false;
    do {
      hasMore = scanner.next(results);
      sum += results.size();
      results.clear();
    } while (hasMore);
    scanner.close();
    return sum;
  }
}

The client fans the call out over the key range and sums the partial counts:

public long followedByCount(final String userId) throws Throwable {
  HTableInterface followed = pool.getTable(FOLLOWED_TABLE_NAME);
  final byte[] startKey = Md5Utils.md5sum(userId);
  final byte[] endKey = Arrays.copyOf(startKey, startKey.length);
  endKey[endKey.length - 1]++;

  Batch.Call<RelationCountProtocol, Long> callable =
      new Batch.Call<RelationCountProtocol, Long>() {
        @Override
        public Long call(RelationCountProtocol instance) throws IOException {
          return instance.followedByCount(userId);
        }
      };

  Map<byte[], Long> results = followed.coprocessorExec(
      RelationCountProtocol.class, startKey, endKey, callable);

  long sum = 0;
  for (Map.Entry<byte[], Long> e : results.entrySet()) {
    sum += e.getValue().longValue();
  }
  return sum;
}
Endpoints can also be deployed cluster-wide via $HBASE_HOME/conf. In hbase-site.xml:

<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>HBaseIA.TwitBase.coprocessors.RelationCountImpl</value>
</property>

And in hbase-env.sh:

export HBASE_CLASSPATH=/path/to/hbaseia-twitbase/target/twitbase-1.0.0.jar
Flat-wide layout: one row per user, one column per message (rowkey : family : qualifier : timestamp : value):

12345 : data : 5fc38314-e290-ae5da5fc375d : 1307097848 : "Hi Lars, ..."
12345 : data : 725aae5f-d72e-f90f3f070419 : 1307099848 : "Welcome, and ..."
12345 : data : cc6775b3-f249-c6dd2b1a7467 : 1307101848 : "To Whom It ..."
12345 : data : dcbee495-6d5e-6ed48124632c : 1307103848 : "Hi, how are ..."

Tall-narrow layout: one row per message, with the message ID promoted into the rowkey:

12345-5fc38314-e290-ae5da5fc375d : data : : 1307097848 : "Hi Lars, ..."
12345-725aae5f-d72e-f90f3f070419 : data : : 1307099848 : "Welcome, and ..."
12345-cc6775b3-f249-c6dd2b1a7467 : data : : 1307101848 : "To Whom It ..."
12345-dcbee495-6d5e-6ed48124632c : data : : 1307103848 : "Hi, how are ..."
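A sketch of building such a tall-narrow key on the client side, using the values from the rows above:

// rowkey = "<userId>-<messageId>"; the message body becomes the (qualifier-less) cell value
byte[] rowkey = Bytes.add(Bytes.toBytes("12345"), Bytes.toBytes("-"),
    Bytes.toBytes("5fc38314-e290-ae5da5fc375d"));
Put put = new Put(rowkey);
put.add(Bytes.toBytes("data"), Bytes.toBytes(""), 1307097848L, Bytes.toBytes("Hi Lars, ..."));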
Lookup complexity. Let n = number of KeyValue entries in the table, b = number of blocks in an HFile, e = average number of KeyValue entries in an HFile (approximately the possible number of entries in the MemStore), and c = average number of columns per row.

- MemStore hit: O(1) region lookup + O(log e) to locate the KeyValue in the MemStore.
- MemStore miss: O(1) region lookup + O(log b) HFile block lookup + O(max(c, e/b)) to locate the record within the block, plus disk access time.

Tall-narrow vs. flat-wide:

Tall-Narrow: queried by rowkey, so lookups can skip rows (and store files) quickly; better for scalability; not good for atomicity, since one logical entity spans many rows.
Flat-Wide: has to be queried by a column name, which cannot skip rows or store files; keeps an entity in a single row, but is not as good for scalability as tall-narrow.

Scans for the message example:

Command  Description
-        Scan over all messages for a given user ID.
-        Scan over all messages on a given date for the given user ID.
-        Scan over all parts of a message for a given user ID and date.
-        Scan over all attachments of a message for a given user ID and date.
Rowkey design guidelines:

- Use salting to spread sequentially keyed data across regions (see the sketch after this list), e.g.:
  int salt = new Integer(new Long(timestamp).hashCode()).shortValue() % <number of region servers>
- Hashing turns variable-length data into fixed-length keys; MD5 is faster, SHA-1 has fewer collisions.
- Put the fields most often used in queries at the front of the key, so that related entries are stored next to each other.
- Use a form like Long.MAX_VALUE - timestamp to support reverse-chronological scans.
- Use the timestamp as (part of) the key; promote frequently queried columns, or even values, into the key (field swap/promotion).
- Randomize a timestamp key with a hash when scan order does not matter.
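A sketch of the salting idea in code; REGION_SERVER_COUNT is an assumed constant, and the modulo mirrors the formula above:

final int REGION_SERVER_COUNT = 10;   // assumed cluster size

long timestamp = System.currentTimeMillis();
int salt = Math.abs(new Long(timestamp).hashCode()) % REGION_SERVER_COUNT;
byte[] rowkey = Bytes.add(Bytes.toBytes((short) salt),   // bucket prefix spreads the writes
    Bytes.toBytes(timestamp));

// For reverse-chronological scans, store the inverted timestamp instead:
byte[] reverseKey = Bytes.toBytes(Long.MAX_VALUE - timestamp);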
For composite keys, MD5(UserID1) + MD5(UserID2) is better than MD5(UserID1 + UserID2): it makes retrieval by either ID easier and reduces collisions.

Secondary indexes can be maintained in several ways:

- Keep the index table up to date with scripts, or synchronize it via a coprocessor.
- Indexed-Transactional HBase (ITHBase) provides IndexedTableDescriptor to define the index relationship of a secondary-index table.
- Indexed HBase (IHBase) maintains the index relationship in main memory.
- Search: Bloom filters; indexing on values; building a table index with Lucene or Solr; HBasene.

An HBasene-style index build:

protected void doInitDocs() throws CorruptIndexException, IOException {
  Configuration conf = HBaseConfiguration.create();
  HBaseIndexStore.createLuceneIndexTable("idxtbl", conf, true);
  tablePool = new HTablePool(conf, 10);
  HBaseIndexStore hbaseIndex = new HBaseIndexStore(tablePool, conf, "idxtbl");
  HBaseIndexWriter indexWriter = new HBaseIndexWriter(hbaseIndex, "id");
  for (int i = 100; i >= 0; --i) {
    Document doc = getDocument(i);
    indexWriter.addDocument(doc, new StandardAnalyzer(Version.LUCENE_30));
  }
}

Transactions: Indexed Transactional HBase provides beginTransaction/commit interfaces. ZooKeeper supplies a lock mechanism for two-phase commit, and can also serve as the coordination layer for a lock abstraction, e.g. Cages.