Home | 简体中文 | 繁体中文 | 杂文 | Github | 知乎专栏 | Facebook | Linkedin | Youtube | 打赏(Donations) | About
知乎专栏

43.4. RestController

43.4.1. Post 数据

WebFlux 中 @RequestParam 只绑定 URL 查询参数,不能用它获取 Form 表单数据

		
    // Handler that tries to read the request parameter "param" through @RequestParam,
    // logs it and echoes it back.
    // NOTE(review): per the surrounding text this variant does not receive form data under WebFlux.
    @PostMapping("post")
    public String post(@RequestParam("param") String test) {
        log.info(test);
        return test;
    }
		
		

有一种方法可以获取 Form 提交过来的数据

		
    // Returns the form data of the current exchange as a Mono.
    @PostMapping("post")
    public Mono<MultiValueMap<String, String>> post(ServerWebExchange exchange) {
        Mono<MultiValueMap<String, String>> fromData = exchange.getFormData();
        // toString() is called on the Mono itself here, so this logs the publisher object,
        // not the submitted form values.
        log.info(fromData.toString());
        return fromData;
    }
		
		
		
    /**
     * Logs the submitted form fields and returns them to the client.
     *
     * <p>The logging is attached with {@code doOnNext}, so it runs inside the pipeline that the
     * framework subscribes to, instead of in a separate manual subscription whose
     * {@code Disposable} was never used.
     *
     * @param exchange the current server exchange
     * @return the form data of the request
     */
    @PostMapping("post")
    public Mono<MultiValueMap<String, String>> post(ServerWebExchange exchange) {
        return exchange.getFormData()
                .doOnNext(multiValueMap -> {
                    log.info(multiValueMap.toString());

                    // Collapse to the first value of every field for easier access.
                    Map<String, String> singleValueMap = multiValueMap.toSingleValueMap();
                    log.info(singleValueMap.toString());
                    log.info("name: {}, nickname:{}", singleValueMap.get("name"), singleValueMap.get("nickname"));
                });
    }
		
		

获取 Form 数据

		
    /**
     * Returns the submitted form as a single-value map (first value of each field).
     *
     * <p>The conversion is synchronous, so {@code map} is used rather than
     * {@code flatMap} with {@code Mono.just}.
     *
     * @param serverWebExchange the current server exchange
     * @return the form fields, one value per field name
     */
    @PostMapping("post")
    public Mono<Map<String, String>> post(ServerWebExchange serverWebExchange) {
        return serverWebExchange.getFormData()
                .map(formData -> {
                    Map<String, String> singleValueMap = formData.toSingleValueMap();
                    log.info(singleValueMap.toString());
                    return singleValueMap;
                });
    }
		
		

43.4.2. Thymeleaf

43.4.2.1. 模板引擎 Thymeleaf 依赖

			
    <!-- Spring Boot starter for the Thymeleaf template engine -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-thymeleaf</artifactId>
    </dependency>
			
			

43.4.2.2. application.properties 相关的配置

			
# NOTE: in a .properties file a '#' only starts a comment at the beginning of a line.
# A trailing "# ..." would become part of the value, so each description sits on its own line.

# Enable template caching.
spring.thymeleaf.cache=true
# Check that the template exists before rendering it.
spring.thymeleaf.check-template=true
# Check that the templates location exists.
spring.thymeleaf.check-template-location=true
# Enable Thymeleaf view resolution for Web frameworks.
spring.thymeleaf.enabled=true
# Template files encoding.
spring.thymeleaf.encoding=UTF-8
# Comma-separated list of view names that should be excluded from resolution.
spring.thymeleaf.excluded-view-names=
# Template mode to be applied to templates. See also StandardTemplateModeHandlers.
spring.thymeleaf.mode=HTML5
# Prefix that gets prepended to view names when building a URL.
spring.thymeleaf.prefix=classpath:/templates/
# Maximum size of data buffers used for writing to the response, in bytes.
spring.thymeleaf.reactive.max-chunk-size=
# Media types supported by the view technology.
spring.thymeleaf.reactive.media-types=
# Content-Type value written to HTTP responses.
spring.thymeleaf.servlet.content-type=text/html
# Suffix that gets appended to view names when building a URL.
spring.thymeleaf.suffix=.html
# Order of the template resolver in the chain.
spring.thymeleaf.template-resolver-order=
# Comma-separated list of view names that can be resolved.
spring.thymeleaf.view-names=
			
			

43.4.2.3. Webflux 控制器

			
	/**
	 * Fills the model with the "name" and "city" attributes and resolves to the "hello" view.
	 */
	@GetMapping("/welcome")
    public Mono<String> hello(final Model model) {
        final String viewName = "hello";

        model.addAttribute("name", "Neo");
        model.addAttribute("city", "深圳");

        return Mono.just(viewName);
    }

    /**
     * Puts the city stream into the model under "cityLists" and resolves to the "cityList" view.
     */
    @GetMapping("/list")
    public String listPage(final Model model) {
        model.addAttribute("cityLists", cityService.findAllCity());
        return "cityList";
    }
			
			

43.4.2.4. Thymeleaf 视图

welcome.html

			
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8"/>
    <title>欢迎页面</title>
</head>

<body>

<!-- <span> is used for the inline values: a <p> is not allowed inside a heading element -->
<h1>你好,欢迎来自<span th:text="${city}"></span>的<span th:text="${name}"></span></h1>

</body>
</html>
			
			

cityList.html

			
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8"/>
    <title>城市列表</title>
</head>

<body>

