注册中心 Eureka 源码解析 —— Eureka-Server 集群同步

>>强大,10k+点赞的 SpringBoot 后台管理系统竟然出了详细教程!

源码精品专栏

 

摘要: 原创出处 http://www.iocoder.cn/Eureka/server-cluster/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Eureka 1.8.X 版本

  • 1. 概述
  • 2. 集群节点初始化与更新
  • 2.1 集群节点启动
  • 2.2 更新集群节点信息
  • 2.3 集群节点
  • 3. 获取初始注册信息
  • 4. 同步注册信息
  • 4.1 同步操作类型
  • 4.2 发起 Eureka-Server 同步操作
  • 4.3 接收 Eureka-Server 同步操作
  • 4.4 处理 Eureka-Server 同步结果

1. 概述

本文主要分享 Eureka-Server 集群同步注册信息

Eureka-Server 集群如下图:

注册中心 Eureka 源码解析 —— Eureka-Server 集群同步
  • Eureka-Server 集群不区分主从节点或者 Primary & Secondary 节点,所有节点相同角色( 也就是没有角色 ),完全对等
  • Eureka-Client 可以向任意 Eureka-Client 发起任意读写操作,Eureka-Server 将操作复制到另外的 Eureka-Server 以达到最终一致性。注意,Eureka-Server 是选择了 AP 的组件。

Eureka-Server 可以使用直接配置所有节点的服务地址,或者基于 DNS 配置。推荐阅读:《Spring Cloud构建微服务架构(六)高可用服务注册中心》 。

本文主要类在 com.netflix.eureka.cluster 包下。

OK,让我们开始愉快的遨游在代码的海洋。

推荐 Spring Cloud 书籍

  • 请支持正版。下载盗版,等于主动编写低级 BUG 。
  • 程序猿DD —— 《Spring Cloud微服务实战》
  • 周立 —— 《Spring Cloud与Docker微服务架构实战》
  • 两书齐买,京东包邮。

推荐 Spring Cloud 视频

  • Java 微服务实践 - Spring Boot
  • Java 微服务实践 - Spring Cloud
  • Java 微服务实践 - Spring Boot / Spring Cloud

ps :注意,本文提到的同步,准确来说是复制( Replication )

2. 集群节点初始化与更新

com.netflix.eureka.cluster.PeerEurekaNodes ,Eureka-Server 集群节点集合 。构造方法如下 :

public class PeerEurekaNodes {

    private static final Logger logger = LoggerFactory.getLogger(PeerEurekaNodes.class);

    /**
     * 应用实例注册表
     */

    protected final PeerAwareInstanceRegistry registry;
    /**
     * Eureka-Server 配置
     */

    protected final EurekaServerConfig serverConfig;
    /**
     * Eureka-Client 配置
     */

    protected final EurekaClientConfig clientConfig;
    /**
     * Eureka-Server 编解码
     */

    protected final ServerCodecs serverCodecs;
    /**
     * 应用实例信息管理器
     */

    private final ApplicationInfoManager applicationInfoManager;

    /**
     * Eureka-Server 集群节点数组
     */

    private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList();
    /**
     * Eureka-Server 服务地址数组
     */

    private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();

    /**
     * 定时任务服务
     */

    private ScheduledExecutorService taskExecutor;

    @Inject
    public PeerEurekaNodes(
            PeerAwareInstanceRegistry registry,
            EurekaServerConfig serverConfig,
            EurekaClientConfig clientConfig,
            ServerCodecs serverCodecs,
            ApplicationInfoManager applicationInfoManager)
 
{
        this.registry = registry;
        this.serverConfig = serverConfig;
        this.clientConfig = clientConfig;
        this.serverCodecs = serverCodecs;
        this.applicationInfoManager = applicationInfoManager;
    }
}
  • peerEurekaNodespeerEurekaNodeUrlstaskExecutor 属性,在构造方法中未设置和初始化,而是在 PeerEurekaNodes#start() 方法,设置和初始化,下文我们会解析这个方法。
  • Eureka-Server 在初始化时,调用 EurekaBootStrap#getPeerEurekaNodes(…) 方法,创建 PeerEurekaNodes ,点击 链接 查看该方法的实现。

