In this walkthrough, the job is coded locally, then packaged and submitted to the cluster to run!

Feel free to message me if you have any questions.

1. Create the project in IDEA

The pom.xml is as follows. It also mixes in configuration for Scala and Spark support (the Spark dependencies are not needed by this Flink job and are marked provided).

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.tledu</groupId>
  <artifactId>llll</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>${project.artifactId}</name>
  <description>My wonderful Scala app</description>
  <inceptionYear>2018</inceptionYear>
  <licenses>
    <license>
      <name>My License</name>
      <url>http://....</url>
      <distribution>repo</distribution>
    </license>
  </licenses>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.11</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
    <spec2.version>4.2.0</spec2.version>
    <flink.version>1.10.2</flink.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.compat.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.compat.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.compat.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>

    <!-- end of Flink dependencies -->
    <!-- logging dependencies -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.6.6</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>compile</scope>
    </dependency>

    <!-- Flink Redis connector -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-redis_2.11</artifactId>
      <version>1.1.5</version>
    </dependency>


    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.compat.version}</artifactId>
      <version>2.3.2</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.compat.version}</artifactId>
      <version>2.3.2</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_${scala.compat.version}</artifactId>
      <version>2.3.2</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.23</version>
    </dependency>


    <!-- Test -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.compat.version}</artifactId>
      <version>3.0.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs2</groupId>
      <artifactId>specs2-core_${scala.compat.version}</artifactId>
      <version>${spec2.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs2</groupId>
      <artifactId>specs2-junit_${scala.compat.version}</artifactId>
      <version>${spec2.version}</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.21.0</version>
        <configuration>
          <!-- Tests will be run with scalatest-maven-plugin instead -->
          <skipTests>true</skipTests>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <version>2.0.0</version>
        <configuration>
          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
          <junitxml>.</junitxml>
          <filereports>TestSuiteReport.txt</filereports>
          <!-- Comma separated list of JUnit test class names to execute -->
          <jUnitClasses>samples.AppTest</jUnitClasses>
        </configuration>
        <executions>
          <execution>
            <id>test</id>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <!-- see http://davidb.github.com/scala-maven-plugin -->
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>

        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>


    </plugins>
  </build>
</project>
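
With the assembly plugin bound to the package phase, building the fat jar is a plain Maven build. A minimal sketch, run from the project root (the output name follows Maven's <artifactId>-<version>-jar-with-dependencies.jar convention; the run step later assumes it was renamed to llll.jar when uploading):

mvn clean package
# output: target/llll-1.0-SNAPSHOT-jar-with-dependencies.jar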

The code is below. In this example I compute the real-time total sales amount.

package com.com.tledu

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig

import java.util.Properties

object FlinkKafka {
  def main(args: Array[String]): Unit = {
    // Create the Flink streaming environment (StreamExecutionEnvironment) for consuming the stream
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Build the Kafka consumer configuration
    val properties = new Properties()
    // bootstrap.servers: the Kafka broker address (IP:port)
    properties.setProperty("bootstrap.servers", "127.0.0.1:26001")
    // consumer group id
    properties.setProperty("group.id", "wzxgroup")

    val consumer = new FlinkKafkaConsumer[String](
      "wzx", new SimpleStringSchema(), properties
    )
    // Register the Kafka source
    val dataStream = env.addSource(consumer)

    // Once we have the stream, we can work on it with Flink operators
    dataStream.print()
    // Keep records starting with "O", take the fourth field, and sum it up
    val result = dataStream
      .filter(_.startsWith("O")) // keep only records starting with "O"
      .map(_.substring(2) // strip the leading "O:" prefix from each record
        .split(",")) // split the record into its fields
    // Compute the total sales amount
    val priceResult = result
      .map(item => ("price", item(3).toInt)) // we only care about the amount, so map to ("price", amount)
      .keyBy(0) // key by the first tuple element
      .sum(1) // sum the second tuple element


    // Configure the Redis connection; use your actual Redis host and password
    val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
      .setHost("192.168.3.89")
      .setPort(6379)
      .setPassword("123456")
      .build()

    val redisSink = new RedisSink[(String, Int)](config, new MyRedisMapper)
    priceResult.addSink(redisSink)

    env.execute()
  }
}
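
For reference, this is the shape of message the job expects on the wzx topic. The exact field layout is my assumption; the code only relies on the "O:" prefix and on the amount being the fourth comma-separated field:

O:1001,2021-06-01,userA,300
O:1002,2021-06-01,userB,150

After filter/map, each record becomes ("price", amount); keyBy(0).sum(1) then maintains a running total, so after these two records Redis would hold price = 450.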

package com.com.tledu // keep the mapper in the same package as FlinkKafka so `new MyRedisMapper` resolves without an import

import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

class MyRedisMapper extends RedisMapper[(String, Int)] {
  // Write results with the Redis SET command: key -> value
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.SET)

  // The first tuple element ("price") becomes the Redis key
  override def getKeyFromData(t: (String, Int)): String = t._1

  // The running total becomes the Redis value
  override def getValueFromData(t: (String, Int)): String = t._2.toString
}
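
With RedisCommand.SET, each result occupies a top-level Redis key. If you would rather group several metrics under one hash, the connector's RedisCommandDescription also accepts an additional key for hash commands. A minimal variant sketch (same imports as above; the hash name "sales" is my own choice, not from the original):

class MyRedisHashMapper extends RedisMapper[(String, Int)] {
  // HSET sales <field> <value>: all metrics are grouped under a single "sales" hash
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.HSET, "sales") // "sales" is a hypothetical hash name

  override def getKeyFromData(t: (String, Int)): String = t._1 // hash field, e.g. "price"

  override def getValueFromData(t: (String, Int)): String = t._2.toString
}

You would then read the total with hget sales price instead of get price.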

Package the project and upload the jar to the cluster.

I uploaded it into the bin directory of the Flink installation.

You can look up the Redis host IP with ip addr; the password is configured in redis.conf.
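
If you are unsure of the password, it is the requirepass directive; assuming a standard redis.conf you can check it with:

grep requirepass redis.conf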

2. Run on the cluster

Run the following command from Flink's bin directory:

flink run -m yarn-cluster -c com.com.tledu.FlinkKafka ./llll.jar

Here -c specifies the fully qualified main class to run, and the last argument is the path to your jar (mine is in the bin directory).
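
One caveat with Flink 1.10 on YARN: the distribution no longer bundles Hadoop, so if submission fails with missing Hadoop classes, export the Hadoop classpath first (the approach recommended in the Flink docs), assuming the hadoop command is on your PATH:

export HADOOP_CLASSPATH=`hadoop classpath`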

3. Check the data in Redis

Go into the Redis folder.

Start the Redis server with:

./src/redis-server redis.conf

Then connect with the CLI (-a passes the password):

src/redis-cli -a 123456

Once connected, you can use get <key> to read your data. My key name is price, so get price shows my running total.
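
With the two sample records from earlier, the session would look roughly like this (your value depends on what was actually produced to Kafka):

127.0.0.1:6379> get price
"450"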
