php/perl/python , 通过thrift 连接 hbase,进行条件过滤选择
云计算nosql中的棋手级产品,hbase应用越来越广。所以我也用到了,咨询了相关同事,为了将来存取海量数据,rowkey 设计成 uid_ 之类的形式。但是产品中,肯定会有根据uid取得 bissid列表的需求,根据文档,用scan,filter 可以方便的取出来。 同时,我还记录了thrift 部署及环境的相关问题,网上诸多文档千篇一律,很多都是有问题的。或者没说明什么原因。
云计算nosql中的棋手级产品,hbase应用越来越广。所以我也用到了,咨询了相关同事,为了将来存取海量数据,rowkey 设计成 uid_<bissid> 之类的形式。
但是产品中,肯定会有根据uid取得 bissid列表的需求,根据文档,用scan,filter 可以方便的取出来。
同时,我还记录了thrift 部署及环境的相关问题,网上诸多文档千篇一律,很多都是有问题的。或者没说明什么原因。
1 安装thrift, 我这里是用的0.8版本的;0.9 很早以前用过,没有弄成功,所以就以这个为基础了。安装步骤就不说了,只要你在linux上装过软件,基本都一样的。thrift的功能很强大,其中之一是异构系统数据交换的一个媒介。想比于rest,他的效率更好。相比于rpc,又没那么复杂,总之是个不错的东西。 如果你有类似的服务提供给别人,可以考虑用thrift; 而hbase的除了native方式的,所以想提供给其他语言访问,hbase就是用了thrift接口。所以,在hbase的源码里会包含一个 Hbase.thrift 的定义文件(这个我们是需要的)
2 用thrift 命令行接口生成各语言客户端包文件。这个就要用到Hbase.thrift 这个定义文件了。可以生成各种主流语言的 gen-[lang] 之类的文件夹了。在哪个目录生成不重要,我稍后会说一下文件部署结构。
3 假设我现在在 ~/test/ 这个目录下,以下所有的说明及命令,我的pwd都是这里。 不加特殊说明,我及不列绝对路径了。
4 把thrfit源码包里对应语言的 lib包,copy到当前目录(~/test/ ) , 以php为例
# mkdir php-src perl-src thrift (本来我想做成py-src的,但是调试时有点问题,就姑且把python的包先放入到这个文件夹吧)
# cp -r xxx/thrift0.8.0/lib/php/src/* php-src
# cp -r xx/thrift0.8.0/lib/php/lib/perl/lib/* perl-src
# cp -r xxx/thrift0.8.0/lib/py/src/* thrift
5 # 把第2 步骤生成的 gen-[lang] 对应的文件mv 到 php-src/packages/ 里(这里的packages文件夹需要你先建立好,当然用其他名字也行;比如 pkg)
6 python perl的也这样mv 过来。但是语言要对应。 perl 到 perl-src/packages. python 到 thrift/packages/
7 接下来是每种语言的脚本
php:
$GLOBALS['THRIFT_ROOT'] = './php-src';
require_once( $GLOBALS['THRIFT_ROOT'] . '/Thrift.php' );
require_once( $GLOBALS['THRIFT_ROOT'] . '/transport/TSocket.php' );
require_once( $GLOBALS['THRIFT_ROOT'] . '/transport/TBufferedTransport.php');
require_once( $GLOBALS['THRIFT_ROOT'] . '/protocol/TBinaryProtocol.php' );
// thrift 文件生成的
require_once( $GLOBALS['THRIFT_ROOT'] . '/packages/Hbase/Hbase.php' ); // 注意这里的packages,就是第5 步里建立的那个文件夹名
$table = 'your_table';
$host = '192.168.1.10';
$port = '9090';
$filter ="PrefixFilter('123_')";
$socket = new TSocket($host, $port);
$socket->setSendTimeout(10000); //
$socket->setRecvTimeout(20000); // Twenty seconds
$transport = new TBufferedTransport($socket);
$protocol = new TBinaryProtocol($transport);
$client = new HbaseClient($protocol);
try{
$transport->open();
$scan = new TScan();
$scan->filterString=$filter;
$scanner = $client->scannerOpenWithScan($table, $scan);
for($i=0; $i<100; $i++) {
echo "============= $i =============== \n";
$get_arr = $client->scannerGetList($scanner,1);
if(!$get_arr) break;
foreach ( $get_arr as $rowresult ){
foreach ($rowresult as $k=>$item) {
echo "row: ".$rowresult->{'row'}."\n";
echo "cols: ";
print_r($rowresult->{'columns'} );
echo "-----\n";
}
}
}
$client->scannerClose($scan );
$transport->close();
}
catch (Exception $e) {
echo "";
}
perl:
#!/bin/env perl
use strict;
use warnings;
use Data::Dumper;
use lib './perl-src';
use lib './perl-src/packages';
use Thrift;
use Thrift::BinaryProtocol;
use Thrift::Socket;
use Thrift::BufferedTransport;
use Hbase::Hbase;
my $host = '192.168.1.10';
my $port = '9090';
my $table = 'your_table';
my $filter ="PrefixFilter('123_')";
my $socket = new Thrift::Socket($host,$port);
my $transport = new Thrift::BufferedTransport($socket,1024,1024);
my $protocol = new Thrift::BinaryProtocol($transport);
my $client = new Hbase::HbaseClient($protocol);
eval{
$transport->open();
my $scan = new Hbase::TScan();
$scan->{filterString} = $filter;
my $scanner = $client->scannerOpenWithScan($table, $scan);
for(1..100) {
print "============ $_ ========\n";
my $get_arr = $client->scannerGetList($scanner,1);
last unless @$get_arr;
#print Dumper ($get_arr);
foreach my $rowresult ( @$get_arr) {
foreach my $k (keys %$rowresult) {
print "row: ",$rowresult->{row}, $/;
print "cols: ";
print Dumper $rowresult->{columns};
print "-----\n";
}
}
}
$client->scannerClose( $scan );
$transport->close();
};
if($@){
warn(Dumper($@));
}
python:
#!/bin/env python
#-*- coding: utf-8 -*-
import sys
sys.path.append('./thrift/packages')
sys.path.append('./thrift')
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.packages.hbase import Hbase
from thrift.packages.hbase.ttypes import *
host = '192.168.1.10'
port = 9090
table = 'your_table'
filter = "PrefixFilter('123_')"
transport = TSocket.TSocket(host, port)
transport = TTransport.TBufferedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = Hbase.Client(protocol)
try:
transport.open()
scan = TScan()
scan.filterString=filter
scanner = client.scannerOpenWithScan(table, scan)
for i in range(1,100):
print "============%d============" %(i)
get_arr = client.scannerGetList(scanner,1)
if not get_arr :
break;
for rowresult in get_arr:
print "row: %s" % (rowresult.row)
for k in rowresult.columns:
print "colomns: %s" % (k)
print '-----'
client.scannerClose( scan )
transport.close()
except AlreadyExists, tx:
print "Thrift exception"
print '%s' % (tx.message)
except:
print "error"
更多推荐
所有评论(0)