2.1 集群节点启动

调用 PeerEurekaNodes#start() 方法,集群节点启动,主要完成两个逻辑:

  • 初始化集群节点信息
  • 初始化固定周期( 默认:10 分钟,可配置 )更新集群节点信息的任务

代码如下:

  1public void start() {
  2:     // 创建 定时任务服务
  3:     taskExecutor = Executors.newSingleThreadScheduledExecutor(
  4:             new ThreadFactory() {
  5:                 @Override
  6:                 public Thread newThread(Runnable r) {
  7:                     Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
  8:                     thread.setDaemon(true);
  9:                     return thread;
 10:                 }
 11:             }
 12:     );
 13:     try {
 14:         // 初始化 集群节点信息
 15:         updatePeerEurekaNodes(resolvePeerUrls());
 16:         // 初始化 初始化固定周期更新集群节点信息的任务
 17:         Runnable peersUpdateTask = new Runnable() {
 18:             @Override
 19:             public void run() {
 20:                 try {
 21:                     updatePeerEurekaNodes(resolvePeerUrls());
 22:                 } catch (Throwable e) {
 23:                     logger.error("Cannot update the replica Nodes", e);
 24:                 }
 25
 26:             }
 27:         };
 28:         taskExecutor.scheduleWithFixedDelay(
 29:                 peersUpdateTask,
 30:                 serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
 31:                 serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
 32:                 TimeUnit.MILLISECONDS
 33:         );
 34:     } catch (Exception e) {
 35:         throw new IllegalStateException(e);
 36:     }
 37:     // 打印 集群节点信息
 38:     for (PeerEurekaNode node : peerEurekaNodes) {
 39:         logger.info("Replica node URL:  " + node.getServiceUrl());
 40:     }
 41: }
  • 第 15 行 && 第 21 行 :调用 #updatePeerEurekaNodes() 方法,更新集群节点信息。

2.2 更新集群节点信息

调用 #resolvePeerUrls() 方法,获得 Eureka-Server 集群服务地址数组,代码如下:

  1protected List<String> resolvePeerUrls() {
  2:     // 获得 Eureka-Server 集群服务地址数组
  3:     InstanceInfo myInfo = applicationInfoManager.getInfo();
  4:     String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
  5:     List<String> replicaUrls = EndpointUtils.getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
  6
  7:     // 移除自己(避免向自己同步)
  8:     int idx = 0;
  9:     while (idx < replicaUrls.size()) {
 10:         if (isThisMyUrl(replicaUrls.get(idx))) {
 11:             replicaUrls.remove(idx);
 12:         } else {
 13:             idx++;
 14:         }
 15:     }
 16:     return replicaUrls;
 17: }
  • 第 2 至 5 行 :获得 Eureka-Server 集群服务地址数组。EndpointUtils#getDiscoveryServiceUrls(…) 方法,逻辑与 《Eureka 源码解析 —— EndPoint 与 解析器》「3.4 ConfigClusterResolver」 基本类似。EndpointUtils 正在逐步,猜测未来这里会替换。
  • 第 7 至 15 行 :移除自身节点,避免向自己同步。

调用 #updatePeerEurekaNodes() 方法,更新集群节点信息,主要完成两部分逻辑:

  • 添加新增的集群节点
  • 关闭删除的集群节点

