99re热这里只有精品视频,7777色鬼xxxx欧美色妇,国产成人精品一区二三区在线观看,内射爽无广熟女亚洲,精品人妻av一区二区三区

Go 語言 gRPC 入門

2023-03-22 15:02 更新

原文鏈接:https://chai2010.cn/advanced-go-programming-book/ch4-rpc/ch4-04-grpc.html


4.4 gRPC 入門

gRPC 是 Google 公司基于 Protobuf 開發(fā)的跨語言的開源 RPC 框架。gRPC 基于 HTTP/2 協(xié)議設(shè)計(jì),可以基于一個(gè) HTTP/2 連接提供多個(gè)服務(wù),對(duì)于移動(dòng)設(shè)備更加友好。本節(jié)將講述 gRPC 的簡(jiǎn)單用法。

4.4.1 gRPC 技術(shù)棧

Go 語言的 gRPC 技術(shù)棧如圖 4-1 所示:


圖 4-1 gRPC 技術(shù)棧

最底層為 TCP 或 Unix Socket 協(xié)議,在此之上是 HTTP/2 協(xié)議的實(shí)現(xiàn),然后在 HTTP/2 協(xié)議之上又構(gòu)建了針對(duì) Go 語言的 gRPC 核心庫。應(yīng)用程序通過 gRPC 插件生產(chǎn)的 Stub 代碼和 gRPC 核心庫通信,也可以直接和 gRPC 核心庫通信。

4.4.2 gRPC 入門

如果從 Protobuf 的角度看,gRPC 只不過是一個(gè)針對(duì) service 接口生成代碼的生成器。我們?cè)诒菊碌牡诙?jié)中手工實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的 Protobuf 代碼生成器插件,只不過當(dāng)時(shí)生成的代碼是適配標(biāo)準(zhǔn)庫的 RPC 框架的?,F(xiàn)在我們將學(xué)習(xí) gRPC 的用法。

創(chuàng)建 hello.proto 文件,定義 HelloService 接口:

syntax = "proto3";

package main;

message String {
	string value = 1;
}

service HelloService {
	rpc Hello (String) returns (String);
}

使用 protoc-gen-go 內(nèi)置的 gRPC 插件生成 gRPC 代碼:

$ protoc --go_out=plugins=grpc:. hello.proto

gRPC 插件會(huì)為服務(wù)端和客戶端生成不同的接口:

type HelloServiceServer interface {
    Hello(context.Context, *String) (*String, error)
}

type HelloServiceClient interface {
    Hello(context.Context, *String, ...grpc.CallOption) (*String, error)
}

gRPC 通過 context.Context 參數(shù),為每個(gè)方法調(diào)用提供了上下文支持。客戶端在調(diào)用方法的時(shí)候,可以通過可選的 grpc.CallOption 類型的參數(shù)提供額外的上下文信息。

基于服務(wù)端的 HelloServiceServer 接口可以重新實(shí)現(xiàn) HelloService 服務(wù):

type HelloServiceImpl struct{}

func (p *HelloServiceImpl) Hello(
    ctx context.Context, args *String,
) (*String, error) {
    reply := &String{Value: "hello:" + args.GetValue()}
    return reply, nil
}

gRPC 服務(wù)的啟動(dòng)流程和標(biāo)準(zhǔn)庫的 RPC 服務(wù)啟動(dòng)流程類似:

func main() {
    grpcServer := grpc.NewServer()
    RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl))

    lis, err := net.Listen("tcp", ":1234")
    if err != nil {
        log.Fatal(err)
    }
    grpcServer.Serve(lis)
}

首先是通過 grpc.NewServer() 構(gòu)造一個(gè) gRPC 服務(wù)對(duì)象,然后通過 gRPC 插件生成的 RegisterHelloServiceServer 函數(shù)注冊(cè)我們實(shí)現(xiàn)的 HelloServiceImpl 服務(wù)。然后通過 grpcServer.Serve(lis) 在一個(gè)監(jiān)聽端口上提供 gRPC 服務(wù)。

然后就可以通過客戶端連接 gRPC 服務(wù)了:

func main() {
    conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    client := NewHelloServiceClient(conn)
    reply, err := client.Hello(context.Background(), &String{Value: "hello"})
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(reply.GetValue())
}

其中 grpc.Dial 負(fù)責(zé)和 gRPC 服務(wù)建立連接,然后 NewHelloServiceClient 函數(shù)基于已經(jīng)建立的連接構(gòu)造 HelloServiceClient 對(duì)象。返回的 client 其實(shí)是一個(gè) HelloServiceClient 接口對(duì)象,通過接口定義的方法就可以調(diào)用服務(wù)端對(duì)應(yīng)的 gRPC 服務(wù)提供的方法。

gRPC 和標(biāo)準(zhǔn)庫的 RPC 框架有一個(gè)區(qū)別,gRPC 生成的接口并不支持異步調(diào)用。不過我們可以在多個(gè) Goroutine 之間安全地共享 gRPC 底層的 HTTP/2 連接,因此可以通過在另一個(gè) Goroutine 阻塞調(diào)用的方式模擬異步調(diào)用。

