From 04f6cb07dabd5165a7005427bd055c1fe9966f78 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 28 Dec 2023 13:09:08 +0100 Subject: [PATCH 1/7] feat: add --log-level CLI option --- changelog.md | 4 ++++ src/emqtt_cli.erl | 10 +++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/changelog.md b/changelog.md index 13e5fd23..696d0881 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,7 @@ +# 1.11.0 + +- Add `--log-level` option to CLI. + # 1.10.0 - Export emqtt:qos/0, emqtt:topic/0 and emqtt:packet_id/0 as public types. diff --git a/src/emqtt_cli.erl b/src/emqtt_cli.erl index 7137c6f4..fa7f445f 100644 --- a/src/emqtt_cli.erl +++ b/src/emqtt_cli.erl @@ -79,7 +79,9 @@ "certificate is issued"}, {verify, undefined, "verify", {boolean, false}, "TLS verify option, default: false " - } + }, + {log_level, undefined, "log-level", {atom, warning}, + "Log level: debug | info | warning | error"} ]). -define(PUB_OPTS, ?CONN_SHORT_OPTS ++ @@ -143,6 +145,12 @@ main(PubSub, Opts0) -> Print = proplists:get_value(print, Opts0), Opts = proplists:delete(print, Opts0), NOpts = enrich_opts(parse_cmd_opts(Opts)), + case proplists:get_value(log_level, Opts0) of + undefined -> + ok; + Level -> + logger:set_primary_config(level, Level) + end, {ok, Client} = emqtt:start_link(NOpts), ConnRet = case {proplists:get_bool(enable_websocket, NOpts), proplists:get_bool(enable_quic, NOpts)} of From 81ef9d9ceddd8a4f57832e0cfb0c28e3fdb13a69 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 28 Dec 2023 14:49:44 +0100 Subject: [PATCH 2/7] fix(cli): trap exit to avoid crash dump when connection closed --- src/emqtt_cli.erl | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/emqtt_cli.erl b/src/emqtt_cli.erl index fa7f445f..063a0402 100644 --- a/src/emqtt_cli.erl +++ b/src/emqtt_cli.erl @@ -140,6 +140,7 @@ main(_Argv) -> io:format("Usage: ~s pub | sub [--help]~n", [?CMD_NAME]). main(PubSub, Opts0) -> + _ = process_flag(trap_exit, true), application:ensure_all_started(quicer), application:ensure_all_started(emqtt), Print = proplists:get_value(print, Opts0), @@ -172,7 +173,8 @@ main(PubSub, Opts0) -> receive_loop(Client, Print) end; {error, Reason} -> - io:format("Client ~s failed to sent CONNECT due to ~p~n", [get_value(clientid, NOpts), Reason]) + io:format("Client ~s failed to sent CONNECT due to: ~p~n", [get_value(clientid, NOpts), Reason]), + halt(1) end. publish(Client, Opts, 1) -> @@ -390,6 +392,10 @@ pipeline([Fun|More], Input) -> receive_loop(Client, Print) -> receive + {'EXIT', Client, Reason} -> + io:format("Client down: ~p~n", [Reason]), + %% Reason is never 'normal', so, always halt with 1 + halt(1); {publish, #{payload := Payload}} -> case Print of "size" -> io:format("received ~p bytes~n", [size(Payload)]); From acfc7e6b08ac249baf2972ce30a65d879385678d Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 28 Dec 2023 14:53:28 +0100 Subject: [PATCH 3/7] test: remove esockd version pin from this project's rebar.config --- rebar.config | 2 -- 1 file changed, 2 deletions(-) diff --git a/rebar.config b/rebar.config index a7e301fc..175dbffc 100644 --- a/rebar.config +++ b/rebar.config @@ -23,8 +23,6 @@ [{test, [{deps, [ {meck, "0.9.2"} - %% add esockd 5.9.8 to solve otp 26 compatibility issue, can be deleted once emqx has it - , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.8"}}} , {emqx, {git_subdir, "https://github.com/emqx/emqx", {branch, "master"}, "apps/emqx"}} , {emqx_utils, {git_subdir, "https://github.com/emqx/emqx", {branch, "master"}, "apps/emqx_utils"}} , {emqx_durable_storage, {git_subdir, "https://github.com/emqx/emqx", {branch, "master"}, "apps/emqx_durable_storage"}} From ae8ee8ae8187f759a5152e28cc159b207a7992a9 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 28 Dec 2023 15:01:48 +0100 Subject: [PATCH 4/7] test: fix test case --- test/emqtt_test_lib.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/emqtt_test_lib.erl b/test/emqtt_test_lib.erl index 76bca755..e10fda74 100644 --- a/test/emqtt_test_lib.erl +++ b/test/emqtt_test_lib.erl @@ -84,7 +84,7 @@ ensure_listener(Type, Name, BindAddr, BindPort) -> BaseConf = #{ acceptors => 16, bind => {BindAddr, BindPort}, - enabled => true, + enable => true, idle_timeout => 15000, limiter => #{}, max_connections => 1024000, From 0957ef24807b0cd55b81dd0f30b650827dbbe041 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 28 Dec 2023 15:38:06 +0100 Subject: [PATCH 5/7] refactor(cli): refine logs and exit code --- src/emqtt_cli.erl | 63 ++++++++++++++++++++++++----------------------- 1 file changed, 32 insertions(+), 31 deletions(-) diff --git a/src/emqtt_cli.erl b/src/emqtt_cli.erl index 063a0402..4678d216 100644 --- a/src/emqtt_cli.erl +++ b/src/emqtt_cli.erl @@ -129,8 +129,7 @@ main(["pub" | Argv]) -> File = get_value(file, Opts), case {Payload, File} of {undefined, undefined} -> - io:format("Error: missing --payload or --file~n"), - halt(1); + log_halt("Error: missing --payload or --file~n", []); _ -> ok end, @@ -161,11 +160,11 @@ main(PubSub, Opts0) -> end, case ConnRet of {ok, Properties} -> - io:format("Client ~s sent CONNECT~n", [get_value(clientid, NOpts)]), + log("Sent CONNECT~n", []), case PubSub of pub -> publish(Client, NOpts, proplists:get_value(repeat, Opts)), - disconnect(Client, NOpts); + disconnect(Client); sub -> subscribe(Client, NOpts), KeepAlive = maps:get('Server-Keep-Alive', Properties, get_value(keepalive, NOpts)) * 1000, @@ -173,8 +172,7 @@ main(PubSub, Opts0) -> receive_loop(Client, Print) end; {error, Reason} -> - io:format("Client ~s failed to sent CONNECT due to: ~p~n", [get_value(clientid, NOpts), Reason]), - halt(1) + log_halt("Failed to send CONNECT due to: ~p~n", [Reason]) end. publish(Client, Opts, 1) -> @@ -194,8 +192,7 @@ do_publish(Client, Opts) -> case file:read_file(File) of {ok, Bin} -> do_publish(Client, Opts, Bin); {error, Reason} -> - io:format("Error: failed_to_read ~s:~nreason=~p", [File, Reason]), - halt(1) + log_halt("Failed to read ~s:~nreason: ~p", [File, Reason]) end; Bin -> do_publish(Client, Opts, Bin) @@ -204,34 +201,29 @@ do_publish(Client, Opts) -> do_publish(Client, Opts, Payload) -> case emqtt:publish(Client, get_value(topic, Opts), Payload, Opts) of {error, Reason} -> - io:format("Client ~s failed to sent PUBLISH due to ~p~n", [get_value(clientid, Opts), Reason]); + log_halt("Failed to send PUBLISH due to: ~p~n", [Reason]); _ -> - io:format("Client ~s sent PUBLISH (Q~p, R~p, D0, Topic=~s, Payload=...(~p bytes))~n", - [get_value(clientid, Opts), - get_value(qos, Opts), - i(get_value(retain, Opts)), - get_value(topic, Opts), - iolist_size(Payload)]) + log("Sent PUBLISH (Q~p, R~p, D0, Topic=~s, Payload=...(~p bytes))~n", + [get_value(qos, Opts), i(get_value(retain, Opts)), + get_value(topic, Opts), iolist_size(Payload)]) end. subscribe(Client, Opts) -> case emqtt:subscribe(Client, get_value(topic, Opts), Opts) of {ok, _, [ReasonCode]} when 0 =< ReasonCode andalso ReasonCode =< 2 -> - io:format("Client ~s subscribed to ~s~n", [get_value(clientid, Opts), get_value(topic, Opts)]); + log("Subscribed to: ~s~n", [get_value(topic, Opts)]); {ok, _, [ReasonCode]} -> - io:format("Client ~s failed to subscribe to ~s due to ~s~n", [get_value(clientid, Opts), - get_value(topic, Opts), - emqtt:reason_code_name(ReasonCode)]); + log_halt("Failed to subscribe to ~s due to: ~s~n", [get_value(topic, Opts), emqtt:reason_code_name(ReasonCode)]); {error, Reason} -> - io:format("Client ~s failed to send SUBSCRIBE due to ~p~n", [get_value(clientid, Opts), Reason]) + log_halt("Failed to send SUBSCRIBE due to: ~p~n", [Reason]) end. -disconnect(Client, Opts) -> +disconnect(Client) -> case emqtt:disconnect(Client) of ok -> - io:format("Client ~s sent DISCONNECT~n", [get_value(clientid, Opts)]); + log("Sent DISCONNECT~n", []); {error, Reason} -> - io:format("Client ~s failed to send DISCONNECT due to ~p~n", [get_value(clientid, Opts), Reason]) + log_halt("Failed to send DISCONNECT due to: ~p~n", [Reason]) end. maybe_help(PubSub, Opts) -> @@ -253,7 +245,7 @@ check_required_args(PubSub, Keys, Opts) -> lists:foreach(fun(Key) -> case lists:keyfind(Key, 1, Opts) of false -> - io:format("Error: '~s' required~n", [Key]), + log("Error: '~s' required~n", [Key]), usage(PubSub), halt(1); _ -> ok @@ -369,8 +361,12 @@ enrich_opts(Opts) -> enrich_clientid_opt(Opts) -> case lists:keyfind(clientid, 1, Opts) of - false -> [{clientid, emqtt:random_client_id()} | Opts]; - _ -> Opts + false -> + ClientId = emqtt:random_client_id(), + log("Generated clientid: ~s~n", [ClientId]), + [{clientid, ClientId} | Opts]; + _ -> + Opts end. enrich_port_opt(Opts) -> @@ -393,13 +389,11 @@ pipeline([Fun|More], Input) -> receive_loop(Client, Print) -> receive {'EXIT', Client, Reason} -> - io:format("Client down: ~p~n", [Reason]), - %% Reason is never 'normal', so, always halt with 1 - halt(1); + log_halt("Client down: ~p~n", [Reason]); {publish, #{payload := Payload}} -> case Print of - "size" -> io:format("received ~p bytes~n", [size(Payload)]); - _ -> io:format("~s~n", [Payload]) + "size" -> log("Received ~p bytes~n", [size(Payload)]); + _ -> log("~s~n", [Payload]) end, receive_loop(Client, Print); ping -> @@ -411,3 +405,10 @@ receive_loop(Client, Print) -> i(true) -> 1; i(false) -> 0. + +log(Fmt, Args) -> + io:format(Fmt, Args). + +log_halt(Fmt, Args) -> + log(Fmt, Args), + halt(1). From 23a17b47c86a857bfdf94d7d9c1c2a536d7e6603 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 28 Dec 2023 16:00:08 +0100 Subject: [PATCH 6/7] feat(cli): log with timestamp --- changelog.md | 2 ++ src/emqtt_cli.erl | 7 ++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/changelog.md b/changelog.md index 696d0881..4b0845e0 100644 --- a/changelog.md +++ b/changelog.md @@ -1,6 +1,8 @@ # 1.11.0 - Add `--log-level` option to CLI. +- Add timestamp to CLI logs. +- Exit with non-zero code when CLI stops due to error. # 1.10.0 diff --git a/src/emqtt_cli.erl b/src/emqtt_cli.erl index 4678d216..ec51e612 100644 --- a/src/emqtt_cli.erl +++ b/src/emqtt_cli.erl @@ -407,8 +407,13 @@ i(true) -> 1; i(false) -> 0. log(Fmt, Args) -> - io:format(Fmt, Args). + io:format("~s " ++ Fmt, [ts() | Args]). log_halt(Fmt, Args) -> log(Fmt, Args), halt(1). + +ts() -> + SystemTime = erlang:system_time(millisecond), + Offset = erlang:time_offset(), + calendar:system_time_to_rfc3339(SystemTime, [{unit, millisecond}, {time_designator, $T}]). From c05245e68af9a91d45651a0518e70380508411a2 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 28 Dec 2023 16:28:26 +0100 Subject: [PATCH 7/7] feat(cli): support 'connect' command --- changelog.md | 2 ++ src/emqtt_cli.erl | 32 ++++++++++++++++++++------------ 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/changelog.md b/changelog.md index 4b0845e0..15e333d1 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,7 @@ # 1.11.0 +- Add `connect` command which only establishes connection, + but does not publish or subscribe. - Add `--log-level` option to CLI. - Add timestamp to CLI logs. - Exit with non-zero code when CLI stops due to error. diff --git a/src/emqtt_cli.erl b/src/emqtt_cli.erl index ec51e612..2eda8dfe 100644 --- a/src/emqtt_cli.erl +++ b/src/emqtt_cli.erl @@ -84,6 +84,8 @@ "Log level: debug | info | warning | error"} ]). +-define(CONNECT_OPTS, ?CONN_SHORT_OPTS ++ ?HELP_OPT ++ ?CONN_LONG_OPTS). + -define(PUB_OPTS, ?CONN_SHORT_OPTS ++ [{topic, $t, "topic", string, "mqtt topic on which to publish the message"}, @@ -115,12 +117,15 @@ "'size' to print payload size, 'as-string' to print payload as string"} ]). +main(["connect" | Argv]) -> + {ok, {Opts, _Args}} = getopt:parse(?CONNECT_OPTS, Argv), + ok = maybe_help(connect, Opts), + main(connect, Opts); main(["sub" | Argv]) -> {ok, {Opts, _Args}} = getopt:parse(?SUB_OPTS, Argv), ok = maybe_help(sub, Opts), ok = check_required_args(sub, [topic], Opts), main(sub, Opts); - main(["pub" | Argv]) -> {ok, {Opts, _Args}} = getopt:parse(?PUB_OPTS, Argv), ok = maybe_help(pub, Opts), @@ -136,9 +141,9 @@ main(["pub" | Argv]) -> main(pub, Opts); main(_Argv) -> - io:format("Usage: ~s pub | sub [--help]~n", [?CMD_NAME]). + io:format("Usage: ~s pub | sub | connect [--help]~n", [?CMD_NAME]). -main(PubSub, Opts0) -> +main(PubSubOrJustConnect, Opts0) -> _ = process_flag(trap_exit, true), application:ensure_all_started(quicer), application:ensure_all_started(emqtt), @@ -160,8 +165,11 @@ main(PubSub, Opts0) -> end, case ConnRet of {ok, Properties} -> - log("Sent CONNECT~n", []), - case PubSub of + log("Connected:~n~p~n", [Properties]), + case PubSubOrJustConnect of + connect -> + %% only connect, keep running + receive_loop(Client, Print); pub -> publish(Client, NOpts, proplists:get_value(repeat, Opts)), disconnect(Client); @@ -226,20 +234,21 @@ disconnect(Client) -> log_halt("Failed to send DISCONNECT due to: ~p~n", [Reason]) end. -maybe_help(PubSub, Opts) -> +maybe_help(PubSubOrConnect, Opts) -> case proplists:get_value(help, Opts) of true -> - usage(PubSub), + usage(PubSubOrConnect), halt(0); _ -> ok end. -usage(PubSub) -> - Opts = case PubSub of +usage(PubSubOrConnect) -> + Opts = case PubSubOrConnect of pub -> ?PUB_OPTS; - sub -> ?SUB_OPTS + sub -> ?SUB_OPTS; + connect -> ?CONNECT_OPTS end, - getopt:usage(Opts, ?CMD_NAME ++ " " ++ atom_to_list(PubSub)). + getopt:usage(Opts, ?CMD_NAME ++ " " ++ atom_to_list(PubSubOrConnect)). check_required_args(PubSub, Keys, Opts) -> lists:foreach(fun(Key) -> @@ -415,5 +424,4 @@ log_halt(Fmt, Args) -> ts() -> SystemTime = erlang:system_time(millisecond), - Offset = erlang:time_offset(), calendar:system_time_to_rfc3339(SystemTime, [{unit, millisecond}, {time_designator, $T}]).