// InstanceInfoReplicator.javapublicboolean onDemandUpdate(){if(rateLimiter.acquire(burstSize, allowedRatePerMinute)){// 限流相关,跳过
scheduler.submit(newRunnable(){
@Overridepublicvoid run(){
logger.debug("Executing on-demand update of local InstanceInfo");// 取消任务Future latestPeriodic = scheduledPeriodicRef.get();if(latestPeriodic != null&&!latestPeriodic.isDone()){
logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
latestPeriodic.cancel(false);}// 再次调用
InstanceInfoReplicator.this.run();}});returntrue;}else{
logger.warn("Ignoring onDemand update due to rate limiter");returnfalse;}}
// InstanceInfoReplicator.java
/**
 * Requests an out-of-band run of this replicator, subject to the rate limiter.
 *
 * <p>If the rate limiter grants the request, a task is handed to the scheduler. That task
 * cancels the most recent periodic update when it has not completed yet, and then calls
 * {@code InstanceInfoReplicator.this.run()}.
 *
 * @return {@code true} when the on-demand task was submitted; {@code false} when the rate
 *         limiter refused the request
 */
public boolean onDemandUpdate() {
    // Rate limiting (details skipped here): bail out early when no permit is available.
    if (!rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
        logger.warn("Ignoring onDemand update due to rate limiter");
        return false;
    }

    Runnable onDemandTask = new Runnable() {
        @Override
        public void run() {
            logger.debug("Executing on-demand update of local InstanceInfo");

            // Cancel the outstanding periodic task, if any.
            Future<?> pendingPeriodic = scheduledPeriodicRef.get();
            boolean stillPending = pendingPeriodic != null && !pendingPeriodic.isDone();
            if (stillPending) {
                logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                pendingPeriodic.cancel(false);
            }

            // Invoke the replicator's own run() (not this Runnable's).
            InstanceInfoReplicator.this.run();
        }
    };
    scheduler.submit(onDemandTask);
    return true;
}
// ApplicationInfoManager.javapublicvoid refreshDataCenterInfoIfRequired(){// hostnameString existingAddress = instanceInfo.getHostName();String newAddress;if(config instanceof RefreshableInstanceConfig){// Refresh data center info, and return up to date address
newAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(true);}else{
newAddress = config.getHostName(true);}// ipString newIp = config.getIpAddress();if(newAddress != null&&!newAddress.equals(existingAddress)){
logger.warn("The address changed from : {} => {}", existingAddress, newAddress);// :( in the legacy code here the builder is acting as a mutator.// This is hard to fix as this same instanceInfo instance is referenced elsewhere.// We will most likely re-write the client at sometime so not fixing for now.
InstanceInfo.Builder builder = new InstanceInfo.Builder(instanceInfo);
builder.setHostName(newAddress)// hostname
.setIPAddr(newIp)// ip
.setDataCenterInfo(config.getDataCenterInfo());// dataCenterInfo
instanceInfo.setIsDirty();}}publicabstractclass AbstractInstanceConfig implements EurekaInstanceConfig {privatestaticfinal Pair<String, String> hostInfo = getHostInfo();
@OverridepublicString getHostName(boolean refresh){return hostInfo.second();}
@OverridepublicString getIpAddress(){return hostInfo.first();}privatestatic Pair<String, String> getHostInfo(){
Pair<String, String> pair;try{InetAddress localHost = InetAddress.getLocalHost();
pair = new Pair<String, String>(localHost.getHostAddress(), localHost.getHostName());}catch(UnknownHostException e){
logger.error("Cannot get host info", e);
pair = new Pair<String, String>("", "");}return pair;}}
// ApplicationInfoManager.java
/**
 * Re-resolves this instance's host name and, when it differs from the one currently held
 * by {@code instanceInfo}, writes the new host name, IP address and data center info into
 * that same instance and flags it as dirty.
 *
 * <p>Nothing is changed when the resolved host name is {@code null} or equal to the
 * current one.
 */
public void refreshDataCenterInfoIfRequired() {
    // Host name: what we have now versus what the configuration resolves to.
    final String currentHostName = instanceInfo.getHostName();
    final String resolvedHostName = (config instanceof RefreshableInstanceConfig)
            // Refreshes data center info and returns the up-to-date address.
            ? ((RefreshableInstanceConfig) config).resolveDefaultAddress(true)
            : config.getHostName(true);

    // IP address (read unconditionally, before the comparison).
    final String resolvedIp = config.getIpAddress();

    if (resolvedHostName == null || resolvedHostName.equals(currentHostName)) {
        return;
    }

    logger.warn("The address changed from : {} => {}", currentHostName, resolvedHostName);

    // Legacy behaviour: the builder is used as a mutator of the existing instanceInfo,
    // because that same instance is referenced elsewhere. Not being fixed for now, as the
    // client is likely to be rewritten at some point.
    new InstanceInfo.Builder(instanceInfo)
            .setHostName(resolvedHostName)
            .setIPAddr(resolvedIp)
            .setDataCenterInfo(config.getDataCenterInfo());

    instanceInfo.setIsDirty();
}
/**
 * Base instance configuration that resolves the local host's address and name once, at
 * class initialization, and answers host name / IP queries from that cached pair.
 */
