Home | 简体中文 | 繁体中文 | 杂文 | 知乎专栏 | Github | OSChina 博客 | 云社区 | 云栖社区 | Facebook | Linkedin | 视频教程 | 打赏(Donations) | About
知乎专栏多维度架构 微信号 netkiller-ebook | QQ群:128659835 请注明“读者”

10.2. WebFlux Router

10.2.1. Component 原件

		
package cn.netkiller.webflux.component;

import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;

import reactor.core.publisher.Mono;

@Component
public class HelloWorldHandler {

	public HelloWorldHandler() {
	}

	public Mono<ServerResponse> helloWorld(ServerRequest request) {
		return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN).body(BodyInserters.fromObject("Hello World!!!"));
	}
}

		
		

10.2.2. 路由配置

		
package cn.netkiller.webflux.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

import cn.netkiller.webflux.component.HelloWorldHandler;

@Configuration
public class WebFluxRouter {

	public WebFluxRouter() {
	}

	@Bean
	public RouterFunction<ServerResponse> routeHelloWorld(HelloWorldHandler helloWorldHandler) {

		return RouterFunctions.route(RequestPredicates.GET("/hello").and(RequestPredicates.accept(MediaType.TEXT_PLAIN)), helloWorldHandler::helloWorld);
	}
}
		
		
		

10.2.3. Thymeleaf

10.2.3.1. 模板引擎 Thymeleaf 依赖

			
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-thymeleaf</artifactId>
    </dependency>			
			
			

10.2.3.2. application.properties 相关的配置

			
spring.thymeleaf.cache=true # Enable template caching.
spring.thymeleaf.check-template=true # Check that the template exists before rendering it.
spring.thymeleaf.check-template-location=true # Check that the templates location exists.
spring.thymeleaf.enabled=true # Enable Thymeleaf view resolution for Web frameworks.
spring.thymeleaf.encoding=UTF-8 # Template files encoding.
spring.thymeleaf.excluded-view-names= # Comma-separated list of view names that should be excluded from resolution.
spring.thymeleaf.mode=HTML5 # Template mode to be applied to templates. See also StandardTemplateModeHandlers.
spring.thymeleaf.prefix=classpath:/templates/ # Prefix that gets prepended to view names when building a URL.
spring.thymeleaf.reactive.max-chunk-size= # Maximum size of data buffers used for writing to the response, in bytes.
spring.thymeleaf.reactive.media-types= # Media types supported by the view technology.
spring.thymeleaf.servlet.content-type=text/html # Content-Type value written to HTTP responses.
spring.thymeleaf.suffix=.html # Suffix that gets appended to view names when building a URL.
spring.thymeleaf.template-resolver-order= # Order of the template resolver in the chain.
spring.thymeleaf.view-names= # Comma-separated list of view names that can be resolved.
			
			

10.2.3.3. 

			
	@GetMapping("/welcome")
    public Mono<String> hello(final Model model) {
        model.addAttribute("name", "Neo");
        model.addAttribute("city", "深圳");

        String path = "hello";
        return Mono.create(monoSink -> monoSink.success(path));
    }

    @GetMapping("/list")
    public String listPage(final Model model) {
        final Flux<City> citys = cityService.findAllCity();
        model.addAttribute("cityLists", citys);
        return "cityList";
    }			
			
			

10.2.3.4. Tymeleaf 视图

welcome.html

			
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8"/>
    <title>欢迎页面</title>
</head>

<body>

<h1 >你好,欢迎来自<p th:text="${city}"></p>的<p th:text="${name}"></p></h1>

</body>
</html>			
			
			

cityList.html

			
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8"/>
    <title>城市列表</title>
</head>

<body>

<div>


    <table>
        <legend>
            <strong>城市列表</strong>
        </legend>
        <thead>
        <tr>
            <th>城市编号</th>
            <th>省份编号</th>
            <th>名称</th>
            <th>描述</th>
        </tr>
        </thead>
        <tbody>
        <tr th:each="city : ${cityLists}">
            <td th:text="${city.id}"></td>
            <td th:text="${city.provinceId}"></td>
            <td th:text="${city.name}"></td>
            <td th:text="${city.description}"></td>
        </tr>
        </tbody>
    </table>

