flink主程序

public class FinkTest {

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);//摄入时间
        //env.enableCheckpointing(5000);
        //创建kafka-topic
        Properties p = LoadResourcesUtils.getProperties("kafka.properties");
        String inputTopic = p.getProperty("source.inputTopic");
        String outputTopic = p.getProperty("source.outputTopic");

        //kafka addSource
        DataStream<String> kafkaStream = env.addSource(KafkaStreamBuilder.kafkaConsumer(inputTopic));
        
        //kafka addSink
        kafkaStream.addSink(KafkaSink.KafkaProducer(driversTopicPattern));
        
        //mysql addSink
        kafkaStream.addSink(new OrderMySqlSink());
        
        //hbase addSink
       kafkaStream..addSink(new HbaseSink(configs.topicOut));
       
		//自定义 addSource
        DataStream<String> myStream = env.addSource(new MySource());

		//mysql addSource
        DataStream<String> driverStream = env.addSource(new MySqlSource());
        
        env.execute("my flink job");

    }
}

addSource(kafka)

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
 * Builds the Kafka consumer source used by the streaming job.
 *
 * @author liquan
 */
public class KafkaStreamBuilder {

	/**
	 * Creates a string consumer subscribed to the given topics.
	 *
	 * <p>Broker address, group id, offset-reset policy and auto-commit flag are read from
	 * {@code application.properties}; the partition assignment strategy is fixed to
	 * {@code RangeAssignor}.
	 *
	 * @param topics comma-separated topic names; whitespace around each name is ignored
	 *               and blank entries are skipped
	 * @return a consumer that reads each record as a string via {@link SimpleStringSchema}
	 * @throws IllegalArgumentException if {@code topics} contains no topic name
	 */
	public static FlinkKafkaConsumer<String> kafkaConsumer(String topics) {
		Properties p = LoadResourcesUtils.getProperties("application.properties");
		Properties properties = new Properties();
		properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
		properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
		properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, p.getProperty("spring.kafka.bootstrap-servers"));
		properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, p.getProperty("spring.kafka.consumer.group-id"));
		properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, p.getProperty("spring.kafka.consumer.auto-offset-reset"));
		properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, p.getProperty("spring.kafka.consumer.enable-auto-commit"));
		properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RangeAssignor");

		// Trim each entry so that "a, b" subscribes to "b" rather than " b",
		// and drop blank entries such as the one produced by "a,,b".
		List<String> topicList = new ArrayList<String>();
		for (String rawTopic : topics.split(",")) {
			String topic = rawTopic.trim();
			if (!topic.isEmpty()) {
				topicList.add(topic);
			}
		}
		if (topicList.isEmpty()) {
			throw new IllegalArgumentException("No Kafka topic given: '" + topics + "'");
		}

		return new FlinkKafkaConsumer<String>(topicList, new SimpleStringSchema(), properties);
	}
}

addSink(kafka)

import com.shengekeji.simulator.serialization.OutSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

/**
 * Factory for the Kafka producer sink.
 */
public class KafkaSink {

    /**
     * Builds a producer sink that writes strings to the given topic.
     *
     * <p>Only the broker list is taken from {@code application.properties}. The
     * consumer-only settings that used to be copied here (deserializers, group id,
     * offset reset, auto-commit, assignment strategy) do not apply to a producer, and
     * reading them made this method fail whenever one of those keys was absent.
     *
     * @param topics name of the target topic
     * @return a producer sink using {@code OutSerializationSchema} for serialization
     * @throws IllegalStateException if {@code spring.kafka.bootstrap-servers} is not configured
     */
    public static FlinkKafkaProducer<String> KafkaProducer(String topics) {
        Properties p = LoadResourcesUtils.getProperties("application.properties");
        String bootstrapServers = p.getProperty("spring.kafka.bootstrap-servers");
        if (bootstrapServers == null) {
            throw new IllegalStateException(
                    "Missing 'spring.kafka.bootstrap-servers' in application.properties");
        }

        Properties properties = new Properties();
        // "bootstrap.servers" is the same key for consumers and producers; the
        // already-imported ConsumerConfig constant is reused to avoid a new import.
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        return new FlinkKafkaProducer<>(topics, new OutSerializationSchema(), properties);
    }
}

addSink(mysql)

