GO实现高可用高并发分布式系统:使用gRPC实现一对多和多对多交互

在上一节我们使用gRPC实现了客户端和服务端的一对一通讯,也就是客户端向服务端发出一个请求,服务端返回一个结果。但是在很多场景下可能需要客户端向服务端连续发送多个请求后,服务端才能进行处理然后返回一个结果,例如客户端向服务端发送多个订单号,让服务端对订单号进行记录,然后服务端把所有订单号记录后返回结果;或者是客户端发送一个订单号查询所有大于给定订单号的交易记录,然后服务端返回满足条件的十几条记录等。

我们首先看看服务端给客户端返回多条记录的情形。在gRPC中,可以连续发送多条数据的对象叫stream,该对象支持异步发送,假设客户端要查询所有订单号大于10的交易记录,假设在服务端存储了满足条件的记录有20条,那么服务端可以先返回5条,等5分钟后再返回10条,然后等20分钟后再返回5条,因此客户端在接收记录时需要做相应的异步处理。

我们首先修改proto文件如下:

ervice OrderManagement {
    rpc getOrder(google.protobuf.StringValue) returns(Order);
    rpc searchOrders(google.protobuf.StringValue) returns (stream Order);
}

上面代码中的stream表明,当客户端通过searchOrders接口向服务器发出请求时,它需要通过stream对象来获取一系列从服务器返回的Order数据。按照上一节的方法再次编译proto文件后,我们看看它内容的改变,使用searchOrders作为关键字在生成的pb.go文件中查询我们可以看到如下内容:

type OrderManagementClient interface {
	GetOrder(ctx context.Context, in *wrappers.StringValue, opts ...grpc.CallOption) (*Order, error)

	SearchOrders(ctx context.Context, in *wrappers.StringValue, opts ...grpc.CallOption) (OrderManagement_SearchOrdersClient, error)

}
。。。

type OrderManagement_SearchOrdersClient interface {
	Recv() (*Order, error)
	grpc.ClientStream
}

type orderManagementSearchOrdersClient struct {
	grpc.ClientStream
}

