Skip to content

Commit

Permalink
Merge pull request #237 from emqx/1227-add-log-level-cli-option
Browse files Browse the repository at this point in the history
Improve CLI
  • Loading branch information
zmstone authored Dec 28, 2023
2 parents 1bff2ec + c05245e commit c815a18
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 41 deletions.
8 changes: 8 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# 1.11.0

- Add `connect` command which only establishes connection,
but does not publish or subscribe.
- Add `--log-level` option to CLI.
- Add timestamp to CLI logs.
- Exit with non-zero code when CLI stops due to error.

# 1.10.0

- Export emqtt:qos/0, emqtt:topic/0 and emqtt:packet_id/0 as public types.
Expand Down
2 changes: 0 additions & 2 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
[{test,
[{deps,
[ {meck, "0.9.2"}
%% add esockd 5.9.8 to solve otp 26 compatibility issue, can be deleted once emqx has it
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.8"}}}
, {emqx, {git_subdir, "https://github.com/emqx/emqx", {branch, "master"}, "apps/emqx"}}
, {emqx_utils, {git_subdir, "https://github.com/emqx/emqx", {branch, "master"}, "apps/emqx_utils"}}
, {emqx_durable_storage, {git_subdir, "https://github.com/emqx/emqx", {branch, "master"}, "apps/emqx_durable_storage"}}
Expand Down
104 changes: 66 additions & 38 deletions src/emqtt_cli.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,13 @@
"certificate is issued"},
{verify, undefined, "verify", {boolean, false},
"TLS verify option, default: false "
}
},
{log_level, undefined, "log-level", {atom, warning},
"Log level: debug | info | warning | error"}
]).

-define(CONNECT_OPTS, ?CONN_SHORT_OPTS ++ ?HELP_OPT ++ ?CONN_LONG_OPTS).

-define(PUB_OPTS, ?CONN_SHORT_OPTS ++
[{topic, $t, "topic", string,
"mqtt topic on which to publish the message"},
Expand Down Expand Up @@ -113,12 +117,15 @@
"'size' to print payload size, 'as-string' to print payload as string"}
]).

main(["connect" | Argv]) ->
{ok, {Opts, _Args}} = getopt:parse(?CONNECT_OPTS, Argv),
ok = maybe_help(connect, Opts),
main(connect, Opts);
main(["sub" | Argv]) ->
{ok, {Opts, _Args}} = getopt:parse(?SUB_OPTS, Argv),
ok = maybe_help(sub, Opts),
ok = check_required_args(sub, [topic], Opts),
main(sub, Opts);

main(["pub" | Argv]) ->
{ok, {Opts, _Args}} = getopt:parse(?PUB_OPTS, Argv),
ok = maybe_help(pub, Opts),
Expand All @@ -127,22 +134,28 @@ main(["pub" | Argv]) ->
File = get_value(file, Opts),
case {Payload, File} of
{undefined, undefined} ->
io:format("Error: missing --payload or --file~n"),
halt(1);
log_halt("Error: missing --payload or --file~n", []);
_ ->
ok
end,
main(pub, Opts);

main(_Argv) ->
io:format("Usage: ~s pub | sub [--help]~n", [?CMD_NAME]).
io:format("Usage: ~s pub | sub | connect [--help]~n", [?CMD_NAME]).

main(PubSub, Opts0) ->
main(PubSubOrJustConnect, Opts0) ->
_ = process_flag(trap_exit, true),
application:ensure_all_started(quicer),
application:ensure_all_started(emqtt),
Print = proplists:get_value(print, Opts0),
Opts = proplists:delete(print, Opts0),
NOpts = enrich_opts(parse_cmd_opts(Opts)),
case proplists:get_value(log_level, Opts0) of
undefined ->
ok;
Level ->
logger:set_primary_config(level, Level)
end,
{ok, Client} = emqtt:start_link(NOpts),
ConnRet = case {proplists:get_bool(enable_websocket, NOpts),
proplists:get_bool(enable_quic, NOpts)} of
Expand All @@ -152,19 +165,22 @@ main(PubSub, Opts0) ->
end,
case ConnRet of
{ok, Properties} ->
io:format("Client ~s sent CONNECT~n", [get_value(clientid, NOpts)]),
case PubSub of
log("Connected:~n~p~n", [Properties]),
case PubSubOrJustConnect of
connect ->
%% only connect, keep running
receive_loop(Client, Print);
pub ->
publish(Client, NOpts, proplists:get_value(repeat, Opts)),
disconnect(Client, NOpts);
disconnect(Client);
sub ->
subscribe(Client, NOpts),
KeepAlive = maps:get('Server-Keep-Alive', Properties, get_value(keepalive, NOpts)) * 1000,
timer:send_interval(KeepAlive, ping),
receive_loop(Client, Print)
end;
{error, Reason} ->
io:format("Client ~s failed to sent CONNECT due to ~p~n", [get_value(clientid, NOpts), Reason])
log_halt("Failed to send CONNECT due to: ~p~n", [Reason])
end.

