| 知乎专栏 |
Webflux 不支持 @RequestParam 获取 Form 表单数据
/**
 * Handler that binds the request parameter "param" via {@code @RequestParam}, logs it and echoes it back.
 * NOTE(review): the surrounding text presents this as the approach that does not receive form data under WebFlux.
 */
@PostMapping("post")
public String post(@RequestParam("param") String test) {
log.info(test);
return test;
}
有一种方法可以获取 Form 提交过来的数据
/**
 * Returns the submitted form data read from the exchange.
 *
 * <p>The form data is logged with {@code doOnNext}, i.e. when it is actually available;
 * calling {@code toString()} on the Mono itself would only print the publisher object.
 *
 * @param exchange the current server exchange
 * @return the form fields of the request
 */
@PostMapping("post")
public Mono<MultiValueMap<String, String>> post(ServerWebExchange exchange) {
    return exchange.getFormData()
            .doOnNext(formData -> log.info(formData.toString()));
}
/**
 * Returns the submitted form data and logs the fields "name" and "nickname".
 *
 * <p>Logging is attached with {@code doOnNext} so that it runs as part of the single
 * subscription made by the framework, instead of starting a second, unmanaged
 * subscription inside the handler.
 *
 * @param exchange the current server exchange
 * @return the form fields of the request
 */
@PostMapping("post")
public Mono<MultiValueMap<String, String>> post(ServerWebExchange exchange) {
    return exchange.getFormData()
            .doOnNext(multiValueMap -> {
                log.info(multiValueMap.toString());
                Map<String, String> singleValueMap = multiValueMap.toSingleValueMap();
                log.info(singleValueMap.toString());
                log.info("name: {}, nickname:{}", singleValueMap.get("name"), singleValueMap.get("nickname"));
            });
}
获取 Form 数据
/**
 * Returns the submitted form data flattened to one value per field.
 *
 * <p>The conversion is synchronous, so {@code map} is used rather than
 * {@code flatMap} with {@code Mono.just}.
 *
 * @param serverWebExchange the current server exchange
 * @return the first value of every form field
 */
@PostMapping("post")
public Mono<Map<String, String>> post(ServerWebExchange serverWebExchange) {
    return serverWebExchange.getFormData()
            .map(formData -> formData.toSingleValueMap())
            .doOnNext(singleValueMap -> log.info(singleValueMap.toString()));
}
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
# Enable template caching.
spring.thymeleaf.cache=true
# Check that the template exists before rendering it.
spring.thymeleaf.check-template=true
# Check that the templates location exists.
spring.thymeleaf.check-template-location=true
# Enable Thymeleaf view resolution for Web frameworks.
spring.thymeleaf.enabled=true
# Template files encoding.
spring.thymeleaf.encoding=UTF-8
# Comma-separated list of view names that should be excluded from resolution.
spring.thymeleaf.excluded-view-names=
# Template mode to be applied to templates. See also StandardTemplateModeHandlers.
spring.thymeleaf.mode=HTML5
# Prefix that gets prepended to view names when building a URL.
spring.thymeleaf.prefix=classpath:/templates/
# Maximum size of data buffers used for writing to the response, in bytes.
spring.thymeleaf.reactive.max-chunk-size=
# Media types supported by the view technology.
spring.thymeleaf.reactive.media-types=
# Content-Type value written to HTTP responses.
spring.thymeleaf.servlet.content-type=text/html
# Suffix that gets appended to view names when building a URL.
spring.thymeleaf.suffix=.html
# Order of the template resolver in the chain.
spring.thymeleaf.template-resolver-order=
# Comma-separated list of view names that can be resolved.
spring.thymeleaf.view-names=
/**
 * Fills the model with the attributes "name" and "city" and selects the view "hello".
 * NOTE(review): the template shown with this example is named welcome.html — confirm the view name.
 *
 * @param model the view model to populate
 * @return the logical view name
 */
@GetMapping("/welcome")
public Mono<String> hello(final Model model) {
    model.addAttribute("name", "Neo");
    model.addAttribute("city", "深圳");
    // The view name is a constant, so it is wrapped directly instead of going through a MonoSink.
    return Mono.just("hello");
}
/**
 * Puts the reactive stream of all cities into the model under "cityLists"
 * and selects the "cityList" view.
 *
 * @param model the view model to populate
 * @return the logical view name
 */
@GetMapping("/list")
public String listPage(final Model model) {
    model.addAttribute("cityLists", cityService.findAllCity());
    return "cityList";
}
welcome.html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8"/>
    <title>欢迎页面</title>
</head>
<body>
<!-- <p> is not permitted inside <h1>; <span> keeps the heading valid. -->
<h1>你好,欢迎来自<span th:text="${city}"></span>的<span th:text="${name}"></span></h1>
</body>
</html>
cityList.html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8"/>
    <title>城市列表</title>
</head>
<body>
<div>
    <table>
        <!-- A table title is expressed with <caption>; <legend> is only valid inside <fieldset>. -->
        <caption>
            <strong>城市列表</strong>
        </caption>
        <thead>
        <tr>
            <th>城市编号</th>
            <th>省份编号</th>
            <th>名称</th>
            <th>描述</th>
        </tr>
        </thead>
        <tbody>
        <!-- One row per element of the "cityLists" model attribute. -->
        <tr th:each="city : ${cityLists}">
            <td th:text="${city.id}"></td>
            <td th:text="${city.provinceId}"></td>
            <td th:text="${city.name}"></td>
            <td th:text="${city.description}"></td>
        </tr>
        </tbody>
    </table>