</div>

</body>
</html>			
			
			

10.2.4. Webflux Redis

10.2.4.1. Maven Redis 依赖

			
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
		</dependency>
			
			

10.2.4.2. Redis 配置

			
server:
  port: 8080
spring:
  application:
    name: webflux
  redis:
    host: 127.0.0.1
    port: 6379
    password: pwd2020
    timeout: 5000
    lettuce:
      pool:
        max-active: 200 
        max-idle: 20 
        min-idle: 5 
        max-wait: 1000 			
			
			

10.2.4.3. Config

			
	@Bean
	public ReactiveRedisTemplate<String, String> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
		ReactiveRedisTemplate<String, String> reactiveRedisTemplate = new ReactiveRedisTemplate<>(factory,RedisSerializationContext.string());
		return reactiveRedisTemplate;
	}			
			
			

10.2.4.4. Service

			
@Service
public class RedisServiceImpl implements RedisService {

	@Autowired
	private ReactiveRedisTemplate<String, String> redisTemplate;
	
	@Override
	public Mono<String> get(String key) {
		
		ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue();
		return operations.get(key);
	}

	@Override
	public Mono<String> set(String key,User user) {
		
		ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue();
		return operations.getAndSet(key, JSON.toJSONString(user));
	}

	@Override
	public Mono<Boolean> delete(String key) {
		
		ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue();
		return operations.delete(key);
	}

	@Override
	public Mono<String> update(String key,User user) {
		
		ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue();
		return operations.getAndSet(key, JSON.toJSONString(user));
	}

	@Override
	public Flux<String> all(String key) {
		ReactiveListOperations<String, String> operations = redisTemplate.opsForList();
		return operations.range(key, 0, -1);
	}


	@Override
	public Mono<Long> push(String key,List<String> list) {
		
		ReactiveListOperations<String, String> operations = redisTemplate.opsForList();
		return operations.leftPushAll(key, list);
	}
	
	@Override
	public Flux<String> find(String key) {
		ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue();
		return redisTemplate.keys(key).flatMap(keyId ->operations.get(keyId));
	}
}			
			
			

10.2.4.5. 

			
@RestController
@RequestMapping("/user")
public class UserController {
	
	public final static String USER_KEY="user";
	
	@Autowired
	private RedisService redisService;
	
	@GetMapping("/get/{key}")
	public Mono<String> getUserByKey(@PathVariable("id")String key){
		return redisService.get(key);
	}
	
	@GetMapping("/add")
	public Mono<String> add(User user){
		user = new User();
		user.setAccount("neo");
		user.setPassword("123456");
		user.setNickname("netkiller");
		user.setEmail("netkiller@msn.com");
		user.setPhone("");
		user.setGender(true);
		user.setBirthday("1980-01-30");
		user.setProvince("广东省");
		user.setCity("深圳市");
		user.setCounty("南山区");
		user.setAddress("");
		user.setState("Enabled");

		System.out.println(JSON.toJSONString(user));
		return redisService.set("neo",user);		
	}
	
	@GetMapping("/addlist")
	public Mono<Long> addlist(){
		List<String> list=new ArrayList<String>();
		User user = new User();
		user.setAccount("neo");
		user.setPassword("123456");
		user.setNickname("netkiller");
		user.setEmail("netkiller@msn.com");
		user.setPhone("");
		user.setGender(true);
		user.setBirthday("1980-01-30");
		user.setProvince("广东省");
		user.setCity("深圳市");
		user.setCounty("南山区");
		user.setAddress("");
		user.setState("Enabled");
		
		//添加第一条数据
		list.add(JSON.toJSONString(user));
		//添加第二条数据
		list.add(JSON.toJSONString(user));
		//添加第三条数据
		list.add(JSON.toJSONString(user));
		
		return redisService.addlist("list", list);
	}

	@GetMapping(value="/findAll",produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
	public Flux<String> findAll(){
		return redisService.all("list").delayElements(Duration.ofSeconds(2));
	}
	
	@GetMapping("/getUsers")
	public Flux<String> findUsers() {
		return redisService.find("*").delayElements(Duration.ofSeconds(2));
	}
}			
			
			

10.2.5. Webflux Mongdb

10.2.5.1. Maven 依赖

			
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
	</dependency>			
			
			

10.2.5.2. Repository

			
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;

import cn.netkiller.entity.User;

public interface UserRepository extends ReactiveMongoRepository<User, Long>{

}
			
			

10.2.5.3. Service

			
@Service
public class MongoServiceImpl implements MongoService {
	
