SpringCloud

1. SpringCloud 综述

Spring Cloud 是一套微服务规范

1.1 解决的问题

  • 微服务注册与发现
  • 网络问题(熔断场景)
  • 统一认证安全授权
  • 负载均衡
  • 链路追踪等

1.2 架构

1.2.1 核心组件

Spring Cloud 生态圈的组件按照发展可以分为两代

组件SCN(第一代)SCA(第二代)
注册中心Netflix Eureka阿里 Nacos
客户端负载均衡Netflix Ribbon阿里 Dubbo LB、
Spring Cloud LoadBalancer
熔断器/断路器Netflix Hustrix阿里 Sentinel
网关Netflix Zuul(性能一般)Spring Cloud GateWay
配置中心Spring Cloud Config阿里 Nacos、携程 Apollo
服务协调Netflix Feign阿里 Dubbo RPC
消息驱动Spring Cloud Stream
链路追踪Spring Cloud Sleuth / Zipkin
分布式事务阿里 Seata

1.2.2 体系结构

image-20210203230323802

Spring Cloud 各组件协同工作才能够支持一个完整的微服务架构,比如

  • 注册中心负责服务的注册与发现,将各服务连接起来
  • API 网关负责转发所有外来的请求
  • 断路器负责监控服务之间的情况,乱系多次失败进行熔断保护
  • 配置中心提供了统一的配置信息管理服务,可以实时地通知各个服务获取最新的配置信息

1.3 Spring Cloud 与 Dubbo 对比

  • 通信协议不同,Spring Cloud 基于应用层协议 HTTP,Dubbo 基于传输层协议 TCP / IP,Dubbo的效率高
  • Spring Cloud 体系全,可以提供一站式解决方案

1.4 Spring Cloud 与 Spring Boot 对比

  • Spring Cloud 只是利用了 Spring Boot 快速部署、自动装配的特点

2. 第一代 Spring Cloud 核心组件

2.1 服务注册中心 Eureka

2.1.1 服务注册中心的本质

  • 解耦服务提供者和服务消费者,服务消费者不需要知道服务具体位置,透明化路由
  • 在分布式环境下支持弹性扩容和缩容

2.1.2 架构

  • 服务注册中心一般工作原理

    image-20210205085200703

  • Eureka 基础架构

    image-20210205090004156

  • Eureka 集群交互流程及原理

    image-20210205091324467

    Eureka 通过心跳检测、健康检查和客户端缓存等机制,提高系统的灵活性、可伸缩性和可用性

    • 微服务启动后,会周期性地向 Eureka Server 发送心跳,默认30s
    • Eureka Server 在一定时间没有接收到某个微服务节点的心跳,会注销该微服务节点,默认90s
    • Eureka Client 会缓存 Eureka Server 的信息,即使所有的 Server 节点都宕机,Client 也能通过缓存找到服务提供者

2.1.3 Eureka 细节

2.1.3.1 元数据
@Autowired
private DiscoveryClient discoveryClient;
// 1、从 Eureka Server中获取serviceId服务的实例信息(使用客户端对象做这件事)
List<ServiceInstance> instances = discoveryClient.getInstances("serviceId");
// 2、如果有多个实例,选择一个使用(负载均衡的过程)
ServiceInstance serviceInstance = instances.get(0);
  • 标准元数据,如主机名、IP 地址、端口号等,
  • 自定义元数据,可以通过 eureka.instance.metadata-map 配置,存储格式为key=value
2.1.3.2 客户端
  • 服务注册

    项目引入 eureka-client 依赖和配置了 eureka.client.service-url.defaultZone 之后,服务启动时会向注册中心发起注册请求,请求携带了服务元数据信息,Eureka 注册中心将信息保存在 Map 中。

  • 服务续约

    • 服务每隔 30 秒会向注册中心续约(心跳)一次,也叫续活,可以修改 eureka.instance.lease-renewal-interval-in-seconds 默认值
    • 如果没有续约,租约会在 90 秒后到期,服务从列表中被移除,可以修改 eureka.instance.lease-expiration-duration-in-seconds 默认值
  • 获取服务列表

    服务每隔 30 秒会从注册中心拉取一份服务列表缓存到本地,可以修改 eureka.clinet.registry-fetch-interval-seconds 默认值

2.1.3.3 服务端
  • 服务下线

    服务正常关闭操作时,会发送服务下线的 REST 请求给 Eureka Server,注册中心接收到请求后将该服务置为下线状态

  • 失效剔除

    Eureka Server 每隔 60 秒检查,如果没有收到心跳,会注销该实例,可以修改 eureka.instance.lease-expiration-duration-in-seconds 默认值

  • 自我保护

    15 分钟之内超过 85% 的客户端节点都没有正常的心跳,进入自我保护模式

2.1.4 Eureka 源码分析

2.1.4.1 Eureka Server 源码
2.1.4.1.1 Eureka Server 启动过程

