Error message here!

Hide Error message here!

忘记密码?

Error message here!

请输入正确邮箱

Hide Error message here!

密码丢失?请输入您的电子邮件地址。您将收到一个重设密码链接。

Error message here!

返回登录

Close

你的响应阻塞了没有?--Spring-WebFlux源码分析

一天不进步,就是退步 2019-02-19 11:40:00 阅读数:310 评论数:0 点赞数:0 收藏数:0

1. Spring WebFlux是什么?

Spring WebFlux是Spring Framework 5.0中引入的新的反应式Web框架。 与Spring MVC不同,它不需要Servlet API,完全异步和非阻塞, 并通过Reactor项目实现Reactive Streams规范。 并且可以在诸如Netty,Undertow和Servlet 3.1+容器的服务器上运行。Reactor 也是 Spring 5 中反应式编程的基础,它一个新的反应式编程库。

2. Reactor是什么?

Reactor offers non-blocking and backpressure-ready network runtimes including local TCP/HTTP/UDP client & servers based on the robust Netty framework.

Reactor提供了一个非阻塞的,高并发的基于健壮的Netty框架的网络运行API,包括本地tcp/http/udp 客户端和服务端。

重要的两个概念

Flux 和 Mono 是 Reactor 中的两个基本概念。Flux 表示的是包含 0 到 N 个元素的异步序列。在该序列中可以包含三种不同类型的消息通知:正常的包含元素的消息、序列结束的消息和序列出错的消息。当消息通知产生时,订阅者中对应的方法 onNext(), onComplete()和 onError()会被调用。Mono 表示的是包含 0 或者 1 个元素的异步序列。该序列中同样可以包含与 Flux 相同的三种类型的消息通知。Flux 和 Mono 之间可以进行转换。对一个 Flux 序列进行计数操作,得到的结果是一个 Mono对象。把两个 Mono 序列合并在一起,得到的是一个 Flux 对象。

简单说Mono返回单个元素,Flux返回多个元素

3. spring webflux处理请求流程

核心控制器DispatcherHandler,等同于阻塞方式的DispatcherServlet//// Central dispatcher for HTTP request handlers/controllers. Dispatches to / registered handlers for processing a request, providing convenient mapping / facilities. / /

{@codeDispatcherHandler} discovers the delegate components it needs from / Spring configuration. It detects the following in the application context: /

    /
  • {@linkHandlerMapping} -- map requests to handler objects /
  • {@linkHandlerAdapter} -- for using any handler interface /
  • {@linkHandlerResultHandler} -- process handler return values /
/ /

