diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf
index 179a6505d..1fee36ee3 100644
--- a/apps/emqx/etc/emqx.conf
+++ b/apps/emqx/etc/emqx.conf
@@ -647,12 +647,11 @@ mqtt {
## Default: disabled
server_keepalive = disabled
- ## The backoff for MQTT keepalive timeout. The broker will kick a connection out
- ## until 'Keepalive * backoff * 2' timeout.
+ ## The backoff for MQTT keepalive timeout. The broker will close the connection
+ ## after idling for 'Keepalive * backoff * 2'.
##
## @doc mqtt.keepalive_backoff
## ValueType: Float
- ## Range: (0.5, 1]
## Default: 0.75
keepalive_backoff = 0.75
diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl
index a616f462d..07463b0c5 100644
--- a/apps/emqx/src/emqx_channel.erl
+++ b/apps/emqx/src/emqx_channel.erl
@@ -1670,7 +1670,8 @@ ensure_keepalive_timer(0, Channel) -> Channel;
ensure_keepalive_timer(disabled, Channel) -> Channel;
ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) ->
Backoff = get_mqtt_conf(Zone, keepalive_backoff),
- Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)),
+ RecvOct = emqx_pd:get_counter(incoming_bytes),
+ Keepalive = emqx_keepalive:init(RecvOct, round(timer:seconds(Interval) * Backoff)),
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
clear_keepalive(Channel = #channel{timers = Timers}) ->
diff --git a/apps/emqx/src/emqx_keepalive.erl b/apps/emqx/src/emqx_keepalive.erl
index 4a589dc61..b19cd9277 100644
--- a/apps/emqx/src/emqx_keepalive.erl
+++ b/apps/emqx/src/emqx_keepalive.erl
@@ -17,6 +17,7 @@
-module(emqx_keepalive).
-export([ init/1
+ , init/2
, info/1
, info/2
, check/2
@@ -37,9 +38,13 @@
%% @doc Init keepalive.
-spec(init(Interval :: non_neg_integer()) -> keepalive()).
-init(Interval) when Interval > 0 ->
+init(Interval) -> init(0, Interval).
+
+%% @doc Init keepalive.
+-spec(init(StatVal :: non_neg_integer(), Interval :: non_neg_integer()) -> keepalive()).
+init(StatVal, Interval) when Interval > 0 ->
#keepalive{interval = Interval,
- statval = 0,
+ statval = StatVal,
repeat = 0}.
%% @doc Get Info of the keepalive.
diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl
index e9000c449..2f34f825f 100644
--- a/apps/emqx/src/emqx_schema.erl
+++ b/apps/emqx/src/emqx_schema.erl
@@ -155,7 +155,10 @@ roots(medium) ->
roots(low) ->
[ {"force_gc",
sc(ref("force_gc"),
- #{})}
+ #{ desc =>
+"""Force the MQTT connection process garbage collection after
+this number of messages or bytes have passed through."""
+ })}
, {"conn_congestion",
sc(ref("conn_congestion"),
#{})}
@@ -288,131 +291,185 @@ fields("cache") ->
fields("mqtt") ->
[ {"idle_timeout",
sc(hoconsc:union([infinity, duration()]),
- #{ default => "15s"
+ #{ default => "15s",
+ desc =>
+"""Close TCP connections from the clients that have not sent MQTT CONNECT
+message within this interval."""
})}
, {"max_packet_size",
sc(bytesize(),
- #{ default => "1MB"
+ #{ default => "1MB",
+ desc => "Maximum MQTT packet size allowed."
})}
, {"max_clientid_len",
sc(range(23, 65535),
- #{ default => 65535
+ #{ default => 65535,
+ desc => "Maximum allowed length of MQTT clientId."
})}
, {"max_topic_levels",
sc(range(1, 65535),
- #{ default => 65535
+ #{ default => 65535,
+ desc => "Maximum topic levels allowed."
})}
, {"max_qos_allowed",
sc(range(0, 2),
- #{ default => 2
+ #{ default => 2,
+ desc => "Maximum QoS allowed."
})}
, {"max_topic_alias",
sc(range(0, 65535),
- #{ default => 65535
+ #{ default => 65535,
+ desc => "Maximum Topic Alias, 0 means no topic alias supported."
})}
, {"retain_available",
sc(boolean(),
- #{ default => true
+ #{ default => true,
+ desc => "Support MQTT retained messages."
})}
, {"wildcard_subscription",
sc(boolean(),
- #{ default => true
+ #{ default => true,
+ desc => "Support MQTT Wildcard Subscriptions."
})}
, {"shared_subscription",
sc(boolean(),
- #{ default => true
+ #{ default => true,
+ desc => "Support MQTT Shared Subscriptions."
})}
, {"ignore_loop_deliver",
sc(boolean(),
- #{ default => false
+ #{ default => false,
+ desc => "Ignore loop delivery of messages for mqtt v3.1.1."
})}
, {"strict_mode",
sc(boolean(),
- #{default => false
+ #{default => false,
+ desc => "Parse the MQTT frame in strict mode."
})
}
, {"response_information",
sc(string(),
- #{default => ""
+ #{default => "",
+ desc =>
+"""Specify the response information returned to the client
+This feature is disabled if is set to \"\"."""
})
}
, {"server_keepalive",
sc(hoconsc:union([integer(), disabled]),
- #{ default => disabled
+ #{ default => disabled,
+ desc =>
+"""Server Keep Alive of MQTT 5.0.
+If the Server returns a Server Keep Alive on the CONNACK packet,
+the Client MUST use that value instead of the value it sent as the Keep Alive."""
})
}
, {"keepalive_backoff",
sc(float(),
- #{default => 0.75
+ #{default => 0.75,
+ desc =>
+"""The backoff for MQTT keepalive timeout. The broker will close the connection
+after idling for 'Keepalive * backoff * 2'."""
})
}
, {"max_subscriptions",
sc(hoconsc:union([range(1, inf), infinity]),
- #{ default => infinity
+ #{ default => infinity,
+ desc => "Maximum number of subscriptions allowed."
})
}
, {"upgrade_qos",
sc(boolean(),
- #{ default => false
+ #{ default => false,
+ desc => "Force upgrade of QoS level according to subscription."
})
}
, {"max_inflight",
sc(range(1, 65535),
- #{ default => 32
+ #{ default => 32,
+ desc => "Maximum size of the Inflight Window storing QoS1/2 messages delivered but unacked."
})
}
, {"retry_interval",
sc(duration(),
- #{default => "30s"
+ #{ default => "30s",
+ desc => "Retry interval for QoS1/2 message delivering."
})
}
, {"max_awaiting_rel",
sc(hoconsc:union([integer(), infinity]),
- #{ default => 100
+ #{ default => 100,
+ desc => "Maximum QoS2 packets (Client -> Broker) awaiting PUBREL."
})
}
, {"await_rel_timeout",
sc(duration(),
- #{ default => "300s"
+ #{ default => "300s",
+ desc => "The QoS2 messages (Client -> Broker) will be dropped if awaiting PUBREL timeout."
})
}
, {"session_expiry_interval",
sc(duration(),
- #{ default => "2h"
+ #{ default => "2h",
+ desc => "Default session expiry interval for MQTT V3.1.1 connections."
})
}
, {"max_mqueue_len",
sc(hoconsc:union([range(0, inf), infinity]),
- #{ default => 1000
+ #{ default => 1000,
+ desc =>
+"""Maximum queue length. Enqueued messages when persistent client disconnected,
+or inflight window is full."""
})
}
, {"mqueue_priorities",
sc(hoconsc:union([map(), disabled]),
- #{ default => disabled
+ #{ default => disabled,
+ desc =>
+"""Topic priorities.
+There's no priority table by default, hence all messages are treated equal.
+Priority number [1-255]
+
+**NOTE**: Comma and equal signs are not allowed for priority topic names.
+**NOTE**: Messages for topics not in the priority table are treated as
+either highest or lowest priority depending on the configured value for mqtt.mqueue_default_priority.
+
+**Examples**:
+To configure \"topic/1\" > \"topic/2\":
+mqueue_priorities: {\"topic/1\": 10, \"topic/2\": 8}"""
})
}
, {"mqueue_default_priority",
sc(hoconsc:enum([highest, lowest]),
- #{ default => lowest
+ #{ default => lowest,
+ desc => "Default to highest priority for topics not matching priority table."
})
}
, {"mqueue_store_qos0",
sc(boolean(),
- #{ default => true
+ #{ default => true,
+ desc => "Support enqueue QoS0 messages."
})
}
, {"use_username_as_clientid",
sc(boolean(),
- #{ default => false
+ #{ default => false,
+ desc => "Replace client id with the username."
})
}
, {"peer_cert_as_username",
sc(hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
- #{ default => disabled
+ #{ default => disabled,
+ desc =>
+"""Use the CN, DN or CRT field from the client certificate as a username.
+Only works for the TLS connection."""
})}
, {"peer_cert_as_clientid",
sc(hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
- #{ default => disabled
+ #{ default => disabled,
+ desc =>
+"""Use the CN, DN or CRT field from the client certificate as a clientid.
+Only works for the TLS connection."""
})}
];
@@ -529,14 +586,16 @@ fields("force_gc") ->
[ {"enable",
sc(boolean(),
#{ default => true
- })}
+ })}
, {"count",
sc(range(0, inf),
- #{ default => 16000
+ #{ default => 16000,
+ desc => "GC the process after this many received messages."
})}
, {"bytes",
sc(bytesize(),
- #{ default => "16MB"
+ #{ default => "16MB",
+ desc => "GC the process after specified number of bytes have passed through."
})}
];
@@ -1014,7 +1073,7 @@ fields("alarm") ->
example => [log, publish],
desc =>
"""The actions triggered when the alarm is activated.<\br>
-Currently supports two actions, 'log' and 'publish'.
+Currently support two actions, 'log' and 'publish'.
'log' is to write the alarm to log (console or file).
'publish' is to publish the alarm as an MQTT message to the system topics:
$SYS/brokers/emqx@xx.xx.xx.x/alarms/activate and
diff --git a/apps/emqx/test/emqx_keepalive_SUITE.erl b/apps/emqx/test/emqx_keepalive_SUITE.erl
index 8baf52528..7f725e61b 100644
--- a/apps/emqx/test/emqx_keepalive_SUITE.erl
+++ b/apps/emqx/test/emqx_keepalive_SUITE.erl
@@ -39,4 +39,3 @@ t_check(_) ->
?assertEqual(1, emqx_keepalive:info(statval, Keepalive2)),
?assertEqual(1, emqx_keepalive:info(repeat, Keepalive2)),
?assertEqual({error, timeout}, emqx_keepalive:check(1, Keepalive2)).
-