Home | 简体中文 | 繁体中文 | 杂文 | Github | 知乎专栏 | Facebook | Linkedin | Youtube | 打赏(Donations) | About
知乎专栏

68.6. WebClient

	
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>	
	
	

68.6.1. 配置 WebClient

			
/**
 * Spring configuration that exposes a pre-configured {@link WebClient} bean.
 */
@Configuration
public class WebConfig {

  /**
   * Builds the shared WebClient with a base URL, a default cookie and a
   * default JSON Content-Type header.
   *
   * @return the configured client (the original built it but never returned it,
   *         which does not compile for a non-void method)
   */
  @Bean
  public WebClient webClient() {
    return WebClient.builder()
      .baseUrl("http://localhost:8080")
      .defaultCookie("cookie-name", "cookie-value")
      .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
      .build();
  }
}
			
		

超时设置

		
// Low-level HTTP client carrying the timeout settings used by the WebClient below.
HttpClient httpClient = HttpClient.create()
                // Connect timeout, in milliseconds.
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000)
                // Response timeout of 60 seconds.
                .responseTimeout(Duration.ofSeconds(60))
                // Once a connection is established, install 60-second read and write timeout handlers on it.
                .doOnConnected(conn -> conn
                        .addHandler(new ReadTimeoutHandler(60, TimeUnit.SECONDS))
                        .addHandler(new WriteTimeoutHandler(60, TimeUnit.SECONDS))
                );
        // Plug the customised HttpClient into the WebClient through the Reactor connector.
        // NOTE(review): `url` is not declared in this snippet; it is assumed to hold the base URL.
        WebClient client = WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(httpClient))
                .baseUrl(url)
                .build();
		
		

68.6.2. @Controller/@RestController 实例

		
    /**
     * Calls GET /mock/mono on the local service and relays the response body.
     *
     * @return a Mono emitting the remote response body
     */
    @GetMapping("webclient")
    public Mono<String> webclient() {
        WebClient webClient = WebClient.create("http://localhost:8080");
        // The Mono is cold: every subscription sends a new HTTP request. The original
        // subscribed here AND returned the Mono (which the framework subscribes to again),
        // so the remote endpoint was called twice. Log with doOnNext instead.
        return webClient
                .get().uri("/mock/mono")
                .retrieve()
                .bodyToMono(String.class)
                .doOnNext(System.out::println);
    }
		
		

返回结果

			
/**
 * Posts the given employee as JSON to /employees.
 *
 * @param newEmployee the employee to send as the request body
 * @return a Mono emitting the full response (status, headers and decoded Employee body)
 */
public Mono<ResponseEntity<Employee>> createEmployee(Employee newEmployee) {
  WebClient.ResponseSpec responseSpec = webClient.post()
    .uri("/employees")
    .contentType(MediaType.APPLICATION_JSON)
    .bodyValue(newEmployee)
    .retrieve();

  return responseSpec.toEntity(Employee.class);
}



/**
 * Creates an employee through the employee service and maps the outcome to an HTTP response.
 *
 * @param newEmployee the employee taken from the request body
 * @return 200 with the created employee on a 2xx upstream status, the upstream status with a
 *         failure message otherwise, or 500 when the call itself fails
 */
@PostMapping("/create")
public Mono<ResponseEntity<?>> createEmployee(@RequestBody Employee newEmployee) {

  return employeeService.createEmployee(newEmployee)
    // The two branches produce ResponseEntity<Employee> and ResponseEntity<String>; the explicit
    // type witness pins the element type to ResponseEntity<?> so the declared return type does
    // not depend on how the compiler infers a common supertype.
    .<ResponseEntity<?>>map(responseEntity -> {
      if (responseEntity.getStatusCode().is2xxSuccessful()) {
        return ResponseEntity.ok(responseEntity.getBody());
      } else {
        return ResponseEntity.status(responseEntity.getStatusCode())
          .body("Failed to create employee");
      }
    })
    .onErrorResume(exception -> {
      return Mono.just(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
        .body("Internal Server Error: " + exception.getMessage()));
    });
}
			
		

		
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;

@Configuration
public class WebClientConfig {
    // 配置 WebClient,指定基础地址(本地服务地址)
    @Bean
    public WebClient webClient() {
        return WebClient.builder()
                .baseUrl("http://localhost:8080")  // 本地服务的基础地址
                .build();
    }
}		
		
		
		
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

/**
 * Demo controller: /hello returns a fixed greeting and /query fetches that greeting
 * through the injected WebClient.
 */
@RestController
public class MyController {

    // Injected WebClient (must be configured as a bean beforehand).
    @Autowired
    private WebClient webClient;

    /** Returns a fixed greeting. */
    @GetMapping("/hello")
    public String hello() {
        return "Hello, Spring Boot!";
    }

    /**
     * Calls the /hello endpoint without blocking and prefixes its response.
     *
     * @return a Mono emitting the prefixed /hello response
     */
    @GetMapping("/query")
    public Mono<String> query() {
        // Relative path; the base address comes from the WebClient configuration.
        Mono<String> helloBody = webClient.get()
                .uri("/hello")
                .retrieve()
                .bodyToMono(String.class);

        // Decorate the returned text once it arrives.
        return helloBody.map(body -> "从 /hello 接口获取到的内容:" + body);
    }
}
		
		
		

68.6.3. Get 请求实例

		
// GET /students on the local service and decode the response body as a stream of Student objects.
WebClient.create("http://localhost:8080")
    .get()
    .uri("/students")
    .retrieve()
    .bodyToFlux(Student.class);
			
		

68.6.4. URI 参数

字符串拼接方式

			
// Builds the path by plain string concatenation: the id is appended to "/students/" as-is.
WebClient.create("http://localhost:8080")
    .get()
    .uri("/students/" + studentId)
    .retrieve()
    .bodyToMono(Student.class);
			
		

通过 uriBuilder 组装 Uri 参数

			
String endpoint = "/employees";

// Build the URI through the client's own UriBuilder so it is resolved against the
// WebClient's base URL. The original passed a relative java.net.URI built with
// UriComponentsBuilder.fromPath(...); uri(URI) expects a fully constructed URI and
// does not apply the base URL, so the request had no scheme or host.
webClient.post()
        .uri(uriBuilder -> uriBuilder
                .path(endpoint)
                .queryParam("param1", "value1")
                .queryParam("param2", "value2")
                .build())
        .bodyValue(new Employee(...))   // placeholder: construct the Employee to send
        .retrieve()
        .bodyToMono(Employee.class);
			
		
			
// One path variable: {studentId} is filled from the argument passed to build(...).
WebClient.create("http://localhost:8080")
    .get()
    .uri(uriBuilder -> uriBuilder
        .path("/student/{studentId}")
        .build(studentId))
    .retrieve()
    .bodyToMono(Student.class);

// Two path variables: values are passed to build(...) in the order the placeholders appear.
WebClient.create("http://localhost:8080")
    .get()
    .uri(uriBuilder -> uriBuilder
        .path("/student/{studentId}/assignments/{assignmentId}")
        .build(studentId, assignmentId))
    .retrieve()
    .bodyToMono(Student.class);
			
		

uriTemplate 组装 Uri 参数

			
// Expand the template through the client itself: uri(String, Object...) resolves the
// result against the WebClient's base URL. The original expanded a standalone UriTemplate
// into a relative java.net.URI and passed it to uri(URI), which does not apply the base
// URL, so the request had no scheme or host.
String uriTemplate = "/student/{studentId}/assignments/{assignmentId}";

WebClient.create("http://localhost:8080")
    .get()
    .uri(uriTemplate, studentId, assignmentId)
    .retrieve()
    .bodyToMono(Student.class);
			
		

68.6.5. 查询参数

http://localhost:8080/students?firstName=Jon&year=1996

			
// Values for the firstName and year query parameters.
String firstName = "Jon";
String year = "1996";

// Adds each value as its own query parameter on /students.
WebClient.create("http://localhost:8080")
    .get()
    .uri(uriBuilder -> uriBuilder.path("/students")
        .queryParam("firstName", firstName)
        .queryParam("year", year)
        .build())
    .retrieve()
    .bodyToMono(Student.class);
			
		

http://localhost:8080/students?year=1995,1996,1997

			
// Sends several values in a single "year" parameter by joining them with commas first.
WebClient.create("http://localhost:8080")
    .get()
    .uri(uriBuilder -> uriBuilder.path("/students")
        .queryParam("year", String.join(",", "1995", "1996", "1997"))
        .build())
    .retrieve()
    .bodyToMono(Student.class);
			
		

"/products/?category=Phones&category=Tablets"

			
// Passing several values to queryParam repeats the parameter (category=Phones&category=Tablets).
// Fixed: the lambda arrow was written "- >", which is not valid Java.
webClient.get()
  .uri(uriBuilder -> uriBuilder
    .path("/products/")
    .queryParam("category", "Phones", "Tablets")
    .build())
  .retrieve()
  .bodyToMono(String.class)
  // Any error is swallowed here and turned into an empty result, so block() returns null on failure.
  .onErrorResume(e -> Mono.empty())
  .block();
			
		

68.6.6. Post 操作演示

			
Employee newEmployee = ...;  //Create a new employee object
			
// POST the employee and handle both the success and the error signal.
webClient.post()
  .uri("/employees")
  // bodyValue takes the payload object itself. The original wrapped it in
  // BodyInserters.fromValue(...), which makes the inserter the payload; a BodyInserter
  // belongs in body(...), not bodyValue(...).
  .bodyValue(newEmployee)
  .retrieve()
  .toEntity(Employee.class)   // keep status and headers, not just the body
  .subscribe(
    responseEntity -> {
      // Success: status, Location header and decoded body are all available.
      HttpStatusCode status = responseEntity.getStatusCode();
      URI location = responseEntity.getHeaders().getLocation();
      Employee createdEmployee = responseEntity.getBody();    // Response body
      // handle response as necessary
    },
    error -> {
      if (error instanceof WebClientResponseException) {
        // The server answered with an error status.
        WebClientResponseException ex = (WebClientResponseException) error;
        HttpStatusCode status = ex.getStatusCode();
        System.out.println("Error Status Code: " + status.value());
        //...
      } else {
        // Any other failure.
        System.err.println("An unexpected error occurred: " + error.getMessage());
      }
    }
  );
			
		

68.6.7. Post 表单数据

			
/**
 * Service that creates employees by posting URL-encoded form data to /employees.
 */
@Service
public class EmployeeService {

  private final WebClient webClient;

  @Autowired
  public EmployeeService(WebClient webClient) {
      this.webClient = webClient;
  }

  /**
   * Posts the "id", "name" and "status" entries of {@code formParams} as form data.
   *
   * @param formParams form values; the keys "id", "name" and "status" are read
   * @return a Mono emitting the created employee, or empty when the response has no body
   */
  public Mono<Employee> createEmployee(Map<String, String> formParams) {
    return webClient.post()
      .uri("/employees")
      .body(BodyInserters.fromFormData("id", formParams.get("id"))
        .with("name", formParams.get("name"))
        .with("status", formParams.get("status")))
      .retrieve()
      // An onStatus handler must return a Mono<? extends Throwable>; the original lambdas
      // had comment-only bodies and returned nothing, which does not compile. Lambdas on
      // the status are used so the predicate does not name a concrete status type.
      .onStatus(status -> status.is4xxClientError(),
          clientResponse -> clientResponse.createException())   // customise 4xx handling here
      .onStatus(status -> status.is5xxServerError(),
          clientResponse -> clientResponse.createException())   // customise 5xx handling here
      .toEntity(Employee.class)
      .flatMap(responseEntity -> Mono.justOrEmpty(responseEntity.getBody()));
  }
}
			
		
			
        // Multipart POST with plain values, a nested map and a file part.
        WebClient client = WebClient.create("https://www.netkiller.cn");
        // Parameterised instead of the raw FormInserter type; the file is wrapped in a
        // Resource (as in the upload example of this document) rather than a java.io.File.
        FormInserter<Object> formInserter = fromMultipartData("name","neo")
                .with("age",19)
                .with("map",ImmutableMap.of("sex","F"))
                .with("file",new FileSystemResource("/tmp/netkiler.doc"));
        Mono<String> result = client.post()
                .uri("/article/index/{id}.html", 256)
                // The body is multipart, so the declared content type must be multipart
                // too; the original declared application/json.
                .contentType(MediaType.MULTIPART_FORM_DATA)
                .body(formInserter)
                .retrieve()
                .bodyToMono(String.class);
        result.subscribe(System.err::println);
			
		
		
/**
 * Demo controller that posts URL-encoded form data with WebClient and prints the response.
 */
@RestController
public class HelloController {

    // WebClient bound to the article base URL.
    private WebClient webClient = WebClient.builder()
            .baseUrl("https://www.netkiller.cn/article")
            .build();

    /**
     * Posts a two-field form to /posts and prints the response body, blocking until it arrives.
     */
    @GetMapping("/test")
    public void test() {
        // Form fields to submit.
        MultiValueMap<String, String> formFields = new LinkedMultiValueMap<>();
        formFields.add("title", "linux 手札");
        formFields.add("body", "测试");

        // POST the form and wait for the response body as a String.
        String responseBody = webClient
                .post()
                .uri("/posts")
                .contentType(MediaType.APPLICATION_FORM_URLENCODED)
                .body(BodyInserters.fromFormData(formFields))
                .retrieve()
                .bodyToMono(String.class)
                .block();

        // Print the result.
        System.out.println(responseBody);
    }
}
		
		

68.6.8. 上传文件

			
// Collects the parts of a multipart/form-data request.
MultipartBodyBuilder builder = new MultipartBodyBuilder();

// File part, read from the local file system.
builder.part("file", new FileSystemResource("/tmp/file.txt"));
// Plain-text form fields, each with an explicit text/plain content type.
builder.part("id", "190001", MediaType.TEXT_PLAIN);
builder.part("name", "Lokesh", MediaType.TEXT_PLAIN);
builder.part("status", "active", MediaType.TEXT_PLAIN);

Then we can submit the multipart form data by using the method BodyInserters.fromMultipartData(builder.build()) and send a normal request as in the previous examples.

// Submit the multipart body built above and report the outcome.
webClient.post()
    .uri("/employees")
    .contentType(MediaType.MULTIPART_FORM_DATA)
    .body(BodyInserters.fromMultipartData(builder.build()))
    .retrieve()
    .toEntity(Employee.class)
    .doOnError(WriteTimeoutException.class, ex -> {
      System.err.println("WriteTimeout");
    })
    .subscribe(
      responseEntity -> {
        System.out.println("Status: " + responseEntity.getStatusCode().value());
        // getLocation() is null when the response has no Location header; string
        // concatenation prints "null" instead of the NPE the original toString() raised.
        System.out.println("Location URI: " + responseEntity.getHeaders().getLocation());
        System.out.println("Created New Employee : " + responseEntity.getBody());
      },
      // doOnError only observes the error; without an error consumer here the failure
      // would be left unhandled by this subscription.
      error -> System.err.println("Upload failed: " + error.getMessage()));
			
		

68.6.9. 设置 HTTP 头

			
webClient.get()
    .uri("/employees")
    .bodyValue(new Employee(...))
    .header("Authorization", "Bearer auth-token")
    .header("User-Agent", "Mobile App 1.0")
    .retrieve()			
			
		
			
	// Builds a WebClient with default cookies, headers, URI variables and a base URL.
	WebClient.builder()
        .defaultCookie("session","f1d83210-0fc9-4689-82ab-05df70da3367")
        .defaultUriVariables(ImmutableMap.of("name","kl"))
        .defaultHeader("header","neo")
        // Callback form: add several default headers at once.
        .defaultHeaders(httpHeaders -> {
            httpHeaders.add("header1","neo");
            httpHeaders.add("header2","chen");
        })
        // Callback form: add several default cookies at once.
        .defaultCookies(cookie ->{
            cookie.add("cookie1","neo");
            cookie.add("cookie2","netkiller");
        })
        .baseUrl("https://www.netkiller.cn")
        .build();
			
		

If-None-Match、If-Modified-Since

		
            // GET "/" with authorization, content negotiation and conditional-request headers.
            // NOTE(review): `authorization` is not declared in this snippet; presumably the
            // Base64-encoded "user:password" value — confirm.
            Mono<String> mono = webClient
                    .get() // GET request
                    .uri("/")  // request path
                    .header("Authorization", "Basic ".concat(authorization))
                    .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                    .accept(MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML)
                    .acceptCharset(StandardCharsets.UTF_8)
                    .ifNoneMatch("*")
                    .ifModifiedSince(ZonedDateTime.now())
                    .retrieve() // obtain the response
                    .bodyToMono(String.class); // decode the response body as a String
		
		

68.6.10. 同步阻塞等待返回结果

		
        // Client bound to the site root.
        WebClient webClient = WebClient.create("https://www.netkiller.cn");

        // Describes the request; nothing is sent until the Mono is subscribed.
        Mono<String> mono = webClient
                .get() // GET request
                .uri("/spring/index.html")  // request path
                .retrieve() // obtain the response
                .bodyToMono(String.class); // decode the response body as a String

        // block() subscribes and waits for the result before printing it.
        System.out.println(mono.block());
		
		

68.6.10.1. 同步阻塞等待结果

在同步模式下使用WebClient

			
 // Synchronous use: expand {id} into the path, then block until the body is available.
 WebClient client =  WebClient.create("http://www.netkiller.cn");
      String result = client .get()
                .uri("/article/index/arcid/{id}.html", 256)
                .retrieve()
                .bodyToMono(String.class)
                .block();
        System.err.println(result);
			
			
		

// Single object: block on the Mono to get the decoded Person.
Person person = client.get().uri("/person/{id}", i).retrieve()
    .bodyToMono(Person.class)
    .block();

// Many objects: collect the Flux into a List first, then block once for the whole list.
List<Person> persons = client.get().uri("/persons").retrieve()
    .bodyToFlux(Person.class)
    .collectList()
    .block();
		
		
			

避免单独阻塞每个同步响应

			
      // Describe both requests first, combine them with zip, and block once for the
      // combined result instead of blocking on each response separately.
      WebClient client = WebClient.create("http://www.netkiller.cn");
      Mono<String> result1Mono = client.get()
              .uri("/article/index/arcid/{id}.html", 255)
              .retrieve()
              .bodyToMono(String.class);
      Mono<String> result2Mono = client.get()
              .uri("/article/index/arcid/{id}.html", 254)
              .retrieve()
              .bodyToMono(String.class);
      // Fixed: the lambda redeclared `map` (clashing with the variable being assigned)
      // and wrote to an undefined `arrayList`; it now fills its own local map.
      Map<String, String> map = Mono.zip(result1Mono, result2Mono, (result1, result2) -> {
          Map<String, String> results = new HashMap<>();
          results.put("result1", result1);
          results.put("result2", result2);
          return results;
      }).block();
      System.err.println(map.toString());
        
		// Fetch a person and that person's hobbies, then wait once for both.
		Mono<Person> personMono = client.get().uri("/person/{id}", personId)
		        .retrieve().bodyToMono(Person.class);

		Mono<List<Hobby>> hobbiesMono = client.get().uri("/person/{id}/hobbies", personId)
		        .retrieve().bodyToFlux(Hobby.class).collectList();

		// Fixed: the values are a Person and a List<Hobby>, so the map must be
		// Map<String, Object>; the original Map<String, String> could neither hold them
		// nor be assigned to `data`.
		Map<String, Object> data = Mono.zip(personMono, hobbiesMono, (person, hobbies) -> {
		            Map<String, Object> combined = new LinkedHashMap<>();
		            combined.put("person", person);
		            combined.put("hobbies", hobbies);
		            return combined;
		        })
		        .block();

			
			

68.6.11. websocket

			
// Opens a WebSocket session to the given URL and prints every message received.
WebSocketClient client = new ReactorNettyWebSocketClient();
URI url = new URI("ws://localhost:8080/path");
// NOTE(review): the value returned by execute(...) is discarded here; presumably it is a
// reactive type that must be subscribed (or blocked on) before the session starts — confirm.
client.execute(url, session ->
        session.receive()
                .doOnNext(System.out::println)
                .then());
			
		

68.6.12. 获取 HTTP 链接状态

		
			WebClient webClient = WebClient.create("https://www.netkiller.cn");
            // exchangeToMono replaces the deprecated exchange(): the response is inspected
            // inside the callback, so its body is always consumed or released.
            String body = webClient
                    .get()
                    .uri("/spring/index.html")
                    .acceptCharset(StandardCharsets.UTF_8)
                    .exchangeToMono(response -> {
                        // statusCode() is used as-is; the original cast it to HttpStatus.
                        HttpStatusCode statusCode = response.statusCode(); // response status
                        MultiValueMap<String, ResponseCookie> cookies = response.cookies();
                        ClientResponse.Headers headers = response.headers(); // response headers

                        System.out.println("HttpStatus:" + statusCode);
                        System.out.println("Cookie:" + cookies.toString());
                        System.out.println("Headers:" + headers.asHttpHeaders());

                        // Decode the response body.
                        return response.bodyToMono(String.class);
                    })
                    .block();

            // Printed last, as in the original; null when the response has no body.
            System.out.println("body:" + body);
		
		

68.6.13. HTTP Basic Authentication - 401 Unauthorized

		
			// Basic authentication configured once on the builder, so every request carries it.
			// NOTE(review): the credentials are hard-coded in this example.
			WebClient client = WebClient.builder()
                    .baseUrl("https://www.netkiller.cn")
                    .defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth("admin", "uPQKFe98bIwZCzgVGjbIQRyRyyecW2Ha"))
                    // Alternative: bearer-token authentication.
//                    .defaultHeaders(httpHeaders -> httpHeaders.setBearerAuth("<bearer token>"))
                    .build();

            Mono<String> response = client.get().uri("/")
                    .retrieve().bodyToMono(String.class);

            // block() sends the request and waits for the body before logging it.
            log.info(response.block());
		
		

		
            // Basic authentication set on a single request rather than on the client.
            WebClient webClient = WebClient.create("https://www.netkiller.cn");
            Mono<String> pendingBody = webClient.get()
                    .uri("/")
                    .headers(httpHeaders -> httpHeaders.setBasicAuth("admin", "uPQKFyRyyecbIwZCzgVGjbW2e98IQRHa"))
                    .retrieve()
                    .bodyToMono(String.class);
            // Wait for the response body, then print it.
            final String body = pendingBody.block();
            System.out.println(body);
		
		

ExchangeFilterFunctions 方案

		
 		// Basic authentication applied through an exchange filter registered on the builder.
 		WebClient client = WebClient.builder()
                    .baseUrl("https://www.netkiller.cn")
                    .filter(ExchangeFilterFunctions.basicAuthentication("admin", "uPQcbIwZKFe98IGjbW2QRyRyyeCzgVHa"))
                    .build();

            Mono<String> response = client.get().uri("/")
                    .retrieve().bodyToMono(String.class);
            // block() sends the request and waits for the body before logging it.
            log.info(response.block());
		
		

68.6.14. SSE(Server-Sent Events)

68.6.14.1. 订阅 SSE

		
/**
 * Subscribes to the /stream endpoint and logs every Server-Sent Event, any error,
 * and the completion of the stream.
 */
public void consumeServerSentEvent() {
    WebClient client = WebClient.create("https://www.netkiller.cn:8080/sse/server");
    // Type token so the body is decoded as ServerSentEvent<String> rather than a raw type.
    ParameterizedTypeReference<ServerSentEvent<String>> type
     = new ParameterizedTypeReference<ServerSentEvent<String>>() {};

    Flux<ServerSentEvent<String>> eventStream = client.get()
      .uri("/stream")
      .retrieve()
      .bodyToFlux(type);

    // Three callbacks: one per event, one for an error, one for completion.
    eventStream.subscribe(
      content -> logger.info("Time: {} - event: name[{}], id [{}], content[{}] ",
        LocalTime.now(), content.event(), content.id(), content.data()),
      error -> logger.error("Error receiving SSE: {}", error),
      () -> logger.info("Completed!!!"));
}
		
		

同步阻塞等待 SSE 执行完成

		
        // Collects every element of the stream into a list and blocks until the stream completes.
        // NOTE(review): `eventStream` is not declared in this snippet; the assignment to
        // List<String> assumes it is a Flux<String> — confirm.
        List<String> posts = eventStream.collectList().block();
        log.info("结果数:{}, {}" , posts.size(),posts.toString());
		
		
		
/**
 * Requests a streamed story for the given title from the /chatgpt/stream endpoint.
 *
 * @param prompt the story title inserted into the request text
 * @return the response body as a stream of String chunks
 */
public Flux<String> stream(String prompt) {
        String content = String.format("请帮我写一篇《%s》故事。", prompt);
        return WebClient.create(url)
                .get()
                .uri("/chatgpt/stream?content={content}", content)
                .headers(httpHeaders -> httpHeaders.setBasicAuth(username, password))
                .retrieve()
                .bodyToFlux(String.class);
    }

    /**
     * Re-chunks the token stream from {@link #stream(String)} into sentences: chunks are
     * accumulated until one contains a sentence terminator, then the accumulated text is
     * emitted as a single element.
     *
     * <p>Text left over after the last terminator is not emitted downstream; it is only
     * included in the full story passed to {@code callback}.
     *
     * @param prompt   the story title forwarded to {@link #stream(String)}
     * @param callback receives the whole story once the upstream completes; may be null
     * @return a Flux emitting one element per completed sentence
     */
    public Flux<String> streamSentence(String prompt, Callback1 callback) {
        // Set.of rejects duplicate elements with IllegalArgumentException, and the original
        // listed "!" and "?" twice. The second pair is presumably the full-width forms
        // (U+FF01, U+FF1F) lost to character normalisation; written as escapes so they survive.
        Set<String> symbol = Set.of(".", "!", "?", "。", "\uFF01", "\uFF1F");

        return Flux.create(sink -> {
            StringBuffer word = new StringBuffer();
            List<String> sentence = new ArrayList<String>();
            // Registering the upstream subscription with the sink cancels the HTTP stream
            // when the downstream cancels or the sink terminates.
            sink.onDispose(this.stream(prompt).subscribe(
                    content -> {
                        word.append(content);
                        boolean contains = symbol.stream().anyMatch(content::contains);
                        if (contains) {
                            String completed = word.toString();
                            sentence.add(completed);
                            sink.next(completed);
                            log.debug("sentence: {}", completed);
                            word.setLength(0);
                        }
                    },
                    error -> {
                        log.error("Error receiving SSE", error);
                        // Propagate the failure; the original only logged it, so the
                        // returned Flux never terminated after an upstream error.
                        sink.error(error);
                    },
                    () -> {
                        sink.complete();
                        sentence.add(word.toString());
                        String segment = String.join("", sentence);
                        log.info("Story: {}", segment);
                        if (callback != null) {
                            callback.onCallback(segment);
                        }
                    }));
        });
    }
		
		

68.6.14.2. SSE(Server-Sent Events) 传输空格和回车问题

服务端

			
/**
 * Streams a chat completion and returns the content deltas with spaces and newlines
 * escaped as the literal text {@code \x20} and {@code \x0A}.
 *
 * @param device   not used by this method
 * @param taskId   not used by this method
 * @param speakId  not used by this method
 * @param messages chat messages sent as the "messages" field of the request
 * @return the escaped content of each chunk; "" for a chunk that is not valid JSON,
 *         and a single "[ERROR]" element if the stream fails
 */
public Flux<String> deepseek(String device, String taskId, String speakId, List<Map<String, String>> messages) {

        WebClient webClient = WebClient.builder()
                .baseUrl(url)
                .defaultHeader("Content-Type", "application/json")
                .defaultHeader("Authorization", "Bearer " + authorization)
                .build();

        log.debug(messages.toString());

        Map<String, Object> requestBody = Map.of(
                "model", model,
                "messages", messages,
                "stream", true
        );

        // One parser for the whole stream; the original created a new ObjectMapper per chunk.
        ObjectMapper mapper = new ObjectMapper();

        return webClient.post()
                .uri("/chat/completions")
                .bodyValue(requestBody)
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .bodyToFlux(String.class)
                .map(chunk -> {
                    try {
                        // Drop a leading "data: " prefix, then read choices[0].delta.content.
                        String jsonString = chunk.replaceFirst("^data: ", "");
                        JsonNode jsonNode = mapper.readTree(jsonString);
                        String result = jsonNode.path("choices").path(0).path("delta").path("content").asText();
                        // Escape spaces and newlines; the receiver reverses this replacement.
                        result = result.replace(" ", "\\x20").replace("\n", "\\x0A");
                        log.debug("[" + result + "]");
                        return result;
                    } catch (JsonProcessingException e) {
                        // Chunks that are not JSON are mapped to an empty string.
                        return "";
                    }
                })
                .onErrorResume(e -> {
                    log.error("Error processing stream", e);
                    return Flux.just("[ERROR]");
                });
    }
			
			

解决方案

			
# 传输前
result = result.replace(" ", "\\x20").replace("\n", "\\x0A");

# 接收端

String text = data.replace("\\x20", " ").replace("\\x0A", "\n"); 			
			
			

客户端

			
    /**
     * Posts the messages as JSON to the meeting summary endpoint and relays stream events
     * to {@code biConsumer}.
     *
     * @param messages   serialized to JSON and sent as the request payload
     * @param biConsumer receives ("onOpen", ""), then (event type, unescaped text) for each
     *                   event, then ("onClosed", "")
     */
    public void summary(Map<String, String> messages, BiConsumer<String, String> biConsumer) throws InterruptedException {

        String jsonString = gson.toJson(messages);

        httpRestful.postStream("meeting/" + Config.Cloud.appId + "/" + Config.Android.androidId + "/summary", null, jsonString, new EventSourceListener() {
            @Override
            public void onClosed(@NonNull EventSource eventSource) {
                super.onClosed(eventSource);
                biConsumer.accept("onClosed", "");
            }

            @Override
            public void onEvent(@NonNull EventSource eventSource, @Nullable String id, @Nullable String type, @NonNull String data) {
                super.onEvent(eventSource, id, type, data);
                Log.d(TAG, "Stream Id: " + id + " Type: " + type + " Data: [" + data + "]");   // data received from the stream
                // Reverse the escaping: literal \x20 back to a space, \x0A back to a newline.
                String text = data.replace("\\x20", " ").replace("\\x0A", "\n");
                biConsumer.accept(type, text);
            }

            @Override
            public void onFailure(@NonNull EventSource eventSource, @Nullable Throwable t, @Nullable Response response) {
                super.onFailure(eventSource, t, response);
                networkFailure();
                Log.w(TAG, "Throwable: " + t + " Response: " + response);
            }

            @Override
            public void onOpen(@NonNull EventSource eventSource, @NonNull Response response) {
                super.onOpen(eventSource, response);
                biConsumer.accept("onOpen", "");
            }
        });

    }
			
			

68.6.14.3. ChatGPT/DeepSeek 拼接字符,实现整句返回

			
package cn.netkiller.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

@Service
@Slf4j
public class DeepSeekService {
    private final Gson gson = new Gson();

    @Value("${deepseek.url}")
    private String url;
    @Value("${deepseek.authorization}")
    private String authorization;
    @Value("${deepseek.model}")
    private String model;

    public Flux<String> deepseek(String device, String taskId, String speakId, List<Map<String, String>> messages) {

        WebClient webClient = WebClient.builder()
                .baseUrl(url)
                .defaultHeader("Content-Type", "application/json")
                .defaultHeader("Authorization", "Bearer " + authorization)
                .build();

//        List<Map<String, String>> messages = new ArrayList<Map<String, String>>();
//        messages.add(Map.of("role", "user", "content", content));

        log.debug(messages.toString());

        Map<String, Object> requestBody = Map.of(
                "model", model,
                "messages", messages,
                "stream", true
        );

        Flux<String> flux = webClient.post()
                .uri("/chat/completions")
                .bodyValue(requestBody)
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .bodyToFlux(String.class)
//                .filter(chunk -> !chunk.trim().isEmpty())
                .map(chunk -> {
//                    if (chunk.trim().equals("data: [DONE]")) {
//                        return "[END]";
//                    }
                    try {
//                        log.debug("chunk: " + chunk);
                        String jsonString = chunk.replaceFirst("^data: ", "");
                        JsonNode jsonNode = new ObjectMapper().readTree(jsonString);
                        String result = jsonNode.path("choices").path(0).path("delta").path("content").asText();
                        result = result.replace(" ", "\\x20").replace("\n", "\\x0A");
//                        return result != null ? result : "";
//                        log.debug("[" + result + "]");
                        return result;
                    } catch (JsonProcessingException e) {
                        return "";
                    }
                })
//                .filter(text -> text != null && !text.equals("null") && !text.isEmpty())
                .onErrorResume(e -> {
                    log.error("Error processing stream", e);
                    return Flux.just("[ERROR]");
                });

        Set<String> symbol = Set.of(".", "!", "?", ",", "。", "!", "?", "EOT");

        return flux.concatWithValues("EOT")
                .scan(new Sentence(), (sentence, characters) -> {
//                    log.info("characters " + characters);
                    sentence.words.append(characters);
                    boolean contains = symbol.stream().anyMatch(characters::contains);
                    if (contains) {
                        sentence.segment = true;
//                        log.info("current " + state.current);
                    }
                    return sentence;
                })
                .filter(state -> state.segment)
                .map(state -> state.getSentence());
    }

    public String deepseek(List<Map<String, String>> messages) {
        try {

            if (messages == null || messages.isEmpty()) {
                throw new RuntimeException("摘要内容为空");
            }

            // 4. 构建请求体(示例)
            Map<String, Object> requestBody = new HashMap<>();
            requestBody.put("model", model);
            requestBody.put("messages", messages);

            // 5. 发送请求并获取响应(使用WebClient)
            String response = WebClient.create(url)
                    .post()
                    .uri("/chat/completions")
                    .header("Authorization", "Bearer " + authorization)
                    .contentType(MediaType.APPLICATION_JSON)
                    .bodyValue(requestBody)
                    .retrieve()
                    .bodyToMono(String.class)
                    .block();

            // 6. 解析响应
            JsonNode responseNode = new ObjectMapper().readTree(response);
            return responseNode.path("choices").path(0).path("message").path("content").asText();

        } catch (Exception e) {
            // 异常处理
            log.error("获取答案失败", e);
            return "无法生成答案:" + e.getMessage();
        }
    }

    /**
     * Splits a stream of text fragments into sentences and hands every finished sentence to
     * {@code consumer}.
     *
     * <p>Each fragment is appended to a buffer. As soon as a fragment contains one of the
     * delimiter symbols, the buffered text is passed to {@code consumer} and the buffer is
     * cleared. Whatever is still buffered when the stream completes is delivered as a final
     * sentence. If the stream fails, the error is logged and the buffered remainder is dropped.
     *
     * <p>The source is shared before it is subscribed, so the internal subscription made here and
     * the subscription made by the caller on the returned flux use one upstream subscription
     * instead of triggering the upstream twice. The returned flux is therefore hot: subscribe to
     * it promptly, because a late subscriber only sees fragments emitted after it subscribed.
     *
     * @param flux     stream of text fragments
     * @param consumer callback invoked once per completed sentence
     * @return the shared flux that emits the same fragments as {@code flux}
     */
    public Flux<String> streamSegment(Flux<String> flux, Consumer<String> consumer) {

        StringBuffer sentence = new StringBuffer();

        // Set.of rejects duplicate elements with IllegalArgumentException, so the full-width
        // exclamation mark (U+FF01) and question mark (U+FF1F) are written as escapes to keep
        // them distinct from their ASCII counterparts.
        Set<String> symbol = Set.of(".", "!", "?", ",", "。", "\uFF01", "\uFF1F");

        // share() returns a new Flux; the result must be used, calling it for effect does nothing.
        Flux<String> shared = flux.share();
        shared.subscribe(
                word -> {
                    sentence.append(word);
                    boolean contains = symbol.stream().anyMatch(word::contains);
                    if (contains) {
                        consumer.accept(sentence.toString());
                        sentence.setLength(0);
                    }
                },
                error -> log.error("stream error", error),
                () -> {
                    // Flush the trailing text that was not terminated by a delimiter.
                    if (!sentence.isEmpty()) {
                        consumer.accept(sentence.toString());
                    }
                });
        return shared;
    }

    /**
     * Mutable buffer for the words of the sentence currently being assembled.
     */
    static class Sentence {
        StringBuilder words = new StringBuilder();
        boolean segment = false;

        /**
         * Drains the buffer: returns the accumulated text with every "EOT" marker removed,
         * then leaves the buffer empty and the segment flag cleared.
         *
         * @return the buffered text without "EOT" markers
         */
        public String getSentence() {
            final String drained = words.toString().replace("EOT", "");
            segment = false;
            words.delete(0, words.length());
            return drained;
        }
    }

    /**
     * Empty holder type named after the DeepSeek response; it declares no fields yet.
     * NOTE(review): presumably intended to be filled in so the response can be mapped to a
     * typed object instead of being read through JsonNode paths - confirm before relying on it.
     */
    @Data
    public static class DeepSeekResponse {

    }

}			
			
			

68.6.15. 超时时间

		
// Issue a GET request and decode the response body as a stream of JsonNode elements.
webClient.get()
  .uri("https://www.netkiller.cn/index.html")
  .retrieve()
  .bodyToFlux(JsonNode.class)
  // Apply a 5-second timeout to the resulting Flux.
  .timeout(Duration.ofSeconds(5));
		
		

68.6.16. share() 共享订阅数据

WebClient 返回的 Flux / Mono 是冷(cold)发布者:每订阅一次就会重新发起一次 HTTP 请求,因此对同一个 flux 多次订阅就是重复请求。

		
		WebClient client = WebClient.create(url);
        // Build the request pipeline; the statement must be terminated before flux is used below.
        Flux<String> flux = client.get()
                .uri("/chatgpt/stream/sentence?content={content}&symbol=。", content)
                .headers(httpHeaders -> httpHeaders.setBasicAuth(username, password))
                .retrieve()
                .bodyToFlux(String.class);
        // Problem demonstration: the same flux is subscribed three times, and according to the
        // accompanying text every additional subscription repeats the HTTP request.
        flux.subscribe(
                value -> {
                    log.warn(value);
                });
        flux.subscribe(System.out::println);
        flux.subscribe(System.out::println);
		
		

解决方案是加入 .share() 共享订阅数据

		
		// Same request as in the previous snippet; only the trailing share() call is new.
		WebClient client = WebClient.create(url);
        Flux<String> flux = client.get()
                .uri("/chatgpt/stream/sentence?content={content}&symbol=。", content)
                .headers(httpHeaders -> httpHeaders.setBasicAuth(username, password))
                .retrieve()
                .bodyToFlux(String.class)
                // Per the accompanying text, share() lets the subscribers share the received
                // data instead of each subscription repeating the request.
                .share();
		
		

68.6.17. 打印调试日志

.log() 会打印运行期间的所有 Reactive Streams 信号(onSubscribe、request、onNext、onError、onComplete 等),便于调试。

		
        // Build the streaming request with basic authentication and decode the body as strings.
        Flux<String> flux = client.get()
                .uri("/chatgpt/stream?content={content}", prompt)
                .headers(httpHeaders -> httpHeaders.setBasicAuth(username, password))
                .retrieve()
                // log() is appended last; per the accompanying text it prints what happens
                // on this flux while it runs.
                .bodyToFlux(String.class).log();
		
		

68.6.18. 解决 WebClient 截断 URI的问题

		
https://prod-tingwu-paas-common-beijing.oss-cn-beijing.aliyuncs.com/tingwu/output/1744644768465976/b0ef5830bfe94e9881ee08b0a2dab21b/b0ef5830bfe94e9881ee08b0a2dab21b_20250708103257.mp3?Expires=1754920073&OSSAccessKeyId=LTAI5tMzZ1D4o1drkJN1TfCr&Signature=PhUBnEuKW1VOhBMeyTaqJugqt9U%3D		
		
		

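如果把上面的地址以字符串形式直接传给 uri(String),WebClient 会把它当作 URI 模板重新处理和编码,".mp3" 后面的问号以及查询参数会被截断(已编码的签名参数也可能被二次编码),导致下载失败。解决方案是先用 URI.create() 构造 URI 对象,再传给 uri(URI):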

		
	/**
     * Downloads the MP3 produced for a task and stores it under
     * {@code WORKDIR/<datetime>/<taskId>/<filename>}.
     *
     * @param taskId   id of the task whose audio output is downloaded
     * @param filename name of the file to create inside the task directory
     * @return the path of the stored file relative to {@code WORKDIR}
     *         ({@code <datetime>/<taskId>/<filename>}), or {@code null} when the task is unknown
     *         or has no MP3 output path
     */
    @SneakyThrows
    public String audio(String taskId, String filename) {
        Optional<GetTaskInfoResponseBody.GetTaskInfoResponseBodyData> optional = meetingService.getTaskInfo(taskId);
        if (optional.isEmpty()) {
            return null;
        }
        String mp3file = optional.get().getOutputMp3Path();
        log.info("下载音频文件 {}", mp3file);
        if (mp3file == null) {
            return null;
        }

        // Raise the codec in-memory limit to 128 MiB.
        WebClient client = WebClient.builder()
                .codecs(configurer -> configurer
                        .defaultCodecs()
                        .maxInMemorySize(128 * 1024 * 1024))
                .build();

        // Pass a ready-made URI object: this is the fix described in the accompanying text for
        // the query string of the signed URL being lost when the address is given as a String.
        Flux<DataBuffer> flux = client.get()
                .uri(URI.create(mp3file))
                .retrieve()
                .bodyToFlux(DataBuffer.class);

        String datetime = LocalDateTime.now().format(dateTimeFormatter);
        Path directory = Paths.get(String.format("%s/%s/%s", WORKDIR, datetime, taskId)).toAbsolutePath();
        // createDirectories does nothing when the directory already exists and throws when it
        // cannot be created, instead of returning a boolean that is easy to ignore.
        Files.createDirectories(directory);
        String filepath = String.format("%s/%s", directory, filename);

        Path path = Paths.get(filepath);
        // Block until the whole body has been written to disk.
        DataBufferUtils.write(flux, path).block();
        long size = Files.size(path);
        log.info("文件下载完成: {} 大小:{} 字节", filepath, size);
        return String.format("%s/%s/%s", datetime, taskId, filename);
    }
		
		

68.6.19. 异常处理

68.6.19.1. 使用 onStatus() 处理 HTTP 响应异常(推荐)

针对特定 HTTP 状态码自定义处理逻辑,用自定义异常替代默认抛出的 WebClientResponseException。

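WebClientResponseException:HTTP 响应状态码为 4xx 或 5xx 时抛出,包含响应状态码、headers、响应体等信息。它为常见状态码提供了嵌套子类,例如 WebClientResponseException.BadRequest(400)、WebClientResponseException.NotFound(404)、WebClientResponseException.InternalServerError(500)。注意:HttpClientErrorException(4xx)、HttpServerErrorException(5xx)属于 RestTemplate,WebClient 不会抛出它们。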

			
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;

public class WebClientExceptionExample {
    private final WebClient webClient = WebClient.create("https://api.example.com");

    public Mono<String> fetchData() {
        return webClient.get()
                .uri("/data")
                .retrieve()
                // 处理 4xx 客户端错误
                .onStatus(status -> status.is4xxClientError(), response -> {
                    // 可获取响应体、状态码等信息
                    return response.bodyToMono(String.class)
                            .flatMap(body -> Mono.error(
                                    new RuntimeException("客户端错误: " + response.statusCode() + ", 响应体: " + body)
                            ));
                })
                // 处理 5xx 服务端错误
                .onStatus(status -> status.is5xxServerError(), response -> {
                    return Mono.error(new RuntimeException("服务端错误: " + response.statusCode()));
                })
                .bodyToMono(String.class);
    }
}			
			
			

68.6.19.2. 使用 doOnError() 记录异常(不改变流的处理)

仅用于日志记录,不影响异常的传播(异常仍会向上抛出)。

			
/**
 * Requests {@code /data} and logs failures without handling them: both doOnError hooks only
 * write a log entry, and the error still propagates to the subscriber.
 *
 * @return the response body as a string
 */
public Mono<String> fetchData() {
    Mono<String> body = webClient.get()
            .uri("/data")
            .retrieve()
            .bodyToMono(String.class);

    // HTTP error responses: log status code and response body.
    Mono<String> withHttpLogging = body.doOnError(
            WebClientResponseException.class,
            e -> log.error("HTTP 错误: 状态码={}, 响应体={}", e.getStatusCode(), e.getResponseBodyAsString()));

    // I/O failures: log the message together with the stack trace.
    return withHttpLogging.doOnError(
            IOException.class,
            e -> log.error("网络异常: {}", e.getMessage(), e));
}
			
			

68.6.19.3. 使用 onErrorResume() 捕获所有异常

对所有可能的异常统一处理(包括网络异常、解析异常等)。

			
import java.io.IOException;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;

public class WebClientExceptionExample {
    public Mono<String> fetchData() {
        return webClient.get()
                .uri("/data")
                .retrieve()
                .bodyToMono(String.class)
                // 捕获特定异常并返回默认值或转换异常
                .onErrorResume(WebClientResponseException.class, e -> {
                    System.err.println("HTTP 错误: " + e.getStatusCode());
                    return Mono.just("默认数据(因HTTP错误)"); // 返回默认值
                })
                .onErrorResume(IOException.class, e -> {
                    System.err.println("网络异常: " + e.getMessage());
                    return Mono.error(new RuntimeException("网络连接失败,请重试", e)); // 转换异常
                })
                .onErrorResume(Exception.class, e -> {
                    System.err.println("其他异常: " + e.getMessage());
                    return Mono.empty(); // 返回空结果
                });
    }
}			
			
			

68.6.19.4. 同步调用时的异常处理(block() 场景)

如果使用 block() 将响应转为同步结果,需用 try-catch 捕获异常:

			
/**
 * Synchronous variant: blocks until the response body is available.
 *
 * @return the response body, or the fallback value when the server answered with an HTTP error
 * @throws RuntimeException wrapping any other failure of the call
 */
public String fetchDataSync() {
    String result;
    try {
        // block() waits for the result on the calling thread.
        result = webClient.get()
                .uri("/data")
                .retrieve()
                .bodyToMono(String.class)
                .block();
    } catch (WebClientResponseException e) {
        // HTTP error response: log the status and fall back to a default value.
        log.error("HTTP 错误: {}", e.getStatusCode());
        result = "默认值";
    } catch (Exception e) {
        // Anything else: log and rethrow wrapped, keeping the cause.
        log.error("调用失败", e);
        throw new RuntimeException("服务调用异常", e);
    }
    return result;
}