spring技术内幕19-Spring RMI实现远程调用

1、Spring除了使用基于HTTP协议的远程调用方案,还未开发者提供了基于RMI机制的远程调用方法,RMI远程调用网络通信实现是基于TCP/IP协议完成的,而不是通过HTTP协议。在Spring RMI实现中,集成了标准的RMI-JRM解决方案,该方案是java虚拟机实现的一部分,它使用java序列化来完成对象的传输,是一个java到java环境的分布式处理技术,不涉及异构平台的处理。

2、RMI客户端配置:

和基于HTTP协议的远程调用类似,RMI远程调用客户端也需要进行类似如下的配置:

<bean id="rmiProxy" class="org.springframework.remoting.rmi.RmiProxyFactoryBean">

  <property name="serviceUrl">

          <value>rmi://hostAddress:1099/serviceUrl</value>

 </property>

<property name="serviceInterface">

    <value>远程调用接口</value>

</property>

</bean>

<bean id="rmiClient" class="RMI远程调用客户端类全路径">

    <property name="serviceInterface">

            <ref bean="rmiProxy"/>

    </property>

</bean>

注意:上面的配置中serviceUrl必须和服务端的远程调用提供者的id一致,另外,serviceUrl中使用的是rmi协议,默认端口是1099。

3、RmiProxyFactoryBean:

RmiProxyFactoryBean的主要功能是对RMI客户端封装,生成代理对象,查询得到RMI的stub对象,并通过这个stub对象发起相应的RMI远程调用请求。其源码如下:

public class RmiProxyFactoryBean extends RmiClientInterceptor implements FactoryBean<Object>,BeanClassLoaderAware {

       //远程调用代理对象

       private Object serviceProxy;

     //Spring IoC容器完成依赖注入后的回调方法

     public void afterPropertiesSet() {

           //调用父类RmiClientInterceptor的回调方法

           super.afterPropertiesSet();

          //获取客户端配置的远程调用接口

          if(getServiceInterface() == null){

               throw new IllegalArgumentException("Property 'serviceInterface' is required");

          }

         //创建远程调用代理对象并为代理对象设置拦截器。注意第二个参数this,因为RmiProxyFactoryBean继承RmiClientInterceptor,因此其也是拦截器

           this.serviceProxy = new ProxyFactory(getServiceInterface(),this).getProxy(getBeanClassLoader());

    }

    //Spring IoC容器获取被管理对象的接口方法,获取远程调用代理

    public Object getObject() {

           return this.serviceProxy;

    }

    public Class<?> getObjectType(){

          return getServiceInterface();

     }

    public boolean isSingleton(){

        return true;

   }

通过对上面RmiProxyFactoryBean源码分析中,我们看到在创建远程调用代理对象的时候需要设置拦截器,因为我们继续分析远程调用客户端拦截器RmiClientInterceptor。

4、RmiClientInterceptor封装RMI客户端:

RmiClientInterceptor对客户端的远程调用进行拦截,具体的生成调用代理对象、查找远程调用stub、以及通过RMI stub向服务端发起远程调用请求的具体实现都由RMI客户端拦截器实现,其源码如下:

public class RmiClientInterceptor extends RemoteInvocationBasedAccessor implements MethodInterceptor {

    //在Spring启动时查找远程调用

     private boolean lookupStubOnStartup = true;

    //对查找到或使用过的远程调用stub进行缓存

     private boolean cacheStub = true;

     //当连接失败是否刷新远程调用stub

     private boolean refreshStubOnConnectFailure = false;

     //RMI客户端socket工厂

      private RMIClientSocketFactory registryClientSocketFactory;

    //缓存的远程调用stub

      private Remote cacheStub;

    //创建远程调用stub监控器

     private final Object stubMonitor = new Object();

    //设置是否启动时查找RMI stub

      public void setLookupStudOnStartup(boolean lookupStubOnStartup) {

           this.lookupStubOnStartup = lookupStubOnStartup;

      }

      //设置是否缓存以查找的RMI stub

