gRPC ProtoBuf

https://doc.oschina.net/grpc?t=60133
https://chai2010.cn/advanced-go-programming-book/ch4-rpc/ch4-05-grpc-hack.html
https://blog.didiyun.com/index.php/2018/12/12/grpc-golang-1/



gRPC是由Google主导开发的RPC框架,使用HTTP/2协议并用ProtoBuf作为序列化工具。其客户端提供Objective-C、Java接口,服务器侧则有Java、Golang、C++等接口,从而为移动端(iOS/Androi)到服务器端通讯提供了一种解决方案。 当然在当下的环境下,这种解决方案更热门的方式是RESTFull API接口。该方式需要自己去选择编码方式、服务器架构、自己搭建框架(JSON-RPC)。gRPC官方对REST的声音是:



和REST一样遵循HTTP协议(明确的说是HTTP/2),但是gRPC提供了全双工流
和传统的REST不同的是gRPC使用了静态路径,从而提高性能
用一些格式化的错误码代替了HTTP的状态码更好的标示错误
至于是否要选择用gRPC。对于已经有一套方案的团队,可以参考下。如果是从头来做,可以考虑下gRPC提供的从客户端到服务器的整套解决方案,这样不用客户端去实现http的请求会话,JSON等的解析,服务器端也有现成的框架用。从15年3月到现在gRPC也发展了一年了,慢慢趋于成熟。下面我们就以gRPC的Golang版本看下其在golang上面的表现。至于服务端的RPC,感觉golang标准库的RPC框架基本够用了,没必要再去用另一套方案。



  1. 安装protobuf
    虽然gRPC也支持protobuf2.x,但是建议还是使用protobuf3.x,尽管还没有正式版本,不过golang版本基本没有什么问题,另外3.x官方支持了Objective-C,这也是我们使用gRPC的初衷:提供一个移动端到服务器的解决方案。去到Protocol Buffers下载最新版本(Version3.0.0 beta2),然后解压到本地。本地需要已经安装好autoconf automake libtool.rpm系列(fedora/centos/redheat)可以用yum安装。Mac上可以用brew进行安装



brew install autoconf automake libtool
然后执行



./configure –prefix=your_pb_install_path
接着



make
make install
set your_pb_install_path to your $PATH
检查是否安装完成



protoc –version
libprotoc 3.0.0
然后安装golang protobuf直接使用golang的get即可



go get -u github.com/golang/protobuf/proto // golang protobuf 库
go get -u github.com/golang/protobuf/protoc-gen-go //protoc –go_out 工具



  1. 安装gRPC-go
    gRPC-go可以通过golang 的get命令直接安装,非常方便。



go get google.golang.org/grpc
这里大家可能比较奇怪,为什么gRPC-go在github的地址是”https://github.com/grpc/grpc-go”,但是为什么要用“google.golang.org/grpc”进行安装呢?应该grpc原本是google内部的项目,归属golang,就放在了google.golang.org下面了,后来对外开放,又将其迁移到github上面了,又因为golang比较坑爹的import路径规则,所以就都没有改路径名了。



先看PB的描述:



syntax = “proto3”;



option objc_class_prefix = “HLW”;



package helloworld;



// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}



// The request message containing the user’s name.
message HelloRequest {
string name = 1;
}



// The response message containing the greetings
message HelloReply {
string message = 1;
}
这里定义了一个服务Greeter,其中有个API SayHello。其接受参数为HelloRequest类型,返回HelloReply类型。这里HelloRequest和HelloReply就是普通的PB定义



服务定义为:



// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
service定义了一个server。其中的接口可以是四种类型



rpc GetFeature(Point) returns (Feature) {}
类似普通的函数调用,客户端发送请求Point到服务器,服务器返回相应Feature.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
客户端发起一次请求,服务器端返回一个流式数据,比如一个数组中的逐个元素
rpc RecordRoute(stream Point) returns (RouteSummary) {}
客户端发起的请求是一个流式的数据,比如数组中的逐个元素,服务器返回一个相应
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
客户端发起的请求是一个流式数据,比如数组中的逐个元素,二服务器返回的也是一个类似的数据结构
后面三种可以参考官方的route_guide示例。