<div>


    <table>
        <!-- A table title belongs in <caption>; <legend> is only valid inside <fieldset> -->
        <caption>
            <strong>城市列表</strong>
        </caption>
        <thead>
        <tr>
            <th>城市编号</th>
            <th>省份编号</th>
            <th>名称</th>
            <th>描述</th>
        </tr>
        </thead>
        <tbody>
        <!-- One row per element of the "cityLists" model attribute -->
        <tr th:each="city : ${cityLists}">
            <td th:text="${city.id}"></td>
            <td th:text="${city.provinceId}"></td>
            <td th:text="${city.name}"></td>
            <td th:text="${city.description}"></td>
        </tr>
        </tbody>
    </table>

</div>

</body>
</html>
			
			

43.4.3. Mono

Mono(返回0或1个元素)/Flux(返回0-n个元素)

43.4.3.1. Mono.just()/Mono.justOrEmpty()

		
// Wraps the Optional's content in a Mono and prints whatever is emitted.
Mono.justOrEmpty(Optional.of("Netkiller")).subscribe(System.out::println);
		
			

43.4.3.2. MonoSink 创建 Mono

		
// Builds a Mono from a sink callback that signals success with a single value.
Mono.create(sink -> sink.success("Netkiller")).subscribe(System.out::println);
		
			
		
    /**
     * Creates a Mono through a sink callback and attaches logging hooks
     * for the subscription and for the emitted value.
     */
    @GetMapping("mono")
    public Mono<Object> mono() {
        Mono<Object> source = Mono.create(sink -> {
            log.info("创建 Mono");
            sink.success("hello webflux");
        });

        return source
                // called when a subscriber subscribes to the publisher
                .doOnSubscribe(sub -> log.info("doOnSubscribe={}", sub))
                // called when the subscriber receives the value
                .doOnNext(value -> log.info("doOnNext={}", value));
    }
		
			

43.4.3.3. Supplier 创建 Mono

从 Supplier 创建 Mono

		
    /**
     * Returns a Mono whose value is produced by a supplier that sleeps five seconds first.
     *
     * <p>"start" and "end" are logged while the Mono is being assembled; the supplier itself
     * is only handed to {@code Mono.fromSupplier} here and is not invoked by this method.
     *
     * @return a Mono that yields "netkiller"
     */
    @GetMapping("/get")
    private Mono<String> get() {
        log.info("start");
        Mono<String> result = Mono.fromSupplier(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of silently dropping the interruption.
                Thread.currentThread().interrupt();
            }
            return "netkiller";
        });
        log.info("end");
        return result;
    }
		
			

43.4.3.4. then()

		
// Case 1: empty source followed by then() with no argument.
Mono.empty()
    .then()
    .doOnSuccess(i -> System.out.println("On success: " + i))
    .doOnError(i -> System.out.println("On error: " + i))
    .block();

// Case 2: empty source, then continue with another Mono.
Mono.empty()
    .then(Mono.just("Good bye"))
    .doOnSuccess(i -> System.out.println("On success: " + i))
    .doOnError(i -> System.out.println("On error: " + i))
    .block();

// Case 3: source with a value, then continue with another Mono.
Mono.just("Hello World")
    .then(Mono.just("Good bye"))
    .doOnSuccess(i -> System.out.println("On success: " + i))
    .doOnError(i -> System.out.println("On error: " + i))
    .block();

// Case 4: failing source, then continue with a Mono that has a value.
// NOTE(review): block() is expected to rethrow the source error, so run each case separately.
Mono.error(new RuntimeException("Something wrong"))
    .then(Mono.just("Good bye"))
    .doOnSuccess(i -> System.out.println("On success: " + i))
    .doOnError(i -> System.out.println("On error: " + i))
    .block();

// Case 5: failing source, then continue with another failing Mono.
Mono.error(new RuntimeException("Something wrong"))
    .then(Mono.error(new RuntimeException("Something very wrong")))
    .doOnSuccess(i -> System.out.println("On success: " + i))
    .doOnError(i -> System.out.println("On error: " + i))
    .block();
		
			

43.4.3.5. 异常处理

		
// Appends an error after the value; values go to stdout, the error goes to stderr.
Mono.just("Netkiller")
        .concatWith(Mono.error(new IllegalStateException()))
        .subscribe(System.out::println, System.err::println);

		
			
		
// Appends an error after the value and replaces that error with the fallback "defaultError".
Mono.just("Netkiller")
        .concatWith(Mono.error(new IllegalStateException()))
        .onErrorReturn("defaultError")
        .subscribe(System.out::println);
		
			
		
// Appends an error after the value and switches to a fallback publisher when it occurs.
// switchOnError(...) no longer exists in current Reactor releases; onErrorResume is its replacement.
Mono.just("Netkiller")
         .concatWith(Mono.error(new IllegalStateException()))
         .onErrorResume(error -> Mono.just("defaultError"))
         .subscribe(System.out::println);

		
			

43.4.3.6. 同步阻塞等待结果

		
        // Case 1: a Mono with a value, blocked on until it terminates.
        Mono.just("Hello World")
                .doOnNext(i -> System.out.println("On next: " + i))
                .doOnSuccess(i -> System.out.println("On success: " + i))
                .doOnError(i -> System.out.println("On error: " + i))
                .block();

		// Case 2: an empty Mono.
		Mono.empty()
		    .doOnNext(i -> System.out.println("On next: " + i))
		    .doOnSuccess(i -> System.out.println("On success: " + i))
		    .doOnError(i -> System.out.println("On error: " + i))
		    .block();

		// Case 3: the mapper returns null for the only value.
		Mono.just("Hello World")
		    .mapNotNull(s -> null)
		    .doOnNext(i -> System.out.println("On next: " + i))
		    .doOnSuccess(i -> System.out.println("On success: " + i))
		    .doOnError(i -> System.out.println("On error: " + i))
		    .block();

		// Case 4: a failing Mono.
		// NOTE(review): block() is expected to rethrow the error to the caller — confirm.
		Mono.error(new RuntimeException("Something wrong"))
		    .doOnNext(i -> System.out.println("On next: " + i))
		    .doOnSuccess(i -> System.out.println("On success: " + i))
		    .doOnError(i -> System.out.println("On error: " + i))
		    .block();

		
			
		
        // Delayed Mono, printed as a side effect and awaited synchronously.
        Mono.delay(Duration.ofMillis(10))
                .doOnNext(System.out::println)
                .block();

        // Same delayed Mono, consumed through subscribe(); the value is printed
        // once by the side effect and once by the subscriber.
        Mono.delay(Duration.ofMillis(10))
                .doOnNext(System.out::println)
                .subscribe(System.out::println);
		
			

