go-micro 防坑指南
- go-micro 实现proto的rpc接口
 - client/server 使用grpc
 - selector
 - wrapper
 - option
 - broker
 - client 默认超时时间:5秒
 - subscription handler 的 context
 
go-micro是一个使用go语言实现的微服务生态系统,之所以称为生态系统是因为它既有用于开发微服务的框架(go-micro),也有微服务相关的一整套工具(micro)。
这个项目历经四年的开发,在2019年3月终于发布了第一个正式release(1.0.0),宣称足够稳定。
相比与其他go语言的微服务框架(e.g. gizmo, go kit)而言,这个框架的优势是允许开发人员足够快的开发一个系统,并且提供各种中间件的选择性。但是,它的坏处是它是一个opinionated的框架,给开发者的自定义程度较低。
下面,记录我在使用这个项目进行微服务开发过程中的经验和教训。
go-micro 实现proto的rpc接口
和直接使用grpc相比的一个区别是,使用micro/protoc-gen-micro生成出来的xxx.micro.go文件可以让service的实现者去实现service对应的handler,然后在server/client的启动过程中调用RegisterXXXServiceHandler()来注册你的实现。
特别地,这些handler将rpc的接口原型中的response作为输入参数,而返回值仅为一个error。
例如:
针对如下的rpc:
service Example {
    rpc Call(Request) returns (Response) {}
}
需要实现的handler为:
type ExampleHandler interface {
	Call(context.Context, *Request, *Response) error
}
client/server 使用grpc
默认情况下,go-micro的client和server使用go-micro库中的server和client的实现,它们是基于http协议进行通讯。
除了这个默认的实现,它也允许用户使用grpc来创建client和server。使用的方法为:
- proto文件的编译:在编译proto的时候将
--go_out=<some_dir>改为--go_out=plugins=grpc:<some_dir> - server和client的创建:在创建server/client的时候使用
micro/go-grpc提供的NewService()(而不是micro/go-micro的NewService()) - 
    
(仅当前版本) 需要在go.mod中加入:
replace github.com/testcontainers/testcontainer-go => github.com/testcontainers/testcontainers-go v0.0.0-20181115231424-8e868ca12c0f replace github.com/golang/lint => github.com/golang/lint v0.0.0-20190227174305-8f45f776aaf1 
当然,client和server要配合使用,否则由于协议不同而无法通讯。
selector
详见官方blog
go-micro的selector是基于服务发现做负载均衡和流量控制的,它主要有以下的功能:
- filter: 通过预设的filter(包括:label-based, endpoint-based, version-based)或者自定义的filter进行服务的过滤。例如:有一个服务名为micro.service.foo,用户可以部署不同版本的该服务。然后,在客户端调用的时候根据version进行filter,获取想要调用的版本
 - strategy: 对于filter的结果,根据不同算法进行负载均衡
 
另外,它内置一些辅助功能,帮助用户更好地实现一个健壮的微服务系统,包括:
- selection cache: 避免了每次服务发现都要查询后端,同时,通过一个watcher做必要的reload
 - blacklisting node:自动将问题节点列入黑名单,过一段时间再将其解除黑名单
 - timeout & retries
 - connection pool
 
