玩轉(zhuǎn)Eureka

Register - 注冊(cè)

Eureka Instance注冊(cè)的REST入口在com.netflix.eureka.resources.ApplicationResource#addInstance

/**? ? * Registers information about a particular instance for an? ? * {@linkcom.netflix.discovery.shared.Application}.? ? *? ? *@paraminfo? ? *? ? ? ? ? ? {@linkInstanceInfo} information of the instance.? ? *@paramisReplication? ? *? ? ? ? ? ? a header parameter containing information whether this is? ? *? ? ? ? ? ? replicated from other nodes.? ? */@POST@Consumes({"application/json","application/xml"})publicResponseaddInstance(InstanceInfo info,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? @HeaderParam(PeerEurekaNode.HEADER_REPLICATION)String isReplication){? ? ? ? logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);// *********** 字段校驗(yàn) ************// validate that the instanceinfo contains all the necessary required fieldsif(isBlank(info.getId())) {returnResponse.status(400).entity("Missing instanceId").build();? ? ? ? }elseif(isBlank(info.getHostName())) {returnResponse.status(400).entity("Missing hostname").build();? ? ? ? }elseif(isBlank(info.getAppName())) {returnResponse.status(400).entity("Missing appName").build();? ? ? ? }elseif(!appName.equals(info.getAppName())) {returnResponse.status(400).entity("Mismatched appName, expecting "+ appName +" but was "+ info.getAppName()).build();? ? ? ? }elseif(info.getDataCenterInfo() ==null) {returnResponse.status(400).entity("Missing dataCenterInfo").build();? ? ? ? }elseif(info.getDataCenterInfo().getName() ==null) {returnResponse.status(400).entity("Missing dataCenterInfo Name").build();? ? ? ? }// handle cases where clients may be registering with bad DataCenterInfo with missing dataDataCenterInfo dataCenterInfo = info.getDataCenterInfo();// 僅當(dāng)DataCenterInfo為AmazonInfo實(shí)例的時(shí)候,其父類有可能是UniqueIdentifierif(dataCenterInfoinstanceofUniqueIdentifier) {// ......}// *********** 字段校驗(yàn) END ************registry.register(info,"true".equals(isReplication));// (1)returnResponse.status(204).build();// 204 to be backwards compatible}

真正的注冊(cè)操作在(1)處,需要注意的是isReplication變量取決于HTTP頭x-netflix-discovery-replication的值。繼續(xù)追蹤(1)的調(diào)用棧,發(fā)現(xiàn)執(zhí)行注冊(cè)操作的方法是是com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register

注意該方法的javadoc,他告訴了我們一個(gè)比較重要的訊息:將InstanceInfo實(shí)例信息注冊(cè)到Eureka并且復(fù)制該信息到其他peer。如果當(dāng)前收到的注冊(cè)信息是來自其他peer的復(fù)制事件,那么將不會(huì)將這個(gè)注冊(cè)信息繼續(xù)復(fù)制到其他peer,這個(gè)標(biāo)志位就是上面所述的isReplication。

