This was my first time integrating with Kafka on the producer side. I gathered some references online and put together a fairly simple integration approach.

First, the utility class for the connection, KafkaUtil.java:

package com.ruoyi.soaworkflow.utils.kafka;

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;


/**
 * Kafka configuration for a cluster that requires username/password authentication.
 */
@Component // register as a Spring bean so the @Value setters below are invoked
public class KafkaUtil {

    // Broker address list: use the PLAINTEXT:// prefix when there is no password, SASL_PLAINTEXT:// when there is one.
    //public static final String servers="SASL_PLAINTEXT://10.172.16.27:1024,10.172.16.28:1024,10.172.16.29:1024";
    // Values are read from the config file into static fields via instance setters, so @Value injection works.
    private static String bootstrapServers;
    @Value("${spring.kafka.bpm.bootstrap-servers}")
    public void setBootstrapServers(String bootstrapServers){
        KafkaUtil.bootstrapServers = bootstrapServers;
    }

    private static Integer retries;
    @Value("${spring.kafka.bpm.producer.retries}")
    public void setRetries(Integer retries){
        KafkaUtil.retries = retries;
    }
    private static Integer batchSize;
    @Value("${spring.kafka.bpm.producer.batch-size}")
    public void setBatchSize(Integer batchSize){
        KafkaUtil.batchSize = batchSize;
    }
    private static Integer bufferMemory;
    @Value("${spring.kafka.bpm.producer.buffer-memory}")
    public void setBufferMemory(Integer bufferMemory){
        KafkaUtil.bufferMemory = bufferMemory;
    }

    private static Integer linger;
    @Value("${spring.kafka.bpm.producer.linger}")
    public void setLinger(Integer linger){
        KafkaUtil.linger = linger;
    }

    private static String acks;
    @Value("${spring.kafka.bpm.producer.acks}")
    public void setAcks(String acks){
        KafkaUtil.acks = acks;
    }
    private static String username;
    @Value("${spring.kafka.bpm.producer.username}")
    public void setUsername(String username){
        KafkaUtil.username = username;
    }
    private static String passwd;
    @Value("${spring.kafka.bpm.producer.passwd}")
    public void setPasswd(String passwd){
        KafkaUtil.passwd = passwd;
    }
    // Producer configuration for the Kafka cluster
    public static KafkaProducer<String, String> getProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);           // injected above, so actually use it
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); // injected above, so actually use it
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        // Quote the JAAS option values; the unquoted form breaks if the password contains special characters.
        String jaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\""
                + username + "\" password=\"" + passwd + "\";";
        props.setProperty("sasl.jaas.config", jaasConfig);
        return new KafkaProducer<>(props);
    }


    public static KafkaConsumer<String, String> getConsumer(String groupId, String username, String passwd) {
        Properties props = new Properties();
        // Hardcoded here so the standalone main() in ConsumerClient works without Spring;
        // keep this list in sync with bootstrapServers above.
        props.put("bootstrap.servers", "SASL_PLAINTEXT://10.172.16.27:1024,10.172.16.28:1024,10.172.16.29:1024");
        props.put("auto.offset.reset", "earliest"); // required if the group should also read data produced before it first connected
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "100"); // ignored while enable.auto.commit is false
        props.put("max.partition.fetch.bytes", "10240"); // max bytes fetched per partition per request: 10 KB, roughly 20 messages here
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        // Same quoting rule as in getProducer().
        String jaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\""
                + username + "\" password=\"" + passwd + "\";";
        props.setProperty("sasl.jaas.config", jaasConfig);
        return new KafkaConsumer<>(props);
    }
}
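
One caveat with this pattern: the @Value setters are instance methods writing into static fields, so KafkaUtil must be instantiated by Spring before getProducer() is called. Below is a minimal sketch of safe usage from another bean; BpmKafkaService is a made-up name, just for illustration:

package com.ruoyi.soaworkflow.utils.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.stereotype.Service;

/**
 * Hypothetical usage sketch, not part of the original integration.
 */
@Service
public class BpmKafkaService {

    private final KafkaProducer<String, String> producer;

    // Injecting KafkaUtil forces Spring to create (and configure) it before this
    // bean, so the static fields are already populated when getProducer() runs.
    public BpmKafkaService(KafkaUtil kafkaUtil) {
        this.producer = KafkaUtil.getProducer();
    }

    public void send(String topic, String key, String value) {
        producer.send(new ProducerRecord<>(topic, key, value));
    }
}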

Next, the contents of the configuration file. These values could also be hardcoded directly in the utility class (a sketch of that follows the YAML):

spring:
  kafka:
    bpm:
      bootstrap-servers: SASL_PLAINTEXT://10.172.16.27:1024,10.172.16.28:1024,10.172.16.29:1024
      producer:
        retries: 1     # number of resend attempts after a failed send
        batch-size: 2  # maximum size of one batch, in bytes (2 effectively disables batching)
        buffer-memory: 33554432 # 32 MB send buffer
        linger: 1000   # ms to wait for more records before sending a batch
        acks: all      # wait for all in-sync replicas to acknowledge each record
        username: aaaaa
        passwd: bbbbb
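
If you would rather skip the YAML, initialize the static fields in KafkaUtil directly and drop the setters. A sketch, using the same example values as the YAML above:

// Hardcoded alternative to the @Value setters in KafkaUtil:
private static String bootstrapServers =
        "SASL_PLAINTEXT://10.172.16.27:1024,10.172.16.28:1024,10.172.16.29:1024";
private static Integer retries = 1;
private static Integer batchSize = 2;
private static Integer bufferMemory = 33554432;
private static Integer linger = 1000;
private static String acks = "all";
private static String username = "aaaaa";
private static String passwd = "bbbbb";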

The corresponding dependency also needs to be declared in the pom (add an explicit <version> if the project does not inherit one from a Spring Boot parent or BOM):

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

Next, the producer, ProducerClient.java:

package com.ruoyi.soaworkflow.utils.kafka;

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;


/**
 * Producer
 */
public class ProducerClient {

    // One shared producer for the whole application; KafkaProducer is thread-safe.
    private static Producer<String, String> producer = KafkaUtil.getProducer();

    public static void sendToKafka(String topic, String processId, JSONObject bpmData) {
        try {
            final ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,
                    processId, bpmData.toJSONString());
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        // The send is asynchronous; failures surface here.
                        e.printStackTrace();
                    }
                }
            });
            System.out.println("sendToKafka - sent to Kafka, key=" + processId);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // Flush so the async send completes, but do NOT close the shared producer here:
        // closing it after the first send would make every later call to this method fail.
        // Close it once, on application shutdown.
        producer.flush();
    }
}
}
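
For reference, a small hypothetical call-site sketch. The class name, key, and payload fields are invented, and it assumes the application is running so Spring has already initialized KafkaUtil:

package com.ruoyi.soaworkflow.utils.kafka;

import com.alibaba.fastjson.JSONObject;

/**
 * Hypothetical call-site sketch for ProducerClient.
 */
public class ProducerClientDemo {

    public static void publishExample() {
        JSONObject bpmData = new JSONObject();
        bpmData.put("processId", "P-20240001"); // example field
        bpmData.put("status", "APPROVED");      // example field
        // Topic matches the consumer example below.
        ProducerClient.sendToKafka("3_kjczxsmrtj", "P-20240001", bpmData);
    }
}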

Finally, the consumer-side code, ConsumerClient.java:

package com.ruoyi.soaworkflow.utils.kafka;

import java.util.Arrays;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Consumer
 */
public class ConsumerClient {

    public static KafkaConsumer<String, String> consumer = null;

    public static void main(String[] args) {
        fetchKafka();
    }

    public static void fetchKafka() {
        consumer = KafkaUtil.getConsumer("testGroup1", "oaadmin", "NTA4YjRhZDBmYjQ3"); // consumer group
        consumer.subscribe(Arrays.asList("3_kjczxsmrtj")); // topic

        int i = 0;
        while (true) {
            ConsumerRecords<String, String> records;
            try {
                records = consumer.poll(Long.MAX_VALUE); // timeout in ms; blocks until records arrive
            } catch (Exception e) {
                e.printStackTrace();
                continue;
            }

            for (ConsumerRecord<String, String> record : records) {
                System.out.println("fetched from partition " + record.partition()
                        + ", offset: " + record.offset()
                        + ", key: " + record.key()
                        + ", value: " + record.value());
                i++;
                System.out.println(i);
            }

            // Manual commit after processing: with enable.auto.commit=false this
            // gives at-least-once delivery.
            try {
                consumer.commitSync();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
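
The loop above never exits, which is fine for a quick test but not for a real service. Below is a self-contained sketch (class name invented) of the standard shutdown pattern: a JVM shutdown hook calls consumer.wakeup(), the only consumer method that is safe to call from another thread, which makes the blocking poll() throw WakeupException so the loop can end and close the consumer cleanly. It reuses the same example group, credentials, and topic as above.

package com.ruoyi.soaworkflow.utils.kafka;

import java.util.Arrays;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

/**
 * Sketch: ConsumerClient's loop plus a clean shutdown path.
 */
public class ShutdownAwareConsumer {

    public static void main(String[] args) {
        final KafkaConsumer<String, String> consumer =
                KafkaUtil.getConsumer("testGroup1", "oaadmin", "NTA4YjRhZDBmYjQ3");
        final Thread mainThread = Thread.currentThread();

        // On Ctrl+C / SIGTERM: interrupt the blocking poll(), then wait for
        // the main thread to finish committing and closing.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException ignored) {
            }
        }));

        consumer.subscribe(Arrays.asList("3_kjczxsmrtj"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("offset=" + record.offset() + ", value=" + record.value());
                }
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            // Expected during shutdown; nothing to do.
        } finally {
            consumer.close();
        }
    }
}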
