前言
Dubbo 框架采用分层设计,自上而下共分为十层。最底下的 Serialize 层关心的是如何序列化对象、往上的 Transporte 层关心的是如何把数据传输到远程、再往上的 Exchange 层关心的则是如何实现 请求-应答 消息交换模式、再往上就是 Protocol 层,它关心的是如何封装 RPC 调用,屏蔽底层细节。
理解Protocol
协议可以看作是通信双方都要遵守的约定和规则。
为什么需要协议???
网络只能传输字节序列,在发送方和接收方看来报文只是一段二进制数据,如何解读这段二进制数据里每个字节甚至每个比特位的含义,就是协议要规定的。只有通信双方使用相同的协议,一次远程通信才能正确进行。反之,一方使用A协议、一方使用B协议,就会出现“鸡同鸭讲”的场景,谁也不知道对方在说什么。
Dubbo 抽象出了协议层,通过核心接口 Protocol 来暴露服务和引用服务。暴露服务用来处理 RPC 请求,引用服务用来发起 RPC 调用。不管是服务提供方还是消费方,都必须遵守相同的协议规则,才能保证一次 RPC 调用的正确性。
设计实现
协议层接口定义在dubbo-rpc-api
模块,核心 SPI 接口是 Protocol。以 dubbo 协议为例,整体流程如下所示,中间会有一个 DubboCodec 负责消息的编解码。
Protocol
Protocol 定义了服务暴露和引用的抽象,export()
用来暴露服务,refer()
用来引用服务。
@SPI("dubbo")
public interface Protocol {
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
}
对于export()
来说,入参 Invoker 是框架传过来的对于本地 Service 实现的一个封装,调用其Invoker#invoke
就会触发本地 Service 实现逻辑。
暴露服务是为了提供服务,所以export()
一般都会绑定一个本地的端口,用来处理 tcp 连接和请求。以 DubboProtocol 为例,默认会通过 Netty 开启服务。
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
//服务唯一标识
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
// 开启服务
openServer(url);
// 优化序列化效率
optimizeSerialization(url);
return exporter;
}
对于refer()
来说,引用远程服务会得到一个 Invoker,它屏蔽了 RPC 调用的细节,调用其Invoker#invoke
就会发起一次 RPC 调用。因为io传输是异步的,业务又需要同步调用,所以 Dubbo 会在外面套一层异步转同步的实现。
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// io传输是异步,异步转同步
return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}
以 DubboProtocol 为例,引用 Invoker 首先要初始化 ExchangeClient 客户端,DubboInvoker 要依赖它发请求。
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// 创建DubboInvoker依赖ExchangeClient,发请求
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
Invoker
Invoker 接口是对调用程序的抽象,调用其invokee()
即可发起一次调用。这个调用可以是一次本地方法的调用,也可以是一次 RPC 调用,关键在于实现类。
public interface Invoker<T> extends Node {
Class<T> getInterface();
Result invoke(Invocation invocation) throws RpcException;
}
对于服务提供方来说,Invoker 是对本地 Service 实现的封装,子类均继承 AbstractProxyInvoker,通过 ProxyFactory 创建。
public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
// 被代理的对象 对Provider来说是本地Service实现
private final T proxy;
private final Class<T> type;
private final URL url;
......
}
ProxyFactory 会基于本地 Service 实现对象创建 Invoker 代理对象,也是个 SPI 接口,支持通过 JDK 动态代理和 javassist 技术来创建。
@SPI("javassist")
public interface ProxyFactory {
@Adaptive({PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}
默认是 JavassistProxyFactory,Dubbo 这里做了个小优化,默认反射调用方法效率不高,Dubbo 会生成一个 Wrapper 对象,动态生成包装类,在字节码层面实现直接的方法调用。
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
对于服务消费方来说,Invoker 是对远程 RPC 调用的封装,Dubbo 通过 Invoker 接口帮我们屏蔽了底层实现,消费方的 Invoker 可以通过Protocol#refer
获取。以 DubboInvoker 为例,因为要实现 RPC 调用必然要发送网络请求,所以它需要依赖 ExchangeClient。
public class DubboInvoker<T> extends AbstractInvoker<T> {
private final ExchangeClient[] clients;
......
}
RPC 调用其实就是通过 ExchangeClient 把 RpcInvocation 请求发出去
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
// 轮询客户端
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
// 是否单向发送,不期望对端响应数据
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
// 计算超时
int timeout = calculateTimeout(invocation, methodName);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
// 回调线程池
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
// 发送请求
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
尾巴
Dubbo Protocol 关心的是:服务端如何暴露本地服务实现 Invoker,客户端如何引用远程服务 Invoker,并且让这两个 Invoker 可以正确通信,完成整个 RPC 调用。如何正确通信,就涉及到双方的协议实现细节,如果双方使用不同的协议,就会出现“鸡同鸭讲”的场景,谁都不知道对方在说什么,更别提完成 RPC 调用了。所以,任何一个 Protocol 实现,都必须同时实现服务的暴露和引用。