首页 简历|笔试面试

手绘图+源码,彻底说清楚Nacos的注册中心(已完结)

  • 25年9月4日 发布
  • 536.69KB 共16页
手绘图+源码,彻底说清楚Nacos的注册中心(已完结)手绘图+源码,彻底说清楚Nacos的注册中心(已完结)手绘图+源码,彻底说清楚Nacos的注册中心(已完结)手绘图+源码,彻底说清楚Nacos的注册中心(已完结)手绘图+源码,彻底说清楚Nacos的注册中心(已完结)

手绘图+源码,彻底说清楚 Nacos 的注册中心(已完结)

Nacos 可以作为配置中心和注册中心,这篇文章从原理到源码,给大家讲解 Nacos 注册中

心的 4 个维度。from 星球嘉宾楼仔。

不 BB,上文章目录:

640

01 什么是动态服务发现?

服务发现是指使用一个注册中心来记录分布式系统中的全部服务的信息,以便其他服务能

够快速的找到这些已注册的服务。

在单体应用中,DNS+Nginx 可以满足服务发现的要求,此时服务的 IP 列表配置在 nginx

上。在微服务架构中,由于服务粒度变的更细,服务的上下线更加频繁,我们需要一款注

册中心来动态感知服务的上下线,并且推送 IP 列表变化给服务消费者,架构如下图。

640

02 Nacos 实现动态服务发现的原理

Nacos 实现动态服务发现的核心原理如下图,我们接下来的内容将围绕这个图来进行。

640

2.1 通讯协议

整个服务注册与发现过程,都离不开通讯协议,在 1.x 的 Nacos 版本中服务端只支持 http

协议,后来为了提升性能在 2.x 版本引入了谷歌的 grpc,grpc 是一款长连接协议,极大的

减少了 http 请求频繁的连接创建和销毁过程,能大幅度提升性能,节约资源。

据官方测试,Nacos 服务端 grpc 版本,相比 http 版本的性能提升了 9 倍以上。

2.2 Nacos 服务注册

简单来讲,服务注册的目的就是客户端将自己的 ip 端口等信息上报给 Nacos 服务端,过

程如下:

• 创建长连接:Nacos SDK 通过 Nacos 服务端域名解析出服务端 ip 列表,选择其中一个

ip 创建 grpc 连接,并定时检查连接状态,当连接断开,则自动选择服务端 ip 列表中

的下一个 ip 进行重连。

• 健康检查请求:在正式发起注册之前,Nacos SDK 向服务端发送一个空请求,服务端

回应一个空请求,若 Nacos SDK 未收到服务端回应,则认为服务端不健康,并进行一

定次数重试,如果都未收到回应,则注册失败。

• 发起注册:当你查看 Nacos java SDK 的注册方法时,你会发现没有返回值,这是因为

Nacos SDK 做了补偿机制,在真实给服务端上报数据之前,会先往缓存中插入一条记

录表示开始注册,注册成功之后再从缓存中标记这条记录为注册成功,当注册失败

时,缓存中这条记录是未注册成功的状态,Nacos SDK 开启了一个定时任务,定时查

询异常的缓存数据,重新发起注册。

Nacos SDK 注册失败时的自动补偿机制时序图。

640

相关源码如下:

@Override

public void registerService(String serviceName, String groupName, Instance insta

nce) throws NacosException {

NAMING_LOGGER.info("[REGISTER-

SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,

instance);

//添加 redo 日志

redoService.cacheInstanceForRedo(serviceName, groupName, instance);

doRegisterService(serviceName, groupName, instance);

}

public void doRegisterService(String serviceName, String groupName, Instance in

stance) throws NacosException {

//向服务端发起注册

InstanceRequest request = new InstanceRequest(namespaceId, serviceName, grou

pName,

NamingRemoteConstants.REGISTER_INSTANCE, instance);

requestToServer(request, Response.class);

//标记注册成功

redoService.instanceRegistered(serviceName, groupName);

}

执行补偿定时任务 RedoScheduledTask。

@Override

