Java Flux流 从0到精通

Handsome
2025-02-13
点 赞
2
热 度
143
评 论
1

文章摘要

智阅GPT

初识Flux:数据世界的自来水管

1.1 当数据变成水流


想象你家的自来水管就是Flux的化身。传统的水桶打水(List)需要一次性搬完所有水,而Flux就像安装了智能水龙头:

- 打开开关就持续出水(数据流)

- 可以随时调节水流大小(背压控制)

- 水压不足时会自动通知你(异步回调)

Flux<String> 厨房水管 = Flux.just("清水", "洗洁精", "洗碗水");
厨房水管.subscribe(水分子 -> System.out.println("正在处理:" + 水分子));

1.2 快递站 vs 实体店

传统集合像超市购物(List):

  • 必须等所有商品上架才能购买

  • 买牛奶也得先等面包生产完

Flux则像现代化快递站:

Flux<快递包裹> 每日快递 = Flux.create(快递站 -> {
    快递站.next(晨报包裹());
    快递站.next(鲜奶包裹());
    快递站.next(网购包裹());
});

每日快递中的包裹就像Flux数据流中的元素,快递员每准备好一个包裹就立即派送,不需要等待所有货物都到达快递站后才进行配送,这样更加灵活高效。

Flux核心操作

2.1 咖啡店的点单艺术

假设你是星巴克的咖啡师Flux:

操作符

现实对应

代码示例

map

美式→卡布奇诺

.map(咖啡豆 → 研磨(咖啡豆))

filter

拒绝过期的牛奶

.filter(牛奶 → 牛奶.保质期 > 今天)

buffer

外卖订单批量处理

.buffer(5) → 每5杯打包一次

Flux.just("拿铁","摩卡","浓缩")
    .map(订单 -> 订单 + "加糖")
    .filter(订单 -> !订单.contains("无糖"))
    .buffer(2)
    .subscribe(批量订单 -> System.out.println("制作:" + 批量订单));

2.2 异常处理三连招

像经验丰富的外卖骑手处理突发状况:

  1. onErrorReturn:爆胎时换备用自行车

    .onErrorReturn("外卖延误通知单")
  2. onErrorResume:改派其他骑手接单

    .onErrorResume(e -> 备用骑手.get订单流())
  3. retry:重新尝试配送

    .retry(3) // 最多重试3次

深入数据快递站:订阅者的智慧

3.1 杂志订阅的四种信号

订阅Flux就像订杂志:

厨房水管.subscribe(
    最新期刊 -> System.out.println("收到:" + 最新期刊), // onNext
    退订通知 -> System.err.println("因为:" + 退订通知),  // onError
    () -> System.out.println("本年期刊已全部送达"),       // onComplete
    订阅订单 -> {                                     // onSubscribe
        订阅订单.request(3); // 每次只收3期杂志
    }
);

3.2 背压危机处理:快递仓库的智慧

当消费者处理速度跟不上时:

  1. 直接丢弃(DROP):爆仓时扔掉旧包裹

    .onBackpressureDrop(包裹 -> 记录丢弃(包裹))
  2. 缓存策略(BUFFER):租用临时仓库

    .onBackpressureBuffer(100) // 最大缓存100个
  3. 最新优先(LATEST):只保留最新包裹

    .onBackpressureLatest()

进阶技巧:Flux的七十二变

4.1 冷热Flux之争:电影院 vs 录像带

类型

类比

特点

创建方式

冷Flux

DVD租赁店

每次播放都是全新开始

Flux.just()

热Flux

电影院直播

后来观众只能看实时画面

Flux.share()

实例场景:

Flux<String> 冷直播 = Flux.interval(Duration.ofSeconds(1))
                          .map(i -> "比赛第"+i+"分钟");

// 第一个观众看到完整比赛
冷直播.subscribe(观众A);

Thread.sleep(5000);

// 第二个观众从第5分钟开始看
冷直播.subscribe(观众B); 

4.2 Flux组合套餐:数据交响乐团

  1. zip操作:咖啡+甜点套餐

    Flux<String> 咖啡 = Flux.just("美式", "拿铁");
    Flux<String> 甜点 = Flux.just("蛋糕", "马卡龙");
    
    Flux.zip(咖啡, 甜点)
        .map(tuple -> tuple.getT1() + "+" + tuple.getT2())
        .subscribe(System.out::println);
    // 输出:美式+蛋糕,拿铁+马卡龙

merge操作:双厨房出餐

Flux.merge(中餐厨房, 西餐厨房)
    .subscribe(服务员::上菜);

实战演练:外卖平台订单系统

场景需求:

  • 实时接收订单(每秒100+)

  • 智能过滤无效订单

  • 批量处理(每10单打包)

  • 动态限流(根据餐厅产能)

Flux<订单> 订单流 = 骑手APP.get实时订单流()
    .filter(订单 -> 订单.验证有效性())
    .map(订单 -> 订单.添加时间戳())
    .bufferTimeout(10, Duration.ofSeconds(5))
    .onBackpressureDrop(爆单处理::记录丢弃订单)
    .doOnNext(批量订单 -> 厨房系统.接收订单(批量订单))
    .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));

系统效果:

  1. 像智能水流控制阀门一样管理订单

  2. 高峰期自动开启限流保护

  3. 网络波动时自动重试

  4. 厨房永远只处理合理数量的订单

结语:成为数据管道的掌控者

Flux就像城市地下的智能水管网络:

  • 知道何时加速(concatMap

  • 懂得何时分流(flatMap

  • 具备自我修复能力(retry

  • 拥有流量感知(背压控制)

当你真正理解Flux时,数据将不再是呆板的0和1,而是可以像水流一样被精确调控的生命体。记住,好的响应式系统应该像优秀的市政供水系统——既不会让用户渴死,也不会让水管爆裂。


心若有所向往,何惧道阻且长

Handsome

infp 调停者

站长

具有版权性

请您在转载、复制时注明本文 作者、链接及内容来源信息。 若涉及转载第三方内容,还需一同注明。

具有时效性

目录

欢迎来到Handsome的站点,为您导航全站动态

17 文章数
4 分类数
21 评论数
17标签数

访问统计