springboot对接kafka
kafka生产消费
·
第一次作为生产端接入Kafka,在网上找了一些相关的接入方式,整理出一份比较简单的接入方案。
首先是对接的工具类 KafkaUtil.java
package com.ruoyi.soaworkflow.utils.kafka;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Factory for Kafka producers and consumers that authenticate with SASL/SCRAM-SHA-512
 * (username + password) over {@code SASL_PLAINTEXT}.
 *
 * <p>The connection settings live in static fields so that the static factory methods can
 * read them. Spring populates those fields by calling the {@code @Value}-annotated instance
 * setters when it instantiates this component, so {@link #getProducer()} only works after
 * the Spring context has created this bean.
 */
@Component // registered as a Spring bean so that the @Value setters below are invoked
public class KafkaUtil {

    /**
     * Broker list used by {@link #getConsumer(String, String, String)} when no value has been
     * injected (for example when a consumer is started from a plain {@code main} method).
     * Use the {@code PLAINTEXT://} prefix for clusters without a password and
     * {@code SASL_PLAINTEXT://} for clusters that require one.
     */
    private static final String DEFAULT_BOOTSTRAP_SERVERS =
            "SASL_PLAINTEXT://10.172.16.27:1024,10.172.16.28:1024,10.172.16.29:1024";

    /** Login module prefix of the {@code sasl.jaas.config} value. */
    private static final String SCRAM_LOGIN_MODULE =
            "org.apache.kafka.common.security.scram.ScramLoginModule required";

    // Static fields cannot be annotated with @Value directly, so each one is filled in
    // through an instance setter that copies the injected value into the static field.
    private static String bootstrapServers;
    private static Integer retries;
    private static Integer batchSize;
    private static Integer bufferMemory;
    private static Integer linger;
    private static String acks;
    private static String username;
    private static String passwd;

    @Value("${spring.kafka.bpm.bootstrap-servers}")
    public void setBootstrapServers(String bootstrapServers) {
        KafkaUtil.bootstrapServers = bootstrapServers;
    }

    @Value("${spring.kafka.bpm.producer.retries}")
    public void setRetries(Integer retries) {
        KafkaUtil.retries = retries;
    }

    @Value("${spring.kafka.bpm.producer.batch-size}")
    public void setBatchSize(Integer batchSize) {
        KafkaUtil.batchSize = batchSize;
    }

    @Value("${spring.kafka.bpm.producer.buffer-memory}")
    public void setBufferMemory(Integer bufferMemory) {
        KafkaUtil.bufferMemory = bufferMemory;
    }

    @Value("${spring.kafka.bpm.producer.linger}")
    public void setLinger(Integer linger) {
        KafkaUtil.linger = linger;
    }

    @Value("${spring.kafka.bpm.producer.acks}")
    public void setAcks(String acks) {
        KafkaUtil.acks = acks;
    }

    @Value("${spring.kafka.bpm.producer.username}")
    public void setUsername(String username) {
        KafkaUtil.username = username;
    }

    @Value("${spring.kafka.bpm.producer.passwd}")
    public void setPasswd(String passwd) {
        KafkaUtil.passwd = passwd;
    }

    /**
     * Creates a new String/String producer for the configured cluster.
     *
     * <p>Every call builds a new client; the caller owns it and is responsible for closing it.
     *
     * @return a newly created producer
     * @throws IllegalStateException if any setting has not been injected yet
     */
    public static KafkaProducer<String, String> getProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                requireInjected(bootstrapServers, "spring.kafka.bpm.bootstrap-servers"));
        props.put(ProducerConfig.ACKS_CONFIG,
                requireInjected(acks, "spring.kafka.bpm.producer.acks"));
        props.put(ProducerConfig.RETRIES_CONFIG,
                requireInjected(retries, "spring.kafka.bpm.producer.retries"));
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,
                requireInjected(batchSize, "spring.kafka.bpm.producer.batch-size"));
        // buffer-memory and linger were injected but previously never handed to the client.
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,
                requireInjected(bufferMemory, "spring.kafka.bpm.producer.buffer-memory"));
        props.put(ProducerConfig.LINGER_MS_CONFIG,
                requireInjected(linger, "spring.kafka.bpm.producer.linger"));
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        applyScramSecurity(props,
                requireInjected(username, "spring.kafka.bpm.producer.username"),
                requireInjected(passwd, "spring.kafka.bpm.producer.passwd"));
        return new KafkaProducer<String, String>(props);
    }

    /**
     * Creates a new String/String consumer with auto-commit disabled, starting from the
     * earliest offset when the group has no committed position.
     *
     * <p>The injected {@code spring.kafka.bpm.bootstrap-servers} value is used when available;
     * otherwise the built-in default broker list is used.
     *
     * @param groupId  consumer group id
     * @param username SCRAM user name
     * @param passwd   SCRAM password
     * @return a newly created consumer; the caller is responsible for closing it
     * @throws IllegalArgumentException if {@code username} or {@code passwd} is {@code null}
     */
    public static KafkaConsumer<String, String> getConsumer(String groupId, String username, String passwd) {
        String servers = bootstrapServers != null ? bootstrapServers : DEFAULT_BOOTSTRAP_SERVERS;
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("auto.offset.reset", "earliest"); // required in order to read existing (old) records
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "100");
        // Upper bound of bytes fetched per partition (10 KB); the original author observed
        // roughly 20 records per fetch with this value.
        props.put("max.partition.fetch.bytes", "10240");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        applyScramSecurity(props, username, passwd);
        return new KafkaConsumer<String, String>(props);
    }

    /** Adds the SASL_PLAINTEXT / SCRAM-SHA-512 settings shared by producers and consumers. */
    private static void applyScramSecurity(Properties props, String user, String password) {
        if (user == null || password == null) {
            throw new IllegalArgumentException("Kafka SASL username and password must not be null");
        }
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        // The option values are double-quoted: unquoted values that start with a digit or
        // contain spaces or punctuation are not read back as a single string by the JAAS parser.
        String jaasConfig = SCRAM_LOGIN_MODULE
                + " username=\"" + escapeJaasValue(user) + "\""
                + " password=\"" + escapeJaasValue(password) + "\";";
        props.setProperty("sasl.jaas.config", jaasConfig);
    }

    /** Escapes backslashes and double quotes so the value can sit inside a quoted JAAS option. */
    private static String escapeJaasValue(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Returns {@code value}, or fails with a descriptive error when Spring has not injected it. */
    private static <T> T requireInjected(T value, String property) {
        if (value == null) {
            throw new IllegalStateException("Kafka setting '" + property
                    + "' has not been injected; KafkaUtil must be initialised by Spring before use");
        }
        return value;
    }
}
然后是配置文件里面内容,当然也可以直接写死在工具类中
# Kafka settings read by KafkaUtil through @Value("${spring.kafka.bpm...}") placeholders.
# The nesting below is what those placeholder paths require.
spring:
  kafka:
    bpm:
      bootstrap-servers: SASL_PLAINTEXT://10.172.16.27:1024,10.172.16.28:1024,10.172.16.29:1024
      producer:
        retries: 1 # number of re-send attempts after a failed send
        batch-size: 2 # handed to the producer's batch.size, which Kafka measures in bytes - NOTE(review): 2 bytes effectively disables batching; confirm this is intended
        buffer-memory: 33554432 # 32 MB batching buffer
        linger: 1000 # presumably the producer linger time in milliseconds (linger.ms) - confirm
        acks: all
        username: aaaaa # placeholder SCRAM user name
        passwd: bbbbb # placeholder SCRAM password
