1、rxjava异步Consumer

导入fxjava包:

implementation 'io.reactivex.rxjava3:rxjava:3.0.6'
// rxBinding已经依赖了rxJava,可以无需再重复依赖rxJava
implementation 'com.jakewharton.rxbinding4:rxbinding:4.0.0'

使用:

getAsync(res -> {
            System.out.println("返回执行:" + res);
        });

异步方法:

public void getAsync(Consumer<String> consumer) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                 try {
                    Thread.sleep(5000); // 延时5s
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
                    consumer.accept("返回成功"); // 关键代码
                }
            }
        }).start();

    }

异步延时5s返回,打印 “返回执行返回成功”

2、合并两个异步都完成才往下运行

public void loadData(){
        Observable.zip(observableOne(), observableTwo(), new BiFunction<String, String, String>() {
            @Override
            public String apply(String a, String b) {
                return a + b;//合并数据
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String o) {
                Log.d("TAG==>", o);//两个异步都运行结束才执行这里
            }
        });
    }



    /**
     * 异步一
     * @return
     */
    private Observable observableOne(){
        Observable observable1 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                SystemClock.sleep(5000);
                emitter.onNext("我是第一");
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io());
        return observable1;
    }

    /**
     * 异步二
     * @return
     */
    private Observable observableTwo(){
        Observable observable2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("我是第二");
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io());
        return  observable2;
    }

3、实现数据缓存优先加载

分别从缓存,硬盘,网络后台获取数据,只要其中一个有数据,后续就停止进行

String memoryCache;
String diskCache="我是硬盘数据";


 
public void loadData() {
        Observable.concat(memoryData(), diskData(), networkData())
                .firstElement()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String o) {
                        Log.d("获取到的数据为==>", o);
                    }
                });
    }


    /**
     * 异步一:从缓存获取数据
     *
     * @return
     */
    private Observable memoryData() {
        Observable<String> memory = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) {
                // 判断是否有数据
                if (!TextUtils.isEmpty(memoryCache)) {
                    emitter.onNext(memoryCache);
                } else {
                    emitter.onComplete();
                }
            }
        });
        return memory;
    }

    /**
     * 异步二:从硬盘获取数据
     *
     * @return
     */
    private Observable diskData() {
        Observable<String> disk = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                // 判断是否有数据
                if (!TextUtils.isEmpty(diskCache)) {
                    emitter.onNext(diskCache);
                } else {
                    emitter.onComplete();
                }
            }
        });
        return disk;
    }

    /**
     * 异步三:模仿网络请求
     *
     * @return
     */
    private Observable networkData() {
        Observable<String> network = Observable.defer(new Supplier<ObservableSource<? extends String>>() {
            @Override
            public ObservableSource<? extends String> get() {
               SystemClock.sleep(5000);//延时5秒,模仿网络请求
                return Observable.just("我是网络数据");
            }
        });
        return network;
    }

4、网络请求失败重试

方式一(协程):

    fun main() = runBlocking {
        val maxRetries = 3
        val retryDelayMillis = 1000L
        try {
            val result = retryWhen<String>(maxRetries, retryDelayMillis) {
                performNetworkRequest()
            }
            println("成功Result: $result")
        } catch (e: Throwable) {
            Log.e("TAG", "重试3次都失败了:" + e.message)
            return@runBlocking
        }
    }

    suspend fun performNetworkRequest(): String {
        // 模拟网络请求
        delay(1000)
        // 假设请求失败
        throw IOException("Network request failed")
    }

    suspend inline fun <T> retryWhen(
        maxRetries: Int = 3,
        delayMillis: Long,
        crossinline block: suspend () -> T
    ): T {
        var retries = 0
        var lastException: Throwable? = null

        while (retries < maxRetries) {
            try {
                return block()
            } catch (e: Throwable) {
                Log.e("TAG", "重试${retries}次,catch异常为:" + e.message)
                lastException = e
                retries++
                delay(delayMillis)
            }
        }
        throw lastException ?: IllegalStateException("Unexpected error occurred")
    }

方式二:

Single.create((SingleOnSubscribe<Integer>) emitter -> {
            //这里用网络请求
            emitter.onSuccess(1);//请求成功
//                emitter.onError(new RuntimeException("请求接口异常..."));//请求失败
        }).retry((integer, throwable) -> {
            Log.d("wangyao", "检测是否需要重试:" + integer);//失败一次integer会自动+1
            return integer < 3;
        }).subscribe(integer -> {
            Log.d("wangyao", "请求成功:" + integer);
        }, throwable -> {
            Log.d("wangyao", "请求失败:" + throwable);
        });

方式三:

infoApi.getUserInfo()
                        //总共重试3次,重试间隔3000毫秒
                        .retryWhen(new RetryWithDelay(3, 3000))
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribeOn(Schedulers.io())
                        .subscribe(new Action1<Response>() {
                            @Override
                            public void call(Response response) {
                                String content = new String(((TypedByteArray) response.getBody()).getBytes());
                                printLog(tvLogs, "", content);
                            }
                        }, new Action1<Throwable>() {
                            @Override
                            public void call(Throwable throwable) {
                                throwable.printStackTrace();
                            }
                        });