使用protoc命令生成相关文件:



protoc –go_out=plugins=grpc:. helloworld.proto
ls
helloworld.pb.go helloworld.proto
生成对应的pb.go文件。这里用了plugins选项,提供对grpc的支持,否则不会生成Service的接口。



3.2 服务器端程序
然后编辑服务器端程序:



package main



import (
“log”
“net”



pb "your_path_to_gen_pb_dir/helloworld"
"golang.org/x/net/context"
"google.golang.org/grpc" )


const (
port = “:50051”
)



// server is used to implement helloworld.GreeterServer.
type server struct{}



// SayHello implements helloworld.GreeterServer
func (s server) SayHello(ctx context.Context, in *pb.HelloRequest) (pb.HelloReply, error) {
return &pb.HelloReply{Message: “Hello “ + in.Name}, nil
}



func main() {
lis, err := net.Listen(“tcp”, port)
if err != nil {
log.Fatalf(“failed to listen: %v”, err)
}
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})
s.Serve(lis)
}
这里首先定义一个server结构,然后实现SayHello的接口,其定义在“your_path_to_gen_pb_dir/helloworld”



SayHello(context.Context, HelloRequest) (HelloReply, error)
然后调用grpc.NewServer() 创建一个server s。接着注册这个server s到结构server上面 pb.RegisterGreeterServer(s, &server{}) 最后将创建的net.Listener传给s.Serve()。就可以开始监听并服务了,类似HTTP的ListenAndServe。



3.3 客户端程序
客户端程序:



package main



import (
“log”
“os”



pb "your_path_to_gen_pb_dir/helloworld"
"golang.org/x/net/context"
"google.golang.org/grpc" )


const (
address = “localhost:50051”
defaultName = “world”
)



func main() {
// Set up a connection to the server.
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Fatalf(“did not connect: %v”, err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)



// Contact the server and print out its response.
name := defaultName
if len(os.Args) > 1 {
name = os.Args[1]
}
r, err := c.SayHello(context.Background(), &pb.HelloRequest{Name: name})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("Greeting: %s", r.Message) } 这里通过pb.NewGreeterClient()传入一个conn创建一个client,然后直接调用client上面对应的服务器的接口


SayHello(context.Context, HelloRequest) (HelloReply, error)
接口,返回*HelloReply 对象。



先运行服务器,在运行客户端,可以看到。



./greeter_server &



./greeter_client
2016/03/10 21:42:19 Greeting: Hello world



grpc,它是基于protobuf 这个由google 推出来,号称比json 更方便、更快速、更简短的一种沟通格式。



grpc 其实重点很简单,就是你定义出来的东西,不只代表格式,也代表你伺服器的接口,所以不管在client 端或server 端,只要遵照这个grpc 格式下去实作,api 就接起来,也省去之前用restful api ,server 端要写一堆router ,也不用另外写文件,刚好符合一句话『程式码即文件』,大家遵照grpc 的契约各自去实作。



再开始介绍之前,有一些基础工具要先安装,首先执行下面指令,安装grpc



go get -u google.golang.org/grpc



再来安装



go get -u github.com/golang/protobuf/protoc-gen-go



grpc 宣告
接下来就能来介绍正题,我们来介绍一下,基础的grpc 格式宣告



//這邊使用 proto3 的格式 也就是 protobuf 第三版的意思,要注意,第二版跟第三版有一些語法上的差異喔
syntax = “proto3”;



package example;



//定義了一個 EchoServer
service EchoServer {
rpc Echo (EchoRequest) returns(EchoReply){}
}



//定義了 Echo Server EchoRequest
message EchoRequest {
string message = 1;
}



//定義了 Echo Response
//這裡多回傳了一個 叫做 unixtime
message EchoReply {
string message = 1;
int64 unixtime = 2;
}
上面这样就是一个简易的grpc的宣告,更多型别上可以参阅官网