43.4.4. Flux 返回多条数据

43.4.4.1. FluxSink 创建异步 Flux

		
        // Programmatic Flux: push two values through the sink, then complete.
        Flux<String> flux = Flux.create(sink -> {
            sink.next("Neo");
            sink.next("netkiller");
            sink.complete();
        });
        flux.subscribe(System.out::println);
		
			

43.4.4.2. SynchronousSink 创建同步 Flux

		
        // Generator callback that emits one value and completes immediately.
        Flux<String> generate = Flux.generate(synchronousSink -> {
            synchronousSink.next("Neo");
            synchronousSink.complete();
        });
        generate.subscribe(System.out::println);
		
			

多个值

		
// Stateful generator: the ArrayList is the state carried between calls.
// Each call emits one random value below 100 and records it in the list;
// the stream completes once the list holds 10 values.
final Random random = new Random();
Flux.generate(ArrayList::new, (list, sink) -> {
    int value = random.nextInt(100);
    list.add(value);
    sink.next(value);
    if (list.size() == 10) {
        sink.complete();
    }
    return list;
}).subscribe(System.out::println);
		
			

43.4.4.3. just()

		
// single element
Flux.just("just").subscribe(System.out::println);
// multiple elements
Flux.just("just", "just1", "just2").subscribe(System.out::println);
		
			

43.4.4.4. 从 Flux/Mono 创建 Flux

		
// Flux -> Flux
Flux.from(Flux.just("just", "just1", "just2")).subscribe(System.out::println);
// Mono -> Flux
Flux.from(Mono.just("just")).subscribe(System.out::println);
		
			

43.4.4.5. 消息订阅

		
// subscribe() with three callbacks: value consumer, error consumer, completion callback.
// The error consumer is empty, so any error would be ignored here.
Flux.just("Netkiller1", "Netkiller2", "Netkiller3")
        .subscribe(data -> System.out.println("onNext:" + data), err -> {
        }, () -> System.out.println("onComplete"));
		
			

多个订阅者

		
    // Source list with five entries.
    List<String> list = new ArrayList<>();
        list.add("1");
        list.add("2");
        list.add("3");
        list.add("4");
        list.add("5");
        // The same Flux is subscribed three times.
        Flux<String> flux = Flux.fromIterable(list);
        flux.subscribe(System.out::println);
        flux.subscribe(System.out::println);
        flux.subscribe(System.out::println);
		
			

手动控制

		
	// publish() turns the Flux into a ConnectableFlux; three subscribers are registered first.
	Flux<String> flux = Flux.fromIterable(list);
        ConnectableFlux<String> con = flux.publish();
        con.subscribe(System.out::println);
        con.subscribe(System.out::println);
        con.subscribe(System.out::println);
        // manually start consuming the data
        con.connect();
		
			

43.4.4.6. 从数组创建 Flux

		
// Creates a Flux from the elements of an array and prints each of them.
Flux.fromArray(new String[] { "arr", "arr", "arr", "arr" }).subscribe(System.out::println);
		
			

43.4.4.7. 从 Iterable 创建 Flux

		
// Fill a set, then expose it to Flux through an Iterable backed by the set's iterator.
Set<String> values = new HashSet<>();
values.add("1");
values.add("2");
values.add("3");
Flux.fromIterable(values::iterator).subscribe(System.out::println);
		
			

autoConnect(5) 表示如果订阅者达到5个 就自动开启

		
        // autoConnect(5): the threshold is five subscribers.
        // Only three subscribers are registered below, which is fewer than that threshold.
        Flux<String> auto = flux.publish().autoConnect(5);
        auto.subscribe(System.out::println);
        auto.subscribe(System.out::println);
        auto.subscribe(System.out::println);
		
			

停止消费策略

		
		// If fewer than three subscribers remain, consumption stops until three are reached again.
        Flux<String> auto = flux.publish().refCount(3);
        auto.subscribe(System.out::println);
        auto.subscribe(System.out::println);
        auto.subscribe(System.out::println);

        Thread.sleep(1000L);

        // Consumption only stops when fewer than three subscribers remain AND no new
        // subscription arrives within ten seconds.
        // A distinct variable name is required: redeclaring "auto" in the same scope does not compile.
        Flux<String> autoWithGrace = flux.publish().refCount(3, Duration.ofSeconds(10));
        autoWithGrace.subscribe(System.out::println);
        autoWithGrace.subscribe(System.out::println);
        autoWithGrace.subscribe(System.out::println);
		
			

43.4.4.8. 从 Stream 创建 Flux

从 Stream 返回 Flux

		
    /**
     * Streams three fixed words as a text/event-stream response.
     */
    @GetMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    private Flux<String> getWords() {
        return Flux.fromStream(Stream.of("alpha", "bravo", "charlie"));
    }
		
			

43.4.4.9. defer()

		
// defer() takes a supplier of the actual Flux instead of the Flux itself.
Flux.defer(() -> Flux.just("just", "just1", "just2")).subscribe(System.out::println);
		
			

43.4.4.10. Flux.interval()

		
// Emit a tick every 500 ms.
Flux.interval(Duration.of(500, ChronoUnit.MILLIS)).subscribe(System.out::println);
// Wait 2 s before the first tick, then emit every 200 ms.
Flux.interval(Duration.ofSeconds(2), Duration.ofMillis(200)).subscribe(System.out::println);

