| 知乎专栏 |
<!-- Spring Boot WebFlux starter; the WebClient snippets that follow rely on this dependency. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
/**
 * Spring configuration that publishes a pre-configured {@link WebClient} bean.
 */
@Configuration
public class WebConfig {
    /**
     * Builds a client bound to {@code http://localhost:8080} that sends a default cookie and a
     * JSON {@code Content-Type} header with every request.
     *
     * @return the configured client
     */
    @Bean
    public WebClient webClient() {
        // Return the built client directly: the original stored it in a local variable and
        // then fell off the end of a non-void method.
        return WebClient.builder()
                .baseUrl("http://localhost:8080")
                .defaultCookie("cookie-name", "cookie-value")
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .build();
    }
}
超时设置
// A single 60-second budget is applied to connect, response, read and write timeouts.
Duration timeout = Duration.ofSeconds(60);
HttpClient httpClient = HttpClient.create()
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) timeout.toMillis())
        .responseTimeout(timeout)
        .doOnConnected(connection -> {
            // Read handler first, then write handler, in the same order as before.
            connection.addHandler(new ReadTimeoutHandler(timeout.getSeconds(), TimeUnit.SECONDS));
            connection.addHandler(new WriteTimeoutHandler(timeout.getSeconds(), TimeUnit.SECONDS));
        });
// Plug the tuned Reactor Netty client into the WebClient.
ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
WebClient client = WebClient.builder()
        .baseUrl(url)
        .clientConnector(connector)
        .build();
/**
 * Calls {@code GET /mock/mono} on the local server and relays the body to the caller.
 *
 * @return the remote response body
 */
@GetMapping("webclient")
public Mono<String> webclient() {
    WebClient webClient = WebClient.create("http://localhost:8080");
    return webClient
            .get().uri("/mock/mono")
            .retrieve()
            .bodyToMono(String.class)
            // Print as a side effect of the one subscription made by the framework. The
            // original also called subscribe() here, which sent a second, independent
            // HTTP request because the Mono is cold.
            .doOnNext(System.out::println);
}
会返回结果
/**
 * Posts the employee as JSON to {@code /employees}.
 *
 * @param newEmployee the payload sent as the request body
 * @return the full response entity (status, headers, decoded body)
 */
public Mono<ResponseEntity<Employee>> createEmployee(Employee newEmployee) {
    WebClient.ResponseSpec response = webClient.post()
            .uri("/employees")
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(newEmployee)
            .retrieve();
    return response.toEntity(Employee.class);
}
/**
 * Handles {@code POST /create}: forwards the employee to {@code employeeService} and maps the
 * outcome onto an HTTP response.
 *
 * @param newEmployee the employee decoded from the request body
 * @return the created employee on a 2xx reply, otherwise a text message with an error status
 */
@PostMapping("/create")
public Mono<ResponseEntity<?>> createEmployee(@RequestBody Employee newEmployee) {
return employeeService.createEmployee(newEmployee)
.map(responseEntity -> {
if (responseEntity.getStatusCode().is2xxSuccessful()) {
return ResponseEntity.ok(responseEntity.getBody());
} else {
// NOTE(review): if the service uses retrieve(), 4xx/5xx replies presumably arrive as errors
// and are handled by onErrorResume below rather than here - confirm.
return ResponseEntity.status(responseEntity.getStatusCode())
.body("Failed to create employee");
}
})
// Any failure in the chain is turned into a 500 response that carries the exception message.
// NOTE(review): the two branches above produce different body types; verify that the wildcard
// return type compiles as intended.
.onErrorResume(exception -> {
return Mono.just(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("Internal Server Error: " + exception.getMessage()));
});
}
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;
@Configuration
public class WebClientConfig {
// 配置 WebClient,指定基础地址(本地服务地址)
@Bean
public WebClient webClient() {
return WebClient.builder()
.baseUrl("http://localhost:8080") // 本地服务的基础地址
.build();
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
/**
 * Demo controller: {@code /hello} returns a fixed greeting and {@code /query} fetches that
 * greeting through the injected {@link WebClient}.
 */
@RestController
public class MyController {
    // Injected WebClient (must be configured as a bean beforehand).
    @Autowired
    private WebClient webClient;

    /** @return the fixed greeting text */
    @GetMapping("/hello")
    public String hello() {
        return "Hello, Spring Boot!";
    }

    /**
     * Calls the {@code /hello} endpoint without blocking and prefixes the reply.
     *
     * @return the prefixed greeting
     */
    @GetMapping("/query")
    public Mono<String> query() {
        // Relative path; the base address comes from the WebClient configuration.
        Mono<String> helloBody = webClient.get()
                .uri("/hello")
                .retrieve()
                .bodyToMono(String.class);
        return helloBody.map(body -> "从 /hello 接口获取到的内容:" + body);
    }
}
WebClient.create("http://localhost:8080")
.get()
.uri("/students")
.retrieve()
.bodyToFlux(Student.class);
字符串拼接方式
WebClient.create("http://localhost:8080")
.get()
.uri("/students/" + studentId)
.retrieve()
.bodyToMono(Student.class);
通过 uriBuilder 组装 Uri 参数
String endpoint = "/employees";
webClient.post()
        // Build the URI through WebClient's own UriBuilder so the configured base URL is applied.
        // The original expanded a relative path with UriComponentsBuilder and passed it to
        // uri(java.net.URI), which expects an absolute URI and does not prepend the base URL.
        .uri(uriBuilder -> uriBuilder
                .path(endpoint)
                .queryParam("param1", "value1")
                .queryParam("param2", "value2")
                .build())
        .bodyValue(new Employee(...))
        .retrieve()
        .bodyToMono(Employee.class);
// One path variable, expanded by the UriBuilder.
WebClient.create("http://localhost:8080")
        .get()
        .uri(builder -> {
            return builder.path("/student/{studentId}").build(studentId);
        })
        .retrieve()
        .bodyToMono(Student.class);
// Two path variables, expanded in declaration order.
WebClient.create("http://localhost:8080")
        .get()
        .uri(builder -> {
            return builder.path("/student/{studentId}/assignments/{assignmentId}")
                    .build(studentId, assignmentId);
        })
        .retrieve()
        .bodyToMono(Student.class);
uriTemplate 组装 Uri 参数
// The template must be absolute: uri(java.net.URI) sends the URI as given and does not
// combine it with a base URL, so the original relative template produced a host-less URI.
UriTemplate uriTemplate = new UriTemplate(
        "http://localhost:8080/student/{studentId}/assignments/{assignmentId}");
WebClient.create()
        .get()
        .uri(uriTemplate.expand(studentId, assignmentId))
        .retrieve()
        .bodyToMono(Student.class);
http://localhost:8080/students?firstName=Jon&year=1996
String firstName = "Jon";
String year = "1996";
// Both values are attached as query parameters of /students.
WebClient studentsClient = WebClient.create("http://localhost:8080");
studentsClient.get()
        .uri(builder -> {
            return builder.path("/students")
                    .queryParam("firstName", firstName)
                    .queryParam("year", year)
                    .build();
        })
        .retrieve()
        .bodyToMono(Student.class);
http://localhost:8080/students?year=1995,1996,1997
WebClient.create("http://localhost:8080")
.get()
.uri(uriBuilder -> uriBuilder.path("/students")
.queryParam("year", String.join(",", "1995", "1996", "1997"))
.build())
.retrieve()
.bodyToMono(Student.class);
"/products/?category=Phones&category=Tablets"
// Passing several values to queryParam repeats the parameter: ?category=Phones&category=Tablets
webClient.get()
        // "- >" in the original is not a lambda arrow and does not compile.
        .uri(uriBuilder -> uriBuilder
                .path("/products/")
                .queryParam("category", "Phones", "Tablets")
                .build())
        .retrieve()
        .bodyToMono(String.class)
        // Any error is deliberately replaced by an empty result, so block() yields null on failure.
        .onErrorResume(e -> Mono.empty())
        .block();
Employee newEmployee = ...; //Create a new employee object
webClient.post()
        .uri("/employees")
        // bodyValue takes the payload itself. The original wrapped it in
        // BodyInserters.fromValue(...), which made the inserter object the value to encode.
        .bodyValue(newEmployee)
        .retrieve()
        .toEntity(Employee.class)
        .subscribe(
                responseEntity -> {
                    // Success: status, Location header and decoded body are all available here.
                    HttpStatusCode status = responseEntity.getStatusCode();
                    URI location = responseEntity.getHeaders().getLocation();
                    Employee createdEmployee = responseEntity.getBody(); // Response body
                    // handle response as necessary
                },
                error -> {
                    // retrieve() reports 4xx/5xx replies as WebClientResponseException.
                    if (error instanceof WebClientResponseException) {
                        WebClientResponseException ex = (WebClientResponseException) error;
                        HttpStatusCode status = ex.getStatusCode();
                        System.out.println("Error Status Code: " + status.value());
                        //...
                    } else {
                        // Handle other types of errors
                        System.err.println("An unexpected error occurred: " + error.getMessage());
                    }
                }
        );
/**
 * Service that creates employees by submitting an HTML form to the remote {@code /employees}
 * endpoint.
 */
@Service
public class EmployeeService {
    private final WebClient webClient;

    /**
     * @param webClient the client used for all outgoing calls
     */
    @Autowired
    public EmployeeService(WebClient webClient) {
        this.webClient = webClient;
    }

    /**
     * Posts the {@code id}, {@code name} and {@code status} entries of the map as form data.
     *
     * @param formParams form fields; the keys {@code id}, {@code name} and {@code status} are read
     * @return the created employee, or an empty Mono when the response has no body
     */
    public Mono<Employee> createEmployee(Map<String, String> formParams) {
        return webClient.post()
                .uri("/employees")
                .body(BodyInserters.fromFormData("id", formParams.get("id"))
                        .with("name", formParams.get("name"))
                        .with("status", formParams.get("status")))
                .retrieve()
                // An onStatus handler must return a Mono that produces the exception to raise;
                // the original lambdas returned nothing and therefore did not compile. Lambdas
                // are used for the predicates so they fit the HttpStatusCode-based signature.
                .onStatus(status -> status.is4xxClientError(),
                        clientResponse -> clientResponse.createException())
                .onStatus(status -> status.is5xxServerError(),
                        clientResponse -> clientResponse.createException())
                .toEntity(Employee.class)
                .flatMap(responseEntity -> Mono.justOrEmpty(responseEntity.getBody()));
    }
}
WebClient client = WebClient.create("https://www.netkiller.cn");
// NOTE(review): the "file" part is a java.io.File; a Resource such as FileSystemResource is
// presumably what is needed to upload the file content - confirm.
FormInserter<Object> formInserter = fromMultipartData("name","neo")
        .with("age",19)
        .with("map",ImmutableMap.of("sex","F"))
        .with("file",new File("/tmp/netkiler.doc"));
Mono<String> result = client.post()
        .uri("/article/index/{id}.html", 256)
        // A multipart body must be announced as multipart/form-data; the original declared
        // application/json, which contradicts the body being written.
        .contentType(MediaType.MULTIPART_FORM_DATA)
        .body(formInserter)
        //.bodyValue(ImmutableMap.of("name","neo"))
        .retrieve()
        .bodyToMono(String.class);
result.subscribe(System.err::println);
/**
 * Demo controller that submits a URL-encoded form with WebClient and prints the reply.
 */
@RestController
public class HelloController {
    // WebClient instance bound to the article base URL.
    private WebClient webClient = WebClient.builder()
            .baseUrl("https://www.netkiller.cn/article")
            .build();

    /**
     * Posts two form fields to {@code /posts}, waits for the response and prints its body.
     */
    @GetMapping("/test")
    public void test() {
        // Form fields to submit.
        MultiValueMap<String, String> form = new LinkedMultiValueMap<>();
        form.add("title", "linux 手札");
        form.add("body", "测试");

        // Describe the POST request; the body is decoded as a String.
        Mono<String> reply = webClient
                .post()
                .uri("/posts")
                .contentType(MediaType.APPLICATION_FORM_URLENCODED)
                .body(BodyInserters.fromFormData(form))
                .retrieve()
                .bodyToMono(String.class);

        // block() executes the request and waits for the result.
        String body = reply.block();
        System.out.println(body);
    }
}
// Assemble the multipart body: one file part followed by three plain-text parts.
MultipartBodyBuilder builder = new MultipartBodyBuilder();
// The file part is backed by a file on the local disk.
builder.part("file", new FileSystemResource("/tmp/file.txt"));
// Each text part declares text/plain as its own content type.
builder.part("id", "190001", MediaType.TEXT_PLAIN);
builder.part("name", "Lokesh", MediaType.TEXT_PLAIN);
builder.part("status", "active", MediaType.TEXT_PLAIN);
Then we can submit the multipart form data by using the method BodyInserters.fromMultipartData(builder.build()) and send a normal request as in the previous examples.
webClient.post()
.uri("/employees")
.contentType(MediaType.MULTIPART_FORM_DATA)
.body(BodyInserters.fromMultipartData(builder.build()))
.retrieve()
.toEntity(Employee.class)
.doOnError(WriteTimeoutException.class, ex -> {
System.err.println("WriteTimeout");
})
.subscribe(responseEntity -> {
System.out.println("Status: " + responseEntity.getStatusCode().value());
System.out.println("Location URI: " + responseEntity.getHeaders().getLocation().toString());
System.out.println("Created New Employee : " + responseEntity.getBody());
});
// Per-request headers. A request body is only available on body-capable methods, so the
// request uses post(); get() exposes no bodyValue(...) and the original did not compile.
webClient.post()
        .uri("/employees")
        .bodyValue(new Employee(...))
        .header("Authorization", "Bearer auth-token")
        .header("User-Agent", "Mobile App 1.0")
        // Continue with bodyToMono/toEntity and subscribe to actually send the request.
        .retrieve();
// Client-wide defaults: base URL, cookies, URI variables and headers applied to every request.
WebClient.builder()
        .baseUrl("https://www.netkiller.cn")
        .defaultCookie("session","f1d83210-0fc9-4689-82ab-05df70da3367")
        .defaultUriVariables(ImmutableMap.of("name","kl"))
        .defaultHeader("header","neo")
        .defaultHeaders(headers -> {
            headers.add("header1","neo");
            headers.add("header2","chen");
        })
        .defaultCookies(cookies -> {
            cookies.add("cookie1","neo");
            cookies.add("cookie2","netkiller");
        })
        .build();
If-None-Match、If-Modified-Since
// GET "/" with authorization, content negotiation and conditional-request headers.
String basicCredentials = "Basic ".concat(authorization);
Mono<String> mono = webClient
        .get()
        .uri("/")
        .header("Authorization", basicCredentials)
        .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
        .accept(MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML)
        .acceptCharset(StandardCharsets.UTF_8)
        .ifNoneMatch("*")
        .ifModifiedSince(ZonedDateTime.now())
        .retrieve()
        // Decode the response body as a String.
        .bodyToMono(String.class);
WebClient webClient = WebClient.create("https://www.netkiller.cn");
// Describe the GET request; the body is decoded as a String.
Mono<String> mono = webClient
        .get()
        .uri("/spring/index.html")
        .retrieve()
        .bodyToMono(String.class);
// block() executes the request and waits for the body.
String page = mono.block();
System.out.println(page);
在同步模式下使用WebClient
WebClient client = WebClient.create("http://www.netkiller.cn");
// Single value: block() waits for the body and returns it.
WebClient.ResponseSpec articleResponse = client.get()
        .uri("/article/index/arcid/{id}.html", 256)
        .retrieve();
String result = articleResponse.bodyToMono(String.class).block();
System.err.println(result);
// One decoded object.
WebClient.ResponseSpec personResponse = client.get().uri("/person/{id}", i).retrieve();
Person person = personResponse.bodyToMono(Person.class).block();
// Many objects: collect the stream into a list before blocking.
WebClient.ResponseSpec personsResponse = client.get().uri("/persons").retrieve();
List<Person> persons = personsResponse
        .bodyToFlux(Person.class)
        .collectList()
        .block();
避免单独阻塞每个同步响应
WebClient client = WebClient.create("http://www.netkiller.cn");
Mono<String> result1Mono = client.get()
        .uri("/article/index/arcid/{id}.html", 255)
        .retrieve()
        .bodyToMono(String.class);
Mono<String> result2Mono = client.get()
        .uri("/article/index/arcid/{id}.html", 254)
        .retrieve()
        .bodyToMono(String.class);
// Combine both responses and block once on the combined result instead of once per call.
Map<String,String> map = Mono.zip(result1Mono, result2Mono, (result1, result2) -> {
    // The lambda-local map needs its own name: the original redeclared "map" (clashing with
    // the variable being initialised) and then wrote to an undefined "arrayList".
    Map<String, String> results = new HashMap<>();
    results.put("result1", result1);
    results.put("result2", result2);
    return results;
}).block();
System.err.println(map.toString());
Mono<Person> personMono = client.get().uri("/person/{id}", personId)
        .retrieve().bodyToMono(Person.class);
Mono<List<Hobby>> hobbiesMono = client.get().uri("/person/{id}/hobbies", personId)
        .retrieve().bodyToFlux(Hobby.class).collectList();
// Combine both results and block once on the combined Mono.
Map<String, Object> data = Mono.zip(personMono, hobbiesMono, (person, hobbies) -> {
    // The values are a Person and a List<Hobby>, so the map must be Map<String, Object>;
    // the original declared Map<String, String>, which cannot hold them.
    Map<String, Object> map = new LinkedHashMap<>();
    map.put("person", person);
    map.put("hobbies", hobbies);
    return map;
})
        .block();
// Connect to a WebSocket endpoint and print every message received on the session.
WebSocketClient client = new ReactorNettyWebSocketClient();
URI url = new URI("ws://localhost:8080/path");
// NOTE(review): the value returned by execute(...) is discarded in this fragment; it presumably
// has to be subscribed to, blocked on or returned by the caller before anything runs - confirm.
client.execute(url, session ->
session.receive()
.doOnNext(System.out::println)
.then());
WebClient webClient = WebClient.create("https://www.netkiller.cn");
Mono<ClientResponse> mono = webClient
        .get()
        .uri("/spring/index.html")
        .acceptCharset(StandardCharsets.UTF_8)
        .exchange();
// Obtain the complete response object.
ClientResponse response = mono.block();
// Keep the status as HttpStatusCode: the original cast to the HttpStatus enum fails with a
// ClassCastException when the server replies with a code the enum does not define.
HttpStatusCode statusCode = response.statusCode();
MultiValueMap<String, ResponseCookie> cookies = response.cookies();
ClientResponse.Headers headers = response.headers(); // response headers
// Read the response body.
Mono<String> resultMono = response.bodyToMono(String.class);
String body = resultMono.block();
// Print the results.
System.out.println("HttpStatus:" + statusCode);
System.out.println("Cookie:" + cookies.toString());
System.out.println("Headers:" + headers.asHttpHeaders());
System.out.println("body:" + body);
// HTTP Basic credentials configured once as a default header for every request.
WebClient client = WebClient.builder()
        .baseUrl("https://www.netkiller.cn")
        .defaultHeaders(headers -> {
            headers.setBasicAuth("admin", "uPQKFe98bIwZCzgVGjbIQRyRyyecW2Ha");
        })
        // Bearer-token alternative: headers.setBearerAuth("<bearer token>")
        .build();
Mono<String> response = client.get().uri("/")
        .retrieve().bodyToMono(String.class);
String responseBody = response.block();
log.info(responseBody);
// HTTP Basic credentials attached to a single request.
WebClient webClient = WebClient.create("https://www.netkiller.cn");
final String body = webClient.get()
        .uri("/")
        .headers(headers -> {
            headers.setBasicAuth("admin", "uPQKFyRyyecbIwZCzgVGjbW2e98IQRHa");
        })
        .retrieve()
        .bodyToMono(String.class)
        .block();
System.out.println(body);
ExchangeFilterFunctions 方案
// HTTP Basic credentials added by an exchange filter on the client.
WebClient client = WebClient.builder()
        .filter(ExchangeFilterFunctions.basicAuthentication("admin", "uPQcbIwZKFe98IGjbW2QRyRyyeCzgVHa"))
        .baseUrl("https://www.netkiller.cn")
        .build();
Mono<String> response = client.get().uri("/")
        .retrieve().bodyToMono(String.class);
String responseBody = response.block();
log.info(responseBody);
/**
 * Subscribes to the {@code /stream} endpoint and logs every event, any error and completion.
 */
public void consumeServerSentEvent() {
    WebClient sseClient = WebClient.create("https://www.netkiller.cn:8080/sse/server");
    // Type token that tells the decoder each element is a ServerSentEvent carrying a String.
    ParameterizedTypeReference<ServerSentEvent<String>> eventType
            = new ParameterizedTypeReference<ServerSentEvent<String>>() {};
    Flux<ServerSentEvent<String>> events = sseClient.get()
            .uri("/stream")
            .retrieve()
            .bodyToFlux(eventType);
    events.subscribe(
            event -> {
                logger.info("Time: {} - event: name[{}], id [{}], content[{}] ",
                        LocalTime.now(), event.event(), event.id(), event.data());
            },
            failure -> {
                logger.error("Error receiving SSE: {}", failure);
            },
            () -> {
                logger.info("Completed!!!");
            });
}
同步阻塞等待 SSE 执行完成
// Wait for the stream to finish and gather every element into one list.
// NOTE(review): if eventStream is the Flux<ServerSentEvent<String>> from the previous example,
// this List<String> assignment presumably needs a map to the event data first - confirm.
List<String> posts = eventStream.collectList().block();
log.info("结果数:{}, {}" , posts.size(),posts.toString());
/**
 * Requests a story about the given title from the {@code /chatgpt/stream} endpoint.
 *
 * @param prompt the story title inserted into the request text
 * @return the response body as a stream of text chunks
 */
public Flux<String> stream(String prompt) {
    String content = String.format("请帮我写一篇《%s》故事。", prompt);
    WebClient client = WebClient.create(url);
    return client.get()
            .uri("/chatgpt/stream?content={content}", content)
            .headers(headers -> headers.setBasicAuth(username, password))
            .retrieve()
            .bodyToFlux(String.class);
}
/**
 * Regroups the chunk stream of {@link #stream(String)} into sentences.
 *
 * <p>Chunks are buffered until one contains a terminator symbol; the buffered text is then
 * emitted as one element. When the upstream completes, the whole story (including any
 * unterminated tail) is logged and handed to {@code callback}.
 *
 * @param prompt   the story title forwarded to {@code stream}
 * @param callback receives the complete story on completion; may be {@code null}
 * @return a stream with one element per completed sentence
 */
public Flux<String> streamSentence(String prompt, Callback1 callback) {
    // ASCII and full-width sentence terminators. Set.of rejects duplicate elements with an
    // IllegalArgumentException, so every symbol has to appear exactly once.
    Set<String> symbol = Set.of(".", "!", "?", "。", "!", "?");
    return Flux.create(sink -> {
        StringBuffer word = new StringBuffer();
        List<String> sentence = new ArrayList<String>();
        Flux<String> flux = this.stream(prompt);
        // Tie the upstream subscription to the sink so that a cancelled consumer also stops
        // the HTTP stream.
        sink.onDispose(flux.subscribe(
                content -> {
                    word.append(content);
                    boolean contains = symbol.stream().anyMatch(content::contains);
                    if (contains) {
                        sentence.add(word.toString());
                        sink.next(word.toString());
                        log.debug("sentence: {}", sentence.get(sentence.size() - 1));
                        word.setLength(0);
                    }
                },
                error -> {
                    log.error("Error receiving SSE", error);
                    // Forward the failure; otherwise the returned Flux would never terminate.
                    sink.error(error);
                },
                () -> {
                    sink.complete();
                    // The unterminated tail is part of the story passed to the callback, but
                    // it is not emitted through the sink.
                    sentence.add(word.toString());
                    String segment = String.join("", sentence);
                    log.info("Story: {}", segment);
                    if (callback != null) {
                        callback.onCallback(segment);
                    }
                }));
    });
}
服务端
/**
 * Streams a chat completion and returns the text delta of each chunk.
 *
 * <p>Spaces and line feeds in each delta are replaced by the literal markers {@code \x20} and
 * {@code \x0A}. Chunks that are not valid JSON yield an empty string, and a failed stream is
 * replaced by the single element {@code [ERROR]}.
 *
 * @param device   not used by this method
 * @param taskId   not used by this method
 * @param speakId  not used by this method
 * @param messages the conversation sent as the {@code messages} field of the request
 * @return the encoded text deltas
 */
public Flux<String> deepseek(String device, String taskId, String speakId, List<Map<String, String>> messages) {
    WebClient webClient = WebClient.builder()
            .baseUrl(url)
            .defaultHeader("Content-Type", "application/json")
            .defaultHeader("Authorization", "Bearer " + authorization)
            .build();
    log.debug(messages.toString());
    Map<String, Object> requestBody = Map.of(
            "model", model,
            "messages", messages,
            "stream", true
    );
    // One parser for the whole stream instead of a new ObjectMapper per chunk.
    ObjectMapper objectMapper = new ObjectMapper();
    return webClient.post()
            .uri("/chat/completions")
            .bodyValue(requestBody)
            .accept(MediaType.TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(String.class)
            .map(chunk -> {
                try {
                    // Drop a leading "data: " prefix if one is present.
                    String jsonString = chunk.replaceFirst("^data: ", "");
                    JsonNode jsonNode = objectMapper.readTree(jsonString);
                    String result = jsonNode.path("choices").path(0).path("delta").path("content").asText();
                    // Encode spaces and line feeds as literal markers; the receiver reverses this.
                    result = result.replace(" ", "\\x20").replace("\n", "\\x0A");
                    log.debug("[{}]", result);
                    return result;
                } catch (JsonProcessingException e) {
                    // Chunks that are not JSON contribute no text.
                    return "";
                }
            })
            .onErrorResume(e -> {
                log.error("Error processing stream", e);
                return Flux.just("[ERROR]");
            });
}
解决方案:流式传输时内容中的空格和换行可能丢失,因此传输前先转义,接收端再还原
// 传输前
result = result.replace(" ", "\\x20").replace("\n", "\\x0A");
// 接收端
String text = data.replace("\\x20", " ").replace("\\x0A", "\n");
客户端
/**
 * Posts the messages as JSON to the meeting summary endpoint and relays the stream events.
 *
 * <p>{@code biConsumer} receives {@code ("onOpen", "")} when the stream opens, the event type
 * and decoded text for each event, and {@code ("onClosed", "")} when the stream closes.
 *
 * @param messages   the payload, serialized to JSON
 * @param biConsumer receives the event type and the decoded text
 */
public void summary(Map<String, String> messages, BiConsumer<String, String> biConsumer) throws InterruptedException {
    String payload = gson.toJson(messages);
    String endpoint = "meeting/" + Config.Cloud.appId + "/" + Config.Android.androidId + "/summary";
    EventSourceListener listener = new EventSourceListener() {
        @Override
        public void onOpen(@NonNull EventSource eventSource, @NonNull Response response) {
            super.onOpen(eventSource, response);
            biConsumer.accept("onOpen", "");
        }

        @Override
        public void onEvent(@NonNull EventSource eventSource, @Nullable String id, @Nullable String type, @NonNull String data) {
            super.onEvent(eventSource, id, type, data);
            Log.d(TAG, "Stream Id: " + id + " Type: " + type + " Data: [" + data + "]");
            // Restore the spaces and line feeds that were sent as literal markers.
            String decoded = data.replace("\\x20", " ").replace("\\x0A", "\n");
            biConsumer.accept(type, decoded);
        }

        @Override
        public void onClosed(@NonNull EventSource eventSource) {
            super.onClosed(eventSource);
            biConsumer.accept("onClosed", "");
        }

        @Override
        public void onFailure(@NonNull EventSource eventSource, @Nullable Throwable t, @Nullable Response response) {
            super.onFailure(eventSource, t, response);
            networkFailure();
            Log.w(TAG, "Throwable: " + t + " Response: " + response);
        }
    };
    httpRestful.postStream(endpoint, null, payload, listener);
}
package cn.netkiller.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
@Service
@Slf4j
public class DeepSeekService {
private final Gson gson = new Gson();
@Value("${deepseek.url}")
private String url;
@Value("${deepseek.authorization}")
private String authorization;
@Value("${deepseek.model}")
private String model;
public Flux<String> deepseek(String device, String taskId, String speakId, List<Map<String, String>> messages) {
WebClient webClient = WebClient.builder()
.baseUrl(url)
.defaultHeader("Content-Type", "application/json")
.defaultHeader("Authorization", "Bearer " + authorization)
.build();
// List<Map<String, String>> messages = new ArrayList<Map<String, String>>();
// messages.add(Map.of("role", "user", "content", content));
log.debug(messages.toString());
Map<String, Object> requestBody = Map.of(
"model", model,
"messages", messages,
"stream", true
);
Flux<String> flux = webClient.post()
.uri("/chat/completions")
.bodyValue(requestBody)
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(String.class)
// .filter(chunk -> !chunk.trim().isEmpty())
.map(chunk -> {
// if (chunk.trim().equals("data: [DONE]")) {
// return "[END]";
// }
try {
// log.debug("chunk: " + chunk);
String jsonString = chunk.replaceFirst("^data: ", "");
JsonNode jsonNode = new ObjectMapper().readTree(jsonString);
String result = jsonNode.path("choices").path(0).path("delta").path("content").asText();
result = result.replace(" ", "\\x20").replace("\n", "\\x0A");
// return result != null ? result : "";
// log.debug("[" + result + "]");
return result;
} catch (JsonProcessingException e) {
return "";
}
})
// .filter(text -> text != null && !text.equals("null") && !text.isEmpty())
.onErrorResume(e -> {
log.error("Error processing stream", e);
return Flux.just("[ERROR]");
});
Set<String> symbol = Set.of(".", "!", "?", ",", "。", "!", "?", "EOT");
return flux.concatWithValues("EOT")
.scan(new Sentence(), (sentence, characters) -> {
// log.info("characters " + characters);
sentence.words.append(characters);
boolean contains = symbol.stream().anyMatch(characters::contains);
if (contains) {
sentence.segment = true;
// log.info("current " + state.current);
}
return sentence;
})
.filter(state -> state.segment)
.map(state -> state.getSentence());
}
public String deepseek(List<Map<String, String>> messages) {
try {
if (messages == null || messages.isEmpty()) {
throw new RuntimeException("摘要内容为空");
}
// 4. 构建请求体(示例)
Map<String, Object> requestBody = new HashMap<>();
requestBody.put("model", model);
requestBody.put("messages", messages);
// 5. 发送请求并获取响应(使用WebClient)
String response = WebClient.create(url)
.post()
.uri("/chat/completions")
.header("Authorization", "Bearer " + authorization)
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(requestBody)
.retrieve()
.bodyToMono(String.class)
.block();
// 6. 解析响应
JsonNode responseNode = new ObjectMapper().readTree(response);
return responseNode.path("choices").path(0).path("message").path("content").asText();
} catch (Exception e) {
// 异常处理
log.error("获取答案失败", e);
return "无法生成答案:" + e.getMessage();
}
}
public Flux<String> streamSegment(Flux<String> flux, Consumer<String> consumer) {
StringBuffer sentence = new StringBuffer();
// StringBuffer segment = new StringBuffer();
// AtomicInteger length = new AtomicInteger(sequence.poll());
// StringBuffer text = new StringBuffer();
Set<String> symbol = Set.of(".", "!", "?", ",", "。", "!", "?");
flux.share();
flux.subscribe(
word -> {
sentence.append(word);
boolean contains = symbol.stream().anyMatch(word::contains);
// log.debug(marker,"Word: {} ({}) Symbol:{}", word, word.length(), contains);
if (contains) {
// log.debug(marker, "Sentence: {} ({})", sentence, sentence.length());
consumer.accept(sentence.toString());
sentence.setLength(0);
// log.info(marker,"Segment length: {}", length);
}
}, error -> {
// log.error(marker, "stream error: " + error);
// biConsumer.accept(ChatGPTComponent.ChatGPTText.ERROR, error.toString());
}, () -> {
if (!sentence.isEmpty()) {
consumer.accept(sentence.toString());
}
// log.debug("Text: {} ({})", text, text.length());
});
return flux;
}
static class Sentence {
StringBuilder words = new StringBuilder();
boolean segment = false;
public String getSentence() {
String tmp = words.toString();
words.setLength(0);
segment = false;
return tmp.replace("EOT", "");
}
}
//
@Data
public static class DeepSeekResponse {
}
}
webClient.get()
.uri("https://www.netkiller.cn/index.html")
.retrieve()
.bodyToFlux(JsonNode.class)
.timeout(Duration.ofSeconds(5));
WebClient 返回的 Flux/Mono 只应被订阅一次;每多订阅一次,就会重新发起一次 HTTP 请求(重复请求)
WebClient client = WebClient.create(url);
Flux<String> flux = client.get()
        .uri("/chatgpt/stream/sentence?content={content}&symbol=。", content)
        .headers(httpHeaders -> httpHeaders.setBasicAuth(username, password))
        .retrieve()
        .bodyToFlux(String.class); // the terminating semicolon was missing in the original
// Demonstration of the problem: every subscribe() below starts its own HTTP request.
flux.subscribe(
        value -> {
            log.warn(value);
        });
flux.subscribe(System.out::println);
flux.subscribe(System.out::println);
解决方案是加入 .share() 共享订阅数据
WebClient client = WebClient.create(url);
WebClient.ResponseSpec response = client.get()
        .uri("/chatgpt/stream/sentence?content={content}&symbol=。", content)
        .headers(headers -> headers.setBasicAuth(username, password))
        .retrieve();
// share() lets several subscribers consume one upstream subscription.
Flux<String> flux = response
        .bodyToFlux(String.class)
        .share();
.log() 会打印运行期间的所有信息
WebClient.ResponseSpec response = client.get()
        .uri("/chatgpt/stream?content={content}", prompt)
        .headers(headers -> headers.setBasicAuth(username, password))
        .retrieve();
// log() traces the reactive signals that pass through this point of the pipeline.
Flux<String> flux = response
        .bodyToFlux(String.class)
        .log();
https://prod-tingwu-paas-common-beijing.oss-cn-beijing.aliyuncs.com/tingwu/output/1744644768465976/b0ef5830bfe94e9881ee08b0a2dab21b/b0ef5830bfe94e9881ee08b0a2dab21b_20250708103257.mp3?Expires=1754920073&OSSAccessKeyId=LTAI5tMzZ1D4o1drkJN1TfCr&Signature=PhUBnEuKW1VOhBMeyTaqJugqt9U%3D
mp3 后面的问号以及查询参数会被 WebClient 截断。解决方案:先用 URI.create() 构造完整的 URI,再传给 uri(),如下:
/**
 * Downloads the MP3 produced for a task into {@code WORKDIR/<timestamp>/<taskId>/<filename>}.
 *
 * @param taskId   the task whose output audio is fetched
 * @param filename the name of the file to write
 * @return the stored path relative to {@code WORKDIR}, or {@code null} when the task or its
 *         audio path is unavailable
 */
@SneakyThrows
public String audio(String taskId, String filename) {
    Optional<GetTaskInfoResponseBody.GetTaskInfoResponseBodyData> optional = meetingService.getTaskInfo(taskId);
    if (optional.isEmpty()) {
        return null;
    }
    String mp3file = optional.get().getOutputMp3Path();
    // NOTE(review): the download address may carry signature parameters; consider whether it
    // should be written to the log in full.
    log.info("下载音频文件 " + mp3file);
    if (mp3file == null) return null;
    WebClient client = WebClient.builder()
            .codecs(configurer -> configurer
                    .defaultCodecs()
                    .maxInMemorySize(128 * 1024 * 1024))
            .build();
    // URI.create keeps the address, including its query string, exactly as given.
    Flux<DataBuffer> flux = client.get()
            .uri(URI.create(mp3file))
            .retrieve()
            .bodyToFlux(DataBuffer.class);
    String datetime = LocalDateTime.now().format(dateTimeFormatter);
    File file = new File(String.format("%s/%s/%s", WORKDIR, datetime, taskId));
    // createDirectories does nothing for an existing directory and throws when the directory
    // cannot be created; the original ignored the result of mkdirs() and only failed later.
    Files.createDirectories(file.toPath());
    String filepath = String.format("%s/%s", file.getAbsoluteFile(), filename);
    Path path = Paths.get(filepath);
    // Write the buffers to disk as they arrive and wait until the write has finished.
    DataBufferUtils.write(flux, path).block();
    long size = Files.size(path);
    log.info("文件下载完成: " + filepath + " 大小:" + size + " 字节");
    return String.format("%s/%s/%s", datetime, taskId, filename);
}
针对特定 HTTP 状态码自定义处理逻辑,避免直接抛出异常。
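WebClientResponseException:HTTP 响应状态码为 4xx 或 5xx 时抛出,包含响应状态码、headers、响应体等信息。其子类是按状态码划分的内部类,例如 WebClientResponseException.BadRequest(400)、WebClientResponseException.NotFound(404)、WebClientResponseException.InternalServerError(500);HttpClientErrorException(4xx)、HttpServerErrorException(5xx)属于 RestTemplate,并不是它的子类。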
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;
/**
 * Shows how to map specific HTTP status classes to custom exceptions with {@code onStatus}.
 */
public class WebClientExceptionExample {
    private final WebClient webClient = WebClient.create("https://api.example.com");

    /**
     * Fetches {@code /data}; 4xx and 5xx replies are reported as {@link RuntimeException}.
     *
     * @return the response body
     */
    public Mono<String> fetchData() {
        return webClient.get()
                .uri("/data")
                .retrieve()
                // 4xx client errors: include the response body in the exception message.
                .onStatus(status -> status.is4xxClientError(), response -> {
                    return response.bodyToMono(String.class)
                            // An error reply without a body would leave this Mono empty, and
                            // an empty Mono from an onStatus handler suppresses the error.
                            // Substitute an empty string so the exception is always raised.
                            .defaultIfEmpty("")
                            .flatMap(body -> Mono.error(
                                    new RuntimeException("客户端错误: " + response.statusCode() + ", 响应体: " + body)
                            ));
                })
                // 5xx server errors: only the status is reported.
                .onStatus(status -> status.is5xxServerError(), response -> {
                    return Mono.error(new RuntimeException("服务端错误: " + response.statusCode()));
                })
                .bodyToMono(String.class);
    }
}
仅用于日志记录,不影响异常的传播(异常仍会向上抛出)。
/**
 * Fetches {@code /data} and logs failures; the error itself still reaches the subscriber.
 *
 * @return the response body
 */
public Mono<String> fetchData() {
    Mono<String> body = webClient.get()
            .uri("/data")
            .retrieve()
            .bodyToMono(String.class);
    return body
            .doOnError(WebClientResponseException.class, httpError -> {
                log.error("HTTP 错误: 状态码={}, 响应体={}", httpError.getStatusCode(), httpError.getResponseBodyAsString());
            })
            .doOnError(IOException.class, ioError -> {
                log.error("网络异常: {}", ioError.getMessage(), ioError);
            });
}
对所有可能的异常统一处理(包括网络异常、解析异常等)。
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;
import java.io.IOException;
public class WebClientExceptionExample {
public Mono<String> fetchData() {
return webClient.get()
.uri("/data")
.retrieve()
.bodyToMono(String.class)
// 捕获特定异常并返回默认值或转换异常
.onErrorResume(WebClientResponseException.class, e -> {
System.err.println("HTTP 错误: " + e.getStatusCode());
return Mono.just("默认数据(因HTTP错误)"); // 返回默认值
})
.onErrorResume(IOException.class, e -> {
System.err.println("网络异常: " + e.getMessage());
return Mono.error(new RuntimeException("网络连接失败,请重试", e)); // 转换异常
})
.onErrorResume(Exception.class, e -> {
System.err.println("其他异常: " + e.getMessage());
return Mono.empty(); // 返回空结果
});
}
}
如果使用 block() 将响应转为同步结果,需用 try-catch 捕获异常:
/**
 * Fetches {@code /data} synchronously.
 *
 * @return the response body, or the default text when the server replies with an HTTP error
 * @throws RuntimeException wrapping any other failure
 */
public String fetchDataSync() {
    try {
        // block() waits for the result on the calling thread.
        String body = webClient.get()
                .uri("/data")
                .retrieve()
                .bodyToMono(String.class)
                .block();
        return body;
    } catch (WebClientResponseException httpError) {
        // HTTP error responses fall back to a default value.
        log.error("HTTP 错误: {}", httpError.getStatusCode());
        return "默认值";
    } catch (Exception failure) {
        // Everything else is logged and rethrown with the cause attached.
        log.error("调用失败", failure);
        throw new RuntimeException("服务调用异常", failure);
    }
}