最核心的任务:从其他节点拷贝注册信息到自己的注册表

  • 如果要将一个服务标注为注册中心,需要使用注解 @EnableEurekaServer ,查看该注解,里面导入了 EurekaServerMarkerConfiguration 配置类,其作用是产生一个类型为 Marker 的bean,进而激活 EurekaServerAutoConfiguration 这个自动配置类

    /**
     * Responsible for adding in a marker bean to activate
     * {@link EurekaServerAutoConfiguration}
     *
     * @author Biju Kunjummen
     */
    @Configuration
    public class EurekaServerMarkerConfiguration {
    
    	@Bean
    	public Marker eurekaServerMarkerBean() {
    		return new Marker();
    	}
    
    	class Marker {
    	}
    }
    
  • 在 eureka-server 的 jar 包,也可以看到自动装配的配置文件 spring.factories,找到 EurekaServerAutoConfiguration 自动配置类

    image-20210214171025495

  • 查看 EurekaServerAutoConfiguration 自动配置类,里面做了许多事,如提供仪表盘服务、对等节点信息更新

    @Configuration
    // 导入初始化配置类
    @Import(EurekaServerInitializerConfiguration.class)
    // 当 Marker 这个 Bean 存在时才加载
    @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
    @EnableConfigurationProperties({ 
        EurekaDashboardProperties.class, InstanceRegistryProperties.class })
    @PropertySource("classpath:/eureka/server.properties")
    public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
    	
        // 仪表盘,对外提供服务的接口
        @Bean
    	@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
    	public EurekaController eurekaController() {
    		return new EurekaController(this.applicationInfoManager);
    	}
        
        // 对等节点感知实例注册器(集群环境下注册服务使用到的注册器)
    	@Bean
    	public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
    			ServerCodecs serverCodecs) {
    		this.eurekaClient.getApplications(); // force initialization
    		return new InstanceRegistry(
                this.eurekaServerConfig, this.eurekaClientConfig,
    			serverCodecs, this.eurekaClient, 
           this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
                this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
    	}
        
        // 封装对等节点相关的信息和操作,如更新集群中的对等节点
    	@Bean
    	@ConditionalOnMissingBean
    	public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
    			ServerCodecs serverCodecs) {
    		return new RefreshablePeerEurekaNodes(
                registry, this.eurekaServerConfig, this.eurekaClientConfig, 
                serverCodecs, this.applicationInfoManager);
    	}
    }
    
    • 对外提供 RESTFUL 服务:EurekaServerAutoConfiguration 配置类注册了一个 Jersey 过滤器,用于发布 RESTFUL 服务(例如仪表盘),类似 Spring MVC

      @Bean
      public FilterRegistrationBean jerseyFilterRegistration(
          javax.ws.rs.core.Application eurekaJerseyApp) {
          FilterRegistrationBean bean = new FilterRegistrationBean();
          bean.setFilter(new ServletContainer(eurekaJerseyApp));
          bean.setOrder(Ordered.LOWEST_PRECEDENCE);
          bean.setUrlPatterns(
              Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
          return bean;
      }
      
    • 对等节点信息更新:查看对等节点 PeerEurekaNodesstart 方法,内部创建了线程池,用于定时更新节点信息,并且通过调用关系得到,start 方法是在 DefaultEurekaServerContext 对象创建之后调用的

      // com.netflix.eureka.cluster.PeerEurekaNodes#start
      public void start() {
          // 定时任务线程池
          taskExecutor = Executors.newSingleThreadScheduledExecutor(
              new ThreadFactory() {
                  @Override
                  public Thread newThread(Runnable r) {
                      Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                      thread.setDaemon(true);
                      return thread;
                  }
              }
          );
          try {
              // 更新对等节点信息
              updatePeerEurekaNodes(resolvePeerUrls());
              Runnable peersUpdateTask = new Runnable() {
                  @Override
                  public void run() {
                      try {
                          updatePeerEurekaNodes(resolvePeerUrls());
                      } catch (Throwable e) {
                          logger.error("Cannot update the replica Nodes", e);
                      }
      
                  }
              };
              // 定时更新对等节点信息
              taskExecutor.scheduleWithFixedDelay(
                  peersUpdateTask,
                  serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                  serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                  TimeUnit.MILLISECONDS
              );
          } catch (Exception e) {
              throw new IllegalStateException(e);
          }
          for (PeerEurekaNode node : peerEurekaNodes) {
              logger.info("Replica node URL:  {}", node.getServiceUrl());
          }
      }
      
      // com.netflix.eureka.DefaultEurekaServerContext#initialize,使用了@PostConstruct
      @PostConstruct
      @Override
      public void initialize() {
          logger.info("Initializing ...");
          peerEurekaNodes.start();
          try {
              registry.init(peerEurekaNodes);
          } catch (Exception e) {
              throw new RuntimeException(e);
          }
          logger.info("Initialized");
      }
      
  • 查看 EurekaServerInitializerConfiguration 配置类的 start 方法,里面创建了一个线程用于初始化 EurekaServerContext

    public void start() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                   // 重点关注:初始化EurekaServerContext细节
                    eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
                    log.info("Started Eureka Server");
                    // 发布事件
                    publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                    // 设置状态属性
                    EurekaServerInitializerConfiguration.this.running = true;
                    // 发布事件
                    publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
                }
                catch (Exception ex) {
                    // Help!
                    log.error("Could not initialize Eureka servlet context", ex);
                }
            }
        }).start();
    }
    
    • org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap#contextInitialized

      public void contextInitialized(ServletContext context) {
          try {
              // 初始化环境
              initEurekaEnvironment();
              // 重点关注:初始化上下文
              initEurekaServerContext();
              context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
          } catch (Throwable e) {
              log.error("Cannot bootstrap eureka server :", e);
              throw new RuntimeException("Cannot bootstrap eureka server :", e);
          }
      }
      
    • org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap#initEurekaServerContext

      protected void initEurekaServerContext() throws Exception {
          // For backward compatibility
          JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
                                                      XStream.PRIORITY_VERY_HIGH);
          XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
                                                     XStream.PRIORITY_VERY_HIGH);
      
          if (isAws(this.applicationInfoManager.getInfo())) {
              this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
                                                     this.eurekaClientConfig, this.registry, this.applicationInfoManager);
              this.awsBinder.start();
          }
      	// Holder 类为非 IOC 容器提供获取 serverContext 对象的接口
          EurekaServerContextHolder.initialize(this.serverContext);
      
          log.info("Initialized server context");
      
          // server 启动时从其他节点拷贝注册信息
          int registryCount = this.registry.syncUp();
          // 设置实例状态为 UP,并对外提供服务
          this.registry.openForTraffic(this.applicationInfoManager, registryCount);
      
          // 注册统计器
          EurekaMonitors.registerAllStats();
      }
      
    • 查看 syncUp 方法,其作用是从已有节点列表中获取注册信息并填充到新的节点

      public int syncUp() {
          // Copy entire entry from neighboring DS node
          int count = 0;
      
          // 可能网络原因而没有连上server,做重试
          for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
              if (i > 0) {
                  try {
                      Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                  } catch (InterruptedException e) {
                      logger.warn("Interrupted during registry transfer..");
                      break;
                  }
              }
              Applications apps = eurekaClient.getApplications();
              for (Application app : apps.getRegisteredApplications()) {
                  for (InstanceInfo instance : app.getInstances()) {
                      try {
                          if (isRegisterable(instance)) {
                              // 注册:把远程节点的注册信息注册到自己的注册表中
      // 注册表ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>
                              register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                              count++;
                          }
                      } catch (Throwable t) {
                          logger.error("During DS init copy", t);
                      }
                  }
              }
          }
          return count;
      }
      
    • 查看 openForTraffic 方法,其作用是设置实例状态为 UP,同时开启 Timer 定时任务剔除失效服务

      // 集群模式的实现类com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#openForTraffic
      @Override
      public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
          // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
          this.expectedNumberOfClientsSendingRenews = count;
          updateRenewsPerMinThreshold();
          logger.info("Got {} instances from neighboring DS node", count);
          logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
          this.startupTime = System.currentTimeMillis();
          if (count > 0) {
              this.peerInstancesTransferEmptyOnStartup = false;
          }
          DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
          boolean isAws = Name.Amazon == selfName;
          if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
              logger.info("Priming AWS connections for all replicas..");
              primeAwsReplicas(applicationInfoManager);
          }
          logger.info("Changing status to UP");
          // 设置实例状态为 UP
          applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
          // 通过 Timer 开启定时任务剔除失效服务
          super.postInit();
      }
      
      // super
      protected void postInit() {
          renewsLastMin.start();
          if (evictionTaskRef.get() != null) {
              evictionTaskRef.get().cancel();
          }
          // 任务逻辑: EvictionTask
          evictionTaskRef.set(new EvictionTask());
          evictionTimer.schedule(evictionTaskRef.get(),
                                 serverConfig.getEvictionIntervalTimerInMs(),
                                 serverConfig.getEvictionIntervalTimerInMs());
      }
      
2.1.4.1.2 Eureka Server 服务接口暴露策略
  • Eureka Server 启动时注册了 Jersey 过滤器,方法入参需要依赖注入 Application

    /**
     * Construct a Jersey {@link javax.ws.rs.core.Application} with all the resources
     * required by the Eureka server.
     */
    @Bean
    public javax.ws.rs.core.Application jerseyApplication(Environment environment,
                                                          ResourceLoader resourceLoader) {
    
        ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
            false, environment);
    
        // Filter to include only classes that have a particular annotation.
        // 找到贴了Path注解的类,类似@RequestMapping
        provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
        provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
    
        // Find classes in Eureka packages (or subpackages)
        // 扫描指定包及子包下的注解,类似@ComponentScan
        // EUREKA_PACKAGES:"com.netflix.discovery", "com.netflix.eureka"
        Set<Class<?>> classes = new HashSet<>();
        for (String basePackage : EUREKA_PACKAGES) {
            Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
            for (BeanDefinition bd : beans) {
                Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),
                                                           resourceLoader.getClassLoader());
                classes.add(cls);
            }
        }
    
        // Construct the Jersey ResourceConfig
        //
        Map<String, Object> propsAndFeatures = new HashMap<>();
        propsAndFeatures.put(
            // Skip static content used by the webapp
            ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
            EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");
    
        DefaultResourceConfig rc = new DefaultResourceConfig(classes);
        rc.setPropertiesAndFeatures(propsAndFeatures);
    
        return rc;
    }
    
  • 对外提供的接口服务,在 Jersey 中称为资源,resources 包下就是 Eureka Server 使用 Jersey 发布的 RESTFUL 风格服务接口

    image-20210215221213746

2.1.4.1.3 Eureka Server服务注册接口
  • 查看 com.netflix.eureka.resources.ApplicationResource#addInstance,这是接收客户端注册服务的接口

    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // 参数校验
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getIPAddr())) {
            return Response.status(400).entity("Missing ip address").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }
    
        // handle cases where clients may be registering with bad DataCenterInfo with missing data
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }
    
        // 注册
        registry.register(info, "true".equals(isReplication));
        // 完成注册返回204(留意 Eureka Client 注册成功之后会打印一个204)
        return Response.status(204).build();  // 204 to be backwards compatible
    }
    
  • 查看 com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register

    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        // 服务时效间隔,默认90s
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            // 如果客户端自己设置了,以客户端配置为准
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        // 调用父类的 register 方法注册实例(把实例信息存储到注册表)
        super.register(info, leaseDuration, isReplication);
        // 将注册到当前 server 的实例信息同步到其他 peer 节点
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
    
  • 查看实例注册过程 com.netflix.eureka.registry.AbstractInstanceRegistry#register

    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            // 读锁
            read.lock();
            // 从 registry 获取 appName(即spring.application.name)的所有实例信息
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            
            // appName 的实例信息为空,那么新建一个 map
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            
            // 获取实例的 Lease 租约信息
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            if (existingLease != null && (existingLease.getHolder() != null)) {
                // 如果有租约,则保留最后一个时间戳而不覆盖它
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                ...
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    // log
                    registrant = existingLease.getHolder();
                }
            } else {
                // 如果不存在租约,说明是新实例注册
                synchronized (lock) {
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                        // Since the client wants to register it, increase the number of clients sending renews
                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        // 更新续约阈值(85%)
                        updateRenewsPerMinThreshold();
                    }
                }
                // log
            }
            
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            
            // 将实例信息放到注册表中
            gMap.put(registrant.getId(), lease);
            // 同步维护最近注册队列
            synchronized (recentRegisteredQueue) {
                recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            
            // 如果当前实例已经维护了 OverriddenStatus,将其也放到此 Eureka Server 的 overriddenInstanceStatusMap 中
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }
    
            // 根据 overridden status 规则设置状态
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);
    
            // 如果续约以 UP 状态注册,将当前时间戳设置为租赁服务时间戳
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            // 更新最后更新时间
            registrant.setLastUpdatedTimestamp();
            // 使当前应用的 ResponseCache 失效
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                        registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            // 解锁
            read.unlock();
        }
    }
    
  • 查看实例同步过程 com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#replicateToPeers

    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        // 计时器
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            
            // 如果没有集群或者已经复制了,结束复制
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }
    
            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    // 跳过自己的 url
                    continue;
                }
                // 复制实例操作到节点,根据不同的 Action 来响应,如下线、心跳续约、注册等
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }
    
