go微服务框架kratos学习笔记六(kratos 服务发现 discovery)

摘要:
=Nil{panic}defercancel()panic找不到节点。这是我们发现的节点地址,可以添加到环境变量中。
go微服务框架kratos学习笔记六(kratos 服务发现 discovery)

目录

除了上次的warden直连方式外,kratos有另一个服务发现sdk : discovery

discovery 可以先简单理解为一个http服务、

它最简单的发现过程可能是这样的:

1、service 向discovery 服务注册 appid
2、client 通过 appid 从discovery 查询 service 的addr

当然 远不止这么简单,还包含了很多功能在里面的,例如服务自发现负载均衡

本节仅先看个最简单的服务发现的demo

首先走一遍discovery的http的api

http api

// innerRouter init local router api path.
func innerRouter(e *bm.Engine) {
	group := e.Group("/discovery")
	{
		group.POST("/register", register)
		group.POST("/renew", renew)
		group.POST("/cancel", cancel)
		group.GET("/fetch/all", initProtect, fetchAll)
		group.GET("/fetch", initProtect, fetch)
		group.GET("/fetchs", initProtect, fetchs)
		group.GET("/poll", initProtect, poll)
		group.GET("/polls", initProtect, polls)
		//manager
		group.POST("/set", set)
		group.GET("/nodes", initProtect, nodes)
	}
}

discovery里面的bm引擎注册了这些接口, 接着我用postman 测了测。

register 服务注册

go微服务框架kratos学习笔记六(kratos 服务发现 discovery)第1张

fetch 获取实例

go微服务框架kratos学习笔记六(kratos 服务发现 discovery)第2张

fetchs 批量获取实例

go微服务框架kratos学习笔记六(kratos 服务发现 discovery)第3张

polls 批量获取实例

go微服务框架kratos学习笔记六(kratos 服务发现 discovery)第4张

nodes 批量获取节点

go微服务框架kratos学习笔记六(kratos 服务发现 discovery)第5张

renew 心跳

POST http://HOST/discovery/renew

curl 'http://127.0.0.1:7171/discovery/renew' -d "zone=sh1&env=test&appid=provider&hostname=myhostname"

*****成功*****
{
    "code":0,
    "message":""
}
****失败****
{
    "code":-400,
    "message":"-400"
}

cancel 下线

POST http://HOST/discovery/cancel

curl 'http://127.0.0.1:7171/discovery/cancel' -d "zone=sh1&env=test&appid=provider&hostname=myhostname"

*****成功*****
{
    "code":0,
    "message":""
}
****失败****
{
    "code":-400,
    "message":"-400"
}

应用发现逻辑

官方应用发现实现逻辑

选择可用的节点,将应用appid加入poll的appid列表
如果polls请求返回err,则切换node节点,切换逻辑与自发现错误时切换逻辑一致
如果polls返回-304 ,说明appid无变更,重新发起poll监听变更
polls接口返回appid的instances列表,完成服务发现,根据需要选择不同的负载均衡算法进行节点的调度

go微服务框架kratos学习笔记六(kratos 服务发现 discovery)第6张

服务注册

服务注册demo

直接new一个demo服务然后将demo服务注册到discovery

主函数里面服务注册部分添加类似下面注册代码。

	ip := "127.0.0.1"
	port := "9000"
	hn, _ := os.Hostname()
	dis := discovery.New(nil)
	ins := &naming.Instance{
		Zone:     env.Zone,
		Env:      env.DeployEnv,
		AppID:    "demo.service",
		Hostname: hn,
		Addrs: []string{
			"grpc://" + ip + ":" + port,
		},
	}

	cancel, err := dis.Register(context.Background(), ins)
	if err != nil {
		panic(err)
	}

	defer cancel()

panic 找不到节点,这个是我们discovery的节点地址 可以在环境变量里面添加。

