flink读取mongodb数据源
Flink读取MongoDB
文章目录
一、普通java模式获取
1. mongodb-driver驱动
mongodb-driver是mongo官方推出的java连接mongoDB的驱动包,相当于JDBC驱动。
(1)通过maven仓库导入:https://mvnrepository.com/artifact/org.mongodb/mongodb-driver
(2)官网中下载相应的java的驱动:http://docs.mongodb.org/ecosystem/drivers/java/
(3)不同的驱动使用的jar也不相同参考:http://mongodb.github.io/mongo-java-driver/
例如:
<dependencies>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver</artifactId>
<version>3.11.2</version>
</dependency>
</dependencies>
2. 创建方法类
2.1 查询全部,遍历打印
package mongodb.test;
import org.bson.Document;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
public class Mongodb {

    /**
     * Queries every document in the {@code stu} collection of the {@code test}
     * database on {@code localhost} and prints each one to standard output.
     */
    public static void mongoQueryAll() {
        // 1. Open the connection. try-with-resources closes the client even when
        //    the query or the iteration below throws.
        try (MongoClient client = new MongoClient("localhost")) {
            // 2. Open the "test" database.
            MongoDatabase db = client.getDatabase("test");
            // 3. Get the "stu" collection.
            MongoCollection<Document> collection = db.getCollection("stu");
            // 4. Run the query; no filter is passed to find().
            FindIterable<Document> documents = collection.find();
            // 5. Iterate over the result and print each document.
            for (Document document : documents) {
                System.out.println(document);
            }
        }
    }