import com.alibaba.fastjson.JSONObject;
import com.shengekeji.simulator.dao.OrderDao;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.ibatis.session.SqlSession;
import com.shengekeji.simulator.model.OrderModel;

/**
 * Sink that parses each incoming JSON string into an {@code OrderModel} and inserts it
 * through the MyBatis {@code OrderDao} mapper.
 */
public class OrderMySqlSink extends RichSinkFunction<String> {

    /**
     * Inserts one record. Failures are printed and the record is skipped; they are not
     * rethrown, so a bad record does not fail the job.
     *
     * @param value   JSON text of one order
     * @param context sink context supplied by Flink (not used)
     */
    @Override
    public void invoke(String value, Context context) throws Exception {
        SqlSession sqlSession = null;
        try {
            OrderModel order= JSONObject.parseObject(value, OrderModel.class);
            sqlSession = MyBatisUtil.openSqlSession();
            // Obtain a proxy for the mapper interface from the session;
            // the argument is the mapper interface type.
            OrderDao dao = sqlSession.getMapper(OrderDao.class);
            System.err.println(order);
            dao.insert(order);
            sqlSession.commit();

        }catch (Exception e){
            e.printStackTrace();
            System.err.println(e.getMessage());
            // The session is still null when parsing or opening the session failed;
            // rolling back unconditionally would throw a NullPointerException here.
            if (sqlSession != null){
                sqlSession.rollback();
            }

        }finally {

            if (sqlSession != null){
                sqlSession.close();
            }
        }
    }
}

注:数据入库使用 MyBatis 方式;MyBatisUtil、OrderDao、OrderModel 需根据自己的环境自行定义。

addSource(自定义)

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Properties;

/**
 * Custom source that emits one generated record every six seconds until cancelled.
 */
public class MySource implements SourceFunction<String> {

    private static final long serialVersionUID = 1L;

    /** Set to false by {@link #cancel()} to end the emit loop. */
    private volatile boolean isRunning = true;


    /**
     * Emits a freshly generated record every six seconds while the source is running.
     *
     * @param sourceContext context used to emit records
     * @throws Exception if the sleep is interrupted or record generation fails
     */
    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        while(this.isRunning) {
            Thread.sleep(6000);
            String order = getDriverData();
            sourceContext.collect(order);
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }

    /**
     * Generates one record by filling the placeholders of the {@code source.driverJson}
     * template (from {@code content.properties}) with values from {@code RandomUtil}.
     *
     * <p>Substitution is literal: generated values are inserted as-is, so characters
     * such as {@code $} or {@code \} in a value cannot break or alter the result.
     * Longer placeholders are handled before the short ones that are their prefix
     * ({@code %driverId} before {@code %d}, {@code %state} before {@code %s},
     * {@code %flag} before {@code %f}); keep that order when adding placeholders.
     *
     * @return the template with every known placeholder replaced; also printed to stdout
     * @throws IllegalStateException if the template property is missing
     */
    public String getDriverData() {
        Properties p = LoadResourcesUtils.getProperties("content.properties");

        String value = p.getProperty("source.driverJson");
        if (value == null) {
            throw new IllegalStateException("Missing 'source.driverJson' in content.properties");
        }

        // Each generator is only invoked when its placeholder is present.
        if (value.contains("%orderId")) {
            value = value.replace("%orderId", RandomUtil.getOrderId());
        }
        if (value.contains("%appId")) {
            value = value.replace("%appId", RandomUtil.getAppId());
        }
        if (value.contains("%serviceId")) {
            value = value.replace("%serviceId", RandomUtil.getServiceId());
        }
        if (value.contains("%passageId")) {
            value = value.replace("%passageId", RandomUtil.getPassageId());
        }
        if (value.contains("%driverId")) {
            value = value.replace("%driverId", RandomUtil.getDriverId());
        }
        // Coordinate pairs: latitude and longitude come from the same random point,
        // and the longitude is only filled when the latitude placeholder is present.
        if (value.contains("%startLoclatitude")) {
            LngLat startLoc = RandomUtil.getCoordinate();
            value = value.replace("%startLoclatitude", Double.toString(startLoc.latitude));
            value = value.replace("%startLoclongitude", Double.toString(startLoc.longitude));
        }
        if (value.contains("%endLoclatitude")) {
            LngLat endLoc = RandomUtil.getCoordinate();
            value = value.replace("%endLoclatitude", Double.toString(endLoc.latitude));
            value = value.replace("%endLoclongitude", Double.toString(endLoc.longitude));
        }
        if (value.contains("%loclatitude")) {
            LngLat loc = RandomUtil.getCoordinate();
            value = value.replace("%loclatitude", Double.toString(loc.latitude));
            value = value.replace("%loclongitude", Double.toString(loc.longitude));
        }
        if (value.contains("%flag")) {
            value = value.replace("%flag", Integer.toString(RandomUtil.getFlag()));
        }
        if (value.contains("%pushFlag")) {
            value = value.replace("%pushFlag", Integer.toString(RandomUtil.getPushFlag()));
        }
        if (value.contains("%state")) {
            value = value.replace("%state", Integer.toString(RandomUtil.getState()));
        }
        if (value.contains("%d")) {
            value = value.replace("%d", RandomUtil.getNum().toString());
        }
        if (value.contains("%s")) {
            value = value.replace("%s", RandomUtil.getStr());
        }
        if (value.contains("%f")) {
            value = value.replace("%f", RandomUtil.getDoubleStr());
        }
        if (value.contains("%ts")) {
            value = value.replace("%ts", RandomUtil.getTimeStr());
        }
        if (value.contains("%tl")) {
            value = value.replace("%tl", RandomUtil.getTimeLongStr());
        }

        System.out.println(value);

        return value;
    }
}

