响应式编程是为具有以下特征的应用程序创造的术语:
就像面向对象编程、函数式编程或过程式编程一样,反应式编程只是另一种编程范式。它使我们的程序:响应迅速、有弹性、有弹性。
Spring Framework 在内部使用 Reactor 来支持其自身的反应式支持。 Reactor 是 Reactive Streams(发布者,Java9 引入)的一种实现。 Reactor 有以下两种数据类型:
Spring 从其 API 中公开这些类型,从而使应用程序具有反应性。
在 Spring 5 中,引入了一个名为 WebFlux 的新模块,它支持使用 HTTP(REST) 和 Web 套接字创建反应式 Web 应用程序。
Spring Web Flux 支持以下两种模型:
在本文中,我们将探讨功能模型。
下表比较了普通的 Spring 和 Web Flux:
传统堆栈 | 反应堆 |
Spring Web MVC | Spring WebFlux |
控制器和处理程序映射 | 路由器函数 |
Servlet API | HTTP/反应流 |
Servlet 容器 | 任何支持 Servlet 3.1+、Tomcat 8.x、Jetty、Netty、UnderTow 的 servlet 容器 |
必须为将在员工上公开 CRUD 的员工管理系统创建使用 Spring Web Flux 的 REST API。
注意:项目的DAO层是硬编码的。
Spring5 WebFlux 的功能模型是使用 Spring MVC 样式注释的替代方法。在 Spring WebFlux 功能模型中,路由器和处理函数用于创建 MVC 应用程序。HTTP 请求通过 router function 路由(替代 @RequestMapping
) 并通过处理程序函数 处理请求($ 处理程序方法的替代方法)。
每个处理程序函数 都将ServerRequest (org.springframework.web.reactive.function.server.ServerRequest
) 作为参数,结果将返回Mono<ServerResponse>
或 Flux<ServerResponse>
(org.springframework.web.reactive.function.server.ServerResponse
)。
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.webflux</groupId> <artifactId>Demo_Spring_MVC_Web_Flux</artifactId> <version>0.0.1-SNAPSHOT</version> <repositories> <repository> <id>spring-snapshots</id> <name>Spring Snapshots</name> <url>https://repo.spring.io/snapshot</url> <snapshots> <enabled>true</enabled> </snapshots> </repository> <repository> <id>spring-milestones</id> <name>Spring Milestones</name> <url>https://repo.spring.io/milestone</url> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repositories> <pluginRepositories> <pluginRepository> <id>spring-snapshots</id> <name>Spring Snapshots</name> <url>https://repo.spring.io/snapshot</url> <snapshots> <enabled>true</enabled> </snapshots> </pluginRepository> <pluginRepository> <id>spring-milestones</id> <name>Spring Milestones</name> <url>https://repo.spring.io/milestone</url> <snapshots> <enabled>false</enabled> </snapshots> </pluginRepository> </pluginRepositories> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.M4</version> <relativePath /> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8 </project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8 </project.reporting.outputEncoding> <!-- Configuring Java 8 for the Project --> <java.version>1.8</java.version> </properties> <!--Excluding Embedded tomcat to make use of the Netty Server--> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
EmployeeDAO.java
package com.webflux.dao; import java.util.LinkedHashMap; import java.util.Map; import org.springframework.stereotype.Repository; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import com.webflux.bussiness.bean.Employee; @Repository public class EmployeeDAO { /** * Map is used to Replace the Database * */ static public Map mapOfEmloyeess = new LinkedHashMap(); static int count=10004; static { mapOfEmloyeess.put(10001, new Employee("Jack",10001,12345.6,1001)); mapOfEmloyeess.put(10002, new Employee("Justin",10002,12355.6,1002)); mapOfEmloyeess.put(10003, new Employee("Eric",10003,12445.6,1003)); } /** * Returns all the Existing Employees as Flux * */ public Flux getAllEmployee(){ return Flux.fromStream(mapOfEmloyeess.values().stream()); } /**Get Employee details using EmployeeId . * Returns a Mono response with Data if Employee is Found * Else returns a null * */ public Mono getEmployeeDetailsById(int id){ Monores = null; Employee emp =mapOfEmloyeess.get(id); if(emp!=null){ res=Mono.just(emp); } return res; } /**Create Employee details. * Returns a Mono response with auto-generated Id * */ public Mono addEmployee(Employee employee){ count++; employee.setEmployeeId(count); mapOfEmloyeess.put(count, employee); return Mono.just(count); } /**Update the Employee details, * Receives the Employee Object and returns the updated Details * as Mono * */ public Mono updateEmployee (Employee employee){ mapOfEmloyeess.put(employee.getEmployeeId(), employee); return Mono.just(employee); } /**Delete the Employee details, * Receives the EmployeeID and returns the deleted employee Details * as Mono * */ public Mono removeEmployee (int id){ Employee emp= mapOfEmloyeess.remove(id); return Mono.just(emp); } }
可以观察到 EmployeeDAO
的所有方法都返回 Mono 或 Flux Response,从而使 DAO 层具有反应性。
EmployeeHandler.java
package com.webflux.web.handler; import static org.springframework.web.reactive.function.BodyInserters.fromObject; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.stereotype.Controller; import org.springframework.web.reactive.function.BodyExtractors; import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.reactive.function.server.ServerResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import com.webflux.bussiness.bean.Employee; import com.webflux.dao.EmployeeDAO; @Controller public class EmployeeHandler { @Autowired private EmployeeDAO employeeDAO; /** * Receives a ServerRequest. * Invokes the method getAllEmployee() from EmployeeDAO. * Prepares a Mono and returns the same. * */ public Mono getEmployeeDetails(ServerRequest request) { Flux res=employeeDAO.getAllEmployee(); return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON) .body(res,Employee.class); } /** * Receives a ServerRequest. * Extracts the Path Variable (named id) from the Request. * Invokes the method [getEmployeeDetailsById()] from EmployeeDAO. * Verifies if the object returned in the previous step is null * then returns a Bad request with appropriate message. * Else Returns the Mono with the Employee Data. * */ public Mono getEmployeeDetailByEmployeeId(ServerRequest request) { //Extracts the Path Variable id from the Request int id =Integer.parseInt(request.pathVariable("id")); Mono employee = employeeDAO.getEmployeeDetailsById(id); Mono res= null; if(employee==null){ res=ServerResponse.badRequest().body (fromObject("Please give a valid employee Id")); } else{ //Converting Mono of Mono type to Mono res=employee.flatMap(x->ServerResponse.ok().body(fromObject(x))); } return res; } /** * Receives a ServerRequest. * Makes use of BodyExtractors and Extracts the Employee Data as * Mono from the ServerRequest. * Invokes the method [addEmployee()] of the EmployeeDAO. * Prepares a Mono and returns the same. * */ public Mono addEmployee(ServerRequest request) { Mono requestBodyMono = request.body(BodyExtractors.toMono(Employee.class)); Mono mono= employeeDAO.addEmployee(requestBodyMono.block()); //Converting Mono of Mono type to Mono Mono res= mono.flatMap(x->ServerResponse.ok().body (fromObject("Employee Created with Id"+x))); return res; } /** * Receives a ServerRequest. * Makes use of BodyExtractors and Extracts the Employee Data as * Mono from the ServerRequest. * Finds the Employee and updates the details by invoking updateEmployee() of * EmployeeDAO. * Prepares a Mono and returns the same. * */ public Mono updateEmployee(ServerRequest request) { Mono requestBodyMono = request.body(BodyExtractors.toMono(Employee.class)); Employee employee = requestBodyMono.block(); Mono employeeRet = employeeDAO.getEmployeeDetailsById(employee.getEmployeeId()); Mono res= null; if(employeeRet==null){ res=ServerResponse.badRequest().body(fromObject ("Please Give valid employee details to update")); } else{ Mono emp= employeeDAO.updateEmployee(employee); //Converting Mono of Mono type to Mono res=emp.flatMap(x->ServerResponse.ok().body(fromObject(x))); } return res; } /** * Receives a ServerRequest. * Makes use of BodyExtractors and Extracts the Employee Data as * Mono from the ServerRequest. * Finds the Employee and deletes the details by invoking removeEmployee() of * EmployeeDAO. * Prepares a Mono and returns the same. * */ public Mono deleteEmployee(ServerRequest request) { int myId = Integer.parseInt(request.pathVariable("id")); Mono res= null; if (employeeDAO.getEmployeeDetailsById(myId) == null) { res=ServerResponse.badRequest().body (fromObject("Please Give valid employee details to delete")); }else{ Mono employee = employeeDAO.removeEmployee(myId); //Converting Mono of Mono type to Mono res=employee.flatMap(x->ServerResponse.ok().body(fromObject(x))); } return res; } }
可以观察到,Handler 的所有方法都返回 Mono<ServerResponse>
,从而使 Presentation Layer Reactive。
注意:事件处理程序方法应接受ServerRequest 并应返回Mono<ServerResponse>
RouterConfiguration.java
package com.webflux.web.router.config; import static org.springframework.web.reactive.function.server.RequestPredicates.DELETE; import static org.springframework.web.reactive.function.server.RequestPredicates.GET; import static org.springframework.web.reactive.function.server.RequestPredicates.POST; import static org.springframework.web.reactive.function.server.RequestPredicates.PUT; import static org.springframework.web.reactive.function.server.RequestPredicates.accept; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.MediaType; import org.springframework.web.reactive.function.server.RouterFunction; import org.springframework.web.reactive.function.server.RouterFunctions; import org.springframework.web.reactive.function.server.ServerResponse; import com.webflux.web.handler.EmployeeHandler; @Configuration /** * Router is configuration class. * It links the incoming requests with appropriate HTTP methods to the * respective method of the EmployeeHandler. * Method references are used for the mapping. * */ public class RouterConfiguration{ @Autowired EmployeeHandler employeeHandler; @Bean public RouterFunction monoRouterFunction() { RouterFunctionrouterFunction= RouterFunctions. route(GET("/emp/controller/getDetails"). and(accept(MediaType.APPLICATION_JSON)), employeeHandler::getEmployeeDetails) .andRoute(GET("/emp/controller/getDetailsById/{id}") .and(accept(MediaType.APPLICATION_JSON)), employeeHandler::getEmployeeDetailByEmployeeId) .andRoute(POST("/emp/controller/addEmp") .and(accept(MediaType.APPLICATION_JSON)), employeeHandler::addEmployee) .andRoute(PUT("/emp/controller/updateEmp") .and(accept(MediaType.APPLICATION_JSON)), employeeHandler::updateEmployee) .andRoute(DELETE("/emp/controller/deleteEmp/{id}") .and(accept(MediaType.APPLICATION_JSON)), employeeHandler::deleteEmployee); return routerFunction; } }
ApplicationBootUp.java
package com.webflux; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class ApplicationBootUp { public static void main(String[] args) { SpringApplication.run(ApplicationBootUp.class); } }
在 application.properties 中只提到了服务器端口:server.port=8090
。
可以使用命令部署应用程序:clean install spring-boot:run
并使用postman client
进行测试。
标签2: Java教程地址:https://www.cundage.com/article/jcg-reactive-web-applications-using-springwebflux.html