2.1.4.1.4 Eureka Server 服务续约接口
  • 查看 com.netflix.eureka.resources.InstanceResource#renewLease,这是接收客户端续约的接口

    @PUT
    public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
        // 续约
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
        // Not found in the registry, immediately ask for a register
        if (!isSuccess) {
            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
        ...
    }
    
  • 查看 com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#renew

    public boolean renew(final String appName, final String id, final boolean isReplication) {
        // 调用父类的续约方法,内部其实是调用leaseToRenew.renew(),更新 lastUpdateTimestamp
        if (super.renew(appName, id, isReplication)) {
            // 续约成功后,将心跳续约操作同步到其他节点
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
    }
    
2.1.4.2 Eureka Client 端源码
2.1.4.2.1 Eureka Client 启动过程主干
  1. 读取配置
  2. 启动时从 Eureka Server 获取服务实例信息
  3. 将自己注册到 Eureka Server(addInstance)
  4. 开启一些定时任务,如心跳续约,刷新本地服务缓存列表
2.1.4.2.2 Eureka Client 注册服务
  • 要将一个服务注册到注册中心,只需要引入 eureka-client 的 jar 包(注解 @EnableEurekaClient 是可选的),可以看到自动装配的配置文件 spring.factories,找到 EurekaClientAutoConfiguration 自动配置类,里面有个 EurekaDiscoveryClientConfiguration 需要关注

    @Configuration
    @EnableConfigurationProperties
    // EurekaClientConfig 在 server-client 的 jar 里面,只要引入了就会被自动装配,成为客户端
    @ConditionalOnClass(EurekaClientConfig.class)
    // 如果不想作为客户端,可以设置属性 eureka.client.enabled=false
    @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
    public class EurekaDiscoveryClientConfiguration {
        // 自动装配过程中,这里就注入了 Marker,所以可以不使用 @EnableEurekaClient
    	@Bean
    	public Marker eurekaDiscoverClientMarker() {
    		return new Marker();
    	}
    }
    
  • 回到主类,查看 EurekaClientAutoConfiguration

    ...
    public class EurekaClientAutoConfiguration {
        // 读取配置文件,封装成 bean 注入到容器
        @Bean
    	@ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
    	public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils, 
            ManagementMetadataProvider managementMetadataProvider) {
    		String hostname = getProperty("eureka.instance.hostname");
    		boolean preferIpAddress = Boolean.parseBoolean(getProperty("eureka.instance.prefer-ip-address"));
    		String ipAddress = getProperty("eureka.instance.ip-address");
            ...
        }
        
        // 实例化客户端
        @Bean(destroyMethod = "shutdown") // bean 销毁时调用的方法
        @ConditionalOnMissingBean(value = EurekaClient.class, 
                                  search = SearchStrategy.CURRENT)
        public EurekaClient eurekaClient(ApplicationInfoManager manager, 
                                         EurekaClientConfig config) {
            return new CloudEurekaClient(manager, config, this.optionalArgs,
                                         this.context);
        }
    }
    
  • 查看实例化客户端的过程

    // org.springframework.cloud.netflix.eureka.CloudEurekaClient
    public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
                             EurekaClientConfig config,
                             AbstractDiscoveryClientOptionalArgs<?> args,
                             ApplicationEventPublisher publisher) {
        // 调用父类构造函数
        super(applicationInfoManager, config, args);
        this.applicationInfoManager = applicationInfoManager;
        this.publisher = publisher;
        this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class, "eurekaTransport");
        ReflectionUtils.makeAccessible(this.eurekaTransportField);
    }
    
    // com.netflix.discovery.DiscoveryClient#DiscoveryClient
    public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) {
        // 调用另一个构造器
        this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {
            	// 注册失败时降级处理,不关注
        	});
    }
    
    // com.netflix.discovery.DiscoveryClient#DiscoveryClient
    @Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider) {
        ...
        // fetchRegistry:从注册中心获取注册信息列表
        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            // 获取失败,降级处理
            fetchRegistryFromBackup();
        }
        
        ...
    
        if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
            try {
                // 将自己注册到 Eureka Server
                if (!register() ) {
                    throw new IllegalStateException("Registration error at startup. Invalid server response.");
                }
            } catch (Throwable th) {
                logger.error("Registration error at startup: {}", th.getMessage());
                throw new IllegalStateException(th);
            }
        }
        
        // 初始化定时任务,如刷新本地缓存定时任务、心跳
        initScheduledTasks();
    }
    
  • 分析 com.netflix.discovery.DiscoveryClient#fetchRegistry 方法,如果本地缓存没有注册信息,则全量获取,否则更新本地缓存

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // 从本地缓存中获取信息列表
        Applications applications = getApplications();
        if (clientConfig.shouldDisableDelta()
            || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
            || forceFullRegistryFetch
            || (applications == null)
            || (applications.getRegisteredApplications().size() == 0)
            || (applications.getVersion() == -1)) {
            ...
            // 全量获取(首次注册或者服务被剔除,通过 jerseyClient 通信)
            getAndStoreFullRegistry();
        } else {
            // 更新
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    }
    ...
}
  • 分析 com.netflix.discovery.DiscoveryClient#register

    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            // 通过 jerseyClient 发起注册请求
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        }
        ...
        // 状态码是204,即注册成功
        return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }
    
  • 分析 com.netflix.discovery.DiscoveryClient#initScheduledTasks

    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // 刷新本地缓存定时任务,
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            scheduler.schedule(
                new TimedSupervisorTask(
                    "cacheRefresh",
                    scheduler,
                    cacheRefreshExecutor,
                    registryFetchIntervalSeconds,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
    
        if (clientConfig.shouldRegisterWithEureka()) {
            ...
            // 心跳定时任务,
            scheduler.schedule(
                new TimedSupervisorTask(
                    "heartbeat",
                    scheduler,
                    heartbeatExecutor,
                    renewalIntervalInSecs,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);
            ...
        }
    }
    
2.1.4.2.3 Eureka Client 心跳续约
  • HeartbeatThread -》调用 renew 与服务端通信成功 -》更新 lastSuccessfulHeartbeatTimestamp

    // com.netflix.discovery.DiscoveryClient.HeartbeatThread
    private class HeartbeatThread implements Runnable {
    
        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }
    
2.1.4.2.4 Eureka Client 下线服务
  • 在实例化EurekaClient时,声明了bean 销毁时调用的方法 @Bean(destroyMethod = “shutdown”),用于下线服务

    @PreDestroy
    @Override
    public synchronized void shutdown() {
        if (isShutdown.compareAndSet(false, true)) {
            logger.info("Shutting down DiscoveryClient ...");
    
            if (statusChangeListener != null && applicationInfoManager != null) {
                applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
            }
    
            cancelScheduledTasks();
    
            // If APPINFO was registered
            if (applicationInfoManager != null
                && clientConfig.shouldRegisterWithEureka()
                && clientConfig.shouldUnregisterOnShutdown()) {
                applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
                // 注销服务
                unregister();
            }
    
            if (eurekaTransport != null) {
                eurekaTransport.shutdown();
            }
    
            heartbeatStalenessMonitor.shutdown();
            registryStalenessMonitor.shutdown();
    
            logger.info("Completed shut down of DiscoveryClient");
        }
    }
    
  • 查看 com.netflix.discovery.DiscoveryClient#unregister,底层使用jerseyClient发送下线请求

    void unregister() {
        // It can be null if shouldRegisterWithEureka == false
        if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
            try {
                logger.info("Unregistering ...");
                EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
                logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());
            } catch (Exception e) {
                logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
            }
        }
    }
    