以下,主要根据官方的例子来展示一下怎么使用filter。参考 micro/examples/server/wrapper 和 micro/examples/client/dc_filter,其中server端在metadata中加入了datacenter=local:
func main() {
	// optionally setup command line usage
	cmd.Init()
	md := server.DefaultOptions().Metadata
	md["datacenter"] = "local"
	server.DefaultServer = server.NewServer(
		server.WrapHandler(logWrapper),
		server.WrapSubscriber(logSubWrapper),
		server.Metadata(md),
	)
    ...
client端则在调用的时候进行metadata的过滤:
    ...
    // create context with metadata
    ctx := metadata.NewContext(context.Background(), map[string]string{
        "datacenter": "local",
    })
    ...
    // Call service with the context
    if err := client.Call(ctx, req, rsp); err != nil {
        fmt.Println("call err: ", err, rsp)
        return
    }
    ...
// wrap the client, do filter before actual call
func (dc *dcWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
    // fetch metadata from context
	md, _ := metadata.FromContext(ctx)
    // define a filter closure to only select server with matching metadata configured
	filter := func(services []*registry.Service) []*registry.Service {
		for _, service := range services {
			var nodes []*registry.Node
			for _, node := range service.Nodes {
				if node.Metadata["datacenter"] == md["datacenter"] {
					nodes = append(nodes, node)
				}
			}
			service.Nodes = nodes
		}
		return services
	}
    // extend options with SelectionOption
	callOptions := append(opts, client.WithSelectOption(
		selector.WithFilter(filter),
	))
    // do actual call (with extended option passed in)
	return dc.Client.Call(ctx, req, rsp, callOptions...)
}
当然,客户端的filter可以直接调用默认提供的FilterLabel(),而不是像例子中的那样自己实现一套,如下:
// wrap the client, do filter before actual call
func (dc *dcWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
    // fetch metadata from context
	md, _ := metadata.FromContext(ctx)
    // extend options with SelectionOption
	callOptions := append(opts, client.WithSelectOption(
		selector.WithFilter(selector.FilterLabel("datacenter"), md["datacenter"]),
	))
    // do actual call (with extended option passed in)
	return dc.Client.Call(ctx, req, rsp, callOptions...)
}
有了filter,我们可以做服务的金丝雀发布。
而金丝雀发布又有很多做法。
一种做法是完全布一套新的服务,然后从最上层进行流量控制。这种做法比较简单可靠。
另一种做法是通过某种机制对每一个服务流量控制。我的一个设想大致如下:

假设我们对某个feature按用户进行灰度,我们必须知道这个feature是由哪几个服务提供的,并且这个feature要应用于哪些用户。将这些记录到consul。
每个服务都内置一段作为client的filter逻辑,这套逻辑包括三部分:
- 启动时从consul载入这些feature的kv信息;
 - 启动一个watcher来watch consul中的feature相关的kv的更新
 - 每当要发起一个请求,根据这个请求的服务接收方(
service)和这个请求对应的user_id,进行对服务的基于label的filter 
wrapper
go-micro 中的wrapper一般是在micro.NewService()的时候传入的,大致有以下几种:
- 
    
ClientWrapper: 调用
micro.WrapClient()返回,该函数的输入参数可以是多个wrapper,这些wrapper的作用是在封装当前的client,所以,它们会返回一个新的client。需要注意的是,这个封装的顺序和传入的顺序相反(实际上这样更好理解,一个请求发生的时候,先调用第一个传入的wrapper内的逻辑,再调用第二个,以此类推,最后调用实际的client逻辑):func WrapClient(w ...client.Wrapper) Option { return func(o *Options) { // apply in reverse for i := len(w); i > 0; i-- { o.Client = w[i-1](o.Client) } } }例如,你想对一个client加入以下两个wrapper:
- tracing wrapper: 封装了client的
Call()等函数,这些函数会在调用前创建一个span,并且将span作为context传入rpc的调用 - log wrapper: 封装了client的
Call()等函数,这些函数会在调用前从ctx中获取span,并且使用span记录log 
显然,这里的log wrapper依赖于tracing wrapper,也就是说,实际client的
Call()应按先完成tracing wrapper的工作,然后再完成log wrapper的工作。所以传入的顺序如下:service := micro.NewService( micro.WrapClient( opentracingWrapper.NewClientWrapper(tracer), logWrap, ), ) - tracing wrapper: 封装了client的
 - 
    
CallWrapper: ClientWrapper的简化版,仅影响
Client.Call() - 
    
HandlerWrapper: 调用
micro.WrapHandler()返回,该函数的输入参数可以是多个wrapper,这些wrapper的作用是返回一个返回封装当前server的handler函数的函数,并且将这些函数记录在server的options中。在之后实际接收到某个请求的时候,会按照options中记录的wrapper的逆序进行应用:// FROM: go-micro/options.go // WrapHandler adds a handler Wrapper to a list of options passed into the server func WrapHandler(w ...server.HandlerWrapper) Option { return func(o *Options) { var wrappers []server.Option for _, wrap := range w { wrappers = append(wrappers, server.WrapHandler(wrap)) } // Init once o.Server.Init(wrappers...) } } // FROM: go-micro/server/rpc_server.go func (s *rpcServer) ServeConn(sock transport.Socket) { ... // create a wrapped function handler := func(ctx context.Context, req Request, rsp interface{}) error { return r.ServeRequest(ctx, req, rsp.(Response)) } for i := len(s.opts.HdlrWrappers); i > 0; i-- { handler = s.opts.HdlrWrappers[i-1](handler) } ... }所以,和
ClientWrapper一样,传入的顺序即反映了请求被执行的顺序。 - 
    
SubscriberWrapper:broker subscription 的wrapper,如上。
 
option
go-micro 中的几乎每一个组件的包(例如:client, server, broker, registry…)都会提供一个options.go的文件,其中包含了这个包在初始的时候所支持的选项。
例如,默认的server在收到关闭的信号(SIGTERM, SIGKILL, SIGINT)的时候只会停止监听,但是不会等待当前正在进行的那些请求结束。如果,你希望你的服务支持graceful shutdown的话,就要在创建micro.NewService()的时候带上server包提供的Wait(bool)选项。
broker
对于server,起一个subscriber的流程如下:
micro.NewService()service.Init()service.RegisterSubscriber(topic, service.Server(), func/obj): 注册subscriber,但是还没有连接到*brokerservice.Run():在server.Start()中会去连接broker,至此才算真正的注册了subscriber
(当然,你可以直接用broker,那就需要手动的connect)
对于client,发送一个topic的流程如下:
micro.NewService()service.Init()micro.NewPublisher(topic, service.Client())publisher.Publish(ctx, event)
(当然,你可以直接用broker,那就需要手动的connect)
有几个点需要注意:
- 所有的message默认只支持protobuf的二进制格式
 - publisher发送之后如果进程立刻退出,会导致发送实际没有成功。这应该是和broker的库的实现有关(例如:nats的publish是通过
bufio,然后隔一段时间做flush) 
client 默认超时时间:5秒
如果在调用pb生成的client接口时传入了不带超时的context(例如:context.TODO())。那么,go-micro会为你自动加上一个全局超时时间,默认为5s:
// go-micro/client/rpc_client.go
    func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error {
        // make a copy of call opts
        callOpts := r.opts.CallOptions
        for _, opt := range opts {
            opt(&callOpts)
        }
        next, err := r.next(request, callOpts)
        if err != nil {
            return err
        }
        // check if we already have a deadline
        d, ok := ctx.Deadline()
        if !ok {
            // no deadline so we create a new one
            ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
        } else {
            // got a deadline so no need to setup context
            // but we need to set the timeout we pass along
            opt := WithRequestTimeout(d.Sub(time.Now()))
            opt(&callOpts)
        }
        ...
    }
// go-micro/client/client.go
    // DefaultRequestTimeout is the default request timeout
    DefaultRequestTimeout = time.Second * 5
subscription handler 的 context
broker中subscription handler 的context不会带上超时时间(这是符合直觉的,因为publisher无法判断subscriptor接收到event之后会做什么,所以超时时间应该由subscriptor自己定义)。但是,其他某些元数据(例如:opentracing中的span信息)依然会带上。
Comments