引入消息队列的作用:

  1. 解耦:不同服务组件通过消息队列交互,彼此不直接依赖对方的接口
  2. 缓冲:有利于消费者和生产者速度匹配,也利于流量削峰
  3. 异步:消息队列可以实现异步处理,生产者发送消息后无需等待消费者处理完成即可返回

Partition:为了实现扩展性、提高并发能力,一个 Topic 可以分布到多个 Broker(即服务器)上,一个 Topic 可以分为多个 Partition。同一 Topic 在不同分区的数据是不重复的,每个 Partition 是一个有序的队列,其表现形式就是一个一个的文件夹
 

Replication:每个分区都有多个副本,副本的作用是做备份。当主分区(Leader)故障的时候,会选择一个备份(Follower)上位,成为新的 Leader。在 Kafka 中默认副本的最大数量是 10 个,且副本的数量不能大于 Broker 的数量;Follower 和 Leader 绝对是在不同的机器上,同一机器对同一个分区也只可能存放一个副本(包括自己)

Leader:每个分区多个副本中的“主”副本,生产者发送数据的对象,以及消费者消费数据的对象,都是 Leader


Follower:每个分区多个副本中的“从”副本,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 故障时,某个 Follower 会成为新的 Leader

以下展示 Kafka 中,一个主题 TopicA 的三个 Partition 在不同的 Broker 上的分布,以及生产者和消费者是如何分配的

C/C++接口实现

libkafka C接口实现:

KafkaConsumer.h(与代码中 #include "KafkaConsumer.h" 保持一致)

#ifndef KAFKACONSUMER_H
#define KAFKACONSUMER_H

#include <string>
#include <iostream>
#include <vector>
#include <stdio.h>

#include "rdkafka.h"

using namespace std;

// Thin C++ wrapper around the librdkafka C consumer API.
// Usage: construct, call InitCfg() once, then call pullMessage() repeatedly.
class KafkaConsumer
{
public:
    /**
     * @brief Default-construct an unconfigured consumer.
     *
     * InitCfg() must be called (and succeed) before pullMessage() is used.
     */
    KafkaConsumer();
    virtual ~KafkaConsumer();

    /**
     * @brief Create the librdkafka consumer handle and subscribe to topics.
     * @param strBrokers comma-separated bootstrap broker list ("host:port,...")
     * @param strGroupID consumer group id
     * @param vTopics    list of topic names to subscribe to
     * @return 0 on success, -1 on any configuration/creation failure
     */
    int InitCfg(const string& strBrokers, const string& strGroupID, const vector<string>& vTopics);

    // Poll the consumer queue once (blocking up to 2 s in the .cpp) and
    // print any received payload to stdout.
    void pullMessage();

    // librdkafka C callbacks (static so they match the C function-pointer
    // signatures); registered via the rd_kafka_conf_set_*_cb calls, which
    // are currently commented out in InitCfg().
    static void RebalanceCb(rd_kafka_t *rk,
        rd_kafka_resp_err_t err,
        rd_kafka_topic_partition_list_t *partitions,
        void *opaque);

    static void log_cb(const rd_kafka_t *rk, int level,
        const char *fac, const char *buf);

    static void  EventErrorCb(rd_kafka_t *rk, int err,
        const char *reason,
        void *opaque);

    static int EventStatsCb(rd_kafka_t *rk,
        char *json,
        size_t json_len,
        void *opaque);

protected:
    string m_strBrokers;            // bootstrap broker list
    string m_groupID;               // consumer group id
    vector<string> m_topicVector;   // topics passed to rd_kafka_subscribe()
    int m_partition;                // NOTE(review): never assigned in the visible code — confirm intended use

    rd_kafka_t *m_pkafka;           // consumer handle; NULL until InitCfg() succeeds
};

#endif // KAFKACONSUMER_H

KafkaConsumer.cpp(实现中使用了类与 std::string,应为 C++ 源文件而非 .c)

#include "KafkaConsumer.h"

KafkaConsumer::KafkaConsumer()
    : m_pkafka(nullptr)
{
}

int KafkaConsumer::InitCfg(const string& strBrokers, const string& strGroupID, const vector<string>& vTopics)
{
    m_strBrokers = strBrokers;
    m_groupID = strGroupID;
    m_topicVector = vTopics;

    rd_kafka_conf_t *pConf = rd_kafka_conf_new();
    if(!pConf)
    {
        return -1;
    }

    char szErr[512] = { 0};
    if (rd_kafka_conf_set(pConf, "bootstrap.servers", m_strBrokers.c_str(), szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", szErr);
        rd_kafka_conf_destroy(pConf);
        return -1;
    }

    if (rd_kafka_conf_set(pConf, "group.id", m_groupID.c_str(), szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK) {
            fprintf(stderr, "%s\n", szErr);
            rd_kafka_conf_destroy(pConf);
            return -1;
    }

    if (rd_kafka_conf_set(pConf, "statistics.interval.ms", "10000", szErr, sizeof(szErr))!= RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", szErr);
        rd_kafka_conf_destroy(pConf);
        return -1;
    }
    // rd_kafka_conf_set_stats_cb(pConf, &KafkaConsumer::EventStatsCb);
    // 
    // rd_kafka_conf_set_error_cb(pConf, &KafkaConsumer::EventErrorCb);
    // rd_kafka_conf_set_log_cb(pConf, &KafkaConsumer::log_cb);
    
    // rd_kafka_conf_set_rebalance_cb(pConf, &KafkaConsumer::RebalanceCb);

    // topic配置
    rd_kafka_topic_conf_t* pTopicConf = rd_kafka_topic_conf_new();
    if (rd_kafka_topic_conf_set(pTopicConf, "auto.offset.reset", "earliest", szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK) {
            fprintf(stderr, "%s\n", szErr);
            rd_kafka_topic_conf_destroy(pTopicConf);
            return -1;
    }

    // rd_kafka_conf_set_default_topic_conf(pConf, pTopicConf);

    m_pkafka = rd_kafka_new(RD_KAFKA_CONSUMER, pConf, szErr, sizeof(szErr));
    if (!m_pkafka) {
        fprintf(stderr, "%% Failed to create new consumer: %s\n", szErr);
        return -1;
    }

    rd_kafka_poll_set_consumer(m_pkafka);

    /* Convert the list of topics to a format suitable for librdkafka */
    rd_kafka_topic_partition_list_t *subscription = rd_kafka_topic_partition_list_new(m_topicVector.size());
    for (int i = 0 ; i < m_topicVector.size() ; i++)
    {
        rd_kafka_topic_partition_list_add(subscription, m_topicVector[i].c_str(), RD_KAFKA_PARTITION_UA);
    }

    /* Subscribe to the list of topics */
    rd_kafka_resp_err_t err = rd_kafka_subscribe(m_pkafka, subscription);
    if (err) {
            fprintf(stderr, "%% Failed to subscribe to %d topics: %s\n", subscription->cnt, rd_kafka_err2str(err));
            rd_kafka_topic_partition_list_destroy(subscription);
            rd_kafka_destroy(m_pkafka);
            return -1;
    }

    fprintf(stderr, "%% Subscribed to %d topic(s), " "waiting for rebalance and messages...\n", subscription->cnt);
    rd_kafka_topic_partition_list_destroy(subscription);

    return 0;
}

void KafkaConsumer::pullMessage()
{
    rd_kafka_message_t *rkm;
    rkm = rd_kafka_consumer_poll(m_pkafka, 2000);
    if (!rkm)
    {
        return ; /* Timeout: no message within 100ms,
                        *  try again. This short timeout allows
                        *  checking for `run` at frequent intervals.
                        */
    }
    /* consumer_poll() will return either a proper message
        * or a consumer error (rkm->err is set). */
    if (rkm->err) {
            /* Consumer errors are generally to be considered
                * informational as the consumer will automatically
                * try to recover from all types of errors. */
            fprintf(stderr,
                    "%% Consumer error: %s\n",
                    rd_kafka_message_errstr(rkm));
            rd_kafka_message_destroy(rkm);
            return;
    }

    /* Print the message value/payload. */
    if (rkm->payload)
    {
        printf(" Value: %.*s\n", (int)rkm->len, (const char *)rkm->payload);
    }
    rd_kafka_message_destroy(rkm);
}

/**
 * @brief Group rebalance callback (rd_kafka_conf_set_rebalance_cb).
 *
 * librdkafka invokes this for both partition assignment and revocation.
 * Bug fix: the original called rd_kafka_assign(rk, partitions)
 * unconditionally, which is incorrect on ERR__REVOKE_PARTITIONS — the
 * revoked set must be released by assigning NULL.
 */
void KafkaConsumer::RebalanceCb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque)
{
    switch (err)
    {
    case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
        if(partitions)
        {
            // NOTE(review): forcing absolute offset 0 re-reads every
            // partition from the very start on each rebalance and fails once
            // offset 0 is deleted by retention. Preserved from the original —
            // confirm whether RD_KAFKA_OFFSET_BEGINNING (or the committed
            // offset) was intended instead.
            for (int i = 0; i < partitions->cnt; i++)
            {
                partitions->elems[i].offset = 0;
            }
        }
        rd_kafka_assign(rk, partitions);
        break;

    case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
    default:
        // Release the current assignment (revocation or unexpected error).
        rd_kafka_assign(rk, NULL);
        break;
    }
}

#include <iostream>
#include <stdio.h>

// Log callback (rd_kafka_conf_set_log_cb): prints librdkafka log lines with
// their syslog level and facility.
// Bug fix: the original printed the "<   EventErrorCb    >" label here,
// making log lines indistinguishable from error-event lines.
void KafkaConsumer::log_cb(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
{
    cout<< "<   log_cb    >" << level << "  " << fac << " " << buf <<endl;
}

// Error-event callback (rd_kafka_conf_set_error_cb): reports the numeric
// librdkafka error code together with its human-readable reason.
void KafkaConsumer::EventErrorCb(rd_kafka_t *rk, int err, const char *reason, void *opaque)
{
    std::cout << "<   EventErrorCb    >" << err << "  " << reason << std::endl;
}

// Statistics callback (rd_kafka_conf_set_stats_cb): receives a JSON blob
// every statistics.interval.ms. Returning 0 tells librdkafka it still owns
// (and will free) the json buffer.
int KafkaConsumer::EventStatsCb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
{
    std::cout << "<   EventStatsCb    >" << json << std::endl;
    return 0;
}

// Leave the consumer group cleanly and release the librdkafka handle.
// Bug fix: guard against InitCfg() never having been called (or having
// failed) — rd_kafka_consumer_close(NULL) would crash.
KafkaConsumer::~KafkaConsumer()
{
    if (m_pkafka)
    {
        rd_kafka_consumer_close(m_pkafka);
        /* Destroy the consumer */
        rd_kafka_destroy(m_pkafka);
        m_pkafka = nullptr;
    }
}

libkafkacpp C++消费者接口实现:

#include "KafkaConsumer.h"

/**
 * @brief Build a fully configured rdkafka++ high-level consumer.
 * @param brokers   comma-separated bootstrap broker list ("host:port,...")
 * @param groupID   consumer group id
 * @param topics    topics to subscribe to later in pullMessage()
 * @param partition partition hint stored for later use
 *
 * Failures are reported on stdout; on failure m_consumer may remain NULL
 * and pullMessage() must not be called.
 */
KafkaConsumer::KafkaConsumer(const std::string& brokers, const std::string& groupID,
                             const std::vector<std::string>& topics, int partition)
{
    m_brokers = brokers;
    m_groupID = groupID;
    m_topicVector = topics;
    m_partition = partition;

    std::string errorStr;
    RdKafka::Conf::ConfResult errorCode;
    // Global (consumer-level) configuration object.
    m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    if(m_config == NULL)
    {
        // Bug fix: the original continued and dereferenced the NULL config.
        std::cout << "Conf set failed: " << errorStr << std::endl;
        return;
    }

    m_event_cb = new ConsumerEventCb;
    errorCode = m_config->set("event_cb", m_event_cb, errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }

    m_rebalance_cb = new ConsumerRebalanceCb;
    errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }

    // Don't surface ERR__PARTITION_EOF events as errors.
    errorCode = m_config->set("enable.partition.eof", "false", errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }

    errorCode = m_config->set("group.id", m_groupID, errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }
    errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }
    // Cap per-partition fetch size at ~1 MB.
    errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }

    // partition.assignment.strategy  range,roundrobin

    // Topic-level configuration: consume the latest messages when the group
    // has no committed offset.
    m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Topic Conf set failed: " << errorStr << std::endl;
    }
    errorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }

    m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr);
    if(m_consumer == NULL)
    {
        std::cout << "Create KafkaConsumer failed: " << errorStr << std::endl;
        return;   // bug fix: the original fell through and called name() on NULL
    }
    std::cout << "Created consumer " << m_consumer->name() << std::endl;
}