public void run() {

if (!redoService.isConnected()) {

LogUtils.NAMING_LOGGER.warn("Grpc Connection is disconnect, skip current red

o task");

return;

}

try {

redoForInstances();

redoForSubscribes();

} catch (Exception e) {

LogUtils.NAMING_LOGGER.warn("Redo task run with unexpected exception: ", e);

}

}

private void redoForInstances() {

for (InstanceRedoData each : redoService.findInstanceRedoData()) {

try {

redoForInstance(each);

} catch (NacosException e) {

LogUtils.NAMING_LOGGER.error("Redo instance operation {} for {}

@@{} failed. ", each.getRedoType(),

each.getGroupName(), each.getServiceName(), e);

}

}

}

• 服务端数据同步(Distro 协议):Nacos SDK 只会与服务端某个节点建立长连接,当服

务端接受到客户端注册的实例数据后,还需要将实例数据同步给其他节点。Nacos 自

己实现了一个一致性协议名为 Distro,服务注册的时候会触发 Distro 一次同步,每个

Nacos 节点之间会定时互相发送 Distro 数据,以此保证数据最终一致。

• 服务实例上线推送:Nacos 服务端收到服务实例数据后会将服务的最新实例列表通过

grpc 推送给该服务的所有订阅者。

• 服务注册过程源码时序图:整理了一下服务注册过程整体时序图,对源码实现感兴趣

的可以按照根据这个时序图 view 一下源码。

640

2.3 Nacos 心跳机制

目前主流的注册中心,比如 Consul、Eureka、Zk 包括我们公司自研的 Gsched,都是通过

心跳机制来感知服务的下线。Nacos 也是通过心跳机制来实现的。

Nacos 目前 SDK 维护了两个分支的版本(1.x、2.x),这两个版本心跳机制的实现不一

样。其中 1.x 版本的 SDK 通过 http 协议来定时向服务端发送心跳维持自己的健康状态,

2.x 版本的 SDK 则通过 grpc 自身的心跳机制来保活,当 Nacos 服务端接受不到服务实例的

心跳,会认为实例下线。如下图:

640

grpc 监测到连接断开事件,发送 ClientDisconnectEvent。

public class ConnectionBasedClientManager extends ClientConnectionEventListen

er implements ClientManager {

//连接断开,发送连接断开事件

public boolean clientDisconnected(String clientId) {

Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and su

bscribers", clientId);

ConnectionBasedClient client = clients.remove(clientId);

if (null == client) {

return true;

}

client.release();

NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));

return true;

}

}

移除客户端注册的服务实例

public class ClientServiceIndexesManager extends SmartSubscriber {

@Override

public void onEvent(Event event) {

//接收失去连接事件

if (event instanceof ClientEvent.ClientDisconnectEvent) {

handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);

} else if (event instanceof ClientOperationEvent) {

handleClientOperation((ClientOperationEvent) event);

}

}

private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) {

Client client = event.getClient();

for (Service each : client.getAllSubscribeService()) {

removeSubscriberIndexes(each, client.getClientId());

}

//移除客户端注册的服务实例

for (Service each : client.getAllPublishedService()) {

removePublisherIndexes(each, client.getClientId());

}

}

//移除客户端注册的服务实例

private void removePublisherIndexes(Service service, String clientId) {

if (!publisherIndexes.containsKey(service)) {

return;

}

publisherIndexes.get(service).remove(clientId);

NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));

}

}

2.4 Nacos 服务订阅

当一个服务发生上下线,Nacos 如何知道要推送给哪些客户端?

Nacos SDK 提供了订阅和取消订阅方法,当客户端向服务端发起订阅请求,服务端会记录

发起调用的客户端为该服务的订阅者,同时将服务的最新实例列表返回。当客户端发起了

取消订阅,服务端就会从该服务的订阅者列表中把当前客户端移除。

当客户端发起订阅时,服务端除了会同步返回最新的服务实例列表,还会异步的通过

grpc 推送给该订阅者最新的服务实例列表,这样做的目的是为了异步更新客户端本地缓

存的服务数据。

当客户端订阅的服务上下线,该服务所有的订阅者会立刻收到最新的服务列表并且将服务

最新的实例数据更新到内存。

640

我们也看一下相关源码,服务端接收到订阅数据,首先保存到内存中。

@Override