// Flux.intervalMillis(long) no longer exists in current Reactor releases; use interval(Duration).
Flux.interval(Duration.ofMillis(1000)).subscribe(System.out::println);
		
			

43.4.4.11. Flux.empty()

		
// A Flux with no elements; the printing subscriber receives nothing.
Flux.empty().subscribe(System.out::println);
		
			

43.4.4.12. Flux.error()

		
// A Flux that only signals an error. No error consumer is passed to subscribe() here.
Flux.error(new RuntimeException()).subscribe(System.out::println);
		
			

43.4.4.13. Flux.never()

		
// A Flux that never signals anything; NOTE(review): the subscriber is not expected to print — confirm.
Flux.never().subscribe(System.out::println);
		
			

43.4.4.14. Flux.range()

		
// range(start, count): 100 consecutive integers starting at 0.
Flux.range(0, 100).subscribe(System.out::println);
		
			

43.4.4.15. 返回数据

返回 List

		
    /**
     * Builds nine pictures (ids 1 to 9) and returns them as a Flux.
     */
    @GetMapping("flux")
    public Flux<Picture> flux() {
        final List<Picture> pictures = new ArrayList<>();
        for (int id = 1; id < 10; id++) {
            Picture item = new Picture();
            item.setId(Long.valueOf(id));
            item.setImage("https://www.netkiller.cn/images/" + id + ".png");
            pictures.add(item);
        }
        return Flux.fromIterable(pictures);
    }
		
			

返回 Map

		
    /**
     * Builds a map with the entries key1..key9 and streams its entries.
     */
    @GetMapping("map")
    public Flux<Map.Entry<String, String>> map() {
        final Map<String, String> entries = new HashMap<>();
        for (int n = 1; n < 10; n++) {
            entries.put("key" + n, "value" + n);
        }
        return Flux.fromIterable(entries.entrySet());
    }
		
			

43.4.4.16. 持续更新 Flux

	
		// Holder that lets code outside the create() callback reach the sink.
		AtomicReference<FluxSink<String>> fluxSink = new AtomicReference<FluxSink<String>>();

        // The callback only stores the sink; nothing is emitted inside it.
        Flux<String> response = Flux.create(sink -> {
            fluxSink.set(sink);
        });
        response.subscribe(System.out::println);
        // push elements through the stored sink
        fluxSink.get().next("Neo");
        fluxSink.get().next("Netkiller");
	
			
		
	// Set inside the create() callback; forwards every accepted value to the sink.
	Consumer<String> producer;

    @Test
    public void testFluxCreate() {
        Flux.create(sink -> {
            producer = sink::next;
        }).subscribe(System.out::println);

        //do something

        // push elements downstream
        producer.accept("Neo");
        producer.accept("Netkiller");
    }

	FluxSink<String> outSink;
    @Test
    public void testFluxCreate() {
        Flux<String> f = Flux.create(sink -> {
            outSink = sink;
        });
        f.subscribe(e -> System.out.println(e))
        //do something

        //下发元素
        outSink.next("Neo");

        outSink.next("Netkiller");
    }

		
			
	
/**
 * Callback interface with one method that receives a chunk of data
 * and one that signals the end of processing.
 */
interface MyEventListener<T> {
    /** Receives one chunk of elements. */
    void onDataChunk(List<T> chunk);
    /** Signals that processing is complete. */
    void processComplete();
}


// Bridges the listener API to a Flux: every chunk element is forwarded to the sink,
// and processComplete() completes the stream.
Flux<String> bridge = Flux.create(emitter ->
    myEventProcessor.register(new MyEventListener<String>() {

        @Override
        public void onDataChunk(List<String> chunk) {
            chunk.forEach(emitter::next);
        }

        @Override
        public void processComplete() {
            emitter.complete();
        }
    }));
	
			

43.4.4.17. map(), flatMap()

		
        Flux.just(1, 2, 3, 4)
                .log()
                .map(i -> {
                    return i * 2;
                })
                .subscribe(System.out::println);
		
			
		
        Flux.just(1, 2, 3, 4)
                .log()
                .flatMap(e -> {
                    return Flux.just(e * 2).delayElements(Duration.ofSeconds(1));
                })
                .subscribe(System.out::println);
		
			

43.4.4.18. 过滤

		
        Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // NOTE: every value is already even after the doubling, so this filter lets all of them through.
        flux.map(num -> num * 2) // multiply every element by 2
                .filter(num -> num % 2 == 0) // keep even numbers only
                .subscribe(System.out::println);
		
			

43.4.4.19. 触发操作

doOnComplete 完成时执行

	
// Flag that is set by the first completion hook.
AtomicBoolean onComplete = new AtomicBoolean();
        Flux<Integer> flux = Flux.just(1, 2, 3);
        // Two completion hooks are chained: one sets the flag, one prints "OK".
        flux.log()
                .doOnComplete(() -> onComplete.set(true))
                .doOnComplete(() -> {
                    System.out.println("OK");
                })
                .subscribe(System.out::println);
        // Prints the current value of the flag.
        System.out.println(onComplete.get());
	
			

