1. SpringCloud 综述
Spring Cloud 是一套微服务规范
1.1 解决的问题
- 微服务注册与发现
- 网络问题(熔断场景)
- 统一认证安全授权
- 负载均衡
- 链路追踪等
1.2 架构
1.2.1 核心组件
Spring Cloud 生态圈的组件按照发展可以分为两代
组件 | SCN(第一代) | SCA(第二代) |
---|---|---|
注册中心 | Netflix Eureka | 阿里 Nacos |
客户端负载均衡 | Netflix Ribbon | 阿里 Dubbo LB、 Spring Cloud LoadBalancer |
熔断器/断路器 | Netflix Hustrix | 阿里 Sentinel |
网关 | Netflix Zuul(性能一般) | Spring Cloud GateWay |
配置中心 | Spring Cloud Config | 阿里 Nacos、携程 Apollo |
服务协调 | Netflix Feign | 阿里 Dubbo RPC |
消息驱动 | Spring Cloud Stream | |
链路追踪 | Spring Cloud Sleuth / Zipkin | |
分布式事务 | 阿里 Seata |
1.2.2 体系结构
Spring Cloud 各组件协同工作才能够支持一个完整的微服务架构,比如
- 注册中心负责服务的注册与发现,将各服务连接起来
- API 网关负责转发所有外来的请求
- 断路器负责监控服务之间的情况,乱系多次失败进行熔断保护
- 配置中心提供了统一的配置信息管理服务,可以实时地通知各个服务获取最新的配置信息
1.3 Spring Cloud 与 Dubbo 对比
- 通信协议不同,Spring Cloud 基于应用层协议 HTTP,Dubbo 基于传输层协议 TCP / IP,Dubbo的效率高
- Spring Cloud 体系全,可以提供一站式解决方案
1.4 Spring Cloud 与 Spring Boot 对比
- Spring Cloud 只是利用了 Spring Boot 快速部署、自动装配的特点
2. 第一代 Spring Cloud 核心组件
2.1 服务注册中心 Eureka
2.1.1 服务注册中心的本质
- 解耦服务提供者和服务消费者,服务消费者不需要知道服务具体位置,透明化路由
- 在分布式环境下支持弹性扩容和缩容
2.1.2 架构
-
服务注册中心一般工作原理
-
Eureka 基础架构
-
Eureka 集群交互流程及原理
Eureka 通过心跳检测、健康检查和客户端缓存等机制,提高系统的灵活性、可伸缩性和可用性
- 微服务启动后,会周期性地向 Eureka Server 发送心跳,默认30s
- Eureka Server 在一定时间没有接收到某个微服务节点的心跳,会注销该微服务节点,默认90s
- Eureka Client 会缓存 Eureka Server 的信息,即使所有的 Server 节点都宕机,Client 也能通过缓存找到服务提供者
2.1.3 Eureka 细节
2.1.3.1 元数据
@Autowired
private DiscoveryClient discoveryClient;
// 1、从 Eureka Server中获取serviceId服务的实例信息(使用客户端对象做这件事)
List<ServiceInstance> instances = discoveryClient.getInstances("serviceId");
// 2、如果有多个实例,选择一个使用(负载均衡的过程)
ServiceInstance serviceInstance = instances.get(0);
- 标准元数据,如主机名、IP 地址、端口号等,
- 自定义元数据,可以通过
eureka.instance.metadata-map
配置,存储格式为key=value
2.1.3.2 客户端
-
服务注册
项目引入
eureka-client
依赖和配置了eureka.client.service-url.defaultZone
之后,服务启动时会向注册中心发起注册请求,请求携带了服务元数据信息,Eureka 注册中心将信息保存在 Map 中。 -
服务续约
- 服务每隔 30 秒会向注册中心续约(心跳)一次,也叫续活,可以修改
eureka.instance.lease-renewal-interval-in-seconds
默认值 - 如果没有续约,租约会在 90 秒后到期,服务从列表中被移除,可以修改
eureka.instance.lease-expiration-duration-in-seconds
默认值
- 服务每隔 30 秒会向注册中心续约(心跳)一次,也叫续活,可以修改
-
获取服务列表
服务每隔 30 秒会从注册中心拉取一份服务列表缓存到本地,可以修改
eureka.clinet.registry-fetch-interval-seconds
默认值
2.1.3.3 服务端
-
服务下线
服务正常关闭操作时,会发送服务下线的 REST 请求给 Eureka Server,注册中心接收到请求后将该服务置为下线状态
-
失效剔除
Eureka Server 每隔 60 秒检查,如果没有收到心跳,会注销该实例,可以修改
eureka.instance.lease-expiration-duration-in-seconds
默认值 -
自我保护
15 分钟之内超过 85% 的客户端节点都没有正常的心跳,进入自我保护模式
2.1.4 Eureka 源码分析
2.1.4.1 Eureka Server 源码
2.1.4.1.1 Eureka Server 启动过程
最核心的任务:从其他节点拷贝注册信息到自己的注册表
-
如果要将一个服务标注为注册中心,需要使用注解
@EnableEurekaServer
,查看该注解,里面导入了EurekaServerMarkerConfiguration
配置类,其作用是产生一个类型为Marker
的bean,进而激活EurekaServerAutoConfiguration
这个自动配置类/** * Responsible for adding in a marker bean to activate * {@link EurekaServerAutoConfiguration} * * @author Biju Kunjummen */ @Configuration public class EurekaServerMarkerConfiguration { @Bean public Marker eurekaServerMarkerBean() { return new Marker(); } class Marker { } }
-
在 eureka-server 的 jar 包,也可以看到自动装配的配置文件 spring.factories,找到
EurekaServerAutoConfiguration
自动配置类 -
查看
EurekaServerAutoConfiguration
自动配置类,里面做了许多事,如提供仪表盘服务、对等节点信息更新@Configuration // 导入初始化配置类 @Import(EurekaServerInitializerConfiguration.class) // 当 Marker 这个 Bean 存在时才加载 @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class) @EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class }) @PropertySource("classpath:/eureka/server.properties") public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter { // 仪表盘,对外提供服务的接口 @Bean @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true) public EurekaController eurekaController() { return new EurekaController(this.applicationInfoManager); } // 对等节点感知实例注册器(集群环境下注册服务使用到的注册器) @Bean public PeerAwareInstanceRegistry peerAwareInstanceRegistry( ServerCodecs serverCodecs) { this.eurekaClient.getApplications(); // force initialization return new InstanceRegistry( this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.eurekaClient, this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(), this.instanceRegistryProperties.getDefaultOpenForTrafficCount()); } // 封装对等节点相关的信息和操作,如更新集群中的对等节点 @Bean @ConditionalOnMissingBean public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry, ServerCodecs serverCodecs) { return new RefreshablePeerEurekaNodes( registry, this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.applicationInfoManager); } }
-
对外提供 RESTFUL 服务:
EurekaServerAutoConfiguration
配置类注册了一个Jersey
过滤器,用于发布 RESTFUL 服务(例如仪表盘),类似 Spring MVC@Bean public FilterRegistrationBean jerseyFilterRegistration( javax.ws.rs.core.Application eurekaJerseyApp) { FilterRegistrationBean bean = new FilterRegistrationBean(); bean.setFilter(new ServletContainer(eurekaJerseyApp)); bean.setOrder(Ordered.LOWEST_PRECEDENCE); bean.setUrlPatterns( Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*")); return bean; }
-
对等节点信息更新:查看对等节点
PeerEurekaNodes
的start
方法,内部创建了线程池,用于定时更新节点信息,并且通过调用关系得到,start 方法是在DefaultEurekaServerContext
对象创建之后调用的// com.netflix.eureka.cluster.PeerEurekaNodes#start public void start() { // 定时任务线程池 taskExecutor = Executors.newSingleThreadScheduledExecutor( new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "Eureka-PeerNodesUpdater"); thread.setDaemon(true); return thread; } } ); try { // 更新对等节点信息 updatePeerEurekaNodes(resolvePeerUrls()); Runnable peersUpdateTask = new Runnable() { @Override public void run() { try { updatePeerEurekaNodes(resolvePeerUrls()); } catch (Throwable e) { logger.error("Cannot update the replica Nodes", e); } } }; // 定时更新对等节点信息 taskExecutor.scheduleWithFixedDelay( peersUpdateTask, serverConfig.getPeerEurekaNodesUpdateIntervalMs(), serverConfig.getPeerEurekaNodesUpdateIntervalMs(), TimeUnit.MILLISECONDS ); } catch (Exception e) { throw new IllegalStateException(e); } for (PeerEurekaNode node : peerEurekaNodes) { logger.info("Replica node URL: {}", node.getServiceUrl()); } } // com.netflix.eureka.DefaultEurekaServerContext#initialize,使用了@PostConstruct @PostConstruct @Override public void initialize() { logger.info("Initializing ..."); peerEurekaNodes.start(); try { registry.init(peerEurekaNodes); } catch (Exception e) { throw new RuntimeException(e); } logger.info("Initialized"); }
-
-
查看
EurekaServerInitializerConfiguration
配置类的start
方法,里面创建了一个线程用于初始化 EurekaServerContextpublic void start() { new Thread(new Runnable() { @Override public void run() { try { // 重点关注:初始化EurekaServerContext细节 eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext); log.info("Started Eureka Server"); // 发布事件 publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig())); // 设置状态属性 EurekaServerInitializerConfiguration.this.running = true; // 发布事件 publish(new EurekaServerStartedEvent(getEurekaServerConfig())); } catch (Exception ex) { // Help! log.error("Could not initialize Eureka servlet context", ex); } } }).start(); }
-
org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap#contextInitialized
public void contextInitialized(ServletContext context) { try { // 初始化环境 initEurekaEnvironment(); // 重点关注:初始化上下文 initEurekaServerContext(); context.setAttribute(EurekaServerContext.class.getName(), this.serverContext); } catch (Throwable e) { log.error("Cannot bootstrap eureka server :", e); throw new RuntimeException("Cannot bootstrap eureka server :", e); } }
-
org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap#initEurekaServerContext
protected void initEurekaServerContext() throws Exception { // For backward compatibility JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH); XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH); if (isAws(this.applicationInfoManager.getInfo())) { this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry, this.applicationInfoManager); this.awsBinder.start(); } // Holder 类为非 IOC 容器提供获取 serverContext 对象的接口 EurekaServerContextHolder.initialize(this.serverContext); log.info("Initialized server context"); // server 启动时从其他节点拷贝注册信息 int registryCount = this.registry.syncUp(); // 设置实例状态为 UP,并对外提供服务 this.registry.openForTraffic(this.applicationInfoManager, registryCount); // 注册统计器 EurekaMonitors.registerAllStats(); }
-
查看
syncUp
方法,其作用是从已有节点列表中获取注册信息并填充到新的节点public int syncUp() { // Copy entire entry from neighboring DS node int count = 0; // 可能网络原因而没有连上server,做重试 for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) { if (i > 0) { try { Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs()); } catch (InterruptedException e) { logger.warn("Interrupted during registry transfer.."); break; } } Applications apps = eurekaClient.getApplications(); for (Application app : apps.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) { try { if (isRegisterable(instance)) { // 注册:把远程节点的注册信息注册到自己的注册表中 // 注册表ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> register(instance, instance.getLeaseInfo().getDurationInSecs(), true); count++; } } catch (Throwable t) { logger.error("During DS init copy", t); } } } } return count; }
-
查看
openForTraffic
方法,其作用是设置实例状态为 UP,同时开启 Timer 定时任务剔除失效服务// 集群模式的实现类com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#openForTraffic @Override public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) { // Renewals happen every 30 seconds and for a minute it should be a factor of 2. this.expectedNumberOfClientsSendingRenews = count; updateRenewsPerMinThreshold(); logger.info("Got {} instances from neighboring DS node", count); logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold); this.startupTime = System.currentTimeMillis(); if (count > 0) { this.peerInstancesTransferEmptyOnStartup = false; } DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName(); boolean isAws = Name.Amazon == selfName; if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) { logger.info("Priming AWS connections for all replicas.."); primeAwsReplicas(applicationInfoManager); } logger.info("Changing status to UP"); // 设置实例状态为 UP applicationInfoManager.setInstanceStatus(InstanceStatus.UP); // 通过 Timer 开启定时任务剔除失效服务 super.postInit(); } // super protected void postInit() { renewsLastMin.start(); if (evictionTaskRef.get() != null) { evictionTaskRef.get().cancel(); } // 任务逻辑: EvictionTask evictionTaskRef.set(new EvictionTask()); evictionTimer.schedule(evictionTaskRef.get(), serverConfig.getEvictionIntervalTimerInMs(), serverConfig.getEvictionIntervalTimerInMs()); }
-
2.1.4.1.2 Eureka Server 服务接口暴露策略
-
Eureka Server 启动时注册了 Jersey 过滤器,方法入参需要依赖注入 Application
/** * Construct a Jersey {@link javax.ws.rs.core.Application} with all the resources * required by the Eureka server. */ @Bean public javax.ws.rs.core.Application jerseyApplication(Environment environment, ResourceLoader resourceLoader) { ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider( false, environment); // Filter to include only classes that have a particular annotation. // 找到贴了Path注解的类,类似@RequestMapping provider.addIncludeFilter(new AnnotationTypeFilter(Path.class)); provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class)); // Find classes in Eureka packages (or subpackages) // 扫描指定包及子包下的注解,类似@ComponentScan // EUREKA_PACKAGES:"com.netflix.discovery", "com.netflix.eureka" Set<Class<?>> classes = new HashSet<>(); for (String basePackage : EUREKA_PACKAGES) { Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage); for (BeanDefinition bd : beans) { Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(), resourceLoader.getClassLoader()); classes.add(cls); } } // Construct the Jersey ResourceConfig // Map<String, Object> propsAndFeatures = new HashMap<>(); propsAndFeatures.put( // Skip static content used by the webapp ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX, EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*"); DefaultResourceConfig rc = new DefaultResourceConfig(classes); rc.setPropertiesAndFeatures(propsAndFeatures); return rc; }
-
对外提供的接口服务,在 Jersey 中称为资源,resources 包下就是 Eureka Server 使用 Jersey 发布的 RESTFUL 风格服务接口
2.1.4.1.3 Eureka Server服务注册接口
-
查看 com.netflix.eureka.resources.ApplicationResource#addInstance,这是接收客户端注册服务的接口
@POST @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { logger.debug("Registering instance {} (replication={})", info.getId(), isReplication); // 参数校验 if (isBlank(info.getId())) { return Response.status(400).entity("Missing instanceId").build(); } else if (isBlank(info.getHostName())) { return Response.status(400).entity("Missing hostname").build(); } else if (isBlank(info.getIPAddr())) { return Response.status(400).entity("Missing ip address").build(); } else if (isBlank(info.getAppName())) { return Response.status(400).entity("Missing appName").build(); } else if (!appName.equals(info.getAppName())) { return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build(); } else if (info.getDataCenterInfo() == null) { return Response.status(400).entity("Missing dataCenterInfo").build(); } else if (info.getDataCenterInfo().getName() == null) { return Response.status(400).entity("Missing dataCenterInfo Name").build(); } // handle cases where clients may be registering with bad DataCenterInfo with missing data DataCenterInfo dataCenterInfo = info.getDataCenterInfo(); if (dataCenterInfo instanceof UniqueIdentifier) { String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId(); if (isBlank(dataCenterInfoId)) { boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId")); if (experimental) { String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id"; return Response.status(400).entity(entity).build(); } else if (dataCenterInfo instanceof AmazonInfo) { AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo; String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId); if (effectiveId == null) { amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId()); } } else { logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass()); } } } // 注册 registry.register(info, "true".equals(isReplication)); // 完成注册返回204(留意 Eureka Client 注册成功之后会打印一个204) return Response.status(204).build(); // 204 to be backwards compatible }
-
查看 com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register
@Override public void register(final InstanceInfo info, final boolean isReplication) { // 服务时效间隔,默认90s int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { // 如果客户端自己设置了,以客户端配置为准 leaseDuration = info.getLeaseInfo().getDurationInSecs(); } // 调用父类的 register 方法注册实例(把实例信息存储到注册表) super.register(info, leaseDuration, isReplication); // 将注册到当前 server 的实例信息同步到其他 peer 节点 replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); }
-
查看实例注册过程 com.netflix.eureka.registry.AbstractInstanceRegistry#register
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try { // 读锁 read.lock(); // 从 registry 获取 appName(即spring.application.name)的所有实例信息 Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName()); REGISTER.increment(isReplication); // appName 的实例信息为空,那么新建一个 map if (gMap == null) { final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>(); gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null) { gMap = gNewMap; } } // 获取实例的 Lease 租约信息 Lease<InstanceInfo> existingLease = gMap.get(registrant.getId()); if (existingLease != null && (existingLease.getHolder() != null)) { // 如果有租约,则保留最后一个时间戳而不覆盖它 Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); ... if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { // log registrant = existingLease.getHolder(); } } else { // 如果不存在租约,说明是新实例注册 synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to register it, increase the number of clients sending renews this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1; // 更新续约阈值(85%) updateRenewsPerMinThreshold(); } } // log } Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration); if (existingLease != null) { lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } // 将实例信息放到注册表中 gMap.put(registrant.getId(), lease); // 同步维护最近注册队列 synchronized (recentRegisteredQueue) { recentRegisteredQueue.add(new Pair<Long, String>( System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")")); } // 如果当前实例已经维护了 OverriddenStatus,将其也放到此 Eureka Server 的 overriddenInstanceStatusMap 中 if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); } } InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); if (overriddenStatusFromMap != null) { logger.info("Storing overridden status {} from map", overriddenStatusFromMap); registrant.setOverriddenStatus(overriddenStatusFromMap); } // 根据 overridden status 规则设置状态 InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus); // 如果续约以 UP 状态注册,将当前时间戳设置为租赁服务时间戳 if (InstanceStatus.UP.equals(registrant.getStatus())) { lease.serviceUp(); } registrant.setActionType(ActionType.ADDED); recentlyChangedQueue.add(new RecentlyChangedItem(lease)); // 更新最后更新时间 registrant.setLastUpdatedTimestamp(); // 使当前应用的 ResponseCache 失效 invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); logger.info("Registered instance {}/{} with status {} (replication={})", registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); } finally { // 解锁 read.unlock(); } }
-
查看实例同步过程 com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#replicateToPeers
private void replicateToPeers(Action action, String appName, String id, InstanceInfo info /* optional */, InstanceStatus newStatus /* optional */, boolean isReplication) { // 计时器 Stopwatch tracer = action.getTimer().start(); try { if (isReplication) { numberOfReplicationsLastMin.increment(); } // 如果没有集群或者已经复制了,结束复制 if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { return; } for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { // 跳过自己的 url continue; } // 复制实例操作到节点,根据不同的 Action 来响应,如下线、心跳续约、注册等 replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } }
2.1.4.1.4 Eureka Server 服务续约接口
-
查看 com.netflix.eureka.resources.InstanceResource#renewLease,这是接收客户端续约的接口
@PUT public Response renewLease( @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication, @QueryParam("overriddenstatus") String overriddenStatus, @QueryParam("status") String status, @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) { boolean isFromReplicaNode = "true".equals(isReplication); // 续约 boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode); // Not found in the registry, immediately ask for a register if (!isSuccess) { logger.warn("Not Found (Renew): {} - {}", app.getName(), id); return Response.status(Status.NOT_FOUND).build(); } ... }
-
查看 com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#renew
public boolean renew(final String appName, final String id, final boolean isReplication) { // 调用父类的续约方法,内部其实是调用leaseToRenew.renew(),更新 lastUpdateTimestamp if (super.renew(appName, id, isReplication)) { // 续约成功后,将心跳续约操作同步到其他节点 replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication); return true; } return false; }
2.1.4.2 Eureka Client 端源码
2.1.4.2.1 Eureka Client 启动过程主干
- 读取配置
- 启动时从 Eureka Server 获取服务实例信息
- 将自己注册到 Eureka Server(addInstance)
- 开启一些定时任务,如心跳续约,刷新本地服务缓存列表
2.1.4.2.2 Eureka Client 注册服务
-
要将一个服务注册到注册中心,只需要引入 eureka-client 的 jar 包(注解
@EnableEurekaClient
是可选的),可以看到自动装配的配置文件 spring.factories,找到EurekaClientAutoConfiguration
自动配置类,里面有个EurekaDiscoveryClientConfiguration
需要关注@Configuration @EnableConfigurationProperties // EurekaClientConfig 在 server-client 的 jar 里面,只要引入了就会被自动装配,成为客户端 @ConditionalOnClass(EurekaClientConfig.class) // 如果不想作为客户端,可以设置属性 eureka.client.enabled=false @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true) public class EurekaDiscoveryClientConfiguration { // 自动装配过程中,这里就注入了 Marker,所以可以不使用 @EnableEurekaClient @Bean public Marker eurekaDiscoverClientMarker() { return new Marker(); } }
-
回到主类,查看
EurekaClientAutoConfiguration
... public class EurekaClientAutoConfiguration { // 读取配置文件,封装成 bean 注入到容器 @Bean @ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT) public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils, ManagementMetadataProvider managementMetadataProvider) { String hostname = getProperty("eureka.instance.hostname"); boolean preferIpAddress = Boolean.parseBoolean(getProperty("eureka.instance.prefer-ip-address")); String ipAddress = getProperty("eureka.instance.ip-address"); ... } // 实例化客户端 @Bean(destroyMethod = "shutdown") // bean 销毁时调用的方法 @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT) public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) { return new CloudEurekaClient(manager, config, this.optionalArgs, this.context); } }
-
查看实例化客户端的过程
// org.springframework.cloud.netflix.eureka.CloudEurekaClient public CloudEurekaClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args, ApplicationEventPublisher publisher) { // 调用父类构造函数 super(applicationInfoManager, config, args); this.applicationInfoManager = applicationInfoManager; this.publisher = publisher; this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class, "eurekaTransport"); ReflectionUtils.makeAccessible(this.eurekaTransportField); } // com.netflix.discovery.DiscoveryClient#DiscoveryClient public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) { // 调用另一个构造器 this(applicationInfoManager, config, args, new Provider<BackupRegistry>() { // 注册失败时降级处理,不关注 }); } // com.netflix.discovery.DiscoveryClient#DiscoveryClient @Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) { ... // fetchRegistry:从注册中心获取注册信息列表 if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { // 获取失败,降级处理 fetchRegistryFromBackup(); } ... if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) { try { // 将自己注册到 Eureka Server if (!register() ) { throw new IllegalStateException("Registration error at startup. Invalid server response."); } } catch (Throwable th) { logger.error("Registration error at startup: {}", th.getMessage()); throw new IllegalStateException(th); } } // 初始化定时任务,如刷新本地缓存定时任务、心跳 initScheduledTasks(); }
-
分析 com.netflix.discovery.DiscoveryClient#fetchRegistry 方法,如果本地缓存没有注册信息,则全量获取,否则更新本地缓存
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// 从本地缓存中获取信息列表
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) {
...
// 全量获取(首次注册或者服务被剔除,通过 jerseyClient 通信)
getAndStoreFullRegistry();
} else {
// 更新
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
}
...
}
-
分析 com.netflix.discovery.DiscoveryClient#register
boolean register() throws Throwable { logger.info(PREFIX + "{}: registering service...", appPathIdentifier); EurekaHttpResponse<Void> httpResponse; try { // 通过 jerseyClient 发起注册请求 httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } ... // 状态码是204,即注册成功 return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode(); }
-
分析 com.netflix.discovery.DiscoveryClient#initScheduledTasks
private void initScheduledTasks() { if (clientConfig.shouldFetchRegistry()) { // 刷新本地缓存定时任务, int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } if (clientConfig.shouldRegisterWithEureka()) { ... // 心跳定时任务, scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); ... } }
2.1.4.2.3 Eureka Client 心跳续约
-
HeartbeatThread -》调用 renew 与服务端通信成功 -》更新 lastSuccessfulHeartbeatTimestamp
// com.netflix.discovery.DiscoveryClient.HeartbeatThread private class HeartbeatThread implements Runnable { public void run() { if (renew()) { lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } } }
2.1.4.2.4 Eureka Client 下线服务
-
在实例化EurekaClient时,声明了bean 销毁时调用的方法 @Bean(destroyMethod = “shutdown”),用于下线服务
@PreDestroy @Override public synchronized void shutdown() { if (isShutdown.compareAndSet(false, true)) { logger.info("Shutting down DiscoveryClient ..."); if (statusChangeListener != null && applicationInfoManager != null) { applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId()); } cancelScheduledTasks(); // If APPINFO was registered if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka() && clientConfig.shouldUnregisterOnShutdown()) { applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN); // 注销服务 unregister(); } if (eurekaTransport != null) { eurekaTransport.shutdown(); } heartbeatStalenessMonitor.shutdown(); registryStalenessMonitor.shutdown(); logger.info("Completed shut down of DiscoveryClient"); } }
-
查看 com.netflix.discovery.DiscoveryClient#unregister,底层使用jerseyClient发送下线请求
void unregister() { // It can be null if shouldRegisterWithEureka == false if(eurekaTransport != null && eurekaTransport.registrationClient != null) { try { logger.info("Unregistering ..."); EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId()); logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode()); } catch (Exception e) { logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e); } } }
2.2 负载均衡 Ribbon
2.2.1 负载均衡介绍
- 按负载均衡算法执行的位置,分为两种
- 服务端负载均衡,如Nginx、F5,负载均衡的算法在服务端执行
- 客户端负载均衡,如 Ribbon,负载均衡的算法在客户端执行
2.2.2 负载均衡策略
-
接口 com.netflix.loadbalancer.IRule,Ribbon 内置了多种负载均衡策略
-
负载均衡策略
负载均衡策略 描述 RoundRobinRule 轮询策略 默认超过 10 次获取的 server 都可不用时会返回一个空的 server RandomRule 随机策略 如果随机到的 server 为 null 或者不可用,会 while 不停的循环获取 RetryRule 重试策略 一定时限内循环重试 BestAvailableRule 最小连接数策略 遍历 serverList,选取可用的且连接数最小的一个 server AvailabilityFiltering 可用过滤策略 扩展了轮询策略,先通过默认的轮询选取一个 server,再判断该 server 是否可用、链接数是否超限 ZoneAvoidanceRule 区域隔离策略(默认) 扩展了轮询策略,继承了两个过滤器,会过滤超时和链接数过多的server,还会过滤掉不符合要求的zone区域里所有的节点 -
修改负载均衡策略
# 指定被调用服务的负载均衡策略,如果不指定则是全局 order-service: ribbon: NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule
2.2.3 Ribbon 源码分析
2.2.3.1 工作原理
-
请求过程
-
客户端通过 restTemplate.getForObject 发起请求
-
请求被拦截器拦截
-
拦截器通过请求 URL 里的服务名称获取对应的服务实例列表
-
按照负载均衡算法获取选取一个服务实例
-
最终再通过 RestTemplate 调用远程服务
-
-
Ribbon 组件
-
IRule ,负载均衡策略接口
-
IPing,心跳检测接口
-
ServerListFilter,过滤服务实例列表接口
-
ServerListUpdater,更新服务实例列表接口
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jlCyvuJP-1613892039385)(C:\Users\linjc\AppData\Roaming\Typora\typora-user-images\image-20210217104055598.png)]
负载均衡管理器 LoadBalancer 是核心,相当于大脑,负责协调上述组件(躯干)
-
2.2.3.2 @LoadBalanced 源码分析
-
查看注解
@LoadBalanced
,其作用是将普通的 RestTemplate 配置为一个LoadBalancerClient
来使用/** * Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient. * @author Spencer Gibb */ @Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD }) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @Qualifier public @interface LoadBalanced { } public interface LoadBalancerClient extends ServiceInstanceChooser { // 根据服务执行请求 <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException; // 根据服务执行请求 <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException; // 生成URL,serviceName:port =》 ip:port URI reconstructURI(ServiceInstance instance, URI original); }
-
查看 netflix-ribbon 的 jar 包,找到自动装配文件 spring.factories
-
分析
RibbonAutoConfiguration
看到是先加载
RibbonAutoConfiguration
,然后再加载LoadBalancerAutoConfiguration
,其中注册 SpringClientFactory bean对象时还装配了RibbonClientConfiguration
@Configuration @Conditional(RibbonAutoConfiguration.RibbonClassesConditions.class) @RibbonClients @AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration") // 加载 RibbonAutoConfiguration 后,再加载 LoadBalancerAutoConfiguration @AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class}) @EnableConfigurationProperties({RibbonEagerLoadProperties.class, ServerIntrospectorProperties.class}) public class RibbonAutoConfiguration { // 注册 SpringClientFactory,其构造器内部装配了 RibbonClientConfiguration // 而 RibbonClientConfiguration 又装配了 Ribbon 的大脑和躯干,如 ILoadBalancer @Bean public SpringClientFactory springClientFactory() { SpringClientFactory factory = new SpringClientFactory(); factory.setConfigurations(this.configurations); return factory; } // 注册 LoadBalancerClient,使接下来的 LoadBalancerAutoConfiguration 能装配 @Bean @ConditionalOnMissingBean(LoadBalancerClient.class) public LoadBalancerClient loadBalancerClient() { return new RibbonLoadBalancerClient(springClientFactory()); } ... }
-
分析
LoadBalancerAutoConfiguration
,主要是给 RestTemplate 添加负载均衡拦截器@Configuration // 只有 RestTemplate 这个类存在时才装配 @ConditionalOnClass(RestTemplate.class) // RibbonAutoConfiguration 装配时已注册 LoadBalancerClient @ConditionalOnBean(LoadBalancerClient.class) @EnableConfigurationProperties(LoadBalancerRetryProperties.class) public class LoadBalancerAutoConfiguration { // 注入添加了 @LoadBalanced 注解的 RestTemplate 集合 @LoadBalanced @Autowired(required = false) private List<RestTemplate> restTemplates = Collections.emptyList(); @Configuration @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate") static class LoadBalancerInterceptorConfig { // 注册负载均衡拦截器 @Bean public LoadBalancerInterceptor ribbonInterceptor( LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) { return new LoadBalancerInterceptor(loadBalancerClient, requestFactory); } // 注册 RestTemplate 定制器,并给拦截链添加一个负载均衡拦截器 @Bean @ConditionalOnMissingBean public RestTemplateCustomizer restTemplateCustomizer( final LoadBalancerInterceptor loadBalancerInterceptor) { return restTemplate -> { List<ClientHttpRequestInterceptor> list = new ArrayList<>( restTemplate.getInterceptors()); list.add(loadBalancerInterceptor); restTemplate.setInterceptors(list); }; } } @Bean public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated( final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) { return () -> restTemplateCustomizers.ifAvailable(customizers -> { for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) { for (RestTemplateCustomizer customizer : customizers) { // 使用定制器给 RestTemplate 对象添加一个拦截器 customizer.customize(restTemplate); } } }); } }
-
查看
LoadBalancerInterceptor
的intercept
方法@Override public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException { // 获取请求URL final URI originalUri = request.getURI(); // 从URL里获取服务名称 String serviceName = originalUri.getHost(); Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri); // 由 LoadBalancerClient 执行负载均衡逻辑 return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution)); }
-
查看
RibbonLoadBalancerClient
的execute
方法,这里分为三步public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException { // 1.获取负载均衡器(RibbonClientConfiguration 装配时注册的) ILoadBalancer loadBalancer = getLoadBalancer(serviceId); // 2.根据负载均衡器,选取一个服务实例 Server server = getServer(loadBalancer, hint); if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } RibbonServer ribbonServer = new RibbonServer( serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); // 3.执行,调用远程服务 return execute(serviceId, ribbonServer, request); } // 分析下getServer方法,选取服务实例的逻辑,默认是区域隔离策略,即轮询 // com.netflix.loadbalancer.AbstractServerPredicate#incrementAndGetModulo // modulo 是 服务实例列表长度 private int incrementAndGetModulo(int modulo) { for (;;) { int current = nextIndex.get(); // 通过模运算得到下一个服务实例的索引值 int next = (current + 1) % modulo; // 通过CAS设置下一个服务实例的索引值(避免并发场景下产生数据错乱问题) if (nextIndex.compareAndSet(current, next) && current < modulo) return current; } } // 分析下 execute 方法 @Override public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException { ... try { // 向服务实例发起请求,底层仍然是 RestTemplate 的底层代码 T returnVal = request.apply(serviceInstance); statsRecorder.recordStats(returnVal); return returnVal; } ... }
-
-
分析
RibbonClientConfiguration
,看看在负载均衡调用 chooseServer 时,ServerList是如何注入的@Configuration @EnableConfigurationProperties @Import({HttpClientConfiguration.class, OkHttpRibbonConfiguration.class, RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class}) public class RibbonClientConfiguration { @Bean @ConditionalOnMissingBean public IRule ribbonRule(IClientConfig config) { if (this.propertiesFactory.isSet(IRule.class, name)) { return this.propertiesFactory.get(IRule.class, config, name); } // 如果没有配置负载均衡策略,默认使用区域隔离策略 ZoneAvoidanceRule rule = new ZoneAvoidanceRule(); rule.initWithNiwsConfig(config); return rule; } // 注册 ServerList,但此时还是个空集合 @Bean @ConditionalOnMissingBean @SuppressWarnings("unchecked") public ServerList<Server> ribbonServerList(IClientConfig config) { if (this.propertiesFactory.isSet(ServerList.class, name)) { return this.propertiesFactory.get(ServerList.class, config, name); } ConfigurationBasedServerList serverList = new ConfigurationBasedServerList(); serverList.initWithNiwsConfig(config); return serverList; } // 注册 ILoadBalancer @Bean @ConditionalOnMissingBean public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) { if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) { return this.propertiesFactory.get(ILoadBalancer.class, config, name); } // 将容器中的 ServerList 对象注入到 ILoadBalancer中 return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList, serverListFilter, serverListUpdater); } }
-
查看 ZoneAwareLoadBalancer 的构造器,找到 DynamicServerListLoadBalancer#restOfInit
void restOfInit(IClientConfig clientConfig) { boolean primeConnection = this.isEnablePrimingConnections(); this.setEnablePrimingConnections(false); // 开启延时定时任务,获取新服务实例,然后更新到 Ribbon 本地缓存 enableAndInitLearnNewServersFeature(); // 立即获取服务实例(因为上面延时没有立马执行,这个也是上面定时任务最终要执行的逻辑) updateListOfServers(); if (primeConnection && this.getPrimeConnections() != null) { this.getPrimeConnections() .primeConnections(getReachableServers()); } this.setEnablePrimingConnections(primeConnection); LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString()); }
-
2.2.3.3 轮询策略源码分析
-
查看
RoundRobinRule
的 choose 方法public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { log.warn("no load balancer"); return null; } Server server = null; int count = 0; // 最多获取服务实例 10 次 while (server == null && count++ < 10) { // 所有可用服务实例列表 List<Server> reachableServers = lb.getReachableServers(); List<Server> allServers = lb.getAllServers(); int upCount = reachableServers.size(); int serverCount = allServers.size(); if ((upCount == 0) || (serverCount == 0)) { log.warn("No up servers available from load balancer: " + lb); return null; } // 获取下一个服务实例的轮询索引 int nextServerIndex = incrementAndGetModulo(serverCount); server = allServers.get(nextServerIndex); if (server == null) { /* Transient. */ // 服务实例为空,可能是没有注册到,放弃当前线程时间片,等一下再拿 Thread.yield(); continue; } if (server.isAlive() && (server.isReadyToServe())) { return (server); } // Next. server = null; } if (count >= 10) { log.warn("No available alive servers after 10 tries from load balancer: " + lb); } return server; }
2.2.3.4 随机策略源码分析
-
查看
RandomRule
的 choose方法,与RoundRobinRule
相比有两点不同public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { return null; } Server server = null; // 1.一直while获取服务实例 while (server == null) { if (Thread.interrupted()) { return null; } List<Server> upList = lb.getReachableServers(); List<Server> allList = lb.getAllServers(); int serverCount = allList.size(); if (serverCount == 0) { return null; } // 2.随机获取下一个服务实例的索引 int index = chooseRandomInt(serverCount); server = upList.get(index); if (server == null) { Thread.yield(); continue; } if (server.isAlive()) { return (server); } // Shouldn't actually happen.. but must be transient or a bug. server = null; Thread.yield(); } return server; } protected int chooseRandomInt(int serverCount) { return ThreadLocalRandom.current().nextInt(serverCount); }
2.3 熔断器 Hystrix
2.3.1 微服务雪崩效应
- 现象:微服务调用链路中,下游服务响应时间过长,大量请求阻塞,大量线程无法释放,导致服务器资源耗尽,引起系统整体崩溃
- 解决:
- 服务熔断:下游服务不可用或响应时间过长,快速返回错误信息
- 服务降级:服务熔断后,调用者本地 fallback 返回一个预留的值(也叫兜底数据)
- 服务限流:有些场景不能服务降级,如秒杀
- 限制总并发数,如数据库连接池、线程池
- 限制瞬时并发数,如 nginx 限制瞬时并发连接数
- 限制时间窗口内的平均速率,如 Guava 的 RateLimiter,nginx 的 limite_req 模块
- 限制远程调用接口速率
2.3.2 Hystrix 介绍
- Hystrx 可以隔离访问远程系统、服务或者第三方库,防止级联失败,从而提升系统的可用性与容错性。Hystrix 通过以下几点实现延迟和容错:
- 包裹请求:使用 @HystrixCommand 包裹对依赖的调用逻辑
- 跳闸机制:当某服务的错误率超过阈值时,Hystrix 跳闸,一段时间内停止请求该服务
- 资源隔离:Hystrix 为每个依赖都维护了一个小型线程池(舱壁模式)
- 监控:Hystrix 可以通过 Hystrix Dashboard 和 Hystrix Turbine 近实时监控运行指标和配置的变化
- 回退机制:当请求失败、超时、被拒或者断路器打开时,执行回退逻辑,也就是服务降级
- 自我修复:断路器打开一段时间后,会自动进入“半开”状态,尝试请求服务
- 使用的注解
- @EnableCircuitBreaker,开启熔断器,可以使用 @EnableHystrix
- @HystrixCommand,贴在controller的方法上可以包裹请求,使用 Hystrix 的熔断与降级功能
2.3.3 @HystrixCommand 使用
@HystrixCommand(
/******************服务降级,自定义本地方法******************/
fallbackMethod = "myFallBack",
/******************线程池隔离策略,舱壁模式******************/
// 线程池标识,每个方法都要保持唯一,不唯一的话就共用了
threadPoolKey = "xxx",
// 线程池细节属性配置
threadPoolProperties = {
@HystrixProperty(name="coreSize",value = "2"), // 线程数
@HystrixProperty(name="maxQueueSize",value="20") // 等待队列长度
},
/******************服务熔断配置******************/
commandProperties = {
// 超时熔断
@HystrixProperty(
name="execution.isolation.thread.timeoutInMilliseconds",value="1000"),
// hystrix 高级配置,定制工作流程
// 统计时间窗口定义
@HystrixProperty(
name = "metrics.rollingStats.timeInMilliseconds",value = "10000"),
// 统计时间窗口内的最小请求数
@HystrixProperty(name = "circuitBreaker.requestVolumeThreshold",value = "20"),
// 统计时间窗口内的错误数量百分比阈值
@HystrixProperty(name = "circuitBreaker.errorThresholdPercentage",value = "50"),
// 自我修复时的活动窗口长度
@HystrixProperty(
name = "circuitBreaker.sleepWindowInMilliseconds",value = "5000")
}
)
2.3.4 Hystrix 工作流程
- 当调用出现问题时,开启一个时间窗口(默认10s)
- 在时间窗口内,统计调用次数是否达到最小请求数
- 如果没有达到,重置请求数,回到第一步
- 如果达到了,走第三步
- 在时间窗口内,统计失败的调用次数占所有请求数的百分比是否达到阈值
- 如果没有达到,重置百分比,回到第一步
- 如果达到了,走第四部
- 跳闸,hystrix 状态变为 CIRCUIT_OPEN,停止调用远程服务,并且开启一个活动窗口(默认 5s),每隔 5s,Hystrix 会让一个请求尝试调用问题服务
- 如果调用失败,回调第三步
- 如果调用成功,重置断路器状态为 UP,回到第一步
2.3.5 Hystrix 监控
2.3.5.1 Hystrix Dashboard 断路监控仪表盘
- 新建监控工程,引入 netflix-hystrix-dashboard 依赖,在启动类上面贴上 @EnableHystrixDashboard
- 在被监控的工程中注册监控 servlet,提供监控数据给仪表盘
- 上述两个工程启动后,访问监控工程 http://dashboard-ip:port/hystrix,进入仪表盘
- 输入被监控的微服务端点地址 http://service-ip:port ,展示详细的监控数据
2.3.5.2 Hystrix Turbine 聚合监控
- 问题:一个微服务有多个实例,通过 ip:port 端口时每次只能查看一个实例
- 解决:使用 Hystrix Turbine 聚合监控
- 步骤
- 新建聚合监控工程,引入 netflix-turbine 和 eureka-client,在启动类上面贴上 @EnableTurbine
- 添加 turbine 的配置,turbine.appconfig = 被监控的服务名称A,被监控的服务名称B
- 在 Hystrix Dashboard 的基础下,访问聚合监控工程 http://turbine-ip:port/turbine.stream 可以得到一个微服务下多个实例的监控数据
- 回到仪表盘,输入 http://turbine-ip:port/turbine.stream,可以展详细的聚合监控数据
2.3.6 Hystrix 源码分析
-
入口:@EnableCircuitBreaker 激活熔断器,导入了一个 Selector
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @Import(EnableCircuitBreakerImportSelector.class) public @interface EnableCircuitBreaker { }
-
查看
EnableCircuitBreakerImportSelector
,关注父类方法,简单来说就是加载一个配置类public class EnableCircuitBreakerImportSelector extends SpringFactoryImportSelector<EnableCircuitBreaker> { @Override protected boolean isEnabled() { // 默认启用熔断器 return getEnvironment().getProperty( "spring.cloud.circuit.breaker.enabled", Boolean.class, Boolean.TRUE); } } // 父类 public abstract class SpringFactoryImportSelector<T> implements DeferredImportSelector, BeanClassLoaderAware, EnvironmentAware { private ClassLoader beanClassLoader; private Class<T> annotationClass; private Environment environment; private final Log log = LogFactory.getLog(SpringFactoryImportSelector.class); @SuppressWarnings("unchecked") protected SpringFactoryImportSelector() { // 获取泛型的全限定名 // org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker this.annotationClass = (Class<T>) GenericTypeResolver .resolveTypeArgument(this.getClass(), SpringFactoryImportSelector.class); } @Override public String[] selectImports(AnnotationMetadata metadata) { if (!isEnabled()) { return new String[0]; } AnnotationAttributes attributes = AnnotationAttributes.fromMap( metadata.getAnnotationAttributes(this.annotationClass.getName(), true)); Assert.notNull(attributes, "No " + getSimpleName() + " attributes found. Is " + metadata.getClassName() + " annotated with @" + getSimpleName() + "?"); // Find all possible auto configuration classes, filtering duplicates // 读取 spring.factories 文件,获取key为 annotationClass 的配置类,然后注入 List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader .loadFactoryNames(this.annotationClass, this.beanClassLoader))); ... } return factories.toArray(new String[factories.size()]); } ... }
-
查看 netflix-hystrix 的 jar 包,找到 spring.factories,可知要加载的配置类是
HystrixCircuitBreakerConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.cloud.netflix.hystrix.HystrixAutoConfiguration,\ org.springframework.cloud.netflix.hystrix.security.HystrixSecurityAutoConfiguration org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\ org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration
-
分析
HystrixCircuitBreakerConfiguration
配置类,注入了一个切面 bean,实现 Hystrix 的机制就是切面(代理模式)@Configuration public class HystrixCircuitBreakerConfiguration { // 注册切面 @Bean public HystrixCommandAspect hystrixCommandAspect() { return new HystrixCommandAspect(); }
-
分析
HystrixCommandAspect
,切点是注解 @HystrixCommand,做了个环绕通知增强@Aspect public class HystrixCommandAspect { private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP; static { META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder() .put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory()) .put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory()) .build(); } @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)") public void hystrixCommandAnnotationPointcut() { } @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)") public void hystrixCollapserAnnotationPointcut() { } @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()") public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable { // 获取原始目标方法 Method method = getMethodFromTarget(joinPoint); Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint); if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) { throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " + "annotations at the same time"); } MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method)); // 封装元数据 MetaHolder metaHolder = metaHolderFactory.create(joinPoint); // create 方法创建了个 GenericCommand,有两个重要的方法,run 方法封装了对原始目标方法的调用,getFallback 方法封装了对回退方法的调用 HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder); ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ? metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType(); Object result; try { if (!metaHolder.isObservable()) { // 非 Observable result = CommandExecutor.execute(invokable, executionType, metaHolder); } else { result = executeObservable(invokable, executionType, metaHolder); } } catch (HystrixBadRequestException e) { throw e.getCause(); } catch (HystrixRuntimeException e) { throw hystrixRuntimeExceptionToThrowable(metaHolder, e); } return result; } }
-
GenericCommand
有个父类AbstractCommand
,其构造器做了大量 init 操作,其中包括了线程池的初始化
/* package */
static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey,
HystrixThreadPoolProperties.Setter propertiesBuilder) {
// get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
String key = threadPoolKey.name();
// this should find it for all but the first time
// 从本地缓存里获取线程池
HystrixThreadPool previouslyCached = threadPools.get(key);
if (previouslyCached != null) {
return previouslyCached;
}
// if we get here this is the first time so we need to initialize
synchronized (HystrixThreadPool.class) {
if (!threadPools.containsKey(key)) {
// 没有的话再创建,底层是 new ThreadPoolExecutor()
threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
}
}
return threadPools.get(key);
}
2.4 远程调用 Feign
2.4.1 远程调用方式
- 使用 RestTemplate 的问题
- 在代码里拼接 URL 麻烦且容易出错
- restTemplate.getForObject 代码很固定,模版化代码
- 使用 Feign 的好处
- 使用简单,类似 Dubbo 的本地方法调用风格,面向接口编程
2.4.2 Feign 介绍
-
Feign 是一个轻量级的 RESTFUL 的 HTTP 客户端,项目引入 openfeign 的 jar 后,相当于拥有 RestTemplate + Ribbon + Hystrix 三者的功能
-
Feign 对负载均衡的支持
#针对的被调用方微服务名称,不加就是全局生效 service-name: ribbon: #请求连接超时时间 ConnectTimeout: 2000 #请求处理超时时间(Feign超时时长设置) ReadTimeout: 3000 #对所有操作都进行重试 OkToRetryOnAllOperations: true ####根据如上配置,当访问到故障请求的时候,它会再尝试访问一次当前实例(次数由MaxAutoRetries配置), ####如果不行,就换一个实例进行访问,如果还不行,再换一次实例访问(更换次数由MaxAutoRetriesNextServer配置), ####如果依然不行,返回失败信息。 MaxAutoRetries: 0 #对当前选中实例重试次数,不包括第一次调用 MaxAutoRetriesNextServer: 0 #切换实例的重试次数 NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RoundRobinRule #负载策略调整
-
Feign 对熔断器的支持
feign: hystrix: # 启用 Feign 的熔断功能 enabled: true hystrix: command: default: execution: isolation: thread: ####Hystrix的超时时长设置 # tips: 如果Feign、Hystrix都设置了超时时长,按最小值控制 timeoutInMilliseconds: 15000
-
Feign 对日志的支持
logging: level: # Feign日志只会对日志级别为debug的做出响应 com.lagou.edu.controller.service.ResumeServiceFeignClient: debug
-
Feign 对请求压缩和响应压缩的支持
feign: compression: request: # 开启请求压缩 enabled: true # 设置压缩的数据类型,此处也是默认值 mime-types: text/html,application/xml,application/json # 设置触发压缩的⼤⼩下限,此处也是默认值 min-request-size: 2048 response: # 开启响应压缩 enabled: true
2.4.3 Feign 源码分析
-
入口:@EnableFeignClients,导入了一个 Registrar
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @Documented @Import(FeignClientsRegistrar.class) public @interface EnableFeignClients { ... }
-
查看
FeignClientsRegistrar
,实现了 ImportBeanDefinitionRegistrar 接口,重写 registerBeanDefinitions 方法,完成 Bean 的注入class FeignClientsRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware { @Override public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) { // 将全局默认配置注入到容器(@EnableFeignClients 的属性 defaultConfiguration) registerDefaultConfiguration(metadata, registry); // 将 Feign 客户端注入到容器 registerFeignClients(metadata, registry); } }
-
分析 registerFeignClients 方法,其作用是扫描 @EnableFeignClients 包下及子包下所有贴了 @FeignClient 的接口,然后再注册 Feign 客户端
public void registerFeignClients(AnnotationMetadata metadata, BeanDefinitionRegistry registry) { // 定义扫描器,扫描 @FeignClient ClassPathScanningCandidateComponentProvider scanner = getScanner(); scanner.setResourceLoader(this.resourceLoader); Set<String> basePackages; Map<String, Object> attrs = metadata .getAnnotationAttributes(EnableFeignClients.class.getName()); AnnotationTypeFilter annotationTypeFilter = new AnnotationTypeFilter( FeignClient.class); final Class<?>[] clients = attrs == null ? null : (Class<?>[]) attrs.get("clients"); if (clients == null || clients.length == 0) { scanner.addIncludeFilter(annotationTypeFilter); // 获取 @EnableFeignClients 的包名 basePackages = getBasePackages(metadata); } else { final Set<String> clientClasses = new HashSet<>(); basePackages = new HashSet<>(); for (Class<?> clazz : clients) { basePackages.add(ClassUtils.getPackageName(clazz)); clientClasses.add(clazz.getCanonicalName()); } AbstractClassTestingTypeFilter filter = new AbstractClassTestingTypeFilter() { @Override protected boolean match(ClassMetadata metadata) { String cleaned = metadata.getClassName().replaceAll("\\$", "."); return clientClasses.contains(cleaned); } }; scanner.addIncludeFilter( new AllTypeFilter(Arrays.asList(filter, annotationTypeFilter))); } // 递归扫描 for (String basePackage : basePackages) { Set<BeanDefinition> candidateComponents = scanner .findCandidateComponents(basePackage); for (BeanDefinition candidateComponent : candidateComponents) { if (candidateComponent instanceof AnnotatedBeanDefinition) { // verify annotated class is an interface AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent; AnnotationMetadata annotationMetadata = beanDefinition.getMetadata(); Assert.isTrue(annotationMetadata.isInterface(), "@FeignClient can only be specified on an interface"); Map<String, Object> attributes = annotationMetadata .getAnnotationAttributes( FeignClient.class.getCanonicalName()); String name = getClientName(attributes); registerClientConfiguration(registry, name, attributes.get("configuration")); // 注册 Feign 客户端 registerFeignClient(registry, annotationMetadata, attributes); } } } }
-
分析 registerFeignClient 方法,给每一个客户端生成代理对象,这是实现 Feign 的机制
private void registerFeignClient(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata, Map<String, Object> attributes) { String className = annotationMetadata.getClassName(); // 关注 FeignClientFactoryBean,里面生成了代理对象 BeanDefinitionBuilder definition = BeanDefinitionBuilder .genericBeanDefinition(FeignClientFactoryBean.class); ... }
-
查看 org.springframework.cloud.openfeign.FeignClientFactoryBean#getObject
@Override public Object getObject() throws Exception { return getTarget(); } /** * @param <T> the target type of the Feign client * @return a {@link Feign} client created with the specified data and the context information */ <T> T getTarget() { FeignContext context = applicationContext.getBean(FeignContext.class); Feign.Builder builder = feign(context); // 如果 @FeignClient 未指定URL,生成的 Feign 客户端应该是具备负载均衡的 // 如果 @FeignClient 指定了URL,适合开发测试时使用 if (!StringUtils.hasText(this.url)) { if (!this.name.startsWith("http")) { url = "http://" + this.name; } else { url = this.name; } url += cleanPath(); // 走负载均衡 return (T) loadBalance(builder, context, new HardCodedTarget<>(this.type, this.name, url)); } ... } // org.springframework.cloud.openfeign.FeignClientFactoryBean#loadBalance protected <T> T loadBalance(Feign.Builder builder, FeignContext context, HardCodedTarget<T> target) { // LoadBalancerFeignClient Client client = getOptional(context, Client.class); if (client != null) { // 使用 builder 构造器包装 client builder.client(client); Targeter targeter = get(context, Targeter.class); // 执行 return targeter.target(this, builder, context, target); } throw new IllegalStateException( "No Feign Client for loadBalancing defined. Did you forget to include spring-cloud-starter-netflix-ribbon?"); } // targeter.target =》feign.target =》build().newInstance // feign.ReflectiveFeign#newInstance,动态代理 @Override public <T> T newInstance(Target<T> target) { // 方法对应的增强 Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target); Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>(); List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>(); for (Method method : target.type().getMethods()) { if (method.getDeclaringClass() == Object.class) { continue; } else if (Util.isDefault(method)) { DefaultMethodHandler handler = new DefaultMethodHandler(method); defaultMethodHandlers.add(handler); methodToHandler.put(method, handler); } else { methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method))); } } InvocationHandler handler = factory.create(target, methodToHandler); // 动态代理 T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(), new Class<?>[] {target.type()}, handler); for (DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) { defaultMethodHandler.bindTo(proxy); } return proxy; }
-
分析方法对应的增加,feign.SynchronousMethodHandler#invoke
@Override public Object invoke(Object[] argv) throws Throwable { RequestTemplate template = buildTemplateFromArgs.create(argv); Retryer retryer = this.retryer.clone(); while (true) { try { // 执行 return executeAndDecode(template); } catch (RetryableException e) { ... continue; } } } // feign.SynchronousMethodHandler#executeAndDecode Object executeAndDecode(RequestTemplate template) throws Throwable { Request request = targetRequest(template); if (logLevel != Logger.Level.NONE) { logger.logRequest(metadata.configKey(), logLevel, request); } Response response; long start = System.nanoTime(); try { // 执行请求,内部就是 Ribbon 选择 server 的过程 response = client.execute(request, options); } catch (IOException e) { if (logLevel != Logger.Level.NONE) { logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start)); } throw errorExecuting(request, e); } ... }
2.5 网关 Gateway
-
网关在架构中的位置,接收外部请求并路由到内部服务
-
Gateway 只是网关中的一种实现,Gateway 基于 Reactor 模型实现,支持异步非阻塞,有三个核心的概念
- 路由 route:由 ID + URI + predicates + filters 组成,从配置文件看也能体现这一点
- 断言 predicate:判断条件
- 过滤器 filter
-
Gateway 工作过程,核心逻辑是路由转发 + 执行过滤器链
-
Gateway 内置了 11 种路由规则,常用的规则有请求路径正则匹配和远程地址匹配
-
Gateway 过滤器
- 根据影响时机点划分
- pre
- post
- 根据过滤器类型划分
- Gateway Filter 单路由过滤器
- Global Filter 全局过滤器
- 根据影响时机点划分
-
Gateway 需要保证高可用,通过部署多个 Gateway 和用 Nginx 做负载均衡来实现
# 配置多个GateWay实例 upstream gateway { server 127.0.0.1:9002; server 127.0.0.1:9003; } location / { proxy_pass http://gateway; }
-
Gateway 配置示例
spring: cloud: gateway: # 可以配置多个路由 routes: # 路由 id,需要保持唯一 - id: path_route # 目标服务地址,如果是动态路由,则配置 lb://服务名,从注册中心取具体的服务地址 uri: https://example.org # 断言 predicates: - Path=/red/{segment},/blue/{segment} # 过滤器 filters: # 移除 url 的第一段字符串 - StripPrefix=1
2.6 分布式配置中心 Spring Cloud Config
-
Config 结构示意图
- 服务端:访问外部 Git 获取配置文件,并以接口的形式将配置文件的内容提供给客户端
- 客户端:通过接口获取配置文件的内容,然后初始化自己的应用
-
构建步骤
-
在 git 上面创建新仓库 hello-config-repo(与应用名保持一致就好)
-
上传 yml 配置文件,命名规范为 {application}-{profile}.yml 或 {application}-{profile}.properties
-
构建 config server,使用注解 @EnableConfigServer 开启配置中心服务器功能,还需要增加以下核心配置
spring: cloud: config: server: # 访问 git 需要的连接参数 git: uri: https://github.com/xx/hello-config-repo.git username: 用户名 password: 密码 search-paths: - 仓库名 # 读取分支 label: master
-
启动服务配置中心,可以通过
http://localhost:port/master/{application}-{profile}.yml
查看配置文件内容 -
构建 config client
spring: cloud: # config客户端配置,和ConfigServer通信,并告知ConfigServer希望获取的配 置信息在哪个文件中 config: name: application #配置文件名称 profile: dev #后缀名称 label: master #分支名称 uri: http://localhost:port #ConfigServer配置中心地址
-
手动刷新 client 的配置,可以避免服务重启
-
引入 actuator,并暴露通信端点
management: endpoints: web: exposure: include: refresh
- 在配置类贴上 @RefreshScope
- 手动向Client客户端发起POST请求,http://localhost:port/actuator/refresh
- 缺点:其他客户端也需要访问一次才会更新配置
-
-
自动刷新 client 的配置,使用消息总线 Spring Cloud Bus 发布配置信息变更的广播消息
-
示意图
-
构建步骤
-
Config Server 引入 bus-amqp 依赖,增加消息总线支持
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest # 暴露通信端点 management: endpoints: web: exposure: include: bus-refresh
-
向配置中心服务端发送post请求 http://localhost:port/actuator/bus-refresh,各个客户端配置即可自动刷新
-
如果希望定向某个实例,可以访问 http://localhost:9006/actuator/bus-refresh/服务名:端口
-
-
-
2.7 消息驱动 Spring Cloud Stream
-
Stream的本质
屏蔽底层不同 MQ 消息中间件之间的差异,统一 MQ 的编程模型,降低学习、开发和维护的成本。
-
编程模型
Binder 绑定器用于绑定具体的 MQ 实现,目前支持 Rabbit MQ 和 Kafka
-
编程使用的注解
- @Input,标识输入通道,消费者通过输入通道获取消息
- @Output,标识输出通道,生产者将消息放进输出通道,传递给下游
- @StreamListener,监听队列,消费者监听消息的到来
- @EnableBinding,把 channel 和 Exchange(对 Rabbit MQ 而言)绑定在一起
-
生产者的配置
spring: cloud: stream: binders: # 绑定MQ服务信息(此处我们是RabbitMQ) binderName: # 给Binder定义的名称,用于后面的关联 type: rabbit # MQ类型,如果是Kafka的话,此处配置kafka environment: # MQ环境配置(用户名、密码等) spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 关联整合通道和binder对象 output: # output是我们定义的通道名称,此处不能乱改 destination: exchangeName # 要使用的Exchange名称(消息队列主题名称) content-type: text/plain # application/json # 消息类型设置,比如json binder: binderName # 关联MQ服务
-
消费者的配置
spring: cloud: stream: binders: # 绑定MQ服务信息(此处我们是RabbitMQ) binderName: # 给Binder定义的名称,用于后面的关联 type: rabbit # MQ类型,如果是Kafka的话,此处配置kafka environment: # MQ环境配置(用户名、密码等) spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 关联整合通道和binder对象 input: # output是我们定义的通道名称,此处不能乱改 destination: exchangeName # 要使用的Exchange名称(消息队列主题名称) content-type: text/plain # application/json # 消息类型设置,比如json binder: binderName # 关联MQ服务
-
自定义消息通道,支持多个输入通道、输出通道,步骤如下
-
定义接口
interface CustomChannel { String INPUT_LOG = "inputLog"; String OUTPUT_LOG = "outputLog"; @Input(INPUT_LOG) SubscribableChannel inputLog(); @Output(OUTPUT_LOG) MessageChannel outputLog(); }
-
使用
-
在使用 @EnableBinding 注解时,传入上述定义的接口
-
在配置文件中绑定
bindings: inputLog: destination: exchangeName1 outputLog: destination: exchangeName2
-
-
-
消息分组,避免重复消费
spring: cloud: stream: bindings: input: group: groupName # 分组名称,同一个组的消费者,只有一个消费者能处理消息
3.常见问题
3.1 Eureka 服务发现慢
-
原因
-
服务端存在缓存
一级缓存默认 30 s 从二级缓存同步一次,而二级缓存的过期时间是 180 s,客户端获取服务实例时,先从一级缓存获取,如果没有再从二级缓存获取,如果还是没有,触发缓存的加载,从存储层拉取数据到缓存中,再返回给客户端
-
客户端存在缓存
Eureka Client 从 Eureka Server 拉取服务实例列表,存在 30 的缓存;
Ribbon 从 Eureka Client 获取服务实例,也存在 30 s 的缓存
-
-
处理
- 对服务端而言,可以:
- 缩短只读缓存更新时间 eureka.server.response-cache-update-interval-ms
- 关闭只读缓存 eureka.server.use-read-only- response-cache
- 缩短心跳间隔时间 eureka.server.eviction-interval-timer-in-ms
- 对客户端而言,可以:
- 缩短拉取服务信息的时间 eureka.client.registryFetchIntervalSeconds
- 缩短缓存服务信息的时间 ribbon.serverListRefreshInterval,可以设置为 3 s
- 对服务端而言,可以:
3.2 Spring Cloud 各组件超时
-
Ribbon 超时
- ribbon.ReadTimeout
- ribbon.ConnectTimeout
-
Hystrix 超时
-
hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds
需要注意的是,Hystrix 的超时时间设置要大于 Ribbon 的,而且如果 Ribbon 开启了重试机制,Hystrix 也要将这部时间计算在内,否则会出现 Ribbon 还在重试中而 Hystrix 发生超时
-
-
Feign 超时
- 如果 Feign 和 Ribbon 都设置了超时,优先以 Ribbon 的为准,因此超时设置推荐只配置 Ribbon 和 Hystrix