I. Background: For an IoT project we needed a message queue as a buffer and chose Kafka, deployed on the public internet. The initial deployment had no authentication, and it was maliciously scanned almost immediately: someone pushed more than 1 GB of data into Kafka and brought it down. The attacker could not gain host access, but it was unpleasant enough that I enabled login authentication. This post records the whole process: installation, configuration, and the application code.

II. Environment: Ubuntu Linux + Kafka 2.8.0 + ZooKeeper 3.5.9 (bundled with the standalone Kafka distribution)

III. Installation and Configuration

1. Download Kafka from the official Apache Kafka site, upload it to the target directory on the server, and extract it: tar -zxvf kafka_2.13-2.8.0.tgz

2. Go to the config directory under the Kafka installation directory and edit server.properties:

# Listener for the Kafka broker. Leave the host empty as shown; if the broker is only
# accessed from the internal network, you may put the IP here and omit advertised.listeners
listeners=SASL_PLAINTEXT://:9092
# Address the broker registers in ZooKeeper; external clients reach Kafka through it
advertised.listeners=SASL_PLAINTEXT://114.114.114.114:9092
# ZooKeeper connection address
zookeeper.connect=localhost:2181
log.dirs=/home/ubuntu/kafka/logs/kafka-logs
# Security protocol used between brokers
security.inter.broker.protocol=SASL_PLAINTEXT
# SASL mechanism
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
# Authorizer class (deprecated since 2.4 in favor of kafka.security.authorizer.AclAuthorizer, but still functional in 2.8)
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# Deny operations when no matching ACL is found (true would allow everyone)
allow.everyone.if.no.acl.found=false
# Super users bypass ACL checks; here the visitor user is made a super user
super.users=User:visitor
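Because allow.everyone.if.no.acl.found=false, every account except the super user needs explicit ACLs before it can produce or consume. A minimal sketch using the bundled kafka-acls.sh from the bin directory (the appuser principal is illustrative; it would also need a user_appuser entry in the JAAS file created in step 3):

./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:appuser --operation Read --operation Write --topic demo-hehe-topic
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:appuser --operation Read --group '*'

The second command grants Read on consumer groups, which consumers need in addition to Read on the topic.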

3. Create the server-side JAAS login file. The file name is up to you, e.g. kafka_server_jaas.conf, with the following content:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
        username="visitor"
        password="qaz@123"
        user_visitor="qaz@123";
};
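With PlainLoginModule, username and password are the credentials the broker itself uses for inter-broker connections, while each user_<name>="<password>" entry defines an account that clients can log in with. So, to add a second client account (the appuser name and password below are illustrative), the section would become:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
        username="visitor"
        password="qaz@123"
        user_visitor="qaz@123"
        user_appuser="app@123";
};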

Then edit bin/kafka-server-start.sh under the Kafka installation directory and add this variable at the top of the file:

export KAFKA_OPTS=" -Djava.security.auth.login.config=/opt/kafka/kafka_2.13-2.8.0/config/kafka_server_jaas.conf"

4. Create the JAAS login file for the console consumer and producer, named as you like, e.g. kafka_client_jaas.conf, with the following content (if Kafka is accessed from application code, e.g. a Spring Boot project, this file is not needed):

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
        username="visitor"
        password="qaz@123";
};

Add the following two lines to both consumer.properties and producer.properties in the config directory:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

Then edit bin/kafka-console-producer.sh and bin/kafka-console-consumer.sh under the Kafka installation directory and add this variable at the top of each file:

export KAFKA_OPTS=" -Djava.security.auth.login.config=/opt/kafka/kafka_2.13-2.8.0/config/kafka_client_jaas.conf"

5. Start Kafka. Go to the bin directory and start ZooKeeper and then Kafka; this completes the server-side authentication setup:

./zookeeper-server-start.sh -daemon ../config/zookeeper.properties

./kafka-server-start.sh -daemon ../config/server.properties
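Since -daemon detaches the process, it is worth confirming the broker actually stayed up; by default the server log lives under logs/ in the installation directory:

tail -f ../logs/server.log
# a successful start ends with a line like: [KafkaServer id=0] started (kafka.server.KafkaServer)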

Note: if the connection from Kafka to ZooKeeper should also be authenticated, add a ZooKeeper login context to the broker's JAAS file. Kafka reads it from a section named Client (not zookeeper), and ZooKeeper's SASL support is based on DigestLoginModule rather than PlainLoginModule:

Client {
        org.apache.zookeeper.server.auth.DigestLoginModule required
        username="visitor"
        password="qaz@123";
};

The ZooKeeper server then needs a matching Server section and SASL enabled on its side.

6. Create and list topics. Go to the bin directory.

Create a topic (in Kafka 2.8 the --zookeeper flag still works; it was removed in 3.0 in favor of --bootstrap-server):

./kafka-topics.sh  --zookeeper localhost:2181 --create --topic demo-hehe-topic --partitions 1 --replication-factor 1

List topics:

./kafka-topics.sh --list --zookeeper localhost:2181
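With the topic in place, the console clients configured in step 4 can be used for a quick end-to-end test of the authentication setup (run from the bin directory; the config files are the ones edited above):

./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic demo-hehe-topic --producer.config ../config/producer.properties
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo-hehe-topic --from-beginning --consumer.config ../config/consumer.properties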

IV. Configuring Kafka Authentication in a Spring Boot Project

1. Configure application.yml as follows:

spring:
  kafka:
    # Kafka broker address; multiple brokers may be listed
    bootstrap-servers: 114.114.114.114:9092
    producer: # producer settings
      retries: 1 # a value greater than 0 makes the client retry failed sends
      # batch size for batched sends, in bytes
      batch-size: 16384
      buffer-memory: 33554432
      # serializers for the message key and value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # raise the maximum size of a single request pushed to Kafka
      properties:
        max.request.size: 52428800
    consumer:
      # commit offsets manually to guarantee every message is actually consumed
      enable-auto-commit: false
      # where to start when no committed offset exists: latest (newest) or earliest (oldest)
      auto-offset-reset: latest
      # consumer group
      #group-id: dev
    properties:
      security:
        protocol: SASL_PLAINTEXT
      sasl:
        mechanism: PLAIN
        jaas:
          # the login module must be PlainLoginModule to match the PLAIN mechanism configured on the broker
          config: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="visitor" password="qaz@123";'
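For comparison, a plain (non-Spring) Java producer needs the same three SASL settings. A minimal sketch, assuming the broker address and the visitor account from above:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PlainJavaProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "114.114.114.114:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // the same SASL settings as in application.yml
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"visitor\" password=\"qaz@123\";");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-hehe-topic", "hello"));
        }
    }
}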

2. Create the Kafka listener container factory. The following code can live in the Application class or in a standalone Configuration class:

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

/**
 * Configure Kafka listeners for manual offset commits
 *
 * @param consumerFactory the auto-configured consumer factory
 * @return the listener container factory
 */
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(ConsumerFactory<Object, Object> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // number of concurrent consumers; keep it at or below the topic's partition count
    //factory.setConcurrency(1);
    factory.getContainerProperties().setPollTimeout(1500);
    // commit the offset immediately after each manual acknowledge() call
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
}

3. Sending messages to Kafka:

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
/**
 * Pushes data to Kafka
 *
 * @author wangfenglei
 */
@Slf4j
@Component
public class KafkaDataProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Push a message to Kafka and wait for the broker's acknowledgment
     *
     * @param msg   the message body
     * @param topic the target topic
     * @return metadata of the record as stored by Kafka
     * @throws Exception if the send fails
     */
    public RecordMetadata sendMsg(String msg, String topic) throws Exception {
        try {
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, msg);
            return future.get().getRecordMetadata();
        } catch (Exception e) {
            log.error("sendMsg to kafka failed!", e);
            throw e;
        }
    }
}
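A hypothetical caller, e.g. a service that forwards device readings (class name and topic are illustrative):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class DeviceDataService {
    @Autowired
    private KafkaDataProducer kafkaDataProducer;

    public void forward(String payload) throws Exception {
        // blocks until the broker acknowledges the record
        kafkaDataProducer.sendMsg(payload, "demo-hehe-topic");
    }
}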

4. Receiving messages from Kafka:

import com.wfl.firefighting.kafka.strategy.BaseConsumerStrategy;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


/**
 * Kafka message consumer
 *
 * @author wangfenglei
 */
@Component
@Slf4j
public class KafkaDataConsumer {
    private final Map<String, BaseConsumerStrategy> strategyMap = new ConcurrentHashMap<>();

    public KafkaDataConsumer(Map<String, BaseConsumerStrategy> strategyMap) {
        this.strategyMap.clear();
        this.strategyMap.putAll(strategyMap);
    }

    /**
     * @param record the received message
     * @param ack    handle for manually committing the offset
     */
    @KafkaListener(topics = {"#{'${customer.kafka.kafka-topic-names}'.split(',')}"}, containerFactory = "kafkaListenerContainerFactory", autoStartup = "${customer.kafka.kafka-receive-flag}")
    public void receiveMessage(ConsumerRecord<String, String> record, Acknowledgment ack) throws Exception {
        String message = record.value();
        // log the received message
        log.info("Receive from kafka topic[" + record.topic() + "]:" + message);

        try {
            // look up the handling strategy registered under the topic name
            BaseConsumerStrategy strategy = strategyMap.get(record.topic());

            if (null != strategy) {
                strategy.consumer(message);
            }
        } finally {
            // commit manually so the message is only marked as consumed after handling
            ack.acknowledge();
        }
    }
}
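The BaseConsumerStrategy interface is project-specific and not shown in the original. A minimal sketch of what the consumer above assumes: one strategy per topic, registered as a Spring bean whose name equals the topic name, since Spring injects Map<String, BaseConsumerStrategy> keyed by bean name and the consumer looks strategies up via strategyMap.get(record.topic()). The interface and implementation below are illustrative:

import org.springframework.stereotype.Component;

public interface BaseConsumerStrategy {
    void consumer(String message) throws Exception;
}

// bean name must equal the topic name so the lookup by record.topic() finds it
@Component("demo-hehe-topic")
class DemoTopicStrategy implements BaseConsumerStrategy {
    @Override
    public void consumer(String message) throws Exception {
        // process messages from the demo-hehe-topic topic
        System.out.println("handling: " + message);
    }
}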