{@codeDispatcherHandler} is also designed to be a Spring bean itself and / implements {@linkApplicationContextAware} for access to the context it runs / in. If {@codeDispatcherHandler} is declared with the bean name "webHandler" / it is discovered by {@linkWebHttpHandlerBuilder/#applicationContext} which / creates a processing chain together with {@codeWebFilter}, / {@codeWebExceptionHandler} and others. / /

A {@codeDispatcherHandler} bean declaration is included in / {@linkorg.springframework.web.reactive.config.EnableWebFlux @EnableWebFlux} / configuration. / /@authorRossen Stoyanchev /@authorSebastien Deleuze /@authorJuergen Hoeller /@since5.0 /@seeWebHttpHandlerBuilder/#applicationContext(ApplicationContext)/*/

3.1 初始化

获取HandlerMapping,HandlerAdapter,HandlerResultHandler的所有实例

 protected voidinitStrategies(ApplicationContext context) { Map mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors( context, HandlerMapping.class, true, false); //1 ArrayList mappings = new ArrayList<>(mappingBeans.values()); AnnotationAwareOrderComparator.sort(mappings);this.handlerMappings =Collections.unmodifiableList(mappings); Map adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors( context, HandlerAdapter.class, true, false); //2this.handlerAdapters = new ArrayList<>(adapterBeans.values()); AnnotationAwareOrderComparator.sort(this.handlerAdapters); Map beans = BeanFactoryUtils.beansOfTypeIncludingAncestors( context, HandlerResultHandler.class, true, false); //3this.resultHandlers = new ArrayList<>(beans.values()); AnnotationAwareOrderComparator.sort(this.resultHandlers); }

其中,1.获取所有HandlerMapping实例

2.获取所有HandlerAdapter实例

3.获取所有HandlerResultHandler实例

3.2 流式处理请求public Monohandle(ServerWebExchange exchange) {if (this.handlerMappings == null) {returncreateNotFoundError(); }return Flux.fromIterable(this.handlerMappings) .concatMap(mapping->mapping.getHandler(exchange))//1 .next() .switchIfEmpty(createNotFoundError())//2 .flatMap(handler->invokeHandler(exchange, handler))//3 .flatMap(result->handleResult(exchange, result));//4 }

其中,第一步,从handlerMapping这个map中获取HandlerMapping

第二步,触发HandlerApter的handle方法private MonoinvokeHandler(ServerWebExchange exchange, Object handler) {if (this.handlerAdapters != null) {for (HandlerAdapter handlerAdapter : this.handlerAdapters) {if(handlerAdapter.supports(handler)) {returnhandlerAdapter.handle(exchange, handler); } } }return Mono.error(new IllegalStateException("No HandlerAdapter: " +handler)); }

第三步,触发HandlerResultHandler 的handleResult方法

private MonohandleResult(ServerWebExchange exchange, HandlerResult result) {returngetResultHandler(result).handleResult(exchange, result) .onErrorResume(ex-> result.applyExceptionHandler(ex).flatMap(exceptionResult ->getResultHandler(exceptionResult).handleResult(exchange, exceptionResult))); }privateHandlerResultHandler getResultHandler(HandlerResult handlerResult) {if (this.resultHandlers != null) {for (HandlerResultHandler resultHandler : this.resultHandlers) {if(resultHandler.supports(handlerResult)) {returnresultHandler; } } }throw new IllegalStateException("No HandlerResultHandler for " +handlerResult.getReturnValue()); }

4.HandlerMapping实现

5.HanlderAdapter的实现

6.HandlerResultHandler的实现

 7.不同容器的实现

 7.1 Reactor实现ReactorHttpHandlerAdapter

执行apply方法@Overridepublic Monoapply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) { NettyDataBufferFactory bufferFactory= newNettyDataBufferFactory(reactorResponse.alloc());try{ ReactorServerHttpRequest request= newReactorServerHttpRequest(reactorRequest, bufferFactory); ServerHttpResponse response= newReactorServerHttpResponse(reactorResponse, bufferFactory);if (request.getMethod() ==HttpMethod.HEAD) { response= newHttpHeadResponseDecorator(response); }return this.httpHandler.handle(request, response) .doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage())) .doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed")); }catch(URISyntaxException ex) {if(logger.isDebugEnabled()) { logger.debug("Failed to get request URI: " +ex.getMessage()); } reactorResponse.status(HttpResponseStatus.BAD_REQUEST);returnMono.empty(); } }

其中,HttpHandler的定义

// / Lowest level contract forreactive HTTP request handling that serves as a/common denominator across different runtimes./ /

Higher-level, but still generic, building blocks forapplications such as/{@code WebFilter}, {@code WebSession}, {@code ServerWebExchange}, and others/ are available in the {@code org.springframework.web.server} package./ /

Application level programming models such as annotated controllers and/ functional handlers are available in the {@code spring-webflux} module./ /

Typically an {@link HttpHandler} represents an entire application with/ higher-level programming models bridged via/{@link org.springframework.web.server.adapter.WebHttpHandlerBuilder}./Multiple applications at unique context paths can be plugged in with the/help of the {@link ContextPathCompositeHandler}./ /@author Arjen Poutsma/@author Rossen Stoyanchev/ @since 5.0 /@see ContextPathCompositeHandler/*/

具体的实现类为:ContextPathCompositeHandler

//// {@codeHttpHandler} delegating requests to one of several {@codeHttpHandler}'s /based on simple, prefix-based mappings. / /

This is intended as a coarse-grained mechanism for delegating requests to / one of several applications -- each represented by an {@codeHttpHandler}, with / the application "context path" (the prefix-based mapping) exposed via / {@linkServerHttpRequest/#getPath()}. / /@authorRossen Stoyanchev /@since5.0/*/@Overridepublic Monohandle(ServerHttpRequest request, ServerHttpResponse response) {//Remove underlying context path first (e.g. Servlet container) String path =request.getPath().pathWithinApplication().value();return this.handlerMap.entrySet().stream() .filter(entry->path.startsWith(entry.getKey())) .findFirst() .map(entry->{ String contextPath= request.getPath().contextPath().value() +entry.getKey(); ServerHttpRequest newRequest=request.mutate().contextPath(contextPath).build();returnentry.getValue().handle(newRequest, response); }) .orElseGet(()->{ response.setStatusCode(HttpStatus.NOT_FOUND);returnresponse.setComplete(); }); }

基于前缀的映射Handler

 7.2 Jetty实现JettyHttpHandlerAdapter

继承自ServletHttpHandlerAdapter 实现了Servlet,执行service方法@Overridepublic void service(ServletRequest request, ServletResponse response) throwsServletException, IOException {//Check for existing error attribute first if(DispatcherType.ASYNC.equals(request.getDispatcherType())) { Throwable ex=(Throwable) request.getAttribute(WRITEERRORATTRIBUTE_NAME);throw new ServletException("Failed to create response content", ex); }//Start async before Read/WriteListener registration AsyncContext asyncContext =request.startAsync(); asyncContext.setTimeout(-1); ServletServerHttpRequest httpRequest;try{ httpRequest=createRequest(((HttpServletRequest) request), asyncContext);//1 }catch(URISyntaxException ex) {if(logger.isDebugEnabled()) { logger.debug("Failed to get request URL: " +ex.getMessage()); } ((HttpServletResponse) response).setStatus(400); asyncContext.complete();return; } ServerHttpResponse httpResponse= createResponse(((HttpServletResponse) response), asyncContext, httpRequest);//2if (httpRequest.getMethod() ==HttpMethod.HEAD) { httpResponse= newHttpHeadResponseDecorator(httpResponse); } AtomicBoolean isCompleted= newAtomicBoolean(); HandlerResultAsyncListener listener= newHandlerResultAsyncListener(isCompleted, httpRequest); asyncContext.addListener(listener); HandlerResultSubscriber subscriber= newHandlerResultSubscriber(asyncContext, isCompleted, httpRequest);this.httpHandler.handle(httpRequest, httpResponse).subscribe(subscriber);//3 }

其中,1.创建request

2.创建response

            3.handler执行的结果进行subscribe

JettyHttpHandlerAdapter是ServletHttpHandlerAdapter 的扩展,重写了创建request  创建response方法

 7.3 Tomcat实现TomcatHttpHandlerAdapter

TomcatHttpHandlerAdapter是ServletHttpHandlerAdapter 的扩展,重写了创建request  创建response方法

 7.4 AbstractReactiveWebInitializer抽象类

继承自AbstractReactiveWebInitializer的类可以在servlet容器中安装一个Spring Reactive Web Application。@Overridepublic void onStartup(ServletContext servletContext) throwsServletException { String servletName=getServletName(); Assert.hasLength(servletName,"getServletName() must not return null or empty"); ApplicationContext applicationContext=createApplicationContext(); Assert.notNull(applicationContext,"createApplicationContext() must not return null"); refreshApplicationContext(applicationContext); registerCloseListener(servletContext, applicationContext); HttpHandler httpHandler= WebHttpHandlerBuilder.applicationContext(applicationContext).build(); ServletHttpHandlerAdapter servlet = new ServletHttpHandlerAdapter(httpHandler); ServletRegistration.Dynamic registration = servletContext.addServlet(servletName, servlet);if (registration == null) {throw new IllegalStateException("Failed to register servlet with name '" + servletName + "'. " + "Check if there is another servlet registered under the same name."); } registration.setLoadOnStartup(1); registration.addMapping(getServletMapping()); registration.setAsyncSupported(true); }

它通过将ServletHttpHandlerAdapter实例作为一个servlet安装到servler容器中。

8.总结

  DispatcherHandler的流程是

1.通过 HandlerMapping(和DispathcherServlet中的HandlerMapping不同)获取到HandlerAdapter放到ServerWebExchange的属性中

2.获取到HandlerAdapter后触发handle方法,得到HandlerResult

3.通过HandlerResult,触发handleResult,针对不同的返回类找到不同的HandlerResultHandler如

视图渲染ViewResolutionResultHandler,ServerResponseResultHandler, ResponseBodyResultHandler, ResponseEntityResultHandler

  不同容器有不同的实现,如Reactor,Jetty,Tomcat等。

参考文献:

【1】https://blog.csdn.net/qq_15144655/article/details/80708915

【2】https://www.ibm.com/developerworks/cn/java/j-cn-with-reactor-response-encode/index.html

版权声明
本文为[一天不进步,就是退步]所创,转载请带上原文链接,感谢
https://www.cnblogs.com/davidwang456/p/10396168.html

编程之旅,人生之路,不止于编程,还有诗和远方。
阅代码原理,看框架知识,学企业实践;
赏诗词,读日记,踏人生之路,观世界之行;

支付宝红包,每日可领