2.2 负载均衡 Ribbon

2.2.1 负载均衡介绍

  • 按负载均衡算法执行的位置,分为两种
    • 服务端负载均衡,如Nginx、F5,负载均衡的算法在服务端执行
    • 客户端负载均衡,如 Ribbon,负载均衡的算法在客户端执行

2.2.2 负载均衡策略

  • 接口 com.netflix.loadbalancer.IRule,Ribbon 内置了多种负载均衡策略

    image-20210216230037784

  • 负载均衡策略

    负载均衡策略描述
    RoundRobinRule 轮询策略默认超过 10 次获取的 server 都可不用时会返回一个空的 server
    RandomRule 随机策略如果随机到的 server 为 null 或者不可用,会 while 不停的循环获取
    RetryRule 重试策略一定时限内循环重试
    BestAvailableRule 最小连接数策略遍历 serverList,选取可用的且连接数最小的一个 server
    AvailabilityFiltering 可用过滤策略扩展了轮询策略,先通过默认的轮询选取一个 server,再判断该 server 是否可用、链接数是否超限
    ZoneAvoidanceRule 区域隔离策略(默认)扩展了轮询策略,继承了两个过滤器,会过滤超时和链接数过多的server,还会过滤掉不符合要求的zone区域里所有的节点
  • 修改负载均衡策略

    # 指定被调用服务的负载均衡策略,如果不指定则是全局
    order-service:
      ribbon:
        NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule
    

2.2.3 Ribbon 源码分析

2.2.3.1 工作原理
  • 请求过程

    • 客户端通过 restTemplate.getForObject 发起请求

    • 请求被拦截器拦截

    • 拦截器通过请求 URL 里的服务名称获取对应的服务实例列表

    • 按照负载均衡算法获取选取一个服务实例

    • 最终再通过 RestTemplate 调用远程服务

      image-20210217103126312

  • Ribbon 组件

    • IRule ,负载均衡策略接口

    • IPing,心跳检测接口

    • ServerListFilter,过滤服务实例列表接口

    • ServerListUpdater,更新服务实例列表接口

      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jlCyvuJP-1613892039385)(C:\Users\linjc\AppData\Roaming\Typora\typora-user-images\image-20210217104055598.png)]

      负载均衡管理器 LoadBalancer 是核心,相当于大脑,负责协调上述组件(躯干)

2.2.3.2 @LoadBalanced 源码分析
  • 查看注解 @LoadBalanced,其作用是将普通的 RestTemplate 配置为一个 LoadBalancerClient 来使用

    /**
     * Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient.
     * @author Spencer Gibb
     */
    @Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    @Qualifier
    public @interface LoadBalanced {
    }
    
    public interface LoadBalancerClient extends ServiceInstanceChooser {
        // 根据服务执行请求
    	<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
    
    	// 根据服务执行请求
    	<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
    
    	// 生成URL,serviceName:port =》 ip:port
    	URI reconstructURI(ServiceInstance instance, URI original);
    }
    
  • 查看 netflix-ribbon 的 jar 包,找到自动装配文件 spring.factories

    image-20210217195433950

  • 分析 RibbonAutoConfiguration

    看到是先加载 RibbonAutoConfiguration,然后再加载 LoadBalancerAutoConfiguration,其中注册 SpringClientFactory bean对象时还装配了RibbonClientConfiguration

    @Configuration
    @Conditional(RibbonAutoConfiguration.RibbonClassesConditions.class)
    @RibbonClients
    @AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
    // 加载 RibbonAutoConfiguration 后,再加载 LoadBalancerAutoConfiguration
    @AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class})
    @EnableConfigurationProperties({RibbonEagerLoadProperties.class, ServerIntrospectorProperties.class})
    public class RibbonAutoConfiguration {
    
        // 注册 SpringClientFactory,其构造器内部装配了 RibbonClientConfiguration
        // 而 RibbonClientConfiguration 又装配了 Ribbon 的大脑和躯干,如 ILoadBalancer
    	@Bean
    	public SpringClientFactory springClientFactory() {
    		SpringClientFactory factory = new SpringClientFactory();
    		factory.setConfigurations(this.configurations);
    		return factory;
    	}
    
        // 注册 LoadBalancerClient,使接下来的 LoadBalancerAutoConfiguration 能装配
    	@Bean
    	@ConditionalOnMissingBean(LoadBalancerClient.class)
    	public LoadBalancerClient loadBalancerClient() {
    		return new RibbonLoadBalancerClient(springClientFactory());
    	}
        ...
    }
    
  • 分析 LoadBalancerAutoConfiguration,主要是给 RestTemplate 添加负载均衡拦截器

    @Configuration
    // 只有 RestTemplate 这个类存在时才装配
    @ConditionalOnClass(RestTemplate.class)
    // RibbonAutoConfiguration 装配时已注册 LoadBalancerClient
    @ConditionalOnBean(LoadBalancerClient.class)
    @EnableConfigurationProperties(LoadBalancerRetryProperties.class)
    public class LoadBalancerAutoConfiguration {
    
        // 注入添加了 @LoadBalanced 注解的 RestTemplate 集合
    	@LoadBalanced
    	@Autowired(required = false)
    	private List<RestTemplate> restTemplates = Collections.emptyList();
    
    	@Configuration
    	@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
    	static class LoadBalancerInterceptorConfig {
            // 注册负载均衡拦截器
    		@Bean
    		public LoadBalancerInterceptor ribbonInterceptor(
    				LoadBalancerClient loadBalancerClient,
    				LoadBalancerRequestFactory requestFactory) {
    			return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
    		}
    
            // 注册 RestTemplate 定制器,并给拦截链添加一个负载均衡拦截器
    		@Bean
    		@ConditionalOnMissingBean
    		public RestTemplateCustomizer restTemplateCustomizer(
    				final LoadBalancerInterceptor loadBalancerInterceptor) {
    			return restTemplate -> {
                    List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                            restTemplate.getInterceptors());
                    list.add(loadBalancerInterceptor);
                    restTemplate.setInterceptors(list);
                };
    		}
    	}
        
    	@Bean
    	public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
    			final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
    		return () -> restTemplateCustomizers.ifAvailable(customizers -> {
                for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                    for (RestTemplateCustomizer customizer : customizers) {
                        // 使用定制器给 RestTemplate 对象添加一个拦截器
                        customizer.customize(restTemplate);
                    }
                }
            });
    	}
    }
    
    • 查看 LoadBalancerInterceptorintercept 方法

      @Override
      public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
                                          final ClientHttpRequestExecution execution) throws IOException {
          // 获取请求URL
          final URI originalUri = request.getURI();
          // 从URL里获取服务名称
          String serviceName = originalUri.getHost();
          Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
          // 由 LoadBalancerClient 执行负载均衡逻辑
          return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
      }
      
    • 查看 RibbonLoadBalancerClientexecute 方法,这里分为三步

      public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
          // 1.获取负载均衡器(RibbonClientConfiguration 装配时注册的)
          ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
          // 2.根据负载均衡器,选取一个服务实例
          Server server = getServer(loadBalancer, hint);
          if (server == null) {
              throw new IllegalStateException("No instances available for " + serviceId);
          }
          RibbonServer ribbonServer = new RibbonServer(
              serviceId, server, isSecure(server, serviceId),
              serverIntrospector(serviceId).getMetadata(server));
      
          // 3.执行,调用远程服务
          return execute(serviceId, ribbonServer, request);
      }
      
      // 分析下getServer方法,选取服务实例的逻辑,默认是区域隔离策略,即轮询
      // com.netflix.loadbalancer.AbstractServerPredicate#incrementAndGetModulo
      // modulo 是 服务实例列表长度
      private int incrementAndGetModulo(int modulo) {
          for (;;) {
              int current = nextIndex.get();
              // 通过模运算得到下一个服务实例的索引值
              int next = (current + 1) % modulo;
              // 通过CAS设置下一个服务实例的索引值(避免并发场景下产生数据错乱问题)
              if (nextIndex.compareAndSet(current, next) && current < modulo)
                  return current;
          }
      }
      
      // 分析下 execute 方法
      @Override
      public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
          ...
          try {
              // 向服务实例发起请求,底层仍然是 RestTemplate 的底层代码
              T returnVal = request.apply(serviceInstance);
              statsRecorder.recordStats(returnVal);
              return returnVal;
          }
          ...
      }
      
  • 分析 RibbonClientConfiguration,看看在负载均衡调用 chooseServer 时,ServerList是如何注入的

    @Configuration
    @EnableConfigurationProperties
    @Import({HttpClientConfiguration.class, OkHttpRibbonConfiguration.class, RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class})
    public class RibbonClientConfiguration {
        @Bean
    	@ConditionalOnMissingBean
    	public IRule ribbonRule(IClientConfig config) {
    		if (this.propertiesFactory.isSet(IRule.class, name)) {
    			return this.propertiesFactory.get(IRule.class, config, name);
    		}
            // 如果没有配置负载均衡策略,默认使用区域隔离策略
    		ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
    		rule.initWithNiwsConfig(config);
    		return rule;
    	}
        
        // 注册 ServerList,但此时还是个空集合
        @Bean
    	@ConditionalOnMissingBean
    	@SuppressWarnings("unchecked")
    	public ServerList<Server> ribbonServerList(IClientConfig config) {
    		if (this.propertiesFactory.isSet(ServerList.class, name)) {
    			return this.propertiesFactory.get(ServerList.class, config, name);
    		}
    		ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
    		serverList.initWithNiwsConfig(config);
    		return serverList;
    	}
        
        // 注册 ILoadBalancer
        @Bean
    	@ConditionalOnMissingBean
    	public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
    			ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
    			IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
    		if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
    			return this.propertiesFactory.get(ILoadBalancer.class, config, name);
    		}
            // 将容器中的 ServerList 对象注入到 ILoadBalancer中
    		return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
    				serverListFilter, serverListUpdater);
    	}
      
    }
    
    • 查看 ZoneAwareLoadBalancer 的构造器,找到 DynamicServerListLoadBalancer#restOfInit

      void restOfInit(IClientConfig clientConfig) {
          boolean primeConnection = this.isEnablePrimingConnections();
          this.setEnablePrimingConnections(false);
          // 开启延时定时任务,获取新服务实例,然后更新到 Ribbon 本地缓存
          enableAndInitLearnNewServersFeature();
          // 立即获取服务实例(因为上面延时没有立马执行,这个也是上面定时任务最终要执行的逻辑)
          updateListOfServers();
          if (primeConnection && this.getPrimeConnections() != null) {
              this.getPrimeConnections()
                  .primeConnections(getReachableServers());
          }
          this.setEnablePrimingConnections(primeConnection);
          LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
      }
      