       public void setCacheStub(boolean cacheStub) {

                this.cacheStub = cacheStub;

       }

       //设置当连接失败时,是否刷新RMI stub

       public void setRefreshStubOnConnectFailure(boolean refreshStubOnConnectFailure) {

            this.refreshStubOnConnectFailure = refreshStubOnConnectFailure;

      }

     //设置客户端socket工厂

     public void setRegistryClientSocketFactory(RMIClientSocketFactory registryClientSocketFactory){

            this.registryClientSocketFactory = registryClientSocketFactory;

     }

 //Spring IoC容器回调方法,由子类RmiProxyFactoryBean回调方法调用

   public void afterPropertiesSet(){

        //调用父类RemoteInvocationBasedAccessor的回调方法

        super.afterPropertiesSet();

        prepare();

    }

   //初始化RMI客户端

    public void prepare() throws RemoteLookupFailureException {

            //如果设置了启动时查找RMI stub

            if(this.lookupStubOnStartup){

                //查找RMI stub

                 Remote remoteObj = lookupStub();

                 if(logger.isDebugEnabled()){

                    //如果查找到的RMI stub是RmiInvocationHandler类型

                    if(remoteObj instanceof RmiInvocationHandler) {

                          logger.debug("RMI stub [" + getServiceUrl() + "] is an RMI invoker");

                    }

                    //如果获取到客户端配置的serviceInterface不为null

                    else if(getServiceInterface()!= null){

                        //判断客户端配置的serviceInterface是否是RMI stub实例

                        boolean isImpl = getServiceInterface().isInstance(remoteObj);

                        logger.debug("Using service interface [" + getServiceInterface().getName() + "] for RMI stub [" + getServiceUrl() + "] - " +

                                 (!isImpl ? "not " : "") + "directly imlemented");

                      }

                  }

                  //如何设置了缓存RMI stub,将缓存的stub设置为查找到的RMI stub

                  if(this.cacheStub) {

                          this.cacheStub = remoteObj;

                 }

             }

        }

        //查找RMI stub

          protected Remote lookupStub() throws RemoteLookupFailureException {

                 try{

                       Remote stub = null;

                       //如果设置了客户端socket工厂

                      if(this.registryClientSocketFactory != null){

                          //获取并解析客户端配置的serviceUrl

                           URL url = new URL(null,getServiceUrl(),new DummyURLStreamHandler());

                          //获取客户端配置的serviceUrl协议

                           String protocol = url.getProtocol();

                          //如果客户端配置的serviceUrl中协议不为null且不是rmi

                          if(protocol != null && ! "rmi".equals(protocol)){

                                throw new MalformedURLException("Invalid URL schem '" + protocol + "'");

                          }

                         //获取客户端配置的serviceUrl中的主机地址

                         String host = url.getHost();

                        //获取客户端配置的serviceUrl中的端口

                         int port = url.getPort();

                        //获取客户端配置的serviceUrl中请求路径

                         String name = url.getPath();

                        //如果请求路径不为null,且请求路径以“/”开头,则取得“/”

                        if(name!=null && name.startsWith("/")){

                           name = name.substring(1);

                       }

                       //根据客户端配置的serviceUrl信息和客户端socket工厂创建远程对象引用

                        Registry registry = LocateRegistry.getRegistry(host,port,this.registryClientSocketFactory);

                       //通过远程对象引用查找指定RMI请求的RMI stub

                        stub = registry.lookup(name);

              }

              //如果客户端配置的serviceUrl中协议为null或者是rmi

              else {

                  //直接通过RMI标准API查找客户端配置的serviceUrl的RMI stub

                   stub = Naming.lookup(getServiceUrl());

              }

              if(logger.isDebugEnabled()){

                     logger.debug("Located RMI stub with URL [" + getServiceUrl() + "]");

              }

             return stub;

      }

   //对查找RMI stub过程中异常处理

     catch(MalformedURLException ex){

            throw new RemoteLookupFailureException("Service URL [" + getServiceUrl() + "] is invalid",ex);

      }

      catch(NotBoundException ex){

              throw new RemoteLookupFailureException("Could not find RMI service  [" + getServiceUrl() + "] in RMI registry",ex);

      }

      catch(RemoteException ex){

           throw new RemoteLookupFailureException("Lookup of RMI stub failed",ex);

      }

  }