/**
 * @brief Handle one item returned by KafkaConsumer::consume(): print real
 *        messages to stdout and errors (including poll timeouts) to stderr.
 * @param msg    message or error event (owned by the caller)
 * @param opaque unused
 */
void msg_consume(RdKafka::Message* msg, void* opaque)
{
    switch (msg->err())
    {
    case RdKafka::ERR__TIMED_OUT:
        std::cerr << "Consumer error: " << msg->errstr() << std::endl;
        break;
    case RdKafka::ERR_NO_ERROR:     // a real message arrived
    {
        // Bug fixes vs the original:
        //  - key() returns const std::string* (possibly NULL); streaming the
        //    pointer printed an address instead of the key.
        //  - payload() is not NUL-terminated; streaming it as char* could
        //    read past the end. Bound it with len().
        const std::string* key = msg->key();
        std::cout << " Message in " << msg->topic_name() << " ["
                  << msg->partition() << "] at offset " << msg->offset()
                  << " key: " << (key ? *key : std::string())
                  << " payload: "
                  << std::string(static_cast<const char*>(msg->payload()), msg->len())
                  << std::endl;
        break;
    }
    default:
        std::cerr << "Consumer error: " << msg->errstr() << std::endl;
        break;
    }
}

/**
 * @brief Subscribe to the configured topics and consume messages forever.
 *
 * Blocks up to 1 s per poll and never returns on the success path; each
 * message (or error event) is handed to msg_consume().
 */