2.2.3.3 轮询策略源码分析
  • 查看 RoundRobinRule 的 choose 方法

    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }
    
        Server server = null;
        int count = 0;
        // 最多获取服务实例 10 次
        while (server == null && count++ < 10) {
            // 所有可用服务实例列表
            List<Server> reachableServers = lb.getReachableServers();
            List<Server> allServers = lb.getAllServers();
            int upCount = reachableServers.size();
            int serverCount = allServers.size();
    
            if ((upCount == 0) || (serverCount == 0)) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }
    
            // 获取下一个服务实例的轮询索引
            int nextServerIndex = incrementAndGetModulo(serverCount);
            server = allServers.get(nextServerIndex);
    
            if (server == null) {
                /* Transient. */
                // 服务实例为空,可能是没有注册到,放弃当前线程时间片,等一下再拿
                Thread.yield();
                continue;
            }
    
            if (server.isAlive() && (server.isReadyToServe())) {
                return (server);
            }
    
            // Next.
            server = null;
        }
    
        if (count >= 10) {
            log.warn("No available alive servers after 10 tries from load balancer: "
                     + lb);
        }
        return server;
    }
    
2.2.3.4 随机策略源码分析
  • 查看 RandomRule 的 choose方法,与 RoundRobinRule 相比有两点不同

    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        Server server = null;
    
        // 1.一直while获取服务实例
        while (server == null) {
            if (Thread.interrupted()) {
                return null;
            }
            List<Server> upList = lb.getReachableServers();
            List<Server> allList = lb.getAllServers();
    
            int serverCount = allList.size();
            if (serverCount == 0) {
                return null;
            }
    
            // 2.随机获取下一个服务实例的索引
            int index = chooseRandomInt(serverCount);
            server = upList.get(index);
    
            if (server == null) {
                Thread.yield();
                continue;
            }
    
            if (server.isAlive()) {
                return (server);
            }
    
            // Shouldn't actually happen.. but must be transient or a bug.
            server = null;
            Thread.yield();
        }
    
        return server;
    }
    
    protected int chooseRandomInt(int serverCount) {
        return ThreadLocalRandom.current().nextInt(serverCount);
    }
    

2.3 熔断器 Hystrix

2.3.1 微服务雪崩效应

  • 现象:微服务调用链路中,下游服务响应时间过长,大量请求阻塞,大量线程无法释放,导致服务器资源耗尽,引起系统整体崩溃
  • 解决:
    • 服务熔断:下游服务不可用或响应时间过长,快速返回错误信息
    • 服务降级:服务熔断后,调用者本地 fallback 返回一个预留的值(也叫兜底数据)
    • 服务限流:有些场景不能服务降级,如秒杀
      • 限制总并发数,如数据库连接池、线程池
      • 限制瞬时并发数,如 nginx 限制瞬时并发连接数
      • 限制时间窗口内的平均速率,如 Guava 的 RateLimiter,nginx 的 limite_req 模块
      • 限制远程调用接口速率

2.3.2 Hystrix 介绍

  • Hystrx 可以隔离访问远程系统、服务或者第三方库,防止级联失败,从而提升系统的可用性与容错性。Hystrix 通过以下几点实现延迟和容错:
    • 包裹请求:使用 @HystrixCommand 包裹对依赖的调用逻辑
    • 跳闸机制:当某服务的错误率超过阈值时,Hystrix 跳闸,一段时间内停止请求该服务
    • 资源隔离:Hystrix 为每个依赖都维护了一个小型线程池(舱壁模式)
    • 监控:Hystrix 可以通过 Hystrix Dashboard 和 Hystrix Turbine 近实时监控运行指标和配置的变化
    • 回退机制:当请求失败、超时、被拒或者断路器打开时,执行回退逻辑,也就是服务降级
    • 自我修复:断路器打开一段时间后,会自动进入“半开”状态,尝试请求服务
  • 使用的注解
    • @EnableCircuitBreaker,开启熔断器,可以使用 @EnableHystrix
    • @HystrixCommand,贴在controller的方法上可以包裹请求,使用 Hystrix 的熔断与降级功能

2.3.3 @HystrixCommand 使用

@HystrixCommand(
    /******************服务降级,自定义本地方法******************/
    fallbackMethod = "myFallBack",
    
    /******************线程池隔离策略,舱壁模式******************/
    // 线程池标识,每个方法都要保持唯一,不唯一的话就共用了
    threadPoolKey = "xxx",
    // 线程池细节属性配置
    threadPoolProperties = {
        @HystrixProperty(name="coreSize",value = "2"), // 线程数
        @HystrixProperty(name="maxQueueSize",value="20") // 等待队列长度
    },
    
	/******************服务熔断配置******************/
    commandProperties = {
        // 超时熔断
 		@HystrixProperty(
        	name="execution.isolation.thread.timeoutInMilliseconds",value="1000"),
        
        // hystrix 高级配置,定制工作流程
        // 统计时间窗口定义
        @HystrixProperty(
            name = "metrics.rollingStats.timeInMilliseconds",value = "10000"),
        // 统计时间窗口内的最小请求数
        @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold",value = "20"),
        // 统计时间窗口内的错误数量百分比阈值
        @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage",value = "50"),
        // 自我修复时的活动窗口长度
        @HystrixProperty(
            name = "circuitBreaker.sleepWindowInMilliseconds",value = "5000")
    }
)

2.3.4 Hystrix 工作流程

image-20210218232551738

  1. 当调用出现问题时,开启一个时间窗口(默认10s)
  2. 在时间窗口内,统计调用次数是否达到最小请求数
    1. 如果没有达到,重置请求数,回到第一步
    2. 如果达到了,走第三步
  3. 在时间窗口内,统计失败的调用次数占所有请求数的百分比是否达到阈值
    1. 如果没有达到,重置百分比,回到第一步
    2. 如果达到了,走第四部
  4. 跳闸,hystrix 状态变为 CIRCUIT_OPEN,停止调用远程服务,并且开启一个活动窗口(默认 5s),每隔 5s,Hystrix 会让一个请求尝试调用问题服务
    1. 如果调用失败,回调第三步
    2. 如果调用成功,重置断路器状态为 UP,回到第一步

