1、Spring除了使用基于HTTP协议的远程调用方案,还未开发者提供了基于RMI机制的远程调用方法,RMI远程调用网络通信实现是基于TCP/IP协议完成的,而不是通过HTTP协议。在Spring RMI实现中,集成了标准的RMI-JRM解决方案,该方案是java虚拟机实现的一部分,它使用java序列化来完成对象的传输,是一个java到java环境的分布式处理技术,不涉及异构平台的处理。
2、RMI客户端配置:
和基于HTTP协议的远程调用类似,RMI远程调用客户端也需要进行类似如下的配置:
<bean id="rmiProxy" class="org.springframework.remoting.rmi.RmiProxyFactoryBean">
<property name="serviceUrl">
<value>rmi://hostAddress:1099/serviceUrl</value>
</property>
<property name="serviceInterface">
<value>远程调用接口</value>
</property>
</bean>
<bean id="rmiClient" class="RMI远程调用客户端类全路径">
<property name="serviceInterface">
<ref bean="rmiProxy"/>
</property>
</bean>
注意:上面的配置中serviceUrl必须和服务端的远程调用提供者的id一致,另外,serviceUrl中使用的是rmi协议,默认端口是1099。
3、RmiProxyFactoryBean:
RmiProxyFactoryBean的主要功能是对RMI客户端封装,生成代理对象,查询得到RMI的stub对象,并通过这个stub对象发起相应的RMI远程调用请求。其源码如下:
public class RmiProxyFactoryBean extends RmiClientInterceptor implements FactoryBean<Object>,BeanClassLoaderAware {
//远程调用代理对象
private Object serviceProxy;
//Spring IoC容器完成依赖注入后的回调方法
public void afterPropertiesSet() {
//调用父类RmiClientInterceptor的回调方法
super.afterPropertiesSet();
//获取客户端配置的远程调用接口
if(getServiceInterface() == null){
throw new IllegalArgumentException("Property 'serviceInterface' is required");
}
//创建远程调用代理对象并为代理对象设置拦截器。注意第二个参数this,因为RmiProxyFactoryBean继承RmiClientInterceptor,因此其也是拦截器
this.serviceProxy = new ProxyFactory(getServiceInterface(),this).getProxy(getBeanClassLoader());
}
//Spring IoC容器获取被管理对象的接口方法,获取远程调用代理
public Object getObject() {
return this.serviceProxy;
}
public Class<?> getObjectType(){
return getServiceInterface();
}
public boolean isSingleton(){
return true;
}
通过对上面RmiProxyFactoryBean源码分析中,我们看到在创建远程调用代理对象的时候需要设置拦截器,因为我们继续分析远程调用客户端拦截器RmiClientInterceptor。
4、RmiClientInterceptor封装RMI客户端:
RmiClientInterceptor对客户端的远程调用进行拦截,具体的生成调用代理对象、查找远程调用stub、以及通过RMI stub向服务端发起远程调用请求的具体实现都由RMI客户端拦截器实现,其源码如下:
public class RmiClientInterceptor extends RemoteInvocationBasedAccessor implements MethodInterceptor {
//在Spring启动时查找远程调用
private boolean lookupStubOnStartup = true;
//对查找到或使用过的远程调用stub进行缓存
private boolean cacheStub = true;
//当连接失败是否刷新远程调用stub
private boolean refreshStubOnConnectFailure = false;
//RMI客户端socket工厂
private RMIClientSocketFactory registryClientSocketFactory;
//缓存的远程调用stub
private Remote cacheStub;
//创建远程调用stub监控器
private final Object stubMonitor = new Object();
//设置是否启动时查找RMI stub
public void setLookupStudOnStartup(boolean lookupStubOnStartup) {
this.lookupStubOnStartup = lookupStubOnStartup;
}
//设置是否缓存以查找的RMI stub
public void setCacheStub(boolean cacheStub) {
this.cacheStub = cacheStub;
}
//设置当连接失败时,是否刷新RMI stub
public void setRefreshStubOnConnectFailure(boolean refreshStubOnConnectFailure) {
this.refreshStubOnConnectFailure = refreshStubOnConnectFailure;
}
//设置客户端socket工厂
public void setRegistryClientSocketFactory(RMIClientSocketFactory registryClientSocketFactory){
this.registryClientSocketFactory = registryClientSocketFactory;
}
//Spring IoC容器回调方法,由子类RmiProxyFactoryBean回调方法调用
public void afterPropertiesSet(){
//调用父类RemoteInvocationBasedAccessor的回调方法
super.afterPropertiesSet();
prepare();
}
//初始化RMI客户端
public void prepare() throws RemoteLookupFailureException {
//如果设置了启动时查找RMI stub
if(this.lookupStubOnStartup){
//查找RMI stub
Remote remoteObj = lookupStub();
if(logger.isDebugEnabled()){
//如果查找到的RMI stub是RmiInvocationHandler类型
if(remoteObj instanceof RmiInvocationHandler) {
logger.debug("RMI stub [" + getServiceUrl() + "] is an RMI invoker");
}
//如果获取到客户端配置的serviceInterface不为null
else if(getServiceInterface()!= null){
//判断客户端配置的serviceInterface是否是RMI stub实例
boolean isImpl = getServiceInterface().isInstance(remoteObj);
logger.debug("Using service interface [" + getServiceInterface().getName() + "] for RMI stub [" + getServiceUrl() + "] - " +
(!isImpl ? "not " : "") + "directly imlemented");
}
}
//如何设置了缓存RMI stub,将缓存的stub设置为查找到的RMI stub
if(this.cacheStub) {
this.cacheStub = remoteObj;
}
}
}
//查找RMI stub
protected Remote lookupStub() throws RemoteLookupFailureException {
try{
Remote stub = null;
//如果设置了客户端socket工厂
if(this.registryClientSocketFactory != null){
//获取并解析客户端配置的serviceUrl
URL url = new URL(null,getServiceUrl(),new DummyURLStreamHandler());
//获取客户端配置的serviceUrl协议
String protocol = url.getProtocol();
//如果客户端配置的serviceUrl中协议不为null且不是rmi
if(protocol != null && ! "rmi".equals(protocol)){
throw new MalformedURLException("Invalid URL schem '" + protocol + "'");
}
//获取客户端配置的serviceUrl中的主机地址
String host = url.getHost();
//获取客户端配置的serviceUrl中的端口
int port = url.getPort();
//获取客户端配置的serviceUrl中请求路径
String name = url.getPath();
//如果请求路径不为null,且请求路径以“/”开头,则取得“/”
if(name!=null && name.startsWith("/")){
name = name.substring(1);
}
//根据客户端配置的serviceUrl信息和客户端socket工厂创建远程对象引用
Registry registry = LocateRegistry.getRegistry(host,port,this.registryClientSocketFactory);
//通过远程对象引用查找指定RMI请求的RMI stub
stub = registry.lookup(name);
}
//如果客户端配置的serviceUrl中协议为null或者是rmi
else {
//直接通过RMI标准API查找客户端配置的serviceUrl的RMI stub
stub = Naming.lookup(getServiceUrl());
}
if(logger.isDebugEnabled()){
logger.debug("Located RMI stub with URL [" + getServiceUrl() + "]");
}
return stub;
}
//对查找RMI stub过程中异常处理
catch(MalformedURLException ex){
throw new RemoteLookupFailureException("Service URL [" + getServiceUrl() + "] is invalid",ex);
}
catch(NotBoundException ex){
throw new RemoteLookupFailureException("Could not find RMI service [" + getServiceUrl() + "] in RMI registry",ex);
}
catch(RemoteException ex){
throw new RemoteLookupFailureException("Lookup of RMI stub failed",ex);
}
}
//获取RMI stub
protected Remote getStub() throws RemoteLookupFailureException {
//如果没有配置缓存RMI stub,或者设置了启动时查找RMI sub或当连接失败时不刷新RMI stub
if(!this.cacheStub || (this.lookupStubOnStartup && !this.refreshStubOnConnectFailure)){
//如果缓存的RMI stub不为null,则直接返回,否则,查找RMI stub
return (this.cacheStub != null ? this.cachedStub : lookupStub());
}
//如果设置了缓存RMI stub,且设置了启动时查找RMI stub或者当连接失败时刷新RMI stub
else {
//线程同步
synchronized(this.stubMonitor){
//如果缓存的RMI stub为null
if(this.cachedStub == null){
//则将查找的RMI stub缓存
this.cachedStub = lookupStub();
}
//返回缓存的RMI stub
return this.cachedStub;
}
}
}
//拦截器对客户端远程调用方法的拦截入口
public Object invoke(MethodInvocation invocation) throws Throwable {
//获取RMI stub
Remote stub = getStub();
try{
//拦截客户端远程调用方法
return doInvoke(invocation,stub);
}
catch(RemoteConnectFailureException ex){
return handleRemoteConnectFailure(invocation,ex);
}
catch(RemoteException ex){
if(isConnectFailure(ex)){
return handleRemoteConnectFailure(invocation,ex);
}
else{
throw ex;
}
}
}
//判断是否连接失败
protected boolean isConnectFailure(RemoteException ex){
return RmiClientInterceptorUtils.isConnectFailure(ex);
}
//处理远程连接失败
private Object handleRemoteConnectFailure(MethodInvocation invocation,Exception ex) throws Throwable {
//如果设置了当连接失败时,刷新RMI stub
if(this.refreshStubOnConnectFailure){
String msg = "Could not connect to RMI service [" + getServiceUrl() + "] - retrying";
if(logger.isDebugEnabled()){
logger.warn(msg.ex);
}
else if(logger.isWarnEnabled()){
logger.warn(msg);
}
//刷新查找远程调用stub
return refreshAndRetry(invocation);
}
else {
throw ex;
}
}
//刷新RMI stub
protected Object refreshAndRetry(MethodInvocation invocation) throws Throwable {
Remote freshStub = null;
//线程同步
synchronized(this.stubMonitor){
this.cacheStub = null;
//查找RMI stub
freshStub = lookupStub();
//如果设置了缓存RMI stub
if(this.cacheStub) {
//将刷新查找的RMI stub缓存
this.cacheStub = freshStub;
}
}
return doInvoke(invocation,freshStub);
}
//具体RMI调用的地方
protected Object doInvoke(MethodInvocation invocation,Remote stub) throws Throwable {
//如果RMI stub是RmiInvocationHandler类型
if(stub instanceof RmiInvocationHandler) {
//调用RmiInvocationHandler的RMI
try{
return doInvoke(invocation,(RmiInvocationHandler)stub);
}
catch(RemoteException ex){
throw RmiClientInterceptorUtils.convertRmiAccessException(invocation.getMethod(),ex,isConnectFailure(ex),getServiceUrl());
}
catch(InvocationTargetException ex){
Throwable exToThrow = ex.getTargetException();
RemoteInvocationUtils.fillInClientStackTraceIfPossible(exToThrow);
throw exToThrow;
}
catch(Throwable ex){
throw new RemoteInvocationFailureException("Invocation of method [" +invocaiton.getMethod() +
"] failed in RMI service [" + getServiceUrl() + "]" ,ex);
}
}
//如果RMI stub不是RmiInvocationHandler类型
else {
//使用传统的RMI调用方式
try{
return RmiClientInterceptorUtils.invokeRemoteMethod(invocation,stub);
}
catch(InvocationTargetException ex) {
Throwable targetEx = ex.getTargetException();
if(targetEx instanceof RemoteException) {
RemoteException rex = (RemoteException) targetEx;
throw RmiClientInterceptorUtils.convertRmiAcccessException(invocation.getMethod(),rex,isConnectionFailure(rex),getServiceUrl());
}
else{
throw targetEx;
}
}
}
}
//调用RmiInvocationHandler的RMI
protected Object doInvoke(MethodInvocation methodInvocation,RmiInvocationHandler invocationHandler) throws RemoteException,
NoSuchMethodException,IllegalAccessException,InvocationTargetException {
//如果客户端远程调用请求时toString()方法
if(AopUtils.isToStringMethod(methodInvocation.getMethod())){
return "RMI invoker proxy for service URL [" + getServiceUrl() + "]";
}
}
通过上面对RmiClientInterceptor源码分析,我们看到Spring对RMI远程调用使用以下两种方式:
(1)RMI调用器方式:
这种方式和Spring HTTP调用器非常类似,即使用RemoteInvocation来封装调用目标对象、目标方法、参数类型等信息,RMI服务器端接收到RMI请求之后直接调用目标对象的匹配方法。
(2)传统RMI调用方式:
使用JDK的反射机制,直接调用远程调用stub的方法。
5、RMI的服务端配置:
在Spring RMI服务端需要进行类似如下的配置:
<bean id="rmiService" class="org.springframework.remoting.rmi.RmiServiceExporter">
<property name="service">
<ref bean="RMI服务端对象"/>
</property>
<property name="serviceInterface">
<value>RMI服务接口</value>
</property>
<property name="serviceName">
<value>RMI服务导出名称</value>
</property>
<property name="registerPort">
<value>1099</value>
</property>
</bean>
RMI中,基于TCP/IP协议,而不是HTTP协议来实现底层网络通信,由于RMI的网络通信已经由Java RMI实现,所以这里不再使用Spring MVC的DispatcherServlet来转发客户端配置的远程调用请求URL,只需要指定RMI的TCP/IP监听端口和服务导出的名称即可。
6、RmiServiceExporter导出RMI远程调用对象:
RmiServiceExporter主要功能是将服务端远程对象提供的服务导出供客户端请求调用,同时将导出的远程对象和注册器绑定起来供客户端查询,其主要源码如下:
public class RmiServiceExporter extends RmiBasedExporter implements InitializingBean,DisposableBean {
//导出的RMI服务名称
private String serviceName;
//RMI服务端口
private int servicePort = 0;
//RMI客户端socket工厂
private RMIClientSocketFactory clientSocketFactory;
//RMI服务炖socket工厂
private RMIServerSocketFactory serverSocketFactory;
//注册器
private Registry registry;
//注册主机
private String registryHost;
//注册端口
private int registryPort = Regstry.REGISTRY_PORT;
//客户端注册socket工厂
private RMIClientSocketFactory registryClientSocketFactory;
//服务端注册socket工厂
private RMIServerSocketFactory registryServerSocketFactory;
//总是注册时注册
private boolean alwaysCreateRegistry = false;
//替换已有的绑定
private boolean replaceExistingBinding = true;
//导出的远程对象
private Remote exportedObject;
//创建注册
private boolean createRegistry = false;
//注入服务端配置的RMI导出服务名称,格式为:"rmi://host:post/NAME”
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
//设置服务端口
public void setServicePort(int servicePort){
this.servicePort = servicePort;
}
//设置RMI客户端socket工厂
public void setClientSocketFactory(RMIClientSocketFactory clientSocketFactory) {
this.clientSocketFactory = clientSocketFactory;
}
//设置RMI服务端socket工厂
public void setServerSocketFactory(RMIServerSocketFactory serverSocketFactory){
this.serverSocketFactory = serverSocketFactory;
}
//设置RMI注册器
public void setRegistry(Registry registry){
this.registry = registry;
}
//设置RMI注册主机
public void setRegistryHost(String registryHost) {
this.registryHost = registryHost;
}
//设置RMI注册端口
public void setRegistryPort(int registryPort) {
this.registryPort = registryPort;
}
//设置用于注册的RMI客户端socket工厂
public void setRegistryClientSocketFactory(RMIClientSocketFactory registryClientSocketFactory) {
this.registryClientSocketFactory = registryClientSocketFactory;
}
//设置用于注册的RMI服务端socket工厂
public void setRegistryServerSocketFactory(RMIServerSocketFactory registryServerSocketFactory) {
this.registryServerSocketFactory = registryServerSocketFactory;
}
//设置是否总是创建注册,而不是试图查找指定端口上已有的注册
public void setAlwaysCreateRegistry(boolean alwaysCreateRegistry) {
this.alwaysCreateRegistry = alwaysCreateRegistry;
}
//设置是否提供已绑定的RMI注册
public void setReplaceExistingBinding(boolean replaceExistingBinding) {
this.replaceExistingBinding = replaceExistingBinding;
}
//IoC容器依赖注入完成之后的回调方法
public void afterPropertiesSet() throws RemoteException {
prepare();
}
//初始化RMI服务导出器
public void prepare() throws RemoteException {
//调用其父类RmiBasedExporter的方法,检查服务引用是否被设置
checkService();
//如果服务导出名称为null
if(this.serviceName == null) {
throw new IllegalArgumentException("Propertry 'serviceName' is required");
}
//检查socket工厂
if(this.clientSocketFactory instanceof RMIServerSocketFactory) {
this.serverSocketFactory = (RMIServerSocketFactory) this.clientSocketFactory;
}
if((this.clientSocketFactory != null && this.serverSocketFactory == null ) || (this.clientSocketFactory == null && this.serverSocketFactory != null)){
throw new IllegalArgumentException("Both RMIClientSocketFactory and RMIServerSocketFactory or more required");
}
//检查RMI注册的socket工厂
if(this.registryClientSocketFactory instanceof RMIServerSocketFactory) {
this.registryServerSocketFactory = (RMIServerSocketFactory) this.registryClientSocketFactory;
}
if(this.registryClientSocketFactory == null && this.regitryServerSocketFactory != null) {
throw new IllegalArgumentException("RMIServerSocketFactory without RMIClientSocketFactory for registry not supported");
}
this.createdRegistry = false;
//获取RMI注册器
if(this.registry == null) {
this.registry = getRegistry(this.registryHost,this.registryPort, this.registryClientSocketFactory,this.registryServerSocketFactory);
this.createRegistry = true;
}
//获取要被导出的服务端远程对象
this.exportedObject = getObjectToExport();
if(logger.isInfoEnabled()){
logger.info("Binding service '" + this.serviceName + " ' to RMI registry: " + this.registry);
}
//导出远程服务对象
if(this.clientSocketFactory != null) {
UnicastRemoteObject.exportObject(this.exportedObject,this.servicePort,this.clientSocketFactory,this.serverSocketFactory);
}
else {
UnicastRemoteObject.exportObject(this.exportedObject,this.servicePort);
}
//将RMI对象绑定到注册器
try{
if(this.replaceExistingBinding) {
this.registry.rebind(this.serviceName,this.exportedObject);
}
else{
this.registry.bind(this.serviceName,this.exportedObject);
}
}
//异常处理
catch(AlreadyBoundException ex){
unexportObjectSilently();
throw new IllegalStateException("Already an RMI object bound for name '" + this.serviceName + "' : " + ex.toString());
}
catch(RemoteException ex){
unexportObjectSilently();
throw ex;
}
}
.....
}
7、RemoteInvocationBasedExporter处理RMI远程调用请求:
RmiServiceExporter的父类RmiBasedExporter的父类RemoteInvocationBasedExporter负责对RMI远程调用请求进行处理,并将处理的结果封装返回,其源码如下:
public abstract class RemoteInvocationBasedExporter extends RemoteExporter {
//RMI远程调用处理器,RMI远程调用请求是由DefaultRemoteInvocationExecutor处理
private RemoteInvocationExecutor remoteInvocationExecutor = new DefaultRemoteInvocationExecutor();
//设置RMI远程调用处理器
public void setRemoteInvocationExecutor(RemoteInvocationExecutor remoteInvocationExecutor) {
this.remoteInvocationExecutor = remoteInvocationExecutor;
}
//获取RMI远程调用处理器
public RemoteInvocationExecutor getRemoteInvocationExecutor(){
return this.remoteInvocationExecutor;
}
//对RMI远程调用请求进行处理的地方
protected Object invoke(RemoteInvocation invocation,Object targetObject) throws NoSuchMethodException,IllegalAccessException,InvocationTargetException {
if(logger.isTraceEnabled()){
logger.trace("Executing " + invocation);
}
try{
//调用DefaultRemoteInvocationExecutor对RMI远程调用请求进行处理
return getRemoteInvocationExecutor().invoke(invocation,targetObject);
}
catch(NoSuchMethodException ex){
if(logger.isDebugEnabled()){
logger.warn("Could not find target method for " + invocation,ex);
}
throw ex;
}
catch(IllegalAccessException ex){
if(logger.isDebugEnabled()){
logger.warn("Could not access target method for " + invocation,ex);
}
throw ex;
}
catch(InvocationTargetException ex){
if(logger.isDebugEnabled()){
logger.debug("Target method failed for " + invocation,ex.getTargetException());
}
throw ex;
}
}
//获取RMI远程调用请求的处理结果
protected RemoteInvocationResult invokeAndCreateResult(RemoteInvocation invocation, Object targetObject) {
try{
Object value = invoke(invocation,targetObject);
return new RemoteInvocationResult(value);
}
catch(Throwable ex){
return new RemoteInvocationResult(ex);
}
}
}
RMI远程调用请求最终由DefaultRemoteInvocationExecutor处理。
8、DefaultRemoteInvocationExecutor用于处理RMI远程调用请求,并返回处理后的结果,其源码如下:
public class DefaultRemoteInvocationExecutor implements RemoteInvocationExecutor {
//处理RMI远程调用请求
public Object invoke(RemoteInvocation invocation,Object targetObject) throws NoSuchMethodException,IllegalAccessException,InvocationTargetException {
Assert.notNull(invocation,"RemoteInvocation must not be null");
Asser.notNull(targetObject,"Target object must not be null");
//调用RemoteInvocation处理RMI远程调用请求
return invocation.invoke(targetObject);
}
}
9、RemoteInvocation处理RMI远程调用请求:
RemoteInvocation的invoke方法处理RMI远程调用请求,并返回远程调用处理后的结果,其源码如下:
public Object invoke(Object targetObject) throws NoSuchMethodException,IllegalAccessException,InvocationTargetException {
//使用JDK反射机制获取远程调用服务端目标对象的方法
Method method = targetObject.getClass().getMethod(this.methodName,this.parameterTypes);
//使用JDK反射机制调用目标对象的方法
return method.invoke(targetObject,this.arguments);
}