Getting the current Kafka offset in Java: fetching a Kafka consumer's offsets
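In Kafka releases up to and including 0.8.1.1, consumers committed their offsets to ZooKeeper. However, when there are many offsets (number of consumers × number of partitions), ZooKeeper may not scale well enough. Fortunately, Kafka (since 0.8.2) provides a better mechanism for storing consumer offsets: it writes them to a durable, replicated, highly available topic, and developers can obtain a consumer's offsets by consuming that topic. To speed up access, Kafka also keeps an in-memory offset cache. In other words, committing an offset is an ordinary (inexpensive) producer request, while fetching an offset is a fast in-memory lookup.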
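To make the asymmetry between commits and fetches concrete, here is a toy model in plain Java. It is illustrative only, not Kafka code and not how the broker is actually implemented: a list stands in for the replicated offsets topic, and a HashMap plays the in-memory cache keyed by group, topic and partition. All class and method names are hypothetical, and the -1 "nothing committed" value is this sketch's own convention. It assumes Java 16+ (records).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of Kafka-based offset storage (illustrative only, not broker code).
public class OffsetStoreSketch {

    // Identifies one committed offset: consumer group + topic + partition.
    public record Key(String group, String topic, int partition) {}

    // One record in the simulated offsets topic.
    public record CommitRecord(Key key, long offset) {}

    private final List<CommitRecord> log = new ArrayList<>(); // stands in for the replicated topic
    private final Map<Key, Long> cache = new HashMap<>();     // in-memory offsets cache

    // Commit = append to the log (in Kafka, an ordinary produce request) + cache update.
    public void commit(String group, String topic, int partition, long offset) {
        Key key = new Key(group, topic, partition);
        log.add(new CommitRecord(key, offset));
        cache.put(key, offset);
    }

    // Fetch = memory lookup; -1 means "nothing committed" in this sketch.
    public long fetch(String group, String topic, int partition) {
        return cache.getOrDefault(new Key(group, topic, partition), -1L);
    }

    public List<CommitRecord> log() {
        return Collections.unmodifiableList(log);
    }

    // Rebuild the cache by replaying the log; the last commit for each key wins.
    public static OffsetStoreSketch replay(List<CommitRecord> records) {
        OffsetStoreSketch store = new OffsetStoreSketch();
        for (CommitRecord r : records) {
            store.commit(r.key().group(), r.key().topic(), r.key().partition(), r.offset());
        }
        return store;
    }

    public static void main(String[] args) {
        OffsetStoreSketch store = new OffsetStoreSketch();
        store.commit("demoGroup", "demoTopic", 0, 100L);
        store.commit("demoGroup", "demoTopic", 0, 150L);
        store.commit("demoGroup", "demoTopic", 1, 200L);
        System.out.println(store.fetch("demoGroup", "demoTopic", 0)); // 150
        OffsetStoreSketch rebuilt = replay(store.log());
        System.out.println(rebuilt.fetch("demoGroup", "demoTopic", 1)); // 200
    }
}
```

The replay method mirrors the idea that the cache is derived data: whatever is in memory can be reconstructed from the durable log.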
The official Kafka documentation describes how this feature works and how to migrate offsets from ZooKeeper to Kafka. The code below shows how to use Kafka-based offset storage.
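A note on the migration mentioned above, before the two steps: the 0.8.2 documentation describes a two-phase change to the old high-level consumer's configuration. First set offsets.storage=kafka together with dual.commit.enabled=true and do a rolling restart of the consumers; once they are healthy, set dual.commit.enabled=false and restart again. The minimal sketch below only builds both configurations as java.util.Properties. The ZooKeeper address and group name are placeholders, and these property names belong to the old Scala consumer, so they do not apply to the newer Java KafkaConsumer.

```java
import java.util.Properties;

// Sketch of the two-phase ZooKeeper-to-Kafka offset migration settings
// for the old high-level consumer (placeholder connection values).
public class OffsetMigrationConfig {

    // Settings shared by both phases.
    private static Properties base(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", zkConnect);
        props.setProperty("group.id", groupId);
        // Store offsets in Kafka instead of ZooKeeper.
        props.setProperty("offsets.storage", "kafka");
        return props;
    }

    // Phase 1: commit to both ZooKeeper and Kafka while consumers are rolled over.
    public static Properties phase1(String zkConnect, String groupId) {
        Properties props = base(zkConnect, groupId);
        props.setProperty("dual.commit.enabled", "true");
        return props;
    }

    // Phase 2: after every consumer in the group runs phase 1, stop writing to ZooKeeper.
    public static Properties phase2(String zkConnect, String groupId) {
        Properties props = base(zkConnect, groupId);
        props.setProperty("dual.commit.enabled", "false");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(phase1("localhost:2181", "demoGroup"));
        System.out.println(phase2("localhost:2181", "demoGroup"));
    }
}
```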
Step 1: discover and connect to the offset manager by sending a consumer metadata request to any broker:
import kafka.api.*;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.OffsetAndMetadata;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.ConsumerMetadataResponse;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetCommitResponse;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.network.BlockingChannel;
import java.util.*;
...
final String MY_GROUP = "demoGroup";
final String MY_CLIENTID = "demoClientId";
int correlationId = 0;
final TopicAndPartition testPartition0 = new TopicAndPartition("demoTopic", 0);
final TopicAndPartition testPartition1 = new TopicAndPartition("demoTopic", 1);

// Any broker can answer the consumer metadata request.
BlockingChannel channel = new BlockingChannel("localhost", 9092,
        BlockingChannel.UseDefaultBufferSize(),
        BlockingChannel.UseDefaultBufferSize(),
        5000 /* read timeout in millis */);
try {
    channel.connect();
    channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
    ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer());
    if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
        // The coordinator (offset manager) may be a different broker from the
        // one queried above, so reconnect to it.
        Broker offsetManager = metadataResponse.coordinator();
        channel.disconnect();
        channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
                BlockingChannel.UseDefaultBufferSize(),
                BlockingChannel.UseDefaultBufferSize(),
                5000 /* read timeout in millis */);
        channel.connect();
    } else {
        // retry (after backoff)
    }
}
catch (Exception e) {
    // The Scala client does not declare IOException, so catch Exception here;
    // retry the query (after backoff)
}
...
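The step 1 code leaves the retry as a comment. One way to fill it in is a bounded loop with capped exponential backoff, sketched below. The lookup is abstracted as a Supplier that returns the coordinator as a "host:port" string, or an empty Optional on failure, so the loop can be exercised without a broker; in real code it would wrap the ConsumerMetadataRequest round trip shown above. The helper name, the string format, the attempt limit and the backoff values are all assumptions.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Retry-with-backoff sketch for the step 1 coordinator lookup (hypothetical helper).
public class CoordinatorDiscovery {

    // lookup returns the coordinator as "host:port", or empty if the broker reported an error.
    public static Optional<String> discover(Supplier<Optional<String>> lookup,
                                            int maxAttempts,
                                            long initialBackoffMs,
                                            long maxBackoffMs) {
        long backoffMs = initialBackoffMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Optional<String> coordinator;
            try {
                coordinator = lookup.get();
            } catch (Exception e) {
                coordinator = Optional.empty(); // treat a failed round trip like an error response
            }
            if (coordinator.isPresent()) {
                return coordinator;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and give up
                    return Optional.empty();
                }
                backoffMs = Math.min(backoffMs * 2, maxBackoffMs); // capped exponential backoff
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fake lookup: fails twice (e.g. coordinator not available yet), then succeeds.
        Supplier<Optional<String>> fake =
                () -> ++calls[0] < 3 ? Optional.empty() : Optional.of("broker2:9092");
        Optional<String> coordinator = discover(fake, 5, 10, 100);
        System.out.println(coordinator + " after " + calls[0] + " calls");
        // prints "Optional[broker2:9092] after 3 calls"
    }
}
```

Capping the backoff keeps a long outage from producing unbounded sleeps, and the attempt limit lets the caller decide what to do when the coordinator never becomes available.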
Step 2: send an OffsetCommitRequest or an OffsetFetchRequest to the offset manager:
// How to commit offsets
long now = System.currentTimeMillis();
Map<TopicAndPartition, OffsetAndMetadata> offsets = new LinkedHashMap<>();
offsets.put(testPartition0, new OffsetAndMetadata(100L, "associated metadata", now));
offsets.put(testPartition1, new OffsetAndMetadata(200L, "more metadata", now));
OffsetCommitRequest commitRequest = new OffsetCommitRequest(
        MY_GROUP,
        offsets,
        correlationId++,
        MY_CLIENTID,
        (short) 1 /* version */); // version 1 and above commit to Kafka, version 0 commits to ZooKeeper
try {
    channel.send(commitRequest.underlying());
    OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(channel.receive().buffer());
    if (commitResponse.hasError()) {
        // errors() maps each partition to its (boxed) error code
        for (Object value : commitResponse.errors().values()) {
            short partitionErrorCode = (Short) value;
            if (partitionErrorCode == ErrorMapping.OffsetMetadataTooLargeCode()) {
                // You must reduce the size of the metadata if you wish to retry
            } else if (partitionErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()
                    || partitionErrorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) {
                channel.disconnect();
                // Go to step 1 (offset manager has moved) and then retry the commit to the new offset manager
            } else {
                // log and retry the commit
            }
        }
    }
}
catch (Exception e) {
    channel.disconnect();
    // Go to step 1 and then retry the commit
}

// How to fetch offsets
List<TopicAndPartition> partitions = new ArrayList<>();
partitions.add(testPartition0);
OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
        MY_GROUP,
        partitions,
        (short) 1 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper
        correlationId,
        MY_CLIENTID);
try {
    channel.send(fetchRequest.underlying());
    OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer());
    OffsetMetadataAndError result = fetchResponse.offsets().get(testPartition0);
    short offsetFetchErrorCode = result.error();
    if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
        channel.disconnect();
        // Go to step 1 and retry the offset fetch
    } else if (offsetFetchErrorCode == ErrorMapping.OffsetsLoadInProgressCode()) {
        // retry the offset fetch (after backoff)
    } else {
        long retrievedOffset = result.offset();
        String retrievedMetadata = result.metadata();
    }
}
catch (Exception e) {
    channel.disconnect();
    // Go to step 1 and then retry offset fetch after backoff
}
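The error handling in step 2 boils down to a mapping from error code to client action. The sketch below encodes that mapping in one place. The numeric values are the Kafka protocol error codes behind the corresponding ErrorMapping constants (OffsetMetadataTooLargeCode = 12, OffsetsLoadInProgressCode = 14, ConsumerCoordinatorNotAvailableCode = 15, NotCoordinatorForConsumerCode = 16); the Action enum and its names are hypothetical. In real code, comparing against the ErrorMapping constants is preferable to hard-coded numbers.

```java
// Sketch: classify offset commit/fetch error codes into client actions.
public class OffsetErrorHandling {

    public enum Action { OK, REDISCOVER_COORDINATOR, RETRY_AFTER_BACKOFF, SHRINK_METADATA, LOG_AND_RETRY }

    // Kafka protocol error codes (mirroring kafka.common.ErrorMapping).
    static final short NO_ERROR = 0;
    static final short OFFSET_METADATA_TOO_LARGE = 12;
    static final short OFFSETS_LOAD_IN_PROGRESS = 14;
    static final short CONSUMER_COORDINATOR_NOT_AVAILABLE = 15;
    static final short NOT_COORDINATOR_FOR_CONSUMER = 16;

    public static Action classify(short errorCode) {
        switch (errorCode) {
            case NO_ERROR:
                return Action.OK;
            case OFFSET_METADATA_TOO_LARGE:
                return Action.SHRINK_METADATA;        // retrying unchanged would fail again
            case OFFSETS_LOAD_IN_PROGRESS:
                return Action.RETRY_AFTER_BACKOFF;    // offset manager is still loading its cache
            case CONSUMER_COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR_FOR_CONSUMER:
                return Action.REDISCOVER_COORDINATOR; // go back to step 1
            default:
                return Action.LOG_AND_RETRY;
        }
    }

    public static void main(String[] args) {
        short[] codes = {0, 12, 14, 15, 16, 3};
        for (short code : codes) {
            System.out.println(code + " -> " + classify(code));
        }
    }
}
```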