方式四:

/** 1、按钮事件 **/
    fun btTest(view: View) {
        HeartRequest().request()
    }

    /** 2、内部类:有模拟网络请求 */
    inner class HeartRequest {
        private val MAX_UPLOAD_TIMES = 3
        private var uploadTimes = 0
        private val mHandler = Handler(Looper.getMainLooper())
        private val mRunnable = Runnable {
            try {
                request()
            } catch (e: Exception) {
                throw RuntimeException(e)
            }
        }

        fun request() {
            // 模拟网络请求
            val future = asyncTask { result ->
                when (result) {
                    Result.SUCCESS -> { // 请求成功
                        Log.e("TAG", "成功===")
                        uploadTimes = 0
                        mHandler.removeCallbacks(mRunnable)
                    }
                    Result.FAILURE -> { // 请求失败
                        Log.e("TAG", "失败===")
                        if (uploadTimes < MAX_UPLOAD_TIMES) {
                            try {
                                uploadTimes++
                                mHandler.postDelayed(mRunnable, 5000)
                            } catch (e: java.lang.Exception) {
                                throw java.lang.RuntimeException(e)
                            }
                        } else { // 重试3次都失败
                            uploadTimes = 0
                            mHandler.removeCallbacks(mRunnable)
                            Log.e("TAG", "3次都失败了===")
                        }
                    }
                }
            }
            // 等待异步任务完成
            future?.get(10, TimeUnit.SECONDS)
        }
    }

    /** 3、 (模拟网络请求异步返回结果)定义一个异步任务:随机返回成功或者失败 */
    fun asyncTask(callback: (Result) -> Unit): Future<*>? {
        val executorService = Executors.newSingleThreadExecutor()
        val future = executorService.submit {
            // 模拟一些异步操作
            Thread.sleep(1000)
            val result = if (Math.random() < 0.5) {
                Result.SUCCESS
            } else {
                Result.FAILURE
            }
            callback(result)
        }
        executorService.shutdown()
        return future
    }

    // 定义一个枚举类型表示结果
    enum class Result { SUCCESS, FAILURE }

5、复杂的异步

项目要求:

刚进入页面就进行连接(异步返回结果:失败、成功、连接中),点击按钮的时候,有几种状态: 1、连接失败--重新开始连接

        1.1 连接成功 --调阅读的方法

        1.2 连接失败 --UI进行提示失败

2、连接中

         2.1 连接成功 --调阅读的方法

        2.2 连接失败 --UI进行提示失败

3、连接成功 --调阅读的方法

实现的代码为:

class MainActivity : AppCompatActivity() {
    private val subHandle = SubHandle()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        subHandle.mConsumer = null
        subHandle.connect().subscribe()
    }

    fun btRead(view: View) {
        subHandle.handleStatus {
            subHandle.read()
        }
    }

    /**
     * 刚进入页面就进行连接,点击按钮的时候,有几种状态:
     * 1、连接失败--重新开始连接,
     *      1.1 连接成功 --调阅读的方法
     *      1.2 连接失败 --UI进行提示失败
     * 2、连接中
     *      2.1 连接成功 --调阅读的方法
     *      2.2 连接失败 --UI进行提示失败
     * 3、连接成功 --调阅读的方法
     */
    class SubHandle {
        var mConsumer: ((Int) -> Unit)? = null
        private var status = AtomicInteger(-1) // 0连接失败 1正在连接中 2连接成功
        private var disposable: Disposable? = null

        fun connect(): Observable<Int> {
            status.set(1)
            Log.e("TAG", "=连接=")
            return Observable.interval(5, TimeUnit.SECONDS)
                .take(1)
                .map {
                    val random = Random(System.currentTimeMillis())
                    val randomNumber = random.nextInt(3) // 生成一个0到2之间的随机整数
                    Log.e("TAG", "==funA输出$randomNumber")
                    randomNumber
                }
                .subscribeOn(Schedulers.io())
                .doOnNext {
                    if (it == 2) {
                        status.set(2)
                        mConsumer?.invoke(status.get())
                    } else {
                        status.set(0)
                        Log.e("TAG", "连接阅读器失败,给UI提示")
                    }
                }
        }

        fun handleStatus(consumer: (Int) -> Unit) {
            mConsumer = consumer
            when (status.get()) {
                0 -> {
                    Log.e("TAG", "连接失败过,正重试连接")
                    disposable?.dispose()
                    disposable = connect().subscribe()
                }
                1 -> Log.e("TAG", "正在连接")
                2 -> mConsumer?.invoke(status.get())
            }
        }

        fun read() {
            Log.e("TAG", "开始阅读")
        }
    }
}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