1. Introduction to librdkafka:

librdkafka is the C/C++ client library for Apache Kafka, providing producer, consumer, and admin clients.

It is designed for reliable, high-performance message delivery, and can currently produce over 1 million messages per second and consume over 3 million messages per second.

The official README describes librdkafka as:
"librdkafka — the Apache Kafka C/C++ client library"

librdkafka/INTRODUCTION.md
https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md

librdkafka/examples/
https://github.com/edenhill/librdkafka/tree/master/examples

Usage:
To use the C API, include the "rdkafka.h" header in your source file; the C++ API used throughout this article is declared in "rdkafkacpp.h".

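A minimal sketch to check that the C++ headers and library are usable (it assumes "rdkafkacpp.h" is on the include path and the program links against the librdkafka C++ library):

#include <iostream>
#include "rdkafkacpp.h"

int main() {
    //RdKafka::version_str() returns the library version as a human-readable string,
    //RdKafka::version() returns it as an integer
    std::cout << "librdkafka version: " << RdKafka::version_str()
              << " (" << RdKafka::version() << ")" << std::endl;
    return 0;
}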

2. The librdkafka C++ API:

2.1 RdKafka::Conf::create():

Creates a Conf configuration instance, used to hold the user-specified configuration properties:

//namespace RdKafka;

//brief Create configuration object:
//RdKafka::Conf ---> configuration interface class, used to set producer, consumer and broker configuration values
static Conf *create(ConfType type);	

enum ConfType {
	CONF_GLOBAL,	//Global configuration
	CONF_TOPIC		//Topic specific configuration
};

Usage example:

RdKafka::Conf *m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if(m_config == nullptr) {
	std::cout << "Create Rdkafka Global Conf Failed." << std::endl;
}

RdKafka::Conf *m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
if(m_topicConfig == nullptr) {
	std::cout << "Create Rdkafka Topic Conf Failed." << std::endl;
}

2.2 Conf::ConfResult set():

The Conf class provides several overloaded set() member functions for assigning values to the different configuration properties:

class Conf {
public:
	virtual Conf::ConfResult set(const std::string &name, const std::string &value, std::string &errstr);
	virtual Conf::ConfResult set(const std::string &name, DeliveryReportCb *dr_cb, std::string &errstr);
	virtual Conf::ConfResult set(const std::string &name, EventCb *event_cb, std::string &errstr);
	virtual Conf::ConfResult set(const std::string &name, PartitionerCb *partitioner_cb, std::string &errstr);
	//...
	//...
};

enum ConfResult {
	CONF_UNKNOWN = -2,	//Unknown configuration property
	CONF_INVALID = -1,	//Invalid configuration value
	CONF_OK = 0			//Configuration property was successfully set
};

Usage example:

RdKafka::Conf::ConfResult  	result;
std::string					error_str;

RdKafka::Conf *m_config;	//assumed to have been created earlier with Conf::create(CONF_GLOBAL)
//Set the "bootstrap.servers" property:
result = m_config->set("bootstrap.servers", "127.0.0.1:9092", error_str);
if(result != RdKafka::Conf::CONF_OK) {
    std::cout << "Global Conf set 'bootstrap.servers' failed: " << error_str << std::endl;
}

//Set the "event_cb" callback property:
RdKafka::EventCb* m_event_cb = new ProducerEventCb;
result = m_config->set("event_cb", m_event_cb, error_str);
if(result != RdKafka::Conf::CONF_OK) {
    std::cout << "Global Conf set 'event_cb' failed: " << error_str << std::endl;
}

2.3 RdKafka::Producer::create():

Creates the Producer client:

class Producer : public virtual Handle {
public:
	static Producer *create(Conf *conf, std::string &errstr);
};

Usage example:

RdKafka::Producer *m_producer;

m_producer = RdKafka::Producer::create(m_config, error_str);
if(m_producer == nullptr) {
    std::cout << "Create Producer failed: " << error_str << std::endl;
}

2.4 RdKafka::Topic::create():

Creates a Topic object:

class Topic {
public:
	static Topic *create(Handle *base, const std::string &topic_str, const Conf *conf, std::string &errstr);
};

Usage example:

RdKafka::Topic *m_topic;

m_topic = RdKafka::Topic::create(m_producer, m_topicStr, m_topicConfig, error_str);
if(m_topic == nullptr) {
    std::cout << "Create Topic failed: " << error_str << std::endl;
}

2.5 RdKafka::Producer::produce():

class Producer : public virtual Handle {
public:
	virtual ErrorCode produce(Topic *topic, int32_t partition, int msgflags, 
						void *payload, size_t len, const std::string *key, void *msg_opaque);

	//... (other produce() overloads omitted)
};


//Use RdKafka::err2str() to translate an error code to a human-readable string
enum ErrorCode {
	//Internal errors to rdkafka:
	ERR_BEGIN = -200,		//Begin internal error codes
	ERR_BAD_MSG = -199,		//Received message is incorrect
	//...
	ERR_END = -100,			//End internal error codes

	//Kafka broker errors: 
	ERR_UNKNOWN = -1,		//Unknown broker error
	ERR_NO_ERROR = 0,		//Success
	//...
};

Usage example:


RdKafka::ErrorCode error_code = m_producer->produce(m_topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, 
										payload, len, &key, NULL);	//key is assumed to be a std::string here

m_producer->poll(0);	//a timeout of 0 means poll() does not block; poll(0) is called mainly to trigger the application-supplied callbacks

if(error_code != RdKafka::ERR_NO_ERROR) {
	std::cerr << "Produce failed: " << RdKafka::err2str(error_code) << std::endl;
	if(error_code == RdKafka::ERR__QUEUE_FULL) {
		m_producer->poll(1000);		//if the send failed because the queue is full, block and wait for a while
	}
	else if(error_code == RdKafka::ERR_MSG_SIZE_TOO_LARGE) {
		//if the message exceeds the configured maximum size, trim it and resend
	}
	else {
		std::cerr << "ERR__UNKNOWN_PARTITION or ERR__UNKNOWN_TOPIC" << std::endl;
	}
}

2.6 RdKafka::KafkaConsumer::create():

Creates the KafkaConsumer client:

class KafkaConsumer : public virtual Handle {
public:
	static KafkaConsumer *create(const Conf *conf, std::string &errstr);
};

Usage example:

RdKafka::KafkaConsumer *m_consumer;

m_consumer = RdKafka::KafkaConsumer::create(m_config, error_str);
if(m_consumer == nullptr) {
    std::cout << "Create KafkaConsumer failed: " << error_str << std::endl;
}

2.7 RdKafka::KafkaConsumer::subscribe():

The consumer subscribes to a list of topics:

class KafkaConsumer : public virtual Handle {
public:
	virtual ErrorCode subscribe(const std::vector<std::string> &topics);
};

Usage example:

std::vector<std::string> topics;
topics.push_back(topic_str);

RdKafka::ErrorCode error_code = m_consumer->subscribe(topics);
if(error_code != RdKafka::ERR_NO_ERROR) {
	std::cerr << "Consumer Subscribe Topics Failed: " << RdKafka::err2str(error_code) << std::endl;
}

2.8 RdKafka::KafkaConsumer::consume():

The consumer pulls messages for consumption:

class KafkaConsumer : public virtual Handle {
public:
	virtual Message *consume(int timeout_ms);
};

Usage example:

RdKafka::Message *m_message = m_consumer->consume(5000);	//if no message arrives within 5000 ms, the returned Message carries RdKafka::ERR__TIMED_OUT
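consume() returns a pointer to a Message object that the caller must inspect and then delete. A minimal sketch of handling the returned message, limited to the error cases used in this article:

RdKafka::Message *msg = m_consumer->consume(5000);
switch(msg->err()) {
    case RdKafka::ERR_NO_ERROR:     //a real message was received
        std::cout << "Read msg at offset " << msg->offset() << ": "
                  << std::string(static_cast<const char*>(msg->payload()), msg->len()) << std::endl;
        break;
    case RdKafka::ERR__TIMED_OUT:   //no message arrived within timeout_ms
        break;
    default:
        std::cerr << "Consume failed: " << msg->errstr() << std::endl;
        break;
}
delete msg;     //the caller owns the returned Message object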

3. Implementing a producer client with the librdkafka C++ API:

3.1 main_producer.cpp


#include <unistd.h>     //for sleep()
#include <cstdio>       //for sprintf()
#include "kafka_producer.h"

using namespace std;


int main() {
    KafkaProducer producer("127.0.0.1:9092", "topic-demo", 0);

    sleep(5);

    for(int i = 0; i < 10; i++) {
        char msg[64] = {0};
        sprintf(msg, "%s%4d", "Hello Kafka ", i);   //e.g. msg = "Hello Kafka    1";

        char key[8] = {0};
        sprintf(key, "%d", i);  //e.g. key = "1";

        producer.pushMessage(msg, key);
    }

    RdKafka::wait_destroyed(50000);     //wait for all librdkafka handles to be destroyed

    return 0;
}

3.2 kafka_producer.h

#ifndef __KAFKAPRODUCER_H_
#define __KAFKAPRODUCER_H_

#include <string>
#include <iostream>
#include "rdkafkacpp.h"


class KafkaProducer {
public:
    explicit KafkaProducer(const std::string& brokers, const std::string& topic, int partition);       //explicit: forbids implicit conversions, e.g. a single std::string cannot be implicitly converted into a KafkaProducer through this constructor
    ~KafkaProducer();                                                                                                       

    void pushMessage(const std::string& msg, const std::string& key);



protected:
    std::string     m_brokers;
    std::string     m_topicStr;
    int             m_partition;

    RdKafka::Conf*      m_config;           //RdKafka::Conf --- configuration interface class, used to set producer, consumer and broker configuration values
    RdKafka::Conf*      m_topicConfig;

    RdKafka::Producer*  m_producer;
    RdKafka::Topic*     m_topic;
    
    RdKafka::DeliveryReportCb*      m_dr_cb;            //RdKafka::DeliveryReportCb reports the delivery result after RdKafka::Producer::produce(); it is an abstract class, so you implement its callback and decide how to handle the result
    RdKafka::EventCb*               m_event_cb;         //RdKafka::EventCb is the generic interface librdkafka uses to deliver errors, statistics, logs, etc. to the application
    RdKafka::PartitionerCb*         m_partitioner_cb;   //RdKafka::PartitionerCb installs a custom partitioner


};




class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
    void dr_cb(RdKafka::Message &message) {  //override the virtual dr_cb() declared in the base class RdKafka::DeliveryReportCb
        if(message.err() != 0) {       //delivery failed
            std::cerr << "Message delivery failed: " << message.errstr() << std::endl;
        } else {                             //delivery succeeded
            std::cerr << "Message delivered to topic: " << message.topic_name() 
                      << " [" << message.partition() 
                      << "] at offset " << message.offset() << std::endl;
        }
    }
};
/*
class DeliveryReportCb {                        //librdkafka invokes DeliveryReportCb::dr_cb() once the delivery result of a produce() call is known, filling the result into message
public:
    virtual void dr_cb(Message &message) = 0;   //Message::err() carries the delivery result
    virtual ~DeliveryReportCb() {}
};

Information carried by the returned Message object:
class Message {
public:
    virtual std::string     errstr() const = 0;     //error description
    virtual ErrorCode       err() const = 0;        //error code (ErrorCode is an enum)
    virtual Topic           *topic() const = 0;     //topic the message was produced to, as an RdKafka::Topic object
    virtual std::string     topic_name() const = 0; //name of the topic the message was produced to
    virtual int32_t         partition() const = 0;  //partition the message was produced to

    virtual void            *payload() const = 0;   //message payload
    virtual size_t          len() const = 0;        //message length
    virtual const std::string *key() const = 0;     //message key
    virtual int64_t         offset() const = 0;     //message offset
    //...
};

enum ErrorCode {
    ERR_BEGIN = -200,
    //...
    ERR_UNKNOWN = -1,
    ERR_NO_ERROR = 0,   //SUCCESS
    ERR_OFFSET_OUT_OF_RANGE = 1,
    //...
};
*/


class ProducerEventCb : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event &event) {
        switch(event.type()) {
            case RdKafka::Event::EVENT_ERROR:
                std::cout << "RdKafka::Event::EVENT_ERROR: " << RdKafka::err2str(event.err()) << std::endl;
                break;
            case RdKafka::Event::EVENT_STATS:
                std::cout << "RdKafka::Event::EVENT_STATS: " << event.str() << std::endl;
                break;
            case RdKafka::Event::EVENT_LOG:
                std::cout << "RdKafka::Event::EVENT_LOG: " << event.fac() << std::endl;
                break;
            case RdKafka::Event::EVENT_THROTTLE:
                std::cout << "RdKafka::Event::EVENT_THROTTLE: " << event.broker_name() << std::endl;
                break;
        }
    }
};
/*
EventCb is the generic interface through which librdkafka returns errors, statistics, logs, etc. to the application:

class EventCb {
public:
    virtual void event_cb(Event &event) = 0;        //the event_cb function the application overrides with its own handling
    virtual ~EventCb(){ }
};

class Event {
public:
    enum Type {
        EVENT_ERROR,
        EVENT_STATS,
        EVENT_LOG,
        EVENT_THROTTLE      //Event is throttle level signaling from the broker
    };

    enum Severity {         //severity level of the Event
        EVENT_SEVERITY_EMERG = 0,
        EVENT_SEVERITY_ALERT = 1,
        EVENT_SEVERITY_CRITICAL = 2,
        EVENT_SEVERITY_ERROR = 3,
        EVENT_SEVERITY_WARNING = 4,
        EVENT_SEVERITY_NOTICE = 5,
        EVENT_SEVERITY_INFO = 6,
        EVENT_SEVERITY_DEBUG = 7,
    };

    virtual Type            type() const = 0;       //returns the Event type
    virtual ErrorCode       err() const = 0;        //error code
    virtual Severity        severity() const = 0;   //returns the log severity level
    virtual std::string     fac() const = 0;        //returns the log facility string

    virtual std::string     broker_name() const = 0;
    virtual int             broker_id() const = 0;

	virtual bool 			fatal() const = 0;		//returns whether this is a fatal error
    //...
};
*/


class HashPartitionerCb : public RdKafka::PartitionerCb {       //custom partitioner: returns a partition id by hashing the key (essentially the same scheme as the default partitioner)
public:
    int32_t partitioner_cb( const RdKafka::Topic *topic, const std::string *key, 
                            int32_t partition_cnt, void *msg_opaque)    
    {
        char msg[128] = {0};
        sprintf(msg, "HashPartitionCb:[%s][%s][%d]", topic->name().c_str(), key->c_str(), partition_cnt);
        std::cout << msg << std::endl; 

        //the lines above only log the callback invocation; the actual partitioning is done below by generate_hash(), which yields the target partition id
        return generate_hash(key->c_str(), key->size()) % partition_cnt;    
    }
private:
    static inline unsigned int generate_hash(const char *str, size_t len) {
        unsigned int hash = 5381;
        for (size_t i = 0; i < len; i++) {
            hash = ( (hash << 5) + hash ) + str[i];
        }
        return hash;    //the partitioner's return value must lie between 0 and partition_cnt-1; return PARTITION_UA (-1) on error
    }
};
/*
class PartitionerCb {
public:
    virtual int32_t partitioner_cb( const Topic *topic, 
                                    const std::string *key, 
                                    int32_t partition_cnt, 
                                    void *msg_opaque) = 0;      //custom partitioner interface: topic, key, partition count and an opaque pointer
    virtual ~PartitionerCb() { }
};

Note:
partitioner_cb(), overridden in the user's derived class, is invoked by librdkafka rather than by the application. The partition_cnt argument is not supplied by the Producer; it is looked up from the topic's metadata on the broker.
Topics are not created by the producer client either; the only method used here is the kafka-topics.sh script.

On "creating topics":
-- If the broker configuration parameter auto.create.topics.enable is set to true (the default), then when a producer sends a message to a topic that does not yet exist,
a topic is created automatically with num.partitions partitions (default 1) and a replication factor of default.replication.factor (default 1).
-- Beyond that, when a consumer starts reading from an unknown topic, or when any client requests metadata for an unknown topic, a topic is likewise created according to num.partitions and default.replication.factor.
*/

#endif


/*
Summary:
1. The KafkaProducer constructor initializes a "producer client" and takes three parameters: (1) the Kafka address the producer connects to, i.e. the broker's ip:port; (2) the topic that subsequently produced messages are sent to; (3) the partition the messages are sent to(?).

2. RdKafka::Conf is the configuration interface class, used to set producer, consumer and broker configuration values:
namespace RdKafka {
    RdKafka::Conf {
        enum ConfType {
            CONF_GLOBAL,    //Global configuration: client-wide settings, the same for every topic
            CONF_TOPIC      //Topic specific configuration: per-topic settings
        };

        enum ConfResult {           //holds the return value of RdKafka::Conf::set()
            CONF_UNKNOWN = -2,      //Unknown configuration property: the property passed to set() is not defined in Kafka
            CONF_INVALID = -1,      //Invalid configuration value: the value passed to set() is not valid
            CONF_OK = 0             //Configuration property was successfully set
        };

        static Conf *create(ConfType type);     //creates a Conf instance; takes a ConfType and returns a Conf* pointer; a static function

        virtual Conf::ConfResult set(const std::string &name, const std::string &value, std::string &errstr);   //sets the value of the property identified by name
                //eg: RdKafka::Conf::ConfResult errCode = m_config->set("dr_cb", m_dr_cb, errorstr);

        
    };
};



Q: Why do we need to create RdKafka::Conf (GLOBAL/TOPIC), RdKafka::Producer and RdKafka::Topic?
A: It is much like socket(): creating a sockfd handle asks the kernel to create the corresponding instance, and the kernel keeps separate instances to tell different connections apart.
Likewise, different producers and their different configurations need to be kept apart. The application therefore creates a Producer client instance (the m_producer handle) and fills in the desired configuration (m_config);
the library creates and maintains this producer instance internally, and when the producer pushes messages to a broker they are handled according to the pre-configured properties.

*/

3.3 kafka_producer.cpp

#include "kafka_producer.h"


//("192.168.0.105:9092", "topic_demo", 0)
KafkaProducer::KafkaProducer(const std::string& brokers, const std::string& topic, int partition) {
    m_brokers   = brokers;
    m_topicStr  = topic;
    m_partition = partition;

    //first build the configuration used to construct the producer client:
    m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    if(m_config == nullptr) {
        std::cout << "Create Rdkafka Global Conf Failed." << std::endl;
    }

    m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    if(m_topicConfig == nullptr) {
        std::cout << "Create Rdkafka Topic Conf Failed." << std::endl;
    }

    //now set the required configuration properties:
    RdKafka::Conf::ConfResult   result;
    std::string                 error_str;
    
    result = m_config->set("bootstrap.servers", m_brokers, error_str); //broker address(es) the producer connects to, in "ip:port" format
    if(result != RdKafka::Conf::CONF_OK) {
        std::cout << "Global Conf set 'bootstrap.servers' failed: " << error_str << std::endl;
    }

    result = m_config->set("statistics.interval.ms", "10000", error_str);
    if(result != RdKafka::Conf::CONF_OK) {
        std::cout << "Global Conf set 'statistics.interval.ms' failed: " << error_str << std::endl;
    }

    result = m_config->set("message.max.bytes", "10240000", error_str);     //maximum message size the producer may send; larger messages fail
    if(result != RdKafka::Conf::CONF_OK) {
        std::cout << "Global Conf set 'message.max.bytes' failed: " << error_str << std::endl;
    }


    m_dr_cb = new ProducerDeliveryReportCb;
    result = m_config->set("dr_cb", m_dr_cb, error_str);    //delivery-report callback, invoked once for every produced message
    if(result != RdKafka::Conf::CONF_OK) {
        std::cout << "Global Conf set 'dr_cb' failed: " << error_str << std::endl;
    }

    m_event_cb = new ProducerEventCb;
    result = m_config->set("event_cb", m_event_cb, error_str);
    if(result != RdKafka::Conf::CONF_OK) {
        std::cout << "Global Conf set 'event_cb' failed: " << error_str << std::endl;
    }

    m_partitioner_cb = new HashPartitionerCb;
    result = m_topicConfig->set("partitioner_cb", m_partitioner_cb, error_str);     //install the custom partitioner
    if(result != RdKafka::Conf::CONF_OK) {
        std::cout << "Topic Conf set 'partitioner_cb' failed: " << error_str << std::endl;
    }

    //create the Producer client:
    m_producer = RdKafka::Producer::create(m_config, error_str);    //RdKafka::Producer::create(const RdKafka::Conf *conf, std::string &errstr);
    if(m_producer == nullptr) {
        std::cout << "Create Producer failed: " << error_str << std::endl;
    }

    //create the Topic object; it is needed later when produce() sends messages
    m_topic = RdKafka::Topic::create(m_producer, m_topicStr, m_topicConfig, error_str); //RdKafka::Topic::create(Handle *base, const std::string &topic_str, const Conf *conf, std::string &errstr);
    if(m_topic == nullptr) {
        std::cout << "Create Topic failed: " << error_str << std::endl;
    }
}

/*
A few other key parameters:
partition.assignment.strategy   : range,roundrobin — the consumer-side partition assignment strategy; when this consumer is elected group leader, it is the strategy used to assign partitions to the group members
*/
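partition.assignment.strategy is a consumer-side global property, so it would be set on the consumer's global Conf rather than here. A sketch using the same result/error_str pattern as above ("roundrobin" is just an example value):

result = m_config->set("partition.assignment.strategy", "roundrobin", error_str);
if(result != RdKafka::Conf::CONF_OK) {
    std::cout << "Conf set 'partition.assignment.strategy' failed: " << error_str << std::endl;
}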


void KafkaProducer::pushMessage(const std::string& msg, const std::string& key) {
    size_t len = msg.length();
    void *payload = const_cast<void*>(static_cast<const void*>(msg.data()));

    RdKafka::ErrorCode error_code = m_producer->produce( m_topic, RdKafka::Topic::PARTITION_UA,
                                                          RdKafka::Producer::RK_MSG_COPY,
                                                          payload, len, &key, NULL);
    m_producer->poll(0);        //a timeout of 0 means poll() does not block; poll(0) mainly triggers the application-supplied callbacks
    if(error_code != RdKafka::ERR_NO_ERROR) {
        std::cerr << "Produce failed: " << RdKafka::err2str(error_code) << std::endl;
        if(error_code == RdKafka::ERR__QUEUE_FULL) {
            m_producer->poll(1000);     //if the failure was a full queue, block and wait for a while
        } else if(error_code == RdKafka::ERR_MSG_SIZE_TOO_LARGE) {
            //if the message exceeds the configured maximum size, trim it and resend
        } else {
            std::cerr << "ERR__UNKNOWN_PARTITION or ERR__UNKNOWN_TOPIC" << std::endl;
        }
    }
}
/*
RdKafka::ErrorCode RdKafka::Producer::produce(  RdKafka::Topic *topic, int32_t partition,   //topic and partition the message is produced to
                                                int msgflags,                               //RK_MSG_COPY (copy the payload), RK_MSG_FREE (free the payload after sending), RK_MSG_BLOCK (block produce() while the queue is full)
                                                void *payload, size_t len,                  //message payload and length
                                                const std::string *key, void *msg_opaque);  //key; msg_opaque is an optional per-message opaque pointer supplied by the application, passed back in the dr_cb callback and kept internally by librdkafka
Return values:
    ERR_NO_ERROR            :   message successfully enqueued
    ERR__QUEUE_FULL         :   queue is full
    ERR_MSG_SIZE_TOO_LARGE  :   message too large
    ERR__UNKNOWN_PARTITION  :   the specified partition does not exist
    ERR__UNKNOWN_TOPIC      :   the specified topic does not exist


int RdKafka::Producer::poll (int timeout_ms);
Blocks up to timeout_ms waiting for produced messages to be delivered.
Other important roles of poll():
(1) it services Events on the given Kafka handle (Events on m_producer; in this example they are simply printed);
(2) it triggers the application-supplied callbacks, e.g. ProducerDeliveryReportCb is only invoked from within poll().
*/


KafkaProducer::~KafkaProducer() {
    while(m_producer->outq_len() > 0) {         //while the client's "out queue" (Handle->outq_len()) is still non-empty
        std::cerr << "Waiting for: " << m_producer->outq_len() << std::endl;
        m_producer->flush(5000);
    }

    delete m_config;
    delete m_topicConfig;
    delete m_topic;
    delete m_producer;
    delete m_dr_cb;
    delete m_event_cb;
    delete m_partitioner_cb;
}

/*
-- flush():
ErrorCode RdKafka::Producer::flush(int timeout_ms);
flush() first calls poll() to trigger the callbacks the producer registered earlier, then waits until all outstanding messages have been delivered.


-- outq_len(): the length of the "out queue", a member of the Handle class;
it counts data in the producer queue still to be sent to the broker, or acknowledgements in the consumer queue still to be sent to the broker.

Handle is the base class of the Producer and Consumer classes and represents a "client handle":
    class Producer : public virtual Handle { }
    class Consumer : public virtual Handle { }
    class KafkaConsumer : public virtual Handle { }
*/

4. Implementing a consumer client with the librdkafka C++ API:

4.1 main_consumer.cpp

#include "kafka_consumer.h"

int main()
{
    std::string brokers = "127.0.0.1:9092";
    
    std::vector<std::string> topics;		//set of topics to consume
    topics.push_back("topic-demo");

    std::string group = "consumer-group-demo";	//consumer group
    
    KafkaConsumer consumer(brokers, group, topics, RdKafka::Topic::OFFSET_BEGINNING);
    
    consumer.pullMessage();

    RdKafka::wait_destroyed(5000);
    
    return 0;
}

/*
When the producer/consumer client connects to the broker, the "bootstrap.servers" parameter is filled in:
	Q: Why is only one broker address set (port 9092)? If the Kafka cluster has several brokers and the subscribed topic spans more than one of them, how does the client learn about the other brokers?
	A: The bootstrap.servers parameter specifies the list of broker addresses the client uses to connect to the Kafka cluster, in the format:
	host1:port1,host2:port2
	One or more addresses may be given, separated by commas; the default value is "".
	[Note that not every broker address is required, because the client discovers the other brokers from the ones it is given.]
	It is still recommended to list at least two broker addresses, so that the client can still reach the Kafka cluster if any one of them goes down.
*/
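A sketch of listing more than one broker in bootstrap.servers (the host names are placeholders); the client discovers the rest of the cluster from whichever listed broker it reaches first:

std::string error_str;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if(conf->set("bootstrap.servers", "broker1:9092,broker2:9092", error_str) != RdKafka::Conf::CONF_OK) {
    std::cerr << "set 'bootstrap.servers' failed: " << error_str << std::endl;
}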

4.2 kafka_consumer.h

#ifndef __KAFKACONSUMER_H_
#define __KAFKACONSUMER_H_

#include <string>
#include <iostream>
#include <vector>
#include <stdio.h>
#include "rdkafkacpp.h"

class KafkaConsumer {
public:
    explicit KafkaConsumer(const std::string& brokers, const std::string& groupID,
                           const std::vector<std::string>& topics, int partition);
    ~KafkaConsumer();
    
    void pullMessage();
    
protected:
    std::string 				m_brokers;
    std::string 				m_groupId;
    std::vector<std::string> 	m_topicVector;		//a consumer can subscribe to several topics at once, hence the vector
    int 						m_partition;
    
    RdKafka::Conf* 				m_config;			//GLOBAL-level configuration (consumer-client level)
    RdKafka::Conf* 				m_topicConfig;		//TOPIC-level configuration
    
    RdKafka::KafkaConsumer* 	m_consumer;			//consumer client instance
    
    RdKafka::EventCb* 			m_event_cb;			//Event callback
    RdKafka::RebalanceCb* 		m_rebalance_cb;		//rebalance callback
};


class ConsumerEventCb : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event& event) {
        switch (event.type())
        {
	        case RdKafka::Event::EVENT_ERROR:
	            if (event.fatal()) 				//check whether this is a fatal error
	                std::cerr << "FATAL ";
	            std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl;
	            break;
	        case RdKafka::Event::EVENT_STATS:
	            std::cerr << "\"STATS\": " << event.str() << std::endl;
	            break;
	        case RdKafka::Event::EVENT_LOG:
	            fprintf(stderr, "LOG-%i-%s: %s\n", event.severity(), event.fac().c_str(), event.str().c_str());
	            break;
	        case RdKafka::Event::EVENT_THROTTLE:
	            std::cerr << "THROTTLED: " << event.throttle_time() << "ms by " << event.broker_name() << " id " << (int)event.broker_id() << std::endl;
	            break;
	        default:
	            std::cerr << "EVENT " << event.type() << " (" << RdKafka::err2str(event.err()) << "): " <<  event.str() << std::endl;
	            break;
        }
    }
};

class ConsumerRebalanceCb : public RdKafka::RebalanceCb {
public:
    void rebalance_cb( RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err,
                       std::vector<RdKafka::TopicPartition*> &partitions)		//librdkafka reports the concrete rebalance event (before/after) via err, and the revoked/assigned partition list via partitions
    {
        std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": ";
        printTopicPartition(partitions);

        if(err == RdKafka::ERR__ASSIGN_PARTITIONS) {		//ERR__ASSIGN_PARTITIONS: fires after the rebalance, before the consumer starts consuming; the consumer may reload offsets from the broker here
            consumer->assign(partitions);					//after the rebalance, assign() the newly assigned partitions
            partition_count = (int)partitions.size();
        } else if(err == RdKafka::ERR__REVOKE_PARTITIONS) {		//ERR__REVOKE_PARTITIONS: fires after the consumer stops consuming, before the rebalance; the application can commit offsets here
            consumer->unassign();								//before the rebalance, unassign() the revoked partitions
            partition_count = 0;								//all partitions revoked, reset the count
        } else {
			std::cerr << "Rebalancing error: " << RdKafka::err2str(err) << std::endl;
		}
    }
    
private:
    static void printTopicPartition(const std::vector<RdKafka::TopicPartition*> &partitions) {	//print every topic/partition in the list
        for(unsigned int i = 0 ; i < partitions.size() ; i++) {
            std::cerr << partitions[i]->topic() << "[" << partitions[i]->partition() << "], ";
        }
        std::cerr << "\n";
    }
private:
    int partition_count;			//number of partitions currently assigned to this consumer
};

/*
class RdKafka::RebalanceCb {
public:
	virtual void rebalance_cb(	RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, 
								std::vector<TopicPartition*> &partitions);	
	virtual ~RebalanceCb() { }
};

Note that the elements of the vector<TopicPartition*> &partitions parameter are of type TopicPartition:
	class TopicPartitionImpl {
		string 	topic_;
		int 	partition_;
	};
Each element carries both the topic and the partition, so consumer->assign() subscribes to a set that may contain different partitions of different topics.
*/

#endif
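Since each TopicPartition carries both a topic and a partition, an assignment may mix partitions of different topics. A sketch of building such a list by hand with RdKafka::TopicPartition::create() and passing it to assign() (topic name and partition numbers are placeholders):

std::vector<RdKafka::TopicPartition*> parts;
parts.push_back(RdKafka::TopicPartition::create("topic-demo", 0));
parts.push_back(RdKafka::TopicPartition::create("topic-demo", 1));

RdKafka::ErrorCode err = m_consumer->assign(parts);
if(err != RdKafka::ERR_NO_ERROR) {
    std::cerr << "assign failed: " << RdKafka::err2str(err) << std::endl;
}

for(RdKafka::TopicPartition *p : parts) {
    delete p;       //the caller keeps ownership of the TopicPartition objects
}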

4.3 kafka_consumer.cpp

#include "kafka_consumer.h"

KafkaConsumer::KafkaConsumer(const std::string& brokers, const std::string& groupId, 
							 const std::vector<std::string>& topics, int partition) 
{
	m_brokers		= brokers;
	m_groupId		= groupId;
	m_topicVector	= topics;
	m_partition		= partition;

	//create the Conf instances:
	m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
	if(m_config == nullptr) {
		std::cout << "Create Rdkafka Global Conf Failed." << std::endl;
	}

	m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
	if(m_topicConfig == nullptr) {
		std::cout << "Create Rdkafka Topic Conf Failed." << std::endl;
	}

	//set the various configuration properties:
	RdKafka::Conf::ConfResult   result;
    std::string                 error_str;

    result = m_config->set("bootstrap.servers", m_brokers, error_str);
    if(result != RdKafka::Conf::CONF_OK) {
    	std::cout << "Conf set 'bootstrap.servers' failed: " << error_str << std::endl;
    }

    result = m_config->set("group.id", m_groupId, error_str);		//consumer group name: group.id (string)
    if(result != RdKafka::Conf::CONF_OK) {
    	std::cout << "Conf set 'group.id' failed: " << error_str << std::endl;
    }

    result = m_config->set("max.partition.fetch.bytes", "1024000", error_str);	//maximum amount of data fetched per partition per request
    if(result != RdKafka::Conf::CONF_OK) {
    	std::cout << "Conf set 'max.partition.fetch.bytes' failed: " << error_str << std::endl;
    }

    result = m_config->set("enable.partition.eof", "false", error_str);		//enable.partition.eof: emit RD_KAFKA_RESP_ERR__PARTITION_EOF when the consumer reaches the end of a partition (default true)
    if(result != RdKafka::Conf::CONF_OK) {
    	std::cout << "Conf set 'enable.partition.eof' failed: " << error_str << std::endl;
    }


    m_event_cb = new ConsumerEventCb;
    result = m_config->set("event_cb", m_event_cb, error_str);
    if(result != RdKafka::Conf::CONF_OK) {
    	std::cout << "Conf set 'event_cb' failed: " << error_str << std::endl;
    }

    m_rebalance_cb = new ConsumerRebalanceCb;
    result = m_config->set("rebalance_cb", m_rebalance_cb, error_str);
    if(result != RdKafka::Conf::CONF_OK) {
    	std::cout << "Conf set 'rebalance_cb' failed: " << error_str << std::endl;
    }


    //topic-level (topic_conf) configuration:
    result = m_topicConfig->set("auto.offset.reset", "latest", error_str);
    if(result != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Topic Conf set 'auto.offset.reset' failed: " << error_str << std::endl;
    }

    result = m_config->set("default_topic_conf", m_topicConfig, error_str);
    if(result != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set 'default_topic_conf' failed: " << error_str << std::endl;
    }


    //create the consumer client:
    m_consumer = RdKafka::KafkaConsumer::create(m_config, error_str);
    if(m_consumer == nullptr) {
    	std::cout << "Create KafkaConsumer failed: " << error_str << std::endl;
    }
    std::cout << "Create KafkaConsumer succeed, consumer name : " << m_consumer->name() << std::endl;
}


void KafkaConsumer::pullMessage() {

}
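pullMessage() is left empty above. A minimal sketch of one possible body, following the subscribe()/consume() pattern from sections 2.7 and 2.8 (the 1000 ms timeout and the endless loop are placeholder choices):

void KafkaConsumer::pullMessage() {
    RdKafka::ErrorCode err = m_consumer->subscribe(m_topicVector);      //subscribe to the configured topics
    if(err != RdKafka::ERR_NO_ERROR) {
        std::cerr << "Subscribe failed: " << RdKafka::err2str(err) << std::endl;
        return;
    }

    while(true) {
        RdKafka::Message *msg = m_consumer->consume(1000);              //pull one message, waiting up to 1000 ms
        if(msg->err() == RdKafka::ERR_NO_ERROR) {
            std::cout << "Got message, topic: " << msg->topic_name()
                      << ", partition: " << msg->partition()
                      << ", offset: " << msg->offset()
                      << ", payload: "
                      << std::string(static_cast<const char*>(msg->payload()), msg->len()) << std::endl;
        } else if(msg->err() != RdKafka::ERR__TIMED_OUT) {              //timeouts are expected; report everything else
            std::cerr << "Consume failed: " << msg->errstr() << std::endl;
        }
        delete msg;     //every returned Message must be freed by the caller
    }
}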