</div>
</body>
</html>
Mono(返回0或1个元素)/Flux(返回0-n个元素)
// Build a Mono from an Optional and print the value it emits.
Mono.justOrEmpty(Optional.of("Netkiller")).subscribe(System.out::println);
// Build a Mono programmatically through a MonoSink and print the value it emits.
Mono.create(sink -> sink.success("Netkiller")).subscribe(System.out::println);
/**
 * Returns a Mono that emits "hello webflux" and logs inside the sink callback,
 * on subscription and on delivery of the value.
 */
@GetMapping("mono")
public Mono<Object> mono() {
return Mono.create(monoSink -> {
log.info("创建 Mono");
monoSink.success("hello webflux");
})
.doOnSubscribe(subscription -> { // invoked when a subscriber subscribes to the publisher
log.info("doOnSubscribe={}", subscription);
}).doOnNext(next -> { // invoked when the subscriber receives the value
log.info("doOnNext={}", next);
});
}
从 Supplier 创建 Mono
/**
 * Returns a Mono whose value is produced by a supplier that sleeps for five seconds.
 *
 * <p>"start" and "end" are logged while the Mono is assembled; the supplier itself
 * only runs once the Mono is subscribed to.
 *
 * @return a Mono emitting "netkiller"
 */
@GetMapping("/get")
private Mono<String> get() {
    log.info("start");
    Mono<String> result = Mono.fromSupplier(() -> {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning thread can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        return "netkiller";
    });
    log.info("end");
    return result;
}
// then() without an argument: the result completes empty, so doOnSuccess receives null.
Mono.empty()
.then()
.doOnSuccess(i -> System.out.println("On success: " + i))
.doOnError(i -> System.out.println("On error: " + i))
.block();
// Empty source followed by another Mono: the second Mono's value is delivered.
Mono.empty()
.then(Mono.just("Good bye"))
.doOnSuccess(i -> System.out.println("On success: " + i))
.doOnError(i -> System.out.println("On error: " + i))
.block();
// then(other) ignores the source's value; only the second Mono's value is delivered.
Mono.just("Hello World")
.then(Mono.just("Good bye"))
.doOnSuccess(i -> System.out.println("On success: " + i))
.doOnError(i -> System.out.println("On error: " + i))
.block();
// A failing source propagates its error: doOnError runs and block() rethrows.
Mono.error(new RuntimeException("Something wrong"))
.then(Mono.just("Good bye"))
.doOnSuccess(i -> System.out.println("On success: " + i))
.doOnError(i -> System.out.println("On error: " + i))
.block();
// Both Monos fail: the error of the source ("Something wrong") is the one that is propagated.
Mono.error(new RuntimeException("Something wrong"))
.then(Mono.error(new RuntimeException("Something very wrong")))
.doOnSuccess(i -> System.out.println("On success: " + i))
.doOnError(i -> System.out.println("On error: " + i))
.block();
// Emit a value followed by an error; values go to stdout, the error goes to stderr.
Mono.just("Netkiller")
.concatWith(Mono.error(new IllegalStateException()))
.subscribe(System.out::println, System.err::println);
// Same sequence, but the error is replaced by the fallback value "defaultError".
Mono.just("Netkiller")
.concatWith(Mono.error(new IllegalStateException()))
.onErrorReturn("defaultError")
.subscribe(System.out::println);
// Emit a value followed by an error, then switch to a fallback publisher when the error occurs.
// onErrorResume replaces switchOnError, which no longer exists in current Reactor versions.
Mono.just("Netkiller")
        .concatWith(Mono.error(new IllegalStateException()))
        .onErrorResume(error -> Mono.just("defaultError"))
        .subscribe(System.out::println);