接下来,在你撰写这只档案的地方,执行



protoc –go_out=plugins=grpc:. *.proto



就会看到产生出一个另一个有带pb.go 的档案,接下来我们分别来实作client 、server 端



grpc客户
func main() {
conn, err := grpc.Dial(“localhost:9999”, grpc.WithInsecure())
if err != nil {
log.Fatalf(“連線失敗:%v”, err)
}
defer conn.Close()



c := pb.NewEchoClient(conn)

r, err := c.Echo(context.Background(), &pb.EchoRequest{Msg: "HI HI HI HI"})
if err != nil {
log.Fatalf("無法執行 Plus 函式:%v", err)
}
log.Printf("回傳結果:%s , 時間:%d", r.Msg, r.Unixtime)


}
grpc服务器



type EchoServer struct{}



func (e *EchoServer) Echo(ctx context.Context, req *pb.EchoRequest) (resp *pb.EchoReply, err error) {



log.Printf("receive client request, client send:%s\n", req.Msg)
return &pb.EchoReply{
Msg: req.Msg,
Unixtime: time.Now().Unix(),
}, nil


}



func main() {
apiListener, err := net.Listen(“tcp”, “:9999”)
if err != nil {
log.Println(err)
return
}



// 註冊 grpc
es := &EchoServer{}

grpc := grpc.NewServer()
//pb.Re(grpc, es)
pb.RegisterEchoServer(grpc, es)

reflection.Register(grpc)
if err := grpc.Serve(apiListener); err != nil {
log.Fatal(" grpc.Serve Error: ", err)
return
} } 最后只要先启动grpc server,再执行grpc client ,就能看到有相对应的讯息连接上了。


grpc是一个通用的rpc框架,用google实现,当然也有go语言的版本。在工作中主要用到这个库,所以看看源码加强自己对框架的了解。目前来说主要分析的都以go版本为主(并没有看其他语言版本).由于个人水平有限,代码中的有些思想也是个人揣测,难免有些错误,如果发现错误,还望帮忙指出。



2 源码目录浏览
grpc使用protobuf(google的序列化框架)作为通信协议,底层上使用http2作为其传输协议,grpc源码中自己实现了http2的服务端跟客户端,而并没有用net/http包。http2有很多特性能够高效的传输数据,具体特点可以看相关链接详细了解。



看名字大概能看出这些目录中代码是哪些关系,documentation目录是存放一些文档,benchmark是压测,credentials是验证,examples是例子,grpclb是负载均衡,grpclog是日志,health是服务健康检查,metadata是元数据(用户客户端给服务端传送一些特殊数据,具体可以看相关链接),naming目录是提供名字服务需要实现的接口(相当于一个dns),stats是统计信息,transport 传输层实现(主要是http2的客户端与服务端时实现, 不会详细说这个目录),还有其他一些比较无关紧要的目录就不一一介绍了。



3 客户端
在example目录中有两个比较简单的例子,就先从这里入手吧,



func main() {
// Set up a connection to the server.
//建立一个链接
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Fatalf(“did not connect: %v”, err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)



// Contact the server and print out its response.
name := defaultName
if len(os.Args) > 1 {
name = os.Args[1]
}
//调用函数
r, err := c.SayHello(context.Background(), &pb.HelloRequest{Name: name})
if err != nil {
log.Fatalf(“could not greet: %v”, err)
}
log.Printf(“Greeting: %s”, r.Message)
}
grcp.WithInsecure参数是在链接https服务端时不用检查服务端的证书(要是你相信服务端就不用检查).Dial函数对服务端建立一个连接, grpc.Dial函数:



func DialContext(ctx context.Context, target string, opts …DialOption) (conn ClientConn, err error) {
cc := &ClientConn{
target: target,
conns: make(map[Address]
addrConn),
}
cc.ctx, cc.cancel = context.WithCancel(context.Background())
defer func() {
select {
case <-ctx.Done():
conn, err = nil, ctx.Err()
default:
}



if err != nil {
cc.Close()
} }()


//设置grpc的各种选项
for _, opt := range opts {
opt(&cc.dopts)
}



// Set defaults.
if cc.dopts.codec == nil {
//默认用protobuf编解码
cc.dopts.codec = protoCodec{}
}
if cc.dopts.bs == nil {
cc.dopts.bs = DefaultBackoffConfig
}
creds := cc.dopts.copts.TransportCredentials
//验证信息
if creds != nil && creds.Info().ServerName != “” {
cc.authority = creds.Info().ServerName
} else {
colonPos := strings.LastIndex(target, “:”)
if colonPos == -1 {
colonPos = len(target)
}
cc.authority = target[:colonPos]
}
var ok bool
waitC := make(chan error, 1)
//启动一个goroutine启动名字服务器(类似dns)
go func() {
var addrs []Address
if cc.dopts.balancer == nil {
// Connect to target directly if balancer is nil.
// 如果没设置负载均衡器,则直接连接
addrs = append(addrs, Address{Addr: target})
} else {
var credsClone credentials.TransportCredentials
if creds != nil {
credsClone = creds.Clone()
}
config := BalancerConfig{
DialCreds: credsClone,
}
//启动负载均衡服务
if err := cc.dopts.balancer.Start(target, config); err != nil {
waitC <- err
return
}
ch := cc.dopts.balancer.Notify()
if ch == nil {
// There is no name resolver installed.
addrs = append(addrs, Address{Addr: target})
} else {
addrs, ok = <-ch
if !ok || len(addrs) == 0 {
waitC <- errNoAddr
return
}
}
}
for _, a := range addrs {
//给每个地址一个conn,连接池
if err := cc.resetAddrConn(a, false, nil); err != nil {
waitC <- err
return
}
}
close(waitC)
}()
var timeoutCh <-chan time.Time
if cc.dopts.timeout > 0 {
timeoutCh = time.After(cc.dopts.timeout)
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case err := <-waitC:
if err != nil {
return nil, err
}
case <-timeoutCh:
return nil, ErrClientConnTimeout
}
// If balancer is nil or balancer.Notify() is nil, ok will be false here.
// The lbWatcher goroutine will not be created.
if ok {
go cc.lbWatcher()
}
return cc, nil
}
通过dial这个函数,grpc已经建立了到服务端的连接,启动了自定义负载平衡(如果有的话). pb.NewGreeterClient这行代码是通过protoc工具自动生成的,它包一个grpc连接包裹在一个struct内方便调用生成的客户端grpc调用代码。接下来grpc客户端调用SayHello向服务器发送rpc请求。



func (c greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts …grpc.CallOption) (HelloReply, error) {
out := new(HelloReply)
//调用实际的发送请求函数
err := grpc.Invoke(ctx, “/helloworld.Greeter/SayHello”, in, out, c.cc, opts…)
if err != nil {
return nil, err
}
return out, nil
}