  //获取RMI stub

   protected Remote getStub() throws RemoteLookupFailureException {

        //如果没有配置缓存RMI stub,或者设置了启动时查找RMI sub或当连接失败时不刷新RMI stub

        if(!this.cacheStub || (this.lookupStubOnStartup && !this.refreshStubOnConnectFailure)){

                //如果缓存的RMI stub不为null,则直接返回,否则,查找RMI stub

                return  (this.cacheStub != null ? this.cachedStub : lookupStub());

        }

        //如果设置了缓存RMI stub,且设置了启动时查找RMI stub或者当连接失败时刷新RMI stub

        else {

               //线程同步

               synchronized(this.stubMonitor){

                   //如果缓存的RMI stub为null

                     if(this.cachedStub == null){

                          //则将查找的RMI stub缓存

                           this.cachedStub = lookupStub();

                  }

                  //返回缓存的RMI stub
                  return this.cachedStub;

            }

         }

     }

    //拦截器对客户端远程调用方法的拦截入口

      public Object invoke(MethodInvocation invocation) throws Throwable {

           //获取RMI stub

           Remote stub = getStub();

           try{

                //拦截客户端远程调用方法

               return doInvoke(invocation,stub);

           }

           catch(RemoteConnectFailureException ex){

                return handleRemoteConnectFailure(invocation,ex);

           }

           catch(RemoteException ex){

              if(isConnectFailure(ex)){

                    return handleRemoteConnectFailure(invocation,ex);

              }

              else{

                    throw ex;

               }

            }

     }

  //判断是否连接失败

    protected boolean isConnectFailure(RemoteException ex){

           return RmiClientInterceptorUtils.isConnectFailure(ex);

      }

     //处理远程连接失败

       private Object handleRemoteConnectFailure(MethodInvocation invocation,Exception ex) throws Throwable {

            //如果设置了当连接失败时,刷新RMI stub

            if(this.refreshStubOnConnectFailure){

                String msg = "Could not connect to RMI service [" + getServiceUrl() + "] - retrying";

                if(logger.isDebugEnabled()){

                    logger.warn(msg.ex);

                 }

                else if(logger.isWarnEnabled()){

                      logger.warn(msg);

               }

               //刷新查找远程调用stub

                return refreshAndRetry(invocation);

           }

          else {

               throw ex;

          }

      }

      //刷新RMI stub

        protected Object refreshAndRetry(MethodInvocation invocation) throws Throwable {

            Remote freshStub = null;

           //线程同步

           synchronized(this.stubMonitor){

                 this.cacheStub = null;

                 //查找RMI stub

                 freshStub = lookupStub();

               //如果设置了缓存RMI stub

                if(this.cacheStub) {

                    //将刷新查找的RMI stub缓存

                     this.cacheStub = freshStub;

               }

          }

         return doInvoke(invocation,freshStub);

   }

  //具体RMI调用的地方

    protected Object doInvoke(MethodInvocation invocation,Remote stub) throws Throwable {

          //如果RMI stub是RmiInvocationHandler类型

           if(stub instanceof RmiInvocationHandler) {

               //调用RmiInvocationHandler的RMI

                try{

                      return doInvoke(invocation,(RmiInvocationHandler)stub);

                }

                catch(RemoteException ex){

                  throw RmiClientInterceptorUtils.convertRmiAccessException(invocation.getMethod(),ex,isConnectFailure(ex),getServiceUrl());

               }

               catch(InvocationTargetException ex){

                   Throwable exToThrow = ex.getTargetException();

                    RemoteInvocationUtils.fillInClientStackTraceIfPossible(exToThrow);

                    throw exToThrow;

             }

            catch(Throwable ex){

                throw new RemoteInvocationFailureException("Invocation of method [" +invocaiton.getMethod() +

                            "] failed in RMI service [" + getServiceUrl() + "]" ,ex);

             }

          }

          //如果RMI stub不是RmiInvocationHandler类型

          else {

                 //使用传统的RMI调用方式

                  try{

                        return RmiClientInterceptorUtils.invokeRemoteMethod(invocation,stub);

                  }

                  catch(InvocationTargetException ex) {

                       Throwable targetEx = ex.getTargetException();

                       if(targetEx instanceof RemoteException) {

                           RemoteException rex = (RemoteException) targetEx;

                           throw RmiClientInterceptorUtils.convertRmiAcccessException(invocation.getMethod(),rex,isConnectionFailure(rex),getServiceUrl());

                }

               else{

                      throw targetEx;

               }

          }

       }

   }