// A value is present: doOnNext and doOnSuccess both run with "Hello World".
Mono.just("Hello World")
.doOnNext(i -> System.out.println("On next: " + i))
.doOnSuccess(i -> System.out.println("On success: " + i))
.doOnError(i -> System.out.println("On error: " + i))
.block();
// Empty Mono: doOnNext is skipped, doOnSuccess runs with null.
Mono.empty()
.doOnNext(i -> System.out.println("On next: " + i))
.doOnSuccess(i -> System.out.println("On success: " + i))
.doOnError(i -> System.out.println("On error: " + i))
.block();
// mapNotNull drops the element when the mapper returns null, so this behaves like the empty case.
Mono.just("Hello World")
.mapNotNull(s -> null)
.doOnNext(i -> System.out.println("On next: " + i))
.doOnSuccess(i -> System.out.println("On success: " + i))
.doOnError(i -> System.out.println("On error: " + i))
.block();
// Failing Mono: only doOnError runs, and block() rethrows the exception.
Mono.error(new RuntimeException("Something wrong"))
.doOnNext(i -> System.out.println("On next: " + i))
.doOnSuccess(i -> System.out.println("On success: " + i))
.doOnError(i -> System.out.println("On error: " + i))
.block();
// Wait 10 ms, print the emitted value inside map, and block the caller until it arrives.
Mono.delay(Duration.ofMillis(10)).map(d -> {
System.out.println(d);
return d;
}).block();
// Same pipeline without blocking: the value is printed in map and again by the subscriber.
// NOTE(review): subscribe() returns immediately, so the caller must stay alive for the output to appear.
Mono.delay(Duration.ofMillis(10)).map(d -> {
System.out.println(d);
return d;
}).subscribe(System.out::println);
// Flux.create: push two elements through the sink, then complete.
Flux<String> flux = Flux.create(sink -> {
sink.next("Neo");
sink.next("netkiller");
sink.complete();
});
flux.subscribe(System.out::println);
// Flux.generate: emit one element through the synchronous sink, then complete.
Flux<String> generate = Flux.generate(synchronousSink -> {
synchronousSink.next("Neo");
synchronousSink.complete();
});
generate.subscribe(System.out::println);
多个值
// Stateful generate: the list is the state carried between rounds.
// Each round emits one random number below 100 and records it; the stream completes after 10 values.
final Random random = new Random();
Flux.generate(ArrayList::new, (list, sink) -> {
int value = random.nextInt(100);
list.add(value);
sink.next(value);
if (list.size() == 10) {
sink.complete();
}
return list;
}).subscribe(System.out::println);
// single element
Flux.just("just").subscribe(System.out::println);
// multiple elements
Flux.just("just", "just1", "just2").subscribe(System.out::println);
// Flux -> Flux
Flux.from(Flux.just("just", "just1", "just2")).subscribe(System.out::println);
// Mono -> Flux
Flux.from(Mono.just("just")).subscribe(System.out::println);
// Subscribe with separate callbacks for elements, errors (ignored here) and completion.
Flux.just("Netkiller1", "Netkiller2", "Netkiller3")
.subscribe(data -> System.out.println("onNext:" + data), err -> {
}, () -> System.out.println("onComplete"));
多个订阅者
// Build a list of five strings and subscribe to the same Flux three times;
// each subscribe call prints the elements of the list.
List<String> list = new ArrayList<>();
list.add("1");
list.add("2");
list.add("3");
list.add("4");
list.add("5");
Flux<String> flux = Flux.fromIterable(list);
flux.subscribe(System.out::println);
flux.subscribe(System.out::println);
flux.subscribe(System.out::println);
手动控制
// publish() turns the Flux into a ConnectableFlux: subscribers are registered first
// and data only starts flowing once connect() is called.
Flux<String> flux = Flux.fromIterable(list);
ConnectableFlux<String> con = flux.publish();
con.subscribe(System.out::println);
con.subscribe(System.out::println);
con.subscribe(System.out::println);
// start consuming data manually
con.connect();
// Create a Flux from an array.
Flux.fromArray(new String[] { "arr", "arr", "arr", "arr" }).subscribe(System.out::println);
// Create a Flux from a Set; a Set is already an Iterable, so no lambda wrapper is needed.
Set<String> v = new HashSet<>();
v.add("1");
v.add("2");
v.add("3");
Flux.fromIterable(v).subscribe(System.out::println);
autoConnect(5) 表示如果订阅者达到5个 就自动开启
// autoConnect(5) connects to the source once five subscribers have subscribed;
// only three subscribe here.
Flux<String> auto = flux.publish().autoConnect(5);
auto.subscribe(System.out::println);
auto.subscribe(System.out::println);
auto.subscribe(System.out::println);
停止消费策略
// refCount(3): connect to the source once three subscribers have subscribed;
// per the Reactor documentation it disconnects when all subscribers have cancelled.
Flux<String> auto = flux.publish().refCount(3);
auto.subscribe(System.out::println);
auto.subscribe(System.out::println);
auto.subscribe(System.out::println);
Thread.sleep(1000L);
// refCount(3, 10s): same, but the disconnect is delayed by a ten-second grace period.
// NOTE(review): `auto` is declared a second time here; the two snippets are meant to be read separately.
Flux<String> auto = flux.publish().refCount(3,Duration.ofSeconds(10));
auto.subscribe(System.out::println);
auto.subscribe(System.out::println);
auto.subscribe(System.out::println);
从 Stream 返回 Flux
/**
 * Streams the words "alpha", "bravo" and "charlie" as server-sent events,
 * using a Java Stream as the source of the Flux.
 */
@GetMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
private Flux<String> getWords() {
    return Flux.fromStream(Arrays.asList("alpha", "bravo", "charlie").stream());
}
// defer: the inner Flux is created anew for every subscriber.
Flux.defer(() -> Flux.just("just", "just1", "just2")).subscribe(System.out::println);
// interval: emit an increasing number every 500 ms.
Flux.interval(Duration.of(500, ChronoUnit.MILLIS)).subscribe(System.out::println);
// interval with initial delay: wait 2 s, then emit every 200 ms.
Flux.interval(Duration.ofSeconds(2), Duration.ofMillis(200)).subscribe(System.out::println);
// Flux.intervalMillis(long) no longer exists in current Reactor versions; use the Duration overload.
Flux.interval(Duration.ofMillis(1000)).subscribe(System.out::println);
返回 List
/**
 * Builds nine Picture objects (ids 1 to 9, each with a matching image URL)
 * and returns them as a Flux.
 */
