Skip to content

Commit

Permalink
olve issue #8: multiple service handlers.
Browse files Browse the repository at this point in the history
  • Loading branch information
willemdj committed Aug 29, 2017
1 parent bb75f6b commit e5d85d9
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 51 deletions.
12 changes: 7 additions & 5 deletions doc/tutorial.md
Original file line number Diff line number Diff line change
Expand Up @@ -369,17 +369,17 @@ end, we send a 2-tuple `{[Value], Stream}`.
Once we've implemented all our methods, we also need to start up a gRPC server
so that clients can actually use our service. The following snippet shows how we
do this for our `RouteGuide` service. Note that it will only work if both
the `route_guide_server` module and the generated encoder/decoder module
the `route_guide_server_1` module and the generated encoder/decoder module
`route_guide` are on the code path.

```erlang
5> {ok, Server} = grpc:start_server(grpc, tcp, 10000, route_guide_server_1).
5> {ok, Server} = grpc:start_server(grpc, tcp, 10000, #{'RouteGuide' => #{handler => route_guide_server_1}}).
```

The first argument is the name of the server (strictly speaking: the Cowboy
listener). It can be any term. The second argument specifies the transport
layer. It can be `tcp` or `ssl`. The third argument specifies the port and
the fourth argument is the handler module. As an optional fifth argument
the fourth argument links the gRPC service to our handler module. As an optional fifth argument
some options can be provided, for example to enable compression or
authentication, details about that are provided in separate sections below.

Expand Down Expand Up @@ -631,7 +631,8 @@ be correct. The example certificates can be found in
test/certificates.

