flink自定义写入clickhouse
**flink自定义写入clickhouse** 首先,在 pom.xml 中放入所需依赖,我这里整理了一份 MySQL、Redis 和 ClickHouse 的所有依赖 <?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3…
·
**flink自定义写入clickhouse**
首先,在 pom.xml 中添加所需依赖。我这里整理了一份包含 MySQL、Redis 和 ClickHouse 的完整依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.atguigu.flink</groupId>
<artifactId>flink-first</artifactId>
<version>0.1</version>
<properties>
<flink.version>1.13.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
<!-- <scope>provided</scope>-->
</dependency>
<!-- flink-connector-jdbc:Flink 版本需在 1.11.0 及以后 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- clickhouse的连接 ======-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- clickhouse jdbc 连接 -->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.1.55</version>
</dependency>
<!-- ==========-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
</dependencies>
<!-- 打包相关的插件配置 -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
具体flink代码如下:
```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
/**
 * Flink streaming job that writes click events into ClickHouse through a
 * hand-written JDBC sink.
 *
 * <p>The pipeline reads from {@code Example1.ClickSource} and inserts the
 * {@code user} and {@code url} fields of every event into the {@code click}
 * table of the {@code test} database.
 */
public class IntoClickhouse {
    /**
     * Builds the pipeline and submits it for execution.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or fails while running
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env
            .addSource(new Example1.ClickSource())
            .addSink(new RichSinkFunction<Example1.Event>() {
                private Connection conn;
                private PreparedStatement insertStmt;

                /** Opens the JDBC connection and prepares the insert statement. */
                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    conn = DriverManager.getConnection(
                        "jdbc:clickhouse://hadoop102:8123/test"
                    );
                    insertStmt = conn.prepareStatement("INSERT INTO click (user, url) VALUES (?, ?)");
                }

                /** Called once per incoming record; executes one insert for it. */
                @Override
                public void invoke(Example1.Event value, Context context) throws Exception {
                    insertStmt.setString(1, value.user);
                    insertStmt.setString(2, value.url);
                    insertStmt.execute();
                }

                /**
                 * Releases the statement and the connection.
                 *
                 * <p>Both fields may still be null if open() failed part-way, and an
                 * exception from an earlier step must not prevent the connection from
                 * being closed, hence the null guards and nested try/finally.
                 */
                @Override
                public void close() throws Exception {
                    try {
                        super.close();
                    } finally {
                        try {
                            if (insertStmt != null) {
                                insertStmt.close();
                            }
                        } finally {
                            if (conn != null) {
                                conn.close();
                            }
                        }
                    }
                }
            });

        env.execute();
    }
}
```
里面用到的自定义数据源:
```java
/**
 * A single click event: who clicked, which URL, and when.
 *
 * <p>Fields are public and a no-argument constructor is provided
 * (presumably so the class can be handled as a Flink POJO; confirm against the job).
 */
public static class Event {
    public String user;
    public String url;
    /** Event time as epoch milliseconds; null when created via the no-arg constructor. */
    public Long timestamp;

    /** Creates an empty event with all fields null. */
    public Event() {
    }

    /**
     * Creates a fully populated event.
     *
     * @param user      name of the user who clicked
     * @param url       URL that was clicked
     * @param timestamp event time as epoch milliseconds
     */
    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    /**
     * Renders the event with the timestamp formatted as a {@link Timestamp}.
     *
     * <p>A null timestamp is printed as {@code null} instead of being unboxed,
     * which would throw a NullPointerException.
     */
    @Override
    public String toString() {
        Timestamp formatted = (timestamp == null) ? null : new Timestamp(timestamp);
        return "Event{" +
                "user='" + user + '\'' +
                ", url='" + url + '\'' +
                ", timestamp=" + formatted +
                '}';
    }
}
/**
 * Custom data source that emits a random click {@link Event} every 100 ms
 * until it is cancelled.
 *
 * <p>{@code SourceFunction<T>}: T is the element type of the stream.
 * A plain SourceFunction can only produce a source with parallelism 1;
 * ParallelSourceFunction can produce a parallel source.
 */
public static class ClickSource implements SourceFunction<Event> {
    // volatile because cancel() flips the flag while run() is looping;
    // Flink's SourceFunction contract has cancel() invoked from another thread.
    private volatile boolean running = true;
    private final Random random = new Random();
    private final String[] userArr = {"Mary", "Bob", "Alice", "Liz"};
    private final String[] urlArr = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};

    /**
     * Invoked when the task starts; keeps emitting events until {@link #cancel()} is called.
     *
     * @param sourceContext context used to emit elements
     * @throws Exception if the sleep between emissions is interrupted
     */
    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        while (running) {
            String user = userArr[random.nextInt(userArr.length)];
            String url = urlArr[random.nextInt(urlArr.length)];
            // Millisecond timestamp taken from the machine clock.
            long timestamp = System.currentTimeMillis();
            // Emit the element through collect().
            sourceContext.collect(new Event(user, url, timestamp));
            Thread.sleep(100L);
        }
    }

    /**
     * Invoked when the job is cancelled (for example via the cancel button in the web UI);
     * makes the loop in {@code run} stop after the current iteration.
     */
    @Override
    public void cancel() {
        running = false;
    }
}
```
更多推荐
已为社区贡献2条内容
所有评论(0)