diff --git a/doc/tutorial.md b/doc/tutorial.md index 0e5a2b1..7d2f60b 100644 --- a/doc/tutorial.md +++ b/doc/tutorial.md @@ -369,17 +369,17 @@ end, we send a 2-tuple `{[Value], Stream}`. Once we've implemented all our methods, we also need to start up a gRPC server so that clients can actually use our service. The following snippet shows how we do this for our `RouteGuide` service. Note that it will only work if both -the `route_guide_server` module and the generated encoder/decoder module +the `route_guide_server_1` module and the generated encoder/decoder module `route_guide` are on the code path. ```erlang -5> {ok, Server} = grpc:start_server(grpc, tcp, 10000, route_guide_server_1). +5> {ok, Server} = grpc:start_server(grpc, tcp, 10000, #{'RouteGuide' => #{handler => route_guide_server_1}}). ``` The first argument is the name of the server (strictly speaking: the Cowboy listener). It can be any term. The second argument specifies the transport layer. It can be `tcp` or `ssl`. The third argument specifies the port and -the fourth argument is the handler module. As an optional fifth argument +the fourth argument links the gRPC service to our handler module. As an optional fifth argument some options can be provided, for example to enable compression or authentication, details about that are provided in separate sections below. @@ -631,7 +631,8 @@ be correct. The example certificates can be found in test/certificates. ```erlang -grpc:start_server(grpc, ssl, 10000, route_guide_server_1, +grpc:start_server(grpc, ssl, 10000, + #{'RouteGuide' => #{handler => route_guide_server_1}}, [{transport_options, [{certfile, "certificates/localhost.crt"}, {keyfile, "certificates/localhost.key"}, {cacertfile, "certificates/My_Root_CA.crt"}]}]). @@ -674,7 +675,8 @@ certificates are, and we must pass a few extra transport_options to ensure that the authentication is enforced. So the server will be started like this: ```erlang -grpc:start_server(grpc, ssl, 10000, route_guide_server_1, +grpc:start_server(grpc, ssl, 10000, + #{'RouteGuide' => #{handler => route_guide_server_1}}, [{client_cert_dir, "certificates"}, {transport_options, [{certfile, "certificates/localhost.crt"}, {keyfile, "certificates/localhost.key"}, diff --git a/src/grpc.erl b/src/grpc.erl index faccc3b..73d2ec2 100644 --- a/src/grpc.erl +++ b/src/grpc.erl @@ -22,9 +22,26 @@ authority/1, scheme/1, method/1, path/1, set_compression/2]). + +-type service_spec() :: #{handler := module(), + decoder => module(), + handler_state => handler_state()}. +%% The 'handler' module must export a function for each of the RPCs. Typically +%% this module is generated from the .proto file using grpc:compile/1. The +%% generated module contains skeleton functions for the RPCs, these must be +%% extended with the actual implementation of the service. +%% +%% Optionally a start state ('handler_state') can be specified. +%% +%% The 'decoder' is also optional: by default the result of the 'decoder/0' +%% function in the handler module will be used. + +-type services() :: #{ServiceName :: atom() := service_spec()}. +%% Links each gRPC service to a 'service_spec()'. The 'service_spec()' contains +%% the information that the server needs to execute the service. + -type option() :: {transport_options, [ranch_ssl:ssl_opt()]} | - {num_acceptors, integer()} | - {handler_state, handler_state()} . + {num_acceptors, integer()}. -type metadata_key() :: binary(). -type metadata_value() :: binary(). -type metadata() :: #{metadata_key() => metadata_value()}. @@ -33,13 +50,14 @@ -type error_message() :: binary(). -type error_response() :: {error, error_code(), error_message(), stream()}. -type handler_state() :: any(). -%% This term is passed to the handler module. It will show up as the value of the -%% 'State' parameter that is passed to the first invocation (per stream) of the -%% generated RPC skeleton functions. The default value is undefined. See the implementation -%% of 'RecordRoute' in the tutorial for an example. +%% This term is passed to the handler module. It will show up as the value of +%% the 'State' parameter that is passed to the first invocation (per stream) of +%% the generated RPC skeleton functions. The default value is 'undefined'. See +%% the implementation of 'RecordRoute' in the tutorial for an example. -opaque stream() :: map(). -export_type([option/0, + services/0, error_response/0, compression_method/0, stream/0, @@ -66,16 +84,16 @@ compile(FileName, Options) -> -spec start_server(Name::term(), Transport::ssl|tcp, Port::integer(), - Handler::module()) -> {ok, CowboyListenerPid::pid()} | - {error, any()}. -%% @equiv start_server(Name, Transport, Port, Handler, []) -start_server(Name, Transport, Port, Handler) when is_atom(Handler) -> - start_server(Name, Transport, Port, Handler, []). + Services::services()) -> {ok, CowboyListenerPid::pid()} | + {error, any()}. +%% @equiv start_server(Name, Transport, Port, Services, []) +start_server(Name, Transport, Port, Services) when is_map(Services) -> + start_server(Name, Transport, Port, Services, []). -spec start_server(Name::term(), Transport::ssl|tcp, Port::integer(), - Handler::module(), + Services::services(), Options::[option()]) -> {ok, CowboyListenerPid::pid()} | {error, any()}. %% @doc Start a gRPC server. @@ -83,15 +101,11 @@ start_server(Name, Transport, Port, Handler) when is_atom(Handler) -> %% The Name is used to identify this server in future calls, in particular when stopping %% the server. %% -%% The Handler module must export functions to provide the name of the -%% server and the module to encode and decode the messages, and a function -%% for each of the RPCs. Typically this module is generated from the -%% .proto file using grpc:compile/1. The generated module contains skeleton -%% functions for the RPCs, these must be extended with the actual implementation -%% of the service. -start_server(Name, Transport, Port, Handler, Options) when is_atom(Handler), +%% 'Services' is a map that links each gRPC service to the module that implements +%% it (with some optional additional information). +start_server(Name, Transport, Port, Services, Options) when is_map(Services), is_list(Options) -> - grpc_server:start(Name, Transport, Port, Handler, Options). + grpc_server:start(Name, Transport, Port, Services, Options). -spec stop_server(Name::term()) -> ok | {error, not_found}. %% @doc Stop a gRPC server. diff --git a/src/grpc_server.erl b/src/grpc_server.erl index 0b628a5..92b4580 100644 --- a/src/grpc_server.erl +++ b/src/grpc_server.erl @@ -21,15 +21,11 @@ -spec start(Name::term(), Transport::tcp|ssl, Port::integer(), - Handler::module(), + Services::grpc:services(), Options::[grpc:server_option()]) -> {ok, CowboyListenerPid::pid()} | {error, any()}. -start(Name, Transport, Port, Handler, Options) -> +start(Name, Transport, Port, Services, Options) -> {ok, _Started} = application:ensure_all_started(grpc), - {module, _} = code:ensure_loaded(Handler), - HandlerState = proplists:get_value(handler_state, Options), - Decoder = Handler:decoder(), - {module, _} = code:ensure_loaded(Decoder), AuthFun = get_authfun(Transport, Options), %% All requests are dispatched to this same module (?MODULE), %% which means that `init/2` below will be called for each @@ -37,10 +33,8 @@ start(Name, Transport, Port, Handler, Options) -> Dispatch = cowboy_router:compile([ {'_', [{"/:service/:method", ?MODULE, - #{handler_state => HandlerState, - auth_fun => AuthFun, - handler => Handler, - decoder => Decoder}}]}]), + #{auth_fun => AuthFun, + services => Services}}]}]), ProtocolOpts = #{env => #{dispatch => Dispatch}, %% inactivity_timeout => infinity, stream_handlers => [grpc_stream_handler, @@ -151,20 +145,24 @@ authenticated(#{cowboy_req := Req} = Stream, Options) -> NewStream -> read_frames(NewStream) catch - _:_ -> throw({?GRPC_STATUS_UNIMPLEMENTED, - <<"Operation not implemented">>}) + _:_ -> + throw({?GRPC_STATUS_UNIMPLEMENTED, + <<"Operation not implemented">>}) end. -get_function(Req, #{handler := HandlerModule, - decoder := DecoderModule, - handler_state := HandlerState} = _Options, Stream) -> +get_function(Req, #{services := Services} = _Options, Stream) -> QualifiedService = cowboy_req:binding(service, Req), - Rpc = binary_to_existing_atom(cowboy_req:binding(method, Req)), Service = binary_to_existing_atom(lists:last(binary:split(QualifiedService, <<".">>, [global]))), + #{Service := #{handler := Handler} = Spec} = Services, + {module, _} = code:ensure_loaded(Handler), + HandlerState = maps:get(handler_state, Spec, undefined), + DecoderModule = maps:get(decoder, Spec, Handler:decoder()), + {module, _} = code:ensure_loaded(DecoderModule), + Rpc = binary_to_existing_atom(cowboy_req:binding(method, Req)), Stream#{decoder => DecoderModule, service => Service, - handler => HandlerModule, + handler => Handler, handler_state => HandlerState, rpc => Rpc}. diff --git a/test/grpc_SUITE.erl b/test/grpc_SUITE.erl index 107a5a1..055b49b 100644 --- a/test/grpc_SUITE.erl +++ b/test/grpc_SUITE.erl @@ -81,6 +81,11 @@ groups() -> ,run_recordroute ,flow_control ]}, + {more_services, [sequence], + [compile_hello_world + ,start_server_4 + ,run_getfeature + ,run_bonjour]}, {metadata, [sequence], [compile_example_2 ,start_server_2 @@ -134,6 +139,7 @@ init_per_group(Group, Config) -> end_per_group(Group, _Config) when Group == tutorial; Group == metadata; + Group == more_services; Group == h2_client; Group == security; Group == compressed; @@ -153,6 +159,7 @@ end_per_group(_, _Config) -> all() -> [ {group, tutorial}, + {group, more_services}, {group, metadata}, {group, h2_client}, {group, compressed}, @@ -195,7 +202,8 @@ compile_routeguide_example(_Config) -> start_routeguide_server(Config) -> Port = port(Config), Server = server(Config), - {ok, _} = grpc:start_server(Server, tcp, Port, route_guide_server_1, []). + Spec = #{'RouteGuide' => #{handler => route_guide_server_1}}, + {ok, _} = grpc:start_server(Server, tcp, Port, Spec, []). run_getfeature(Config) -> Port = port(Config), @@ -203,6 +211,15 @@ run_getfeature(Config) -> [{http2_client, h2_client(Config)}]), {ok, #{result := #{name := ?BVM_TRAIL}}} = feature(Connection, ?BVM_TRAIL_POINT). + +run_bonjour(Config) -> + Port = port(Config), + {ok, Connection} = grpc_client:connect(tcp, "localhost", Port, + [{http2_client, h2_client(Config)}]), + {ok, #{result := #{message := "Bonjour, World"}}} = + grpc_client:unary(Connection, #{name => "World"}, 'Greeter', + 'SayHello', helloworld, []). + getfeature_client(Config) -> Port = port(Config), {ok, Connection} = grpc_client:connect(tcp, "localhost", Port, [{http2_client, h2_client(Config)}]), @@ -334,6 +351,13 @@ send_big_message(Stream, Message, N) -> %% allow the WINDOW_UPDATE message to arrive timer:sleep(500). +compile_hello_world(_Config) -> + ExampleDir = filename:join(code:lib_dir(grpc, examples), "helloworld"), + Server = filename:join([ExampleDir, "helloworld_server.erl"]), + {ok, _} = compile:file(Server), + ok = grpc:compile("helloworld.proto", [{i, ExampleDir}]), + {ok, _} = compile:file("helloworld.erl"). + compile_example_2(_Config) -> compile_example("route_guide_server_2.erl"). @@ -342,11 +366,20 @@ compile_example_3(_Config) -> start_server_2(Config) -> Server = server(Config), - {ok, _} = grpc:start_server(Server, tcp, port(Config), route_guide_server_2, []). + Spec = #{'RouteGuide' => #{handler => route_guide_server_2}}, + {ok, _} = grpc:start_server(Server, tcp, port(Config), Spec, []). start_server_3(Config) -> Server = server(Config), - {ok, _} = grpc:start_server(Server, tcp, port(Config), route_guide_server_3, []). + Spec = #{'RouteGuide' => #{handler => route_guide_server_3}}, + {ok, _} = grpc:start_server(Server, tcp, port(Config), Spec, []). + +start_server_4(Config) -> + Server = server(Config), + Spec = #{'RouteGuide' => #{handler => route_guide_server_1}, + 'Greeter' => #{handler => helloworld_server, + handler_state => "Bonjour, "}}, + {ok, _} = grpc:start_server(Server, tcp, port(Config), Spec, []). metadata_from_client(Config) -> {ok, Connection} = grpc_client:connect(tcp, "localhost", port(Config), @@ -446,7 +479,8 @@ start_server_secure(Config) -> SslOptions = [{certfile, certificate("localhost.crt")}, {keyfile, certificate("localhost.key")}, {cacertfile, certificate("My_Root_CA.crt")}], - {ok, _} = grpc:start_server(Server, ssl, port(Config), route_guide_server_1, + Spec = #{'RouteGuide' => #{handler => route_guide_server_1}}, + {ok, _} = grpc:start_server(Server, ssl, port(Config), Spec, [{transport_options, SslOptions}]). secure_request(Config) -> @@ -474,7 +508,8 @@ start_server_wrong_certificate(Config) -> TlsOptions = [{certfile, certificate("mydomain.com.crt")}, {keyfile, certificate("mydomain.com.key")}, {cacertfile, certificate("My_Root_CA.crt")}], - {ok, _} = grpc:start_server(Server, ssl, port(Config), route_guide_server_1, + Spec = #{'RouteGuide' => #{handler => route_guide_server_1}}, + {ok, _} = grpc:start_server(Server, ssl, port(Config), Spec, [{transport_options, TlsOptions}]). ssl_without_server_identification(Config) -> @@ -501,7 +536,8 @@ start_server_authenticating(Config) -> {cacertfile, certificate("My_Root_CA.crt")}, {fail_if_no_peer_cert, true}, {verify, verify_peer}], - {ok, _} = grpc:start_server(Server, ssl, port(Config), route_guide_server_1, + Spec = #{'RouteGuide' => #{handler => route_guide_server_1}}, + {ok, _} = grpc:start_server(Server, ssl, port(Config), Spec, [{client_cert_dir, client_cert_dir()}, {http2_client, h2_client(Config)}, {transport_options, TlsOptions}]). @@ -536,6 +572,7 @@ compile_example(File) -> "server"]), {ok, _} = compile:file(filename:join(ExampleDir, File)). + feature(Connection, Message) -> feature(Connection, Message, []). diff --git a/test/run_load_test.erl b/test/run_load_test.erl new file mode 100644 index 0000000..4a697fa --- /dev/null +++ b/test/run_load_test.erl @@ -0,0 +1,172 @@ +%% use make to run the tests (see Makefile): +%% $> make grpc_load_test +%% $> make rpc_load_test + +-module(run_load_test). +-export([start_grpc_server/0]). +-export([start_native_server/0]). +-export([start_client/0]). + +start_grpc_server() -> + {ok, _} = grpc:start_server(load_test, tcp, 10000, + #{get_stats => #{handler => statistics_server}}), + log_msg("Server started~n", []). + +start_native_server() -> + ok. + +%% If Rpc_type == grpc, Server is the host name of the server +%% ("localhost", typically). +%% If Rpc_type == native, Server is the erlang node name, for example +%% 'grpc_test@localhost'. +start_client() -> + Rpc_type = get_arg_val(rpc_type, grpc), + ServerArg = get_arg_val(server, undefined), + N_workers = get_arg_val(num_workers, 100), + N_reqs = get_arg_val(num_requests_per_worker, 100), + Server = case Rpc_type of + native -> list_to_atom(ServerArg); + grpc -> ServerArg; + undefined -> io:format("Error: server argument must be provided~n") + end, + start(Server, N_workers, N_reqs, Rpc_type). + +get_arg_val(Tag, Def_val) -> + try + [Val_str] = proplists:get_value(Tag, init:get_arguments()), + case Tag of + server -> + Val_str; + rpc_type -> + Val_atom = list_to_atom(Val_str), + true = ((Val_atom == native) orelse (Val_atom == grpc)), + Val_atom; + _ -> + Val_int = list_to_integer(Val_str), + true = is_integer(Val_int), + true = (Val_int > 0), + Val_int + end + catch _:_ -> + Def_val + end. + +start(Server, Num_workers, Num_reqs, Rpc_type) -> + ets:new(ts_counters, [named_table, public, {write_concurrency, true}, {read_concurrency, true}]), + ets:new(counters, [named_table, public, {write_concurrency, true}, {read_concurrency, true}]), + ets:new(worker_pids, [named_table, public, {write_concurrency, true}, {read_concurrency, true}]), + log_msg("Starting spawn of ~p workers...~n", [Num_workers]), + spawn_workers(Num_workers, Num_reqs, Rpc_type, Server), + log_msg("Finished spawning workers~n", []), + wait_for_finish(), + print_results(), + erlang:halt(). + +spawn_workers(0, _, _, _) -> + ok; +spawn_workers(Num_workers, Num_reqs, Rpc_type, Server) -> + Pid = spawn(fun() -> + worker_init(Num_reqs, Rpc_type, Server) + end), + ets:insert(worker_pids, {Pid, []}), + Pid ! start, + spawn_workers(Num_workers - 1, Num_reqs, Rpc_type, Server). + +wait_for_finish() -> + case ets:info(worker_pids, size) of + 0 -> + ok; + N -> + log_msg("Waiting for ~p workers to finish...~n", [N]), + timer:sleep(5000), + wait_for_finish() + end. + +worker_init(Num_reqs, Rpc_type=grpc, Server) when is_list(Server) -> + receive + start -> + {ok, Connection} = grpc_client:connect(tcp, Server, 10000), + worker_loop(Num_reqs, Connection, Rpc_type, Server) + end; +worker_init(Num_reqs, Rpc_type=native, Server) -> + receive + start -> + worker_loop(Num_reqs, undefined, Rpc_type, Server) + end. + +worker_loop(0, _, _, _) -> + ets:delete(worker_pids, self()), + ok; +worker_loop(N, Connection, Rpc_type, Server) -> + update_counter(req_attempted, 1), + case do_rpc(Connection, Rpc_type, Server) of + ok -> + update_ts_counter(success, 1); + {badrpc, Reason} -> + update_ts_counter(Reason, 1) + end, + worker_loop(N - 1, Connection, Rpc_type, Server). + +do_rpc(Connection, grpc, _) -> + case statistics_client:get_stats(Connection, + #{type_field => microstate_accounting}, []) of + {ok, #{}} -> + ok; + Other -> + Other + end; +do_rpc(_, native, Server) when is_atom(Server) -> + case rpc:call(Server, erlang, statistics, [microstate_accounting]) of + L when is_list(L) -> + ok; + {badrpc, Reason} -> + {error, Reason} + end. + +update_ts_counter(Key, Val) -> + Key_ts = {calendar:local_time(), Key}, + ets:update_counter(ts_counters, Key_ts, {2, Val}, {Key_ts, 0}). + +update_counter(Key, Val) -> + ets:update_counter(counters, Key, {2, Val}, {Key, 0}). + +log_msg(Fmt, Args) -> + io:format("~s -- " ++ Fmt, [printable_ts(calendar:local_time()) | Args]). + +print_results() -> + Dict = lists:foldl( + fun({{Ts, Key}, Val}, X_dict) -> + dict:update(Ts, + fun(Res_tvl) -> + [{Key, Val} | Res_tvl] + end, [{Key, Val}], X_dict) + end, dict:new(), ets:tab2list(ts_counters)), + Res_tvl = lists:keysort(1, dict:to_list(Dict)), + Start_time = element(1, hd(Res_tvl)), + End_time = element(1, hd(lists:reverse(Res_tvl))), + {TT_d, TT_s} = calendar:time_difference(Start_time, End_time), + TT_secs = TT_d*86400 + calendar:time_to_seconds(TT_s), + [{_, Num_reqs}] = ets:lookup(counters, req_attempted), + ReqsPerSec = case TT_secs of + 0 -> undefined; + _ -> trunc(Num_reqs/TT_secs) + end, + io:format("~n", []), + io:format("Start time : ~s~n", [printable_ts(Start_time)]), + io:format("End time : ~s~n", [printable_ts(End_time)]), + io:format("Elapsed time (seconds) : ~p~n", [TT_secs]), + io:format("Number of attempted requests : ~p~n", [Num_reqs]), + io:format("Req/sec : ~p~n", [ReqsPerSec]), + io:format("~n", []), + io:format("~-30.30.\ss success, timeout, not_connected~n", ["Timestamp"]), + lists:foreach(fun({X_key, X_vals}) -> + X_succ = proplists:get_value(success, X_vals, 0), + X_timeout = proplists:get_value(timeout, X_vals, 0), + X_not_conn = proplists:get_value(not_connected, X_vals, 0), + io:format("~s\t\t~10.10.\sw, ~7.7.\sw, ~13.13.\sw~n", + [printable_ts(X_key), + X_succ, + X_timeout, X_not_conn]) + end, Res_tvl). +printable_ts({{Y, M, D}, {H, Mi, S}}) -> + io_lib:format("~p-~2.2.0w-~2.2.0w_~2.2.0w:~2.2.0w:~2.2.0w", [Y, M, D, H, Mi, S]). diff --git a/test/test_grpc_server.erl b/test/test_grpc_server.erl index 5c90659..8929e0b 100644 --- a/test/test_grpc_server.erl +++ b/test/test_grpc_server.erl @@ -11,7 +11,7 @@ %% $> make shell %% 1> cd(test). %% 2> c(test_grpc_server). -%% 3> test_grpc_server(http). +%% 3> test_grpc_server:run(tcp). %% %% Run the go client: %% @@ -19,7 +19,7 @@ %% %% To test with ssl: %% -%% 4> test_grpc_server(tls). +%% 4> test_grpc_server:run(ssl). %% %% Tun the go client with ssl: %% $> $GO_BIN/client -tls -ca_file $GRPC_ROOT/test/certificates/My_Root_CA.crt -server_host_override localhost @@ -33,7 +33,9 @@ run(How) -> compile(), {ok, _} = compile:file(filename:join(test_dir(), "test_route_guide_server.erl")), - {ok, _} = grpc:start_server(grpc, How, 10000, test_route_guide_server, options(How)). + {ok, _} = grpc:start_server(grpc, How, 10000, + #{'RouteGuide' => #{handler => test_route_guide_server}}, + options(How)). stop() -> grpc:stop_server(grpc).