1. Importing Data from Kafka into HBase
The previous two posts covered the first two steps:
(1) importing the data into Kafka with Flume;
(2) processing the data in Kafka with Kafka Streams, passing it from one topic to another;
(3) importing the data from Kafka into HBase (this post).
Example: import the data from the topic user_friends into the HBase table events_db:user_friend.
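The consumer code below assumes that the target table already exists with column family uf. If it does not, here is a minimal sketch that creates the namespace and table through the HBase Java Admin API (assuming an HBase 2.x client; the addresses match the example below, so adjust them to your cluster):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class CreateUserFriendTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.153.141");
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            try {
                admin.getNamespaceDescriptor("events_db"); // throws if absent
            } catch (NamespaceNotFoundException e) {
                admin.createNamespace(NamespaceDescriptor.create("events_db").build());
            }
            TableName tableName = TableName.valueOf("events_db:user_friend");
            if (!admin.tableExists(tableName)) {
                // one column family "uf", as used by the Put calls below
                admin.createTable(TableDescriptorBuilder.newBuilder(tableName)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("uf"))
                        .build());
            }
        }
    }
}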
(1) Non-object-oriented version
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class UserFriendToHB {
static int num = 0;
public static void main(String[] args) {
// Kafka consumer configuration
final Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.153.141:9092");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
// prop.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,"10000");
// Whether offsets are committed automatically: false = manual commit, true = auto commit
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// only takes effect when auto commit is enabled
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
prop.put(ConsumerConfig.GROUP_ID_CONFIG,"user_friend_group");
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singleton("user_friends"));
// Configure HBase and connect to the database
Configuration conf = HBaseConfiguration.create();
conf.set(HConstants.HBASE_DIR, "hdfs://192.168.153.141:9000/hbase");
conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.153.141");
conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181"); // "hbase.zookeeper.property.clientPort"
try {
Connection connection = ConnectionFactory.createConnection(conf);
Table userFriendTable = connection
.getTable(TableName.valueOf("events_db:user_friend"));
while (true){
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
List<Put> datas = new ArrayList<>();
for (ConsumerRecord<String, String> record : poll) {
System.out.println(record.value().toString());
String[] split = record.value().toString().split(",");
// sample record value: 30386403,30279525 (userid,friendid)
// row key: hash of the two ids concatenated (note: int hash codes can collide)
Put put = new Put(Bytes.toBytes((split[0] + split[1]).hashCode()));
put.addColumn("uf".getBytes(),"userid".getBytes(),split[0].getBytes());
put.addColumn("uf".getBytes(),"friendid".getBytes(),split[1].getBytes());
datas.add(put);
}
num+=datas.size();
System.out.println("----------------------------------num: " +num);
if (datas.size() != 0) {
userFriendTable.put(datas);
// auto commit is disabled, so commit offsets manually after a successful write
consumer.commitAsync();
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
(2) Object-oriented (OOP) version
1) First, extract this part into an interface: its contents differ with each table's data structure, while the rest of the code barely changes at all (a second, illustrative handler follows the implementation below).
Define the interface:
import java.util.List;
import org.apache.hadoop.hbase.client.Put;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public interface IParseRecord {
public List<Put> parse(ConsumerRecords<String, String> records);
}
Implement the interface:
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class UserFriendHandler implements IParseRecord {
@Override
public List<Put> parse(ConsumerRecords<String, String> records) {
List<Put> datas = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value().toString());
String[] split = record.value().toString().split(",");
Put put = new Put(Bytes.toBytes((split[0] + split[1]).hashCode()));
put.addColumn("uf".getBytes(),"userid".getBytes(),split[0].getBytes());
put.addColumn("uf".getBytes(),"friendid".getBytes(),split[1].getBytes());
datas.add(put);
}
return datas;
}
}
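As noted above, this handler is the only piece that changes from table to table. To illustrate, a handler for a different topic might look like the following sketch (the eventid,userid,status record layout and the ea column family are assumptions invented for this example, not taken from the original posts):

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Hypothetical handler for a different table; only the parsing logic differs
public class EventAttendeeHandler implements IParseRecord {
    @Override
    public List<Put> parse(ConsumerRecords<String, String> records) {
        List<Put> datas = new ArrayList<>();
        for (ConsumerRecord<String, String> record : records) {
            // assumed record layout: eventid,userid,status
            String[] split = record.value().split(",");
            if (split.length < 3) {
                continue; // skip malformed records
            }
            Put put = new Put(Bytes.toBytes((split[0] + split[1]).hashCode()));
            put.addColumn("ea".getBytes(), "eventid".getBytes(), split[0].getBytes());
            put.addColumn("ea".getBytes(), "userid".getBytes(), split[1].getBytes());
            put.addColumn("ea".getBytes(), "status".getBytes(), split[2].getBytes());
            datas.add(put);
        }
        return datas;
    }
}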
2) Next, extract the part that writes to HBase into an interface.
Define the interface:
import org.apache.kafka.clients.consumer.ConsumerRecords;

public interface IWriter {
public int write(String tableName, ConsumerRecords<String, String> records);
}
Implement the interface:
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class HBaseWriter implements IWriter {
private Connection connection=null;
private IParseRecord handler=null;
public HBaseWriter(IParseRecord handler) {
this.handler = handler;
// Configure HBase and connect to the database
Configuration conf = HBaseConfiguration.create();
conf.set(HConstants.HBASE_DIR, "hdfs://192.168.64.2:9000/hbase");
conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.64.2");
conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");
try {
connection = ConnectionFactory.createConnection(conf);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public int write(String tableName, ConsumerRecords<String, String> records) {
try {
Table table = connection
.getTable(TableName.valueOf(tableName));
List<Put> datas = handler.parse(records);
table.put(datas);
table.close();
return datas.size();
} catch (IOException e) {
e.printStackTrace();
}
return 0;
}
}
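Note the design here: HBaseWriter receives its IParseRecord through the constructor, so the same writer serves any table once it is handed the matching handler. The write skeleton stays fixed while the parsing strategy is pluggable, which is essentially the strategy pattern.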
3) Finally, put the Kafka consumer configuration behind an interface.
Define the interface:
public interface IWorker {
/**
* Fill data into the target table
* @param targetName name of the target table
*/
public void fillData(String targetName);
}
Implement the interface:
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Worker implements IWorker {
KafkaConsumer<String, String> consumer;
IWriter writer = null;
public Worker(String topicName, String groupId, IWriter writer){
this.writer = writer;
final Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.64.2:9092");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
// prop.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,"10000");
// Whether offsets are committed automatically: false = manual commit, true = auto commit
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// only takes effect when auto commit is enabled
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singleton(topicName)); //"user_friends"
}
@Override
public void fillData(String targetName) {
int sum=0;
try {
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
int count = writer.write(targetName, records);
// auto commit is disabled, so commit offsets manually after the write
consumer.commitAsync();
sum += count;
System.out.println("records processed: " + sum);
Thread.sleep(50);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
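The original posts do not show the driver code, but wiring the three pieces together would look roughly like this sketch (topic, group id, and table name are the ones used in the examples above):

public class UserFriendMain {
    public static void main(String[] args) {
        IParseRecord handler = new UserFriendHandler();   // parsing strategy for this table
        IWriter writer = new HBaseWriter(handler);        // HBase writer, parameterized by the handler
        IWorker worker = new Worker("user_friends", "user_friend_group", writer);
        worker.fillData("events_db:user_friend");         // consume from Kafka and write into HBase
    }
}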
2. Rules for Extracting Interfaces
In my view, the rule is: within a complete program, take the part of the code that handles one particular action and extract it into an interface. In this example there were three such stages: the Kafka consumer configuration, the HBase connection and write code, and the concrete processing that turns Kafka records into HBase rows.
How do you decide a method's parameters and return type?
The parameters are the values that the code being encapsulated needs but must receive from the outside.
The return value is the result that the encapsulated code ultimately produces: if later code still needs that result, return it; if not, no return value is needed.
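For instance, in IWriter.write above, tableName and records are parameters because the write code cannot know them by itself: they come from the caller. The row count is returned because fillData still needs it afterwards, to accumulate the running total sum.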