Spark Streaming + Kafka 实现实时数据传输

版本说明:
Spark 3.0.0
Kafka 2.12
zookeeper 3.5.7

一、集群端

前提:配置好并启动三台节点的zookeeper

  1. 在三个结点分别配置Kafka

    ①解压安装包,在安装目录/home/kafka-2.12下创建logs文件夹

    ②修改./config/vi server.properties配置文件

    -----修改部分------
    ##broke全局唯一编号,不能重复
    broker.id=0 ## 将另外两个节点分别改称1,2
    ##开启topic删除功能
    delete.topic.enable=true
    ##kafka运行日志存放路径
    log.dirs=/home/kafka-2.12/logs
    ##配置Zookeeper集群地址
    zookeerper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181
    
    
    
    -----以下不用修改----
    ##处理网络请求的线程数量
    num.network.threads=3
    ##处理磁盘IO线程数量
    num.io.threads=8
    ##发送套接字的缓冲区大小
    socket.send.buffer.bytes=102400
    ##接收套接字的缓冲区大小
    socket.receive.buffer.bytes=102400
    ##请求套接字的缓冲区大小
    socket.request.max.bytes=104857600
    
    ##kafka运行日志存放路径
    log.dirs=/opt/module/kafka/logs
    ##topic在当前broker上的分区个数
    num.partition=1
    ##用来恢复和清理data下数据的线程数量
    num.recovery.threads.per.data.dir=1
    ##segment文件保留的最长时间,超时将被删除
    log.retention.hours=168
    

    ③配置环境变量

    sudo vim /etc/profile
    
    ##KAFKA_HOME
    export KAFKA_HOME=/home/kafka-2.12
    export PATH=$PATH:$KAFKA_HOME/bin
    
    source vim /etc/profile
    

    ps:如果修改profile不规范导致所有指令失效,可用

    export PATH=/usr/bin:/usr/sbin:/bin:/sbin:/usr/X11R6/bin
    

    暂时恢复数据,然后重新进入/etc/profile检查是否有语法错误,进行修改。

    ④使用预先写好的xsync群发脚本向其他节点分发kafka-2.12

    xsync kafka-2.12
    

    注意要修改另外节点的brokerid为1,2

    ⑤在三台节点分别启动kafka

    ##hadoop01中
    ./bin/kafka-server-start.sh -daemon config/server.properties
    ##hadoop02中
    ./bin/kafka-server-start.sh -daemon config/server.properties
    ##hadoop03中
    ./bin/kafka-server-start.sh -daemon config/server.properties
    

    可写好shell群发脚本方便群起

    for i in hadoop01 hadoop02 hadoop03
    	do
    	echo	"======$i======"
    	ssh $i "/home/kafka-2.12/bin/kafka-server-start.sh -daemon config/server.properties"
    	done
    

    ⑥创建一个新的topic,向streaming发送数据

    kafka2.12创建新topic指令要加入 --bootstrap-server,老版本可能是–zookeeper

    [root@hadoop01 ~]# bin/kafka-topics.sh --bootstrap-server hadoop01:9092 --create --topic atguiguNew
    

    查看kafka中存在的list

    [root@hadoop01 ~]# bin/kafka-topics.sh --list --bootstrap-server hadoop01:9092
    

    ⑦等待IDEA端Spark Streaming与集群段kafka连接

    ⑧Idea程序运行后等待producer产生数据

    producer产生数据

    [root@hadoop01 kafka-2.12]# bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic atguiguNew
    >hello
    >hello
    >hey
    

二、IDEA端

前提:配置好spark环境,添加了Spark Streaming依赖等

①在maven项目中的pom.xml文件中添加spark-streaming-kafka依赖

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

②创建scala object,根据所需方法导入对应jar包

package com.atguigu.bigdata.spark.streaming

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming04_Kafka {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
        val ssc = new StreamingContext(sparkConf, Seconds(3))
		//kafka相关参数
        val kafkaPara: Map[String, Object] = Map[String, Object](
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",
            ConsumerConfig.GROUP_ID_CONFIG -> "skTest",//consumer group
            "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
            "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
        )

        val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
            ssc,//上下文环境对象
            LocationStrategies.PreferConsistent,//采集节点和计算节点的位置匹配
            ConsumerStrategies.Subscribe[String, String](Set("skTest"), kafkaPara)//set中是kafka中topic名称
        )
        kafkaDataDS.map(_.value()).print()


        ssc.start()
        ssc.awaitTermination()
    }

}

③在集群端输入数据,观察idea端控制台采集结果
在这里插入图片描述

内容参考:尚硅谷大数据Spark教程从入门到精通、尚硅谷大数据kafka快速入门

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