//最后主要是invoke函数
func invoke(ctx context.Context, method string, args, reply interface{}, cc ClientConn, opts …CallOption) (e error) {
c := defaultCallInfo
for _, o := range opts {
//调用之前的hook
if err := o.before(&c); err != nil {
return toRPCErr(err)
}
}
defer func() {
for _, o := range opts {
//执行完后的hook
o.after(&c)
}
}()
//trace相关代码
if EnableTracing {
c.traceInfo.tr = trace.New(“grpc.Sent.”+methodFamily(method), method)
defer c.traceInfo.tr.Finish()
c.traceInfo.firstLine.client = true
if deadline, ok := ctx.Deadline(); ok {
c.traceInfo.firstLine.deadline = deadline.Sub(time.Now())
}
c.traceInfo.tr.LazyLog(&c.traceInfo.firstLine, false)
// TODO(dsymonds): Arrange for c.traceInfo.firstLine.remoteAddr to be set.
defer func() {
if e != nil {
c.traceInfo.tr.LazyLog(&fmtStringer{“%v”, []interface{}{e}}, true)
c.traceInfo.tr.SetError()
}
}()
}
//统计相关代码
if stats.On() {
ctx = stats.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
begin := &stats.Begin{
Client: true,
BeginTime: time.Now(),
FailFast: c.failFast,
}
stats.HandleRPC(ctx, begin)
}
defer func() {
//结束后的统计相关代码
if stats.On() {
end := &stats.End{
Client: true,
EndTime: time.Now(),
Error: e,
}
stats.HandleRPC(ctx, end)
}
}()
topts := &transport.Options{
Last: true,
Delay: false,
}
for {
var (
err error
t transport.ClientTransport
stream *transport.Stream
// Record the put handler from Balancer.Get(…). It is called once the
// RPC has completed or failed.
put func()
)
// TODO(zhaoq): Need a formal spec of fail-fast.
//传输层的配置
callHdr := &transport.CallHdr{
Host: cc.authority,
Method: method,
}
if cc.dopts.cp != nil {
callHdr.SendCompress = cc.dopts.cp.Type()
}
gopts := BalancerGetOptions{
BlockingWait: !c.failFast,
}
//得到传输成连接,在http2中一个传输单位是一个流。
t, put, err = cc.getTransport(ctx, gopts)
if err != nil {
// TODO(zhaoq): Probably revisit the error handling.
if _, ok := err.(
rpcError); ok {
return err
}
if err == errConnClosing || err == errConnUnavailable {
if c.failFast {
return Errorf(codes.Unavailable, “%v”, err)
}
continue
}
// All the other errors are treated as Internal errors.
return Errorf(codes.Internal, “%v”, err)
}
if c.traceInfo.tr != nil {
c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)
}
// 发送请求
stream, err = sendRequest(ctx, cc.dopts.codec, cc.dopts.cp, callHdr, t, args, topts)
if err != nil {
if put != nil {
put()
put = nil
}
// Retry a non-failfast RPC when
// i) there is a connection error; or
// ii) the server started to drain before this RPC was initiated.
// 在这两种情况下重试,1 链接错误 2 在rpc初始化之前服务端已经开始服务
if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
if c.failFast {
return toRPCErr(err)
}
continue
}
return toRPCErr(err)
}
//收消息
err = recvResponse(ctx, cc.dopts, t, &c, stream, reply)
if err != nil {
if put != nil {
put()
put = nil
}
if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
if c.failFast {
return toRPCErr(err)
}
continue
}
return toRPCErr(err)
}
if c.traceInfo.tr != nil {
c.traceInfo.tr.LazyLog(&payload{sent: false, msg: reply}, true)
}
//关闭一个http2流
t.CloseStream(stream, nil)
if put != nil {
put()
put = nil
}
//Errorf会判断返回十分ok
return Errorf(stream.StatusCode(), “%s”, stream.StatusDesc())
}
}
在这个函数最主要是两个函数,一个是sendRequest,一个是recvResponse,首先看看sendRequest函数:



func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHdr *transport.CallHdr, t transport.ClientTransport, args interface{}, opts *transport.Options) (_ *transport.Stream, err error) {
// 创建一个http2流
stream, err := t.NewStream(ctx, callHdr)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
// If err is connection error, t will be closed, no need to close stream here.
if _, ok := err.(transport.ConnectionError); !ok {
t.CloseStream(stream, err)
}
}
}()
var (
cbuf *bytes.Buffer
outPayload *stats.OutPayload
)
//压缩不为空
if compressor != nil {
cbuf = new(bytes.Buffer)
}
//统计
if stats.On() {
outPayload = &stats.OutPayload{
Client: true,
}
}
//编码并压缩数据
outBuf, err := encode(codec, args, compressor, cbuf, outPayload)
if err != nil {
return nil, Errorf(codes.Internal, “grpc: %v”, err)
}
//写入流
err = t.Write(stream, outBuf, opts)
if err == nil && outPayload != nil {
outPayload.SentTime = time.Now()
stats.HandleRPC(ctx, outPayload)
}
// t.NewStream(…) could lead to an early rejection of the RPC (e.g., the service/method
// does not exist.) so that t.Write could get io.EOF from wait(…). Leave the following
// recvResponse to get the final status.
if err != nil && err != io.EOF {
return nil, err
}
// Sent successfully.
return stream, nil
}
可以看到这个函数相当简单,做了两件事情,编码压缩数据并发送.再来看看recvResponse函数:



func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) {
// Try to acquire header metadata from the server if there is any.
defer func() {
if err != nil {
if _, ok := err.(transport.ConnectionError); !ok {
t.CloseStream(stream, err)
}
}
}()
c.headerMD, err = stream.Header()
if err != nil {
return
}
p := &parser{r: stream}
var inPayload *stats.InPayload
if stats.On() {
inPayload = &stats.InPayload{
Client: true,
}
}
for {
//一直读到流关闭
if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32, inPayload); err != nil {
if err == io.EOF {
break
}
return
}
}
if inPayload != nil && err == io.EOF && stream.StatusCode() == codes.OK {
// TODO in the current implementation, inTrailer may be handled before inPayload in some cases.
// Fix the order if necessary.
stats.HandleRPC(ctx, inPayload)
}
c.trailerMD = stream.Trailer()
return nil
}



func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int, inPayload *stats.InPayload) error {
//接受数据
pf, d, err := p.recvMsg(maxMsgSize)
if err != nil {
return err
}
if inPayload != nil {
inPayload.WireLength = len(d)
}
if err := checkRecvPayload(pf, s.RecvCompress(), dc); err != nil {
return err
}
if pf == compressionMade {
//解压
d, err = dc.Do(bytes.NewReader(d))
if err != nil {
return Errorf(codes.Internal, “grpc: failed to decompress the received message %v”, err)
}
}
if len(d) > maxMsgSize {
// TODO: Revisit the error code. Currently keep it consistent with java
// implementation.
return Errorf(codes.Internal, “grpc: received a message of %d bytes exceeding %d limit”, len(d), maxMsgSize)
}
//数据解码
if err := c.Unmarshal(d, m); err != nil {
return Errorf(codes.Internal, “grpc: failed to unmarshal the received message %v”, err)
}
if inPayload != nil {
inPayload.RecvTime = time.Now()
inPayload.Payload = m
// TODO truncate large payload.
inPayload.Data = d
inPayload.Length = len(d)
}
return nil
}
这里可以看到一个recvRespon可能会处理多个返回,但是确实在同一个for循环中处理的,有点奇怪。客户端代码大概就是这个流程。代码来说不算太复杂。(主要不钻进http2的实现,刚开始我就去看http2,一头雾水) 其中还有重要的地方就是负载均衡,通过它我们可以根据算法自动选择要连接的ip跟地址,还有验证的使用,放到下一篇吧



4 相关链接
https://github.com/grpc/grpc/blob/master/doc/load-balancing.md 负载均衡
https://www.gitbook.com/book/ye11ow/http2-explained/details 介绍http2的书籍,写的非常好
http://www.grpc.io/docs/guides/concepts.html#metadata metadata介绍,在源码的Documentation目录有metadata的详细介绍



golang版本grpc服务端



2 注册流程
想象一个rpc的流程, 客户端指明一个函数的函数名跟参数,服务端找到这个函数,然后把参数应用到这个函数上。所以一个注册服务流程,就是告诉grpc让收到一个rpc调用的时候,如何找到我们的函数。看一个最简单的服务端程序.



