Flink addSource and addSink: Java implementations for Kafka, custom data, MySQL, and HBase
Flink main program
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
public class FlinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); // ingestion time
        //env.enableCheckpointing(5000);
        // load the Kafka topic names from configuration
        Properties p = LoadResourcesUtils.getProperties("kafka.properties");
        String inputTopic = p.getProperty("source.inputTopic");
        String outputTopic = p.getProperty("source.outputTopic");
        // kafka addSource
        DataStream<String> kafkaStream = env.addSource(KafkaStreamBuilder.kafkaConsumer(inputTopic));
        // kafka addSink
        kafkaStream.addSink(KafkaSink.KafkaProducer(outputTopic));
        // mysql addSink
        kafkaStream.addSink(new OrderMySqlSink());
        // hbase addSink: HbaseSink (defined below) consumes Message records rather than Strings,
        // so map the stream to Message first, e.g. kafkaStream.map(...).addSink(new HbaseSink(outputTopic));
        // custom addSource
        DataStream<String> myStream = env.addSource(new MySource());
        // mysql addSource
        DataStream<String> driverStream = env.addSource(new MySqlDriverSource());
        env.execute("my flink job");
    }
}
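Note: LoadResourcesUtils, used throughout these examples to read *.properties files, is not shown in the original post. A minimal sketch, assuming the files are on the classpath, could look like this (hypothetical helper; only the getProperties(String) method used above is sketched):
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
public class LoadResourcesUtils {
    public static Properties getProperties(String fileName) {
        Properties properties = new Properties();
        // load the named properties file from the classpath, if present
        try (InputStream in = LoadResourcesUtils.class.getClassLoader().getResourceAsStream(fileName)) {
            if (in != null) {
                properties.load(in);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return properties;
    }
}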
addSource(kafka)
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
/**
 * @author liquan
 *
 * Builds the Kafka consumer stream.
 */
public class KafkaStreamBuilder {
    public static FlinkKafkaConsumer<String> kafkaConsumer(String topics) {
        Properties p = LoadResourcesUtils.getProperties("application.properties");
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, p.getProperty("spring.kafka.bootstrap-servers"));
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, p.getProperty("spring.kafka.consumer.group-id"));
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, p.getProperty("spring.kafka.consumer.auto-offset-reset"));
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, p.getProperty("spring.kafka.consumer.enable-auto-commit"));
        properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RangeAssignor");
        // String topics = consumerConfig.getTopics();
        // topics is a comma-separated list; every topic must already exist in Kafka
        List<String> topicsSet = new ArrayList<String>(Arrays.asList(topics.split(",")));
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>(topicsSet, new SimpleStringSchema(), properties);
        // myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
        return myConsumer;
    }
}
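Note: the consumer reads its settings from application.properties with Spring-style key names. The keys below are exactly the ones referenced in the code; the values are placeholders for illustration and must match your own cluster:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=flink-demo-group
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=false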
addSink(kafka)
import com.shengekeji.simulator.serialization.OutSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
public class KafkaSink {
    public static FlinkKafkaProducer<String> KafkaProducer(String topics) {
        Properties p = LoadResourcesUtils.getProperties("application.properties");
        Properties properties = new Properties();
        // the producer only needs the broker addresses; record serialization is handled by
        // OutSerializationSchema, so consumer-side settings (group id, offset reset, deserializers)
        // are not needed here
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, p.getProperty("spring.kafka.bootstrap-servers"));
        return new FlinkKafkaProducer<>(topics, new OutSerializationSchema(), properties);
    }
}
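Note: OutSerializationSchema comes from the project's own package and is not shown in the post. Assuming it simply writes each record as UTF-8 bytes, a minimal sketch would be (the real class may add keys, headers, or a different encoding):
import org.apache.flink.api.common.serialization.SerializationSchema;
import java.nio.charset.StandardCharsets;
public class OutSerializationSchema implements SerializationSchema<String> {
    @Override
    public byte[] serialize(String element) {
        // encode the record value as UTF-8 bytes for the Kafka producer
        return element.getBytes(StandardCharsets.UTF_8);
    }
}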
addSink(mysql)
import com.alibaba.fastjson.JSONObject;
import com.shengekeji.simulator.dao.OrderDao;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.ibatis.session.SqlSession;
import com.shengekeji.simulator.model.OrderModel;
public class OrderMySqlSink extends RichSinkFunction<String> {
    @Override
    public void invoke(String value, Context context) throws Exception {
        SqlSession sqlSession = null;
        try {
            OrderModel order = JSONObject.parseObject(value, OrderModel.class);
            sqlSession = MyBatisUtil.openSqlSession();
            // obtain a proxy of the mapper interface from the SqlSession;
            // the argument is the mapper interface class
            OrderDao dao = sqlSession.getMapper(OrderDao.class);
            System.err.println(order);
            dao.insert(order);
            sqlSession.commit();
        } catch (Exception e) {
            e.printStackTrace();
            System.err.println(e.getMessage());
            if (sqlSession != null) {
                sqlSession.rollback();
            }
        } finally {
            if (sqlSession != null) {
                sqlSession.close();
            }
        }
    }
}
Note: MyBatis is used to write to the database; define MyBatisUtil, OrderDao, and OrderModel according to your own environment.
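For reference, a minimal sketch of what the OrderDao mapper could look like; the table name, columns, and OrderModel fields below are hypothetical and must be adapted to your own schema:
import com.shengekeji.simulator.model.OrderModel;
import org.apache.ibatis.annotations.Insert;
public interface OrderDao {
    // hypothetical insert statement; column and property names depend on your OrderModel
    @Insert("INSERT INTO t_order (order_id, app_id, driver_id, state) " +
            "VALUES (#{orderId}, #{appId}, #{driverId}, #{state})")
    int insert(OrderModel order);
}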
addSource (custom)
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Properties;
public class MySource implements SourceFunction<String> {
    private static final long serialVersionUID = 1L;
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        while (this.isRunning) {
            Thread.sleep(6000);
            String order = getDriverData();
            sourceContext.collect(order);
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }

    // generate random order data by filling the placeholders in the JSON template
    public String getDriverData() {
        Properties p = LoadResourcesUtils.getProperties("content.properties");
        String driverJson = p.getProperty("source.driverJson");
        String value = driverJson;
        if (value.indexOf("%orderId") >= 0) {
            value = value.replaceAll("%orderId", RandomUtil.getOrderId());
        }
        if (value.indexOf("%appId") >= 0) {
            value = value.replaceAll("%appId", RandomUtil.getAppId());
        }
        if (value.indexOf("%serviceId") >= 0) {
            value = value.replaceAll("%serviceId", RandomUtil.getServiceId());
        }
        if (value.indexOf("%passageId") >= 0) {
            value = value.replaceAll("%passageId", RandomUtil.getPassageId());
        }
        if (value.indexOf("%driverId") >= 0) {
            value = value.replaceAll("%driverId", RandomUtil.getDriverId());
        }
        if (value.indexOf("%startLoclatitude") >= 0) {
            LngLat startLoc = RandomUtil.getCoordinate();
            value = value.replaceAll("%startLoclatitude", Double.toString(startLoc.latitude));
            value = value.replaceAll("%startLoclongitude", Double.toString(startLoc.longitude));
        }
        if (value.indexOf("%endLoclatitude") >= 0) {
            LngLat endLoc = RandomUtil.getCoordinate();
            value = value.replaceAll("%endLoclatitude", Double.toString(endLoc.latitude));
            value = value.replaceAll("%endLoclongitude", Double.toString(endLoc.longitude));
        }
        if (value.indexOf("%loclatitude") >= 0) {
            LngLat loc = RandomUtil.getCoordinate();
            value = value.replaceAll("%loclatitude", Double.toString(loc.latitude));
            value = value.replaceAll("%loclongitude", Double.toString(loc.longitude));
        }
        if (value.indexOf("%flag") >= 0) {
            value = value.replaceAll("%flag", Integer.toString(RandomUtil.getFlag()));
        }
        if (value.indexOf("%pushFlag") >= 0) {
            value = value.replaceAll("%pushFlag", Integer.toString(RandomUtil.getPushFlag()));
        }
        if (value.indexOf("%state") >= 0) {
            value = value.replaceAll("%state", Integer.toString(RandomUtil.getState()));
        }
        if (value.indexOf("%d") >= 0) {
            value = value.replaceAll("%d", RandomUtil.getNum().toString());
        }
        if (value.indexOf("%s") >= 0) {
            value = value.replaceAll("%s", RandomUtil.getStr());
        }
        if (value.indexOf("%f") >= 0) {
            value = value.replaceAll("%f", RandomUtil.getDoubleStr());
        }
        if (value.indexOf("%ts") >= 0) {
            value = value.replaceAll("%ts", RandomUtil.getTimeStr());
        }
        if (value.indexOf("%tl") >= 0) {
            value = value.replaceAll("%tl", RandomUtil.getTimeLongStr());
        }
        System.out.println(value);
        return value;
    }
}
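Note: the JSON template read from content.properties is not shown in the original post. The entry below is a hypothetical example built from the placeholders used above; the real field names and structure depend on your data model:
# hypothetical content.properties entry
source.driverJson={"orderId":"%orderId","appId":"%appId","serviceId":"%serviceId","passageId":"%passageId","driverId":"%driverId","startLoc":{"longitude":%startLoclongitude,"latitude":%startLoclatitude},"endLoc":{"longitude":%endLoclongitude,"latitude":%endLoclatitude},"state":%state,"sendTime":"%ts"}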
addSource(mysql)
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.shengekeji.simulator.dao.BestDispatchDao;
import com.shengekeji.simulator.dao.DriverDao;
import com.shengekeji.simulator.model.DispatchModel;
import com.shengekeji.simulator.model.DriverModel;
import com.shengekeji.simulator.model.GeographyOrder;
import com.shengekeji.simulator.model.PassagesModel;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.ibatis.session.SqlSession;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class MySqlDriverSource implements SourceFunction<String> {
    private static final long serialVersionUID = 1L;
    private volatile boolean isRunning = true;

    /**
     * The key part: read the driver records from the MySQL table
     * and emit each one as a JSON string.
     * @param sourceContext
     * @throws Exception
     */
    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        while (this.isRunning) {
            Thread.sleep(5000);
            System.out.println("--------------------------");
            SqlSession sqlSession = null;
            Map<String, Object> map = new HashMap<String, Object>();
            map.put("appId", "SGKJ");
            try {
                sqlSession = MyBatisUtil.openSqlSession();
                // obtain a proxy of the mapper interface from the SqlSession;
                // the argument is the mapper interface class
                // driver records
                DriverDao driverdao = sqlSession.getMapper(DriverDao.class);
                List<DriverModel> drivers = driverdao.selectAllActiveDriver(map);
                // emit each driver
                for (DriverModel driver : drivers) {
                    driver.setLoc(RandomUtil.getCoordinate()); // attach a randomly generated location
                    driver.setSendTime(RandomUtil.getTimeStr());
                    String dr = JSONObject.toJSONString(driver, SerializerFeature.DisableCircularReferenceDetect);
                    System.out.println(dr);
                    sourceContext.collect(dr);
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.err.println(e.getMessage());
                if (sqlSession != null) {
                    sqlSession.rollback();
                }
            } finally {
                if (sqlSession != null) {
                    sqlSession.close();
                }
            }
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }
}
Note: MyBatis is used to read the data; define MyBatisUtil, DriverDao, and DriverModel according to your own environment.
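As with the order sink, a minimal hypothetical sketch of DriverDao; the table and column names are assumptions and must match your own schema:
import com.shengekeji.simulator.model.DriverModel;
import org.apache.ibatis.annotations.Select;
import java.util.List;
import java.util.Map;
public interface DriverDao {
    // hypothetical query returning the active drivers for the given appId
    @Select("SELECT * FROM t_driver WHERE app_id = #{appId} AND active = 1")
    List<DriverModel> selectAllActiveDriver(Map<String, Object> params);
}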
addSink(hbase)
import com.shengekeji.owl.constant.Constants;
import com.shengekeji.owl.pojo.Message;
import com.shengekeji.owl.util.HBaseUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import java.util.ArrayList;
import java.util.List;
public class HbaseSink extends RichSinkFunction<Message> {
    private Integer maxSize = 1000;
    private Long delayTime = 5000L;
    private String tableName;

    public HbaseSink(String tableName) {
        this.tableName = tableName;
    }

    public HbaseSink(Integer maxSize, Long delayTime) {
        this.maxSize = maxSize;
        this.delayTime = delayTime;
    }

    private Connection connection;
    private Long lastInvokeTime;
    private List<Put> puts = new ArrayList<Put>();

    // open the HBase connection
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = HBaseUtil.getConnection(Constants.ZOOKEEPER_QUORUM, Constants.ZOOKEEPER_PORT);
        // remember the current system time
        lastInvokeTime = System.currentTimeMillis();
    }

    @Override
    public void invoke(Message value, Context context) throws Exception {
        System.out.println(value);
        String rk = value.id + "-" + value.ts;
        // create the Put object with the row key
        Put put = new Put(rk.getBytes());
        // addColumn arguments: "cf1" -> column family, second -> column qualifier (e.g. id), third -> cell value
        put.addColumn("cf1".getBytes(), "id".getBytes(), value.id.getBytes());
        put.addColumn("cf1".getBytes(), "vals".getBytes(), value.vals.getBytes());
        put.addColumn("cf1".getBytes(), "p".getBytes(), (value.p + "").getBytes());
        put.addColumn("cf1".getBytes(), "ts".getBytes(), (value.ts + "").getBytes());
        System.out.println("----------");
        System.out.println(put);
        puts.add(put); // buffer the Put in the list
        // use processing time to decide when to flush
        long currentTime = System.currentTimeMillis();
        System.out.println(currentTime - lastInvokeTime);
        // flush the batch when it is full or the delay has elapsed
        if (puts.size() >= maxSize || currentTime - lastInvokeTime >= delayTime) {
            // get the HBase table
            Table table = connection.getTable(TableName.valueOf(tableName));
            table.put(puts); // write the whole batch
            puts.clear();
            lastInvokeTime = currentTime;
            table.close();
        }
    }

    @Override
    public void close() throws Exception {
        // flush any buffered puts before releasing the connection
        if (!puts.isEmpty()) {
            Table table = connection.getTable(TableName.valueOf(tableName));
            table.put(puts);
            puts.clear();
            table.close();
        }
        connection.close();
    }
}
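Note: HBaseUtil and the ZooKeeper constants come from the project and are not listed in the post. Assuming Constants.ZOOKEEPER_QUORUM and Constants.ZOOKEEPER_PORT are plain strings, getConnection could be sketched with the standard HBase client API (hypothetical helper; the real implementation may cache the connection or read more settings):
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;
public class HBaseUtil {
    public static Connection getConnection(String zookeeperQuorum, String zookeeperPort) throws IOException {
        // point the HBase client at the ZooKeeper quorum and create a connection
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
        conf.set("hbase.zookeeper.property.clientPort", zookeeperPort);
        return ConnectionFactory.createConnection(conf);
    }
}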