From e458d4790ccbc364abab3b303e11ede674990c17 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 26 Jan 2022 16:38:24 +0800 Subject: [PATCH 1/4] chore(docs): update mqtt schema desc --- apps/emqx/etc/emqx.conf | 4 +- apps/emqx/src/emqx_schema.erl | 121 +++++++++++++++++++++++++--------- 2 files changed, 92 insertions(+), 33 deletions(-) diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index 0b877cf0b..2dc213b75 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -649,10 +649,12 @@ mqtt { ## The backoff for MQTT keepalive timeout. The broker will kick a connection out ## until 'Keepalive * backoff * 2' timeout. + ## There is one exception: + ## If the client connects successfully and then does not send any more packets, + ## it will be kicked out until 'Keepalive * backoff * 3'. ## ## @doc mqtt.keepalive_backoff ## ValueType: Float - ## Range: (0.5, 1] ## Default: 0.75 keepalive_backoff = 0.75 diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 935bef979..2cda55aff 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -155,7 +155,8 @@ roots(medium) -> roots(low) -> [ {"force_gc", sc(ref("force_gc"), - #{})} + #{ desc => "Force the MQTT connection process GC after this number of messages or bytes passed through." + })} , {"conn_congestion", sc(ref("conn_congestion"), #{})} @@ -288,131 +289,185 @@ fields("cache") -> fields("mqtt") -> [ {"idle_timeout", sc(hoconsc:union([infinity, duration()]), - #{ default => "15s" + #{ default => "15s", + desc => +"""How long time the MQTT connection will be disconnected if the +TCP connection is established but MQTT CONNECT has not been received.""" })} , {"max_packet_size", sc(bytesize(), - #{ default => "1MB" + #{ default => "1MB", + desc => "Maximum MQTT packet size allowed." })} , {"max_clientid_len", sc(range(23, 65535), - #{ default => 65535 + #{ default => 65535, + desc => "Maximum length of MQTT clientId allowed." })} , {"max_topic_levels", sc(range(1, 65535), - #{ default => 65535 + #{ default => 65535, + desc => "Maximum topic levels allowed." })} , {"max_qos_allowed", sc(range(0, 2), - #{ default => 2 + #{ default => 2, + desc => "Maximum QoS allowed." })} , {"max_topic_alias", sc(range(0, 65535), - #{ default => 65535 + #{ default => 65535, + desc => "Maximum Topic Alias, 0 means no topic alias supported." })} , {"retain_available", sc(boolean(), - #{ default => true + #{ default => true, + desc => "Supports MQTT retained messages." })} , {"wildcard_subscription", sc(boolean(), - #{ default => true + #{ default => true, + desc => "Supports MQTT Wildcard Subscriptions." })} , {"shared_subscription", sc(boolean(), - #{ default => true + #{ default => true, + desc => "Supports MQTT Shared Subscriptions" })} , {"ignore_loop_deliver", sc(boolean(), - #{ default => false + #{ default => false, + desc => "Ignore loop delivery of messages for mqtt v3.1.1" })} , {"strict_mode", sc(boolean(), - #{default => false + #{default => false, + desc => "Parse the MQTT frame in strict mode" }) } , {"response_information", sc(string(), - #{default => "" + #{default => "", + desc => +"""Specify the response information returned to the client +This feature is disabled if is set to \"\".""" }) } , {"server_keepalive", sc(hoconsc:union([integer(), disabled]), - #{ default => disabled + #{ default => disabled, + desc => "Server Keep Alive of MQTT 5.0" }) } , {"keepalive_backoff", sc(float(), - #{default => 0.75 + #{default => 0.75, + desc => +"""The backoff for MQTT keepalive timeout. The broker will kick a connection out +until 'Keepalive * backoff * 2' timeout. +There is one exception: +If the client connects successfully and then does not send any more packets, +it will be kicked out until 'Keepalive * backoff * 3'.""" }) } , {"max_subscriptions", sc(hoconsc:union([range(1, inf), infinity]), - #{ default => infinity + #{ default => infinity, + desc => "Maximum number of subscriptions allowed." }) } , {"upgrade_qos", sc(boolean(), - #{ default => false + #{ default => false, + desc => "Force to upgrade QoS according to subscription." }) } , {"max_inflight", sc(range(1, 65535), - #{ default => 32 + #{ default => 32, + desc => "Maximum size of the Inflight Window storing QoS1/2 messages delivered but unacked." }) } , {"retry_interval", sc(duration(), - #{default => "30s" + #{ default => "30s", + desc => "Retry interval for QoS1/2 message delivering." }) } , {"max_awaiting_rel", sc(hoconsc:union([integer(), infinity]), - #{ default => 100 + #{ default => 100, + desc => "Maximum QoS2 packets (Client -> Broker) awaiting PUBREL." }) } , {"await_rel_timeout", sc(duration(), - #{ default => "300s" + #{ default => "300s", + desc => "The QoS2 messages (Client -> Broker) will be dropped if awaiting PUBREL timeout." }) } , {"session_expiry_interval", sc(duration(), - #{ default => "2h" + #{ default => "2h", + desc => "Default session expiry interval for MQTT V3.1.1 connections." }) } , {"max_mqueue_len", sc(hoconsc:union([range(0, inf), infinity]), - #{ default => 1000 + #{ default => 1000, + desc => +"""Maximum queue length. Enqueued messages when persistent client disconnected, +or inflight window is full.""" }) } , {"mqueue_priorities", sc(hoconsc:union([map(), disabled]), - #{ default => disabled + #{ default => disabled, + desc => +"""Topic priorities.
+There's no priority table by default, hence all messages are treated equal.
+Priority number [1-255]
+ +**NOTE**: comma and equal signs are not allowed for priority topic names
+**NOTE**: Messages for topics not in the priority table are treated as +either highest or lowest priority depending on the configured value for mqtt.mqueue_default_priority +

+**Examples**: +To configure \"topic/1\" > \"topic/2\": +mqueue_priorities: {\"topic/1\": 10, \"topic/2\": 8}""" }) } , {"mqueue_default_priority", sc(hoconsc:enum([highest, lowest]), - #{ default => lowest + #{ default => lowest, + desc => "Default to highest priority for topics not matching priority table" }) } , {"mqueue_store_qos0", sc(boolean(), - #{ default => true + #{ default => true, + desc => "Support enqueue QoS0 messages." }) } , {"use_username_as_clientid", sc(boolean(), - #{ default => false + #{ default => false, + desc => "use username replace client id" }) } , {"peer_cert_as_username", sc(hoconsc:enum([disabled, cn, dn, crt, pem, md5]), - #{ default => disabled + #{ default => disabled, + desc => +"""Use the CN, DN or CRT field from the client certificate as a username. +Only works for SSL connection.""" })} , {"peer_cert_as_clientid", sc(hoconsc:enum([disabled, cn, dn, crt, pem, md5]), - #{ default => disabled + #{ default => disabled, + desc => +"""Use the CN, DN or CRT field from the client certificate as a clientid. +Only works for SSL connection.""" })} ]; @@ -529,14 +584,16 @@ fields("force_gc") -> [ {"enable", sc(boolean(), #{ default => true - })} + })} , {"count", sc(range(0, inf), - #{ default => 16000 + #{ default => 16000, + desc => "GC the process after how many messages received" })} , {"bytes", sc(bytesize(), - #{ default => "16MB" + #{ default => "16MB", + desc => "GC the process after how much bytes passed through" })} ]; From abc0a3526e185aa6200ae828e976238503518d71 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 26 Jan 2022 18:27:15 +0800 Subject: [PATCH 2/4] fix(keepalive): keepalive init with right recv_oct --- apps/emqx/etc/emqx.conf | 3 --- apps/emqx/src/emqx_keepalive.erl | 2 +- apps/emqx/src/emqx_schema.erl | 5 +---- 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index 2dc213b75..de4e6fb3c 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -649,9 +649,6 @@ mqtt { ## The backoff for MQTT keepalive timeout. The broker will kick a connection out ## until 'Keepalive * backoff * 2' timeout. - ## There is one exception: - ## If the client connects successfully and then does not send any more packets, - ## it will be kicked out until 'Keepalive * backoff * 3'. ## ## @doc mqtt.keepalive_backoff ## ValueType: Float diff --git a/apps/emqx/src/emqx_keepalive.erl b/apps/emqx/src/emqx_keepalive.erl index 4a589dc61..0f7e340cb 100644 --- a/apps/emqx/src/emqx_keepalive.erl +++ b/apps/emqx/src/emqx_keepalive.erl @@ -39,7 +39,7 @@ -spec(init(Interval :: non_neg_integer()) -> keepalive()). init(Interval) when Interval > 0 -> #keepalive{interval = Interval, - statval = 0, + statval = emqx_pd:get_counter(incoming_bytes), repeat = 0}. %% @doc Get Info of the keepalive. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 2cda55aff..c6d32c05e 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -364,10 +364,7 @@ This feature is disabled if is set to \"\".""" #{default => 0.75, desc => """The backoff for MQTT keepalive timeout. The broker will kick a connection out -until 'Keepalive * backoff * 2' timeout. -There is one exception: -If the client connects successfully and then does not send any more packets, -it will be kicked out until 'Keepalive * backoff * 3'.""" +until 'Keepalive * backoff * 2' timeout.""" }) } , {"max_subscriptions", From 3ce3c5c805ae0c49c0a2a985d14de132fc3d61f6 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 27 Jan 2022 10:54:11 +0800 Subject: [PATCH 3/4] fix(doc): Update the documentation according to the review --- apps/emqx/etc/emqx.conf | 4 ++-- apps/emqx/src/emqx_schema.erl | 32 +++++++++++++++++--------------- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index de4e6fb3c..74ad9b873 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -647,8 +647,8 @@ mqtt { ## Default: disabled server_keepalive = disabled - ## The backoff for MQTT keepalive timeout. The broker will kick a connection out - ## until 'Keepalive * backoff * 2' timeout. + ## The backoff for MQTT keepalive timeout. The broker will close the connection + ## after idling for 'Keepalive * backoff * 2'. ## ## @doc mqtt.keepalive_backoff ## ValueType: Float diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index c6d32c05e..04525a805 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -155,7 +155,9 @@ roots(medium) -> roots(low) -> [ {"force_gc", sc(ref("force_gc"), - #{ desc => "Force the MQTT connection process GC after this number of messages or bytes passed through." + #{ desc => +"""Force the MQTT connection process garbage collection after +this number of messages or bytes passed through.""" })} , {"conn_congestion", sc(ref("conn_congestion"), @@ -291,8 +293,8 @@ fields("mqtt") -> sc(hoconsc:union([infinity, duration()]), #{ default => "15s", desc => -"""How long time the MQTT connection will be disconnected if the -TCP connection is established but MQTT CONNECT has not been received.""" +"""Close TCP connections from the clients that have not sent MQTT CONNECT +message within this interval.""" })} , {"max_packet_size", sc(bytesize(), @@ -302,7 +304,7 @@ TCP connection is established but MQTT CONNECT has not been received.""" , {"max_clientid_len", sc(range(23, 65535), #{ default => 65535, - desc => "Maximum length of MQTT clientId allowed." + desc => "Maximum allowed length of MQTT clientId." })} , {"max_topic_levels", sc(range(1, 65535), @@ -322,17 +324,17 @@ TCP connection is established but MQTT CONNECT has not been received.""" , {"retain_available", sc(boolean(), #{ default => true, - desc => "Supports MQTT retained messages." + desc => "Support MQTT retained messages." })} , {"wildcard_subscription", sc(boolean(), #{ default => true, - desc => "Supports MQTT Wildcard Subscriptions." + desc => "Support MQTT Wildcard Subscriptions." })} , {"shared_subscription", sc(boolean(), #{ default => true, - desc => "Supports MQTT Shared Subscriptions" + desc => "Support MQTT Shared Subscriptions" })} , {"ignore_loop_deliver", sc(boolean(), @@ -363,8 +365,8 @@ This feature is disabled if is set to \"\".""" sc(float(), #{default => 0.75, desc => -"""The backoff for MQTT keepalive timeout. The broker will kick a connection out -until 'Keepalive * backoff * 2' timeout.""" +"""The backoff for MQTT keepalive timeout. The broker will close the connection +after idling for 'Keepalive * backoff * 2'.""" }) } , {"max_subscriptions", @@ -425,7 +427,7 @@ or inflight window is full.""" There's no priority table by default, hence all messages are treated equal.
Priority number [1-255]
-**NOTE**: comma and equal signs are not allowed for priority topic names
+**NOTE**: Comma and equal signs are not allowed for priority topic names
**NOTE**: Messages for topics not in the priority table are treated as either highest or lowest priority depending on the configured value for mqtt.mqueue_default_priority

@@ -449,7 +451,7 @@ mqueue_priorities: {\"topic/1\": 10, \"topic/2\": 8}""" , {"use_username_as_clientid", sc(boolean(), #{ default => false, - desc => "use username replace client id" + desc => "Replace client id with the username" }) } , {"peer_cert_as_username", @@ -457,14 +459,14 @@ mqueue_priorities: {\"topic/1\": 10, \"topic/2\": 8}""" #{ default => disabled, desc => """Use the CN, DN or CRT field from the client certificate as a username. -Only works for SSL connection.""" +Only works for the TLS connection.""" })} , {"peer_cert_as_clientid", sc(hoconsc:enum([disabled, cn, dn, crt, pem, md5]), #{ default => disabled, desc => """Use the CN, DN or CRT field from the client certificate as a clientid. -Only works for SSL connection.""" +Only works for the TLS connection.""" })} ]; @@ -585,7 +587,7 @@ fields("force_gc") -> , {"count", sc(range(0, inf), #{ default => 16000, - desc => "GC the process after how many messages received" + desc => "GC the process after this many received messages" })} , {"bytes", sc(bytesize(), @@ -1068,7 +1070,7 @@ fields("alarm") -> example => [log, publish], desc => """The actions triggered when the alarm is activated.<\br> -Currently supports two actions, 'log' and 'publish'. +Currently support two actions, 'log' and 'publish'. 'log' is to write the alarm to log (console or file). 'publish' is to publish the alarm as an MQTT message to the system topics: $SYS/brokers/emqx@xx.xx.xx.x/alarms/activate and From a7676d0163dfbfae23ce6c2c07c6bf2b51da1ce2 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 27 Jan 2022 18:57:35 +0800 Subject: [PATCH 4/4] fix(doc): Update the documentation/keepalive according to the review. --- apps/emqx/src/emqx_channel.erl | 3 ++- apps/emqx/src/emqx_keepalive.erl | 9 +++++++-- apps/emqx/src/emqx_schema.erl | 27 ++++++++++++++----------- apps/emqx/test/emqx_keepalive_SUITE.erl | 1 - 4 files changed, 24 insertions(+), 16 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index a616f462d..07463b0c5 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1670,7 +1670,8 @@ ensure_keepalive_timer(0, Channel) -> Channel; ensure_keepalive_timer(disabled, Channel) -> Channel; ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) -> Backoff = get_mqtt_conf(Zone, keepalive_backoff), - Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)), + RecvOct = emqx_pd:get_counter(incoming_bytes), + Keepalive = emqx_keepalive:init(RecvOct, round(timer:seconds(Interval) * Backoff)), ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). clear_keepalive(Channel = #channel{timers = Timers}) -> diff --git a/apps/emqx/src/emqx_keepalive.erl b/apps/emqx/src/emqx_keepalive.erl index 0f7e340cb..b19cd9277 100644 --- a/apps/emqx/src/emqx_keepalive.erl +++ b/apps/emqx/src/emqx_keepalive.erl @@ -17,6 +17,7 @@ -module(emqx_keepalive). -export([ init/1 + , init/2 , info/1 , info/2 , check/2 @@ -37,9 +38,13 @@ %% @doc Init keepalive. -spec(init(Interval :: non_neg_integer()) -> keepalive()). -init(Interval) when Interval > 0 -> +init(Interval) -> init(0, Interval). + +%% @doc Init keepalive. +-spec(init(StatVal :: non_neg_integer(), Interval :: non_neg_integer()) -> keepalive()). +init(StatVal, Interval) when Interval > 0 -> #keepalive{interval = Interval, - statval = emqx_pd:get_counter(incoming_bytes), + statval = StatVal, repeat = 0}. %% @doc Get Info of the keepalive. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 04525a805..df6d93948 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -157,7 +157,7 @@ roots(low) -> sc(ref("force_gc"), #{ desc => """Force the MQTT connection process garbage collection after -this number of messages or bytes passed through.""" +this number of messages or bytes have passed through.""" })} , {"conn_congestion", sc(ref("conn_congestion"), @@ -334,17 +334,17 @@ message within this interval.""" , {"shared_subscription", sc(boolean(), #{ default => true, - desc => "Support MQTT Shared Subscriptions" + desc => "Support MQTT Shared Subscriptions." })} , {"ignore_loop_deliver", sc(boolean(), #{ default => false, - desc => "Ignore loop delivery of messages for mqtt v3.1.1" + desc => "Ignore loop delivery of messages for mqtt v3.1.1." })} , {"strict_mode", sc(boolean(), #{default => false, - desc => "Parse the MQTT frame in strict mode" + desc => "Parse the MQTT frame in strict mode." }) } , {"response_information", @@ -358,7 +358,10 @@ This feature is disabled if is set to \"\".""" , {"server_keepalive", sc(hoconsc:union([integer(), disabled]), #{ default => disabled, - desc => "Server Keep Alive of MQTT 5.0" + desc => +"""Server Keep Alive of MQTT 5.0. +If the Server returns a Server Keep Alive on the CONNACK packet, +the Client MUST use that value instead of the value it sent as the Keep Alive.""" }) } , {"keepalive_backoff", @@ -378,7 +381,7 @@ after idling for 'Keepalive * backoff * 2'.""" , {"upgrade_qos", sc(boolean(), #{ default => false, - desc => "Force to upgrade QoS according to subscription." + desc => "Force upgrade of QoS level according to subscription." }) } , {"max_inflight", @@ -427,9 +430,9 @@ or inflight window is full.""" There's no priority table by default, hence all messages are treated equal.
Priority number [1-255]
-**NOTE**: Comma and equal signs are not allowed for priority topic names
+**NOTE**: Comma and equal signs are not allowed for priority topic names.
**NOTE**: Messages for topics not in the priority table are treated as -either highest or lowest priority depending on the configured value for mqtt.mqueue_default_priority +either highest or lowest priority depending on the configured value for mqtt.mqueue_default_priority.

**Examples**: To configure \"topic/1\" > \"topic/2\": @@ -439,7 +442,7 @@ mqueue_priorities: {\"topic/1\": 10, \"topic/2\": 8}""" , {"mqueue_default_priority", sc(hoconsc:enum([highest, lowest]), #{ default => lowest, - desc => "Default to highest priority for topics not matching priority table" + desc => "Default to highest priority for topics not matching priority table." }) } , {"mqueue_store_qos0", @@ -451,7 +454,7 @@ mqueue_priorities: {\"topic/1\": 10, \"topic/2\": 8}""" , {"use_username_as_clientid", sc(boolean(), #{ default => false, - desc => "Replace client id with the username" + desc => "Replace client id with the username." }) } , {"peer_cert_as_username", @@ -587,12 +590,12 @@ fields("force_gc") -> , {"count", sc(range(0, inf), #{ default => 16000, - desc => "GC the process after this many received messages" + desc => "GC the process after this many received messages." })} , {"bytes", sc(bytesize(), #{ default => "16MB", - desc => "GC the process after how much bytes passed through" + desc => "GC the process after specified number of bytes have passed through." })} ]; diff --git a/apps/emqx/test/emqx_keepalive_SUITE.erl b/apps/emqx/test/emqx_keepalive_SUITE.erl index 8baf52528..7f725e61b 100644 --- a/apps/emqx/test/emqx_keepalive_SUITE.erl +++ b/apps/emqx/test/emqx_keepalive_SUITE.erl @@ -39,4 +39,3 @@ t_check(_) -> ?assertEqual(1, emqx_keepalive:info(statval, Keepalive2)), ?assertEqual(1, emqx_keepalive:info(repeat, Keepalive2)), ?assertEqual({error, timeout}, emqx_keepalive:check(1, Keepalive2)). -