4.4.3 gRPC 流

RPC 是遠(yuǎn)程函數(shù)調(diào)用,因此每次調(diào)用的函數(shù)參數(shù)和返回值不能太大,否則將嚴(yán)重影響每次調(diào)用的響應(yīng)時(shí)間。因此傳統(tǒng)的 RPC 方法調(diào)用對(duì)于上傳和下載較大數(shù)據(jù)量場(chǎng)景并不適合。同時(shí)傳統(tǒng) RPC 模式也不適用于對(duì)時(shí)間不確定的訂閱和發(fā)布模式。為此,gRPC 框架針對(duì)服務(wù)器端和客戶端分別提供了流特性。

服務(wù)端或客戶端的單向流是雙向流的特例,我們?cè)?HelloService 增加一個(gè)支持雙向流的 Channel 方法:

service HelloService {
	rpc Hello (String) returns (String);

	rpc Channel (stream String) returns (stream String);
}

關(guān)鍵字 stream 指定啟用流特性,參數(shù)部分是接收客戶端參數(shù)的流,返回值是返回給客戶端的流。

重新生成代碼可以看到接口中新增加的 Channel 方法的定義:

type HelloServiceServer interface {
    Hello(context.Context, *String) (*String, error)
    Channel(HelloService_ChannelServer) error
}
type HelloServiceClient interface {
    Hello(ctx context.Context, in *String, opts ...grpc.CallOption) (
        *String, error,
    )
    Channel(ctx context.Context, opts ...grpc.CallOption) (
        HelloService_ChannelClient, error,
    )
}

在服務(wù)端的 Channel 方法參數(shù)是一個(gè)新的 HelloService_ChannelServer 類型的參數(shù),可以用于和客戶端雙向通信??蛻舳说?Channel 方法返回一個(gè) HelloService_ChannelClient 類型的返回值,可以用于和服務(wù)端進(jìn)行雙向通信。

HelloService_ChannelServer 和 HelloService_ChannelClient 均為接口類型:

type HelloService_ChannelServer interface {
    Send(*String) error
    Recv() (*String, error)
    grpc.ServerStream
}

type HelloService_ChannelClient interface {
    Send(*String) error
    Recv() (*String, error)
    grpc.ClientStream
}

可以發(fā)現(xiàn)服務(wù)端和客戶端的流輔助接口均定義了 Send 和 Recv 方法用于流數(shù)據(jù)的雙向通信。

現(xiàn)在我們可以實(shí)現(xiàn)流服務(wù):

func (p *HelloServiceImpl) Channel(stream HelloService_ChannelServer) error {
    for {
        args, err := stream.Recv()
        if err != nil {
            if err == io.EOF {
                return nil
            }
            return err
        }

        reply := &String{Value: "hello:" + args.GetValue()}

        err = stream.Send(reply)
        if err != nil {
            return err
        }
    }
}

服務(wù)端在循環(huán)中接收客戶端發(fā)來的數(shù)據(jù),如果遇到 io.EOF 表示客戶端流被關(guān)閉,如果函數(shù)退出表示服務(wù)端流關(guān)閉。生成返回的數(shù)據(jù)通過流發(fā)送給客戶端,雙向流數(shù)據(jù)的發(fā)送和接收都是完全獨(dú)立的行為。需要注意的是,發(fā)送和接收的操作并不需要一一對(duì)應(yīng),用戶可以根據(jù)真實(shí)場(chǎng)景進(jìn)行組織代碼。

客戶端需要先調(diào)用 Channel 方法獲取返回的流對(duì)象:

stream, err := client.Channel(context.Background())
if err != nil {
    log.Fatal(err)
}

在客戶端我們將發(fā)送和接收操作放到兩個(gè)獨(dú)立的 Goroutine。首先是向服務(wù)端發(fā)送數(shù)據(jù):

go func() {
    for {
        if err := stream.Send(&String{Value: "hi"}); err != nil {
            log.Fatal(err)
        }
        time.Sleep(time.Second)
    }
}()

然后在循環(huán)中接收服務(wù)端返回的數(shù)據(jù):

for {
    reply, err := stream.Recv()
    if err != nil {
        if err == io.EOF {
            break
        }
        log.Fatal(err)
    }
    fmt.Println(reply.GetValue())
}

這樣就完成了完整的流接收和發(fā)送支持。

4.4.4 發(fā)布和訂閱模式

在前一節(jié)中,我們基于 Go 內(nèi)置的 RPC 庫實(shí)現(xiàn)了一個(gè)簡(jiǎn)化版的 Watch 方法?;?Watch 的思路雖然也可以構(gòu)造發(fā)布和訂閱系統(tǒng),但是因?yàn)?RPC 缺乏流機(jī)制導(dǎo)致每次只能返回一個(gè)結(jié)果。在發(fā)布和訂閱模式中,由調(diào)用者主動(dòng)發(fā)起的發(fā)布行為類似一個(gè)普通函數(shù)調(diào)用,而被動(dòng)的訂閱者則類似 gRPC 客戶端單向流中的接收者?,F(xiàn)在我們可以嘗試基于 gRPC 的流特性構(gòu)造一個(gè)發(fā)布和訂閱系統(tǒng)。