  //调用RmiInvocationHandler的RMI

  protected Object doInvoke(MethodInvocation methodInvocation,RmiInvocationHandler invocationHandler) throws RemoteException,

            NoSuchMethodException,IllegalAccessException,InvocationTargetException {

             //如果客户端远程调用请求时toString()方法

              if(AopUtils.isToStringMethod(methodInvocation.getMethod())){

                    return "RMI invoker proxy for service URL [" + getServiceUrl() + "]";

              }

      }

通过上面对RmiClientInterceptor源码分析,我们看到Spring对RMI远程调用使用以下两种方式:

(1)RMI调用器方式:

这种方式和Spring HTTP调用器非常类似,即使用RemoteInvocation来封装调用目标对象、目标方法、参数类型等信息,RMI服务器端接收到RMI请求之后直接调用目标对象的匹配方法。

(2)传统RMI调用方式:

使用JDK的反射机制,直接调用远程调用stub的方法。

5、RMI的服务端配置:

在Spring RMI服务端需要进行类似如下的配置:

<bean id="rmiService" class="org.springframework.remoting.rmi.RmiServiceExporter">

<property name="service">

<ref bean="RMI服务端对象"/>

</property>

<property name="serviceInterface">

<value>RMI服务接口</value>

</property>

<property name="serviceName">

 <value>RMI服务导出名称</value>

</property>

<property name="registerPort">

<value>1099</value>

</property>

</bean>

RMI中,基于TCP/IP协议,而不是HTTP协议来实现底层网络通信,由于RMI的网络通信已经由Java RMI实现,所以这里不再使用Spring MVC的DispatcherServlet来转发客户端配置的远程调用请求URL,只需要指定RMI的TCP/IP监听端口和服务导出的名称即可。

6、RmiServiceExporter导出RMI远程调用对象:

RmiServiceExporter主要功能是将服务端远程对象提供的服务导出供客户端请求调用,同时将导出的远程对象和注册器绑定起来供客户端查询,其主要源码如下:

public class RmiServiceExporter extends RmiBasedExporter implements InitializingBean,DisposableBean {

      //导出的RMI服务名称

        private String serviceName;

     //RMI服务端口

       private int servicePort = 0;

    //RMI客户端socket工厂

       private RMIClientSocketFactory clientSocketFactory;

    //RMI服务炖socket工厂

       private RMIServerSocketFactory serverSocketFactory;

    //注册器

        private Registry registry;

     //注册主机

        private String registryHost;

     //注册端口

        private int registryPort = Regstry.REGISTRY_PORT;

    //客户端注册socket工厂

       private RMIClientSocketFactory registryClientSocketFactory;

     //服务端注册socket工厂

       private RMIServerSocketFactory registryServerSocketFactory;

      //总是注册时注册

     private boolean alwaysCreateRegistry = false;

      //替换已有的绑定

      private boolean replaceExistingBinding = true;

     //导出的远程对象

      private Remote exportedObject;

    //创建注册

      private boolean createRegistry = false;

     //注入服务端配置的RMI导出服务名称,格式为:"rmi://host:post/NAME”

     public void setServiceName(String serviceName) {

             this.serviceName = serviceName;

      }

     //设置服务端口

      public void setServicePort(int servicePort){

              this.servicePort = servicePort;

      }