代码如下:

  1protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
  2:     if (newPeerUrls.isEmpty()) {
  3:         logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
  4:         return;
  5:     }
  6
  7:     // 计算 新增的集群节点地址
  8:     Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
  9:     toShutdown.removeAll(newPeerUrls);
 10
 11:     // 计算 删除的集群节点地址
 12:     Set<String> toAdd = new HashSet<>(newPeerUrls);
 13:     toAdd.removeAll(peerEurekaNodeUrls);
 14
 15:     if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
 16:         return;
 17:     }
 18
 19:     // 关闭删除的集群节点
 20:     // Remove peers no long available
 21:     List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
 22:     if (!toShutdown.isEmpty()) {
 23:         logger.info("Removing no longer available peer nodes {}", toShutdown);
 24:         int i = 0;
 25:         while (i < newNodeList.size()) {
 26:             PeerEurekaNode eurekaNode = newNodeList.get(i);
 27:             if (toShutdown.contains(eurekaNode.getServiceUrl())) {
 28:                 newNodeList.remove(i);
 29:                 eurekaNode.shutDown(); // 关闭
 30:             } else {
 31:                 i++;
 32:             }
 33:         }
 34:     }
 35
 36:     // 添加新增的集群节点
 37:     // Add new peers
 38:     if (!toAdd.isEmpty()) {
 39:         logger.info("Adding new peer nodes {}", toAdd);
 40:         for (String peerUrl : toAdd) {
 41:             newNodeList.add(createPeerEurekaNode(peerUrl));
 42:         }
 43:     }
 44
 45:     // 赋值
 46:     this.peerEurekaNodes = newNodeList;
 47:     this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
 48: }
  • 第 7 至 9 行 :计算新增的集群节点地址。

  • 第 11 至 13 行 :计算删除的集群节点地址。

  • 第 19 至 34 行 :关闭删除的集群节点。

  • 第 36 至 43 行 :添加新增的集群节点。调用 #createPeerEurekaNode(peerUrl) 方法,创建集群节点,代码如下:

      1protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
      2:     HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
      3:     String targetHost = hostFromUrl(peerEurekaNodeUrl);
      4:     if (targetHost == null) {
      5:         targetHost = "host";
      6:     }
      7:     return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
      8: }
    • 第 2 行 :创建 Eureka-Server 集群通信客户端,在 《Eureka 源码解析 —— 网络通信》「4.2 JerseyReplicationClient」 有详细解析。
    • 第 7 行 :创建 PeerEurekaNode ,在 「2.3 PeerEurekaNode」 有详细解析。

2.3 集群节点

com.netflix.eureka.cluster.PeerEurekaNode ,单个集群节点。

点击 链接 查看构造方法

  • 第 129 行 :创建 ReplicationTaskProcessor 。在 「4.1.2 同步操作任务处理器」 详细解析
  • 第 131 至 140 行 :创建批量任务分发器,在 《Eureka 源码解析 —— 任务批处理》 有详细解析。
  • 第 142 至 151 行 :创建单任务分发器,用于 Eureka-Server 向亚马逊 AWS 的 ASG ( Autoscaling Group ) 同步状态。暂时跳过。

3. 获取初始注册信息

Eureka-Server 启动时,调用 PeerAwareInstanceRegistryImpl#syncUp() 方法,从集群的一个 Eureka-Server 节点获取初始注册信息,代码如下:

  1@Override
  2public int syncUp() {
  3:     // Copy entire entry from neighboring DS node
  4:     int count = 0;
  5
  6:     for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
  7:         // 未读取到注册信息,sleep 等待
  8:         if (i > 0) {
  9:             try {
 10:                 Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
 11:             } catch (InterruptedException e) {
 12:                 logger.warn("Interrupted during registry transfer..");
 13:                 break;
 14:             }
 15:         }
 16
 17:         // 获取注册信息
 18:         Applications apps = eurekaClient.getApplications();
 19:         for (Application app : apps.getRegisteredApplications()) {
 20:             for (InstanceInfo instance : app.getInstances()) {
 21:                 try {
 22:                     if (isRegisterable(instance)) { // 判断是否能够注册
 23:                         register(instance, instance.getLeaseInfo().getDurationInSecs(), true); // 注册
 24:                         count++;
 25:                     }
 26:                 } catch (Throwable t) {
 27:                     logger.error("During DS init copy", t);
 28:                 }
 29:             }
 30:         }
 31:     }
 32:     return count;
 33: }
  • 第 7 至 15 行 :未获取到注册信息,sleep 等待再次重试。
  • 第 17 至 30 行 :获取注册信息,若获取到,注册到自身节点。
    • 第 22 行 :判断应用实例是否能够注册到自身节点。主要用于亚马逊 AWS 环境下的判断,若非部署在亚马逊里,都返回 `true` 。点击 链接 查看实现。
    • 第 23 行 :调用 `#register()` 方法,注册应用实例到自身节点。在 《Eureka 源码解析 —— 应用实例注册发现(一)之注册》 有详细解析。