publish(Client, Opts, 1) ->
Expand All @@ -184,8 +200,7 @@ do_publish(Client, Opts) ->
case file:read_file(File) of
{ok, Bin} -> do_publish(Client, Opts, Bin);
{error, Reason} ->
io:format("Error: failed_to_read ~s:~nreason=~p", [File, Reason]),
halt(1)
log_halt("Failed to read ~s:~nreason: ~p", [File, Reason])
end;
Bin ->
do_publish(Client, Opts, Bin)
Expand All @@ -194,56 +209,52 @@ do_publish(Client, Opts) ->
do_publish(Client, Opts, Payload) ->
case emqtt:publish(Client, get_value(topic, Opts), Payload, Opts) of
{error, Reason} ->
io:format("Client ~s failed to sent PUBLISH due to ~p~n", [get_value(clientid, Opts), Reason]);
log_halt("Failed to send PUBLISH due to: ~p~n", [Reason]);
_ ->
io:format("Client ~s sent PUBLISH (Q~p, R~p, D0, Topic=~s, Payload=...(~p bytes))~n",
[get_value(clientid, Opts),
get_value(qos, Opts),
i(get_value(retain, Opts)),
get_value(topic, Opts),
iolist_size(Payload)])
log("Sent PUBLISH (Q~p, R~p, D0, Topic=~s, Payload=...(~p bytes))~n",
[get_value(qos, Opts), i(get_value(retain, Opts)),
get_value(topic, Opts), iolist_size(Payload)])
end.

subscribe(Client, Opts) ->
case emqtt:subscribe(Client, get_value(topic, Opts), Opts) of
{ok, _, [ReasonCode]} when 0 =< ReasonCode andalso ReasonCode =< 2 ->
io:format("Client ~s subscribed to ~s~n", [get_value(clientid, Opts), get_value(topic, Opts)]);
log("Subscribed to: ~s~n", [get_value(topic, Opts)]);
{ok, _, [ReasonCode]} ->
io:format("Client ~s failed to subscribe to ~s due to ~s~n", [get_value(clientid, Opts),
get_value(topic, Opts),
emqtt:reason_code_name(ReasonCode)]);
log_halt("Failed to subscribe to ~s due to: ~s~n", [get_value(topic, Opts), emqtt:reason_code_name(ReasonCode)]);
{error, Reason} ->
io:format("Client ~s failed to send SUBSCRIBE due to ~p~n", [get_value(clientid, Opts), Reason])
log_halt("Failed to send SUBSCRIBE due to: ~p~n", [Reason])
end.

disconnect(Client, Opts) ->
disconnect(Client) ->
case emqtt:disconnect(Client) of
ok ->
io:format("Client ~s sent DISCONNECT~n", [get_value(clientid, Opts)]);
log("Sent DISCONNECT~n", []);
{error, Reason} ->
io:format("Client ~s failed to send DISCONNECT due to ~p~n", [get_value(clientid, Opts), Reason])
log_halt("Failed to send DISCONNECT due to: ~p~n", [Reason])
end.

maybe_help(PubSub, Opts) ->
maybe_help(PubSubOrConnect, Opts) ->
case proplists:get_value(help, Opts) of
true ->
usage(PubSub),
usage(PubSubOrConnect),
halt(0);
_ -> ok
end.

usage(PubSub) ->
Opts = case PubSub of
usage(PubSubOrConnect) ->
Opts = case PubSubOrConnect of
pub -> ?PUB_OPTS;
sub -> ?SUB_OPTS
sub -> ?SUB_OPTS;
connect -> ?CONNECT_OPTS
end,
getopt:usage(Opts, ?CMD_NAME ++ " " ++ atom_to_list(PubSub)).
getopt:usage(Opts, ?CMD_NAME ++ " " ++ atom_to_list(PubSubOrConnect)).

check_required_args(PubSub, Keys, Opts) ->
lists:foreach(fun(Key) ->
case lists:keyfind(Key, 1, Opts) of
false ->
io:format("Error: '~s' required~n", [Key]),
log("Error: '~s' required~n", [Key]),
usage(PubSub),
halt(1);
_ -> ok
Expand Down Expand Up @@ -359,8 +370,12 @@ enrich_opts(Opts) ->

enrich_clientid_opt(Opts) ->
case lists:keyfind(clientid, 1, Opts) of
false -> [{clientid, emqtt:random_client_id()} | Opts];
_ -> Opts
false ->
ClientId = emqtt:random_client_id(),
log("Generated clientid: ~s~n", [ClientId]),
[{clientid, ClientId} | Opts];
_ ->
Opts
end.

enrich_port_opt(Opts) ->
Expand All @@ -382,10 +397,12 @@ pipeline([Fun|More], Input) ->

receive_loop(Client, Print) ->
receive
{'EXIT', Client, Reason} ->
log_halt("Client down: ~p~n", [Reason]);
{publish, #{payload := Payload}} ->
case Print of
"size" -> io:format("received ~p bytes~n", [size(Payload)]);
_ -> io:format("~s~n", [Payload])
"size" -> log("Received ~p bytes~n", [size(Payload)]);
_ -> log("~s~n", [Payload])
end,
receive_loop(Client, Print);
ping ->
Expand All @@ -397,3 +414,14 @@ receive_loop(Client, Print) ->

i(true) -> 1;
i(false) -> 0.

log(Fmt, Args) ->
io:format("~s " ++ Fmt, [ts() | Args]).

log_halt(Fmt, Args) ->
log(Fmt, Args),
halt(1).

ts() ->
SystemTime = erlang:system_time(millisecond),
calendar:system_time_to_rfc3339(SystemTime, [{unit, millisecond}, {time_designator, $T}]).
2 changes: 1 addition & 1 deletion test/emqtt_test_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ ensure_listener(Type, Name, BindAddr, BindPort) ->
BaseConf = #{
acceptors => 16,
bind => {BindAddr, BindPort},
enabled => true,
enable => true,
idle_timeout => 15000,
limiter => #{},
max_connections => 1024000,
Expand Down

0 comments on commit c815a18

Please sign in to comment.