addSource(mysql)

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.shengekeji.simulator.dao.BestDispatchDao;
import com.shengekeji.simulator.dao.DriverDao;
import com.shengekeji.simulator.model.DispatchModel;
import com.shengekeji.simulator.model.DriverModel;
import com.shengekeji.simulator.model.GeographyOrder;
import com.shengekeji.simulator.model.PassagesModel;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.ibatis.session.SqlSession;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;


/**
 * Source that polls MySQL every five seconds for the active drivers of app "SGKJ" and
 * emits each driver as a JSON string.
 */
public class MySqlDriverSource implements SourceFunction<String> {

    private static final long serialVersionUID = 1L;

    /** Set to false by {@link #cancel()} to end the polling loop. */
    private volatile boolean isRunning = true;

    /**
     * Polling loop: sleeps five seconds, queries the active drivers through the MyBatis
     * {@code DriverDao} mapper, and emits one JSON string per driver. A failed poll is
     * printed and skipped; the loop continues with the next cycle.
     *
     * @param sourceContext context used to emit records
     * @throws Exception if the sleep is interrupted
     */
    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        while(this.isRunning) {
            Thread.sleep(5000);
            System.out.println("--------------------------");
            SqlSession sqlSession = null;
            Map<String,Object> map = new HashMap<String, Object>();
            map.put("appId","SGKJ");
            try {
                sqlSession = MyBatisUtil.openSqlSession();
                // Obtain a proxy for the mapper interface from the session;
                // the argument is the mapper interface type.
                // Driver records
                DriverDao driverdao = sqlSession.getMapper(DriverDao.class);
                List<DriverModel> drivers = driverdao.selectAllActiveDriver(map);
                // Stamp and emit each driver
                for (DriverModel driver:drivers){
                    // NOTE(review): locLongitude / locLatitude are not declared anywhere in
                    // the visible code - they must be supplied before this compiles.
                    driver.setLoc(new LngLat(locLongitude,locLatitude));
                    driver.setSendTime(RandomUtil.getTimeStr());
                    String dr = JSONObject.toJSONString(driver, SerializerFeature.DisableCircularReferenceDetect);
                    System.out.println(dr);
                    sourceContext.collect(dr);
                }

            }catch (Exception e){
                e.printStackTrace();
                System.err.println(e.getMessage());
                // The session is still null when opening it failed; rolling back
                // unconditionally would throw a NullPointerException and end the source.
                if (sqlSession != null){
                    sqlSession.rollback();
                }
            }finally {

                if (sqlSession != null){
                    sqlSession.close();
                }

            }
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }

}

注:数据读取使用 MyBatis 方式;MyBatisUtil、DriverDao、DriverModel 需根据自己的环境自行定义。

addSink(hbase)