2.3.5 Hystrix 监控

2.3.5.1 Hystrix Dashboard 断路监控仪表盘
  1. 新建监控工程,引入 netflix-hystrix-dashboard 依赖,在启动类上面贴上 @EnableHystrixDashboard
  2. 在被监控的工程中注册监控 servlet,提供监控数据给仪表盘
  3. 上述两个工程启动后,访问监控工程 http://dashboard-ip:port/hystrix,进入仪表盘
  4. 输入被监控的微服务端点地址 http://service-ip:port ,展示详细的监控数据
2.3.5.2 Hystrix Turbine 聚合监控
  • 问题:一个微服务有多个实例,通过 ip:port 端口时每次只能查看一个实例
  • 解决:使用 Hystrix Turbine 聚合监控
  • 步骤
    1. 新建聚合监控工程,引入 netflix-turbine 和 eureka-client,在启动类上面贴上 @EnableTurbine
    2. 添加 turbine 的配置,turbine.appconfig = 被监控的服务名称A,被监控的服务名称B
    3. 在 Hystrix Dashboard 的基础下,访问聚合监控工程 http://turbine-ip:port/turbine.stream 可以得到一个微服务下多个实例的监控数据
    4. 回到仪表盘,输入 http://turbine-ip:port/turbine.stream,可以展详细的聚合监控数据

2.3.6 Hystrix 源码分析

  • 入口:@EnableCircuitBreaker 激活熔断器,导入了一个 Selector

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    @Import(EnableCircuitBreakerImportSelector.class)
    public @interface EnableCircuitBreaker {
    }
    
  • 查看 EnableCircuitBreakerImportSelector,关注父类方法,简单来说就是加载一个配置类

    public class EnableCircuitBreakerImportSelector extends
    		SpringFactoryImportSelector<EnableCircuitBreaker> {
    
    	@Override
    	protected boolean isEnabled() {
            // 默认启用熔断器
    		return getEnvironment().getProperty(
    				"spring.cloud.circuit.breaker.enabled", Boolean.class, Boolean.TRUE);
    	}
    }
    
    // 父类
    public abstract class SpringFactoryImportSelector<T>
    		implements DeferredImportSelector, BeanClassLoaderAware, EnvironmentAware {
    
    	private ClassLoader beanClassLoader;
    	private Class<T> annotationClass;
    	private Environment environment;
    	private final Log log = LogFactory.getLog(SpringFactoryImportSelector.class);
    
    	@SuppressWarnings("unchecked")
    	protected SpringFactoryImportSelector() {
            // 获取泛型的全限定名
            // org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker
    		this.annotationClass = (Class<T>) GenericTypeResolver
    				.resolveTypeArgument(this.getClass(), SpringFactoryImportSelector.class);
    	}
        
    	@Override
    	public String[] selectImports(AnnotationMetadata metadata) {
    		if (!isEnabled()) {
    			return new String[0];
    		}
    		AnnotationAttributes attributes = AnnotationAttributes.fromMap(
    				metadata.getAnnotationAttributes(this.annotationClass.getName(), true));
    
    		Assert.notNull(attributes, "No " + getSimpleName() + " attributes found. Is "
    				+ metadata.getClassName() + " annotated with @" + getSimpleName() + "?");
    
    		// Find all possible auto configuration classes, filtering duplicates
            // 读取 spring.factories 文件,获取key为 annotationClass 的配置类,然后注入
    		List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader
    				.loadFactoryNames(this.annotationClass, this.beanClassLoader)));
            ...
    		}
    
    		return factories.toArray(new String[factories.size()]);
    	}
        ...
    }
    
  • 查看 netflix-hystrix 的 jar 包,找到 spring.factories,可知要加载的配置类是 HystrixCircuitBreakerConfiguration

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    org.springframework.cloud.netflix.hystrix.HystrixAutoConfiguration,\
    org.springframework.cloud.netflix.hystrix.security.HystrixSecurityAutoConfiguration
    
    org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
    org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration
    
  • 分析 HystrixCircuitBreakerConfiguration 配置类,注入了一个切面 bean,实现 Hystrix 的机制就是切面(代理模式)

    @Configuration
    public class HystrixCircuitBreakerConfiguration {
    
        // 注册切面
    	@Bean
    	public HystrixCommandAspect hystrixCommandAspect() {
    		return new HystrixCommandAspect();
    	}
    
  • 分析 HystrixCommandAspect,切点是注解 @HystrixCommand,做了个环绕通知增强

    @Aspect
    public class HystrixCommandAspect {
    
        private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;
    
        static {
            META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
                    .put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
                    .put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
                    .build();
        }
    
        @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
    
        public void hystrixCommandAnnotationPointcut() {
        }
    
        @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
        public void hystrixCollapserAnnotationPointcut() {
        }
    
        @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
        public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
            // 获取原始目标方法
            Method method = getMethodFromTarget(joinPoint);
            Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
            if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
                throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                        "annotations at the same time");
            }
            MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
            // 封装元数据
            MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
            // create 方法创建了个 GenericCommand,有两个重要的方法,run 方法封装了对原始目标方法的调用,getFallback 方法封装了对回退方法的调用
            HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
            ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
                    metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
    
            Object result;
            try {
                if (!metaHolder.isObservable()) {
                    // 非 Observable
                    result = CommandExecutor.execute(invokable, executionType, metaHolder);
                } else {
                    result = executeObservable(invokable, executionType, metaHolder);
                }
            } catch (HystrixBadRequestException e) {
                throw e.getCause();
            } catch (HystrixRuntimeException e) {
                throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
            }
            return result;
        }
    }
    
  • GenericCommand 有个父类 AbstractCommand,其构造器做了大量 init 操作,其中包括了线程池的初始化

/* package */
static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, 
		HystrixThreadPoolProperties.Setter propertiesBuilder) {
    // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
    String key = threadPoolKey.name();

    // this should find it for all but the first time
    // 从本地缓存里获取线程池
    HystrixThreadPool previouslyCached = threadPools.get(key);
    if (previouslyCached != null) {
        return previouslyCached;
    }

    // if we get here this is the first time so we need to initialize
    synchronized (HystrixThreadPool.class) {
        if (!threadPools.containsKey(key)) {
            // 没有的话再创建,底层是 new ThreadPoolExecutor()
            threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
        }
    }
    return threadPools.get(key);
}

2.4 远程调用 Feign

2.4.1 远程调用方式

  • 使用 RestTemplate 的问题
    • 在代码里拼接 URL 麻烦且容易出错
    • restTemplate.getForObject 代码很固定,模版化代码
  • 使用 Feign 的好处
    • 使用简单,类似 Dubbo 的本地方法调用风格,面向接口编程

2.4.2 Feign 介绍

  • Feign 是一个轻量级的 RESTFUL 的 HTTP 客户端,项目引入 openfeign 的 jar 后,相当于拥有 RestTemplate + Ribbon + Hystrix 三者的功能

    image-20210220235815912

  • Feign 对负载均衡的支持

    #针对的被调用方微服务名称,不加就是全局生效
    service-name:
      ribbon:
        #请求连接超时时间
        ConnectTimeout: 2000
        #请求处理超时时间(Feign超时时长设置)
        ReadTimeout: 3000
        #对所有操作都进行重试
        OkToRetryOnAllOperations: true
        ####根据如上配置,当访问到故障请求的时候,它会再尝试访问一次当前实例(次数由MaxAutoRetries配置),
        ####如果不行,就换一个实例进行访问,如果还不行,再换一次实例访问(更换次数由MaxAutoRetriesNextServer配置),
        ####如果依然不行,返回失败信息。
        MaxAutoRetries: 0 #对当前选中实例重试次数,不包括第一次调用
        MaxAutoRetriesNextServer: 0 #切换实例的重试次数
        NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RoundRobinRule #负载策略调整
    
  • Feign 对熔断器的支持

    feign:
      hystrix:
        # 启用 Feign 的熔断功能
        enabled: true
    hystrix:
      command:
        default:
          execution:
            isolation:
              thread:
                ####Hystrix的超时时长设置
                # tips: 如果Feign、Hystrix都设置了超时时长,按最小值控制
                timeoutInMilliseconds: 15000
    
  • Feign 对日志的支持

    logging:
      level:
        # Feign日志只会对日志级别为debug的做出响应
        com.lagou.edu.controller.service.ResumeServiceFeignClient: debug
    
  • Feign 对请求压缩和响应压缩的支持

    feign:
      compression:
        request:
          # 开启请求压缩
          enabled: true
          # 设置压缩的数据类型,此处也是默认值
          mime-types: text/html,application/xml,application/json
          # 设置触发压缩的⼤⼩下限,此处也是默认值
          min-request-size: 2048
        response:
          # 开启响应压缩
          enabled: true
    