@GetMapping("flux")
public Flux<Picture> flux() {
    List<Picture> pictures = new ArrayList<Picture>();
    for (int index = 1; index < 10; index++) {
        Picture item = new Picture();
        item.setId(Long.valueOf(index));
        item.setImage("https://www.netkiller.cn/images/" + index + ".png");
        pictures.add(item);
    }
    return Flux.fromIterable(pictures);
}
返回 Map
/**
 * Builds a map with the entries key1=value1 to key9=value9 and returns its entries as a Flux.
 */
@GetMapping("map")
public Flux<Map.Entry<String, String>> map() {
    Map<String, String> entries = new HashMap<>();
    for (int index = 1; index < 10; index++) {
        entries.put("key" + index, "value" + index);
    }
    return Flux.fromIterable(entries.entrySet());
}
// Capture the FluxSink in an AtomicReference so elements can be pushed from outside the create callback.
AtomicReference<FluxSink<String>> fluxSink = new AtomicReference<FluxSink<String>>();
Flux<String> response = Flux.create(sink -> {
fluxSink.set(sink);
});
// The callback above runs on subscription, which is what populates the reference.
response.subscribe(System.out::println);
// emit elements
fluxSink.get().next("Neo");
fluxSink.get().next("Netkiller");
// Callback assigned inside Flux.create; calling it forwards a value to the sink.
Consumer<String> producer;
/**
 * Exposes the sink of Flux.create through the {@code producer} field and pushes two elements through it.
 */
@Test
public void testFluxCreate() {
Flux.create(sink -> {
producer = nextData -> sink.next(nextData);
}).subscribe(e -> System.out.println(e));
//do something
// emit elements
producer.accept("Neo");
producer.accept("Netkiller");
}
FluxSink<String> outSink;
@Test
public void testFluxCreate() {
Flux<String> f = Flux.create(sink -> {
outSink = sink;
});
f.subscribe(e -> System.out.println(e))
//do something
//下发元素
outSink.next("Neo");
outSink.next("Netkiller");
}
/**
 * Listener interface with one callback for a chunk of data and one for the end of processing.
 */
interface MyEventListener<T> {
// called with a chunk of data
void onDataChunk(List<T> chunk);
// called when processing has completed
void processComplete();
}
// Bridge a listener-based API to a Flux: every string of a received chunk is forwarded
// to the sink, and processComplete() completes the Flux.
Flux<String> bridge = Flux.create(sink -> {
myEventProcessor.register(
new MyEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}
public void processComplete() {
sink.complete();
}
});
});
// map: double each element synchronously; log() traces the signals of the source.
Flux.just(1, 2, 3, 4)
.log()
.map(i -> {
return i * 2;
})
.subscribe(System.out::println);
// flatMap: map each element to an inner publisher that delivers the doubled value after one second.
Flux.just(1, 2, 3, 4)
.log()
.flatMap(e -> {
return Flux.just(e * 2).delayElements(Duration.ofSeconds(1));
})
.subscribe(System.out::println);
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
flux.map(num -> num * 2) // multiply each element by 2
.filter(num -> num % 2 == 0) // keep even numbers only; after doubling, every element passes
.subscribe(System.out::println);
doOnComplete 完成时执行
// Two doOnComplete callbacks: the first sets the flag, the second prints "OK".
AtomicBoolean onComplete = new AtomicBoolean();
Flux<Integer> flux = Flux.just(1, 2, 3);
flux.log()
.doOnComplete(() -> onComplete.set(true))
.doOnComplete(() -> {
System.out.println("OK");
})
.subscribe(System.out::println);
// Print the flag after subscribing.
System.out.println(onComplete.get());
// Move processing to the parallel scheduler and print the thread id for each element.
// flatMap is limited to a concurrency of 5; repeat() re-subscribes indefinitely.
Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
.publishOn(Schedulers.parallel())
.flatMap(integer -> {
System.out.println("val:" + integer + ", thread:" + Thread.currentThread().getId());
return Mono.just(integer);
}, 5)
.repeat()
.subscribe();
/**
 * Streams the numbers 1 to 8 as server-sent events, logging the handling thread for each element.
 * Processing is moved to the parallel scheduler, flatMap is limited to a concurrency of 2,
 * and the sequence is re-subscribed via repeat(10L).
 */