public void subscribeService(Service service, Subscriber subscriber, String clientId

){

Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orE

lse(service);

Client client = clientManager.getClient(clientId);

//校验长连接是否正常

if (!clientIsLegal(client, clientId)) {

return;

}

//保存订阅数据

client.addServiceSubscriber(singleton, subscriber);

client.setLastUpdatedTime();

//发送订阅事件

NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEven

t(singleton, clientId));

}

private void handleClientOperation(ClientOperationEvent event) {

Service service = event.getService();

String clientId = event.getClientId();

if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {

addPublisherIndexes(service, clientId);

} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {

removePublisherIndexes(service, clientId);

} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {

//处理订阅操作

addSubscriberIndexes(service, clientId);

} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {

removeSubscriberIndexes(service, clientId);

}

}

然后发布订阅事件。

private void addSubscriberIndexes(Service service, String clientId) {

//保存订阅数据

subscriberIndexes.computeIfAbsent(service, (key) -

> new ConcurrentHashSet<>());

// Fix #5404, Only first time add need notify event.

if (subscriberIndexes.get(service).add(clientId)) {

//发布订阅事件

NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clie

ntId));

}

}

服务端自己消费订阅事件,并且推送给订阅的客户端最新的服务实例数据。

@Override

public void onEvent(Event event) {

if (!upgradeJudgement.isUseGrpcFeatures()) {

return;

}

if (event instanceof ServiceEvent.ServiceChangedEvent) {

// If service changed, push to all subscribers.

ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.Service

ChangedEvent) event;

Service service = serviceChangedEvent.getService();

delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInst

ance().getPushTaskDelay()));

} else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {

// If service is subscribed by one client, only push this client.

ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSu

bscribedEvent) event;

Service service = subscribedEvent.getService();

delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInst

ance().getPushTaskDelay(),

subscribedEvent.getClientId()));

}

}

2.5 Nacos 推送

推送方式

前面说了服务的注册和订阅都会发生推送(服务端->客户端),那推送到底是如何实现的

呢?

在早期的 Nacos 版本,当服务实例变化,服务端会通过 udp 协议将最新的数据发送给客

户端,后来发现 udp 推送有一定的丢包率,于是新版本的 Nacos 支持了 grpc 推送。Nacos

服务端会自动判断客户端的版本来选择哪种方式来进行推送,如果你使用 1.4.2 以前的

SDK 进行注册,那 Nacos 服务端会使用 udp 协议来进行推送,反之则使用 grpc。

推送失败重试

当发送推送时,客户端可能正在重启,或者连接不稳定导致推送失败,这个时候 Nacos 会

进行重试。Nacos 将每个推送都封装成一个任务对象,放入到队列中,再开启一个线程不

停的从队列取出任务执行,执行之前会先删除该任务,如果执行失败则将任务重新添加到

队列,该线程会记录任务执行的时间,如果超过 1 秒,则会记录到日志。

推送部分源码

添加推送任务到执行队列中。

private static class PushDelayTaskProcessor implements NacosTaskProcessor {

private final PushDelayTaskExecuteEngine executeEngine;

public PushDelayTaskProcessor(PushDelayTaskExecuteEngine executeEngine) {

this.executeEngine = executeEngine;

}

@Override

public boolean process(NacosTask task) {

PushDelayTask pushDelayTask = (PushDelayTask) task;

Service service = pushDelayTask.getService();

NamingExecuteTaskDispatcher.getInstance()

.dispatchAndExecuteTask(service, new PushExecuteTask(service, executeE

ngine, pushDelayTask));

return true;

}

}

推送任务 PushExecuteTask 的执行。

public class PushExecuteTask extends AbstractExecuteTask {

//..省略

@Override

public void run() {

try {

//封装要推送的服务实例数据

PushDataWrapper wrapper = generatePushData();

ClientManager clientManager = delayTaskEngine.getClientManager();

//如果是服务上下线导致的推送,获取所有订阅者

//如果是订阅导致的推送,获取订阅者

for (String each : getTargetClientIds()) {

Client client = clientManager.getClient(each);

if (null == client) {

// means this client has disconnect

continue;

}

Subscriber subscriber = clientManager.getClient(each).getSubscriber(service);

//推送给订阅者

delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrappe

r,

new NamingPushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.

isPushToAll()));

}

} catch (Exception e) {

Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() +

" execute failed ", e);