void KafkaConsumer::pullMessage()
{
    // Subscribe to the topic list captured in the constructor.
    RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector);
    if (errorCode != RdKafka::ERR_NO_ERROR)
    {
        std::cout << "subscribe failed: " << RdKafka::err2str(errorCode) << std::endl;
        return;   // bug fix: the original kept polling an unsubscribed consumer forever
    }
    // Consume loop: each consume() call returns a message or an error event,
    // which msg_consume() prints; the returned object is always owned by us.
    while(true)
    {
        RdKafka::Message *msg = m_consumer->consume(1000);
        msg_consume(msg, NULL);
        delete msg;
    }
}

// Close the consumer (committing offsets and leaving the group) and release
// every owned object. Bug fix: m_consumer may be NULL if construction
// failed; the original called close() on it unconditionally. delete on a
// null pointer is well-defined, so the deletes need no guards.
KafkaConsumer::~KafkaConsumer()
{
    if (m_consumer)
    {
        m_consumer->close();
    }
    delete m_config;
    delete m_topicConfig;
    delete m_consumer;
    delete m_event_cb;
    delete m_rebalance_cb;
}

libkafkacpp C++生产者接口实现:

#include "KafkaProducer.h"

/**
 * @brief Build a fully configured rdkafka++ producer bound to one topic.
 * @param brokers   comma-separated bootstrap broker list ("host:port,...")
 * @param topic     topic name to produce to
 * @param partition partition hint stored for later use
 *
 * Failures are reported on stdout; on failure m_producer/m_topic may remain
 * NULL and pushMessage() must not be called.
 */