type server struct{}
func (s server) SayHello(ctx context.Context, in *pb.HelloRequest) (pb.HelloReply, error) {
return &pb.HelloReply{Message: “Hello “ + in.Name}, nil
}
func main() {
lis, err := net.Listen(“tcp”, port)
if err != nil {
log.Fatalf(“failed to listen: %v”, err)
}
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})
reflection.Register(s)
if err := s.Serve(lis); err != nil {
log.Fatalf(“failed to serve: %v”, err)
}
}
一般的流程是先定义个proto文件,然后根据protoc工具生成go的grpc代码,这里的SayHello函数是你proto文件中声明的函数。然后你实现你声明的接口,然后再注册到server中,就可以供客户端调用了。 在grpc注册的过程中有几个概念:



Server
grpc Service
ServiceDesc
自己的service
自己的service很简单,就是上面的server结构体,这个结构实现了相应的interface。servicedesc相当于是一个根据proto文件来描述你server需要实现的功能的结构。grpc service, 你proto文件可能会声明多个service,你的Server也可以注册多个service。Server 原则上来说只需要一个server,一个server下有多个service,一个service下有多个方法。 我们看下RegisterGreeterServer这个方法



/描述一个service具有哪些方法
var _Greeter_serviceDesc = grpc.ServiceDesc{
ServiceName: “helloworld.Greeter”,
HandlerType: (GreeterServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: “SayHello”,
Handler: _Greeter_SayHello_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: “helloworld.proto”,
}
func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
s.RegisterService(&_Greeter_serviceDesc, srv)
}
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
ht := reflect.TypeOf(sd.HandlerType).Elem()
st := reflect.TypeOf(ss)
//判断是否实现相应的接口
if !st.Implements(ht) {
grpclog.Fatalf(“grpc: Server.RegisterService found the handler of type %v that does not satisfy %v”, st, ht)
}
s.register(sd, ss)
}
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
s.printf(“RegisterService(%q)”, sd.ServiceName)
if _, ok := s.m[sd.ServiceName]; ok {
grpclog.Fatalf(“grpc: Server.RegisterService found duplicate service registration for %q”, sd.ServiceName)
}
//初始化一个服务
srv := &service{
server: ss,
md: make(map[string]
MethodDesc),
sd: make(map[string]*StreamDesc),
mdata: sd.Metadata,
}
//根据名字映射方法
for i := range sd.Methods {
d := &sd.Methods[i]
srv.md[d.MethodName] = d
}
for i := range sd.Streams {
d := &sd.Streams[i]
srv.sd[d.StreamName] = d
}
把服务注册到server中
s.m[sd.ServiceName] = srv
}
其实根据这个注册名字,我们已经能够大概猜到当客户端的请求到来时如何找到我们的方法。首先根据service name找到相应的service再根据方法名找到相应的方法,然后拿到我们自定义的结构调用这个方法。



3 调用流程
注册流程看完了,接下来再来看下让一个请求到达服务端的时候是如何处理。 在Server中的Serve方法中,服务端通过监听请求的到来,通过for循环不断接受到来的连接。



//去掉一些错误处理的代码后,十分简洁
func (s *Server) Serve(lis net.Listener) error {
……….



for {
rawConn, err := lis.Accept()
.............

// Start a new goroutine to deal with rawConn
// so we don't stall this Accept loop goroutine.
go s.handleRawConn(rawConn)
} } //最后通过http2创建一个stream,来收取消息


func (s *Server) serveStreams(st transport.ServerTransport) {
defer s.removeConn(st)
defer st.Close()
var wg sync.WaitGroup
st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
go func() {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
}, func(ctx context.Context, method string) context.Context {
if !EnableTracing {
return ctx
}
tr := trace.New(“grpc.Recv.”+methodFamily(method), method)
return trace.NewContext(ctx, tr)
})
wg.Wait()
}



func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
sm := stream.Method()
if sm != “” && sm[0] == ‘/’ {
sm = sm[1:]
}
pos := strings.LastIndex(sm, “/”)



…………
//从这里到服务和注册的方法
service := sm[:pos]
method := sm[pos+1:]
srv, ok := s.m[service]



……………..



// Unary RPC or Streaming RPC?
if md, ok := srv.md[method]; ok {
s.processUnaryRPC(t, stream, srv, md, trInfo)
return
}
if sd, ok := srv.sd[method]; ok {
s.processStreamingRPC(t, stream, srv, sd, trInfo)
return
}