2.4.3 Feign 源码分析

  • 入口:@EnableFeignClients,导入了一个 Registrar

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @Documented
    @Import(FeignClientsRegistrar.class)
    public @interface EnableFeignClients {
        ...
    }
    
  • 查看 FeignClientsRegistrar,实现了 ImportBeanDefinitionRegistrar 接口,重写 registerBeanDefinitions 方法,完成 Bean 的注入

    class FeignClientsRegistrar implements ImportBeanDefinitionRegistrar,  ResourceLoaderAware, EnvironmentAware {
        @Override
    	public void registerBeanDefinitions(AnnotationMetadata metadata,
    			BeanDefinitionRegistry registry) {
            // 将全局默认配置注入到容器(@EnableFeignClients 的属性 defaultConfiguration)
    		registerDefaultConfiguration(metadata, registry);
            // 将 Feign 客户端注入到容器
    		registerFeignClients(metadata, registry);
    	}
    }
    
  • 分析 registerFeignClients 方法,其作用是扫描 @EnableFeignClients 包下及子包下所有贴了 @FeignClient 的接口,然后再注册 Feign 客户端

    public void registerFeignClients(AnnotationMetadata metadata,
                                     BeanDefinitionRegistry registry) {
        // 定义扫描器,扫描 @FeignClient
        ClassPathScanningCandidateComponentProvider scanner = getScanner();
        scanner.setResourceLoader(this.resourceLoader);
    
        Set<String> basePackages;
    
        Map<String, Object> attrs = metadata
            .getAnnotationAttributes(EnableFeignClients.class.getName());
        AnnotationTypeFilter annotationTypeFilter = new AnnotationTypeFilter(
            FeignClient.class);
        final Class<?>[] clients = attrs == null ? null
            : (Class<?>[]) attrs.get("clients");
        if (clients == null || clients.length == 0) {
            scanner.addIncludeFilter(annotationTypeFilter);
            // 获取 @EnableFeignClients 的包名
            basePackages = getBasePackages(metadata);
        }
        else {
            final Set<String> clientClasses = new HashSet<>();
            basePackages = new HashSet<>();
            for (Class<?> clazz : clients) {
                basePackages.add(ClassUtils.getPackageName(clazz));
                clientClasses.add(clazz.getCanonicalName());
            }
            AbstractClassTestingTypeFilter filter = new AbstractClassTestingTypeFilter() {
                @Override
                protected boolean match(ClassMetadata metadata) {
                    String cleaned = metadata.getClassName().replaceAll("\\$", ".");
                    return clientClasses.contains(cleaned);
                }
            };
            scanner.addIncludeFilter(
                new AllTypeFilter(Arrays.asList(filter, annotationTypeFilter)));
        }
    
        // 递归扫描
        for (String basePackage : basePackages) {
            Set<BeanDefinition> candidateComponents = scanner
                .findCandidateComponents(basePackage);
            for (BeanDefinition candidateComponent : candidateComponents) {
                if (candidateComponent instanceof AnnotatedBeanDefinition) {
                    // verify annotated class is an interface
                    AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
                    AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
                    Assert.isTrue(annotationMetadata.isInterface(),
                                  "@FeignClient can only be specified on an interface");
    
                    Map<String, Object> attributes = annotationMetadata
                        .getAnnotationAttributes(
                        FeignClient.class.getCanonicalName());
    
                    String name = getClientName(attributes);
                    registerClientConfiguration(registry, name,
                                                attributes.get("configuration"));
    
                    // 注册 Feign 客户端
                    registerFeignClient(registry, annotationMetadata, attributes);
                }
            }
        }
    }
    
  • 分析 registerFeignClient 方法,给每一个客户端生成代理对象,这是实现 Feign 的机制

    private void registerFeignClient(BeanDefinitionRegistry registry,
                                     AnnotationMetadata annotationMetadata, 
                                     Map<String, Object> attributes) {
        String className = annotationMetadata.getClassName();
        // 关注 FeignClientFactoryBean,里面生成了代理对象
        BeanDefinitionBuilder definition = BeanDefinitionBuilder
            .genericBeanDefinition(FeignClientFactoryBean.class);
        ...
    }
    
  • 查看 org.springframework.cloud.openfeign.FeignClientFactoryBean#getObject

    @Override
    public Object getObject() throws Exception {
        return getTarget();
    }
    
    /**
     * @param <T> the target type of the Feign client
     * @return a {@link Feign} client created with the specified data and the context information
     */
    <T> T getTarget() {
        FeignContext context = applicationContext.getBean(FeignContext.class);
        Feign.Builder builder = feign(context);
    
        // 如果 @FeignClient 未指定URL,生成的 Feign 客户端应该是具备负载均衡的
        // 如果 @FeignClient 指定了URL,适合开发测试时使用
        if (!StringUtils.hasText(this.url)) {
            if (!this.name.startsWith("http")) {
                url = "http://" + this.name;
            }
            else {
                url = this.name;
            }
            url += cleanPath();
            // 走负载均衡
            return (T) loadBalance(builder, context, 
                                   new HardCodedTarget<>(this.type, this.name, url));
        }
        ...
    }
    
    // org.springframework.cloud.openfeign.FeignClientFactoryBean#loadBalance
    protected <T> T loadBalance(Feign.Builder builder, FeignContext context,
                                HardCodedTarget<T> target) {
        // LoadBalancerFeignClient
        Client client = getOptional(context, Client.class);
        if (client != null) {
            // 使用 builder 构造器包装 client
            builder.client(client);
            Targeter targeter = get(context, Targeter.class);
            // 执行
            return targeter.target(this, builder, context, target);
        }
    
        throw new IllegalStateException(
            "No Feign Client for loadBalancing defined. Did you forget to include spring-cloud-starter-netflix-ribbon?");
    }
    
    // targeter.target =》feign.target =》build().newInstance
    // feign.ReflectiveFeign#newInstance,动态代理
    @Override
    public <T> T newInstance(Target<T> target) {
        // 方法对应的增强
        Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
        Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();
        List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>();
    
        for (Method method : target.type().getMethods()) {
            if (method.getDeclaringClass() == Object.class) {
                continue;
            } else if (Util.isDefault(method)) {
                DefaultMethodHandler handler = new DefaultMethodHandler(method);
                defaultMethodHandlers.add(handler);
                methodToHandler.put(method, handler);
            } else {
                methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
            }
        }
        InvocationHandler handler = factory.create(target, methodToHandler);
        // 动态代理
        T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(),
                                            new Class<?>[] {target.type()}, handler);
    
        for (DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
            defaultMethodHandler.bindTo(proxy);
        }
        return proxy;
    }
    
  • 分析方法对应的增加,feign.SynchronousMethodHandler#invoke

    @Override
    public Object invoke(Object[] argv) throws Throwable {
        RequestTemplate template = buildTemplateFromArgs.create(argv);
        Retryer retryer = this.retryer.clone();
        while (true) {
            try {
                // 执行
                return executeAndDecode(template);
            } catch (RetryableException e) {
                ...
                continue;
            }
        }
    }
    
    // feign.SynchronousMethodHandler#executeAndDecode
    Object executeAndDecode(RequestTemplate template) throws Throwable {
        Request request = targetRequest(template);
    
        if (logLevel != Logger.Level.NONE) {
            logger.logRequest(metadata.configKey(), logLevel, request);
        }
    
        Response response;
        long start = System.nanoTime();
        try {
            // 执行请求,内部就是 Ribbon 选择 server 的过程
            response = client.execute(request, options);
        } catch (IOException e) {
            if (logLevel != Logger.Level.NONE) {
                logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
            }
            throw errorExecuting(request, e);
        }
        ...
    }
    