/**? ? * Registers the information about the {@linkInstanceInfo} and replicates? ? * this information to all peer eureka nodes. If this is replication event? ? * from other replica nodes then it is not replicated.? ? *? ? *@paraminfo? ? *? ? ? ? ? ? the {@linkInstanceInfo} to be registered and replicated.? ? *@paramisReplication? ? *? ? ? ? ? ? true if this is a replication event from other replica nodes,? ? *? ? ? ? ? ? false otherwise.? ? */@Overridepublicvoidregister(finalInstanceInfo info,finalbooleanisReplication){// 默認(rèn)租約有效時(shí)長(zhǎng)為90sintleaseDuration = Lease.DEFAULT_DURATION_IN_SECS;// 注冊(cè)信息里包含則依照注冊(cè)信息的租約時(shí)長(zhǎng)if(info.getLeaseInfo() !=null&& info.getLeaseInfo().getDurationInSecs() >0) {? ? ? ? ? ? leaseDuration = info.getLeaseInfo().getDurationInSecs();? ? ? ? }// super為AbstractInstanceRegistrysuper.register(info, leaseDuration, isReplication);// 復(fù)制到其他peerreplicateToPeers(Action.Register, info.getAppName(), info.getId(), info,null, isReplication);? ? }

我們看到是先獲取到租約的有效時(shí)長(zhǎng),然后才是真真正正地委托給super執(zhí)行注冊(cè)操作super.register(...)并將注冊(cè)信息復(fù)制到其他peer。register方法非常長(zhǎng),我們重點(diǎn)觀察一下他的注冊(cè)表的結(jié)構(gòu):

privatefinalConcurrentHashMap>> registry

該注冊(cè)表是一個(gè)以app name為key(在Spring Cloud里就是spring.application.name),嵌套Map為value的ConcurrentHashMap結(jié)構(gòu)。其嵌套Map是以Instance ID為key,Lease對(duì)象為value的鍵值結(jié)構(gòu)。這個(gè)registry注冊(cè)表在Eureka Server或SpringBoot Admin的監(jiān)控面板上以Eureka Service這個(gè)角色出現(xiàn)。

/**? ? * Registers a new instance with a given duration.? ? *? ? *@seecom.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)? ? */publicvoidregister(InstanceInfo registrant,intleaseDuration,booleanisReplication){try{? ? ? ? ? ? read.lock();// 可以看出registry是一個(gè)以info的app name為key的Map結(jié)構(gòu), 也就是以spring.application.name的大寫串為keyMap> gMap = registry.get(registrant.getAppName());? ? ? ? ? ? REGISTER.increment(isReplication);if(gMap ==null) {finalConcurrentHashMap> gNewMap =newConcurrentHashMap>();? ? ? ? ? ? ? ? gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);if(gMap ==null) {? ? ? ? ? ? ? ? ? ? gMap = gNewMap;? ? ? ? ? ? ? ? }? ? ? ? ? ? }// registry的value的Map結(jié)構(gòu)是以info的id為key,這里的id就是Eureka文檔上的Instance ID,給你個(gè)例子你就想起是什么東西了:10.8.88.233:config-server:10888Lease existingLease = gMap.get(registrant.getId());// .......Lease lease =newLease(registrant, leaseDuration);if(existingLease !=null) {? ? ? ? ? ? ? ? lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());? ? ? ? ? ? }? ? ? ? ? ? gMap.put(registrant.getId(), lease);// .......}finally{? ? ? ? ? ? read.unlock();? ? ? ? }? ? }

上面是register(...)中關(guān)于registry的大致操作,其中有相當(dāng)一部分的操作被略去了,如果感興趣的話可以細(xì)致地研究一下。

Renew and Cancel Lease - 續(xù)約與取消租約

續(xù)約的REST入口在com.netflix.eureka.resources.InstanceResource#renewLease

而取消租約的REST入口在com.netflix.eureka.resources.InstanceResource#cancelLease

兩者的基本思想相似,經(jīng)由InstanceRegistry->AbstractInstanceRegistry->PeerAwareInstanceRegistryImpl,其中PeerAwareInstanceRegistryImpl裝飾了添加復(fù)制信息到其他節(jié)點(diǎn)的功能。其中register、renew、cancel、statusUpdate和deleteStatusOverride都會(huì)將其信息復(fù)制到其他節(jié)點(diǎn)。

Fetch Registry - 獲取注冊(cè)信息

獲取所有Eureka Instance的注冊(cè)信息,com.netflix.eureka.resources.ApplicationsResource#getContainers,其注冊(cè)信息由ResponseCacheImpl緩存,緩存的過期時(shí)間在其構(gòu)造函數(shù)中由EurekaServerConfig.getResponseCacheUpdateIntervalMs()所控制,默認(rèn)緩存時(shí)間為30s。而差量注冊(cè)信息在Server端會(huì)保存得更為長(zhǎng)一些(大約3分鐘),因此獲取的差量可能會(huì)重復(fù)返回相同的實(shí)例。Eureka Client會(huì)自動(dòng)處理這些重復(fù)信息。

Evcition

Eureke Server定期進(jìn)行失效節(jié)點(diǎn)的清理,執(zhí)行該任務(wù)的定時(shí)器被定義在com.netflix.eureka.registry.AbstractInstanceRegistry#evictionTimer,真正的任務(wù)是由他的內(nèi)部類AbstractInstanceRegistry#EvictionTask所執(zhí)行,默認(rèn)為每60s執(zhí)行一次清理任務(wù),其執(zhí)行間隔由EurekaServerConfig#getEvictionIntervalTimerInMs[eureka.server.eviction-interval-timer-in-ms]所決定。

回顧一下上面剛說完的注冊(cè)流程,在PeerAwareInstanceRegistryImpl#register里面特別指出了默認(rèn)的租約時(shí)長(zhǎng)為90s[eureka.Instance.lease-expiration-duration-in-seconds],即如果90s后都沒有收到特定的Eureka Instance的Heartbeats,則會(huì)認(rèn)為這個(gè)Instance已經(jīng)失效(Instance在正常情況下默認(rèn)每隔30s發(fā)送一個(gè)Heartbeats[eureka.Instance.lease-renewal-interval-in-seconds],對(duì)以上兩個(gè)默認(rèn)值有疑問的可以翻閱LeaseInfo),EvictionTask則會(huì)把這個(gè)Instance納入清理的范圍。我們看看EvictionTask的清理代碼是怎么寫的。

publicvoidevict(longadditionalLeaseMs){? ? ? ? logger.debug("Running the evict task");if(!isLeaseExpirationEnabled()) {? ? ? ? ? ? logger.debug("DS: lease expiration is currently disabled.");return;? ? ? ? }// We collect first all expired items, to evict them in random order. For large eviction sets,// if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,// the impact should be evenly distributed across all applications.// (2) 下面的for循環(huán)就是把registry中所有的Lease提取到局部變量expiredLeasesList> expiredLeases =newArrayList<>();for(Entry>> groupEntry : registry.entrySet()) {? ? ? ? ? ? Map> leaseMap = groupEntry.getValue();if(leaseMap !=null) {for(Entry> leaseEntry : leaseMap.entrySet()) {? ? ? ? ? ? ? ? ? ? Lease lease = leaseEntry.getValue();if(lease.isExpired(additionalLeaseMs) && lease.getHolder() !=null) {? ? ? ? ? ? ? ? ? ? ? ? expiredLeases.add(lease);? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? }// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for// triggering self-preservation. Without that we would wipe out full registry.intregistrySize = (int) getLocalRegistrySize();intregistrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());// (3)intevictionLimit = registrySize - registrySizeThreshold;inttoEvict = Math.min(expiredLeases.size(), evictionLimit);if(toEvict >0) {? ? ? ? ? ? logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);? ? ? ? ? ? Random random =newRandom(System.currentTimeMillis());for(inti =0; i < toEvict; i++) {// Pick a random item (Knuth shuffle algorithm)intnext = i + random.nextInt(expiredLeases.size() - i);? ? ? ? ? ? ? ? Collections.swap(expiredLeases, i, next);? ? ? ? ? ? ? ? Lease lease = expiredLeases.get(i);? ? ? ? ? ? ? ? String appName = lease.getHolder().getAppName();? ? ? ? ? ? ? ? String id = lease.getHolder().getId();? ? ? ? ? ? ? ? EXPIRED.increment();? ? ? ? ? ? ? ? logger.warn("DS: Registry: expired lease for {}/{}", appName, id);? ? ? ? ? ? ? ? internalCancel(appName, id,false);? ? ? ? ? ? }? ? ? ? }? ? }

在(2)中把本地的registry中的租約信息全部提取出來,并在(3)通過serverConfig.getRenewalPercentThreshold()[eureka.server.renewal-percent-threshold,默認(rèn)85%]計(jì)算出一個(gè)最大可剔除的閾值evictionLimit。

新增Peer Node時(shí)的初始化

在有多個(gè)Eureka Server的情況下,每個(gè)Eureka Server之間是如何發(fā)現(xiàn)對(duì)方的呢?

通過調(diào)試之后,我們根據(jù)調(diào)用鏈從下往上追溯,其初始入口為org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap#contextInitialized

publicvoidcontextInitialized(ServletContext context){try{? ? ? ? ? ? initEurekaEnvironment();? ? ? ? ? ? initEurekaServerContext();// (4)context.setAttribute(EurekaServerContext.class.getName(),this.serverContext);? ? ? ? }catch(Throwable e) {? ? ? ? ? ? log.error("Cannot bootstrap eureka server :", e);thrownewRuntimeException("Cannot bootstrap eureka server :", e);? ? ? ? }? ? }

由下個(gè)入口(4)最終可以定位到方法com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#syncUp,從對(duì)應(yīng)的javadoc上我們可以知道該方法從peer eureka節(jié)點(diǎn)往自己填充注冊(cè)表信息。 如果操作失敗則此同步操作將failover到其他節(jié)點(diǎn),直到遍歷完列表(service urls)為止。該方法與普通的Eureka Client注冊(cè)到Eureka Server不同的一點(diǎn)是,其標(biāo)志位isReplication為true,如果不記得這是什么作用的話可以翻閱到上面的Register - 注冊(cè)小節(jié)。

Peer Node信息的定時(shí)更新

首先我們看Eureka Server的上下文實(shí)體中的方法com.netflix.eureka.DefaultEurekaServerContext#initialize

@PostConstruct@Overridepublicvoidinitialize()throwsException{? ? ? ? logger.info("Initializing ...");? ? ? ? peerEurekaNodes.start();// (5)registry.init(peerEurekaNodes);? ? ? ? logger.info("Initialized");? ? }

該方法明確指出這是一個(gè)Spring Bean,在構(gòu)建Bean完成后執(zhí)行此方法,繼續(xù)追蹤(5)。

publicvoidstart(){? ? ? ? taskExecutor = Executors.newSingleThreadScheduledExecutor(newThreadFactory() {@OverridepublicThreadnewThread(Runnable r){? ? ? ? ? ? ? ? ? ? ? ? Thread thread =newThread(r,"Eureka-PeerNodesUpdater");? ? ? ? ? ? ? ? ? ? ? ? thread.setDaemon(true);returnthread;? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }? ? ? ? );try{? ? ? ? ? ? updatePeerEurekaNodes(resolvePeerUrls());// (6)Runnable peersUpdateTask =newRunnable() {// (7)@Overridepublicvoidrun(){try{? ? ? ? ? ? ? ? ? ? ? ? updatePeerEurekaNodes(resolvePeerUrls());// (6)}catch(Throwable e) {? ? ? ? ? ? ? ? ? ? ? ? logger.error("Cannot update the replica Nodes", e);? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }? ? ? ? ? ? };? ? ? ? ? ? taskExecutor.scheduleWithFixedDelay(? ? ? ? ? ? ? ? ? ? peersUpdateTask,// (7)serverConfig.getPeerEurekaNodesUpdateIntervalMs(),// (8)serverConfig.getPeerEurekaNodesUpdateIntervalMs(),? ? ? ? ? ? ? ? ? ? TimeUnit.MILLISECONDS? ? ? ? ? ? );? ? ? ? }catch(Exception e) {thrownewIllegalStateException(e);? ? ? ? }for(PeerEurekaNode node : peerEurekaNodes) {? ? ? ? ? ? logger.info("Replica node URL:? "+ node.getServiceUrl());? ? ? ? }? ? }

上面這段代碼很清晰地告訴我們?cè)趩?dòng)Eureka Server的時(shí)候就會(huì)調(diào)用updatePeerEurekaNodes(...)更新peer的狀態(tài),并封裝為一個(gè)Runnable進(jìn)行周期性更新。這個(gè)定時(shí)時(shí)間由serverConfig.getPeerEurekaNodesUpdateIntervalMs()[eureka.server.peer-eureka-nodes-update-interval-ms]所控制,默認(rèn)值為600s,即10min。一直經(jīng)由EndpointUtils#getDiscoveryServiceUrls、EndpointUtils#getServiceUrlsFromConfig至EurekaClientConfigBean#getEurekaServerServiceUrls獲得對(duì)應(yīng)zone的service urls,如有需要可以覆蓋上述getEurekaServerServiceUrls方法以動(dòng)態(tài)獲取service urls,而不是選擇Spring Cloud默認(rèn)從properties文件讀取。

Self Preservation - 自我保護(hù)

當(dāng)新增Eureka Server時(shí),他會(huì)先嘗試從其他Peer上獲取所有Eureka Instance的注冊(cè)信息。如果在獲取時(shí)出現(xiàn)問題,該Eureka Server會(huì)在放棄之前嘗試在其他Peer上獲取注冊(cè)信息。如果這個(gè)Eureka Server成功獲取到所有Instance的注冊(cè)信息,那么他就會(huì)根據(jù)所獲取到的注冊(cè)信息設(shè)置應(yīng)該接收到的續(xù)約閾值。如果在任何時(shí)候續(xù)約的閾值低于所設(shè)定的值(在15分鐘[eureka.server.renewal-threshold-update-interval-ms]內(nèi)低于85%[eureka.server.renewal-percent-threshold]),則該Eureka Server會(huì)出于保護(hù)當(dāng)前注冊(cè)列表的目的而停止將任何Instance進(jìn)行過期處理。

在Netflix中上述保護(hù)措施被成為自我保護(hù)模式,主要是用于Eureka Server與Eureka Client存在網(wǎng)絡(luò)分區(qū)情況下的場(chǎng)景。在這種情況下,Eureka Server嘗試保護(hù)其已有的實(shí)例信息,但如果出現(xiàn)大規(guī)模的網(wǎng)絡(luò)分區(qū)時(shí),相應(yīng)的Eureka Client會(huì)獲取到大量無法響應(yīng)的服務(wù)。所以,Eureka Client必須確保對(duì)于一些不存在或者無法響應(yīng)的Eureka Instance具備更加彈性的應(yīng)對(duì)策略,例如快速超時(shí)并嘗試其他實(shí)例。

在網(wǎng)絡(luò)分區(qū)出現(xiàn)時(shí)可能會(huì)發(fā)生以下幾種情況:

Peer之間的心跳可能會(huì)失敗,某Eureka Server檢測(cè)到這種情況并為了保護(hù)當(dāng)前的注冊(cè)列表而進(jìn)入了自我保護(hù)模式。新的注冊(cè)可能發(fā)生在某些孤立的Eureka Server上,某些Eureka Client可能會(huì)擁有新的注冊(cè)列表,而另外一些則可能沒有(不同的實(shí)例視圖)。

當(dāng)網(wǎng)絡(luò)恢復(fù)到穩(wěn)定狀態(tài)后,Eureka Server會(huì)進(jìn)行自我修復(fù)。當(dāng)Peer能正常通信之后注冊(cè)信息會(huì)被重新同步。

最重要的一點(diǎn)是,在網(wǎng)絡(luò)中斷期間Eureka Server應(yīng)該更距彈性,但在這段期間Eureka Client可能會(huì)有不同的實(shí)例視圖。

作者:Chrisdon

鏈接:http://m.itdecent.cn/p/4e43acbad7ae

來源:簡(jiǎn)書

簡(jiǎn)書著作權(quán)歸作者所有,任何形式的轉(zhuǎn)載都請(qǐng)聯(lián)系作者獲得授權(quán)并注明出處。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容