@GetMapping(value = "parallel", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Integer> parallel() {
return Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
.publishOn(Schedulers.parallel())
.flatMap(integer -> {
log.info("val:" + integer + ", thread:" + Thread.currentThread().getId());
return Mono.just(integer);
}, 2).repeat(10L);
}
// ParallelFlux: split the sequence and run the rails on the parallel scheduler.
Flux.just(1,2,3,4,5,6,7,8)
.parallel(2) // number of parallel rails the sequence is split into
.runOn(Schedulers.parallel())
.map(integer -> {
System.out.println("val:" + integer + ", thread:" + Thread.currentThread().getId());
return integer;
})
.subscribe();
// Each element is wrapped in its own Mono, which is published on the parallel scheduler;
// the value and thread id are printed and logged. repeat() re-subscribes indefinitely.
Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
.publishOn(Schedulers.parallel()) // <- Each flux can be published in a different thread
.flatMap(integer -> {
return Mono.fromCallable(() -> {
System.out.println("val:" + integer + ", thread:" + Thread.currentThread().getId());
log.info("val:" + integer + ", thread:" + Thread.currentThread().getId());
return integer;
}).publishOn(Schedulers.parallel()); // <- Each Mono processing every integer can be processed in a different thread
})
.repeat()
.subscribe();
package cn.netkiller.test;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
/**
 * Demonstrates which thread each stage of a pipeline runs on by prefixing the value
 * with the current thread name after every scheduler switch.
 */
public class Test {
public static void main(String[] args) {
// The source emits the name of the thread the create callback runs on.
Flux.create(sink -> {
sink.next(Thread.currentThread().getName());
sink.complete();
}).publishOn(Schedulers.single())
// first map: runs after publishOn(Schedulers.single())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.publishOn(Schedulers.immediate())
// second map: runs after publishOn(Schedulers.immediate())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
// subscribeOn selects the scheduler used for the subscription itself
.subscribeOn(Schedulers.parallel())
.toStream()
.forEach(System.out::println);
}
}
// Each element is wrapped in its own Mono, which is published on the parallel scheduler;
// the value and thread id are printed. repeat() re-subscribes indefinitely.
Flux.just(1,2,3,4,5,6,7,8)
.publishOn(Schedulers.parallel()) // <- Each flux can be published in a different thread
.flatMap(integer -> {
return Mono.fromCallable(() -> {
System.out.println("val:" + integer + ", thread:" + Thread.currentThread().getId());
return integer;
}).publishOn(Schedulers.parallel()); // <- Each Mono processing every integer can be processed in a different thread
})
.repeat()
.subscribe();
/**
 * Server-sent events endpoint that emits the two strings "Alpha" and "Omega".
 */
@GetMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> createConnectionAndSendEvents() {
return Flux.just("Alpha", "Omega");
}
curl 访问 SSE 需要设置HTTP头 -H "Accept: text/event-stream"
neo@MacBook-Pro-M2 ~ % curl -H "Accept: text/event-stream" -X 'GET' 'http://localhost:8080/mock/sse'
data:Alpha
data:Omega
![]() | 提示 |
|---|---|
| Safari 浏览器不支持 SSE 推送,微软的 Edge 支持。 |
/**
 * Streams the words "alpha", "bravo" and "charlie" as server-sent events,
 * using a Java Stream as the source of the Flux.
 */
@GetMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
private Flux<String> getWords() {
Stream<String> items = Arrays.asList("alpha", "bravo", "charlie").stream();
return Flux.fromStream(items);
}
每间隔一秒发送一次数据
/**
 * Streams the words of a fixed sentence as server-sent events, one word per second.
 * Each word is paired with a tick of a one-second interval and only the word is kept.
 */
@GetMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
private Flux<String> getWords() {
    String[] words = "The quick brown fox jumps over the lazy dog.".split(" ");
    Flux<String> wordFlux = Flux.just(words);
    Flux<Long> ticks = Flux.interval(Duration.ofSeconds(1));
    return Flux.zip(wordFlux, ticks).map(pair -> pair.getT1());
}
/**
 * Emits one server-sent event per second: the event name is "random", the id is the
 * interval sequence number and the data is a random integer.
 */
@GetMapping("/random")
public Flux<ServerSentEvent<Integer>> randomNumbers() {
    return Flux.interval(Duration.ofSeconds(1))
            .map(seq -> ServerSentEvent.<Integer>builder()
                    .event("random")
                    .id(Long.toString(seq))
                    .data(ThreadLocalRandom.current().nextInt())
                    .build());
}
/**
 * Emits the text "Flux - " followed by the current local time once per second.
 */
@GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamFlux() {
    Flux<Long> ticks = Flux.interval(Duration.ofSeconds(1));
    return ticks.map(sequence -> "Flux - " + LocalTime.now());
}
/**
 * Wraps three fixed strings in ServerSentEvent objects with the event name "hello",
 * a running id starting at 1 and a comment line.
 */
@GetMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
private Flux<ServerSentEvent<Object>> getWords() {
// counter used to number the events
AtomicInteger seq = new AtomicInteger(1);
return Flux.just("你好", "我是 Neo", "我的昵称是 netkiller")
.map(data -> ServerSentEvent.builder()
.event("hello")
.id(String.valueOf(seq.getAndIncrement()))
.data(data)
.comment("测试数据")
.build());
}
演示结果
neo@MacBook-Pro-M2 ~> curl http://localhost:8080/test/sse
id:1
event:hello
data:你好
:测试数据

id:2
event:hello
data:我是 Neo
:测试数据

id:3
event:hello
data:我的昵称是 netkiller
:测试数据
FluxSink 发送 ServerSentEvent
/**
 * Emits two server-sent events through a FluxSink and then completes:
 * a "LocalTime" event carrying the current time and a "test" event carrying a greeting.
 * Event ids are taken from a counter starting at 1.
 */
@GetMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> serverSentEvent() {
    AtomicInteger sequence = new AtomicInteger(1);
    return Flux.create(sink -> {
        ServerSentEvent<String> timeEvent = ServerSentEvent.<String>builder()
                .id(String.valueOf(sequence.getAndIncrement()))
                .event("LocalTime")
                .data(String.valueOf(LocalTime.now()))
                .build();
        sink.next(timeEvent);

        ServerSentEvent<String> greetingEvent = ServerSentEvent.<String>builder()
                .id(String.valueOf(sequence.getAndIncrement()))
                .event("test")
                .data("Hello netkiller")
                .comment("测试")
                .build();
        sink.next(greetingEvent);

        sink.complete();
    });
}
package cn.netkiller.webflux.controller;
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;
/**
 * Server-sent events demo controller: a launch countdown plus a simple numbered stream.
 *
 * <p>The countdown text is derived from the sequence number of each stream instead of a
 * mutable field on the controller, so every client gets its own full countdown and
 * concurrent requests cannot interfere with each other.
 */
@RestController
@RequestMapping("/sse")
public class SseController {
    /** Number of countdown messages ("9" down to "0") sent before the launch message. */
    private static final int COUNT_DOWN_START = 10;

    public SseController() {
    }

    /**
     * Emits one "launch" event per second; the id is the sequence number and the data is
     * the countdown text for that sequence number.
     */
    @GetMapping(value = "/launch", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<Object>> countDown() {
        return Flux.interval(Duration.ofSeconds(1)).map(seq -> launchEvent(seq));
    }

    /**
     * Returns the countdown text for the given tick: "倒计时:9" for tick 0 down to
     * "倒计时:0" for tick 9, and "发射" for every later tick.
     */
    private String getCountDownSec(long tick) {
        long remaining = COUNT_DOWN_START - 1 - tick;
        return remaining >= 0 ? "倒计时:" + remaining : "发射";
    }

    /** Builds the "launch" event for one tick. */
    private ServerSentEvent<Object> launchEvent(long tick) {
        return ServerSentEvent.<Object>builder()
                .event("launch")
                .id(Long.toString(tick))
                .data(getCountDownSec(tick))
                .build();
    }

    /**
     * Emits the launch event for the values produced by {@code Flux.range(10, 1)}.
     * NOTE(review): range(start, count) with (10, 1) yields the single value 10 — confirm
     * whether a longer range was intended.
     */
    @GetMapping("/range")
    public Flux<Object> range() {
        return Flux.range(10, 1).map(seq -> launchEvent(seq));
    }

    /**
     * WebFlux server push (SSE, server-sent events): emits "flux data -- 1" to
     * "flux data -- 9", one element per second.
     *
     * <p>The delay is applied with {@code delayElements} rather than sleeping inside the
     * pipeline, so no thread is blocked and no InterruptedException has to be swallowed.
     * NOTE(review): {@code logger} is not declared in the visible source of this class.
     */
    @GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    private Flux<String> flux() {
        return Flux.range(1, 9)
                .delayElements(Duration.ofSeconds(1))
                .doOnNext(i -> logger.info("sse " + i))
                .map(i -> "flux data -- " + i);
    }
}
运行结果
id:0 event:launch data:倒计时:9
id:1 event:launch data:倒计时:8
id:2 event:launch data:倒计时:7
id:3 event:launch data:倒计时:6
id:4 event:launch data:倒计时:5
id:5 event:launch data:倒计时:4
id:6 event:launch data:倒计时:3
id:7 event:launch data:倒计时:2
id:8 event:launch data:倒计时:1
id:9 event:launch data:倒计时:0
id:10 event:launch data:发射
/**
 * Emits ten server-sent events ("data-0" to "data-9"), one per second, and prints a
 * message from each of the completion, error and finally callbacks.
 */
@GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> sse() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> ServerSentEvent.builder("data-" + sequence).build())
.take(10) // stop after 10 events
// Normal completion: runs when the Flux completes successfully.
.doOnComplete(() -> {
System.out.println("✅ SSE Flux 正常完成!");
})
// Error termination: runs when an error occurs.
.doOnError(ex -> {
System.out.println("❌ SSE Flux 异常结束:" + ex.getMessage());
})
// Final callback: runs on normal completion, on error and when the client disconnects.
.doFinally(signalType -> {
System.out.println("🔚 SSE 流最终关闭:" + signalType);
// This is the place to release locks, close resources or write logs.
});
}
输出结果
✅ SSE Flux 正常完成!
🔚 SSE 流最终关闭:onComplete
/**
 * Emits one "test-event" server-sent event per second; the id is the sequence number
 * and the data contains the current local time.
 */