	@Autowired
	private UserRepository userRepository;
	
	@Override
	public Mono<User> getById(Long id) {
		return userRepository.findById(id);
	}

	@Override
	public Mono<User> addUser(User user) {
		return userRepository.save(user);
	}

	@Override
	public Mono<Boolean> deleteById(Long id) {
		 userRepository.deleteById(id);
		 return Mono.create(userMonoSink -> userMonoSink.success());
	}

	@Override
	public Mono<User> updateById(User user) {
		return userRepository.save(user);
	}

	@Override
	public Flux<User> findAllUser() {
		return userRepository.findAll();
	}
}			
			
			

10.2.5.4. 控制器

			
@RestController
@RequestMapping("/usermg")
public class UserMongoController {
	
	@Autowired
	private MongoService mongoService;

	@GetMapping("/add")
	public Mono<User> add(User user) {
		user = new User();
		User user = new User();
		user.setAccount("neo");
		user.setPassword("123456");
		user.setNickname("netkiller");
		user.setEmail("netkiller@msn.com");
		user.setPhone("");
		user.setGender(true);
		user.setBirthday("1980-01-30");
		user.setProvince("广东省");
		user.setCity("深圳市");
		user.setCounty("南山区");
		user.setAddress("");
		user.setState("Enabled");
		
	
		System.out.println(JSON.toJSONString(user));
		return mongoService.addUser(user);

	}
	
	/**
	 *	注意这里 produces = MediaType.APPLICATION_STREAM_JSON_VALUE 必须这样设置
	 */
	@GetMapping(value="/findAll",produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
	public Flux<User> findAll(){
		return mongoService.findAllUser().delayElements(Duration.ofSeconds(1));
	}
}
			
			

produces 如果不是application/stream+json则调用端无法滚动得到结果,将一直阻塞等待数据流结束或超时。

10.2.6. SSE

		
package cn.netkiller.webflux.controller;

import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;

@RestController
@RequestMapping("/sse")
public class SseController {
	private int count_down = 10;

	public SseController() {

	}

	@GetMapping(value = "/launch", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
	public Flux<ServerSentEvent<Object>> countDown() {

		return Flux.interval(Duration.ofSeconds(1)).map(seq -> Tuples.of(seq, getCountDownSec())).map(data -> ServerSentEvent.<Object>builder().event("launch").id(Long.toString(data.getT1())).data(data.getT2().toString()).build());
	}

	private String getCountDownSec() {
		if (count_down > 0) {
			count_down--;
			return "倒计时:" + count_down;
		}
		return "发射";
	}

	@GetMapping("/random")
	public Flux<ServerSentEvent<Integer>> randomNumbers() {
		return Flux.interval(Duration.ofSeconds(1)).map(seq -> Tuples.of(seq, ThreadLocalRandom.current().nextInt())).map(data -> ServerSentEvent.<Integer>builder().event("random").id(Long.toString(data.getT1())).data(data.getT2()).build());
	}

	@GetMapping("/range")
	public Flux<Object> range() {
		return Flux.range(10, 1).map(seq -> Tuples.of(seq, getCountDownSec())).map(data -> ServerSentEvent.<Object>builder().event("launch").id(Long.toString(data.getT1())).data(data.getT2().toString()).build());
	}

}
		
		
		

运行结果

		
id:0
event:launch
data:倒计时:9

id:1
event:launch
data:倒计时:8

id:2
event:launch
data:倒计时:7

id:3
event:launch
data:倒计时:6

id:4
event:launch
data:倒计时:5

id:5
event:launch
data:倒计时:4

id:6
event:launch
data:倒计时:3

id:7
event:launch
data:倒计时:2

id:8
event:launch
data:倒计时:1

id:9
event:launch
data:倒计时:0

id:10
event:launch
data:发射
		
		
		

10.2.7. Mono/Flux

Mono(返回0或1个元素)/Flux(返回0-n个元素)