Kafka Connect 是一种用于在 Apache Kafka® 和其他数据系统之间可扩展且可靠地流式传输数据的工具。它使快速定义将大型数据集移入和移出 Kafka 的连接器变得简单。Kafka Connect 可以摄取整个数据库或从应用程序服务器收集指标到 Kafka 主题中,使数据可用于低延迟的流处理。导出连接器可以将数据从 Kafka 主题传送到二级索引(如 Elasticsearch)或批处理系统(如 Hadoop)以进行离线分析。

一、Kafka主要概念

  • 连接器——通过管理任务来协调数据流的高级抽象
  • 任务——如何将数据复制到 Kafka 或从 Kafka 复制数据的实现
  • 工人- 执行连接器和任务的运行进程
  • 转换器——用于在 Connect 和发送或接收数据的系统之间转换数据的代码
  • 转换- 改变由连接器产生或发送到连接器的每条消息的简单逻辑
  • 死信队列– Connect 如何处理连接器错误

1.1 连接器

     

        Kafka Connect 中的连接器定义了数据应该复制到哪里和从哪里复制。阿连接器实例是负责管理和卡夫卡其他系统间的数据的复制的逻辑工作。连接器实现或使用的所有类都在连接器插件中定义。连接器实例和连接器插件都可以称为“连接器”,但从所引用的上下文中应该始终清楚(例如,“安装连接器”指的是插件,“检查连接器的状态”) ”指的是一个连接器实例)。

我们鼓励用户利用现有的连接器。但是,可以从头开始编写新的连接器插件。概括地说,希望编写新连接器插件的开发人员遵循以下工作流程。开发人员指南中提供了更多信息。

1.2 任务

任务是 Connect 数据模型中的主要参与者。每个连接器实例协调一组实际复制数据的任务。通过允许连接器将单个作业分解为多个任务,Kafka Connect 以很少的配置提供了对并行性和可扩展数据复制的内置支持。这些任务中没有存储状态。任务状态存储在 Kafka 中的特殊主题中config.storage.topic,status.storage.topic 并由关联的连接器管理。因此,可以随时启动、停止或重新启动任务,以提供弹性、可扩展的数据管道。

通过 Connect 源任务传入 Kafka 的数据的高级表示。请注意,内部偏移量存储在 Kafka 或磁盘中,而不是存储在任务本身中。

1.3任务再平衡

当连接器首次提交到集群时,工作人员会重新平衡集群中的全套连接器及其任务,以便每个工作人员的工作量大致相同。当连接器增加或减少它们需要的任务数量时,或者当连接器的配置发生更改时,也会使用相同的重新平衡过程。当工作人员失败时,任务会在活动工作人员之间重新平衡。当任务失败时,不会触发重新平衡,因为任务失败被视为例外情况。因此,失败的任务不会由框架自动重新启动,而应通过REST API重新启动。

任务故障转移示例显示了在工作器故障时任务如何重新平衡。

1.4 工人

连接器和任务是工作的逻辑单元,必须安排在流程中执行。Kafka Connect 将这些进程称为工作人员,并且有两种类型的工作人员:独立的和分布式的。

1.4.1 独立工作者

独立模式是最简单的模式,其中一个进程负责执行所有连接器和任务。

由于它是单个进程,因此需要最少的配置。独立模式便于入门、开发期间以及某些只有一个进程有意义的情况,例如从主机收集日志。但是,因为只有一个进程,所以它的功能也更加有限:可扩展性仅限于单个进程,除了您添加到单个进程的任何监视之外,没有容错能力。

1.4.2 分布式工作者

分布式模式为 Kafka Connect 提供了可扩展性和自动容错能力。在分布式模式下,您可以使用相同的方式启动许多工作进程group.id,它们会自动协调以安排所有可用工作人员之间的连接器和任务的执行。如果您添加工作人员、关闭工作人员或工作人员意外失败,其余工作人员会检测到这一点并自动协调以在更新的可用工作人员集之间重新分配连接器和任务。请注意与消费者组重新平衡的相似性。在幕后,连接工作人员正在使用消费者群体进行协调和重新平衡。

注意:

所有具有相同的工作人员group.id将在同一个连接集群中。例如,如果 worker-a group.id=connect-cluster-a和 worker-b 具有相同的group.id,则 worker-a 和 worker-b 将形成一个名为 的集群connect-cluster-a。

 三节点 Kafka Connect 分布式模式集群。连接器(监控源或接收器系统的更改需要重新配置任务)和任务(复制连接器数据的子集)在活动工作人员之间自动平衡。任务之间的工作分工由每个任务分配的分区表示。

二、Connector 单机版部署连接JDBC

2.1 下载kafka-connect-jdbc

 下载安装JDBC SOURCE CONNECTOR 网址:https://www.confluent.io/hub/  解压到kafka的下载目录中的plugins文件夹下,解压后里面有一下文件

2.2 整理文件

将etc文件夹中的两个文件复制到kafka的下载目录中的config文件夹下,并且将两个分别重命名为“connect-mysql-sink.properties” 和“connect-mysql-source.properties”

将lib文件中的kafka-connect-jdbc-10.2.0.jar的文件在kafka的下载目录中libs文件中新建一个connector的文件,将这个文件放入,并将lib下的所有文件复制到kafka的libs目录下,并将jdbc的连接驱动jar包拷贝一份复制到kafka的libs文件中一份

2.2.1修改connect-mysql-sink.properties里面的配置

name=test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=orders
connection.url=jdbc:mysql://localhost:3306/kafkatest?user=root&password=root
auto.create=true
pk.mode = record_value
pk.fields = kafkacol
table.name.format=kafkatable
security.protocol=SSL
ssl.truststore.location=/tmp/kafka.client.truststore.jks

2.2.2 修改connect-mysql-source.properties里面的配置

name=test-source-sqlite-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1

connection.url=jdbc:mysql://localhost:3306/kafkatest?user=root&password=root
mode=incrementing
incrementing.column.name=id
topic.prefix=test-sqlite-jdbc-

2.2.3 修改connect-standalone.properties配置

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
#配置jdbc在kafka配置目录中的位置
plugin.path=E:\kafka_2.12-2.4.0\libs\connector

2.2、测试connect-jdbc

2.2.1 分别启动zkserver和各个kafka

2.2.2 启动kafka-connect

bin\windows\connect-standalone.bat config/connect-standalone.properties config\connect-mysql-source.properties

2.2.3 测试

在被连接的数据库的表中插入一条数据,观察kafka是否有新的主题生成,并且查看里面的数据

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