2.5 网关 Gateway

  • 网关在架构中的位置,接收外部请求并路由到内部服务

    image-20210312101447578

  • Gateway 只是网关中的一种实现,Gateway 基于 Reactor 模型实现,支持异步非阻塞,有三个核心的概念

    • 路由 route:由 ID + URI + predicates + filters 组成,从配置文件看也能体现这一点
    • 断言 predicate:判断条件
    • 过滤器 filter
    image-20210312114942933
  • Gateway 工作过程,核心逻辑是路由转发 + 执行过滤器链

    Spring Cloud Gateway Diagram

  • Gateway 内置了 11 种路由规则,常用的规则有请求路径正则匹配和远程地址匹配

    image-20210312155148990

  • Gateway 过滤器

    • 根据影响时机点划分
      • pre
      • post
    • 根据过滤器类型划分
      • Gateway Filter 单路由过滤器
      • Global Filter 全局过滤器
  • Gateway 需要保证高可用,通过部署多个 Gateway 和用 Nginx 做负载均衡来实现

    # 配置多个GateWay实例 
    upstream gateway {
      server 127.0.0.1:9002;
      server 127.0.0.1:9003;
    }
    location / {
      proxy_pass http://gateway;
    }
    
  • Gateway 配置示例

    spring:
      cloud:
        gateway:
        	# 可以配置多个路由
          routes:
            # 路由 id,需要保持唯一
            - id: path_route
            	# 目标服务地址,如果是动态路由,则配置 lb://服务名,从注册中心取具体的服务地址
              uri: https://example.org
              # 断言
              predicates:
                - Path=/red/{segment},/blue/{segment}
            	# 过滤器
            	filters:
                # 移除 url 的第一段字符串
                - StripPrefix=1
    

2.6 分布式配置中心 Spring Cloud Config

  • Config 结构示意图

    • 服务端:访问外部 Git 获取配置文件,并以接口的形式将配置文件的内容提供给客户端
    • 客户端:通过接口获取配置文件的内容,然后初始化自己的应用

    image-20210315173558648

  • 构建步骤

    • 在 git 上面创建新仓库 hello-config-repo(与应用名保持一致就好)

    • 上传 yml 配置文件,命名规范为 {application}-{profile}.yml 或 {application}-{profile}.properties

    • 构建 config server,使用注解 @EnableConfigServer 开启配置中心服务器功能,还需要增加以下核心配置

      spring:
        cloud:
          config:
            server:
              # 访问 git 需要的连接参数
              git:
                uri: https://github.com/xx/hello-config-repo.git
                username: 用户名
                password: 密码
                search-paths:
                  - 仓库名
            # 读取分支
            label: master
      
    • 启动服务配置中心,可以通过 http://localhost:port/master/{application}-{profile}.yml 查看配置文件内容

    • 构建 config client

      spring:
        cloud:
          # config客户端配置,和ConfigServer通信,并告知ConfigServer希望获取的配 置信息在哪个文件中
          config:
            name: application #配置文件名称
            profile: dev #后缀名称
            label: master #分支名称
            uri: http://localhost:port #ConfigServer配置中心地址
      
    • 手动刷新 client 的配置,可以避免服务重启

      • 引入 actuator,并暴露通信端点

        management:
          endpoints:
            web:
              exposure:
                include: refresh
        
        • 在配置类贴上 @RefreshScope
        • 手动向Client客户端发起POST请求,http://localhost:port/actuator/refresh
          • 缺点:其他客户端也需要访问一次才会更新配置
    • 自动刷新 client 的配置,使用消息总线 Spring Cloud Bus 发布配置信息变更的广播消息

      • 示意图

        image-20210315191352424

      • 构建步骤

        • Config Server 引入 bus-amqp 依赖,增加消息总线支持

          spring:
            rabbitmq:
              host: 127.0.0.1
              port: 5672
              username: guest
              password: guest
          
          # 暴露通信端点
          management:
            endpoints:
              web:
                exposure:
                  include: bus-refresh
          
        • 向配置中心服务端发送post请求 http://localhost:port/actuator/bus-refresh,各个客户端配置即可自动刷新

        • 如果希望定向某个实例,可以访问 http://localhost:9006/actuator/bus-refresh/服务名:端口

2.7 消息驱动 Spring Cloud Stream

  • Stream的本质

    屏蔽底层不同 MQ 消息中间件之间的差异,统一 MQ 的编程模型,降低学习、开发和维护的成本。

  • 编程模型

    image-20210318224139195

    Binder 绑定器用于绑定具体的 MQ 实现,目前支持 Rabbit MQ 和 Kafka

  • 编程使用的注解

    • @Input,标识输入通道,消费者通过输入通道获取消息
    • @Output,标识输出通道,生产者将消息放进输出通道,传递给下游
    • @StreamListener,监听队列,消费者监听消息的到来
    • @EnableBinding,把 channel 和 Exchange(对 Rabbit MQ 而言)绑定在一起
  • 生产者的配置

    spring:
      cloud:
        stream:
          binders: # 绑定MQ服务信息(此处我们是RabbitMQ)
            binderName: # 给Binder定义的名称,用于后面的关联
              type: rabbit # MQ类型,如果是Kafka的话,此处配置kafka
              environment: # MQ环境配置(用户名、密码等)
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    username: guest
                    password: guest
          bindings: # 关联整合通道和binder对象
            output: # output是我们定义的通道名称,此处不能乱改
              destination: exchangeName # 要使用的Exchange名称(消息队列主题名称)
              content-type: text/plain # application/json # 消息类型设置,比如json
              binder: binderName # 关联MQ服务
    
  • 消费者的配置

    spring:
      cloud:
        stream:
          binders: # 绑定MQ服务信息(此处我们是RabbitMQ)
            binderName: # 给Binder定义的名称,用于后面的关联
              type: rabbit # MQ类型,如果是Kafka的话,此处配置kafka
              environment: # MQ环境配置(用户名、密码等)
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    username: guest
                    password: guest
          bindings: # 关联整合通道和binder对象
            input: # output是我们定义的通道名称,此处不能乱改
              destination: exchangeName # 要使用的Exchange名称(消息队列主题名称)
              content-type: text/plain # application/json # 消息类型设置,比如json
              binder: binderName # 关联MQ服务
    
  • 自定义消息通道,支持多个输入通道、输出通道,步骤如下

    • 定义接口

      interface CustomChannel {
          String INPUT_LOG = "inputLog";
          String OUTPUT_LOG = "outputLog";
        
          @Input(INPUT_LOG)
          SubscribableChannel inputLog();
          @Output(OUTPUT_LOG)
          MessageChannel outputLog();
      }
      
    • 使用

      • 在使用 @EnableBinding 注解时,传入上述定义的接口

      • 在配置文件中绑定

        bindings:
          inputLog:
            destination: exchangeName1
          outputLog:
            destination: exchangeName2
        
  • 消息分组,避免重复消费

    spring:
      cloud:
        stream:
          bindings:
            input:
              group: groupName # 分组名称,同一个组的消费者,只有一个消费者能处理消息
    

3.常见问题

3.1 Eureka 服务发现慢

  • 原因

    image-20210318231532269

    • 服务端存在缓存

      一级缓存默认 30 s 从二级缓存同步一次,而二级缓存的过期时间是 180 s,客户端获取服务实例时,先从一级缓存获取,如果没有再从二级缓存获取,如果还是没有,触发缓存的加载,从存储层拉取数据到缓存中,再返回给客户端

    • 客户端存在缓存

      Eureka Client 从 Eureka Server 拉取服务实例列表,存在 30 的缓存;

      Ribbon 从 Eureka Client 获取服务实例,也存在 30 s 的缓存

  • 处理

    • 对服务端而言,可以:
      • 缩短只读缓存更新时间 eureka.server.response-cache-update-interval-ms
      • 关闭只读缓存 eureka.server.use-read-only- response-cache
      • 缩短心跳间隔时间 eureka.server.eviction-interval-timer-in-ms
    • 对客户端而言,可以:
      • 缩短拉取服务信息的时间 eureka.client.registryFetchIntervalSeconds
      • 缩短缓存服务信息的时间 ribbon.serverListRefreshInterval,可以设置为 3 s

3.2 Spring Cloud 各组件超时

  • Ribbon 超时

    • ribbon.ReadTimeout
    • ribbon.ConnectTimeout
  • Hystrix 超时

    • hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds

      需要注意的是,Hystrix 的超时时间设置要大于 Ribbon 的,而且如果 Ribbon 开启了重试机制,Hystrix 也要将这部时间计算在内,否则会出现 Ribbon 还在重试中而 Hystrix 发生超时

  • Feign 超时

    • 如果 Feign 和 Ribbon 都设置了超时,优先以 Ribbon 的为准,因此超时设置推荐只配置 Ribbon 和 Hystrix
评论 1
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

火车站卖橘子

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值