    /**
     * Entry point: runs {@link #mongoQueryAll()}.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        mongoQueryAll();
    }
}
//打印输出stu全部数据
Document{{_id=5d7374e836a89c5a3d18b87a, name=xiaohua}}
Document{{_id=2.0, sn=002, name=xiaogang}}
Document{{_id=3.0, sn=003, name=zhangfei, job=前锋战将}}
Document{{_id=5d73782736a89c5a3d18b87b, sn=004, name=xiaobingbing}}
Document{{_id=5d7396b44ec120618b2dd0cb, name=Document{{surname=李, name=世名}}, job=[皇帝, 大人物, 大丈夫, 功成名就]}}
2.2 条件查询
/**
 * Conditional query: looks up the documents in the {@code stu} collection whose
 * {@code name} field equals {@code "zhangfei"} and prints the {@code name},
 * {@code sn} and {@code job} of each match.
 */
public static void mongoConditionQuery() {
    // 1. Open the connection. try-with-resources closes the client even when
    //    the query or the printing below throws.
    try (MongoClient client = new MongoClient("localhost")) {
        // 2. Open the "test" database.
        MongoDatabase db = client.getDatabase("test");
        // 3. Get the "stu" collection.
        MongoCollection<Document> collection = db.getCollection("stu");
        // 4. Build the filter: match on the "name" field.
        BasicDBObject stu = new BasicDBObject("name", "zhangfei");
        // 5. Run the query with that filter.
        FindIterable<Document> documents = collection.find(stu);
        // 6. Print the fields of every matching document.
        for (Document document : documents) {
            System.out.println("name:" + document.getString("name"));
            System.out.println("sn:" + document.getString("sn"));
            System.out.println("job:" + document.getString("job"));
        }
    }
}
/** Entry point for trying out the conditional query: runs mongoConditionQuery(). */
public static void main(String[] args) {
mongoConditionQuery();
}
//执行输出
name:zhangfei
sn:003
job:前锋战将
注意:当查询条件需要带比较判断时这样写,例如查询sum大于3的学员:
// Find the students whose "sum" field is greater than 3.
// The bound is written as a plain decimal 3: a leading-zero literal such as 003
// is an octal literal in Java (and 008 or 009 would not even compile).
BasicDBObject stu = new BasicDBObject("sum", new BasicDBObject("$gt", 3));
2.3 插入语句
/**
 * Inserts one sample student document (sn, name, job, sum) into the
 * {@code stu} collection of the {@code test} database on {@code localhost}.
 */
public static void mongoInsert() {
    // 1. Open the connection. try-with-resources closes the client even when
    //    the insert below throws.
    try (MongoClient client = new MongoClient("localhost")) {
        // 2. Open the "test" database.
        MongoDatabase db = client.getDatabase("test");
        // 3. Get the "stu" collection.
        MongoCollection<Document> collection = db.getCollection("stu");
        // 4. Build the document directly instead of going through a HashMap:
        //    no extra java.util import is needed and the fields are appended in
        //    the order written here.
        Document document = new Document("sn", "005")
                .append("name", "xiaoA")
                .append("job", "A工作")
                .append("sum", 6);
        // 5. Insert it.
        collection.insertOne(document);
    }
}
// Test run: entry point that executes mongoInsert().
public static void main(String[] args) {
mongoInsert();
}
批量插入,仅供参考:
// Inserting several documents one at a time in a loop works, but it is
// inefficient; MongoDB also offers a bulk insert.
List<Document> objs = new ArrayList<Document>();
objs.add(new Document("name", "user29").append("age", 30).append("sex", 1));
objs.add(new Document("name", "user30").append("age", 30).append("sex", 1));
// The collection used throughout this article is a MongoCollection<Document>,
// whose bulk-insert method is insertMany (insert(List<DBObject>) belongs to the
// legacy DBCollection API).
collection.insertMany(objs);
// The whole batch is handed to the database in a single request and inserted
// there, which saves the per-request cost of inserting one document at a time.
二、Flink 以Hadoop文件格式读取
1. pom.xml添加相关依赖
<!--hadoop compatibility-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.mongodb.mongo-hadoop</groupId>
<artifactId>mongo-hadoop-core</artifactId>
<version>2.0.0</version>
</dependency>
2. 以Hadoop文件格式读取MongoDB中的数据
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;
import example.flink.KeySelector.RecordSeclectId;
import example.flink.mapFunction.BSONMapToRecord;
import example.flink.reduceFunction.KeyedGroupReduce;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapreduce.Job;
import org.bson.BSONObject;
public class MongoSet {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
Job inputJob = Job.getInstance();
//inputJob.getConfiguration().set("mongo.input.uri", "mongodb://readuser:readpw@mongos01:port,mongos02:port,mongos03:port/db.collection");
//inputJob.getConfiguration().set("mongo.auth.uri", "mongodb://root:rootpw@mongos01:port,mongos02:port,mongos03:port/admin");
inputJob.getConfiguration().set("mongo.input.uri", "mongodb://readuser:readpw@mongos01:port,mongos02:port,mongos03:port/db.collection?&authMechanism=SCRAM-SHA-1&authSource=admin&readPreference=secondary");
inputJob.getConfiguration().set("mongo.input.split.read_shard_chunks", "true");
inputJob.getConfiguration().set("mongo.input.split.create_input_splits", "false");
inputJob.getConfiguration().set("mongo.input.split_size","16");
inputJob.getConfiguration().set("mongo.input.query", "{'createDateTime': {\"$lte\":{\"$date\":\"2019-05-27T00:00:00.000Z\"}, \"$gte\":{\"$date\":\"2010-03-17T00:00:00.000Z\"}}}");
inputJob.getConfiguration().set("mongo.input.fields", "{\"Id\":\"1\",\"saleType\":\"1\",\"saleNum\":\"1\",\"createDateTime\":\"1\"}");
HadoopInputFormat<Object, BSONObject> hdIf =
new HadoopInputFormat<>(new MongoInputFormat(), Object.class, BSONObject.class, inputJob);
DataSet<Tuple2<Object, BSONObject>> inputNew = env.createInput(hdIf);
DataSet<Tuple2<String, BSONWritable>> personInfoDataSet = inputNew
.map(new BSONMapToRecord())
.groupBy(new RecordSeclectId())
.reduceGroup(new KeyedGroupReduce());
Job outputJob = Job.getInstance();
outputJob.getConfiguration().set("mongo.output.uri", "mongodb://mongo:27017/db.collection");
outputJob.getConfiguration().set("mongo.output.batch.size", "8");
outputJob.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", "/tmp");
personInfoDataSet.output(new HadoopOutputFormat<>(new MongoOutputFormat<>(), outputJob));
env.execute(MongoSet.class.getCanonicalName());
}
三、Flink CDC监控MongoDB oplog的变化(只能同步实时数据)
1、简介
MongoDB CDC连接器通过伪装成MongoDB集群里的一个副本节点,利用MongoDB集群的高可用机制,从主节点(master)获取完整的oplog(operation log)事件流。
Flink CDC官网:https://github.com/ververica/flink-cdc-connectors
MongoDB CDC:https://github.com/ververica/flink-cdc-connectors/blob/master/docs/content/connectors/mongodb-cdc.md
mongodb知识点整理:https://blog.csdn.net/penngo/article/details/124232016
2、依赖条件
- MongoDB版本
MongoDB version >= 3.6
- 集群部署
副本集 或 分片集群。
- Storage Engine
WiredTiger存储引擎。
- 副本集协议版本
副本集协议版本1 (pv1)。
从4.0版本开始,MongoDB只支持pv1。pv1是MongoDB 3.2或更高版本创建的所有新副本集的默认值。
- 需要的权限
MongoDB Kafka Connector需要changeStream 和 read 权限。
您可以使用下面的示例进行简单授权:
更多详细授权请参考MongoDB数据库用户角色。
use admin;
db.createUser({
user: "flinkuser",
pwd: "flinkpw",
roles: [
{
role: "read", db: "admin" }, //read role includes changeStream privilege
{
role: "readAnyDatabase", db: "admin" } //for snapshot reading
]
});
3、配置MongoDB副本集
创建mongo1.conf、mongo2.conf、mongo3.conf
# mongo1.conf
dbpath=/data/mongodb-4.4.13/data1
logpath=/data/mongodb-4.4.13/mongo1.log
logappend=true
port=27017
replSet=replicaSet_penngo # 副本集名称
oplogSize=200
# mongo2.conf
dbpath=/data/mongodb-4.4.13/data2
logpath=/data/mongodb-4.4.13/mongo2.log
logappend=true
port=27018
replSet=replicaSet_penngo # 副本集名称
oplogSize=200
# mongo3.conf
dbpath=/data/mongodb-4.4.13/data3
logpath=/data/mongodb-4.4.13/mongo3.log
logappend=true
port=27019
replSet=replicaSet_penngo # 副本集名称
oplogSize=200
启动mongodb服务端
在单独的终端上分别运行以下命令:
> mongod --config ../mongo1.conf
> mongod --config ../mongo2.conf
> mongod --config ../mongo3.conf
连接mongodb,使用mongo shell配置副本集
> mongo --port 27017
# 在mongo shell中执行下边命令初始化副本集
> rsconf = {
_id: "replicaSet_penngo",
members: [
{_id: 0, host: "localhost:27017"},
{_id: 1, host: "localhost:27018"},
{_id: 2, host: "localhost:27019"}
]
}
> rs.initiate(rsconf)
mongo shell中创建数据库penngo_db和集合coll,插入1000条数据
> use penngo_db
> for (i=0; i<1000; i++) {db.coll.insert({user: "penngo" + i})}
> db.coll.count()
在mongo shell创建新用户,给Flink MongoDB CDC使用
> use admin;
> db.createUser({
user: "flinkuser",
pwd: "flinkpw",
roles: [
{ role: "read", db: "admin" }, //read role includes changeStream privilege
{ role: "readAnyDatabase", db: "admin" } //for snapshot reading
]
});
4、创建maven工程
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.penngo.flinkcdc</groupId>
<artifactId>FlickCDC</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>FlickCDC_TEST</name>
<url>https://21doc.net/</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<flink-version>1.13.3</flink-version>
<flink-cdc-version>2.1.1</flink-cdc-version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flink-cdc-version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
<version>${flink-cdc-version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>alimaven</id>
<name>Maven Aliyun Mirror</name>
<url>https://maven.aliyun.com/repository/central</url>
</repository>
</repositories>
</project>
MongoDBExample.java
package com.penngo.flinkcdc;
import com.ververica.cdc.connectors.mongodb.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
/**
 * Minimal Flink CDC example: subscribes to change events of the
 * {@code penngo_db.coll} MongoDB collection and prints every event.
 */
public class MongoDBExample {

    /**
     * Builds the MongoDB CDC source, attaches a process function and a print
     * sink to it, and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Build the SourceFunction with Flink CDC.
        SourceFunction<String> mongoDBSourceFunction = MongoDBSource.<String>builder()
                .hosts("127.0.0.1:27017")
                .username("flinkuser")
                .password("flinkpw")
                .database("penngo_db")
                .collection("coll")
                // Alternative selectors, left disabled:
                // .databaseList("penngo_db")
                // .collectionList("coll")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        DataStreamSource<String> dataStreamSource = env.addSource(mongoDBSourceFunction);

        // Custom processing hook: here it only logs each event and emits nothing
        // downstream, so the resulting stream is not kept in a variable.
        dataStreamSource.process(new ProcessFunction<String, Object>() {
            @Override
            public void processElement(String value, ProcessFunction<String, Object>.Context ctx, Collector<Object> out) {
                System.out.println("processElement=====" + value);
            }
        });

        // Also print the raw source stream, tagged with a prefix.
        dataStreamSource.print("原始流--");

        env.execute("Mongo");
    }
}
运行效果
四、Flink SQL CDC 监控MongoDB
更多推荐
所有评论(0)