func (x *orderManagementSearchOrdersClient) Recv() (*Order, error) {
	m := new(Order)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

这段代码表明客户端在调用searchOrders接口时它会获得一个名为OrderManagement_SearchOrdersClient的对象,这个对象实现了一个接口叫Recv,我们不难猜测到时候客户端需要调用Recv()来接收服务端返回的一系列Order数据。继续往下查可以看到如下代码:

// OrderManagementServer is the server API for OrderManagement service.
type OrderManagementServer interface {
	GetOrder(context.Context, *wrappers.StringValue) (*Order, error)
	
	SearchOrders(*wrappers.StringValue, OrderManagement_SearchOrdersServer) error

}

。。。。

type OrderManagement_SearchOrdersServer interface {
	Send(*Order) error
	grpc.ServerStream
}

type orderManagementSearchOrdersServer struct {
	grpc.ServerStream
}

func (x *orderManagementSearchOrdersServer) Send(m *Order) error {
	return x.ServerStream.SendMsg(m)
}

上面代码代码表明,服务端在实现searchOrders接口时需要使用一个名为OrderManagement_SearchOrdersServer的对象,它用于一个接口叫Send,我们不难猜测服务端将调用这个接口给客户端发送一系列Order数据,我们首先看服务端代码的实现,在server/main.go中增加代码如下:

func (s *server) SearchOrders(searchQuery *wrappers.StringValue, 
	stream pb.OrderManagement_SearchOrdersServer) error {
		for key, order := range orderMap {
			log.Print(key, order)
			for _, itemStr := range order.Items {
				log.Print(itemStr)
				if strings.Contains(itemStr, searchQuery.Value) {
					err := stream.Send(&order)
					if err != nil {
						return fmt.Errorf("error sending message to stream: %v", err)
					}
					log.Print("Matching Order Found: " + key)
				    break
				}
				
			}
		}
		return nil //返回nil,gRPC会关闭服务器发往客户端的数据管道
	}

服务端通过实现SearchOrders接口来执行业务逻辑,其中stream的类型为OrderManagement_SearchOrdersServer,它有gRPC框架传给我们,通过前面的分析我们知道它有接口Send, 函数的输入参数searchQuery其实就是客户端发送过来的订单号字符串,代码从该数据结构拿到订单号后,从数据存储中进行查询,把所有查到的满足条件的Order数据通过Send发送给客户端。这里需要注意的是,客户端在接收数据过程中可能由于多种原因中断连接,这时服务端调用Send就会返回错误,同时还需要注意的是当服务端发送完所有数据后,一定要return nil,这样gRPC才会把发送管道给关闭调。

同理我们看客户端的实现,在client/main.go的main函数中添加如下代码:


	searchStream, _ := client.SearchOrders(ctx, &wrapper.StringValue{Value: "Google"})
	//如果server 使用stream传输结果,客户端需要使用Recv()接收多个返回
	for {
		searchOrder, err := searchStream.Recv()
		if err == io.EOF {
			log.Print("EOF")
			break
		}
		if err == nil {
			log.Print("Search result: ", searchOrder)
		}
	}

从前面代码查询可以看到,客户端调用SearchOrder时会返回一个orderManagementSearchOrdersClient对象,它实现了接口Recv()用来接收服务端发送来的一连串数据,所以在上面代码实现中,我们在for循环中调用Recv()接口不断接收服务端发送的数据,如果数据发送完了,前面服务端通过return nil断掉连接后,客户端就会在调用Recv时得到io.EOF错误,这是就可以中断对Recv()的调用。

以上是客户端发送一个请求,服务端返回一系列结果,我们看看反过来,客户端发送一系列请求,服务端返回一个结果,首先还是修改proto文件,增加一个接口定义:

service OrderManagement {
    rpc getOrder(google.protobuf.StringValue) returns(Order);
  
    rpc searchOrders(google.protobuf.StringValue) returns (stream Order);
    rpc updateOrders(stream Order) returns (google.protobuf.StringValue);
}

updateOrders就是新增加的接口,注意到它对应的输入参数使用了stream来修饰,也就是说客户端会给服务端连续发送一系列Order数据,服务端处理后只返回一个StringValue结构,我们可以使用前面的搜索方法在新编译后的pb.go文件里查询新增加的接口,同样道理,服务端在实现该接口是,也是在一个for循环中使用Recv接口来获取客户端发送的一系列数据,在server/main.go中添加代码如下:

func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {
	ordersStr := "Updated Order IDs: "
	for {
		order, err := stream.Recv()
		if err == io.EOF {
			//通知客户端不用继续发送
			return stream.SendAndClose(&wrapper.StringValue{Value: "Orders processed" + ordersStr})
		}

		orderMap[order.Id] = *order 
	    log.Printf("Order ID ", order.Id, ": Updated")
		ordersStr += order.Id + ", "
	}
}

代码的实现逻辑跟前面客户端实现的服务请求逻辑一样,相当于服务端和客户端的角色颠倒了一下。这里需要注意的是服务端如何给客户端返回结果,代码中调用了SendAndClose,它把返回结果传输给客户端的同时将连接关闭,于是客户端就不能继续再给服务端发送数据。我们看看客户端的实现,在client/main.go中添加代码如下:

updOrder1 := pb.Order{Id: "102", Items:[]string{"Google Pixel 3A", "Google Pixel Book"}, Destination:"Mountain View, CA", Price:1100.00}
	updOrder2 := pb.Order{Id: "103", Items:[]string{"Apple Watch S4", "Mac Book Pro", "iPad Pro"}, Destination:"San Jose, CA", Price:2800.00}
	updOrder3 := pb.Order{Id: "104", Items:[]string{"Google Home Mini", "Google Nest Hub", "iPad Mini"}, Destination:"Mountain View, CA", Price:2200.00}

	updateStream, err := client.UpdateOrders(ctx)
	if err != nil {
		log.Fatalf("%v.UpdateOrders(_) = , %v", client, err)
	}

	if err := updateStream.Send(&updOrder1); err != nil {
		log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder1, err)
	}

	if err := updateStream.Send(&updOrder2); err != nil {
		log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder2, err)
	}

	if err := updateStream.Send(&updOrder3); err != nil {
		log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder3, err)
	}

	updateRes, err := updateStream.CloseAndRecv()
	if err != nil {
		log.Fatalf("%v.CloseAndRecv() got error %v, want %v", updateStream, err, nil)
	}
	log.Printf("Update orders res: %s", updateRes)