若调用 #syncUp() 方法,未获取到应用实例,则 Eureka-Server 会有一段时间( 默认:5 分钟,可配 )不允许被 Eureka-Client 获取注册信息,避免影响 Eureka-Client 。

  • 标记 Eureka-Server 启动时,未获取到应用实例,代码如下:

    // PeerAwareInstanceRegistryImpl.java

    private boolean peerInstancesTransferEmptyOnStartup = true;

    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // ... 省略其他代码
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }
        // ... 省略其他代码
    }
  • 判断 Eureka-Server 是否允许被 Eureka-Client 获取注册信息,代码如下:

    // PeerAwareInstanceRegistryImpl.java
    public boolean shouldAllowAccess(boolean remoteRegionRequired) {
       if (this.peerInstancesTransferEmptyOnStartup) {
           // 设置启动时间
           this.startupTime = System.currentTimeMillis();
           if (!(System.currentTimeMillis() > this.startupTime + serverConfig.getWaitTimeInMsWhenSyncEmpty())) {
               return false;
           }
       }
       // ... 省略其他代码
       return true;
    }

4. 同步注册信息

Eureka-Server 集群同步注册信息如下图:

注册中心 Eureka 源码解析 —— Eureka-Server 集群同步
  • Eureka-Server 接收到 Eureka-Client 的 Register、Heartbeat、Cancel、StatusUpdate、DeleteStatusOverride 操作,固定间隔( 默认值 :500 毫秒,可配 )向 Eureka-Server 集群内其他节点同步( 准实时,非实时 )。

4.1 同步操作类型

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl.Action ,同步操作类型,代码如下:

public enum Action {
   Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;

   // ... 省略监控相关属性
}
  • Register ,在 《Eureka 源码解析 —— 应用实例注册发现(一)之注册》 有详细解析
  • Heartbeat ,在 《Eureka 源码解析 —— 应用实例注册发现(二)之续租》 有详细解析
  • Cancel ,在 《Eureka 源码解析 —— 应用实例注册发现(三)之下线》 有详细解析
  • StatusUpdate ,在 《Eureka 源码解析 —— 应用实例注册发现(八)之覆盖状态》 有详细解析
  • DeleteStatusOverride ,在 《Eureka 源码解析 —— 应用实例注册发现(八)之覆盖状态》 有详细解析

4.2 发起 Eureka-Server 同步操作

Eureka-Server 在完成 Eureka-Client 发起的上述操作在自身节点的执行后,向集群内其他 Eureka-Server 发起同步操作。以 Register 操作举例子,代码如下:

// PeerAwareInstanceRegistryImpl.java
public void register(final InstanceInfo info, final boolean isReplication) {
   // 租约过期时间
   int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
   if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
       leaseDuration = info.getLeaseInfo().getDurationInSecs();
   }
   // 注册应用实例信息
   super.register(info, leaseDuration, isReplication);
   // Eureka-Server 复制
   replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
  • 最后一行,调用 #replicateToPeers(…) 方法,传递对应的同步操作类型,发起同步操作。

#replicateToPeers(...) 方法,代码如下:

  1private void replicateToPeers(Action action, String appName, String id,
  2:                               InstanceInfo info /* optional */,
  3:                               InstanceStatus newStatus /* optional */boolean isReplication)
 
{
  4:     Stopwatch tracer = action.getTimer().start();
  5:     try {
  6:         if (isReplication) {
  7:             numberOfReplicationsLastMin.increment();
  8:         }
  9
 10:         // Eureka-Server 发起的请求 或者 集群为空
 11:         // If it is a replication already, do not replicate again as this will create a poison replication
 12:         if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
 13:             return;
 14:         }
 15
 16:         for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
 17:             // If the url represents this host, do not replicate to yourself.
 18:             if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
 19:                 continue;
 20:             }
 21:             replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
 22:         }
 23:     } finally {
 24:         tracer.stop();
 25:     }
 26: }
  • 第 10 至 14 行 :Eureka-Server 在处理上述操作( Action ),无论来自 Eureka-Client 发起请求,还是 Eureka-Server 发起同步,调用的内部方法相同,通过 isReplication=true 参数,避免死循环同步。
  • 第 16 至 22 行 :循环集群内每个节点,调用 #replicateInstanceActionsToPeers(…) 方法,发起同步操作。

