接口方法与MethodHandler映射map的生成机制 在开始之前,先说一下FeignClient接口的上的SpringMVC注解是如何被解析的,回顾到之前生成动态代理的时候,有个nameToHandler的map,有一句非常关键的代码。
1 Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
这里面就完成了SpringMVCContract对方法的解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public Map<String, MethodHandler> apply (Target key) { List<MethodMetadata> metadata = contract.parseAndValidatateMetadata(key.type()); Map<String, MethodHandler> result = new LinkedHashMap<String, MethodHandler>(); for (MethodMetadata md : metadata) { BuildTemplateByResolvingArgs buildTemplate; if (!md.formParams().isEmpty() && md.template().bodyTemplate() == null ) { buildTemplate = new BuildFormEncodedTemplateFromArgs(md, encoder); } else if (md.bodyIndex() != null ) { buildTemplate = new BuildEncodedTemplateFromArgs(md, encoder); } else { buildTemplate = new BuildTemplateByResolvingArgs(md); } result.put(md.configKey(), factory.create(key, md, buildTemplate, options, decoder, errorDecoder)); } return result; }
画个图
Feign请求处理大体流程
请求流程分析 动态代理拦截Client所有的方法调用 在动态代理生成以后,动态代理所有的调用都会被FeignInvocationHandler拦截,所以我们分析实际的请求流程,需要去查看invoke方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Override public Object invoke (Object proxy, Method method, Object[] args) throws Throwable { if ("equals" .equals(method.getName())) { try { Object otherHandler = args.length > 0 && args[0 ] != null ? Proxy.getInvocationHandler(args[0 ]) : null ; return equals(otherHandler); } catch (IllegalArgumentException e) { return false ; } } else if ("hashCode" .equals(method.getName())) { return hashCode(); } else if ("toString" .equals(method.getName())) { return toString(); } return dispatch.get(method).invoke(args); }
将方法上的请求参数封装到RequestTemplate里 Map<Method, MethodHandler> dispatch;维护了方法对象和SynchronousMethodHandler的映射,所以流程到这里,直接跟到SynchronousMethodHandler利的invoke方法去看看。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Override public Object invoke (Object[] argv) throws Throwable { RequestTemplate template = buildTemplateFromArgs.create(argv); Retryer retryer = this .retryer.clone(); while (true ) { try { return executeAndDecode(template); } catch (RetryableException e) { retryer.continueOrPropagate(e); if (logLevel != Logger.Level.NONE) { logger.logRetry(metadata.configKey(), logLevel); } continue ; } } }
将拿到的参数拼到url上,例如 /user/{id}拼接成, /user/1?name=xxx&age=19
执行所有的RequestInterceptor 然后开始调用executeAndDecode方法,执行http调用的逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 Object executeAndDecode (RequestTemplate template) throws Throwable { Request request = targetRequest(template); if (logLevel != Logger.Level.NONE) { logger.logRequest(metadata.configKey(), logLevel, request); } Response response; long start = System.nanoTime(); try { response = client.execute(request, options); response.toBuilder().request(request).build(); .... } Request targetRequest (RequestTemplate template) { for (RequestInterceptor interceptor : requestInterceptors) { interceptor.apply(template); } return target.apply(new RequestTemplate(template)); }
遍历请求拦截器,将每个请求拦截都应用到RequestTemplate模板上去,也就是让每个请求拦截器对请求进行处理
基于RequestTemplate创建一个Request对象,用于发送请求
将请求的参数都传入了进去,连接超时时间都是10s,读超时时间是60s。基于LoadBalancerFeignClient进行了请求的处理和发送,同时获取了Response。
获得负载均衡器选择服务发起请求 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Override public Response execute (Request request, Request.Options options) throws IOException { try { URI asUri = URI.create(request.url()); String clientName = asUri.getHost(); URI uriWithoutHost = cleanUrl(request.url(), clientName); FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest( this .delegate, request, uriWithoutHost); IClientConfig requestConfig = getClientConfig(options, clientName); return lbClient(clientName).executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse(); } catch (ClientException e) { IOException io = findIOException(e); if (io != null ) { throw io; } throw new RuntimeException(e); } }
获取请求的服务名称,也就是ServiceA
从请求URL中剔除了服务名称
基于去除了服务名称的地址创建了一个RibbonRequest
读取某个服务ribbon的配置IClientConfig
创建FeignLoadBalancer,封装了ribbon的ILoadBalancer(重点)
Feign是如何与Ribbon进行整合的 上面已经创建了FeignLoadBalancer,他内部封装了Ribbon的ILoadBalancer,所以要重点分析下他究竟是如何与Ribbon进行整合的,用的是Ribbon的哪一个ILoadBalancer。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public FeignLoadBalancer create (String clientName) { if (this .cache.containsKey(clientName)) { return this .cache.get(clientName); } IClientConfig config = this .factory.getClientConfig(clientName); ILoadBalancer lb = this .factory.getLoadBalancer(clientName); ServerIntrospector serverIntrospector = this .factory.getInstance(clientName, ServerIntrospector.class); FeignLoadBalancer client = enableRetry ? new RetryableFeignLoadBalancer(lb, config, serverIntrospector, loadBalancedRetryPolicyFactory, loadBalancedBackOffPolicyFactory, loadBalancedRetryListenerFactory) : new FeignLoadBalancer(lb, config, serverIntrospector); this .cache.put(clientName, client); return client; }
从SpringClientFactory获取ILoadBalancer等组件,SpringClientFactory就是Ribbon初始化的时候创建的,所以这里获取到的是ZoneAwareLoadBalancer,这里也就自然的与Eureka完成了整合。
FeignLoadBalancer如何负载均衡选择Server 进入到executeWithLoadBalancer方法中构造了一个LoadBalancerCommand,然后下面的submit方法,有一个匿名内部类ServerOperation的的实现传进去。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public T executeWithLoadBalancer (final S request, final IClientConfig requestConfig) throws ClientException { LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig); try { return command.submit( new ServerOperation<T>() { @Override public Observable<T> call (Server server) { URI finalUri = reconstructURIWithServer(server, request.getUri()); S requestForServer = (S) request.replaceUri(finalUri); try { return Observable.just(AbstractLoadBalancerAwareClient.this .execute(requestForServer, requestConfig)); } catch (Exception e) { return Observable.error(e); } } }) .toBlocking() .single(); .....
提交了一个匿名内部类作为参数,那么ServerOperation.call方法就一定会在submit方法里被调用,跟到submit方法里去看下,因为是第一次进入,所以server肯定是null,selectServer()方法,看名字明显就是调用负载均衡选择服务实例的方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public Observable<T> submit (final ServerOperation<T> operation) { final ExecutionInfoContext context = new ExecutionInfoContext(); if (listenerInvoker != null ) { try { listenerInvoker.onExecutionStart(); } catch (AbortExecutionException e) { return Observable.error(e); } } final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer(); final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer(); Observable<T> o = (server == null ? selectServer() : Observable.just(server)) ......省略部分代码 return operation.call(server)... ......胜率部分代码 private Observable<Server> selectServer () { return Observable.create(new OnSubscribe<Server>() { @Override public void call (Subscriber<? super Server> next) { try { Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey); next.onNext(server); next.onCompleted(); } catch (Exception e) { next.onError(e); } } }); }
构造了一个LoadBalancerCommand
构造了一个ServerOperation,包含了发起http调用的逻辑,作为参数传入LoadBalancerCommand.submit方法,后面会进行回调
在submit方法中,会调用selectServer方法,选择服务实例
selectServer方法调用loadBalancerContext.getServerFromLoadBalancer,最终调用负载均衡器chooseServer 方法选择一个服务实例,
拿到服务实例后,将Server对象传入ServerOperation的call方法进行回调
ServerOperation用server的信息替换host里的服务名,拿到真正的请求地址
再调用子类也就是FeignLoadBalancer.execute方法执行http请求
默认的connectTimeout和readTimeout都是1000毫秒
响应结果封装为RibbonResponse
收到响应后将json串转换成对象 回到最初的SynchronousMethodHandler方法里,在executeAndDecode方法中,response = client.execute(request, options);
在拿到RibbonResponse以后,开始进行对响应的处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 Object executeAndDecode (RequestTemplate template) throws Throwable { Request request = targetRequest(template); if (logLevel != Logger.Level.NONE) { logger.logRequest(metadata.configKey(), logLevel, request); } Response response; long start = System.nanoTime(); try { response = client.execute(request, options); response.toBuilder().request(request).build(); } catch (IOException e) { if (logLevel != Logger.Level.NONE) { logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start)); } throw errorExecuting(request, e); } long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); boolean shouldClose = true ; try { if (logLevel != Logger.Level.NONE) { response = logger.logAndRebufferResponse(metadata.configKey(), logLevel, response, elapsedTime); response.toBuilder().request(request).build(); } if (Response.class == metadata.returnType()) { if (response.body() == null ) { return response; } if (response.body().length() == null || response.body().length() > MAX_RESPONSE_BUFFER_SIZE) { shouldClose = false ; return response; } byte [] bodyData = Util.toByteArray(response.body().asInputStream()); return response.toBuilder().body(bodyData).build(); } if (response.status() >= 200 && response.status() < 300 ) { if (void .class == metadata.returnType()) { return null ; } else { return decode(response); } } else if (decode404 && response.status() == 404 && void .class != metadata.returnType()) { return decode(response); } else { throw errorDecoder.decode(metadata.configKey(), response); } } catch (IOException e) { if (logLevel != Logger.Level.NONE) { logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime); } throw errorReading(request, response, e); } finally { if (shouldClose) { ensureClosed(response.body()); } } }
拿到响应以后,执行decode方法,这个decoder默认是ResponseEntityDecoder,将json字符串转换成java对象,也就是方法的返回类型,metadata.returnType()。
1 2 3 4 5 6 7 8 9 Object decode (Response response) throws Throwable { try { return decoder.decode(response, metadata.returnType()); } catch (FeignException e) { throw e; } catch (RuntimeException e) { throw new DecodeException(e.getMessage(), e); } }
画个图总结