pom文件中也要引入对应的jar
<!-- Spring for Apache Kafka. No <version> is declared here, so the version has to come from
     dependency management elsewhere (e.g. a parent POM or BOM) - verify in the actual project. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
然后就是生产者 ProducerClient.java
package com.ruoyi.soaworkflow.utils.kafka;
import java.util.concurrent.Future;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
 * Producer-side helper that publishes BPM payloads to Kafka as JSON strings.
 *
 * <p>A single producer instance is shared by all callers. It is created on first use rather
 * than at class-load time, so that loading this class before the Kafka settings are available
 * does not fail class initialisation, and it is closed by a JVM shutdown hook instead of after
 * every send.
 */
public class ProducerClient {

    /** Shared producer; created lazily by {@link #sharedProducer()}. */
    private static Producer<String, String> producer;

    /**
     * Placeholder entry point; intentionally does nothing.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // nothing to do
    }

    /**
     * Sends one record to Kafka and waits until the client has finished transmitting it.
     *
     * <p>Failures are printed to standard error and not propagated to the caller.
     *
     * @param topic     destination topic
     * @param processId record key
     * @param bpmData   payload; serialised with {@code toJSONString()} as the record value
     */
    public static void sendToKafka(String topic, String processId, JSONObject bpmData) {
        try {
            Producer<String, String> kafkaProducer = sharedProducer();
            ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>(topic, processId, bpmData.toJSONString());
            kafkaProducer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    }
                }
            });
            System.out.println("sendToKafka-发送至Kafka:" + "d+key-" + processId);
            // Previously the shared producer was closed here, which made every later call fail.
            // flush() keeps the "delivered before returning" behaviour while leaving the
            // producer usable for the next message.
            kafkaProducer.flush();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Returns the shared producer, creating it and registering its shutdown hook on first use. */
    private static synchronized Producer<String, String> sharedProducer() {
        if (producer == null) {
            final Producer<String, String> created = KafkaUtil.getProducer();
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                @Override
                public void run() {
                    created.close();
                }
            }, "kafka-producer-shutdown"));
            producer = created;
        }
        return producer;
    }
}
消费端代码 ConsumerClient.java
package com.ruoyi.soaworkflow.utils.kafka;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
/**
 * Sample consumer: subscribes to one topic, prints every record it receives and commits the
 * offsets synchronously after each batch.
 */
