kafka基本原理及C/C++ API 实现
Kafka 消息队列
·
引入消息队列的作用:
- 解耦,不同服务组件通过消息队列实现接口
- 缓冲,利于消费者和生产者速度匹配,也利于流量削峰
- 异步,消息队列可以实现将非核心的耗时操作异步化处理:生产者把消息写入队列后即可返回,无需等待消费者处理完成,从而提升系统响应速度
Partition:为了实现扩展性,提⾼并发能⼒,⼀个⾮常⼤的 Topic 可以分布到多个 Broker(即服务器)上,⼀个 Topic 可以分为多个 Partition,同⼀个topic在不同的分区的数据是不重复的,每个 Partition 是⼀个有序的队列,其表现形式就是⼀个⼀个的⽂件夹。
Replication:每⼀个分区都有多个副本,副本的作⽤是做备胎。当主分区(Leader)故障的时候会选择⼀个备胎(Follower)上位,成为Leader。在kafka中默认副本的最⼤数量是10个,且副本的数量不能⼤于Broker的数量,follower和leader绝对是在不同的机器,同⼀机器对同⼀个分区也只可能存放⼀个副本(包括⾃⼰)
Leader:每个分区多个副本的“主”副本,⽣产者发送数据的对象,以及消费者消费数据的对象,都是 Leader。
Follower:每个分区多个副本的“从”副本,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发⽣故障时,某个 Follower 还会成为新的 Leader。
以下,展示kafka中,一个主题TopicA中,三个patition在不同的broker中的分布,以及生产者和消费者是如何分配的。
C/C++接口实现
librdkafka C 接口实现:
kafkaConsumer.h
#ifndef KAFKACONSUMER_H
#define KAFKACONSUMER_H
#include <string>
#include <iostream>
#include <vector>
#include <stdio.h>
#include "rdkafka.h"
using namespace std;
// Thin C++ wrapper around the librdkafka C consumer API.
// Typical usage: construct, call InitCfg() once, then call pullMessage() in a loop.
class KafkaConsumer
{
public:/**
* @brief Default-constructs an unconfigured consumer.
* InitCfg() must be called (and succeed) before pullMessage() is used.
*/
KafkaConsumer();
virtual ~KafkaConsumer();
/**
* @brief Create the librdkafka consumer handle and subscribe to the topics.
* @param strBrokers bootstrap.servers list, e.g. "host1:9092,host2:9092"
* @param strGroupID consumer group id ("group.id")
* @param vTopics    topics to subscribe to (partitions are auto-assigned)
* @return 0 on success, -1 on any configuration/creation/subscribe failure
*/
int InitCfg(const string& strBrokers, const string& strGroupID, const vector<string>& vTopics);
/** @brief Poll once (2000 ms timeout) and print the message payload, if any. */
void pullMessage();
/** @brief Partition-rebalance callback (static to match the C callback signature). */
static void RebalanceCb(rd_kafka_t *rk,
rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *partitions,
void *opaque);
/** @brief Log callback: prints level, facility and message text. */
static void log_cb(const rd_kafka_t *rk, int level,
const char *fac, const char *buf);
/** @brief Error-event callback: prints the error code and reason. */
static void EventErrorCb(rd_kafka_t *rk, int err,
const char *reason,
void *opaque);
/** @brief Statistics callback; receives a JSON blob, returns 0. */
static int EventStatsCb(rd_kafka_t *rk,
char *json,
size_t json_len,
void *opaque);
protected:
string m_strBrokers;          // bootstrap.servers value cached from InitCfg()
string m_groupID;             // consumer group id
vector<string> m_topicVector; // topics passed to rd_kafka_subscribe()
int m_partition;              // NOTE(review): never assigned by this class — presumably reserved; confirm
rd_kafka_t *m_pkafka;         // librdkafka consumer handle (null until InitCfg succeeds)
};
#endif // KAFKACONSUMER_H
kafkaConsumer.cpp(实现文件为 C++ 代码,内部调用 librdkafka 的 C API)
#include "KafkaConsumer.h"
// Default-construct an unconfigured consumer; InitCfg() must be called
// before the consumer can be used.
KafkaConsumer::KafkaConsumer()
    : m_partition(0)   // BUG FIX: was left uninitialized
    , m_pkafka(nullptr)
{
}
int KafkaConsumer::InitCfg(const string& strBrokers, const string& strGroupID, const vector<string>& vTopics)
{
m_strBrokers = strBrokers;
m_groupID = strGroupID;
m_topicVector = vTopics;
rd_kafka_conf_t *pConf = rd_kafka_conf_new();
if(!pConf)
{
return -1;
}
char szErr[512] = { 0};
if (rd_kafka_conf_set(pConf, "bootstrap.servers", m_strBrokers.c_str(), szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", szErr);
rd_kafka_conf_destroy(pConf);
return -1;
}
if (rd_kafka_conf_set(pConf, "group.id", m_groupID.c_str(), szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", szErr);
rd_kafka_conf_destroy(pConf);
return -1;
}
if (rd_kafka_conf_set(pConf, "statistics.interval.ms", "10000", szErr, sizeof(szErr))!= RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", szErr);
rd_kafka_conf_destroy(pConf);
return -1;
}
// rd_kafka_conf_set_stats_cb(pConf, &KafkaConsumer::EventStatsCb);
//
// rd_kafka_conf_set_error_cb(pConf, &KafkaConsumer::EventErrorCb);
// rd_kafka_conf_set_log_cb(pConf, &KafkaConsumer::log_cb);
// rd_kafka_conf_set_rebalance_cb(pConf, &KafkaConsumer::RebalanceCb);
// topic配置
rd_kafka_topic_conf_t* pTopicConf = rd_kafka_topic_conf_new();
if (rd_kafka_topic_conf_set(pTopicConf, "auto.offset.reset", "earliest", szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", szErr);
rd_kafka_topic_conf_destroy(pTopicConf);
return -1;
}
// rd_kafka_conf_set_default_topic_conf(pConf, pTopicConf);
m_pkafka = rd_kafka_new(RD_KAFKA_CONSUMER, pConf, szErr, sizeof(szErr));
if (!m_pkafka) {
fprintf(stderr, "%% Failed to create new consumer: %s\n", szErr);
return -1;
}
rd_kafka_poll_set_consumer(m_pkafka);
/* Convert the list of topics to a format suitable for librdkafka */
rd_kafka_topic_partition_list_t *subscription = rd_kafka_topic_partition_list_new(m_topicVector.size());
for (int i = 0 ; i < m_topicVector.size() ; i++)
{
rd_kafka_topic_partition_list_add(subscription, m_topicVector[i].c_str(), RD_KAFKA_PARTITION_UA);
}
/* Subscribe to the list of topics */
rd_kafka_resp_err_t err = rd_kafka_subscribe(m_pkafka, subscription);
if (err) {
fprintf(stderr, "%% Failed to subscribe to %d topics: %s\n", subscription->cnt, rd_kafka_err2str(err));
rd_kafka_topic_partition_list_destroy(subscription);
rd_kafka_destroy(m_pkafka);
return -1;
}
fprintf(stderr, "%% Subscribed to %d topic(s), " "waiting for rebalance and messages...\n", subscription->cnt);
rd_kafka_topic_partition_list_destroy(subscription);
return 0;
}
void KafkaConsumer::pullMessage()
{
rd_kafka_message_t *rkm;
rkm = rd_kafka_consumer_poll(m_pkafka, 2000);
if (!rkm)
{
return ; /* Timeout: no message within 100ms,
* try again. This short timeout allows
* checking for `run` at frequent intervals.
*/
}
/* consumer_poll() will return either a proper message
* or a consumer error (rkm->err is set). */
if (rkm->err) {
/* Consumer errors are generally to be considered
* informational as the consumer will automatically
* try to recover from all types of errors. */
fprintf(stderr,
"%% Consumer error: %s\n",
rd_kafka_message_errstr(rkm));
rd_kafka_message_destroy(rkm);
return;
}
/* Print the message value/payload. */
if (rkm->payload)
{
printf(" Value: %.*s\n", (int)rkm->len, (const char *)rkm->payload);
}
rd_kafka_message_destroy(rkm);
}
/**
 * @brief Partition-rebalance callback.
 * BUG FIX: the err argument was previously ignored and rd_kafka_assign()
 * was called unconditionally — including on revocation, where the revoked
 * list must NOT be re-assigned. Per the librdkafka contract, the callback
 * must assign on ASSIGN_PARTITIONS and resign (assign NULL) otherwise.
 */
void KafkaConsumer::RebalanceCb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque)
{
    if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
    {
        if (partitions)
        {
            // Start every newly assigned partition at absolute offset 0.
            // NOTE(review): absolute offset 0 fails once old segments are
            // removed by retention; RD_KAFKA_OFFSET_BEGINNING (-2) is the
            // robust way to start from the earliest offset — confirm intent.
            for (int i = 0; i < partitions->cnt; i++)
            {
                partitions->elems[i].offset = 0;
            }
        }
        rd_kafka_assign(rk, partitions);
    }
    else
    {
        // Revocation (or rebalance error): drop the current assignment.
        rd_kafka_assign(rk, NULL);
    }
}
#include <iostream>
#include <stdio.h>
// librdkafka log callback: prints log level, facility and message text.
void KafkaConsumer::log_cb(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
{
    // BUG FIX: the label previously read "< EventErrorCb >" (copy/paste error).
    cout << "< log_cb >" << level << " " << fac << " " << buf << endl;
}
// librdkafka error-event callback: prints the numeric error code followed
// by the human-readable reason string.
void KafkaConsumer::EventErrorCb(rd_kafka_t *rk, int err, const char *reason, void *opaque)
{
    cout << "< EventErrorCb >"
         << err << " " << reason
         << endl;
}
// librdkafka statistics callback: prints the JSON statistics blob.
// Returns 0 (the callback does not take ownership of the json buffer).
int KafkaConsumer::EventStatsCb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
{
    cout << "< EventStatsCb >"
         << json
         << endl;
    return 0;
}
// Leave the consumer group cleanly, then destroy the librdkafka handle.
KafkaConsumer::~KafkaConsumer()
{
    // BUG FIX: guard against InitCfg() never having been called (or having
    // failed) — close/destroy on a null handle would crash.
    if (m_pkafka)
    {
        rd_kafka_consumer_close(m_pkafka);
        /* Destroy the consumer */
        rd_kafka_destroy(m_pkafka);
        m_pkafka = nullptr;
    }
}
libkafkacpp C++消费者接口实现:
#include "KafkaConsumer.h"
/**
 * @brief Build the global and topic configuration and create the
 * RdKafka::KafkaConsumer.
 * @param brokers   bootstrap.servers list
 * @param groupID   consumer group id
 * @param topics    topics that pullMessage() will subscribe to
 * @param partition stored but not otherwise used here (group assignment is automatic)
 */
KafkaConsumer::KafkaConsumer(const std::string& brokers, const std::string& groupID,
                             const std::vector<std::string>& topics, int partition)
{
    m_brokers = brokers;
    m_groupID = groupID;
    m_topicVector = topics;
    m_partition = partition;
    std::string errorStr;
    RdKafka::Conf::ConfResult errorCode;
    m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    // BUG FIX: a failed Conf::create was previously dereferenced; bail out
    // with all members in a defined state so the destructor stays safe.
    if (m_config == NULL)
    {
        std::cout << "Create RdKafka Conf failed." << std::endl;
        m_topicConfig = NULL;
        m_event_cb = NULL;
        m_rebalance_cb = NULL;
        m_consumer = NULL;
        return;
    }
    m_event_cb = new ConsumerEventCb;
    errorCode = m_config->set("event_cb", m_event_cb, errorStr);
    if (errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }
    m_rebalance_cb = new ConsumerRebalanceCb;
    errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);
    if (errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }
    errorCode = m_config->set("enable.partition.eof", "false", errorStr);
    if (errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }
    errorCode = m_config->set("group.id", m_groupID, errorStr);
    if (errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }
    errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
    if (errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }
    errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr);
    if (errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }
    // partition.assignment.strategy range,roundrobin
    m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    // When the group has no committed offset, start from the latest message.
    errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr);
    if (errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Topic Conf set failed: " << errorStr << std::endl;
    }
    errorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr);
    if (errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }
    m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr);
    if (m_consumer == NULL)
    {
        std::cout << "Create KafkaConsumer failed: " << errorStr << std::endl;
        // BUG FIX: previously fell through and dereferenced the null
        // pointer via m_consumer->name().
        return;
    }
    std::cout << "Created consumer " << m_consumer->name() << std::endl;
}
// Dispatch one consumed message: log timeouts/errors to stderr, print the
// topic/partition/offset/key/payload of a successful message to stdout.
void msg_consume(RdKafka::Message* msg, void* opaque)
{
    switch (msg->err())
    {
    case RdKafka::ERR__TIMED_OUT:
        std::cerr << "Consumer error: " << msg->errstr() << std::endl;
        break;
    case RdKafka::ERR_NO_ERROR: // a real message arrived
    {
        // BUG FIX: msg->key() returns const std::string* — streaming the
        // pointer printed an address (or was null). Also bound the payload
        // by msg->len(); it is not guaranteed to be NUL-terminated.
        const std::string *key = msg->key();
        std::cout << " Message in " << msg->topic_name() << " ["
                  << msg->partition() << "] at offset " << msg->offset()
                  << " key: " << (key ? *key : std::string())
                  << " payload: "
                  << std::string(static_cast<const char*>(msg->payload()), msg->len())
                  << std::endl;
        break;
    }
    default:
        std::cerr << "Consumer error: " << msg->errstr() << std::endl;
        break;
    }
}
void KafkaConsumer::pullMessage()
{
// 订阅Topic
RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector);
if (errorCode != RdKafka::ERR_NO_ERROR)
{
std::cout << "subscribe failed: " << RdKafka::err2str(errorCode) << std::endl;
}
// 消费消息
while(true)
{
RdKafka::Message *msg = m_consumer->consume(1000);
msg_consume(msg, NULL);
delete msg;
}
}
// Close the consumer group membership, then release all owned objects.
KafkaConsumer::~KafkaConsumer()
{
    // BUG FIX: m_consumer may be null when creation failed in the
    // constructor; close() on it must be guarded (delete of null is safe).
    if (m_consumer)
    {
        m_consumer->close();
    }
    delete m_config;
    delete m_topicConfig;
    delete m_consumer;
    delete m_event_cb;
    delete m_rebalance_cb;
}
libkafkacpp C++生产者接口实现:
#include "KafkaProducer.h"
/**
 * @brief Build the producer configuration, create the RdKafka::Producer
 * and the RdKafka::Topic object used by pushMessage().
 * @param brokers   bootstrap.servers list
 * @param topic     topic name messages are produced to
 * @param partition stored; pushMessage() currently uses PARTITION_UA
 */