#replicateInstanceActionsToPeers(...) 方法,代码如下:

 // ... 省略代码,太长了。
  • Cancel :调用 PeerEurekaNode#cancel(…) 方法,点击 链接 查看实现。

  • Heartbeat :调用 PeerEurekaNode#heartbeat(…) 方法,点击 链接 查看实现。

  • Register :调用 PeerEurekaNode#register(…) 方法,点击 链接 查看实现。

  • StatusUpdate :调用 PeerEurekaNode#statusUpdate(…) 方法,点击 链接 查看实现。

  • DeleteStatusOverride :调用 PeerEurekaNode#deleteStatusOverride(…) 方法,点击 链接 查看实现。

  • 上面的每个方法实现,我们会看到类似这么一段代码 :

    • 1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      
      batchingDispatcher.process(
          taskId("${action}", appName, id)// id
          new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {
          @Override
          public EurekaHttpResponse&lt;Void&gt; execute() {
              return replicationClient.doString(...);
          }
       
          @Override
          public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
              // do Something...
          }
       
      }// ReplicationTask 子类
      expiryTime
      )
    • 相同应用实例的相同同步操作使用相同任务编号。在 《Eureka 源码解析 —— 任务批处理》「2. 整体流程」 中,我们看到" 接收线程( Runner )合并任务,将相同任务编号的任务合并,只执行一次。 ",因此,相同应用实例的相同同步操作就能被合并,减少操作量。例如,Eureka-Server 同步某个应用实例的 Heartbeat 操作,接收同步的 Eureak-Server 挂了,一方面这个应用的这次操作会重试,另一方面,这个应用实例会发起新的 Heartbeat 操作,通过任务编号合并,接收同步的 Eureka-Server 恢复后,减少收到重复积压的任务。
    • #task(...) 方法,生成同步操作任务编号。代码如下:

      private static String taskId(String requestType, String appName, String id) {
         return requestType + '#' + appName + '/' + id;
      }
  • InstanceReplicationTask ,同步操作任务,在 「4.1.1 同步操作任务」 详细解析。

  • expiryTime ,任务过期时间。

4.1.1 同步操作任务

注册中心 Eureka 源码解析 —— Eureka-Server 集群同步
  • com.netflix.eureka.cluster.ReplicationTask ,同步任务抽象类
    • 点击 链接 查看 ReplicationTask 代码。
    • 定义了 `#getTaskName()` 抽象方法。
    • 定义了 `#execute()` 抽象方法,执行同步任务。
    • 实现了 `#handleSuccess()` 方法,处理成功执行同步结果。
    • 实现了 `#handleFailure(…)` 方法,处理失败执行同步结果。
  • com.netflix.eureka.cluster.InstanceReplicationTask ,同步应用实例任务抽象类
    • 点击 链接 查看 InstanceReplicationTask 代码。
    • 实现了父类 `#getTaskName()` 抽象方法。
  • com.netflix.eureka.cluster.AsgReplicationTask ,亚马逊 AWS 使用,暂时跳过。

从上面 PeerEurekaNode#同步操作(...) 方法,全部实现了 InstanceReplicationTask 类的 #execute() 方法,部分重写了 #handleFailure(...) 方法。

4.1.2 同步操作任务处理器

com.netflix.eureka.cluster.InstanceReplicationTask ,实现 TaskProcessor 接口,同步操作任务处理器。

  • TaskProcessor ,在 《Eureka 源码解析 —— 任务批处理》「10. 任务执行器【执行任务】」 有详细解析。
  • 点击 链接 查看 InstanceReplicationTask 代码。