43.4.4.20. 并发

	
        // Publishes on the parallel scheduler and prints each value with the current thread id.
        // The second flatMap argument (5) is its concurrency setting.
        // repeat() has no limit here, so the sequence is resubscribed indefinitely.
        Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
                .publishOn(Schedulers.parallel())
                .flatMap(integer -> {
                    System.out.println("val:" + integer + ", thread:" + Thread.currentThread().getId());
                    return Mono.just(integer);
                }, 5)
                .repeat()
                .subscribe();
	
			
	
    // Streams the numbers 1..8 as text/event-stream, logging each value with the current thread id.
    // flatMap runs with a concurrency setting of 2; repeat(10L) bounds the number of repetitions.
    @GetMapping(value = "parallel", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Integer> parallel() {
        return Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
                .publishOn(Schedulers.parallel())
                .flatMap(integer -> {
                    log.info("val:" + integer + ", thread:" + Thread.currentThread().getId());
                    return Mono.just(integer);
                }, 2).repeat(10L);

    }
	
			
	
// Splits the sequence with parallel(2) and runs the map step on the parallel scheduler.
Flux.just(1,2,3,4,5,6,7,8)
    .parallel(2) // mention number of threads
    .runOn(Schedulers.parallel())
    .map(integer -> {
             System.out.println("val:" + integer + ", thread:" + Thread.currentThread().getId());
             return integer;
        })
    .subscribe();
	
			
	
        // Every element is wrapped in its own Mono, which is itself published on the parallel scheduler.
        // Each value is written both to stdout and to the logger; repeat() has no limit.
        Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
                .publishOn(Schedulers.parallel()) // <- Each flux can be published in a different thread
                .flatMap(integer -> {
                    return Mono.fromCallable(() -> {
                        System.out.println("val:" + integer + ", thread:" + Thread.currentThread().getId());
                        log.info("val:" + integer + ", thread:" + Thread.currentThread().getId());
                        return integer;
                    }).publishOn(Schedulers.parallel()); // <- Each Mono processing every integer can be processed in a different thread
                })
                .repeat()
                .subscribe();
	
			
	
package cn.netkiller.test;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;


/**
 * Demonstrates which thread each stage runs on: the source emits the name of its own thread,
 * and each map step prefixes the name of the thread it runs on.
 */
public class Test {
    public static void main(String[] args) {

        Flux.create(sink -> {
                    // emit the name of the thread the source runs on
                    sink.next(Thread.currentThread().getName());
                    sink.complete();
                }).publishOn(Schedulers.single())
                .map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
                .publishOn(Schedulers.immediate())
                .map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
                .subscribeOn(Schedulers.parallel())
                // convert to a java.util.stream.Stream and print every element
                .toStream()
                .forEach(System.out::println);

    }
}
	
			
	
// Every element is wrapped in its own Mono, which is itself published on the parallel scheduler.
// repeat() has no limit, so the sequence is resubscribed indefinitely.
Flux.just(1,2,3,4,5,6,7,8)
        .publishOn(Schedulers.parallel()) // <- Each flux can be published in a different thread
        .flatMap(integer -> {
            return Mono.fromCallable(() -> {
                 System.out.println("val:" + integer + ", thread:" + Thread.currentThread().getId());
                 return integer;
            }).publishOn(Schedulers.parallel()); // <- Each Mono processing every integer can be processed in a different thread
         })
        .repeat()
        .subscribe();
	
			

43.4.5. SSE

43.4.5.1. 一次性事件

			
    // Sends two fixed events as a text/event-stream response.
    @GetMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> createConnectionAndSendEvents() {
        return Flux.just("Alpha", "Omega");
    }
			
			

curl 访问 SSE 需要设置HTTP头 -H "Accept: text/event-stream"

			
neo@MacBook-Pro-M2 ~ % curl -H "Accept: text/event-stream" -X 'GET' 'http://localhost:8080/mock/sse'
data:Alpha

data:Omega
			
			
[提示]提示
Safari 浏览器不支持 SSE 推送,微软的 Edge 支持。

43.4.5.2. 从 Stream 返回数据

		
    /**
     * Streams three fixed words, taken from a java.util.stream.Stream, as server-sent events.
     */
    @GetMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    private Flux<String> getWords() {
        return Flux.fromStream(Arrays.asList("alpha", "bravo", "charlie").stream());
    }
		
			

43.4.5.3. 周期性事件

每间隔一秒发送一次数据

			
    /**
     * Emits the words of a fixed sentence, each paired with a one-second tick.
     */
    @GetMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    private Flux<String> getWords() {
        final String[] words = "The quick brown fox jumps over the lazy dog.".split(" ");
        Flux<String> wordFlux = Flux.fromArray(words);
        Flux<Long> ticks = Flux.interval(Duration.ofSeconds(1));
        // zip pairs every word with a tick; only the word is kept
        return Flux.zip(wordFlux, ticks).map(Tuple2::getT1);
    }

	/**
	 * Emits one "random" event per second; the event id is the tick number
	 * and the payload is a random integer.
	 */
	@GetMapping("/random")
	public Flux<ServerSentEvent<Integer>> randomNumbers() {
		return Flux.interval(Duration.ofSeconds(1))
				.map(tick -> ServerSentEvent.<Integer>builder()
						.event("random")
						.id(Long.toString(tick))
						.data(ThreadLocalRandom.current().nextInt())
						.build());
	}

	/**
	 * Emits the current local time, prefixed with "Flux - ", once per second.
	 */
	@GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
	public Flux<String> streamFlux() {
	    Flux<Long> ticks = Flux.interval(Duration.ofSeconds(1));
	    return ticks.map(tick -> "Flux - " + LocalTime.now());
	}
			
			

43.4.5.4. 返回 ServerSentEvent 数据结构

		
    // Wraps three fixed strings in ServerSentEvent objects with event name "hello",
    // an incrementing id starting at 1, and a comment.
    @GetMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    private Flux<ServerSentEvent<Object>> getWords() {
        // counter used for the event ids
        AtomicInteger seq = new AtomicInteger(1);
        return Flux.just("你好", "我是 Neo", "我的昵称是 netkiller")
                .map(data -> ServerSentEvent.builder()
                        .event("hello")
                        .id(String.valueOf(seq.getAndIncrement()))
                        .data(data)
                        .comment("测试数据")
                        .build());
    }
		
			

演示结果

		
neo@MacBook-Pro-M2 ~> curl http://localhost:8080/test/sse
id:1
event:hello
data:你好
:测试数据

id:2
event:hello
data:我是 Neo
:测试数据

id:3
event:hello
data:我的昵称是 netkiller
:测试数据
		
			

FluxSink 发送 ServerSentEvent

		
    /**
     * Pushes two ServerSentEvent objects through a sink and then completes:
     * a "LocalTime" event carrying the current time and a "test" event with a fixed message.
     */
    @GetMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> serverSentEvent() {
        final AtomicInteger nextId = new AtomicInteger(1);
        return Flux.create(emitter -> {
            ServerSentEvent<String> timeEvent = ServerSentEvent.<String>builder()
                    .id(String.valueOf(nextId.getAndIncrement()))
                    .event("LocalTime")
                    .data(String.valueOf(LocalTime.now()))
                    .build();
            emitter.next(timeEvent);

            ServerSentEvent<String> testEvent = ServerSentEvent.<String>builder()
                    .id(String.valueOf(nextId.getAndIncrement()))
                    .event("test")
                    .data("Hello netkiller")
                    .comment("测试")
                    .build();
            emitter.next(testEvent);

            emitter.complete();
        });
    }
		
			

43.4.5.5. SSE 完整的例子

		
package cn.netkiller.webflux.controller;

import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;

/**
 * Server-sent-events demo controller mapped under "/sse".
 *
 * <p>NOTE(review): {@code count_down} is plain mutable state on the controller instance, so every
 * request that reaches {@link #getCountDownSec()} decrements the same counter.
 * <p>NOTE(review): {@code logger} is used in {@link #flux()} but is not declared in this class;
 * a logger field has to be provided for the class to compile.
 */
@RestController
@RequestMapping("/sse")
public class SseController {
	// Remaining count; decremented by getCountDownSec().
	private int count_down = 10;

	public SseController() {

	}

	/**
	 * Emits one "launch" event per second. The event id is the tick number and the
	 * payload is the text returned by {@link #getCountDownSec()}.
	 */
	@GetMapping(value = "/launch", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
	public Flux<ServerSentEvent<Object>> countDown() {

		return Flux.interval(Duration.ofSeconds(1))
				.map(seq -> Tuples.of(seq, getCountDownSec()))
				.map(data -> ServerSentEvent.<Object>builder()
						.event("launch")
						.id(Long.toString(data.getT1()))
						.data(data.getT2().toString())
						.build());
	}

	/**
	 * Decrements the counter while it is positive and returns the countdown text;
	 * returns the launch text once the counter has reached zero.
	 */
	private String getCountDownSec() {
		if (count_down > 0) {
			count_down--;
			return "倒计时:" + count_down;
		}
		return "发射";
	}



	/**
	 * Same event mapping as {@link #countDown()}, driven by {@code Flux.range}.
	 *
	 * <p>NOTE(review): {@code Flux.range(start, count)} with (10, 1) emits the single value 10;
	 * if ten elements were intended the arguments are probably swapped — confirm.
	 */
	@GetMapping("/range")
	public Flux<Object> range() {
		return Flux.range(10, 1)
				.map(seq -> Tuples.of(seq, getCountDownSec()))
				.map(data -> ServerSentEvent.<Object>builder()
						.event("launch")
						.id(Long.toString(data.getT1()))
						.data(data.getT2().toString())
						.build());
	}

	// WebFlux server push (SSE -> Server-Sent Events)
	@GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
	private Flux<String> flux() {
		Flux<String> result = Flux.fromStream(IntStream.range(1, 10).mapToObj(i -> {
			try {
				TimeUnit.SECONDS.sleep(1);
			} catch (InterruptedException e) {
				// Restore the interrupt flag instead of silently dropping the interruption.
				Thread.currentThread().interrupt();
			}
			logger.info("sse " + i);
			return "flux data -- " + i;
		}));
		return result;
	}
}

		
			

运行结果

		
id:0
event:launch
data:倒计时:9

id:1
event:launch
data:倒计时:8

id:2
event:launch
data:倒计时:7

id:3
event:launch
data:倒计时:6

id:4
event:launch
data:倒计时:5

id:5
event:launch
data:倒计时:4

id:6
event:launch
data:倒计时:3

id:7
event:launch
data:倒计时:2

id:8
event:launch
data:倒计时:1

id:9
event:launch
data:倒计时:0

id:10
event:launch
data:发射

		
			

43.4.5.6. 判断 Flux 已经完成

			
    // Emits ten "data-N" events, one per second, and prints a message on completion,
    // on error, and when the stream finally terminates.
    @GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> sse() {

        return Flux.interval(Duration.ofSeconds(1))
                .map(sequence -> ServerSentEvent.builder("data-" + sequence).build())
                .take(10) // finish after 10 events

                // Normal completion: triggered when the Flux completes normally
                .doOnComplete(() -> {
                    System.out.println("✅ SSE Flux 正常完成!");
                })

                // Error termination: triggered when an error occurs
                .doOnError(ex -> {
                    System.out.println("❌ SSE Flux 异常结束:" + ex.getMessage());
                })

                // Final hook: runs for normal completion, error, and client disconnect alike
                .doFinally(signalType -> {
                    System.out.println("🔚 SSE 流最终关闭:" + signalType);

                    // This is the place to release locks, close resources, or write logs
                });
    }			
			
			

输出结果

			
✅ SSE Flux 正常完成!
🔚 SSE 流最终关闭:onComplete
			
			

43.4.5.7. SSE Client 订阅实例

			
 	// Emits one "test-event" per second; the id is the tick number and the data is the current local time.
 	@GetMapping("/server")
    public Flux<ServerSentEvent<String>> streamEvents() {
        return Flux.interval(Duration.ofSeconds(1))
                .map(sequence -> ServerSentEvent.<String>builder()
                        .id(String.valueOf(sequence))
                        .event("test-event")
                        .data("LocalTime: " + LocalTime.now())
                        .build());
    }

    // Opens an SSE connection to "mock/server" on localhost:8080 with WebClient and logs every event.
    // NOTE(review): the subscription is started here and its Disposable is not kept, so this
    // handler has no way to cancel it later.
    @GetMapping("/client")
    public void consumeServerSentEvent() {
        WebClient client = WebClient.create("http://localhost:8080");
        // Type token that preserves the generic ServerSentEvent<String> type for decoding.
        ParameterizedTypeReference<ServerSentEvent<String>> type
                = new ParameterizedTypeReference<ServerSentEvent<String>>() {
        };

        Flux<ServerSentEvent<String>> eventStream = client.get()
                .uri("mock/server")
                .retrieve()
                .bodyToFlux(type);

        // Three callbacks: per-event logging, error logging, completion logging.
        eventStream.subscribe(
                content -> log.info("Time: {} - event: name[{}], id [{}], content[{}] ",
                        LocalTime.now(), content.event(), content.id(), content.data()),
                error -> log.error("Error receiving SSE: {}", error),
                () -> log.info("Completed!!!"));
    }
			
			

43.4.6. 末尾连接字符串

43.4.6.1. concatWith

		
// 使用 concatWith() 追加另一个 Flux
// Appends a second Flux after the first one.
Flux.just(1, 2, 3)
    .concatWith(Flux.just(4, 5, 6))
    .subscribe(System.out::println);

// Equivalent in effect to concatWithValues(4, 5, 6)
		
			

43.4.6.2. concatWithValues

		
Flux.just(1, 2, 0)
    .map(num -> 10 / num) // dividing by 0 raises an error
    .onErrorReturn(-1) // emit -1 when an error occurs
    .concatWithValues(5, 6) // appended in either case
    .subscribe(System.out::println);

// Output:
// 10
// 5
// -1  <- result of the error handling
// 5   <- appended elements
// 6
		
			
	
/**
 * Sends the chat messages to the "/chat/completions" endpoint in streaming mode and returns
 * the content fragments of the response as a Flux.
 *
 * <p>Each chunk has an optional "data: " prefix removed, is parsed as JSON, and the text at
 * choices[0].delta.content is extracted. Spaces and newlines in that text are replaced by the
 * literal markers "\x20" and "\x0A". A chunk that is not valid JSON yields an empty string.
 * A final "。" is appended after the stream, and any stream error is replaced by "[ERROR]".
 *
 * @param device   not used by this method
 * @param taskId   not used by this method
 * @param speakId  not used by this method
 * @param messages the chat messages sent as the "messages" field of the request
 * @return the extracted content fragments
 */
public Flux<String> deepseek(String device, String taskId, String speakId, List<Map<String, String>> messages) {

        WebClient webClient = WebClient.builder()
                .baseUrl(url)
                .defaultHeader("Content-Type", "application/json")
                .defaultHeader("Authorization", "Bearer " + authorization)
                .build();

        log.debug(messages.toString());

        Map<String, Object> requestBody = Map.of(
                "model", model,
                "messages", messages,
                "stream", true
        );

        // One mapper for the whole stream instead of a new instance for every chunk.
        ObjectMapper objectMapper = new ObjectMapper();

        return webClient.post()
                .uri("/chat/completions")
                .bodyValue(requestBody)
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .bodyToFlux(String.class)
                .map(chunk -> {
                    try {
                        String jsonString = chunk.replaceFirst("^data: ", "");
                        JsonNode jsonNode = objectMapper.readTree(jsonString);
                        String result = jsonNode.path("choices").path(0).path("delta").path("content").asText();
                        // Encode spaces and newlines as literal markers.
                        return result.replace(" ", "\\x20").replace("\n", "\\x0A");
                    } catch (JsonProcessingException e) {
                        // Non-JSON chunks are mapped to an empty fragment.
                        return "";
                    }
                }).concatWithValues("。")
                .onErrorResume(e -> {
                    log.error("Error processing stream", e);
                    return Flux.just("[ERROR]");
                });
    }
	
			

43.4.7. Flux scan

计数

	
import reactor.core.publisher.Flux;

/** Prints the running total of the numbers 1 to 5. */
public class ScanExample {
    public static void main(String[] args) {
        Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
        // running sum
        numbers.scan(Integer::sum)
            .subscribe(System.out::println);
    }
}

// 输出结果:
// 1    <- 第一个元素直接输出(作为初始值)
// 3    <- 1 + 2
// 6    <- 3 + 3
// 10   <- 6 + 4
// 15   <- 10 + 5
	
		

字符串追加

	
Flux.just("a", "b", "c", "d")
    .scan("", (acc, current) -> acc + current) // the initial value is the empty string
    .subscribe(System.out::println);

// Output:
// ""   <- initial value
// "a"  <- "" + "a"
// "ab" <- "a" + "b"
// "abc"
// "abcd"
	
		

	
/** Mutable accumulator that counts even and odd numbers. */
class Counter {
    int evenCount = 0;
    int oddCount = 0;

    @Override
    public String toString() {
        return "偶数: " + evenCount + ", 奇数: " + oddCount;
    }
}

// The same Counter instance is mutated and returned for every element.
Flux.just(1, 2, 3, 4, 5)
    .scan(new Counter(), (counter, num) -> {
        if (num % 2 == 0) counter.evenCount++;
        else counter.oddCount++;
        return counter;
    })
    .subscribe(System.out::println);

// Output:
// 偶数: 0, 奇数: 0   <- initial state
// 偶数: 0, 奇数: 1   <- after 1
// 偶数: 1, 奇数: 1   <- after 2
// 偶数: 1, 奇数: 2   <- after 3
// 偶数: 2, 奇数: 2   <- after 4
// 偶数: 2, 奇数: 3   <- after 5
	
		

	
// Accumulates into a single StringBuffer; skip(1) drops the first emission of scan.
Flux.just("a", "b", "c", "d")
                .scan(new StringBuffer(), (sentence, current) -> sentence.append(current)) // the initial value is an empty StringBuffer
                .skip(1)
                .subscribe(System.out::println);
	
		

43.4.8. ConnectableFlux

	
        // Flag set by the completion hook of the source.
        AtomicBoolean onComplete = new AtomicBoolean();
        // publish() returns a ConnectableFlux; connect() is not called in this snippet.
        ConnectableFlux<Integer> f = Flux.just(1)
                .doOnComplete(() -> onComplete.set(true))
                .publish();
	
		

43.4.9. 上传文件并保存

		
/**
 * Saves every uploaded part under its file name and answers "OK", or
 * "Error uploading files" if any transfer fails.
 *
 * <p>The file name comes from the client, so only its last path element is used as the target.
 * This keeps names such as "../../x" from writing outside the working directory.
 *
 * @param filePartFlux the uploaded "files" parts
 * @return "OK" on success, an error text otherwise
 */
@PostMapping("/upload/files")
Mono<String> upload(@RequestPart("files") Flux<FilePart> filePartFlux) {
    return filePartFlux
      .flatMap(file -> file.transferTo(Paths.get(file.filename()).getFileName()))
      .then(Mono.just("OK"))
      .onErrorResume(error -> Mono.just("Error uploading files"));
}		
		
		

获取文件 bytes 字节数据

		
/**
 * Saves every uploaded part to disk, collects the stored file names and passes them to
 * {@code fileService.save}.
 *
 * <p>The file name comes from the client, so only its last path element is used, both as the
 * target path and as the recorded name. Errors propagate to the caller unchanged; the former
 * {@code onErrorResume(error -> Mono.error(error))} re-emitted the same error and was removed.
 *
 * <p>NOTE(review): the return type stays a raw {@code Mono} because the result type of
 * {@code fileService.save} is not visible here.
 *
 * @param filePartFlux the uploaded "files" parts
 * @return whatever {@code fileService.save} returns
 */
@PostMapping("/upload/files/save")
Mono uploadFileSave(@RequestPart("files") Flux<FilePart> filePartFlux) {
    File file = new File();

    return filePartFlux
      .flatMap(filePart -> {
          String safeName = Paths.get(filePart.filename()).getFileName().toString();
          return filePart.transferTo(Paths.get(safeName)).then(Mono.just(safeName));
      })
      .collectList()
      .flatMap(filenames -> {
          file.setFilenames(filenames);
          return fileService.save(file);
      });
}		
		
		
		
@PostMapping(value = "upload/{androidId}", consumes = {MediaType.MULTIPART_FORM_DATA_VALUE})
    public Mono<String> upload(@PathVariable("androidId") String androidId, @RequestPart("file") Mono<FilePart> filePart) {

        return filePart.flatMap(file -> {
                    Mono<byte[]> fileBytes = file.content().map(buffer -> {
                                byte[] bytes = new byte[buffer.readableByteCount()];
                                buffer.read(bytes);
                                DataBufferUtils.release(buffer);
                                return bytes;
                            })
                            .reduce(new ByteArrayOutputStream(), (os, b) -> {
                                os.write(b, 0, b.length);
                                return os;
                            })
                            .map(ByteArrayOutputStream::toByteArray);

                    return fileBytes;
                }).then(Mono.just("OK"))
                .onErrorResume(error -> Mono.just("Error uploading files"));
		
		

获取文件输入流 InputStream

		
/**
 * Uploads every received file part to object storage and returns the object names.
 *
 * <p>{@code filePart.content()} emits one buffer per chunk. The chunks are joined into a single
 * buffer first, so each file is uploaded once as a whole instead of once per chunk under the
 * same object name. The stream is opened with release-on-close and closed by
 * try-with-resources, which returns the buffer when the upload call has finished.
 *
 * <p>NOTE(review): joining keeps the whole file in memory; confirm this is acceptable for the
 * expected upload sizes.
 *
 * @param androidId device id; used in the object name and as the log marker
 * @param fileParts the uploaded "files" parts
 * @return a success response with the object names, or a failure response with the error message
 */
@PostMapping(value = "upload/{androidId}", consumes = {MediaType.MULTIPART_FORM_DATA_VALUE})
    public Mono<JsonResponse<Object>> upload(@PathVariable("androidId") String androidId, @RequestPart("files") Flux<FilePart> fileParts) {
        final Marker marker = MarkerFactory.getMarker(androidId);

        return fileParts.flatMap(filePart ->
                        org.springframework.core.io.buffer.DataBufferUtils.join(filePart.content())
                                .map(dataBuffer -> {
                                    String datetime = LocalDateTime.now().format(dateTimeFormatter);
                                    String objectName = String.format("%s/%s/%s/%s", "cordyceps", datetime, androidId, filePart.filename());
                                    // releaseOnClose = true: closing the stream releases the buffer
                                    try (InputStream inputStream = dataBuffer.asInputStream(true)) {
                                        Optional<String> optional = aliyunComponent.ossUploadFromInputStream(objectName, inputStream);
                                        optional.ifPresent(oss -> {
                                            log.info(marker, oss);
                                        });
                                    } catch (java.io.IOException e) {
                                        // Surface close failures through the reactive error channel.
                                        throw new java.io.UncheckedIOException(e);
                                    }
                                    return objectName;
                                })
                ).collectList()
                .map(results -> {
                    return JsonResponse.builder().status(true).code(Code.SUCCESS.name()).reason("上传成功").data(results).build();
                })
                .onErrorResume(error -> Mono.just(JsonResponse.builder().status(false).code(Code.FAIL.name()).reason(error.getMessage()).data(null).build()));
    }