客户端先是构造一系列Order数据然后分别调用多次Send传递给服务端,如果客户端没有多余数据要传输后,它调用CloseAndRecv(),这个函数会让服务端的Recv()返回io.EOF错误,然后客户端阻塞等待服务端将处理结果返回。

最后我们看客户端给服务端发送一系列数据,然后服务端返回一系列结果给客户端的情况。假设客户端给服务端发送了一系列订单信息,服务端收到订单信息后,把收货地址相同的货物信息合在一起发送给客户端,我们用shipment表示收货地址相同的货物信息组合。如果客户端发送order1, order2,order3, order4 等4个订单号给服务端,其中order1 ,order3 对应货物的收货地址一样, order2, order4对应的收货地址一样,于是服务端就返回两个shipment结构,第一个对应order1, order3, 第二个对应order2, order4,我们先看proto文件的修改:

service OrderManagement {
    rpc getOrder(google.protobuf.StringValue) returns(Order);
  
    rpc searchOrders(google.protobuf.StringValue) returns (stream Order);
    rpc updateOrders(stream Order) returns (google.protobuf.StringValue);
    rpc processOrders(stream google.protobuf.StringValue) returns (stream CombinedShipment);
}

message Order {
    string id = 1;
    repeated string items = 2;
    string description = 3;
    float price = 4;
    string destination = 5;
}

message CombinedShipment {
    string id = 1;
    string status = 2;
    repeated Order orderList = 3;
}

我们先看服务端的实现,在server/main.go中添加如下代码:

func (s *server) ProcessOrder(stream pb.OrderManagement_ProcessOrdersServer) error {
	batchMarker := 1
	var combinedShipmentMap = make(map[string]pb.CombinedShipment)
	for {
		orderId, err := stream.Recv()
		log.Printf("Reading Proc order: %s", orderId)
		if err == io.EOF {
			log.Printf("EOF: %s", orderId)
			for _, shipment := range combinedShipmentMap {
				if err := stream.Send(&shipment); err != nil {
					return err 
				}
			}
			return nil //返回nil,gRPC框架会关闭调server发送给客户端的管道
		}
		if err != nil {
			log.Println(err)
			return err 
		}
		destination := orderMap[orderId.GetValue()].Destination 
		shipment, found := combinedShipmentMap[destination]
		if found {
			ord := orderMap[orderId.GetValue()]
			shipment.OrdersList = append(shipment.OrderList, &ord)
			combinedShipmentMap[destination] = shipment 
		} else {
			comShip := pb.CombinedShipment{Id: "cmb - " + (orderMap[orderId.GetValue()].Destination), Status: "Processed!",}
			ord := orderMap[orderId.GetValue()]
			comShip.OrdersList = append(shipment.OrdersList, &ord)
			combinedShipmentMap[destination] = comShip 
			log.Print(len(comShip.OrdersList), comShip.GetId())
		}

		if batchMarker == orderBatchSize {
			for _, comb := range combinedShipmentMap {
				log.Printf("Shipping: %v -> %v", comb.Id, len(comb.OrdersList))
				if err := stream.Send(&comb); err != nil {
					return err 
				}
			}
			batchMarker = 0
			combinedShipmentMap = make(map[string]pb.CombinedShipment)
		} else {
			batchMarker++
		}
	}
}