KafkaProducer::KafkaProducer(const std::string& brokers, const std::string& topic, int partition)
{
    m_brokers = brokers;
    m_topicStr = topic;
    m_partition = partition;
    // Defensive defaults so the destructor is safe even if construction
    // fails part-way through (members were previously left indeterminate).
    m_config = NULL;
    m_topicConfig = NULL;
    m_producer = NULL;
    m_topic = NULL;
    m_dr_cb = NULL;
    m_event_cb = NULL;
    m_partitioner_cb = NULL;
    // Create the global Conf object.
    m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    if (m_config == NULL)
    {
        std::cout << "Create RdKafka Conf failed." << std::endl;
        // BUG FIX: previously fell through and dereferenced the null conf.
        return;
    }
    // Create the topic Conf object.
    m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    if (m_topicConfig == NULL)
    {
        std::cout << "Create RdKafka Topic Conf failed." << std::endl;
        return;
    }
    // Broker/producer properties.
    RdKafka::Conf::ConfResult errCode;
    m_dr_cb = new ProducerDeliveryReportCb;
    std::string errorStr;
    errCode = m_config->set("dr_cb", m_dr_cb, errorStr);
    if (errCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed:" << errorStr << std::endl;
    }
    m_event_cb = new ProducerEventCb;
    errCode = m_config->set("event_cb", m_event_cb, errorStr);
    if (errCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed:" << errorStr << std::endl;
    }
    m_partitioner_cb = new HashPartitionerCb;
    errCode = m_topicConfig->set("partitioner_cb", m_partitioner_cb, errorStr);
    if (errCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed:" << errorStr << std::endl;
    }
    errCode = m_config->set("statistics.interval.ms", "10000", errorStr);
    if (errCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed:" << errorStr << std::endl;
    }
    errCode = m_config->set("message.max.bytes", "10240000", errorStr);
    if (errCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed:" << errorStr << std::endl;
    }
    errCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
    if (errCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed:" << errorStr << std::endl;
    }
    // request.required.acks 0=Broker does not send any response/ack to client,
    // -1 or all=Broker will block until message is committed by all in sync replicas (ISRs)
    // Create the Producer.
    m_producer = RdKafka::Producer::create(m_config, errorStr);
    if (m_producer == NULL)
    {
        std::cout << "Create Producer failed:" << errorStr << std::endl;
        // BUG FIX: previously passed the null producer to Topic::create().
        return;
    }
    // Create the Topic object.
    m_topic = RdKafka::Topic::create(m_producer, m_topicStr, m_topicConfig, errorStr);
    if (m_topic == NULL)
    {
        std::cout << "Create Topic failed:" << errorStr << std::endl;
    }
}
void KafkaProducer::pushMessage(const std::string& str, const std::string& key)
{
int32_t len = str.length();
void* payload = const_cast<void*>(static_cast<const void*>(str.data()));
RdKafka::ErrorCode errorCode = m_producer->produce(m_topic, RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
payload, len, &key, NULL);
m_producer->poll(0);
if (errorCode != RdKafka::ERR_NO_ERROR)
{
std::cerr << "Produce failed: " << RdKafka::err2str(errorCode) << std::endl;
if(errorCode == RdKafka::ERR__QUEUE_FULL)
{
m_producer->poll(100);
}
}
}
// Drain the outbound queue, then release all owned objects.
KafkaProducer::~KafkaProducer()
{
    // BUG FIX: m_producer is null when creation failed in the constructor;
    // the drain loop below would dereference it (delete of null is safe).
    if (m_producer)
    {
        // Block until every queued message has been delivered or failed.
        while (m_producer->outq_len() > 0)
        {
            std::cerr << "Waiting for " << m_producer->outq_len() << std::endl;
            m_producer->flush(5000);
        }
    }
    delete m_config;
    delete m_topicConfig;
    delete m_topic;
    delete m_producer;
    delete m_dr_cb;
    delete m_event_cb;
    delete m_partitioner_cb;
}
更多推荐
已为社区贡献1条内容
所有评论(0)