因为一直处于当前管理岗,开发量减少,偶尔遇到点问题还有掉小兴奋,记录一下,供各位参考

kafka版本:kafka-0.10.2.0

部署方式:集群部署,共5个节点

认证方式:ssl认证

springboot集成kafka的代码,当前演示的是生产者

依赖项:
 

  <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.0</version>
  </dependency>

如果你用的是spring-kafka的话请参照这张表进行对应的版本切换

java代码:

            Properties producerProps = new Properties();

			producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:端口 多个ip之间用英文逗号分隔");

			producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");

			producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/***/ssl/ylhh/kafka.keystore.jks");
			producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "密码");
			producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "密码");
			producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/***/ssl/ylhh/kafka.truststore.jks");
			producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "密码");

			producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
			producerProps.put(ProducerConfig.RETRIES_CONFIG, 0);
			producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
			producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
			Producer<String, String> producer = new KafkaProducer<String, String>(producerProps);
			for (int i = 0; i < 100; i++) {
				ProducerRecord<String, String> data = new ProducerRecord<String, String>(
						"填写topic", "key-" + i, "13245648");
				System.out.println("这是第几次=" + i);
				producer.send(data);
			}
			producer.close();

项目启动(正常状态):

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.4.RELEASE)

2022-05-25 14:41:59.305  INFO 36391 --- [           main] com.ProviderMain                         : No active profile set, falling back to default profiles: default
2022-05-25 14:42:01.674  WARN 36391 --- [           main] o.m.s.mapper.ClassPathMapperScanner      : No MyBatis mapper was found in '[com]' package. Please check your configuration.
2022-05-25 14:42:02.228  INFO 36391 --- [           main] o.s.cloud.context.scope.GenericScope     : BeanFactory id=2e855ab5-e169-3e52-8b58-18ae59ccee35
2022-05-25 14:42:02.353  INFO 36391 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration' of type [org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration$$EnhancerBySpringCGLIB$$eaeeb226] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2022-05-25 14:42:02.383  INFO 36391 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration' of type [org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$708b523] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2022-05-25 14:42:02.790  INFO 36391 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8001 (http)
2022-05-25 14:42:02.827  INFO 36391 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2022-05-25 14:42:02.827  INFO 36391 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.17]
2022-05-25 14:42:02.923  INFO 36391 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2022-05-25 14:42:02.923  INFO 36391 --- [           main] o.s.web.context.ContextLoader            : Root WebApplicationContext: initialization completed in 3600 ms
2022-05-25 14:42:03.153  INFO 36391 --- [           main] c.a.d.s.b.a.DruidDataSourceAutoConfigure : Init DruidDataSource
2022-05-25 14:42:03.315 ERROR 36391 --- [           main] com.alibaba.druid.pool.DruidDataSource   : testWhileIdle is true, validationQuery not set
2022-05-25 14:42:03.322  INFO 36391 --- [           main] com.alibaba.druid.pool.DruidDataSource   : {dataSource-1} inited
2022-05-25 14:42:03.617  WARN 36391 --- [           main] c.n.c.sources.URLConfigurationSource     : No URLs will be polled as dynamic configuration sources.
2022-05-25 14:42:03.617  INFO 36391 --- [           main] c.n.c.sources.URLConfigurationSource     : To enable URLs as dynamic configuration sources, define System property archaius.configurationSource.additionalUrls or make config.properties available on classpath.
2022-05-25 14:42:03.632  WARN 36391 --- [           main] c.n.c.sources.URLConfigurationSource     : No URLs will be polled as dynamic configuration sources.
2022-05-25 14:42:03.632  INFO 36391 --- [           main] c.n.c.sources.URLConfigurationSource     : To enable URLs as dynamic configuration sources, define System property archaius.configurationSource.additionalUrls or make config.properties available on classpath.
2022-05-25 14:42:03.887  INFO 36391 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2022-05-25 14:42:04.530  INFO 36391 --- [           main] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 2 endpoint(s) beneath base path '/actuator'
2022-05-25 14:42:04.678  INFO 36391 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8001 (http) with context path ''
2022-05-25 14:42:04.681  INFO 36391 --- [           main] com.ProviderMain                         : Started ProviderMain in 6.675 seconds (JVM running for 7.224)

启动成功,你就可以看到控制台输出kafka连接的配置信息

本次遇到的问题是:

o.apache.kafka.common.network.Selector   : [Producer clientId=producer-1] Failed authentication with /IP (SSL handshake failed)

在排查了基础项配置没问题后,基本可以判断是当前依赖版本与kafka版本之间的冲突,将代码内依赖版本修改为与集群版本一致,问题解决。

愿你我共同成长!

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