ReplicationTaskProcessor#process(task) ,处理单任务,用于 Eureka-Server 向亚马逊 AWS 的 ASG ( Autoscaling Group ) 同步状态,暂时跳过,感兴趣的同学可以点击 链接 查看方法代码。

ReplicationTaskProcessor#process(tasks) ,处理批量任务,用于 Eureka-Server 集群注册信息的同步操作任务,通过调用被同步的 Eureka-Server 的 peerreplication/batch/ 接口,一次性将批量( 多个 )的同步操作任务发起请求,代码如下:

 // ... 省略代码,太长了。
  • 第 4 行 :创建批量提交同步操作任务的请求对象( ReplicationList ) 。比较易懂,咱就不啰嗦贴代码了。
    • ReplicationList ,点击 链接 查看类。
    • ReplicationInstance ,点击 链接 查看类。
    • `#createReplicationListOf(…)` ,点击 链接 查看方法。
    • `#createReplicationInstanceOf(…)` ,点击 链接 查看方法。
  • 第 7 行 :调用 JerseyReplicationClient#submitBatchUpdates(…) 方法,请求 peerreplication/batch/ 接口,一次性将批量( 多个 )的同步操作任务发起请求。
    • `JerseyReplicationClient#submitBatchUpdates(…)` 方法,点击 链接 查看方法。
    • ReplicationListResponse ,点击 链接 查看类。
    • ReplicationInstanceResponse ,点击 链接 查看类。
  • 第 9 至 31 行 :处理批量提交同步操作任务的响应,在 「4.4 处理 Eureka-Server 同步结果」 详细解析。

4.3 接收 Eureka-Server 同步操作

com.netflix.eureka.resources.PeerReplicationResource ,同步操作任务 Resource ( Controller )。

peerreplication/batch/ 接口,映射 PeerReplicationResource#batchReplication(...) 方法,代码如下:

 // ... 省略代码,太长了。
  • 第 7 至 15 行 :逐个处理单个同步操作任务,并将处理结果( ReplicationInstanceResponse ) 添加到 ReplicationListResponse 。
  • 第 23 至 50 行 :处理单个同步操作任务,返回处理结果( ReplicationInstanceResponse )。
    • 第 24 至 25 行 :创建 ApplicationResource , InstanceResource 。我们看到,实际该方法是把单个同步操作任务提交到其他 Resource ( Controller ) 处理,Eureka-Server 收到 Eureka-Client 请求响应的 Resource ( Controller ) 是相同的逻辑
    • Register :点击 链接 查看 `#handleRegister(…)` 方法。
    • Heartbeat :点击 链接 查看 `#handleHeartbeat(…)` 方法。
    • Cancel :点击 链接 查看 `#handleCancel(…)` 方法。
    • StatusUpdate :点击 链接 查看 `#handleStatusUpdate(…)` 方法。
    • DeleteStatusOverride :点击 链接 查看 `#handleDeleteStatusOverride(…)` 方法。

4.4 处理 Eureka-Server 同步结果

😈 想想就有小激动,终于写到这里了。

接 ReplicationTaskProcessor#process(tasks) 方法,处理批量提交同步操作任务的响应,代码如下:

 // ... 省略代码,太长了。
  • 第 10 行 ,调用 #isSuccess(…) 方法,判断请求是否成功,响应状态码是否在 [200, 300) 范围内。

  • 第 11 至 13 行 :状态码 503 ,目前 Eureka-Server 返回 503 的原因是被限流。在 《Eureka 源码解析 —— 基于令牌桶算法的 RateLimiter》 详细解析。该情况为瞬时错误,会重试该同步操作任务,在 《Eureka 源码解析 —— 任务批处理》「3. 任务处理器」 有详细解析。

  • 第 14 至 18 行 :非预期状态码,目前 Eureka-Server 在代码上看下来,不会返回这样的状态码。该情况为永久错误,会重试该同步操作任务,在 《Eureka 源码解析 —— 任务批处理》「3. 任务处理器」 有详细解析。

  • 第 20 行 :请求成功,调用 #handleBatchResponse(…) 方法,逐个处理每个 ReplicationTask 和 ReplicationInstanceResponse 。这里有一点要注意下,请求成功指的是整个请求成功,实际每个 ReplicationInstanceResponse 可能返回的状态码不在 [200, 300) 范围内。该方法下文详细解析。

  • 第 23 至 25 行 :请求发生网络异常,例如网络超时,打印网络异常日志。目前日志的打印为部分采样,条件为网络发生异常每间隔 10 秒打印一条,避免网络发生异常打印超级大量的日志。该情况为永久错误,会重试该同步操作任务,在 《Eureka 源码解析 —— 任务批处理》「3. 任务处理器」 有详细解析。

    • #isNetworkConnectException(…) ,点击 链接 查看方法。
    • #logNetworkErrorSample(…) ,点击 链接 查看方法。
  • 第 26 至 29 行 :非预期异常,目前 Eureka-Server 在代码上看下来,不会抛出这样的异常。该情况为永久错误,会重试该同步操作任务,在 《Eureka 源码解析 —— 任务批处理》「3. 任务处理器」 有详细解析。