@GetMapping("/server")
public Flux<ServerSentEvent<String>> streamEvents() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> ServerSentEvent.<String>builder()
.id(String.valueOf(sequence))
.event("test-event")
.data("LocalTime: " + LocalTime.now())
.build());
}
/**
 * Connects to the "mock/server" SSE endpoint on localhost:8080 with WebClient and logs
 * every received event, any error, and completion.
 *
 * <p>The subscription is started and not awaited; the handler returns immediately.
 */
@GetMapping("/client")
public void consumeServerSentEvent() {
    WebClient client = WebClient.create("http://localhost:8080");
    // Keeps the generic type ServerSentEvent<String> available for decoding.
    ParameterizedTypeReference<ServerSentEvent<String>> type
            = new ParameterizedTypeReference<ServerSentEvent<String>>() {
    };
    Flux<ServerSentEvent<String>> eventStream = client.get()
            .uri("mock/server")
            .retrieve()
            .bodyToFlux(type);
    eventStream.subscribe(
            content -> log.info("Time: {} - event: name[{}], id [{}], content[{}] ",
                    LocalTime.now(), content.event(), content.id(), content.data()),
            // Pass the throwable as the exception argument: with SLF4J-style loggers a lone
            // Throwable is not substituted into "{}", so the placeholder would be printed literally.
            error -> log.error("Error receiving SSE", error),
            () -> log.info("Completed!!!"));
}
// Append another Flux with concatWith().
Flux.just(1, 2, 3)
.concatWith(Flux.just(4, 5, 6))
.subscribe(System.out::println);
// Has the same effect as concatWithValues(4, 5, 6).
Flux.just(1, 2, 0)
.map(num -> 10 / num) // dividing by 0 raises an error
.onErrorReturn(-1) // emit -1 when an error occurs
.concatWithValues(5, 6) // appended after the preceding sequence finishes
.subscribe(System.out::println);
// Output:
// 10
// 5
// -1 <- result of the error handling
// 5 <- appended elements
// 6
/**
 * Sends a streaming chat-completion request and returns the generated text as a Flux of fragments.
 *
 * <p>Each received chunk is parsed as JSON and the text at {@code choices[0].delta.content}
 * is extracted; spaces are encoded as {@code \x20} and newlines as {@code \x0A}. Chunks that
 * are not valid JSON yield an empty string. A final "。" is appended after the stream ends,
 * and any error is logged and replaced by the single element "[ERROR]".
 *
 * @param device   not used by this method
 * @param taskId   not used by this method
 * @param speakId  not used by this method
 * @param messages the chat messages sent as the "messages" field of the request body
 * @return the stream of encoded text fragments
 */
