以下代码通过实现异步IO来访问Redis,但要注意:并不是只要实现了asyncInvoke方法就是异步了,这个方法并不是异步的,而是要依靠这个方法里面所写的查询是异步的才可以。

在实现flink异步IO的时候一定要注意。官方文档也给出了相关的说明

以下这个例子中使用的是Jedis,但Jedis的实现不是异步的。所以,以下的代码只是套用了flink异步IO的框子,并没有实现真正的异步。

package DoAsyncIO;

/**
 * 通过异步io来读写redis。在进行异步读写redis时,使用的是线程池模式。
 */
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


public class DoAsyncIOTest1 {
		// 使用异步IO读写Redis的主流程
    public static void main(String args[]) throws Exception{
    		// 创建流执行环境
        final StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置checkpoingt时间
        senv.enableCheckpointing(5000);

				// 流数据输入
        DataStream<Tuple3<String,String,Long>> input = senv.fromElements(NAMES);

				// 通过异步IO访问外部系统。注意:这里使用的是无序处理模式
        DataStream<String> resultStream = AsyncDataStream
                .unorderedWait(input, new AsyncRedisFunc(),1000, TimeUnit.MICROSECONDS, 100);
        // 打印异步IO处理的中间结果。
        // 注意:这里可以使用该结果继续后面的处理,我这里只是简单的打印
        resultStream.print("result: ");
        senv.execute("AsyncIORedis Test");
    }

    public static final Tuple3[] NAMES = new Tuple3[]{
            Tuple3.of("key1","张三",100L),
            Tuple3.of("key2","李四",78L),
            Tuple3.of("key3","王五",99L),
            Tuple3.of("key4","赵六1",81L),
            Tuple3.of("key5","小七",59L),
            Tuple3.of("key6","小八",97L),
            Tuple3.of("key7","赵六2",81L),
            Tuple3.of("key8","七七",59L),
            Tuple3.of("key9","赵六3",81L),
            Tuple3.of("key10","七七",59L),
            Tuple3.of("key11","七七",59L),
            Tuple3.of("key12","七七",59L),
    };
}


// 异步IO处理类
// 主要实现三个函数:open,close,asyncInvoke等
class AsyncRedisFunc extends RichAsyncFunction<Tuple3<String,String,Long>, String> {
    private transient JedisPool pool;
    transient ExecutorService executorService;

    final int numElements = 8;
    final long timeout = 1000L;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        executorService = Executors.newFixedThreadPool(numElements);
        pool= new JedisPool(new JedisPoolConfig(), "localhost", 6379);
    }

    @Override
    public void close() throws Exception {
        super.close();
        executorService.shutdownNow();
    }

    @Override
    public void asyncInvoke(Tuple3<String, String, Long> sdata, ResultFuture<String> resultFuture) throws Exception {
        executorService.submit(() -> {
            String key = sdata.f0;
            Long val = sdata.f2;

            if (val < 60L) { // 过滤:若数据中的元素的值<60,不处理
                return ;
            } else {
                Jedis jedis = pool.getResource();
                if (jedis.get(key) != null) {  // 获取redis中key存在的元素的值并返回
                    String oval = jedis.get(key);
                    //System.out.println("oval = " + oval);
                    resultFuture.complete(Collections.singleton(oval));
                } else {
                    jedis.set(key, String.valueOf(val));
                }
            }
        });
    }

}

注意:以上只是为了示意如何使用redis和异步IO,前面说过,该代码不一定是异步的,所以要实现真正的异步,还需要对以上代码进行优化,测试。

由于Jedis不是异步的,要想实现真正的异步,可以使用Redisson。

小结

​ 本文通过一个例子来说明如何在Flink的异步IO框架中访问redis。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