#handleBatchResponse(...) 方法,代码如下:

 // ... 省略代码,太长了。
  • ReplicationTask#handleSuccess() 方法,无任务同步操作任务重写,是个空方法,代码如下:

    // ReplicationTask.java
    public void handleSuccess() {
    }
  • ReplicationTask#handleFailure() 方法,有两个同步操作任务重写:

    • x
    • Cancel :当 Eureka-Server 不存在下线的应用实例时,返回 404 状态码,此时打印错误日志,代码如下:

      // PeerEurekaNode#cancel(...)
      @Override
      public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
          super.handleFailure(statusCode, responseEntity);
          if (statusCode == 404) {
              logger.warn("{}: missing entry.", getTaskName());
          }
      }   
    • Heartbeat :情况较为复杂,我们换一行继续说,避免排版有问题,影响阅读。

噔噔噔恰,本文的重要头戏来啦!Last But Very Importment !!!

Eureka-Server 是允许同一时刻允许在任意节点被 Eureka-Client 发起写入相关的操作,网络是不可靠的资源,Eureka-Client 可能向一个 Eureka-Server 注册成功,但是网络波动,导致 Eureka-Client 误以为失败,此时恰好 Eureka-Client 变更了应用实例的状态,重试向另一个 Eureka-Server 注册,那么两个 Eureka-Server 对该应用实例的状态产生冲突。

再例如…… 我们不要继续举例子,网络波动真的很复杂。我们来看看 Eureka 是怎么处理的。

应用实例( InstanceInfo ) 的 lastDirtyTimestamp 属性,使用时间戳,表示应用实例的版本号,当请求方( 不仅仅是 Eureka-Client ,也可能是同步注册操作的 Eureka-Server ) 向 Eureka-Server 发起注册时,若 Eureka-Server 已存在拥有更大 lastDirtyTimestamp 该实例( 相同应用并且相同应用实例编号被认为是相同实例 ),则请求方注册的应用实例( InstanceInfo ) 无法覆盖注册此 Eureka-Server 的该实例( 见 AbstractInstanceRegistry#register(...) 方法 )。例如我们上面举的例子,第一个 Eureka-Server 向 第二个 Eureka-Server 同步注册应用实例时,不会注册覆盖,反倒是第二个 Eureka-Server 同步注册应用到第一个 Eureka-Server ,注册覆盖成功,因为 lastDirtyTimestamp ( 应用实例状态变更时,可以设置 lastDirtyTimestamp 为当前时间,见 ApplicationInfoManager#setInstanceStatus(status) 方法 )。

但是光靠注册请求判断 lastDirtyTimestamp 显然是不够的,因为网络异常情况下时,同步操作任务多次执行失败到达过期时间后,此时在 Eureka-Server 集群同步起到最终一致性最最最关键性出现了:Heartbeat 。因为 Heartbeat 会周期性的执行,通过它一方面可以判断 Eureka-Server 是否存在心跳对应的应用实例,另外一方面可以比较应用实例的 lastDirtyTimestamp 。当满足下面任意条件,Eureka-Server 返回 404 状态码:

  • 1)Eureka-Server 应用实例不存在,点击 链接 查看触发条件代码位置。
  • 2)Eureka-Server 应用实例状态为 UNKNOWN,点击 链接 查看触发条件代码位置。为什么会是 UNKNOWN,在 《Eureka 源码解析 —— 应用实例注册发现(八)之覆盖状态》「 4.3 续租场景」 有详细解析。
  • 3)请求的 lastDirtyTimestamp 更大,点击 链接 查看触发条件代码位置。

