I. Accessing MongoDB from plain Java

1. The mongodb-driver package

mongodb-driver is the official Java driver for connecting to MongoDB, the equivalent of a JDBC driver.
(1) Import it from the Maven repository: https://mvnrepository.com/artifact/org.mongodb/mongodb-driver
(2) Or download the Java driver from the official site: http://docs.mongodb.org/ecosystem/drivers/java/
(3) Different drivers ship different jars; see: http://mongodb.github.io/mongo-java-driver/
For example:

<dependencies>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongodb-driver</artifactId>
        <version>3.11.2</version>
    </dependency>
</dependencies>
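Note: the sample code below uses the legacy com.mongodb.MongoClient entry point, which is bundled in the mongodb-driver artifact above. The newer sync-only driver ships in a separate artifact and exposes com.mongodb.client.MongoClients instead; if you prefer that API (a variant is sketched after section 2.1), the dependency would be:

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-sync</artifactId>
    <version>3.11.2</version>
</dependency>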

2. Create the example class

2.1 Query all documents and print them

package mongodb.test;

import java.util.HashMap;

import org.bson.Document;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;

public class Mongodb {
    
    /**
     * Query and print the whole collection
     */
    public static void mongoQueryAll() {
        //1. Create the connection
        MongoClient client = new MongoClient("localhost");
        //2. Open the test database
        MongoDatabase db = client.getDatabase("test");
        //3. Get the collection
        MongoCollection<Document> collection = db.getCollection("stu");
        //4. Query all documents in the collection
        FindIterable<Document> documents = collection.find();
        //5. Iterate over the results
        for (Document document : documents) {
            System.out.println(document);
        }
        //6. Close the connection
        client.close();
    }
    
    public static void main(String[] args) {
        mongoQueryAll();
    }
}
//Prints all the documents in stu:
Document{{_id=5d7374e836a89c5a3d18b87a, name=xiaohua}}
Document{{_id=2.0, sn=002, name=xiaogang}}
Document{{_id=3.0, sn=003, name=zhangfei, job=前锋战将}}
Document{{_id=5d73782736a89c5a3d18b87b, sn=004, name=xiaobingbing}}
Document{{_id=5d7396b44ec120618b2dd0cb, name=Document{{surname=, name=世名}}, job=[皇帝, 大人物, 大丈夫, 功成名就]}}
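For comparison, the same query through the newer com.mongodb.client.MongoClients entry point (from mongodb-driver-sync, mentioned above); a sketch, assuming the same local test database:

import org.bson.Document;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;

//Query all of test.stu with the non-legacy client API; this MongoClient is Closeable
try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
    for (Document document : client.getDatabase("test").getCollection("stu").find()) {
        System.out.println(document);
    }
}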

2.2 Conditional query


    /**
     * Conditional query: e.g. fetch all fields of a given student
     */
    public static void mongoConditionQuery() {
        //1. Create the connection
        MongoClient client = new MongoClient("localhost");
        //2. Open the test database
        MongoDatabase db = client.getDatabase("test");
        //3. Get the collection
        MongoCollection<Document> collection = db.getCollection("stu");
        //4. Build the query condition: filter by name
        BasicDBObject stu = new BasicDBObject("name","zhangfei");
        //5. Run the query and get the matching documents
        FindIterable<Document> documents = collection.find(stu);
        //6. Print the results
        for (Document document : documents) {
            System.out.println("name:"+document.getString("name"));
            System.out.println("sn:"+document.getString("sn"));
            System.out.println("job:"+document.getString("job"));
        }
        //7. Close the connection
        client.close();
    }
    
    public static void main(String[] args) {
        mongoConditionQuery();
    }
//Output
name:zhangfei
sn:003
job:前锋战将

Note: when the condition needs a comparison operator, nest a second BasicDBObject as the value, e.g. to find students whose sn is greater than "003":

//Find students with sn greater than "003" (sn is stored as a string in this collection)
BasicDBObject stu = new BasicDBObject("sn", new BasicDBObject("$gt", "003"));
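The driver also ships com.mongodb.client.model.Filters, whose static helpers read more clearly than nested BasicDBObjects; a quick sketch of the same kind of condition against the stu collection:

import static com.mongodb.client.model.Filters.*;

//Same idea with the Filters helpers: sn greater than "003" and a job field present
FindIterable<Document> docs = collection.find(and(gt("sn", "003"), exists("job")));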

2.3 Insert

     /**
     * Insert a document
     */
    public static void mongoInsert() {
        //1. Create the connection
        MongoClient client = new MongoClient("localhost");
        //2. Open the test database
        MongoDatabase db = client.getDatabase("test");
        //3. Get the collection
        MongoCollection<Document> collection = db.getCollection("stu");
        //4. Prepare the data to insert
        HashMap<String, Object> map = new HashMap<String, Object>();
        map.put("sn","005");
        map.put("name","xiaoA");
        map.put("job","A工作");
        map.put("sum",6);
        //5. Convert the map into a Document and insert it
        Document document = new Document(map);
        collection.insertOne(document);
        //6. Close the connection
        client.close();
    }
    //Test run
    public static void main(String[] args) {
        mongoInsert();
    }

Bulk insert, for reference:

//When inserting many documents, looping over single inserts works but is inefficient; MongoDB provides a bulk-insert method
List<Document> docs = new ArrayList<Document>();
docs.add(new Document("name","user29").append("age", 30).append("sex", 1));
docs.add(new Document("name","user30").append("age", 30).append("sex", 1));
collection.insertMany(docs);
//Bulk insert hands all the data to the database in one request, saving the per-request overhead of inserting documents one at a time in a loop.
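By default insertMany is ordered and stops at the first failure. If the remaining documents should still be attempted (for example past a duplicate-key error), the driver's InsertManyOptions can make the bulk unordered; a sketch:

import com.mongodb.client.model.InsertManyOptions;

//Unordered bulk insert: the server continues past individual document failures
collection.insertMany(docs, new InsertManyOptions().ordered(false));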

II. Reading MongoDB from Flink in the Hadoop file format

1. Add the dependencies to pom.xml

<!--hadoop compatibility-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.mongodb.mongo-hadoop</groupId>
    <artifactId>mongo-hadoop-core</artifactId>
    <version>2.0.0</version>
</dependency>

2. Read data from MongoDB in the Hadoop file format

import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;
import example.flink.KeySelector.RecordSeclectId;
import example.flink.mapFunction.BSONMapToRecord;
import example.flink.reduceFunction.KeyedGroupReduce;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapreduce.Job;
import org.bson.BSONObject;
 
public class MongoSet {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        Job inputJob = Job.getInstance();
        //inputJob.getConfiguration().set("mongo.input.uri", "mongodb://readuser:readpw@mongos01:port,mongos02:port,mongos03:port/db.collection");
        //inputJob.getConfiguration().set("mongo.auth.uri", "mongodb://root:rootpw@mongos01:port,mongos02:port,mongos03:port/admin");

        inputJob.getConfiguration().set("mongo.input.uri", "mongodb://readuser:readpw@mongos01:port,mongos02:port,mongos03:port/db.collection?&authMechanism=SCRAM-SHA-1&authSource=admin&readPreference=secondary");
        inputJob.getConfiguration().set("mongo.input.split.read_shard_chunks", "true");
        inputJob.getConfiguration().set("mongo.input.split.create_input_splits", "false");
        inputJob.getConfiguration().set("mongo.input.split_size","16");
        inputJob.getConfiguration().set("mongo.input.query", "{'createDateTime': {\"$lte\":{\"$date\":\"2019-05-27T00:00:00.000Z\"}, \"$gte\":{\"$date\":\"2010-03-17T00:00:00.000Z\"}}}");
        inputJob.getConfiguration().set("mongo.input.fields", "{\"Id\":\"1\",\"saleType\":\"1\",\"saleNum\":\"1\",\"createDateTime\":\"1\"}");

        HadoopInputFormat<Object, BSONObject> hdIf =
                new HadoopInputFormat<>(new MongoInputFormat(), Object.class, BSONObject.class, inputJob);

        DataSet<Tuple2<Object, BSONObject>> inputNew = env.createInput(hdIf);

        DataSet<Tuple2<String, BSONWritable>> personInfoDataSet = inputNew
                .map(new BSONMapToRecord())
                .groupBy(new RecordSeclectId())
                .reduceGroup(new KeyedGroupReduce());

        Job outputJob = Job.getInstance();
        outputJob.getConfiguration().set("mongo.output.uri", "mongodb://mongo:27017/db.collection");
        outputJob.getConfiguration().set("mongo.output.batch.size", "8");
        outputJob.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", "/tmp");
        personInfoDataSet.output(new HadoopOutputFormat<>(new MongoOutputFormat<>(), outputJob));

        env.execute(MongoSet.class.getCanonicalName());
    }
}
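The job above references three user-defined classes (BSONMapToRecord, RecordSeclectId, KeyedGroupReduce) from the author's example.flink packages that the article never shows. A hypothetical minimal sketch of what they might look like, assuming the job keys records by the Id field projected in mongo.input.fields and keeps one document per key; the real implementations may differ. Each class goes in its own file:

import com.mongodb.hadoop.io.BSONWritable;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.bson.BSONObject;

//Map each raw (key, BSONObject) pair to (recordId, BSONWritable)
public class BSONMapToRecord implements MapFunction<Tuple2<Object, BSONObject>, Tuple2<String, BSONWritable>> {
    @Override
    public Tuple2<String, BSONWritable> map(Tuple2<Object, BSONObject> value) {
        String id = String.valueOf(value.f1.get("Id")); //assumes the "Id" field from mongo.input.fields
        return Tuple2.of(id, new BSONWritable(value.f1));
    }
}

//Group records by the id extracted above
public class RecordSeclectId implements KeySelector<Tuple2<String, BSONWritable>, String> {
    @Override
    public String getKey(Tuple2<String, BSONWritable> value) {
        return value.f0;
    }
}

//Collapse each group to a single record (here simply the first one seen)
public class KeyedGroupReduce implements GroupReduceFunction<Tuple2<String, BSONWritable>, Tuple2<String, BSONWritable>> {
    @Override
    public void reduce(Iterable<Tuple2<String, BSONWritable>> values, Collector<Tuple2<String, BSONWritable>> out) {
        out.collect(values.iterator().next());
    }
}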

III. Flink CDC watching MongoDB oplog changes (real-time data only)

1. Introduction

The MongoDB CDC connector works by posing as a replica inside the MongoDB cluster: taking advantage of the cluster's high-availability mechanism, this disguised replica can obtain the complete oplog (operation log) event stream from the primary node.

Flink CDC on GitHub: https://github.com/ververica/flink-cdc-connectors
MongoDB CDC connector docs: https://github.com/ververica/flink-cdc-connectors/blob/master/docs/content/connectors/mongodb-cdc.md
MongoDB notes (in Chinese): https://blog.csdn.net/penngo/article/details/124232016

2. Prerequisites

  • MongoDB version
    MongoDB version >= 3.6

  • Cluster deployment
    A replica set or a sharded cluster.

  • Storage engine
    The WiredTiger storage engine.

  • Replica set protocol version
    Replica set protocol version 1 (pv1).
    Starting with version 4.0, MongoDB supports only pv1. pv1 is the default for all new replica sets created with MongoDB 3.2 or later.

  • Required privileges
    The MongoDB Kafka Connector (which the MongoDB CDC connector is built on) requires the changeStream and read privileges.
    You can use the example below for a simple grant; for more fine-grained authorization, see MongoDB database user roles.

use admin;
db.createUser({
  user: "flinkuser",
  pwd: "flinkpw",
  roles: [
    { role: "read", db: "admin" },           //read role includes changeStream privilege
    { role: "readAnyDatabase", db: "admin" } //for snapshot reading
  ]
});

3. Configure a MongoDB replica set

Create mongo1.conf, mongo2.conf, and mongo3.conf:

# mongo1.conf
dbpath=/data/mongodb-4.4.13/data1
logpath=/data/mongodb-4.4.13/mongo1.log
logappend=true
port=27017
replSet=replicaSet_penngo  # replica set name
oplogSize=200
# mongo2.conf
dbpath=/data/mongodb-4.4.13/data2
logpath=/data/mongodb-4.4.13/mongo2.log
logappend=true
port=27018
replSet=replicaSet_penngo  # replica set name
oplogSize=200
# mongo3.conf
dbpath=/data/mongodb-4.4.13/data3
logpath=/data/mongodb-4.4.13/mongo3.log
logappend=true
port=27019
replSet=replicaSet_penngo  # replica set name
oplogSize=200

Start the MongoDB servers.
Run each of the following commands in a separate terminal:

> mongod --config ../mongo1.conf
> mongod --config ../mongo2.conf
> mongod --config ../mongo3.conf

Connect to MongoDB and configure the replica set from the mongo shell:

> mongo --port 27017
# run the commands below in the mongo shell to initialize the replica set
> rsconf = {
 _id: "replicaSet_penngo",
 members: [
 {_id: 0, host: "localhost:27017"},
 {_id: 1, host: "localhost:27018"},
 {_id: 2, host: "localhost:27019"}
 ]
 }
> rs.initiate(rsconf)
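Once rs.initiate(rsconf) returns, you can confirm that one member was elected PRIMARY and the other two are SECONDARY (a quick sanity check, not part of the original steps):

> rs.status()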

In the mongo shell, create the penngo_db database and the coll collection, and insert 1000 documents:

> use penngo_db
> for (i=0; i<1000; i++) {db.coll.insert({user: "penngo" + i})}
> db.coll.count()


Create a new user in the mongo shell for Flink MongoDB CDC to use:

> use admin;
> db.createUser({
  user: "flinkuser",
  pwd: "flinkpw",
  roles: [
    { role: "read", db: "admin" }, //read role includes changeStream privilege 
    { role: "readAnyDatabase", db: "admin" } //for snapshot reading
  ]
});

4. Create the Maven project

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.penngo.flinkcdc</groupId>
  <artifactId>FlickCDC</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>FlickCDC_TEST</name>
  <url>https://21doc.net/</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <flink-version>1.13.3</flink-version>
    <flink-cdc-version>2.1.1</flink-cdc-version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink-version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-base</artifactId>
      <version>${flink-version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.12</artifactId>
      <version>${flink-version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.12</artifactId>
      <version>${flink-version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink-version}</version>
    </dependency>

    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>${flink-cdc-version}</version>
    </dependency>
    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mongodb-cdc</artifactId>
      <version>${flink-cdc-version}</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>${maven.compiler.source}</source>
          <target>${maven.compiler.target}</target>
          <encoding>${project.build.sourceEncoding}</encoding>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <repositories>
    <repository>
      <id>alimaven</id>
      <name>Maven Aliyun Mirror</name>
      <url>https://maven.aliyun.com/repository/central</url>
    </repository>
  </repositories>
</project>

MongoDBExample.java

package com.penngo.flinkcdc;

import com.ververica.cdc.connectors.mongodb.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

public class MongoDBExample {
    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2. Build the SourceFunction via Flink CDC
        SourceFunction<String> mongoDBSourceFunction = MongoDBSource.<String>builder()
                .hosts("127.0.0.1:27017")
                .username("flinkuser")
                .password("flinkpw")
                .database("penngo_db")
                .collection("coll")
//                .databaseList("penngo_db")
//                .collectionList("coll")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        DataStreamSource<String> dataStreamSource = env.addSource(mongoDBSourceFunction);

        SingleOutputStreamOperator<Object> singleOutputStreamOperator = dataStreamSource.process(new ProcessFunction<String, Object>() {
            @Override
            public void processElement(String value, ProcessFunction<String, Object>.Context ctx, Collector<Object> out) {
                try {
                    System.out.println("processElement=====" + value);
                }catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        dataStreamSource.print("raw stream--");
        env.execute("Mongo");
    }
}

Run results: the job prints each record captured from penngo_db.coll.

IV. Flink SQL CDC monitoring MongoDB
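The same collection can also be consumed through Flink SQL with the mongodb-cdc table connector. A minimal sketch, reusing the replica set and flinkuser from above; it additionally assumes the Flink Table dependencies (flink-table-api-java-bridge_2.12 and flink-table-planner-blink_2.12, version ${flink-version}) are added to the pom:

package com.penngo.flinkcdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MongoDBSqlExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //Register penngo_db.coll as a dynamic table backed by the mongodb-cdc connector;
        //_id must be declared as the (non-enforced) primary key
        tEnv.executeSql(
                "CREATE TABLE coll (" +
                "  _id STRING," +
                "  `user` STRING," +
                "  PRIMARY KEY (_id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'mongodb-cdc'," +
                "  'hosts' = '127.0.0.1:27017'," +
                "  'username' = 'flinkuser'," +
                "  'password' = 'flinkpw'," +
                "  'database' = 'penngo_db'," +
                "  'collection' = 'coll'" +
                ")");

        //Continuously print the changelog of the collection
        tEnv.executeSql("SELECT * FROM coll").print();
    }
}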