public abstract class AbstractInstanceConfig implements EurekaInstanceConfig {

    /** Cached lookup result: first = host address, second = host name. */
    private static final Pair<String, String> hostInfo = getHostInfo();

    /**
     * @param refresh ignored by this implementation; the cached value is always returned
     * @return the cached host name
     */
    @Override
    public String getHostName(boolean refresh) {
        return hostInfo.second();
    }

    /**
     * @return the cached host address
     */
    @Override
    public String getIpAddress() {
        return hostInfo.first();
    }

    /**
     * Resolves the local host via {@link InetAddress#getLocalHost()}.
     *
     * @return an (address, name) pair, or a pair of empty strings if the local host
     *         cannot be resolved
     */
    private static Pair<String, String> getHostInfo() {
        try {
            InetAddress local = InetAddress.getLocalHost();
            return new Pair<String, String>(local.getHostAddress(), local.getHostName());
        } catch (UnknownHostException e) {
            logger.error("Cannot get host info", e);
            return new Pair<String, String>("", "");
        }
    }
}
/**
 * A lease on a held entity, recording when it was registered, last updated, and how long
 * it lasts.
 *
 * @param <T> type of the entity held by the lease
 */
public class Lease<T> {

    /** The entity this lease is held for. */
    private T holder;

    /** Registration timestamp. */
    private long registrationTimestamp;

    /** Timestamp at which the holder started serving. */
    private long serviceUpTimestamp;

    /** Timestamp at which the registration was cancelled (eviction). */
    private long evictionTimestamp;

    /** Last update timestamp. */
    // Make it volatile so that the expiration task would see this quicker
    private volatile long lastUpdateTimestamp;

    /** Lease duration, in milliseconds. */
    private long duration;

    /**
     * Creates a lease registered "now".
     *
     * @param r              the entity to hold
     * @param durationInSecs lease duration in seconds
     */
    public Lease(T r, int durationInSecs) {
        holder = r;
        registrationTimestamp = System.currentTimeMillis();
        lastUpdateTimestamp = registrationTimestamp;
        // Multiply in long arithmetic: an int product overflows above 2,147,483 seconds.
        duration = durationInSecs * 1000L;
    }
}
/**
 * A lease on a held entity, recording when it was registered, last updated, and how long
 * it lasts.
 *
 * @param <T> type of the entity held by the lease
 */
public class Lease<T> {

    /** The entity this lease is held for. */
    private T holder;

    /** Registration timestamp. */
    private long registrationTimestamp;

    /** Timestamp at which the holder started serving. */
    private long serviceUpTimestamp;

    /** Timestamp at which the registration was cancelled (eviction). */
    private long evictionTimestamp;

    /** Last update timestamp. */
    // Make it volatile so that the expiration task would see this quicker
    private volatile long lastUpdateTimestamp;

    /** Lease duration, in milliseconds. */
    private long duration;