………..



}



//接下来是最主要的处理函数
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
if stats.On() {
begin := &stats.Begin{
BeginTime: time.Now(),
}
stats.HandleRPC(stream.Context(), begin)
}
defer func() {
if stats.On() {
end := &stats.End{
EndTime: time.Now(),
}
if err != nil && err != io.EOF {
end.Error = toRPCErr(err)
}
stats.HandleRPC(stream.Context(), end)
}
}()
if trInfo != nil {
defer trInfo.tr.Finish()
trInfo.firstLine.client = false
trInfo.tr.LazyLog(&trInfo.firstLine, false)
defer func() {
if err != nil && err != io.EOF {
trInfo.tr.LazyLog(&fmtStringer{“%v”, []interface{}{err}}, true)
trInfo.tr.SetError()
}
}()
}
if s.opts.cp != nil {
// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
stream.SetSendCompress(s.opts.cp.Type())
}
p := &parser{r: stream}
for {
pf, req, err := p.recvMsg(s.opts.maxMsgSize)
if err == io.EOF {
// The entire stream is done (for unary RPC only).
return err
}



.............

if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
switch err := err.(type) {
case *rpcError:
if e := t.WriteStatus(stream, err.code, err.desc); e != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
return err
default:
if e := t.WriteStatus(stream, codes.Internal, err.Error()); e != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
// TODO checkRecvPayload always return RPC error. Add a return here if necessary.
}
}
var inPayload *stats.InPayload
if stats.On() {
inPayload = &stats.InPayload{
RecvTime: time.Now(),
}
}
statusCode := codes.OK
statusDesc := ""
//df函数主要用于读取http2请求并反序列化到v中(相当于一个解包的操作)
df := func(v interface{}) error {
if inPayload != nil {
inPayload.WireLength = len(req)
}
if pf == compressionMade {
var err error
req, err = s.opts.dc.Do(bytes.NewReader(req))
if err != nil {
if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
}
return Errorf(codes.Internal, err.Error())
}
}
if len(req) > s.opts.maxMsgSize {
// TODO: Revisit the error code. Currently keep it consistent with
// java implementation.
statusCode = codes.Internal
statusDesc = fmt.Sprintf("grpc: server received a message of %d bytes exceeding %d limit", len(req), s.opts.maxMsgSize)
}
if err := s.opts.codec.Unmarshal(req, v); err != nil {
return err
}
if inPayload != nil {
inPayload.Payload = v
inPayload.Data = req
inPayload.Length = len(req)
stats.HandleRPC(stream.Context(), inPayload)
}
if trInfo != nil {
trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
}
return nil
}
//这些我们注册的函数
reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)

.....................

opts := &transport.Options{
Last: true,
Delay: false,
}
//发结果发送出去
if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
switch err := err.(type) {
case transport.ConnectionError:
// Nothing to do here.
case transport.StreamError:
statusCode = err.Code
statusDesc = err.Desc
default:
statusCode = codes.Unknown
statusDesc = err.Error()
}
return err
}
if trInfo != nil {
trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
}
errWrite := t.WriteStatus(stream, statusCode, statusDesc)
if statusCode != codes.OK {
return Errorf(statusCode, statusDesc)
}
return errWrite } } grpc服务端的基本流程就是这样了,我只是看的一个基本流程,再没遇到问题时,基本上不会再看grpc源码了。目前公司项目中在使用grpc,等有了坑,我再来总结吧


4 总结
grpc中有很多我比较喜欢的点,一个是grpcsever的配置通过传递函数来修改,这样又不用取回之前的设置,又可以只修改其中一个值,比较精巧,-0- 原谅我见识不多。还有就是错误的处理,context的使用,trace的使用。 不得不吐槽一下,golang的代码看起来真不够美,虽然简单,但是写起来不够优美,源码的错误处理看起来也是乱糟糟的。


Category golang