```erlang
grpc:start_server(grpc, ssl, 10000, route_guide_server_1,
grpc:start_server(grpc, ssl, 10000,
#{'RouteGuide' => #{handler => route_guide_server_1}},
[{transport_options, [{certfile, "certificates/localhost.crt"},
{keyfile, "certificates/localhost.key"},
{cacertfile, "certificates/My_Root_CA.crt"}]}]).
Expand Down Expand Up @@ -674,7 +675,8 @@ certificates are, and we must pass a few extra transport_options to ensure that
the authentication is enforced. So the server will be started like this:

```erlang
grpc:start_server(grpc, ssl, 10000, route_guide_server_1,
grpc:start_server(grpc, ssl, 10000,
#{'RouteGuide' => #{handler => route_guide_server_1}},
[{client_cert_dir, "certificates"},
{transport_options, [{certfile, "certificates/localhost.crt"},
{keyfile, "certificates/localhost.key"},
Expand Down
54 changes: 34 additions & 20 deletions src/grpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,26 @@
authority/1, scheme/1, method/1, path/1,
set_compression/2]).


-type service_spec() :: #{handler := module(),
decoder => module(),
handler_state => handler_state()}.
%% The 'handler' module must export a function for each of the RPCs. Typically
%% this module is generated from the .proto file using grpc:compile/1. The
%% generated module contains skeleton functions for the RPCs, these must be
%% extended with the actual implementation of the service.
%%
%% Optionally a start state ('handler_state') can be specified.
%%
%% The 'decoder' is also optional: by default the result of the 'decoder/0'
%% function in the handler module will be used.

-type services() :: #{ServiceName :: atom() := service_spec()}.
%% Links each gRPC service to a 'service_spec()'. The 'service_spec()' contains
%% the information that the server needs to execute the service.

-type option() :: {transport_options, [ranch_ssl:ssl_opt()]} |
{num_acceptors, integer()} |
{handler_state, handler_state()} .
{num_acceptors, integer()}.
-type metadata_key() :: binary().
-type metadata_value() :: binary().
-type metadata() :: #{metadata_key() => metadata_value()}.
Expand All @@ -33,13 +50,14 @@
-type error_message() :: binary().
-type error_response() :: {error, error_code(), error_message(), stream()}.
-type handler_state() :: any().
%% This term is passed to the handler module. It will show up as the value of the
%% 'State' parameter that is passed to the first invocation (per stream) of the
%% generated RPC skeleton functions. The default value is undefined. See the implementation
%% of 'RecordRoute' in the tutorial for an example.
%% This term is passed to the handler module. It will show up as the value of
%% the 'State' parameter that is passed to the first invocation (per stream) of
%% the generated RPC skeleton functions. The default value is 'undefined'. See
%% the implementation of 'RecordRoute' in the tutorial for an example.
-opaque stream() :: map().

-export_type([option/0,
services/0,
error_response/0,
compression_method/0,
stream/0,
Expand All @@ -66,32 +84,28 @@ compile(FileName, Options) ->
-spec start_server(Name::term(),
Transport::ssl|tcp,
Port::integer(),
Handler::module()) -> {ok, CowboyListenerPid::pid()} |
{error, any()}.
%% @equiv start_server(Name, Transport, Port, Handler, [])
start_server(Name, Transport, Port, Handler) when is_atom(Handler) ->
start_server(Name, Transport, Port, Handler, []).
Services::services()) -> {ok, CowboyListenerPid::pid()} |
{error, any()}.
%% @equiv start_server(Name, Transport, Port, Services, [])
start_server(Name, Transport, Port, Services) when is_map(Services) ->
start_server(Name, Transport, Port, Services, []).

-spec start_server(Name::term(),
Transport::ssl|tcp,
Port::integer(),
Handler::module(),
Services::services(),
Options::[option()]) ->
{ok, CowboyListenerPid::pid()} | {error, any()}.
%% @doc Start a gRPC server.
%%
%% The Name is used to identify this server in future calls, in particular when stopping
%% the server.
%%
%% The Handler module must export functions to provide the name of the
%% server and the module to encode and decode the messages, and a function
%% for each of the RPCs. Typically this module is generated from the
%% .proto file using grpc:compile/1. The generated module contains skeleton
%% functions for the RPCs, these must be extended with the actual implementation
%% of the service.
start_server(Name, Transport, Port, Handler, Options) when is_atom(Handler),
%% 'Services' is a map that links each gRPC service to the module that implements
%% it (with some optional additional information).
start_server(Name, Transport, Port, Services, Options) when is_map(Services),
is_list(Options) ->
grpc_server:start(Name, Transport, Port, Handler, Options).
grpc_server:start(Name, Transport, Port, Services, Options).

-spec stop_server(Name::term()) -> ok | {error, not_found}.
%% @doc Stop a gRPC server.
Expand Down
32 changes: 15 additions & 17 deletions src/grpc_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,20 @@
-spec start(Name::term(),
Transport::tcp|ssl,
Port::integer(),
Handler::module(),
Services::grpc:services(),
Options::[grpc:server_option()]) ->
{ok, CowboyListenerPid::pid()} | {error, any()}.
start(Name, Transport, Port, Handler, Options) ->
start(Name, Transport, Port, Services, Options) ->
{ok, _Started} = application:ensure_all_started(grpc),
{module, _} = code:ensure_loaded(Handler),
HandlerState = proplists:get_value(handler_state, Options),
Decoder = Handler:decoder(),
{module, _} = code:ensure_loaded(Decoder),
AuthFun = get_authfun(Transport, Options),
%% All requests are dispatched to this same module (?MODULE),
%% which means that `init/2` below will be called for each
%% request.
Dispatch = cowboy_router:compile([
{'_', [{"/:service/:method",
?MODULE,
#{handler_state => HandlerState,
auth_fun => AuthFun,
handler => Handler,
decoder => Decoder}}]}]),
#{auth_fun => AuthFun,
services => Services}}]}]),
ProtocolOpts = #{env => #{dispatch => Dispatch},
%% inactivity_timeout => infinity,
stream_handlers => [grpc_stream_handler,
Expand Down Expand Up @@ -151,20 +145,24 @@ authenticated(#{cowboy_req := Req} = Stream, Options) ->
NewStream ->
read_frames(NewStream)
catch
_:_ -> throw({?GRPC_STATUS_UNIMPLEMENTED,
<<"Operation not implemented">>})
_:_ ->
throw({?GRPC_STATUS_UNIMPLEMENTED,
<<"Operation not implemented">>})
end.

get_function(Req, #{handler := HandlerModule,
decoder := DecoderModule,
handler_state := HandlerState} = _Options, Stream) ->
get_function(Req, #{services := Services} = _Options, Stream) ->
QualifiedService = cowboy_req:binding(service, Req),
Rpc = binary_to_existing_atom(cowboy_req:binding(method, Req)),
Service = binary_to_existing_atom(lists:last(binary:split(QualifiedService,
<<".">>, [global]))),
#{Service := #{handler := Handler} = Spec} = Services,
{module, _} = code:ensure_loaded(Handler),
HandlerState = maps:get(handler_state, Spec, undefined),
DecoderModule = maps:get(decoder, Spec, Handler:decoder()),
{module, _} = code:ensure_loaded(DecoderModule),
Rpc = binary_to_existing_atom(cowboy_req:binding(method, Req)),
Stream#{decoder => DecoderModule,
service => Service,
handler => HandlerModule,
handler => Handler,
handler_state => HandlerState,
rpc => Rpc}.

Expand Down
49 changes: 43 additions & 6 deletions test/grpc_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ groups() ->
,run_recordroute
,flow_control
]},
{more_services, [sequence],
[compile_hello_world
,start_server_4
,run_getfeature
,run_bonjour]},
{metadata, [sequence],
[compile_example_2
,start_server_2
Expand Down Expand Up @@ -134,6 +139,7 @@ init_per_group(Group, Config) ->
end_per_group(Group, _Config)
when Group == tutorial;
Group == metadata;
Group == more_services;
Group == h2_client;
Group == security;
Group == compressed;
Expand All @@ -153,6 +159,7 @@ end_per_group(_, _Config) ->
all() ->
[
{group, tutorial},
{group, more_services},
{group, metadata},
{group, h2_client},
{group, compressed},
Expand Down Expand Up @@ -195,14 +202,24 @@ compile_routeguide_example(_Config) ->
start_routeguide_server(Config) ->
Port = port(Config),
Server = server(Config),
{ok, _} = grpc:start_server(Server, tcp, Port, route_guide_server_1, []).
Spec = #{'RouteGuide' => #{handler => route_guide_server_1}},
{ok, _} = grpc:start_server(Server, tcp, Port, Spec, []).

run_getfeature(Config) ->
Port = port(Config),
{ok, Connection} = grpc_client:connect(tcp, "localhost", Port,
[{http2_client, h2_client(Config)}]),
{ok, #{result := #{name := ?BVM_TRAIL}}} = feature(Connection,
?BVM_TRAIL_POINT).

run_bonjour(Config) ->
Port = port(Config),
{ok, Connection} = grpc_client:connect(tcp, "localhost", Port,
[{http2_client, h2_client(Config)}]),
{ok, #{result := #{message := "Bonjour, World"}}} =
grpc_client:unary(Connection, #{name => "World"}, 'Greeter',
'SayHello', helloworld, []).

getfeature_client(Config) ->
Port = port(Config),
{ok, Connection} = grpc_client:connect(tcp, "localhost", Port, [{http2_client, h2_client(Config)}]),
Expand Down Expand Up @@ -334,6 +351,13 @@ send_big_message(Stream, Message, N) ->
%% allow the WINDOW_UPDATE message to arrive
timer:sleep(500).

compile_hello_world(_Config) ->
ExampleDir = filename:join(code:lib_dir(grpc, examples), "helloworld"),
Server = filename:join([ExampleDir, "helloworld_server.erl"]),
{ok, _} = compile:file(Server),
ok = grpc:compile("helloworld.proto", [{i, ExampleDir}]),
{ok, _} = compile:file("helloworld.erl").

compile_example_2(_Config) ->
compile_example("route_guide_server_2.erl").

Expand All @@ -342,11 +366,20 @@ compile_example_3(_Config) ->

start_server_2(Config) ->
Server = server(Config),
{ok, _} = grpc:start_server(Server, tcp, port(Config), route_guide_server_2, []).
Spec = #{'RouteGuide' => #{handler => route_guide_server_2}},
{ok, _} = grpc:start_server(Server, tcp, port(Config), Spec, []).

start_server_3(Config) ->
Server = server(Config),
{ok, _} = grpc:start_server(Server, tcp, port(Config), route_guide_server_3, []).
Spec = #{'RouteGuide' => #{handler => route_guide_server_3}},
{ok, _} = grpc:start_server(Server, tcp, port(Config), Spec, []).

start_server_4(Config) ->
Server = server(Config),
Spec = #{'RouteGuide' => #{handler => route_guide_server_1},
'Greeter' => #{handler => helloworld_server,
handler_state => "Bonjour, "}},
{ok, _} = grpc:start_server(Server, tcp, port(Config), Spec, []).

metadata_from_client(Config) ->
{ok, Connection} = grpc_client:connect(tcp, "localhost", port(Config),
Expand Down Expand Up @@ -446,7 +479,8 @@ start_server_secure(Config) ->
SslOptions = [{certfile, certificate("localhost.crt")},
{keyfile, certificate("localhost.key")},
{cacertfile, certificate("My_Root_CA.crt")}],
{ok, _} = grpc:start_server(Server, ssl, port(Config), route_guide_server_1,
Spec = #{'RouteGuide' => #{handler => route_guide_server_1}},
{ok, _} = grpc:start_server(Server, ssl, port(Config), Spec,
[{transport_options, SslOptions}]).

secure_request(Config) ->
Expand Down Expand Up @@ -474,7 +508,8 @@ start_server_wrong_certificate(Config) ->
TlsOptions = [{certfile, certificate("mydomain.com.crt")},
{keyfile, certificate("mydomain.com.key")},
{cacertfile, certificate("My_Root_CA.crt")}],
{ok, _} = grpc:start_server(Server, ssl, port(Config), route_guide_server_1,
Spec = #{'RouteGuide' => #{handler => route_guide_server_1}},
{ok, _} = grpc:start_server(Server, ssl, port(Config), Spec,
[{transport_options, TlsOptions}]).

ssl_without_server_identification(Config) ->
Expand All @@ -501,7 +536,8 @@ start_server_authenticating(Config) ->
{cacertfile, certificate("My_Root_CA.crt")},
{fail_if_no_peer_cert, true},
{verify, verify_peer}],
{ok, _} = grpc:start_server(Server, ssl, port(Config), route_guide_server_1,
Spec = #{'RouteGuide' => #{handler => route_guide_server_1}},
{ok, _} = grpc:start_server(Server, ssl, port(Config), Spec,
[{client_cert_dir, client_cert_dir()},
{http2_client, h2_client(Config)},
{transport_options, TlsOptions}]).
Expand Down Expand Up @@ -536,6 +572,7 @@ compile_example(File) ->
"server"]),
{ok, _} = compile:file(filename:join(ExampleDir, File)).


feature(Connection, Message) ->
feature(Connection, Message, []).

Expand Down
Loading

0 comments on commit e5d85d9

Please sign in to comment.