I:VSProjectkratos-notekratos-notewardendiscoveryserver>kratos run
INFO 01/04-19:32:28.198 I:/VSProject/go/pkg/mod/github.com/bilibili/kratos@v0.3.2-0.20191224125553-6e1180f53a8e/pkg/net/rpc/warden/server.go:329 warden: start grpc listen addr: [::]:9000
panic: invalid discovery config nodes:[] region:region01 zone:zone01 deployEnv:dev host:DESKTOP-NUEKD5O

配置discovery节点后成功注册

I:VSProjectkratos-notekratos-notewardendiscoveryserver>set DISCOVERY_NODES=127.0.0.1:7171

I:VSProjectkratos-notekratos-notewardendiscoveryserver>kratos run
INFO 01/04-19:40:25.426 I:/VSProject/kratos-note/kratos-note/warden/discovery/server/cmd/main.go:23 abc start
2020/01/04 19:40:25 start watch filepath: I:VSProjectkratos-notekratos-notewardendiscoveryserverconfigs
INFO 01/04-19:40:25.497 I:/VSProject/go/pkg/mod/github.com/bilibili/kratos@v0.3.2-0.20191224125553-6e1180f53a8e/pkg/net/http/blademaster/server.go:98 blademaster: start http listen addr: 0.0.0.0:8000
[warden] config is Deprecated, argument will be ignored. please use -grpc flag or GRPC env to configure warden server.
INFO 01/04-19:40:25.500 I:/VSProject/go/pkg/mod/github.com/bilibili/kratos@v0.3.2-0.20191224125553-6e1180f53a8e/pkg/net/rpc/warden/server.go:329 warden: start grpc listen addr: [::]:9000
INFO 01/04-19:40:25.501 I:/VSProject/go/pkg/mod/github.com/bilibili/kratos@v0.3.2-0.20191224125553-6e1180f53a8e/pkg/naming/discovery/discovery.go:248 disocvery: AddWatch(infra.discovery) already watch(false)
INFO 01/04-19:40:25.514 I:/VSProject/go/pkg/mod/github.com/bilibili/kratos@v0.3.2-0.20191224125553-6e1180f53a8e/pkg/naming/discovery/discovery.go:631 discovery: successfully polls(http://127.0.0.1:7171/discovery/polls?appid=infra.discovery&env=dev&hostname=DESKTOP-NUEKD5O&latest_timestamp=0) instances ({"infra.discovery":{"instances":{"sh001":[{"region":"sh","zone":"sh001","env":"dev","appid":"infra.discovery","hostname":"test1","addrs":["http://127.0.0.1:7171"],"version":"","latest_timestamp":1578122538945305700,"metadata":null,"status":1}]},"latest_timestamp":1578122538945305700,"scheduler":null}})
INFO 01/04-19:40:25.527 I:/VSProject/go/pkg/mod/github.com/bilibili/kratos@v0.3.2-0.20191224125553-6e1180f53a8e/pkg/naming/discovery/discovery.go:414 discovery: register client.Get(http://127.0.0.1:7171/discovery/register) env(dev) appid(demo.service) addrs([grpc://127.0.0.1:9000]) success

服务注册逻辑

现在我们跟着日志走一遍。

如图理解,服务注册逻辑应该是register -> renew ->cancel 注册 然后 不停给心跳 最后取消注册。

截取一条本地服务注册日志

操作大概是:

1、启动discovery服务
2、启动demo.server 注册demo.server appid 服务
3、过一小会等待心跳,关闭demo.server

接着可以看到整个日志的过程大致上是 :

1、 0 : 启动dicovery服务
2、 2/3/4 : 服务初始化
3、 5 : polls 长轮循 infra.discovery 服务自发现
4、 6/7: 新的连接 & 服务注册、这时候我们起动的demo.server服务注册上来了
5、 9 : polls 长轮循 infra.discovery 服务自发现
6、 10 : renew心跳
7、 12 : 最后我杀掉了注册的服务,出现了cancel请求。

从日志看逻辑理解基本上也没有太多偏差,接着看看服务发现。

0:discovery -conf discovery-example.toml -log.v=0
1:
2:INFO 01/10-10:31:19.575 C:/server/src/go/src/discovery/discovery/syncup.go:160 discovery 3:changed nodes:[127.0.0.1:7171] zones:map[]
4:INFO 01/10-10:31:19.575 C:/server/src/go/pkg/mod/github.com/bilibili/kratos@v0.1.0/pkg/net/http/blademaster/server.go:98 blademaster: start http listen addr: 127.0.0.1:7171
INFO 01/10-10:31:19.575 C:/server/src/go/src/discovery/registry/registry.go:219 Polls from(test1) new connection(1)

5:INFO 01/10-10:31:31.796 http-access-log ts=0 method=GET ip=127.0.0.1 traceid= user=no_user params=appid=infra.discovery&env=dev&hostname=DESKTOP-9NFHKD0&latest_timestamp=0 msg=0 stack=<nil> err= timeout_quota=39.98 path=/discovery/polls ret=0

6:INFO 01/10-10:31:31.798 C:/server/src/go/src/discovery/registry/registry.go:219 Polls from(DESKTOP-9NFHKD0) new connection(1)

7:INFO 01/10-10:31:31.799 http-access-log method=POST user=no_user path=/discovery/register err= ts=0 params=addrs=grpc%3A%2F%2F127.0.0.1%3A9000&appid=demo.service&env=dev&hostname=DESKTOP-9NFHKD0&metadata=&region=region01&status=1&version=&zone=zone01 stack=<nil> ret=0 timeout_quota=39.98 ip=127.0.0.1 msg=0 traceid=

8:INFO 01/10-10:32:01.799 C:/server/src/go/src/discovery/registry/registry.go:370 DelConns from(DESKTOP-9NFHKD0) delete(1)

9:ERROR 01/10-10:32:01.799 http-access-log method=GET ip=127.0.0.1 err=-304 timeout_quota=39.98 user=no_user path=/discovery/polls params=appid=infra.discovery&env=dev&hostname=DESKTOP-9NFHKD0&latest_timestamp=1578623479566211700 ret=-304 msg=-304 stack=-304 ts=30.0011342 traceid=

10:INFO 01/10-10:32:01.799 http-access-log msg=0 err= timeout_quota=39.98 method=POST ip=127.0.0.1 user=no_user ret=0 path=/discovery/renew traceid= params=appid=demo.service&env=dev&hostname=DESKTOP-9NFHKD0&region=region01&zone=zone01 stack=<nil> ts=0

11:INFO 01/10-10:32:01.800 C:/server/src/go/src/discovery/registry/registry.go:219 Polls from(DESKTOP-9NFHKD0) new connection(1)

12:INFO 01/10-10:32:08.499 http-access-log timeout_quota=39.98 path=/discovery/cancel ret=0 stack=<nil> ip=127.0.0.1 msg=0 traceid= ts=0 method=POST user=no_user err= params=appid=demo.service&env=dev&hostname=DESKTOP-9NFHKD0&region=region01&zone=zone01

服务发现

同样先配置discovert节点 set DISCOVERY_NODES=127.0.0.1:7171

NewClient()改成如下方式

package dao

import (
	"context"

	"github.com/bilibili/kratos/pkg/naming/discovery"
	"github.com/bilibili/kratos/pkg/net/rpc/warden"
	"github.com/bilibili/kratos/pkg/net/rpc/warden/resolver"

	"google.golang.org/grpc"
)

// AppID your appid, ensure unique.
const AppID = "demo.service" // NOTE: example

func init(){
	// NOTE: 注意这段代码,表示要使用discovery进行服务发现
	// NOTE: 还需注意的是,resolver.Register是全局生效的,所以建议该代码放在进程初始化的时候执行
	// NOTE: !!!切记不要在一个进程内进行多个不同中间件的Register!!!
	// NOTE: 在启动应用时,可以通过flag(-discovery.nodes) 或者 环境配置(DISCOVERY_NODES)指定discovery节点
	resolver.Register(discovery.Builder())
}

// NewClient new member grpc client
func NewClient(cfg *warden.ClientConfig, opts ...grpc.DialOption) (DemoClient, error) {
	client := warden.NewClient(cfg, opts...)
	conn, err := client.Dial(context.Background(), "discovery://default/"+AppID)
	if err != nil {
		return nil, err
	}
	// 注意替换这里:
	// NewDemoClient方法是在"api"目录下代码生成的
	// 对应proto文件内自定义的service名字,请使用正确方法名替换
	return NewDemoClient(conn), nil
}

同时嵌入dao结构里面、和上次warden direct方式一样做SayHello接口测试调用。

// dao dao.
type dao struct {
	db          *sql.DB
	redis       *redis.Redis
	mc          *memcache.Memcache
	demoClient  demoapi.DemoClient
	cache *fanout.Fanout
	demoExpire int32
}

// New new a dao and return.
func New(r *redis.Redis, mc *memcache.Memcache, db *sql.DB) (d Dao, err error) {
	var cfg struct{
		DemoExpire xtime.Duration
	}
	if err = paladin.Get("application.toml").UnmarshalTOML(&cfg); err != nil {
		return
	}
	
	grpccfg := &warden.ClientConfig{
		Dial:              xtime.Duration(time.Second * 10),
		Timeout:           xtime.Duration(time.Millisecond * 250),
		Subset:            50,
		KeepAliveInterval: xtime.Duration(time.Second * 60),
		KeepAliveTimeout:  xtime.Duration(time.Second * 20),
	}
	//paladin.Get("grpc.toml").UnmarshalTOML(grpccfg)
	var grpcClient demoapi.DemoClient
	grpcClient, err = NewClient(grpccfg)

	d = &dao{
		db: db,
		redis: r,
		mc: mc,
		demoClient : grpcClient,
		cache: fanout.New("cache"),
		demoExpire: int32(time.Duration(cfg.DemoExpire) / time.Second),
	}
	return
}

测试调用

操作流程

1、启动discovery服务
2、启动demo.server 注册为 demo.server 服务
3、启动demo.client、
4、最后从demo.client的SayHello http接口 调到demo.server的grpc SayHello 接口。

go微服务框架kratos学习笔记六(kratos 服务发现 discovery)第7张

简单看看官方grpc服务发现逻辑

context deadline exceeded

我发现个别时候调用做服务发现,会发现client起不来, context deadline exceeded。

因为我把new client加在了dao里面,超时的话,demo.client就直接pannic了

go微服务框架kratos学习笔记六(kratos 服务发现 discovery)第8张

根据client日志可以发现
warden client: dial discovery://default/demo.service?subset=50 error context deadline exceeded!panic: context deadline exceeded

client : host:127.0.0.1:7171, url:http://127.0.0.1:7171/discovery/polls?appid=infra.discovery&env=dev&hostname=DESKTOP-9NFHKD0&latest_timestamp=1578902420717217500
在调用discovery polls的时候超时了,我配置的grpc dial 期限为10s, 在官方discovery文档介绍中写到discovery在做服务节点自发现的时候,如果server节点实例没有变更,则接口会阻塞直到30s返回-304。(poll(polls) 接口为长轮训接口)

关于服务自发现的话,这里不细看了,本节只关注应用发现逻辑,感兴趣可以去discovery上看看。
go微服务框架kratos学习笔记六(kratos 服务发现 discovery)第9张

INFO 01/10-15:22:34.436 http-access-log method=GET path=/discovery/polls user=no_user params=appid=infra.discovery&env=dev&hostname=CLII&latest_timestamp=0 stack=<nil> err= timeout_quota=39.98 ts=0 msg=0 traceid= ip=127.0.0.1 ret=0
INFO 01/10-15:22:34.438 C:/server/src/go/src/discovery/registry/registry.go:222 Polls from(CLII) new connection(1)
INFO 01/10-15:22:34.440 C:/server/src/go/src/discovery/registry/registry.go:228 Polls from(CLII) reuse connection(2)
INFO 01/10-15:22:44.219 C:/server/src/go/src/discovery/registry/registry.go:373 DelConns from(DESKTOP-9NFHKD0) delete(1)
ERROR 01/10-15:22:44.219 http-access-log path=/discovery/polls ret=-304 msg=-304 timeout_quota=39.98 ip=127.0.0.1 params=appid=infra.discovery&env=dev&hostname=DESKTOP-9NFHKD0&latest_timestamp=1578637331623587200 user=no_user ts=39.9808023 err=-304 traceid= method=GET stack=-304
INFO 01/10-15:22:44.221 C:/server/src/go/src/discovery/registry/registry.go:222 Polls from(DESKTOP-9NFHKD0) new connection(1)
INFO 01/10-15:22:44.525 http-access-log ts=0 method=POST ip=127.0.0.1 user=no_user stack=<nil> path=/discovery/renew err= traceid= ret=0 msg=0 timeout_quota=39.98 params=appid=demo.service&env=dev&hostname=DESKTOP-9NFHKD0&region=region01&zone=zone01
INFO 01/10-15:23:04.438 C:/server/src/go/src/discovery/registry/registry.go:370 DelConns from(CLII) count decr(2)
ERROR 01/10-15:23:04.438 http-access-log msg=-304 ts=30.0002154 method=GET err=-304 stack=-304 timeout_quota=39.98 ip=127.0.0.1 user=no_user path=/discovery/polls params=appid=infra.discovery&env=dev&hostname=CLII&latest_timestamp=1578637331623587200 ret=-304 traceid=
INFO 01/10-15:23:04.440 C:/server/src/go/src/discovery/registry/registry.go:373 DelConns from(CLII) delete(1)
ERROR 01/10-15:23:04.440 http-access-log ts=30.0013758 traceid= user=no_user path=/discovery/polls ret=-304 err=-304 method=GET ip=127.0.0.1 params=appid=infra.discovery&appid=demo.service&env=dev&hostname=CLII&latest_timestamp=1578637331623587200&latest_timestamp=0 msg=-304 stack=-304 timeout_quota=39.98

结合discovery 日志
15:22:34的client发dial
15:22:45左右client panic
15:23:04dicovery才回复一个-304 (实例信息无变更)


这实际上是因为 client.Dial() 里面封装了grpc官方的服务发现,当然最终走的是kratos warden里面的实现的grpc官方服务发现逻辑。

下面简单看看这层逻辑,很绕,我也没看懂,只能简单看看,有机会接触再补个详细的。

简单看看官方grpc服务发现逻辑

// NewClient new grpc client
func NewClient(cfg *warden.ClientConfig, opts ...grpc.DialOption) (demoapi.DemoClient, error) {
	client := warden.NewClient(cfg, opts...)
	cc, err := client.Dial(context.Background(), fmt.Sprintf("discovery://default/%s", AppID))
	if err != nil {
		return nil, err
	}
	return demoapi.NewDemoClient(cc), nil
}

实际上 client.Dial() 里面会有会有这么一个流程 :

client.Dial() - > grpc里面DialContext() -> parser target 的 scheme 然后获取 (这里是discovery) 对应的Builder

	if cc.dopts.resolverBuilder == nil {
		// Only try to parse target when resolver builder is not already set.
		cc.parsedTarget = parseTarget(cc.target)
		grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
		cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		if cc.dopts.resolverBuilder == nil {
			// If resolver builder is still nil, the parsed target's scheme is
			// not registered. Fallback to default resolver and set Endpoint to
			// the original target.
			grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
			cc.parsedTarget = resolver.Target{
				Scheme:   resolver.GetDefaultScheme(),
				Endpoint: target,
			}
			cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		}
	} else {
		cc.parsedTarget = resolver.Target{Endpoint: target}
	}

DialContext() 成功会得到 -> 结构体ClientConn -> ClientConn.resolverWrapper 初始化 -> 调用build()

	defer ccr.resolverMu.Unlock()

	ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {
	ctx    context.Context
	cancel context.CancelFunc

	target       string
	parsedTarget resolver.Target
	authority    string
	dopts        dialOptions
	csMgr        *connectivityStateManager

	balancerBuildOpts balancer.BuildOptions
	blockingpicker    *pickerWrapper

	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper
	sc              *ServiceConfig
	conns           map[*addrConn]struct{}
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	balancerWrapper *ccBalancerWrapper
	retryThrottler  atomic.Value

	firstResolveEvent *grpcsync.Event

	channelzID int64 // channelz unique identification number
	czData     *channelzData
}

用户Builder的实现进行UpdateState —> ClientConn的updateResolverState -> updateResolverState -> Address初始化等grpc官方逻辑

// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
	// Build creates a new resolver for the given target.
	//
	// gRPC dial calls Build synchronously, and fails if the returned error is
	// not nil.
	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
	// Scheme returns the scheme supported by this resolver.
	// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
	Scheme() string
}

// ClientConn contains the callbacks for resolver to notify any updates
// to the gRPC ClientConn.
//
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For the situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type ClientConn interface {
	// UpdateState updates the state of the ClientConn appropriately.
	UpdateState(State)
	// ReportError notifies the ClientConn that the Resolver encountered an
	// error.  The ClientConn will notify the load balancer and begin calling
	// ResolveNow on the Resolver with exponential backoff.
	ReportError(error)
	// NewAddress is called by resolver to notify ClientConn a new list
	// of resolved addresses.
	// The address list should be the complete list of resolved addresses.
	//
	// Deprecated: Use UpdateState instead.
	NewAddress(addresses []Address)
	// NewServiceConfig is called by resolver to notify ClientConn a new
	// service config. The service config should be provided as a json string.
	//
	// Deprecated: Use UpdateState instead.
	NewServiceConfig(serviceConfig string)
	// ParseServiceConfig parses the provided service config and returns an
	// object that provides the parsed config.
	ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult
}

kratos discovery

warden包装了gRPC的整个服务发现实现逻辑,代码分别位于pkg/naming/naming.go和warden/resolver/resolver.go中

naming.go定义了用于描述业务实例的Instance结构、用于服务注册的Registry接口、用于服务发现的Resolver接口。

// Resolver resolve naming service
type Resolver interface {
	Fetch(context.Context) (*InstancesInfo, bool)
	Watch() <-chan struct{}
	Close() error
}

// Registry Register an instance and renew automatically.
type Registry interface {
	Register(ctx context.Context, ins *Instance) (cancel context.CancelFunc, err error)
	Close() error
}

// InstancesInfo instance info.
type InstancesInfo struct {
	Instances map[string][]*Instance `json:"instances"`
	LastTs    int64                  `json:"latest_timestamp"`
	Scheduler *Scheduler             `json:"scheduler"`
}

resolver.go内实现了gRPC官方的resolver.Builderresolver.Resolver接口,同时也暴露了naming.go内的naming.Buildernaming.Resolver接口

// Resolver resolve naming service
type Resolver interface {
	Fetch(context.Context) (*InstancesInfo, bool)
	Watch() <-chan struct{}
	Close() error
}

// Builder resolver builder.
type Builder interface {
	Build(id string) Resolver
	Scheme() string
}

kratos对grpc的Build做了包装,只需要传对应的服务的appid即可:warden/resolver/resolver.go在gRPC进行调用后,会根据Scheme方法查询对应的naming.Builder实现并调用Build将id传入。而实现naming.Resolver即可通过appid去对应的服务发现中间件(这里是discovery服务)进行实例信息的查询(Fetch接口)、除了简单进行Fetch操作外还多了Watch方法,用于监听服务发现中间件的节点变化情况,能够实时的进行服务实例信息的更新。

在naming/discovery内实现了基于discovery为中间件的服务注册与发现逻辑。大致上也可以在这里面看到做了对discovery服务中间件的polls请求。

// Build disovery resovler builder.
func (d *Discovery) Build(appid string, opts ...naming.BuildOpt) naming.Resolver {
	r := &Resolve{
		id:    appid,
		d:     d,
		event: make(chan struct{}, 1),
		opt:   new(naming.BuildOptions),
	}
	for _, opt := range opts {
		opt.Apply(r.opt)
	}
	d.mutex.Lock()
	app, ok := d.apps[appid]
	if !ok {
		app = &appInfo{
			resolver: make(map[*Resolve]struct{}),
		}
		d.apps[appid] = app
		cancel := d.cancelPolls
		if cancel != nil {
			cancel()
		}
	}
	app.resolver[r] = struct{}{}
	d.mutex.Unlock()
	if ok {
		select {
		case r.event <- struct{}{}:
		default:
		}
	}
	log.Info("disocvery: AddWatch(%s) already watch(%v)", appid, ok)
	d.once.Do(func() {
		go d.serverproc()
	})
	return r
}

func (d *Discovery) serverproc() {
	var (
		retry  int
		ctx    context.Context
		cancel context.CancelFunc
	)
	ticker := time.NewTicker(time.Minute * 30)
	defer ticker.Stop()
	for {
		if ctx == nil {
			ctx, cancel = context.WithCancel(d.ctx)
			d.mutex.Lock()
			d.cancelPolls = cancel
			d.mutex.Unlock()
		}
		select {
		case <-d.ctx.Done():
			return
		case <-ticker.C:
			d.switchNode()
		default:
		}
		apps, err := d.polls(ctx)
		if err != nil {
			d.switchNode()
			if ctx.Err() == context.Canceled {
				ctx = nil
				continue
			}
			time.Sleep(time.Second)
			retry++
			continue
		}
		retry = 0
		d.broadcast(apps)
	}
}

免责声明:文章转载自《go微服务框架kratos学习笔记六(kratos 服务发现 discovery)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇ELK 6安装配置 nginx日志收集 kabana汉化JavaScript异步编程 ( 一 )下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Spring Test 整合 JUnit 4 使用总结

转自:https://blog.csdn.net/hgffhh/article/details/83712924 这两天做Web开发,发现通过spring进行对象管理之后,做测试变得复杂了。因为所有的Bean都需要在applicationContext.xml中加载好,之后再通过@Resource去取得。如果每次都要整个业务流做的差不多了再去测试,这样效率...

Tomcat学习总结(2)——Tomcat使用详解

一、Tomcat服务器端口的配置 Tomcat的所有配置都放在conf文件夹之中,里面的server.xml文件是配置的核心文件。 如果想修改Tomcat服务器的启动端口,则可以在server.xml配置文件中的Connector节点进行的端口修改 例如:将Tomcat服务器的启动端口由默认的8080改成8081端口 Tomcat服务器启动端口默认配置 1...

Android定位服务关闭和定位(悬浮)等权限拒绝的判断

public voidcheckLocationPermission() { if (!PermissionHelper.isLocServiceEnable(this)) {//检测是否开启定位服务 DlgUtils.showLocServiceDialog(this); } else{//检测...

V8 初次接触(Qt5) 1+1=2 博客频道 CSDN.NET

V8 初次接触(Qt5) - 1+1=2 - 博客频道 - CSDN.NET V8 初次接触(Qt5) 分类:QtQt52011-09-03 18:411881人阅读评论(4)收藏举报 在Qt5中,javascript 和 C++一样,成了Qt中的一等公民;而Qt选中的javascript引擎则是 V8。 看来,有必要简单看看这个东西了。 V8...

XFS文件系统的备份与恢复

永久修改主机名:hostnamectl set-hostname oldboy临时修改主机名:hostname xfsdump备份xfsdump -f 备份的文件位置 要备份的分区或者磁盘 免交互备份做定时备份:xfsdump -f /opt/dump_sdb_01 /sdb -L dump_sdb_1 -M passwd 备份/boot分区下grub2进...

跨站请求伪造(CSRF)

1. 什么是跨站请求伪造(CSRF)   CSRF(Cross-site request forgery跨站请求伪造,也被称为“One Click Attack”或者Session Riding,通常缩写为CSRF或者XSRF,是一种对网站的恶意利用。尽管听起来像跨站脚本(XSS),但它与XSS非常不同,并且攻击方式几乎相左。XSS利用站点内的信任用户,而...