go-micro是一个使用go语言实现的微服务生态系统,之所以称为生态系统是因为它既有用于开发微服务的框架(go-micro),也有微服务相关的一整套工具(micro)。

这个项目历经四年的开发,在2019年3月终于发布了第一个正式release(1.0.0),宣称足够稳定。

相比与其他go语言的微服务框架(e.g. gizmo, go kit)而言,这个框架的优势是允许开发人员足够快的开发一个系统,并且提供各种中间件的选择性。但是,它的坏处是它是一个opinionated的框架,给开发者的自定义程度较低。

下面,记录我在使用这个项目进行微服务开发过程中的经验和教训。

go-micro 实现proto的rpc接口

和直接使用grpc相比的一个区别是,使用micro/protoc-gen-micro生成出来的xxx.micro.go文件可以让service的实现者去实现service对应的handler,然后在server/client的启动过程中调用RegisterXXXServiceHandler()来注册你的实现。

特别地,这些handler将rpc的接口原型中的response作为输入参数,而返回值仅为一个error

例如:

针对如下的rpc:

service Example {
    rpc Call(Request) returns (Response) {}
}

需要实现的handler为:

type ExampleHandler interface {
	Call(context.Context, *Request, *Response) error
}

client/server 使用grpc

默认情况下,go-micro的client和server使用go-micro库中的server和client的实现,它们是基于http协议进行通讯。

除了这个默认的实现,它也允许用户使用grpc来创建client和server。使用的方法为:

  1. proto文件的编译:在编译proto的时候将--go_out=<some_dir>改为--go_out=plugins=grpc:<some_dir>
  2. server和client的创建:在创建server/client的时候使用micro/go-grpc提供的NewService()(而不是micro/go-microNewService())
  3. (仅当前版本) 需要在go.mod中加入:

     replace github.com/testcontainers/testcontainer-go => github.com/testcontainers/testcontainers-go v0.0.0-20181115231424-8e868ca12c0f
     replace github.com/golang/lint => github.com/golang/lint v0.0.0-20190227174305-8f45f776aaf1
    

当然,client和server要配合使用,否则由于协议不同而无法通讯。

selector

详见官方blog

go-micro的selector是基于服务发现做负载均衡和流量控制的,它主要有以下的功能:

另外,它内置一些辅助功能,帮助用户更好地实现一个健壮的微服务系统,包括:

以下,主要根据官方的例子来展示一下怎么使用filter。参考 micro/examples/server/wrappermicro/examples/client/dc_filter,其中server端在metadata中加入了datacenter=local:

func main() {
	// optionally setup command line usage
	cmd.Init()

	md := server.DefaultOptions().Metadata
	md["datacenter"] = "local"

	server.DefaultServer = server.NewServer(
		server.WrapHandler(logWrapper),
		server.WrapSubscriber(logSubWrapper),
		server.Metadata(md),
	)
    ...

client端则在调用的时候进行metadata的过滤:

    ...
    // create context with metadata
    ctx := metadata.NewContext(context.Background(), map[string]string{
        "datacenter": "local",
    })

    ...

    // Call service with the context
    if err := client.Call(ctx, req, rsp); err != nil {
        fmt.Println("call err: ", err, rsp)
        return
    }
    ...

// wrap the client, do filter before actual call
func (dc *dcWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
    // fetch metadata from context
	md, _ := metadata.FromContext(ctx)

    // define a filter closure to only select server with matching metadata configured
	filter := func(services []*registry.Service) []*registry.Service {
		for _, service := range services {
			var nodes []*registry.Node
			for _, node := range service.Nodes {
				if node.Metadata["datacenter"] == md["datacenter"] {
					nodes = append(nodes, node)
				}
			}
			service.Nodes = nodes
		}
		return services
	}

    // extend options with SelectionOption
	callOptions := append(opts, client.WithSelectOption(
		selector.WithFilter(filter),
	))

    // do actual call (with extended option passed in)
	return dc.Client.Call(ctx, req, rsp, callOptions...)
}

当然,客户端的filter可以直接调用默认提供的FilterLabel(),而不是像例子中的那样自己实现一套,如下:

// wrap the client, do filter before actual call
func (dc *dcWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
    // fetch metadata from context
	md, _ := metadata.FromContext(ctx)

    // extend options with SelectionOption
	callOptions := append(opts, client.WithSelectOption(
		selector.WithFilter(selector.FilterLabel("datacenter"), md["datacenter"]),
	))

    // do actual call (with extended option passed in)
	return dc.Client.Call(ctx, req, rsp, callOptions...)
}

有了filter,我们可以做服务的金丝雀发布。

而金丝雀发布又有很多做法。

一种做法是完全布一套新的服务,然后从最上层进行流量控制。这种做法比较简单可靠。

另一种做法是通过某种机制对每一个服务流量控制。我的一个设想大致如下:

canary

假设我们对某个feature按用户进行灰度,我们必须知道这个feature是由哪几个服务提供的,并且这个feature要应用于哪些用户。将这些记录到consul。

每个服务都内置一段作为client的filter逻辑,这套逻辑包括三部分:

wrapper

go-micro 中的wrapper一般是在micro.NewService()的时候传入的,大致有以下几种:

option

go-micro 中的几乎每一个组件的包(例如:client, server, broker, registry…)都会提供一个options.go的文件,其中包含了这个包在初始的时候所支持的选项。

例如,默认的server在收到关闭的信号(SIGTERM, SIGKILL, SIGINT)的时候只会停止监听,但是不会等待当前正在进行的那些请求结束。如果,你希望你的服务支持graceful shutdown的话,就要在创建micro.NewService()的时候带上server包提供的Wait(bool)选项。

broker

对于server,起一个subscriber的流程如下:

  1. micro.NewService()
  2. service.Init()
  3. service.RegisterSubscriber(topic, service.Server(), func/obj): 注册subscriber,但是还没有连接到*broker
  4. service.Run():在server.Start()中会去连接broker,至此才算真正的注册了subscriber

(当然,你可以直接用broker,那就需要手动的connect)

对于client,发送一个topic的流程如下:

  1. micro.NewService()
  2. service.Init()
  3. micro.NewPublisher(topic, service.Client())
  4. publisher.Publish(ctx, event)

(当然,你可以直接用broker,那就需要手动的connect)

有几个点需要注意:

  1. 所有的message默认只支持protobuf的二进制格式
  2. publisher发送之后如果进程立刻退出,会导致发送实际没有成功。这应该是和broker的库的实现有关(例如:nats的publish是通过bufio,然后隔一段时间做flush)

client 默认超时时间:5秒

如果在调用pb生成的client接口时传入了不带超时的context(例如:context.TODO())。那么,go-micro会为你自动加上一个全局超时时间,默认为5s:

// go-micro/client/rpc_client.go

    func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error {
        // make a copy of call opts
        callOpts := r.opts.CallOptions
        for _, opt := range opts {
            opt(&callOpts)
        }

        next, err := r.next(request, callOpts)
        if err != nil {
            return err
        }

        // check if we already have a deadline
        d, ok := ctx.Deadline()
        if !ok {
            // no deadline so we create a new one
            ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
        } else {
            // got a deadline so no need to setup context
            // but we need to set the timeout we pass along
            opt := WithRequestTimeout(d.Sub(time.Now()))
            opt(&callOpts)
        }
        ...
    }

// go-micro/client/client.go

    // DefaultRequestTimeout is the default request timeout
    DefaultRequestTimeout = time.Second * 5

subscription handler 的 context

broker中subscription handler 的context不会带上超时时间(这是符合直觉的,因为publisher无法判断subscriptor接收到event之后会做什么,所以超时时间应该由subscriptor自己定义)。但是,其他某些元数据(例如:opentracing中的span信息)依然会带上。