做外贸单网上都做的那些网站长沙百度推广排名优化
grpc四种数据流
简介
1.简单模式
这种模式最为传统,即客户端发起一次请求,服务端响应一个数据,这和大家平时熟悉的rpc没什么区别,所以不在详细介绍
2.服务端数据流模式
这种模式是客户端发起一次请求,服务端返回一段连续的数据流。典型的例子是客户端向服务端发送一个股票代码,服务端就把该股票的实时数据源源不断的返回给客户端。
3.客户端数据流模式
与服务端数据流模式相反,这次是客户端源源不断的向服务端发送数据流,而在发送结束后,由服务端返回一个响应。典型的例子是物联网向服务器报送数据。
4.双向数据流模式
顾名思义,这是客户端和服务端都可以向对方发送数据流,这个时候双方的数据可以同时互相发送,也就是可以实现实时交互。典型的例子是聊天机器人
proto文件代码编写
哪一方需要源源不断的返回数据,就在那一端的前面添加
stream
关键字,表示流
syntax = "proto3";option go_package="../../common/stream/proto/v1";
service Greeter {rpc GetStream(StreamReqData) returns (stream StreamResData); //服务端流模式rpc PutStream(stream StreamReqData) returns (StreamResData); //客户端流模式rpc AllStream(stream StreamReqData) returns (stream StreamResData); //双向流模式
}message StreamReqData {string data = 1;
}message StreamResData {string data = 1;
}
服务端代码
-
除三种流模式实现代码外整体代码与之前的普通模式无异
-
服务端数据流模式:
grpc
生成的函数原型为:-
func GetStream(req *proto.StreamReqData, res proto.Greeter_GetStreamServer) error
-
proto.Greeter_GetStreamServer
:使用方式类似于socket
网络编程
-
-
客户端数据流模式:
grpc
生成的函数原型为:-
func PutStream(cliStr proto.Greeter_PutStreamServer) error
-
-
双向数据流模式:
grpc
生成的函数原型为:-
func AllStream(allStr proto.Greeter_AllStreamServer) error
-
package mainimport ("OldPackageTest/stream_grpc_test/proto""fmt""google.golang.org/grpc""net""sync""time"
)const PORT = ":50052"type server struct {
}func (s *server) GetStream(req *proto.StreamReqData, res proto.Greeter_GetStreamServer) error {i := 0for {i++_ = res.Send(&proto.StreamResData{Data: fmt.Sprintf("%v", time.Now().Unix()),})time.Sleep(time.Second)if i > 10 {break}}return nil
}func (s *server) PutStream(cliStr proto.Greeter_PutStreamServer) error {for {if a, err := cliStr.Recv(); err != nil {fmt.Println(err)break} else {fmt.Println(a.Data)}}return nil
}func (s *server) AllStream(allStr proto.Greeter_AllStreamServer) error {wg := sync.WaitGroup{}wg.Add(2)//启动两个协程,一发一收go func() {defer wg.Done()for {data, _ := allStr.Recv()fmt.Println("收到客户端消息:" + data.Data)}}()go func() {defer wg.Done()for {_ = allStr.Send(&proto.StreamResData{Data: "我是服务器"})time.Sleep(time.Second)}}()wg.Wait()return nil
}func main() {lis, err := net.Listen("tcp", PORT)if err != nil {panic(err)}s := grpc.NewServer()proto.RegisterGreeterServer(s, &server{})err = s.Serve(lis)if err != nil {panic(err)}
}
客户端代码
客户端实现代码简单,大家自己阅读即可
package mainimport ("context""fmt""sync""time""google.golang.org/grpc""OldPackageTest/stream_grpc_test/proto"
)func main() {conn, err := grpc.Dial("localhost:50052", grpc.WithInsecure())if err != nil {panic(err)}defer conn.Close()//服务端流模式c := proto.NewGreeterClient(conn)res, _ := c.GetStream(context.Background(), &proto.StreamReqData{Data: "go"})for {a, err := res.Recv() //如果大家懂socket编程的话就明白 send recvif err != nil {fmt.Println(err)break}fmt.Println(a.Data)}//客户端流模式putS, _ := c.PutStream(context.Background())i := 0for {i++_ = putS.Send(&proto.StreamReqData{Data: fmt.Sprintf("grpc %d", i),})time.Sleep(time.Second)if i > 10 {break}}//双向流模式allStr, _ := c.AllStream(context.Background())wg := sync.WaitGroup{}wg.Add(2)go func() {defer wg.Done()for {data, _ := allStr.Recv()fmt.Println("收到客户端消息:" + data.Data)}}()go func() {defer wg.Done()for {_ = allStr.Send(&proto.StreamReqData{Data: "go"})time.Sleep(time.Second)}}()wg.Wait()
}