發(fā)布訂閱是一個(gè)常見的設(shè)計(jì)模式,開源社區(qū)中已經(jīng)存在很多該模式的實(shí)現(xiàn)。其中 docker 項(xiàng)目中提供了一個(gè) pubsub 的極簡(jiǎn)實(shí)現(xiàn),下面是基于 pubsub 包實(shí)現(xiàn)的本地發(fā)布訂閱代碼:

import (
    "github.com/moby/moby/pkg/pubsub"
)

func main() {
    p := pubsub.NewPublisher(100*time.Millisecond, 10)

    golang := p.SubscribeTopic(func(v interface{}) bool {
        if key, ok := v.(string); ok {
            if strings.HasPrefix(key, "golang:") {
                return true
            }
        }
        return false
    })
    docker := p.SubscribeTopic(func(v interface{}) bool {
        if key, ok := v.(string); ok {
            if strings.HasPrefix(key, "docker:") {
                return true
            }
        }
        return false
    })

    go p.Publish("hi")
    go p.Publish("golang: https://golang.org")
    go p.Publish("docker: https://www.docker.com/")
    time.Sleep(1)

    go func() {
        fmt.Println("golang topic:", <-golang)
    }()
    go func() {
        fmt.Println("docker topic:", <-docker)
    }()

    <-make(chan bool)
}

其中 pubsub.NewPublisher 構(gòu)造一個(gè)發(fā)布對(duì)象,p.SubscribeTopic() 可以通過函數(shù)篩選感興趣的主題進(jìn)行訂閱。

現(xiàn)在嘗試基于 gRPC 和 pubsub 包,提供一個(gè)跨網(wǎng)絡(luò)的發(fā)布和訂閱系統(tǒng)。首先通過 Protobuf 定義一個(gè)發(fā)布訂閱服務(wù)接口:

service PubsubService {
	rpc Publish (String) returns (String);
	rpc Subscribe (String) returns (stream String);
}

其中 Publish 是普通的 RPC 方法,Subscribe 則是一個(gè)單向的流服務(wù)。然后 gRPC 插件會(huì)為服務(wù)端和客戶端生成對(duì)應(yīng)的接口:

type PubsubServiceServer interface {
    Publish(context.Context, *String) (*String, error)
    Subscribe(*String, PubsubService_SubscribeServer) error
}
type PubsubServiceClient interface {
    Publish(context.Context, *String, ...grpc.CallOption) (*String, error)
    Subscribe(context.Context, *String, ...grpc.CallOption) (
        PubsubService_SubscribeClient, error,
    )
}

type PubsubService_SubscribeServer interface {
    Send(*String) error
    grpc.ServerStream
}

因?yàn)?Subscribe 是服務(wù)端的單向流,因此生成的 PubsubService_SubscribeServer 接口中只有 Send 方法。

然后就可以實(shí)現(xiàn)發(fā)布和訂閱服務(wù)了:

type PubsubService struct {
    pub *pubsub.Publisher
}

func NewPubsubService() *PubsubService {
    return &PubsubService{
        pub: pubsub.NewPublisher(100*time.Millisecond, 10),
    }
}

然后是實(shí)現(xiàn)發(fā)布方法和訂閱方法:

func (p *PubsubService) Publish(
    ctx context.Context, arg *String,
) (*String, error) {
    p.pub.Publish(arg.GetValue())
    return &String{}, nil
}

func (p *PubsubService) Subscribe(
    arg *String, stream PubsubService_SubscribeServer,
) error {
    ch := p.pub.SubscribeTopic(func(v interface{}) bool {
        if key, ok := v.(string); ok {
            if strings.HasPrefix(key,arg.GetValue()) {
                return true
            }
        }
        return false
    })

    for v := range ch {
        if err := stream.Send(&String{Value: v.(string)}); err != nil {
            return err
        }
    }

    return nil
}

這樣就可以從客戶端向服務(wù)器發(fā)布信息了:

func main() {
    conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    client := NewPubsubServiceClient(conn)

    _, err = client.Publish(
        context.Background(), &String{Value: "golang: hello Go"},
    )
    if err != nil {
        log.Fatal(err)
    }
    _, err = client.Publish(
        context.Background(), &String{Value: "docker: hello Docker"},
    )
    if err != nil {
        log.Fatal(err)
    }
}

然后就可以在另一個(gè)客戶端進(jìn)行訂閱信息了:

func main() {
    conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    client := NewPubsubServiceClient(conn)
    stream, err := client.Subscribe(
        context.Background(), &String{Value: "golang:"},
    )
    if err != nil {
        log.Fatal(err)
    }

    for {
        reply, err := stream.Recv()
        if err != nil {
            if err == io.EOF {
                break
            }
            log.Fatal(err)
        }

        fmt.Println(reply.GetValue())
    }
}

到此我們就基于 gRPC 簡(jiǎn)單實(shí)現(xiàn)了一個(gè)跨網(wǎng)絡(luò)的發(fā)布和訂閱服務(wù)。



以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)