     //设置RMI客户端socket工厂

      public void setClientSocketFactory(RMIClientSocketFactory clientSocketFactory) {

           this.clientSocketFactory = clientSocketFactory;

    }

    //设置RMI服务端socket工厂

    public void setServerSocketFactory(RMIServerSocketFactory serverSocketFactory){

         this.serverSocketFactory = serverSocketFactory;

    }

   //设置RMI注册器

    public void setRegistry(Registry registry){

            this.registry = registry;

    }

   //设置RMI注册主机

    public void setRegistryHost(String registryHost) {

            this.registryHost = registryHost;

    }

    //设置RMI注册端口

     public void setRegistryPort(int registryPort) {

          this.registryPort = registryPort;

     }

    //设置用于注册的RMI客户端socket工厂

    public void setRegistryClientSocketFactory(RMIClientSocketFactory registryClientSocketFactory) {

           this.registryClientSocketFactory = registryClientSocketFactory;

    }

  //设置用于注册的RMI服务端socket工厂

    public void setRegistryServerSocketFactory(RMIServerSocketFactory registryServerSocketFactory) {

               this.registryServerSocketFactory = registryServerSocketFactory;

   }

 //设置是否总是创建注册,而不是试图查找指定端口上已有的注册

   public void setAlwaysCreateRegistry(boolean alwaysCreateRegistry) {

          this.alwaysCreateRegistry = alwaysCreateRegistry;

   }

   //设置是否提供已绑定的RMI注册

   public void setReplaceExistingBinding(boolean replaceExistingBinding) {

          this.replaceExistingBinding = replaceExistingBinding;

    }

   //IoC容器依赖注入完成之后的回调方法

  public void afterPropertiesSet() throws RemoteException {

          prepare();

  }

  //初始化RMI服务导出器

   public void prepare() throws RemoteException {

         //调用其父类RmiBasedExporter的方法,检查服务引用是否被设置

         checkService();

         //如果服务导出名称为null

        if(this.serviceName == null) {

           throw new IllegalArgumentException("Propertry 'serviceName' is required");

        }

       //检查socket工厂

       if(this.clientSocketFactory instanceof RMIServerSocketFactory) {

             this.serverSocketFactory = (RMIServerSocketFactory) this.clientSocketFactory;

       }

      if((this.clientSocketFactory != null && this.serverSocketFactory == null ) || (this.clientSocketFactory == null && this.serverSocketFactory != null)){

           throw new IllegalArgumentException("Both RMIClientSocketFactory and RMIServerSocketFactory or more required");

      }

     //检查RMI注册的socket工厂

     if(this.registryClientSocketFactory instanceof RMIServerSocketFactory) {

            this.registryServerSocketFactory = (RMIServerSocketFactory) this.registryClientSocketFactory;

     }

    if(this.registryClientSocketFactory == null && this.regitryServerSocketFactory != null) {

           throw new IllegalArgumentException("RMIServerSocketFactory without RMIClientSocketFactory for registry not supported");

    }

    this.createdRegistry = false;

   //获取RMI注册器

    if(this.registry == null) {

         this.registry = getRegistry(this.registryHost,this.registryPort, this.registryClientSocketFactory,this.registryServerSocketFactory);

         this.createRegistry = true;

    }

    //获取要被导出的服务端远程对象

      this.exportedObject = getObjectToExport();

      if(logger.isInfoEnabled()){

         logger.info("Binding service '" + this.serviceName + " ' to RMI registry: " + this.registry);

     }

     //导出远程服务对象

       if(this.clientSocketFactory != null) {

           UnicastRemoteObject.exportObject(this.exportedObject,this.servicePort,this.clientSocketFactory,this.serverSocketFactory);

      }

     else {

          UnicastRemoteObject.exportObject(this.exportedObject,this.servicePort);

     }

     //将RMI对象绑定到注册器

       try{

             if(this.replaceExistingBinding) {

                  this.registry.rebind(this.serviceName,this.exportedObject);

              }

              else{

                  this.registry.bind(this.serviceName,this.exportedObject);

             }

         }

       //异常处理

       catch(AlreadyBoundException ex){

              unexportObjectSilently();

               throw new IllegalStateException("Already an RMI object bound for name '" + this.serviceName + "' : " + ex.toString());

       }

       catch(RemoteException ex){

             unexportObjectSilently();

             throw ex;

        }

    }