    /**
     * Creates a lease registered "now": both the registration and the last-update
     * timestamps are set to the current time.
     *
     * @param r              the entity to hold
     * @param durationInSecs lease duration in seconds; converted to milliseconds
     */
    public Lease(T r, int durationInSecs) {
        holder = r;
        registrationTimestamp = System.currentTimeMillis();
        lastUpdateTimestamp = registrationTimestamp;
        // Multiply in long arithmetic: "durationInSecs * 1000" is an int product and
        // silently overflows for durations above 2,147,483 seconds.
        duration = durationInSecs * 1000L;
    }
}
publicvoid register(InstanceInfo registrant, int leaseDuration, boolean isReplication){try{// 获取读锁
read.lock();Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());// 增加 注册次数 到 监控
REGISTER.increment(isReplication);// 获得 应用实例信息 对应的 租约if(gMap == null){finalConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = newConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);// 添加 应用if(gMap == null){// 添加 应用 成功
gMap = gNewMap;}}Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());// Retain the last dirty timestamp without overwriting it, if there is already a leaseif(existingLease != null&&(existingLease.getHolder()!= null)){// 已存在时,使用数据不一致的时间大的应用注册信息为有效的Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();// Server 注册的 InstanceInfoLong registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();// Client 请求的 InstanceInfo
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted// InstanceInfo instead of the server local copy.if(existingLastDirtyTimestamp > registrationLastDirtyTimestamp){
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();}}else{// The lease does not exist and hence it is a new registration// 【自我保护机制】增加 `numberOfRenewsPerMinThreshold` 、`expectedNumberOfRenewsPerMin`synchronized(lock){if(this.expectedNumberOfRenewsPerMin>0){// Since the client wants to cancel it, reduce the threshold// (1// for 30 seconds, 2 for a minute)this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;this.numberOfRenewsPerMinThreshold =
(int)(this.expectedNumberOfRenewsPerMin* serverConfig.getRenewalPercentThreshold());}}
logger.debug("No previous lease information found; it is new registration");}// 创建 租约Lease<InstanceInfo> lease = newLease<InstanceInfo>(registrant, leaseDuration);if(existingLease != null){// 若租约已存在,设置 租约的开始服务的时间戳
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());}// 添加到 租约映射
gMap.put(registrant.getId(), lease);// 添加到 最近注册的调试队列synchronized(recentRegisteredQueue){
recentRegisteredQueue.add(new Pair<Long, String>(System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));}// 添加到 应用实例覆盖状态映射(Eureka-Server 初始化使用)// This is where the initial state transfer of overridden status happensif(!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())){
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());if(!overriddenInstanceStatusMap.containsKey(registrant.getId())){
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());}}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());if(overriddenStatusFromMap != null){
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);}// 获得应用实例最终状态,并设置应用实例的状态// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);// 设置 租约的开始服务的时间戳(只有第一次有效)// If the lease is registered with UP status, set lease service up timestampif(InstanceStatus.UP.equals(registrant.getStatus())){
lease.serviceUp();}// 设置 应用实例信息的操作类型 为 添加
registrant.setActionType(ActionType.ADDED);// 添加到 最近租约变更记录队列
recentlyChangedQueue.add(new RecentlyChangedItem(lease));// 设置 租约的最后更新时间戳
registrant.setLastUpdatedTimestamp();// 设置 响应缓存 过期
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);}finally{// 释放锁
read.unlock();}}
/**
 * Registers an instance under its application name, creating a new lease for it.
 *
 * <p>If a lease with a holder already exists for the instance id and that holder has a
 * strictly greater last-dirty timestamp than the incoming registrant, the existing
 * holder is used as the registrant instead. Otherwise the registration is treated as new
 * and the expected-renewals counters are increased. The whole operation runs under the
 * read lock.
 *
 * @param registrant    the instance information being registered
 * @param leaseDuration duration handed to the new {@code Lease}
 * @param isReplication passed to the registration counter, the overridden-status
 *                      resolution and the final log line
 */
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    // Acquire the read lock before entering try: if lock() itself failed inside the try,
    // the finally block would call unlock() on a lock this thread does not hold.
    read.lock();
    try {
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        // Count this registration in the metrics
        REGISTER.increment(isReplication);
        // Get (or create) the lease map of the application
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); // add the application
            if (gMap == null) { // the application was added by this call
                gMap = gNewMap;
            }
        }
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        if (existingLease != null && (existingLease.getHolder() != null)) {
            // A lease exists: the instance info with the larger dirty timestamp wins
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); // InstanceInfo held by the server
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); // InstanceInfo sent by the client
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
            // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
            // InstanceInfo instead of the server local copy.
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } else {
            // The lease does not exist and hence it is a new registration
            // [Self-preservation] raise expectedNumberOfRenewsPerMin and numberOfRenewsPerMinThreshold
            synchronized (lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    // A new instance is being registered, so increase the expected renewals
                    // (1 for 30 seconds, 2 for a minute)
                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                    this.numberOfRenewsPerMinThreshold =
                            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        // Create the lease
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) { // carry over the service-up timestamp of the existing lease
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        // Store in the lease map
        gMap.put(registrant.getId(), lease);
        // Record in the recently-registered queue (debugging aid)
        synchronized (recentRegisteredQueue) {
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // Seed the overridden-status map from the registrant
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                    + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }
        // Resolve the final status of the instance and apply it
        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);
        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        // Mark the instance as ADDED
        registrant.setActionType(ActionType.ADDED);
        // Record in the recently-changed queue
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        // Stamp the last-updated time on the instance
        registrant.setLastUpdatedTimestamp();
        // Invalidate the response cache
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        // Release the lock
        read.unlock();
    }
}