public class ConsumerClient {

    /** Pause between attempts after a failed poll, so a persistent error does not busy-spin. */
    private static final long ERROR_BACKOFF_MS = 1000L;

    public static KafkaConsumer<String, String> consumer = null;

    public static void main(String[] args) {
        fecthKafka();
    }

    /**
     * Runs the poll loop on the calling thread until that thread is interrupted, then closes
     * the consumer.
     *
     * <p>Poll and commit failures are printed to standard error; the loop then carries on.
     */
    public static void fecthKafka() {
        // NOTE(review): group id, credentials and topic are hard-coded; consider moving the
        // credentials out of source control.
        consumer = KafkaUtil.getConsumer("testGroup1", "oaadmin", "NTA4YjRhZDBmYjQ3"); // group
        try {
            consumer.subscribe(Arrays.asList("3_kjczxsmrtj")); // topic
            int i = 0; // running count of records seen so far
            while (!Thread.currentThread().isInterrupted()) {
                ConsumerRecords<String, String> records;
                try {
                    // NOTE(review): poll(long) is deprecated in newer kafka-clients releases in
                    // favour of poll(Duration); switch once the client version is confirmed.
                    records = consumer.poll(Long.MAX_VALUE); // timeout in milliseconds
                } catch (Exception e) {
                    e.printStackTrace();
                    if (!pauseAfterFailure()) {
                        break;
                    }
                    continue;
                }
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ",key: " + record.key() + ",value:" + record.value() );
                    i++;
                    System.out.println(i);
                }
                try {
                    consumer.commitSync();
                } catch (Exception e) {
                    // Offsets stay uncommitted; the loop continues with the next poll.
                    e.printStackTrace();
                }
            }
        } finally {
            try {
                consumer.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Sleeps for the error back-off interval.
     *
     * @return {@code false} if the thread was interrupted (the interrupt flag is restored and
     *         the caller should stop), {@code true} otherwise
     */
    private static boolean pauseAfterFailure() {
        try {
            Thread.sleep(ERROR_BACKOFF_MS);
            return true;
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
更多推荐
已为社区贡献1条内容
所有评论(0)