KafkaProducer::KafkaProducer(const std::string& brokers, const std::string& topic, int partition)
{
    m_brokers = brokers;
    m_topicStr = topic;
    m_partition = partition;

    // Global (producer-level) configuration object.
    m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    if(m_config == NULL)
    {
        std::cout << "Create RdKafka Conf failed." << std::endl;
        return;   // bug fix: the original continued and dereferenced NULL below
    }
    // Topic-level configuration object.
    m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    if(m_topicConfig == NULL)
    {
        std::cout << "Create RdKafka Topic Conf failed." << std::endl;
        return;   // bug fix: same reason as above
    }

    // Broker properties and callbacks.
    RdKafka::Conf::ConfResult errCode;
    m_dr_cb = new ProducerDeliveryReportCb;
    std::string errorStr;
    errCode = m_config->set("dr_cb", m_dr_cb, errorStr);
    if(errCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed:" << errorStr << std::endl;
    }
    m_event_cb = new ProducerEventCb;
    errCode = m_config->set("event_cb", m_event_cb, errorStr);
    if(errCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed:" << errorStr << std::endl;
    }

    // Custom partitioner: routes each message by hashing its key.
    m_partitioner_cb = new HashPartitionerCb;
    errCode = m_topicConfig->set("partitioner_cb", m_partitioner_cb, errorStr);
    if(errCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed:" << errorStr << std::endl;
    }

    // Emit a statistics event every 10 s.
    errCode = m_config->set("statistics.interval.ms", "10000", errorStr);
    if(errCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed:" << errorStr << std::endl;
    }

    // Allow messages up to ~10 MB.
    errCode = m_config->set("message.max.bytes", "10240000", errorStr);
    if(errCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed:" << errorStr << std::endl;
    }
    errCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
    if(errCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed:" << errorStr << std::endl;
    }

    // request.required.acks  0=Broker does not send any response/ack to client, 
    //                        -1 or all=Broker will block until message is committed by all in sync replicas (ISRs)

    // Create the producer handle.
    m_producer = RdKafka::Producer::create(m_config, errorStr);
    if(m_producer == NULL)
    {
        std::cout << "Create Producer failed:" << errorStr << std::endl;
        return;   // bug fix: Topic::create() below requires a valid handle
    }
    // Create the topic handle used by pushMessage().
    m_topic = RdKafka::Topic::create(m_producer, m_topicStr, m_topicConfig, errorStr);
    if(m_topic == NULL)
    {
        std::cout << "Create Topic failed:" << errorStr << std::endl;
    }
}

void KafkaProducer::pushMessage(const std::string& str, const std::string& key)
{
    int32_t len = str.length();
    void* payload = const_cast<void*>(static_cast<const void*>(str.data()));
    RdKafka::ErrorCode errorCode = m_producer->produce(m_topic, RdKafka::Topic::PARTITION_UA,
                                   RdKafka::Producer::RK_MSG_COPY,
                                   payload, len, &key, NULL);
    m_producer->poll(0);
    if (errorCode != RdKafka::ERR_NO_ERROR)
    {
        std::cerr << "Produce failed: " << RdKafka::err2str(errorCode) << std::endl;
        if(errorCode ==  RdKafka::ERR__QUEUE_FULL)
        {
            m_producer->poll(100);
        }
    }
}

// Drain the outbound queue (waiting for delivery reports) and release every
// owned object. Bug fix: m_producer may be NULL if construction failed; the
// original dereferenced it unconditionally. delete on a null pointer is
// well-defined, so the deletes need no guards.
KafkaProducer::~KafkaProducer()
{
    if (m_producer)
    {
        while (m_producer->outq_len() > 0)
        {
            std::cerr << "Waiting for " << m_producer->outq_len() << std::endl;
            m_producer->flush(5000);
        }
    }
    delete m_config;
    delete m_topicConfig;
    delete m_topic;
    delete m_producer;
    delete m_dr_cb;
    delete m_event_cb;
    delete m_partitioner_cb;
}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