As of Kafka 0.8.1.1, consumers commit their offsets to ZooKeeper. However, when the number of offsets to track (number of consumers × number of partitions) grows large, ZooKeeper may not scale well. Fortunately, Kafka now offers a better mechanism for storing consumer offsets: it writes them to a distributed, persistent, highly available topic. Developers can retrieve consumer offsets by consuming this topic. To speed up access, Kafka also caches the offsets in memory on the broker. In other words, committing an offset is now an ordinary (and cheap) producer request, while fetching an offset is a fast in-memory lookup.
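The division of labor described above (commit = cheap append to a topic, fetch = in-memory lookup) can be sketched with a toy model. `OffsetStore`, its key format, and the `-1` sentinel below are illustrative assumptions for this sketch, not Kafka's real classes:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (not Kafka's actual implementation): a commit appends a record
// to a log and updates an in-memory cache; a fetch never touches the log.
class OffsetStore {
    // key = "group/topic/partition", value = last committed offset
    private final Map<String, Long> cache = new HashMap<String, Long>();
    private final List<String> log = new ArrayList<String>(); // stands in for the offsets topic

    void commit(String group, String topic, int partition, long offset) {
        String key = group + "/" + topic + "/" + partition;
        log.add(key + "=" + offset);  // the "producer request": append to the topic
        cache.put(key, offset);       // keep the latest value hot in memory
    }

    long fetch(String group, String topic, int partition) {
        Long offset = cache.get(group + "/" + topic + "/" + partition);
        return offset == null ? -1L : offset; // -1 means "nothing committed yet"
    }

    int logSize() { return log.size(); }
}
```

Note that repeated commits for the same partition keep appending to the log while the cache only retains the latest value, which is why fetches stay cheap regardless of commit volume.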

The official Kafka documentation describes how this feature works and how to migrate offsets from ZooKeeper to Kafka. The code below demonstrates how to use Kafka-based offset storage.

Step 1: Discover and connect to the offset manager for the consumer group by sending a consumer metadata request to any broker:


import kafka.api.*;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.OffsetAndMetadata;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.ConsumerMetadataResponse;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetCommitResponse;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.network.BlockingChannel;
import java.io.IOException;
import java.util.*;

...

try {
    BlockingChannel channel = new BlockingChannel("localhost", 9092,
            BlockingChannel.UseDefaultBufferSize(),
            BlockingChannel.UseDefaultBufferSize(),
            5000 /* read timeout in millis */);
    channel.connect();

    final String MY_GROUP = "demoGroup";
    final String MY_CLIENTID = "demoClientId";
    int correlationId = 0;
    final TopicAndPartition testPartition0 = new TopicAndPartition("demoTopic", 0);
    final TopicAndPartition testPartition1 = new TopicAndPartition("demoTopic", 1);

    channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
    ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer());

    if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
        Broker offsetManager = metadataResponse.coordinator();
        // if the coordinator is different from the above channel's host, then reconnect
        channel.disconnect();
        channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
                BlockingChannel.UseDefaultBufferSize(),
                BlockingChannel.UseDefaultBufferSize(),
                5000 /* read timeout in millis */);
        channel.connect();
    } else {
        // retry (after backoff)
    }
}
catch (IOException e) {
    // retry the query (after backoff)
}

...
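Several branches above are left as "retry (after backoff)" comments. One way to fill them in is a generic retry helper; the class name, attempt count, and delay values below are illustrative choices, not part of Kafka's API:

```java
import java.util.concurrent.Callable;

// Generic retry-with-backoff helper for the "retry (after backoff)" branches:
// run the action, and on failure sleep for an exponentially growing delay
// before trying again, rethrowing the last exception once attempts run out.
class RetryWithBackoff {
    static <T> T call(Callable<T> action, int maxAttempts, long initialBackoffMs) throws Exception {
        long backoffMs = initialBackoffMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call(); // e.g. the coordinator lookup from step 1
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs);
                    backoffMs *= 2; // exponential backoff between attempts
                }
            }
        }
        throw last;
    }
}
```

A coordinator lookup wrapped this way would simply return the connected channel from the `Callable` and let transient failures be absorbed by the retry loop.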

Step 2: Send OffsetCommitRequests or OffsetFetchRequests to the offset manager:


// How to commit offsets

long now = System.currentTimeMillis();
Map<TopicAndPartition, OffsetAndMetadata> offsets = new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>();
offsets.put(testPartition0, new OffsetAndMetadata(100L, "associated metadata", now));
offsets.put(testPartition1, new OffsetAndMetadata(200L, "more metadata", now));
OffsetCommitRequest commitRequest = new OffsetCommitRequest(
        MY_GROUP,
        offsets,
        correlationId++,
        MY_CLIENTID,
        (short) 1 /* version */); // version 1 and above commit to Kafka, version 0 commits to ZooKeeper

try {
    channel.send(commitRequest.underlying());
    OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(channel.receive().buffer());
    if (commitResponse.hasError()) {
        for (short partitionErrorCode : commitResponse.errors().values()) {
            if (partitionErrorCode == ErrorMapping.OffsetMetadataTooLargeCode()) {
                // You must reduce the size of the metadata if you wish to retry
            } else if (partitionErrorCode == ErrorMapping.NotCoordinatorForConsumerCode() || partitionErrorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) {
                channel.disconnect();
                // Go to step 1 (offset manager has moved) and then retry the commit to the new offset manager
            } else {
                // log and retry the commit
            }
        }
    }
}
catch (IOException ioe) {
    channel.disconnect();
    // Go to step 1 and then retry the commit
}

// How to fetch offsets

List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>();
partitions.add(testPartition0);
OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
        MY_GROUP,
        partitions,
        (short) 1 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper
        correlationId,
        MY_CLIENTID);
try {
    channel.send(fetchRequest.underlying());
    OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer());
    OffsetMetadataAndError result = fetchResponse.offsets().get(testPartition0);
    short offsetFetchErrorCode = result.error();
    if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
        channel.disconnect();
        // Go to step 1 and retry the offset fetch
    } else if (offsetFetchErrorCode == ErrorMapping.OffsetsLoadInProgressCode()) {
        // retry the offset fetch (after backoff)
    } else {
        long retrievedOffset = result.offset();
        String retrievedMetadata = result.metadata();
    }
}
catch (IOException e) {
    channel.disconnect();
    // Go to step 1 and then retry offset fetch after backoff
}
