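Reactive Web Applications with Spring WebFlux, Explained

Reactive applications are:

  • Non-blocking
  • Event-driven and asynchronous
  • Able to scale vertically (that is, within the JVM) with a small number of threads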


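2. Reactive Programming in Spring

Spring Framework uses Reactor internally for its own reactive support. Reactor is an implementation of the Reactive Streams specification, whose Publisher and related interfaces were added to the JDK in Java 9 as java.util.concurrent.Flow. Reactor provides the following two data types:

  • Flux (a publisher that can emit 0 or more elements)
  • Mono (a publisher that can emit 0 or 1 element)

Spring exposes these types through its APIs, which is what lets an application be reactive.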
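The publisher/subscriber contract that Reactor implements can be sketched with the JDK alone. The example below is not Reactor code: it uses java.util.concurrent.Flow and SubmissionPublisher (Java 9+), and the class and method names (FlowSketch, publishAndCollect) are hypothetical. It only illustrates how a subscriber requests items one at a time, which is the back-pressure mechanism that Flux and Mono build on.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// JDK-only illustration of the Reactive Streams contract; names are hypothetical.
class FlowSketch {

    /** Publishes the items and returns what a subscriber received, in order. */
    static List<Integer> publishAndCollect(List<Integer> items) {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    subscription.request(1); // back-pressure: ask for one item at a time
                }

                @Override
                public void onNext(Integer item) {
                    received.add(item);
                    subscription.request(1); // ready for the next item
                }

                @Override
                public void onError(Throwable error) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();
                }
            });
            items.forEach(publisher::submit);
        } // close() signals onComplete after the submitted items are delivered

        try {
            done.await(5, TimeUnit.SECONDS); // delivery happens on another thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of(1, 2, 3))); // prints [1, 2, 3]
    }
}
```

In Reactor terms, a Flux plays the publisher role for 0 to N items and a Mono for at most one, with operators replacing the hand-written subscriber.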

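Spring 5 introduced a new module named WebFlux, which supports building reactive web applications over HTTP (REST) and WebSockets.

Spring WebFlux supports the following two programming models:

  • Functional model
  • Annotation-based model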


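The following table compares the traditional Spring stack with WebFlux:

Traditional stack                   Reactive stack
Spring Web MVC                      Spring WebFlux
Controllers and handler mappings    Router functions
Servlet API                         HTTP / Reactive Streams
Servlet containers                  Servlet 3.1+ containers (Tomcat 8.x, Jetty), plus Netty and Undertow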


The task is to build a REST API with Spring WebFlux for an employee management system, exposing CRUD operations on employees.



  • Java: 1.8 or later
  • Maven: 3.3.9 or later
  • Eclipse Luna or later
  • Spring Boot: 2.0.0.M4
  • Spring Boot Starter WebFlux
  • Postman, for testing the application


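The functional model in Spring 5 WebFlux is an alternative to Spring MVC-style annotations. In this model, router functions and handler functions are used to build the web application: an HTTP request is routed by a router function (the alternative to @RequestMapping) and then processed by a handler function (the alternative to annotated handler methods).

Each handler function takes a ServerRequest (org.springframework.web.reactive.function.server.ServerRequest) as its argument and returns a Mono<ServerResponse> (org.springframework.web.reactive.function.server.ServerResponse). The response body itself may be supplied by a Mono or a Flux.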
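The division of labour between router and handler functions can be sketched without Spring on the classpath. The block below is a JDK-only analogy, not the WebFlux API: Request and Response are hypothetical stand-ins for ServerRequest and ServerResponse, CompletableFuture stands in for Mono, and the "/hello" route is invented for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// JDK-only analogy of the functional model; none of these types are Spring's.
class FunctionalModelSketch {

    /** Hypothetical stand-in for ServerRequest. */
    static final class Request {
        final String method;
        final String path;
        Request(String method, String path) { this.method = method; this.path = path; }
    }

    /** Hypothetical stand-in for ServerResponse. */
    static final class Response {
        final int status;
        final String body;
        Response(int status, String body) { this.status = status; this.body = body; }
    }

    /** Maps "METHOD path" to a handler function: request in, asynchronous response out. */
    private final Map<String, Function<Request, CompletableFuture<Response>>> routes =
            new LinkedHashMap<>();

    /** Registers a handler, playing the role of one router function entry. */
    FunctionalModelSketch route(String method, String path,
                                Function<Request, CompletableFuture<Response>> handler) {
        routes.put(method + " " + path, handler);
        return this;
    }

    /** Picks the handler for the request, or answers 404 when no route matches. */
    CompletableFuture<Response> dispatch(Request request) {
        Function<Request, CompletableFuture<Response>> handler =
                routes.get(request.method + " " + request.path);
        if (handler == null) {
            return CompletableFuture.completedFuture(new Response(404, "Not Found"));
        }
        return handler.apply(request);
    }

    /** Builds a router with a single, invented route. */
    static FunctionalModelSketch demoRouter() {
        return new FunctionalModelSketch().route("GET", "/hello",
                req -> CompletableFuture.completedFuture(new Response(200, "hello")));
    }

    public static void main(String[] args) {
        FunctionalModelSketch router = demoRouter();
        Response ok = router.dispatch(new Request("GET", "/hello")).join();
        Response missing = router.dispatch(new Request("GET", "/nope")).join();
        System.out.println(ok.status + " " + ok.body);           // 200 hello
        System.out.println(missing.status + " " + missing.body); // 404 Not Found
    }
}
```

In WebFlux itself the same roles are played by RouterFunction and HandlerFunction, and routes are matched with request predicates rather than exact string keys.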



pom.xml (only fragments of this listing survive; the recoverable settings are summarized here):

  • The project inherits from a parent POM that is looked up from the repository (empty relativePath element).
  • The Spring Snapshots and Spring Milestones repositories are declared.
  • Java 8 is configured for the project.
  • The embedded Tomcat is excluded so that the Netty server is used.


package com.webflux.dao;
import java.util.LinkedHashMap;
import java.util.Map;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import com.webflux.bussiness.bean.Employee;

@Repository
public class EmployeeDAO {
	/**
	 * Map is used to replace the database.
	 * */
	static public Map<Integer, Employee> mapOfEmloyeess = new LinkedHashMap<>();
	static int count=10004;
	static {
	   mapOfEmloyeess.put(10001, new Employee("Jack",10001,12345.6,1001));
	   mapOfEmloyeess.put(10002, new Employee("Justin",10002,12355.6,1002));
	   mapOfEmloyeess.put(10003, new Employee("Eric",10003,12445.6,1003));
	}

	/**
	 * Returns all the existing Employees as Flux.
	 * */
	public Flux<Employee> getAllEmployee(){
		// fromIterable allows repeated subscriptions; a Stream can be consumed only once
		return Flux.fromIterable(mapOfEmloyeess.values());
	}

	/**Get Employee details using EmployeeId.
	 * Returns a Mono with the data if the Employee is found,
	 * else returns an empty Mono (never null).
	 * */
	public Mono<Employee> getEmployeeDetailsById(int id){
		Employee emp = mapOfEmloyeess.get(id);
		return Mono.justOrEmpty(emp);
	}

	/**Create Employee details.
	 * Returns a Mono with the auto-generated Id.
	 * */
	public synchronized Mono<Integer> addEmployee(Employee employee){
		int id = count++; // advance the counter so the next Employee gets a new Id
		mapOfEmloyeess.put(id, employee);
		return Mono.just(id);
	}

	/**Update the Employee details.
	 * Receives the Employee object and returns the updated details
	 * as Mono.
	 * */
	public Mono<Employee> updateEmployee (Employee employee){
		mapOfEmloyeess.put(employee.getEmployeeId(), employee);
		return Mono.just(employee);
	}

	/**Delete the Employee details.
	 * Receives the EmployeeId and returns the deleted Employee details
	 * as Mono (empty if no such Employee existed).
	 * */
	public Mono<Employee> removeEmployee (int id){
		Employee emp = mapOfEmloyeess.remove(id);
		return Mono.justOrEmpty(emp);
	}
}

Note that every method of EmployeeDAO returns a Mono or a Flux, which makes the DAO layer reactive.
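Because the DAO keeps its data in shared static state, concurrent requests can reach it from several threads at once. The following is a minimal JDK-only sketch of one way to keep id generation and map access consistent under concurrency. The class InMemoryStoreSketch and its methods are hypothetical and return plain values; in the article's DAO the results would still be wrapped in Mono or Flux.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only sketch of a thread-safe in-memory store.
class InMemoryStoreSketch<T> {

    private final Map<Integer, T> items = new ConcurrentHashMap<>();
    private final AtomicInteger nextId;

    InMemoryStoreSketch(int firstId) {
        this.nextId = new AtomicInteger(firstId);
    }

    /** Stores the item under a freshly generated id and returns that id. */
    int add(T item) {
        int id = nextId.getAndIncrement(); // atomic: two callers never receive the same id
        items.put(id, item);
        return id;
    }

    /** Looks up an item; empty when the id is unknown. */
    Optional<T> find(int id) {
        return Optional.ofNullable(items.get(id));
    }

    /** Removes an item and returns it; empty when the id is unknown. */
    Optional<T> remove(int id) {
        return Optional.ofNullable(items.remove(id));
    }

    public static void main(String[] args) {
        InMemoryStoreSketch<String> store = new InMemoryStoreSketch<>(10004);
        int first = store.add("Jack");
        int second = store.add("Justin");
        System.out.println(first + " " + second);           // 10004 10005
        System.out.println(store.find(first).isPresent());  // true
        store.remove(first);
        System.out.println(store.find(first).isPresent());  // false
    }
}
```

One trade-off to be aware of: ConcurrentHashMap does not preserve insertion order the way LinkedHashMap does, so listing order may differ.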


package com.webflux.web.handler;
import static org.springframework.web.reactive.function.BodyInserters.fromObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import com.webflux.bussiness.bean.Employee;
import com.webflux.dao.EmployeeDAO;

@Controller
public class EmployeeHandler {

	@Autowired
	private EmployeeDAO employeeDAO;

	/**
	 * Receives a ServerRequest.
	 * Invokes the method getAllEmployee() from EmployeeDAO.
	 * Prepares a Mono<ServerResponse> and returns the same.
	 * */
	public Mono<ServerResponse> getEmployeeDetails(ServerRequest request) {
		Flux<Employee> res = employeeDAO.getAllEmployee();
		return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
				.body(res, Employee.class);
	}

	/**
	 * Receives a ServerRequest.
	 * Extracts the Path Variable (named id) from the Request.
	 * Invokes the method [getEmployeeDetailsById()] from EmployeeDAO.
	 * If no Employee is found (empty Mono), returns a Bad Request
	 * with an appropriate message.
	 * Else returns the Mono<ServerResponse> with the Employee data.
	 * */
	public Mono<ServerResponse> getEmployeeDetailByEmployeeId(ServerRequest request) {
		//Extracts the Path Variable id from the Request
		int id = Integer.parseInt(request.pathVariable("id"));
		Mono<Employee> employee = employeeDAO.getEmployeeDetailsById(id);
		//flatMap converts Mono<Mono<ServerResponse>> to Mono<ServerResponse>
		return employee
				.flatMap(emp -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
						.body(fromObject(emp)))
				.switchIfEmpty(ServerResponse.badRequest().body
						(fromObject("Please give a valid employee Id")));
	}

	/**
	 * Receives a ServerRequest.
	 * Makes use of BodyExtractors and extracts the Employee data as
	 * Mono<Employee> from the ServerRequest.
	 * Invokes the method [addEmployee()] of the EmployeeDAO.
	 * Prepares a Mono<ServerResponse> and returns the same.
	 * */
	public Mono<ServerResponse> addEmployee(ServerRequest request) {
		Mono<Employee> requestBodyMono = request.body(BodyExtractors.toMono(Employee.class));
		//Compose with flatMap instead of block() so no thread is blocked
		return requestBodyMono
				.flatMap(employee -> employeeDAO.addEmployee(employee))
				.flatMap(x -> ServerResponse.ok().body
						(fromObject("Employee Created with Id " + x)));
	}

	/**
	 * Receives a ServerRequest.
	 * Makes use of BodyExtractors and extracts the Employee data as
	 * Mono<Employee> from the ServerRequest.
	 * Finds the Employee and updates the details by invoking updateEmployee() of
	 * EmployeeDAO.
	 * Prepares a Mono<ServerResponse> and returns the same.
	 * */
	public Mono<ServerResponse> updateEmployee(ServerRequest request) {
		Mono<Employee> requestBodyMono = request.body(BodyExtractors.toMono(Employee.class));
		return requestBodyMono.flatMap(employee ->
				employeeDAO.getEmployeeDetailsById(employee.getEmployeeId())
						.flatMap(existing -> employeeDAO.updateEmployee(employee))
						.flatMap(emp -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
								.body(fromObject(emp)))
						.switchIfEmpty(ServerResponse.badRequest().body(fromObject
								("Please Give valid employee details to update"))));
	}

	/**
	 * Receives a ServerRequest.
	 * Extracts the Path Variable (named id) from the Request.
	 * Finds the Employee and deletes the details by invoking removeEmployee() of
	 * EmployeeDAO.
	 * Prepares a Mono<ServerResponse> and returns the same.
	 * */
	public Mono<ServerResponse> deleteEmployee(ServerRequest request) {
		int myId = Integer.parseInt(request.pathVariable("id"));
		return employeeDAO.getEmployeeDetailsById(myId)
				.flatMap(existing -> employeeDAO.removeEmployee(myId))
				.flatMap(emp -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
						.body(fromObject(emp)))
				.switchIfEmpty(ServerResponse.badRequest().body
						(fromObject("Please Give valid employee details to delete")));
	}
}

Note that every method of the handler returns a Mono<ServerResponse>, which makes the presentation layer reactive.

Note: handler methods should accept a ServerRequest and return a Mono<ServerResponse>.


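package com.webflux.web.router.config;
import static org.springframework.web.reactive.function.server.RequestPredicates.DELETE;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RequestPredicates.PUT;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import com.webflux.web.handler.EmployeeHandler;

/**
 * Router is a configuration class.
 * It links the incoming requests with appropriate HTTP methods to the
 * respective method of the EmployeeHandler.
 * Method references are used for the mapping.
 * */
@Configuration
public class RouterConfiguration{
    @Autowired
    EmployeeHandler employeeHandler;

    @Bean
    public RouterFunction<ServerResponse> monoRouterFunction() {
     // NOTE: the route definitions are truncated in the source listing.
     // The "/employees" paths below are assumptions; only the {id} variable,
     // the HTTP methods and the handler method names come from the article.
     RouterFunction<ServerResponse> routerFunction = RouterFunctions
        .route(GET("/employees").and(accept(MediaType.APPLICATION_JSON)),
                employeeHandler::getEmployeeDetails)
        .andRoute(GET("/employees/{id}").and(accept(MediaType.APPLICATION_JSON)),
                employeeHandler::getEmployeeDetailByEmployeeId)
        .andRoute(POST("/employees").and(accept(MediaType.APPLICATION_JSON)),
                employeeHandler::addEmployee)
        .andRoute(PUT("/employees").and(accept(MediaType.APPLICATION_JSON)),
                employeeHandler::updateEmployee)
        .andRoute(DELETE("/employees/{id}").and(accept(MediaType.APPLICATION_JSON)),
                employeeHandler::deleteEmployee);
        return routerFunction;
    }
}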


package com.webflux;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ApplicationBootUp {
	public static void main(String[] args) {
		SpringApplication.run(ApplicationBootUp.class, args);
	}
}


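application.properties sets only the server port: server.port=8090

The application can be built and started with the Maven goals clean install spring-boot:run (mvn clean install spring-boot:run on the command line) and then tested with the Postman client.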
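As an alternative to Postman, the endpoints can also be exercised from Java. The sketch below uses the JDK's java.net.http.HttpClient, which requires Java 11 or later. The port comes from application.properties, but the "/employees" path and the class name EmployeeApiClientSketch are assumptions, so adjust them to the routes actually configured.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client sketch; the "/employees" path is an assumption.
class EmployeeApiClientSketch {

    // Port 8090 is taken from application.properties.
    static final String BASE_URL = "http://localhost:8090/employees";

    /** Request that lists all employees. */
    static HttpRequest listEmployees() {
        return HttpRequest.newBuilder(URI.create(BASE_URL))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    /** Request that fetches one employee by id. */
    static HttpRequest getEmployee(int id) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/" + id))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    /** Request that deletes one employee by id. */
    static HttpRequest deleteEmployee(int id) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/" + id))
                .header("Accept", "application/json")
                .DELETE()
                .build();
    }

    /** Sends a request and returns "status body"; needs the application to be running. */
    static String send(HttpRequest request) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode() + " " + response.body();
    }

    public static void main(String[] args) {
        // Only prints the requests; call send(...) on them once the server is up.
        HttpRequest[] requests = { listEmployees(), getEmployee(10001), deleteEmployee(10003) };
        for (HttpRequest request : requests) {
            System.out.println(request.method() + " " + request.uri());
        }
    }
}
```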