   .....

}

7、RemoteInvocationBasedExporter处理RMI远程调用请求:

 RmiServiceExporter的父类RmiBasedExporter的父类RemoteInvocationBasedExporter负责对RMI远程调用请求进行处理,并将处理的结果封装返回,其源码如下:

public abstract class RemoteInvocationBasedExporter extends RemoteExporter {

    //RMI远程调用处理器,RMI远程调用请求是由DefaultRemoteInvocationExecutor处理

   private RemoteInvocationExecutor remoteInvocationExecutor = new DefaultRemoteInvocationExecutor();

  //设置RMI远程调用处理器

   public void setRemoteInvocationExecutor(RemoteInvocationExecutor remoteInvocationExecutor) {

         this.remoteInvocationExecutor = remoteInvocationExecutor;

   }

  //获取RMI远程调用处理器

   public RemoteInvocationExecutor getRemoteInvocationExecutor(){

        return this.remoteInvocationExecutor;

   }

  //对RMI远程调用请求进行处理的地方

   protected Object invoke(RemoteInvocation invocation,Object targetObject) throws NoSuchMethodException,IllegalAccessException,InvocationTargetException {

            if(logger.isTraceEnabled()){

                 logger.trace("Executing " + invocation);

            }

           try{

                //调用DefaultRemoteInvocationExecutor对RMI远程调用请求进行处理

                return getRemoteInvocationExecutor().invoke(invocation,targetObject);

           }

           catch(NoSuchMethodException ex){

               if(logger.isDebugEnabled()){

                    logger.warn("Could not find target method for " + invocation,ex);

               }

               throw ex;

        }

       catch(IllegalAccessException ex){

              if(logger.isDebugEnabled()){

                  logger.warn("Could not access target method for " + invocation,ex);

              }

              throw ex;

       }

       catch(InvocationTargetException ex){

               if(logger.isDebugEnabled()){

                    logger.debug("Target method failed for " + invocation,ex.getTargetException());

              }

              throw ex;

       }

   }

  //获取RMI远程调用请求的处理结果

   protected RemoteInvocationResult invokeAndCreateResult(RemoteInvocation invocation, Object targetObject) {

          try{

               Object value = invoke(invocation,targetObject);

                return new RemoteInvocationResult(value);

          }

          catch(Throwable ex){

                 return new RemoteInvocationResult(ex);

          }

      }

}

RMI远程调用请求最终由DefaultRemoteInvocationExecutor处理。

8、DefaultRemoteInvocationExecutor用于处理RMI远程调用请求,并返回处理后的结果,其源码如下:

public class DefaultRemoteInvocationExecutor implements RemoteInvocationExecutor {

       //处理RMI远程调用请求

       public Object invoke(RemoteInvocation invocation,Object targetObject) throws NoSuchMethodException,IllegalAccessException,InvocationTargetException {

             Assert.notNull(invocation,"RemoteInvocation must not be null");

             Asser.notNull(targetObject,"Target object must not be null");

             //调用RemoteInvocation处理RMI远程调用请求

              return invocation.invoke(targetObject);

     }

}

9、RemoteInvocation处理RMI远程调用请求:

RemoteInvocation的invoke方法处理RMI远程调用请求,并返回远程调用处理后的结果,其源码如下:

public Object invoke(Object targetObject) throws NoSuchMethodException,IllegalAccessException,InvocationTargetException {

      //使用JDK反射机制获取远程调用服务端目标对象的方法

      Method method = targetObject.getClass().getMethod(this.methodName,this.parameterTypes);

     //使用JDK反射机制调用目标对象的方法

     return method.invoke(targetObject,this.arguments);

}

       

     

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值