柚子快報(bào)邀請(qǐng)碼778899分享:Dubbo服務(wù)消費(fèi)者調(diào)用過(guò)程
柚子快報(bào)邀請(qǐng)碼778899分享:Dubbo服務(wù)消費(fèi)者調(diào)用過(guò)程
同步 需要返回值:使用ExchangeClient.request()方法,返回一個(gè)ResponseFuture,一直阻塞到服務(wù)端返回響應(yīng)結(jié)果;
案例介紹
====
先看一個(gè)簡(jiǎn)單的客戶端引用服務(wù)的例子,HelloService,dubbo配置如下:
使用Zookeeper作為注冊(cè)中心 引用遠(yuǎn)程的HelloService接口服務(wù)
根據(jù)之前的介紹,在Spring啟動(dòng)的時(shí)候,根據(jù)
@Autowired
private HelloService helloService;
使用的不是ReferenceBean對(duì)象,而是ReferenceBean的getObject()方法返回的對(duì)象,該對(duì)象通過(guò)代理實(shí)現(xiàn)了HelloService接口,所以要看服務(wù)引用的整個(gè)過(guò)程就需要從ReferenceBean.getObject()方法開(kāi)始入手。
服務(wù)引用過(guò)程
將ReferenceConfig.init()中的內(nèi)容拆成具體的步驟,如下:
第一步:收集配置參數(shù)
methods=hello,
timestamp=1443695417847,
dubbo=2.5.3
application=consumer-of-helloService
side=consumer
pid=7748
interface=com.demo.dubbo.service.HelloService
第二步:從注冊(cè)中心獲取服務(wù)地址,返回Invoker對(duì)象
如果是單個(gè)注冊(cè)中心,代碼如下:
Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
invoker = refprotocol.refer(interfaceClass, url);
上述url的內(nèi)容如下:
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?
application=consumer-of-helloService&
dubbo=2.5.6&
pid=8292&
registry=zookeeper&
timestamp=1443707173909&
refer=
application=consumer-of-helloService&
dubbo=2.5.6&
interface=com.demo.dubbo.service.HelloService&
methods=hello&
pid=8292&
side=consumer&
timestamp=1443707173884&
第三步:使用ProxyFactory創(chuàng)建出Invoker的代理對(duì)象
ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
proxyFactory.getProxy(invoker);
下面就詳細(xì)說(shuō)明下上述提到的幾個(gè)概念:Protocol、Invoker、ProxyFactory
概念介紹
Invoker
Invoker是一個(gè)可執(zhí)行對(duì)象,有三種類型的Invoker:
本地執(zhí)行的Invoker(服務(wù)端使用) 遠(yuǎn)程通信執(zhí)行的Invoker(客戶端使用) 多個(gè)類型2的Invoker聚合成的集群版Invoker(客戶端使用)
Invoker的實(shí)現(xiàn)情況如下:
先來(lái)看服務(wù)引用的第2個(gè)步驟,返回Invoker對(duì)象
對(duì)于客戶端來(lái)說(shuō),Invoker應(yīng)該是2、3這兩種類型。先來(lái)說(shuō)第2種類型,即遠(yuǎn)程通信的Invoker,看DubboInvoker的源碼,調(diào)用過(guò)程AbstractInvoker.invoke()->doInvoke():
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout) ;
RpcContext.getContext().setFuture(new FutureAdapter(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
大致內(nèi)容就是:
將通過(guò)遠(yuǎn)程通信將Invocation信息傳遞給服務(wù)器端,服務(wù)器端接收到該Invocation信息后,找到對(duì)應(yīng)的本地Invoker,然后通過(guò)反射執(zhí)行相應(yīng)的方法,將方法的返回值再通過(guò)遠(yuǎn)程通信將結(jié)果傳遞給客戶端。
這里分3種情況:
執(zhí)行方法不需要返回值:直接調(diào)用ExchangeClient.send()方法。 執(zhí)行方法的結(jié)果需要異步返回:使用ExchangeClient.request()方法返回一個(gè)ResponseFuture對(duì)象,通過(guò)RpcContext中的ThreadLocal使ResponseFuture和當(dāng)前線程綁定,未等服務(wù)端響應(yīng)結(jié)果就直接返回,然后服務(wù)端通過(guò)ProtocolFilterWrapper.buildInvokerChain()方法會(huì)調(diào)用Filter.invoke()方法,即FutureFilter.invoker()->asyncCallback(),會(huì)獲取RpcContext的ResponseFuture對(duì)象,異步返回結(jié)果。 執(zhí)行方法的結(jié)果需要同步返回:使用ExchangeClient.request()方法,返回一個(gè)ResponseFuture,一直阻塞到服務(wù)端返回響應(yīng)結(jié)果
Protocol 使ResponseFuture和當(dāng)前線程綁定,未等服務(wù)端響應(yīng)結(jié)果就直接返回,然后服務(wù)端通過(guò)ProtocolFilterWrapper.buildInvokerChain()方法會(huì)調(diào)用Filter.invoke()方法,即FutureFilter.invoker()->asyncCallback(),會(huì)獲取RpcContext的ResponseFuture對(duì)象,異步返回結(jié)果。
執(zhí)行方法的結(jié)果需要同步返回:使用ExchangeClient.request()方法,返回一個(gè)ResponseFuture,一直阻塞到服務(wù)端返回響應(yīng)結(jié)果
Protocol
柚子快報(bào)邀請(qǐng)碼778899分享:Dubbo服務(wù)消費(fèi)者調(diào)用過(guò)程
推薦文章
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。