请求方接收到 404 状态码返回后,认为 Eureka-Server 应用实例实际是不存在的,重新发起应用实例的注册。以本文的 Heartbeat 为例子,代码如下:

// PeerEurekaNode#heartbeat(...)
  1@Override
  2public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
          // ... 省略代码,太长了。
 17: }
  • 第 4 至 10 行 :接收到 404 状态码,调用 #register(...) 方法,向该被心跳同步操作失败的 Eureka-Server 发起注册本地的应用实例的请求。

    • 上述 3) ,会使用请求参数 overriddenStatus 存储到 Eureka-Server 的应用实例覆盖状态集合( AbstractInstanceRegistry.overriddenInstanceStatusMap ),点击 链接 查看触发条件代码位置。
  • 第 11 至 16 行 :恰好是 3) 反过来的情况,本地的应用实例的 lastDirtyTimestamp 小于 Eureka-Server 该应用实例的,此时 Eureka-Server 返回 409 状态码,点击 链接 查看触发条件代码位置。调用 #syncInstancesIfTimestampDiffers() 方法,覆盖注册本地应用实例,点击 链接 查看方法。

OK,撒花!记住:Eureka 通过 Heartbeat 实现 Eureka-Server 集群同步的最终一致性。

666. 彩蛋

写的比较嗨皮,所以就送胖友一只胖友

注册中心 Eureka 源码解析 —— Eureka-Server 集群同步

胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?

以下是草稿,可以凑合看

eureka server 集群假定是 s1 s2

1)client 向 s1 注册,有一个 lastDirtyTime ,正常情况下成功, s1 会向 s2 同步 
2)client 向 s1 注册(成功,但是网络波动),然后 client 发生状态的变化,lastDirtyTime 变化,向 s2 注册。 
这个时候,s1 s2 是冲突的,但是他们会互相同步,实际 s2 => s1 的注册会真正成功,s1 => s2 的注册不会返回失败,但是实际 s2 处理的时候,用的是自身的。

心跳只是最终去校验。

理论来说,心跳不应该带 lastDirtyTime 参数。带的原因就是为了做固定周期的比较。

最优解是 注册 就处理掉数据不一致 
次优解是 心跳 处理掉数据不一致

如果在类比,

注册,相当于 insertOrUpdate 
心跳,附加了校验是否要发起【注册】



如果你对 Dubbo 感兴趣,欢迎加入我的知识星球一起交流。

注册中心 Eureka 源码解析 —— Eureka-Server 集群同步

知识星球

目前在知识星球(https://t.zsxq.com/2VbiaEu)更新了如下 Dubbo 源码解析如下:

  • 01. 调试环境搭建
    02. 项目结构一览
    03. 配置 Configuration
    04. 核心流程一览
  • 05. 拓展机制 SPI
  • 06. 线程池
  • 07. 服务暴露 Export
  • 08. 服务引用 Refer
  • 09. 注册中心 Registry
  • 10. 动态编译 Compile
  • 11. 动态代理 Proxy
  • 12. 服务调用 Invoke
  • 13. 调用特性
  • 14. 过滤器 Filter
  • 15. NIO 服务器
  • 16. P2P 服务器
  • 17. HTTP 服务器
  • 18. 序列化 Serialization
  • 19. 集群容错 Cluster
  • 20. 优雅停机
  • 21. 日志适配
  • 22. 状态检查
  • 23. 监控中心 Monitor
  • 24. 管理中心 Admin
  • 25. 运维命令 QOS
  • 26. 链路追踪 Tracing
  • ...
    一共 60 篇++

源码不易↓↓↓

点赞支持老艿艿↓↓