上面代码实现我们只需要注意几点,首先它使用一个stream对象来完成两个功能,一个功能是调用Recv()来接收客户端发送的多个数据,然后同样是这个对象,继续调用它的Send接口给客户端发送多个数据,也就是一个stream对象既负责接收客户端发送的一系列数据,又负责将服务端的一系列处理结果发送给客户端,把握这一点就行,其他那些业务逻辑无关紧要。

我们再看看客户端的实现,在client/main.go中添加如下代码:

func main() {
。。。
	channel := make(chan struct{})
	go asncClientBidirectionalRPC(streamProcOrder, channel)
	time.Sleep(time.Milliscond * 1000)

	if err := streamProcOrder.Send(&wrapper.StringValue{Value: "101"}); err != nil {
		log.Fatalf("%v.Send(%v) = %v", client, "101", err)
	}

	if err := streamProcOrder.CloseSend(); err != nil {
		log.Fatal(err)
	}
	channel <- struct{}{} 
	
}

func asncClientBidirectionalRPC(streamProcOrder pb.OrderManagement_ProcessOrdersClient, c chan struct{}) {
	for {
		combinedShipment, errorProcOrder := streamProcOrder.Recv()
		if errProcOrder == io.EOF {
			break 
		}
		log.Printf("Combined shipment: ", combinedShipment.OrdersList)
	}
	<-c
}

上面代码实现中有一个关键点需要把握,客户端也是通过一个stream对象来完成数据的发送和接收,同时我们要特别注意到,同一个stream对象发送和接收完全可以在异步的条件下同时进行,所有上面代码在主函数main里通过Send发送请求,然后扔出一个goroutine异步接收服务端发送回来的数据,虽然发送和接收同时进行但客户端不用加锁,也就是gRPC框架保证了发送和接收在异步情况下业务逻辑依然不会出错。

相关代码从上一节的github路径可以获取。

  • 3
    点赞
  • 3
    收藏
    觉得还不错? 一键收藏
  • 0
    评论
## 用Python构建分布式高并发RPC框架 ------ ### 一、为什么要写一个RPC框架? > + 不是想要造轮子,Dubbo、gRPC、Thift这些轮子已经非常好用了 > + RPC在微服务分布式系统、Web服务器方面应用太广泛了,需要对底层通信过程有基本认识 > + Nignx、Hadoop、K8s、Tensorflow等系统或软件的底层源码大多是关于RPC的 > + 可以更加熟悉地使用已有的RPC框架,甚至考虑如何优化已有的框架 ### 二、为什么要用Python来写? > + 一个高性能的RPC框架是不可能使用Python来完成的,Python的速度太感人了 > + 以学习基本原理为目的时,不必在乎过多细节,Python封装好的类库屏蔽掉很多细节 > + 实现同样的功能,Python的代码量相较于C/C++要少很多,减少编程难度 ### 三、这个是原创的吗? > + 永远站在巨人的肩膀之上,学习他人的代码,消化吸收,据为己用 ### 四、划重点 > + 分布式和高并发是如何实现的?Prefork异步模型+Zookeeper服务发现 ### 五、提供了什么RPC服务? > + 客户端请求服务端计算一个整数值的斐波那契数列值,当然也可以自行定义 ### 六、项目的组成部分 -------- 该资源内项目源码是个人的毕设,代码都测试ok,都是运行成功后才上传资源,答辩评审平均分达到96分,放心下载使用! <项目介绍> 1、该资源内项目代码都经过测试运行成功,功能ok的情况下才上传的,请放心下载使用! 2、本项目适合计算机相关专业(如计科、人工智能、通信工程、自动化、电子信息等)的在校学生、老师或者企业员工下载学习,也适合小白学习进阶,当然也可作为毕设项目、课程设计、作业、项目初期立项演示等。 3、如果基础还行,也可在此代码基础上进行修改,以实现其他功能,也可用于毕设、课设、作业等。 下载后请首先打开README.md文件(如有),仅供学习参考, 切勿用于商业用途。 --------

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值