//当推送发生异常,重新将推送任务放入执行队列

delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));

}

}

//如果是服务上下线导致的推送,获取所有订阅者

//如果是订阅导致的推送,获取订阅者

private Collection<String> getTargetClientIds() {

return delayTask.isPushToAll() ? delayTaskEngine.getIndexesManager().getAllClie

ntsSubscribeService(service)

: delayTask.getTargetClients();

}

执行推送任务线程 InnerWorker 的执行。

/**

* Inner execute worker.

*/

private class InnerWorker extends Thread {

InnerWorker(String name) {

setDaemon(false);

setName(name);

}

@Override

public void run() {

while (!closed.get()) {

try {

//从队列中取出任务 PushExecuteTask

Runnable task = queue.take();

long begin = System.currentTimeMillis();

//执行 PushExecuteTask

task.run();

long duration = System.currentTimeMillis() - begin;

if (duration > 1000L) {

log.warn("task {} takes {}ms", task, duration);

}

} catch (Throwable e) {

log.error("[TASK-FAILED] " + e.toString(), e);

}

}

}

}

2.6 Nacos SDK 查询服务实例

服务消费者首先需要调用 Nacos SDK 的接口来获取最新的服务实例,然后才能从获取到的

实例列表中以加权轮询的方式选择出一个实例(包含 ip,port 等信息),最后再发起调

用。

前面已经提到 Nacos 服务发生上下线、订阅的时候都会推送最新的服务实例列表到当客户

端,客户端再更新本地内存中的缓冲数据,所以调用 Nacos SDK 提供的查询实例列表的接

口时,不会直接请求服务端获取数据,而是会优先使用内存中的服务数据,只有内存中查

不到的情况下才会发起订阅请求服务端数据。

Nacos SDK 内存中的数据除了接受来自服务端的推送更新之外,自己本地也会有一个定时

任务定时去获取服务端数据来进行兜底。Nacos SDK 在查询的时候也了容灾机制,即从磁

盘获取服务数据,而这个磁盘的数据其实也是来自于内存,有一个定时任务定时从内存缓

存中获取然后加载到磁盘。Nacos SDK 的容灾机制默认关闭,可通过设置环境变量

failover-mode=true 来开启。

架构图

640

用户查询流程

640

查询服务实例部分源码

private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;

@Override

public List<Instance> getAllInstances(String serviceName, String groupName, Lis

t<String> clusters,

boolean subscribe) throws NacosException {

ServiceInfo serviceInfo;

String clusterString = StringUtils.join(clusters, ",");

//这里默认传过来是 true

if (subscribe) {

//从本地内存获取服务数据,如果获取不到则从磁盘获取

serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterS

tring);

if (null == serviceInfo || !

clientProxy.isSubscribed(serviceName, groupName, clusterString)) {

//如果从本地获取不到数据,则调用订阅方法

serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);

}

} else {

//适用于不走订阅,直接从服务端获取数据的情况

serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clus

terString, 0, false);

}

List<Instance> list;

if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {

return new ArrayList<Instance>();

}

return list;

}

}

//从本地内存获取服务数据,如果开启了故障转移则直接从磁盘获取,因为当服务端挂了,本地

启动时内存中也没有数据

public ServiceInfo getServiceInfo(final String serviceName, final String groupNam

e, final String clusters) {

NAMING_LOGGER.debug("failover-

mode: {}", failoverReactor.isFailoverSwitch());

String groupedServiceName = NamingUtils.getGroupedName(serviceName, group

Name);

String key = ServiceInfo.getKey(groupedServiceName, clusters);

//故障转移则直接从磁盘获取

if (failoverReactor.isFailoverSwitch()) {

return failoverReactor.getService(key);

}

//返回内存中数据

return serviceInfoMap.get(key);

}

3. 结语

本篇文章向大家介绍 Nacos 服务发现的基本概念和核心能力以及实现的原理,旨在让大

家对 Nacos 的服务注册与发现功能有更多的了解,做到心中有数。

开通会员 本次下载免费

所有资料全部免费下载! 推荐用户付费下载获取返佣积分! 积分可以兑换商品!
一键复制 下载文档 联系客服