import com.shengekeji.owl.constant.Constants;
import com.shengekeji.owl.pojo.Message;
import com.shengekeji.owl.util.HBaseUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;

import java.util.ArrayList;
import java.util.List;

/**
 * Sink that buffers incoming {@code Message} records as HBase puts and writes them in
 * batches to one table.
 *
 * <p>A batch is written when the buffer reaches {@code maxSize} entries or when at least
 * {@code delayTime} milliseconds have passed since the last write. The time condition is
 * only evaluated when a record arrives, so an idle stream keeps its buffered records
 * until the next record or until the sink is closed.
 */
public class HbaseSink extends RichSinkFunction<Message> {
    private Integer maxSize = 1000;
    private Long delayTime = 5000L;
    private String tableName;

    /**
     * @param tableName HBase table to write to; default batch size 1000 and delay 5000 ms
     */
    public HbaseSink(String tableName) {
        this.tableName = tableName;
    }

    /**
     * Sets the batch limits only. This constructor leaves the table name unset, so
     * {@link #open(Configuration)} will reject the sink; prefer the three-argument form.
     *
     * @param maxSize   buffer size that triggers a write
     * @param delayTime milliseconds since the last write that trigger a write
     */
    public HbaseSink(Integer maxSize, Long delayTime) {
        this.maxSize = maxSize;
        this.delayTime = delayTime;
    }

    /**
     * @param tableName HBase table to write to
     * @param maxSize   buffer size that triggers a write
     * @param delayTime milliseconds since the last write that trigger a write
     */
    public HbaseSink(String tableName, Integer maxSize, Long delayTime) {
        this.tableName = tableName;
        this.maxSize = maxSize;
        this.delayTime = delayTime;
    }

    // Runtime-only state: created in open(), never part of the serialized function.
    private transient Connection connection;
    private transient long lastInvokeTime;
    private transient List<Put> puts;

    /**
     * Opens the HBase connection and initialises the buffer and the batch timer.
     *
     * @throws IllegalStateException if no table name was configured
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        if (tableName == null) {
            throw new IllegalStateException("HbaseSink requires a table name");
        }
        connection = HBaseUtil.getConnection(Constants.ZOOKEEPER_QUORUM,Constants.ZOOKEEPER_PORT);
        puts = new ArrayList<Put>();
        // Start the batch timer at the current system time
        lastInvokeTime = System.currentTimeMillis();
    }

    /**
     * Converts the record into a put (row key {@code id-ts}, columns id/vals/p/ts in
     * family cf1), buffers it, and writes the buffer when a batch condition is met.
     */
    @Override
    public void invoke(Message value, Context context) throws Exception {

        System.out.println(value);
        String rk = value.id+"-"+value.ts;
        // Create the put keyed by the row key
        // NOTE(review): getBytes() uses the platform default charset.
        Put put = new Put(rk.getBytes());

        // addColumn(column family, qualifier, value)
        put.addColumn("cf1".getBytes(), "id".getBytes(), value.id.getBytes());
        put.addColumn("cf1".getBytes(), "vals".getBytes(), value.vals.getBytes());
        put.addColumn("cf1".getBytes(), "p".getBytes(), (value.p+"").getBytes());
        put.addColumn("cf1".getBytes(), "ts".getBytes(), (value.ts+"").getBytes());
        System.out.println("----------");
        System.out.println(put);
        puts.add(put);

        // Batch timing is based on processing time
        long currentTime = System.currentTimeMillis();

        System.out.println(currentTime - lastInvokeTime);
        // >= rather than == so an oversized buffer can never skip the size trigger
        if (puts.size() >= maxSize || currentTime - lastInvokeTime >= delayTime) {
            flush();
            lastInvokeTime = currentTime;
        }
    }

    /**
     * Writes any still-buffered puts and then closes the connection, so the last
     * partial batch is not lost on shutdown.
     */
    @Override
    public void close() throws Exception {
        if (connection == null) {
            // open() never completed; nothing to flush or close
            return;
        }
        try {
            flush();
        } finally {
            connection.close();
        }
    }

    /** Writes the buffered puts as one batch and empties the buffer; no-op when empty. */
    private void flush() throws Exception {
        if (puts == null || puts.isEmpty()) {
            return;
        }
        // try-with-resources closes the table even when the batch write fails
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.put(puts);
        }
        puts.clear();
    }

}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