public Flux<String> deepseek(String device, String taskId, String speakId, List<Map<String, String>> messages) {
    WebClient webClient = WebClient.builder()
            .baseUrl(url)
            .defaultHeader("Content-Type", "application/json")
            .defaultHeader("Authorization", "Bearer " + authorization)
            .build();
    log.debug(messages.toString());
    Map<String, Object> requestBody = Map.of(
            "model", model,
            "messages", messages,
            "stream", true
    );
    // The unused punctuation set was removed: it was built with Set.of(...) containing
    // duplicate elements, which throws IllegalArgumentException on every call.
    // One mapper for the whole stream instead of a new instance per chunk.
    ObjectMapper objectMapper = new ObjectMapper();
    return webClient.post()
            .uri("/chat/completions")
            .bodyValue(requestBody)
            .accept(MediaType.TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(String.class)
            .map(chunk -> {
                try {
                    String jsonString = chunk.replaceFirst("^data: ", "");
                    JsonNode jsonNode = objectMapper.readTree(jsonString);
                    String result = jsonNode.path("choices").path(0).path("delta").path("content").asText();
                    // Encode whitespace so that it survives the transport as visible escape sequences.
                    return result.replace(" ", "\\x20").replace("\n", "\\x0A");
                } catch (JsonProcessingException e) {
                    // Non-JSON chunks contribute no text.
                    return "";
                }
            }).concatWithValues("。")
            .onErrorResume(e -> {
                log.error("Error processing stream", e);
                return Flux.just("[ERROR]");
            });
}
计数
import reactor.core.publisher.Flux;
/**
 * Demonstrates Flux.scan without a seed: emits the running sum of the elements.
 */
public class ScanExample {
public static void main(String[] args) {
Flux.just(1, 2, 3, 4, 5)
.scan((total, current) -> total + current) // running sum
.subscribe(System.out::println);
}
}
// Output:
// 1 <- the first element is emitted as-is (it serves as the initial value)
// 3 <- 1 + 2
// 6 <- 3 + 3
// 10 <- 6 + 4
// 15 <- 10 + 5
字符串追加
// scan with a seed: the seed is emitted first, then every intermediate concatenation.
Flux.just("a", "b", "c", "d")
.scan("", (acc, current) -> acc + current) // the initial value is the empty string
.subscribe(System.out::println);
// Output:
// "" <- initial value
// "a" <- "" + "a"
// "ab" <- "a" + "b"
// "abc"
// "abcd"
/**
 * Mutable tally of how many even and odd numbers have been seen.
 */
class Counter {
    /** Number of even values counted so far. */
    int evenCount = 0;
    /** Number of odd values counted so far. */
    int oddCount = 0;

    /** Renders both tallies as a single line of text. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("偶数: ");
        text.append(evenCount).append(", 奇数: ").append(oddCount);
        return text.toString();
    }
}
// scan with a mutable accumulator: the same Counter instance is updated and re-emitted for each element.
Flux.just(1, 2, 3, 4, 5)
.scan(new Counter(), (counter, num) -> {
if (num % 2 == 0) counter.evenCount++;
else counter.oddCount++;
return counter;
})
.subscribe(System.out::println);
// Output:
// 偶数: 0, 奇数: 0 <- initial state
// 偶数: 0, 奇数: 1 <- after 1
// 偶数: 1, 奇数: 1 <- after 2
// 偶数: 1, 奇数: 2 <- after 3
// 偶数: 2, 奇数: 2 <- after 4
// 偶数: 2, 奇数: 3 <- after 5
// Accumulate into a StringBuffer; skip(1) drops the emission of the empty seed.
Flux.just("a", "b", "c", "d")
.scan(new StringBuffer(), (sentence, current) -> sentence.append(current)) // the seed is an empty StringBuffer
.skip(1)
.subscribe(System.out::println);
// A single-element Flux whose completion sets the flag, turned into a ConnectableFlux with publish().
// Neither connect() nor subscribe() is called in this snippet.
AtomicBoolean onComplete = new AtomicBoolean();
ConnectableFlux<Integer> f = Flux.just(1)
.doOnComplete(() -> onComplete.set(true))
.publish();
/**
 * Saves every uploaded file part under its own file name, relative to the working directory.
 *
 * <p>The client-supplied name is reduced to its last path element before use, so a name
 * such as {@code ../../etc/passwd} cannot escape the target directory.
 *
 * @param filePartFlux the uploaded parts of the multipart field "files"
 * @return "OK" when all files were written, "Error uploading files" otherwise
 */
@PostMapping("/upload/files")
Mono<String> upload(@RequestPart("files") Flux<FilePart> filePartFlux) {
    return filePartFlux
            .flatMap(file -> file.transferTo(Paths.get(file.filename()).getFileName()))
            .then(Mono.just("OK"))
            .onErrorResume(error -> Mono.just("Error uploading files"));
}
获取文件 bytes 字节码
/**
 * Saves every uploaded file part to disk, collects the stored file names and persists
 * them through {@code fileService}.
 *
 * <p>The client-supplied name is reduced to its last path element before use, so it
 * cannot point outside the target directory. Errors are propagated to the caller unchanged.
 *
 * @param filePartFlux the uploaded parts of the multipart field "files"
 * @return the result of {@code fileService.save}
 */
@PostMapping("/upload/files/save")
Mono uploadFileSave(@RequestPart("files") Flux<FilePart> filePartFlux) {
    return filePartFlux
            .flatMap(filePart -> {
                // Keep only the final path element of the untrusted file name.
                String safeName = Paths.get(filePart.filename()).getFileName().toString();
                return filePart.transferTo(Paths.get(safeName)).then(Mono.just(safeName));
            })
            .collectList()
            .flatMap(filenames -> {
                File file = new File();
                file.setFilenames(filenames);
                return fileService.save(file);
            });
}
@PostMapping(value = "upload/{androidId}", consumes = {MediaType.MULTIPART_FORM_DATA_VALUE})
public Mono<String> upload(@PathVariable("androidId") String androidId, @RequestPart("file") Mono<FilePart> filePart) {
return filePart.flatMap(file -> {
Mono<byte[]> fileBytes = file.content().map(buffer -> {
byte[] bytes = new byte[buffer.readableByteCount()];
buffer.read(bytes);
DataBufferUtils.release(buffer);
return bytes;
})
.reduce(new ByteArrayOutputStream(), (os, b) -> {
os.write(b, 0, b.length);
return os;
})
.map(ByteArrayOutputStream::toByteArray);
return fileBytes;
}).then(Mono.just("OK"))
.onErrorResume(error -> Mono.just("Error uploading files"));
获取文件输入流 InputStream
/**
 * Uploads every received file to object storage and returns the list of object names.
 *
 * <p>The content of a file part may arrive in several data buffers. They are joined into
 * one buffer first, so each file is uploaded exactly once as a whole instead of once per
 * buffer under the same object name. The stream is opened with release-on-close and
 * closed after the upload, which returns the buffer to its pool.
 * NOTE(review): joining holds the whole file in memory, and the upload call looks
 * synchronous — confirm this is acceptable for the expected file sizes.
 *
 * @param androidId path variable used in the object name and as the log marker
 * @param fileParts the uploaded parts of the multipart field "files"
 * @return a success response containing the object names, or a failure response with the error message
 */
@PostMapping(value = "upload/{androidId}", consumes = {MediaType.MULTIPART_FORM_DATA_VALUE})
public Mono<JsonResponse<Object>> upload(@PathVariable("androidId") String androidId, @RequestPart("files") Flux<FilePart> fileParts) {
    final Marker marker = MarkerFactory.getMarker(androidId);
    return fileParts.flatMap(filePart -> DataBufferUtils.join(filePart.content())
                    .map(dataBuffer -> {
                        String datetime = LocalDateTime.now().format(dateTimeFormatter);
                        String objectName = String.format("%s/%s/%s/%s", "cordyceps", datetime, androidId, filePart.filename());
                        // asInputStream(true) releases the underlying buffer when the stream is closed.
                        try (InputStream inputStream = dataBuffer.asInputStream(true)) {
                            Optional<String> optional = aliyunComponent.ossUploadFromInputStream(objectName, inputStream);
                            optional.ifPresent(oss -> log.info(marker, oss));
                        } catch (java.io.IOException e) {
                            throw new java.io.UncheckedIOException(e);
                        }
                        return objectName;
                    }))
            .collectList()
            .map(results -> JsonResponse.builder().status(true).code(Code.SUCCESS.name()).reason("上传成功").data(results).build())
            .onErrorResume(error -> Mono.just(JsonResponse.builder().status(false).code(Code.FAIL.name()).reason(error.getMessage()).data(null).build()));
}