From 0e2050ad242f2b8346ca78556f557947c24db507 Mon Sep 17 00:00:00 2001 From: Yang Yang <962032265@qq.com> Date: Tue, 2 Jul 2024 11:00:49 +0800 Subject: [PATCH] Add Loadbalance trait (#199) * refact(ld): add LoadBalancer trait, impl random ld * refact(loadbalance): update ld api design * style: cargo fmt * example(greet): update client * chore: add tag field * style: cargo fmt * refact(dubbo): use target_family * style: cargo check --- application.yaml | 1 + dubbo/src/config/config.rs | 3 +- dubbo/src/config/service.rs | 4 + dubbo/src/extension/invoker_extension.rs | 44 +++--- dubbo/src/loadbalancer/mod.rs | 104 ++++++++++++-- dubbo/src/loadbalancer/random.rs | 42 ++++++ dubbo/src/triple/transport/connector/mod.rs | 4 +- dubbo/src/triple/transport/listener/mod.rs | 4 +- .../echo/src/generated/grpc.examples.echo.rs | 127 +++++------------- examples/greeter/application.yaml | 1 + examples/greeter/src/greeter/client.rs | 18 ++- 11 files changed, 218 insertions(+), 134 deletions(-) create mode 100644 dubbo/src/loadbalancer/random.rs diff --git a/application.yaml b/application.yaml index bec29a67..3096bcf6 100644 --- a/application.yaml +++ b/application.yaml @@ -13,6 +13,7 @@ dubbo: provider: services: GreeterProvider: + tag: red version: 1.0.0 group: test protocol: triple diff --git a/dubbo/src/config/config.rs b/dubbo/src/config/config.rs index 3c606833..dd196101 100644 --- a/dubbo/src/config/config.rs +++ b/dubbo/src/config/config.rs @@ -119,7 +119,8 @@ impl RootConfig { .group("test".to_string()) .version("1.0.0".to_string()) .interface("helloworld.Greeter".to_string()) - .protocol("triple".to_string()), + .protocol("triple".to_string()) + .tag("read".to_string()), ); self.protocols.insert( "triple".to_string(), diff --git a/dubbo/src/config/service.rs b/dubbo/src/config/service.rs index 8a1f1910..e8834440 100644 --- a/dubbo/src/config/service.rs +++ b/dubbo/src/config/service.rs @@ -23,6 +23,7 @@ pub struct ServiceConfig { pub group: String, pub protocol: String, pub interface: String, + pub tag: String, } impl ServiceConfig { @@ -41,4 +42,7 @@ impl ServiceConfig { pub fn protocol(self, protocol: String) -> Self { Self { protocol, ..self } } + pub fn tag(self, tag: String) -> Self { + Self { tag, ..self } + } } diff --git a/dubbo/src/extension/invoker_extension.rs b/dubbo/src/extension/invoker_extension.rs index 8d59e5b4..3373bd2b 100644 --- a/dubbo/src/extension/invoker_extension.rs +++ b/dubbo/src/extension/invoker_extension.rs @@ -40,30 +40,32 @@ pub trait Invoker { async fn url(&self) -> Result; } -pub enum CallType { - Unary, - ClientStream, - ServerStream, - BiStream, -} +// pub enum CallType { +// Unary, +// ClientStream, +// ServerStream, +// BiStream, +// } pub struct GrpcInvocation { - service_name: String, - method_name: String, - arguments: Vec, - attachments: HashMap, - call_type: CallType, + // service_name: String, + // method_name: String, + // arguments: Vec, + // attachments: HashMap, + // call_type: CallType, } -pub struct Argument { - name: String, - value: Box> + Send + 'static>, -} +// pub struct Argument { +// name: String, +// value: Box> + Send + 'static>, +// } +#[allow(dead_code)] pub trait Serializable { fn serialize(&self, serialization_type: String) -> Result; } +#[allow(dead_code)] pub trait Deserializable { fn deserialize(&self, bytes: Bytes, deserialization_type: String) -> Result where @@ -138,8 +140,8 @@ pub mod proxy { } } - impl From> for InvokerProxy { - fn from(invoker: Box) -> Self { + impl From> for InvokerProxy { + fn from(invoker: Box) -> Self { let (tx, mut rx) = tokio::sync::mpsc::channel(64); tokio::spawn(async move { while let Some(opt) = rx.recv().await { @@ -220,7 +222,11 @@ impl InvokerExtensionLoader { type InvokerExtensionConstructor = fn( Url, ) -> Pin< - Box, StdError>> + Send + 'static>, + Box< + dyn Future, StdError>> + + Send + + 'static, + >, >; pub(crate) struct InvokerExtensionFactory { constructor: InvokerExtensionConstructor, @@ -275,7 +281,7 @@ where impl ExtensionMetaInfo for InvokerExtension where T: Invoker + Send + 'static, - T: Extension>, + T: Extension>, { fn name() -> String { T::name() diff --git a/dubbo/src/loadbalancer/mod.rs b/dubbo/src/loadbalancer/mod.rs index 6f4ec89b..d49ec5df 100644 --- a/dubbo/src/loadbalancer/mod.rs +++ b/dubbo/src/loadbalancer/mod.rs @@ -14,26 +14,33 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -use crate::StdError; + +pub mod random; + use futures_core::future::BoxFuture; +use std::error::Error; +use tokio::time::Duration; use tower::{discover::ServiceList, ServiceExt}; use tower_service::Service; +use tracing::debug; use crate::{ codegen::RpcInvocation, + invocation::Metadata, invoker::{clone_body::CloneBody, clone_invoker::CloneInvoker}, + loadbalancer::random::RandomLoadBalancer, param::Param, + protocol::triple::triple_invoker::TripleInvoker, svc::NewService, + StdError, }; -use crate::protocol::triple::triple_invoker::TripleInvoker; - pub struct NewLoadBalancer { inner: N, } #[derive(Clone)] -pub struct LoadBalancer { +pub struct LoadBalancerSvc { inner: S, // Routes service } @@ -53,17 +60,17 @@ where // NewRoutes N: NewService, { - type Service = LoadBalancer; + type Service = LoadBalancerSvc; fn new_service(&self, target: T) -> Self::Service { // Routes service let svc = self.inner.new_service(target); - LoadBalancer { inner: svc } + LoadBalancerSvc { inner: svc } } } -impl Service> for LoadBalancer +impl Service> for LoadBalancerSvc where // Routes service N: Service<(), Response = Vec>> + Clone, @@ -94,18 +101,87 @@ where Ok(routes) => routes, }; - let service_list: Vec<_> = routes - .into_iter() - .map(|invoker| tower::load::Constant::new(invoker, 1)) - .collect(); + // let service_list: Vec<_> = routes + // .into_iter() + // // .map(|invoker| tower::load::Constant::new(invoker, 1)) + // .collect(); - let service_list = ServiceList::new(service_list); + // let rdm = RandomLoadBalancer::default(); + let metadata = Metadata::from_headers(req.headers().clone()); + // let invks = rdm.select_invokers(service_list, metadata); + // invks.oneshot(req).await + // let service_list = ServiceList::new(service_list); - let p2c = tower::balance::p2c::Balance::new(service_list); + // let p2c = tower::balance::p2c::Balance::new(service_list); + // let p: Box, http::Response>, Box>> + std::marker::Send + std::marker::Sync> = get_loadbalancer("p2c").into(); + let p = get_loadbalancer("p2c"); + // let ivk = p.select_invokers(invokers, metadata); + let ivk = p.select_invokers(routes, metadata); - p2c.oneshot(req).await + ivk.oneshot(req).await }; Box::pin(fut) } } + +type DubboBoxService = tower::util::BoxService< + http::Request, + http::Response, + Box, +>; + +pub trait LoadBalancer { + type Invoker; + + fn select_invokers( + &self, + invokers: Vec>, + metadata: Metadata, + ) -> Self::Invoker; +} + +fn get_loadbalancer( + loadbalancer: &str, +) -> Box + Send + Sync + 'static> { + match loadbalancer { + "random" => { + println!("random!"); + Box::new(RandomLoadBalancer::default()) + } + "p2c" => Box::new(P2cBalancer::default()), + _ => Box::new(P2cBalancer::default()), + } +} +const DEFAULT_RTT: Duration = Duration::from_millis(30); +#[derive(Debug, Default)] +pub struct P2cBalancer {} + +impl LoadBalancer for P2cBalancer { + type Invoker = DubboBoxService; + + fn select_invokers( + &self, + invokers: Vec>, + _metadata: Metadata, + ) -> Self::Invoker { + debug!("p2c load balancer"); + let service_list: Vec<_> = invokers + .into_iter() + .map(|invoker| tower::load::Constant::new(invoker, 1)) + .collect(); + + let decay = Duration::from_secs(10); + let service_list = ServiceList::new(service_list); + let s = tower::load::PeakEwmaDiscover::new( + service_list, + DEFAULT_RTT, + decay, + tower::load::CompleteOnResponse::default(), + ); + + let p = tower::balance::p2c::Balance::new(s); + let svc = DubboBoxService::new(p); + svc + } +} diff --git a/dubbo/src/loadbalancer/random.rs b/dubbo/src/loadbalancer/random.rs new file mode 100644 index 00000000..7739cba4 --- /dev/null +++ b/dubbo/src/loadbalancer/random.rs @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use rand::prelude::SliceRandom; +use tracing::debug; + +use super::{DubboBoxService, LoadBalancer}; +use crate::{ + invocation::Metadata, loadbalancer::CloneInvoker, + protocol::triple::triple_invoker::TripleInvoker, +}; + +#[derive(Clone, Default)] +pub struct RandomLoadBalancer {} + +impl LoadBalancer for RandomLoadBalancer { + type Invoker = DubboBoxService; + + fn select_invokers( + &self, + invokers: Vec>, + metadata: Metadata, + ) -> Self::Invoker { + debug!("random loadbalance {:?}", metadata); + let ivk = invokers.choose(&mut rand::thread_rng()).unwrap().clone(); + DubboBoxService::new(ivk) + } +} diff --git a/dubbo/src/triple/transport/connector/mod.rs b/dubbo/src/triple/transport/connector/mod.rs index 690b781a..66f95e87 100644 --- a/dubbo/src/triple/transport/connector/mod.rs +++ b/dubbo/src/triple/transport/connector/mod.rs @@ -17,7 +17,7 @@ pub mod http_connector; pub mod https_connector; -#[cfg(any(target_os = "macos", target_os = "unix"))] +#[cfg(any(target_os = "macos", target_family = "unix"))] pub mod unix_connector; use hyper::Uri; @@ -84,7 +84,7 @@ pub fn get_connector(connector: &str) -> BoxCloneService { let c = unix_connector::UnixConnector::new(); BoxCloneService::new(Connector::new(c)) diff --git a/dubbo/src/triple/transport/listener/mod.rs b/dubbo/src/triple/transport/listener/mod.rs index 0be7415f..4f01ef00 100644 --- a/dubbo/src/triple/transport/listener/mod.rs +++ b/dubbo/src/triple/transport/listener/mod.rs @@ -16,7 +16,7 @@ */ pub mod tcp_listener; -#[cfg(any(target_os = "macos", target_os = "unix"))] +#[cfg(any(target_os = "macos", target_family = "unix"))] pub mod unix_listener; use std::net::SocketAddr; @@ -65,7 +65,7 @@ impl Listener for WrappedListener { pub async fn get_listener(name: String, addr: SocketAddr) -> Result { match name.as_str() { "tcp" => Ok(TcpListener::bind(addr).await?.boxed()), - #[cfg(any(target_os = "macos", target_os = "unix"))] + #[cfg(any(target_os = "macos", target_family = "unix"))] "unix" => Ok(unix_listener::UnixListener::bind(addr).await?.boxed()), _ => { warn!("no support listener: {:?}", name); diff --git a/examples/echo/src/generated/grpc.examples.echo.rs b/examples/echo/src/generated/grpc.examples.echo.rs index ee8cc1e6..fc48dc5c 100644 --- a/examples/echo/src/generated/grpc.examples.echo.rs +++ b/examples/echo/src/generated/grpc.examples.echo.rs @@ -43,9 +43,7 @@ pub mod echo_client { let invocation = RpcInvocation::default() .with_service_unique_name(String::from("grpc.examples.echo.Echo")) .with_method_name(String::from("UnaryEcho")); - let path = http::uri::PathAndQuery::from_static( - "/grpc.examples.echo.Echo/UnaryEcho", - ); + let path = http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho"); self.inner.unary(request, path, invocation).await } /// ServerStreamingEcho is server side streaming. @@ -102,9 +100,7 @@ pub mod echo_server { request: Request, ) -> Result, dubbo::status::Status>; ///Server streaming response type for the ServerStreamingEcho method. - type ServerStreamingEchoStream: futures_util::Stream< - Item = Result, - > + type ServerStreamingEchoStream: futures_util::Stream> + Send + 'static; /// ServerStreamingEcho is server side streaming. @@ -118,19 +114,14 @@ pub mod echo_server { request: Request>, ) -> Result, dubbo::status::Status>; ///Server streaming response type for the BidirectionalStreamingEcho method. - type BidirectionalStreamingEchoStream: futures_util::Stream< - Item = Result, - > + type BidirectionalStreamingEchoStream: futures_util::Stream> + Send + 'static; /// BidirectionalStreamingEcho is bidi streaming. async fn bidirectional_streaming_echo( &self, request: Request>, - ) -> Result< - Response, - dubbo::status::Status, - >; + ) -> Result, dubbo::status::Status>; } /// Echo is the echo service. #[derive(Debug)] @@ -160,10 +151,7 @@ pub mod echo_server { type Response = http::Response; type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready( - &mut self, - _cx: &mut Context<'_>, - ) -> Poll> { + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -176,24 +164,16 @@ pub mod echo_server { } impl UnarySvc for UnaryEchoServer { type Response = super::EchoResponse; - type Future = BoxFuture< - Response, - dubbo::status::Status, - >; - fn call( - &mut self, - request: Request, - ) -> Self::Future { + type Future = BoxFuture, dubbo::status::Status>; + fn call(&mut self, request: Request) -> Self::Future { let inner = self.inner.0.clone(); let fut = async move { inner.unary_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::< - super::EchoRequest, - super::EchoResponse, - >::new(); + let mut server = + TripleServer::::new(); let res = server.unary(UnaryEchoServer { inner }, req).await; Ok(res) }; @@ -204,30 +184,20 @@ pub mod echo_server { struct ServerStreamingEchoServer { inner: _Inner, } - impl ServerStreamingSvc - for ServerStreamingEchoServer { + impl ServerStreamingSvc for ServerStreamingEchoServer { type Response = super::EchoResponse; type ResponseStream = T::ServerStreamingEchoStream; - type Future = BoxFuture< - Response, - dubbo::status::Status, - >; - fn call( - &mut self, - request: Request, - ) -> Self::Future { + type Future = + BoxFuture, dubbo::status::Status>; + fn call(&mut self, request: Request) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { - inner.server_streaming_echo(request).await - }; + let fut = async move { inner.server_streaming_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::< - super::EchoRequest, - super::EchoResponse, - >::new(); + let mut server = + TripleServer::::new(); let res = server .server_streaming(ServerStreamingEchoServer { inner }, req) .await; @@ -240,29 +210,21 @@ pub mod echo_server { struct ClientStreamingEchoServer { inner: _Inner, } - impl ClientStreamingSvc - for ClientStreamingEchoServer { + impl ClientStreamingSvc for ClientStreamingEchoServer { type Response = super::EchoResponse; - type Future = BoxFuture< - Response, - dubbo::status::Status, - >; + type Future = BoxFuture, dubbo::status::Status>; fn call( &mut self, request: Request>, ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { - inner.client_streaming_echo(request).await - }; + let fut = async move { inner.client_streaming_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::< - super::EchoRequest, - super::EchoResponse, - >::new(); + let mut server = + TripleServer::::new(); let res = server .client_streaming(ClientStreamingEchoServer { inner }, req) .await; @@ -275,54 +237,39 @@ pub mod echo_server { struct BidirectionalStreamingEchoServer { inner: _Inner, } - impl StreamingSvc - for BidirectionalStreamingEchoServer { + impl StreamingSvc for BidirectionalStreamingEchoServer { type Response = super::EchoResponse; type ResponseStream = T::BidirectionalStreamingEchoStream; - type Future = BoxFuture< - Response, - dubbo::status::Status, - >; + type Future = + BoxFuture, dubbo::status::Status>; fn call( &mut self, request: Request>, ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { - inner.bidirectional_streaming_echo(request).await - }; + let fut = + async move { inner.bidirectional_streaming_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::< - super::EchoRequest, - super::EchoResponse, - >::new(); + let mut server = + TripleServer::::new(); let res = server - .bidi_streaming( - BidirectionalStreamingEchoServer { - inner, - }, - req, - ) + .bidi_streaming(BidirectionalStreamingEchoServer { inner }, req) .await; Ok(res) }; Box::pin(fut) } - _ => { - Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap(), - ) - }) - } + _ => Box::pin(async move { + Ok(http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap()) + }), } } } diff --git a/examples/greeter/application.yaml b/examples/greeter/application.yaml index 6683c6d1..9a4b47fc 100644 --- a/examples/greeter/application.yaml +++ b/examples/greeter/application.yaml @@ -15,5 +15,6 @@ dubbo: GreeterProvider: version: 1.0.0 group: test + tag: red protocol: triple interface: org.apache.dubbo.sample.tri.Greeter \ No newline at end of file diff --git a/examples/greeter/src/greeter/client.rs b/examples/greeter/src/greeter/client.rs index f6fea892..2493fbcb 100644 --- a/examples/greeter/src/greeter/client.rs +++ b/examples/greeter/src/greeter/client.rs @@ -38,13 +38,17 @@ async fn main() { let builder = ClientBuilder::new().with_registry("nacos://127.0.0.1:8848".parse().unwrap()); let mut cli = GreeterClient::new(builder); + let mut mtdata = Metadata::default(); + mtdata = mtdata.insert("static_tag".to_string(), "red".to_string()); + let req = Request::from_parts( + mtdata.clone(), + GreeterRequest { + name: "message from client".to_string(), + }, + ); println!("# unary call"); - let resp = cli - .greet(Request::new(GreeterRequest { - name: "message from client".to_string(), - })) - .await; + let resp = cli.greet(req).await; let resp = match resp { Ok(resp) => resp, Err(err) => return println!("response error: {:?}", err), @@ -64,7 +68,9 @@ async fn main() { name: "msg3 from client streaming".to_string(), }, ]; - let req = futures_util::stream::iter(data); + let mut mtdata = Metadata::default(); + mtdata = mtdata.insert("client_streaming".to_string(), "true".to_string()); + let req = Request::from_parts(mtdata, futures_util::stream::iter(data)); let resp = cli.greet_client_stream(req).await; let client_streaming_resp = match resp { Ok(resp) => resp,