diff --git a/.ci/docker-compose-file/docker-compose-kafka.yaml b/.ci/docker-compose-file/docker-compose-kafka.yaml
new file mode 100644
index 000000000..ba0161293
--- /dev/null
+++ b/.ci/docker-compose-file/docker-compose-kafka.yaml
@@ -0,0 +1,73 @@
+version: '3.9'
+
+services:
+ zookeeper:
+ image: wurstmeister/zookeeper
+ ports:
+ - "2181:2181"
+ container_name: zookeeper
+ hostname: zookeeper
+ networks:
+ emqx_bridge:
+ ssl_cert_gen:
+ image: fredrikhgrelland/alpine-jdk11-openssl
+ container_name: ssl_cert_gen
+ volumes:
+ - emqx-shared-secret:/var/lib/secret
+ - ./kafka/generate-certs.sh:/bin/generate-certs.sh
+ entrypoint: /bin/sh
+ command: /bin/generate-certs.sh
+ kdc:
+ hostname: kdc.emqx.net
+ image: ghcr.io/emqx/emqx-builder/5.0-17:1.13.4-24.2.1-1-ubuntu20.04
+ container_name: kdc.emqx.net
+ networks:
+ emqx_bridge:
+ volumes:
+ - emqx-shared-secret:/var/lib/secret
+ - ./kerberos/krb5.conf:/etc/kdc/krb5.conf
+ - ./kerberos/krb5.conf:/etc/krb5.conf
+ - ./kerberos/run.sh:/usr/bin/run.sh
+ command: run.sh
+ kafka_1:
+ image: wurstmeister/kafka:2.13-2.7.0
+ ports:
+ - "9092:9092"
+ - "9093:9093"
+ - "9094:9094"
+ - "9095:9095"
+ container_name: kafka-1.emqx.net
+ hostname: kafka-1.emqx.net
+ depends_on:
+ - "kdc"
+ - "zookeeper"
+ - "ssl_cert_gen"
+ environment:
+ KAFKA_BROKER_ID: 1
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_LISTENERS: PLAINTEXT://:9092,SASL_PLAINTEXT://:9093,SSL://:9094,SASL_SSL://:9095
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1.emqx.net:9092,SASL_PLAINTEXT://kafka-1.emqx.net:9093,SSL://kafka-1.emqx.net:9094,SASL_SSL://kafka-1.emqx.net:9095
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,SSL:SSL,SASL_SSL:SASL_SSL
+ KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+ KAFKA_SASL_ENABLED_MECHANISMS: PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,GSSAPI
+ KAFKA_SASL_KERBEROS_SERVICE_NAME: kafka
+ KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
+ KAFKA_JMX_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas.conf"
+ KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
+ KAFKA_CREATE_TOPICS: test-topic-one-partition:1:1,test-topic-two-partitions:2:1,test-topic-three-partitions:3:1,
+ KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
+ KAFKA_SSL_TRUSTSTORE_LOCATION: /var/lib/secret/kafka.truststore.jks
+ KAFKA_SSL_TRUSTSTORE_PASSWORD: password
+ KAFKA_SSL_KEYSTORE_LOCATION: /var/lib/secret/kafka.keystore.jks
+ KAFKA_SSL_KEYSTORE_PASSWORD: password
+ KAFKA_SSL_KEY_PASSWORD: password
+ networks:
+ emqx_bridge:
+ volumes:
+ - emqx-shared-secret:/var/lib/secret
+ - ./kafka/jaas.conf:/etc/kafka/jaas.conf
+ - ./kafka/run_add_scram_users.sh:/bin/run_add_scram_users.sh
+ - ./kerberos/krb5.conf:/etc/kdc/krb5.conf
+ - ./kerberos/krb5.conf:/etc/krb5.conf
+ command: run_add_scram_users.sh
+
diff --git a/.ci/docker-compose-file/docker-compose-python.yaml b/.ci/docker-compose-file/docker-compose-python.yaml
index 0b9af4517..14e798c6b 100644
--- a/.ci/docker-compose-file/docker-compose-python.yaml
+++ b/.ci/docker-compose-file/docker-compose-python.yaml
@@ -2,7 +2,7 @@ version: '3.9'
services:
python:
- container_name: python
+ container_name: python
image: python:3.7.2-alpine3.9
depends_on:
- emqx1
diff --git a/.ci/docker-compose-file/docker-compose.yaml b/.ci/docker-compose-file/docker-compose.yaml
index 2612eb8d8..8db53d562 100644
--- a/.ci/docker-compose-file/docker-compose.yaml
+++ b/.ci/docker-compose-file/docker-compose.yaml
@@ -18,6 +18,9 @@ services:
- emqx_bridge
volumes:
- ../..:/emqx
+ - emqx-shared-secret:/var/lib/secret
+ - ./kerberos/krb5.conf:/etc/kdc/krb5.conf
+ - ./kerberos/krb5.conf:/etc/krb5.conf
working_dir: /emqx
tty: true
@@ -33,3 +36,6 @@ networks:
gateway: 172.100.239.1
- subnet: 2001:3200:3200::/64
gateway: 2001:3200:3200::1
+
+volumes: # add this section
+ emqx-shared-secret: # does not need anything underneath this
diff --git a/.ci/docker-compose-file/kafka/generate-certs.sh b/.ci/docker-compose-file/kafka/generate-certs.sh
new file mode 100755
index 000000000..3f1c75550
--- /dev/null
+++ b/.ci/docker-compose-file/kafka/generate-certs.sh
@@ -0,0 +1,46 @@
+#!/usr/bin/bash
+
+set -euo pipefail
+
+set -x
+
+# Source https://github.com/zmstone/docker-kafka/blob/master/generate-certs.sh
+
+HOST="*."
+DAYS=3650
+PASS="password"
+
+cd /var/lib/secret/
+
+# Delete old files
+(rm ca.key ca.crt server.key server.csr server.crt client.key client.csr client.crt server.p12 kafka.keystore.jks kafka.truststore.jks 2>/dev/null || true)
+
+ls
+
+echo '== Generate self-signed server and client certificates'
+echo '= generate CA'
+openssl req -new -x509 -keyout ca.key -out ca.crt -days $DAYS -nodes -subj "/C=SE/ST=Stockholm/L=Stockholm/O=brod/OU=test/CN=$HOST"
+
+echo '= generate server certificate request'
+openssl req -newkey rsa:2048 -sha256 -keyout server.key -out server.csr -days "$DAYS" -nodes -subj "/C=SE/ST=Stockholm/L=Stockholm/O=brod/OU=test/CN=$HOST"
+
+echo '= sign server certificate'
+openssl x509 -req -CA ca.crt -CAkey ca.key -in server.csr -out server.crt -days "$DAYS" -CAcreateserial
+
+echo '= generate client certificate request'
+openssl req -newkey rsa:2048 -sha256 -keyout client.key -out client.csr -days "$DAYS" -nodes -subj "/C=SE/ST=Stockholm/L=Stockholm/O=brod/OU=test/CN=$HOST"
+
+echo '== sign client certificate'
+openssl x509 -req -CA ca.crt -CAkey ca.key -in client.csr -out client.crt -days $DAYS -CAserial ca.srl
+
+echo '= Convert self-signed certificate to PKCS#12 format'
+openssl pkcs12 -export -name "$HOST" -in server.crt -inkey server.key -out server.p12 -CAfile ca.crt -passout pass:"$PASS"
+
+echo '= Import PKCS#12 into a java keystore'
+
+echo $PASS | keytool -importkeystore -destkeystore kafka.keystore.jks -srckeystore server.p12 -srcstoretype pkcs12 -alias "$HOST" -storepass "$PASS"
+
+
+echo '= Import CA into java truststore'
+
+echo yes | keytool -keystore kafka.truststore.jks -alias CARoot -import -file ca.crt -storepass "$PASS"
diff --git a/.ci/docker-compose-file/kafka/jaas.conf b/.ci/docker-compose-file/kafka/jaas.conf
new file mode 100644
index 000000000..8ffe8457d
--- /dev/null
+++ b/.ci/docker-compose-file/kafka/jaas.conf
@@ -0,0 +1,16 @@
+KafkaServer {
+ org.apache.kafka.common.security.plain.PlainLoginModule required
+ user_admin="password"
+ user_emqxuser="password";
+
+ org.apache.kafka.common.security.scram.ScramLoginModule required
+ username="admin"
+ password="password";
+
+ com.sun.security.auth.module.Krb5LoginModule required
+ useKeyTab=true
+ storeKey=true
+ keyTab="/var/lib/secret/kafka.keytab"
+ principal="kafka/kafka-1.emqx.net@KDC.EMQX.NET";
+
+};
diff --git a/.ci/docker-compose-file/kafka/run_add_scram_users.sh b/.ci/docker-compose-file/kafka/run_add_scram_users.sh
new file mode 100755
index 000000000..4b51fee0d
--- /dev/null
+++ b/.ci/docker-compose-file/kafka/run_add_scram_users.sh
@@ -0,0 +1,49 @@
+#!/usr/bin/env bash
+
+set -euo pipefail
+
+
+TIMEOUT=60
+
+echo "+++++++ Sleep for a while to make sure that old keytab and truststore is deleted ++++++++"
+
+sleep 5
+
+echo "+++++++ Wait until Kerberos Keytab is created ++++++++"
+
+timeout $TIMEOUT bash -c 'until [ -f /var/lib/secret/kafka.keytab ]; do sleep 1; done'
+
+
+echo "+++++++ Wait until SSL certs are generated ++++++++"
+
+timeout $TIMEOUT bash -c 'until [ -f /var/lib/secret/kafka.truststore.jks ]; do sleep 1; done'
+
+sleep 3
+
+echo "+++++++ Starting Kafka ++++++++"
+
+start-kafka.sh &
+
+SERVER=localhost
+PORT1=9092
+PORT2=9093
+TIMEOUT=60
+
+echo "+++++++ Wait until Kafka ports are up ++++++++"
+
+# shellcheck disable=SC2016
+timeout $TIMEOUT bash -c 'until printf "" 2>>/dev/null >>/dev/tcp/$0/$1; do sleep 1; done' $SERVER $PORT1
+
+# shellcheck disable=SC2016
+timeout $TIMEOUT bash -c 'until printf "" 2>>/dev/null >>/dev/tcp/$0/$1; do sleep 1; done' $SERVER $PORT2
+
+echo "+++++++ Run config commands ++++++++"
+
+kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=password],SCRAM-SHA-512=[password=password]' --entity-type users --entity-name emqxuser
+
+echo "+++++++ Wait until Kafka ports are down ++++++++"
+
+bash -c 'while printf "" 2>>/dev/null >>/dev/tcp/$0/$1; do sleep 1; done' $SERVER $PORT1
+
+echo "+++++++ Kafka ports are down ++++++++"
+
diff --git a/.ci/docker-compose-file/kerberos/krb5.conf b/.ci/docker-compose-file/kerberos/krb5.conf
new file mode 100644
index 000000000..032236888
--- /dev/null
+++ b/.ci/docker-compose-file/kerberos/krb5.conf
@@ -0,0 +1,23 @@
+[libdefaults]
+ default_realm = KDC.EMQX.NET
+ ticket_lifetime = 24h
+ renew_lifetime = 7d
+ forwardable = true
+ rdns = false
+ dns_lookup_kdc = no
+ dns_lookup_realm = no
+
+[realms]
+ KDC.EMQX.NET = {
+ kdc = kdc
+ admin_server = kadmin
+ }
+
+[domain_realm]
+ kdc.emqx.net = KDC.EMQX.NET
+ .kdc.emqx.net = KDC.EMQX.NET
+
+[logging]
+ kdc = FILE:/var/log/kerberos/krb5kdc.log
+ admin_server = FILE:/var/log/kerberos/kadmin.log
+ default = FILE:/var/log/kerberos/krb5lib.log
diff --git a/.ci/docker-compose-file/kerberos/run.sh b/.ci/docker-compose-file/kerberos/run.sh
new file mode 100755
index 000000000..c9580073f
--- /dev/null
+++ b/.ci/docker-compose-file/kerberos/run.sh
@@ -0,0 +1,25 @@
+#!/bin/sh
+
+
+echo "Remove old keytabs"
+
+rm -f /var/lib/secret/kafka.keytab > /dev/null 2>&1
+rm -f /var/lib/secret/rig.keytab > /dev/null 2>&1
+
+echo "Create realm"
+
+kdb5_util -P emqx -r KDC.EMQX.NET create -s
+
+echo "Add principals"
+
+kadmin.local -w password -q "add_principal -randkey kafka/kafka-1.emqx.net@KDC.EMQX.NET"
+kadmin.local -w password -q "add_principal -randkey rig@KDC.EMQX.NET" > /dev/null
+
+
+echo "Create keytabs"
+
+kadmin.local -w password -q "ktadd -k /var/lib/secret/kafka.keytab -norandkey kafka/kafka-1.emqx.net@KDC.EMQX.NET " > /dev/null
+kadmin.local -w password -q "ktadd -k /var/lib/secret/rig.keytab -norandkey rig@KDC.EMQX.NET " > /dev/null
+
+echo STARTING KDC
+/usr/sbin/krb5kdc -n
diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config
index 242f95fcb..8c7635d58 100644
--- a/apps/emqx/rebar.config
+++ b/apps/emqx/rebar.config
@@ -22,7 +22,7 @@
%% This rebar.config is necessary because the app may be used as a
%% `git_subdir` dependency in other projects.
{deps, [
- {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.1"}}},
+ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}},
{gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}},
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}},
diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl
index ce998656a..24477b21b 100644
--- a/apps/emqx/test/emqx_common_test_helpers.erl
+++ b/apps/emqx/test/emqx_common_test_helpers.erl
@@ -32,6 +32,7 @@
stop_apps/1,
reload/2,
app_path/2,
+ proj_root/0,
deps_path/2,
flush/0,
flush/1
@@ -245,6 +246,14 @@ stop_apps(Apps) ->
[application:stop(App) || App <- Apps ++ [emqx, ekka, mria, mnesia]],
ok.
+proj_root() ->
+ filename:join(
+ lists:takewhile(
+ fun(X) -> iolist_to_binary(X) =/= <<"_build">> end,
+ filename:split(app_path(emqx, "."))
+ )
+ ).
+
%% backward compatible
deps_path(App, RelativePath) -> app_path(App, RelativePath).
diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl
index d32115fff..df479952b 100644
--- a/apps/emqx_bridge/src/emqx_bridge_api.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_api.erl
@@ -92,7 +92,7 @@ param_path_operation_cluster() ->
#{
in => path,
required => true,
- example => <<"start">>,
+ example => <<"restart">>,
desc => ?DESC("desc_param_path_operation_cluster")
}
)}.
diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl
index f0773a8ea..2894ec461 100644
--- a/apps/emqx_bridge/src/emqx_bridge_resource.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl
@@ -44,7 +44,7 @@
]).
%% bi-directional bridge with producer/consumer or ingress/egress configs
--define(IS_BI_DIR_BRIDGE(TYPE), TYPE == <<"mqtt">>; TYPE == <<"kafka">>).
+-define(IS_BI_DIR_BRIDGE(TYPE), TYPE =:= <<"mqtt">>; TYPE =:= <<"kafka">>).
-if(?EMQX_RELEASE_EDITION == ee).
bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt;
@@ -261,7 +261,7 @@ parse_confs(Type, Name, Conf) when ?IS_BI_DIR_BRIDGE(Type) ->
%% hookpoint. The underlying driver will run `emqx_hooks:run/3` when it
%% receives a message from the external database.
BName = bridge_id(Type, Name),
- Conf#{hookpoint => <<"$bridges/", BName/binary>>};
+ Conf#{hookpoint => <<"$bridges/", BName/binary>>, bridge_name => Name};
parse_confs(_Type, _Name, Conf) ->
Conf.
diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl
index 309c34195..8086dfa25 100644
--- a/apps/emqx_resource/src/emqx_resource.erl
+++ b/apps/emqx_resource/src/emqx_resource.erl
@@ -93,7 +93,8 @@
%% verify if the resource is working normally
call_health_check/3,
%% stop the instance
- call_stop/3
+ call_stop/3,
+ is_buffer_supported/1
]).
%% list all the instances, id only.
@@ -117,7 +118,8 @@
on_batch_query/3,
on_query_async/4,
on_batch_query_async/4,
- on_get_status/2
+ on_get_status/2,
+ is_buffer_supported/0
]).
%% when calling emqx_resource:start/1
@@ -155,6 +157,8 @@
| {resource_status(), resource_state()}
| {resource_status(), resource_state(), term()}.
+-callback is_buffer_supported() -> boolean().
+
-spec list_types() -> [module()].
list_types() ->
discover_resource_mods().
@@ -256,10 +260,15 @@ query(ResId, Request) ->
Result :: term().
query(ResId, Request, Opts) ->
case emqx_resource_manager:ets_lookup(ResId) of
- {ok, _Group, #{query_mode := QM}} ->
- case QM of
- sync -> emqx_resource_worker:sync_query(ResId, Request, Opts);
- async -> emqx_resource_worker:async_query(ResId, Request, Opts)
+ {ok, _Group, #{query_mode := QM, mod := Module}} ->
+ IsBufferSupported = is_buffer_supported(Module),
+ case {IsBufferSupported, QM} of
+ {true, _} ->
+ emqx_resource_worker:simple_sync_query(ResId, Request);
+ {false, sync} ->
+ emqx_resource_worker:sync_query(ResId, Request, Opts);
+ {false, async} ->
+ emqx_resource_worker:async_query(ResId, Request, Opts)
end;
{error, not_found} ->
?RESOURCE_ERROR(not_found, "resource not found")
@@ -336,6 +345,15 @@ list_group_instances(Group) -> emqx_resource_manager:list_group(Group).
get_callback_mode(Mod) ->
Mod:callback_mode().
+-spec is_buffer_supported(module()) -> boolean().
+is_buffer_supported(Module) ->
+ try
+ Module:is_buffer_supported()
+ catch
+ _:_ ->
+ false
+ end.
+
-spec call_start(manager_id(), module(), resource_config()) ->
{ok, resource_state()} | {error, Reason :: term()}.
call_start(MgrId, Mod, Config) ->
diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl
index 8d61832e3..c8d5e4194 100644
--- a/apps/emqx_resource/src/emqx_resource_manager.erl
+++ b/apps/emqx_resource/src/emqx_resource_manager.erl
@@ -148,14 +148,20 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
],
[matched]
),
- ok = emqx_resource_worker_sup:start_workers(ResId, Opts),
- case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
+ case emqx_resource:is_buffer_supported(ResourceType) of
true ->
- wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT));
+ %% the resource it self supports
+ %% buffer, so there is no need for resource workers
+ ok;
false ->
- ok
- end,
- ok.
+ ok = emqx_resource_worker_sup:start_workers(ResId, Opts),
+ case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
+ true ->
+ wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT));
+ false ->
+ ok
+ end
+ end.
%% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance.
%%
diff --git a/lib-ee/emqx_ee_bridge/docker-ct b/lib-ee/emqx_ee_bridge/docker-ct
index f350a379c..a79037903 100644
--- a/lib-ee/emqx_ee_bridge/docker-ct
+++ b/lib-ee/emqx_ee_bridge/docker-ct
@@ -1,2 +1,3 @@
mongo
mongo_rs_sharded
+kafka
diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf
new file mode 100644
index 000000000..1fdbfedc4
--- /dev/null
+++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf
@@ -0,0 +1,471 @@
+emqx_ee_bridge_kafka {
+ config_enable {
+ desc {
+ en: "Enable (true) or disable (false) this Kafka bridge."
+ zh: "启用(true)或停用该(false)Kafka 数据桥接。"
+ }
+ label {
+ en: "Enable or Disable"
+ zh: "启用或停用"
+ }
+ }
+ desc_config {
+ desc {
+ en: """Configuration for a Kafka bridge."""
+ zh: """Kafka 桥接配置"""
+ }
+ label {
+ en: "Kafka Bridge Configuration"
+ zh: "Kafka 桥接配置"
+ }
+ }
+ desc_type {
+ desc {
+ en: """The Bridge Type"""
+ zh: """桥接类型"""
+ }
+ label {
+ en: "Bridge Type"
+ zh: "桥接类型"
+ }
+ }
+ desc_name {
+ desc {
+ en: """Bridge name, used as a human-readable description of the bridge."""
+ zh: """桥接名字,可读描述"""
+ }
+ label {
+ en: "Bridge Name"
+ zh: "桥接名字"
+ }
+ }
+ producer_opts {
+ desc {
+ en: "Local MQTT data source and Kafka bridge configs."
+ zh: "本地 MQTT 数据源和 Kafka 桥接的配置。"
+ }
+ label {
+ en: "MQTT to Kafka"
+ zh: "MQTT 到 Kafka"
+ }
+ }
+ producer_mqtt_opts {
+ desc {
+ en: "MQTT data source. Optional when used as a rule-engine action."
+ zh: "需要桥接到 MQTT 源主题。"
+ }
+ label {
+ en: "MQTT Source Topic"
+ zh: "MQTT 源主题"
+ }
+ }
+ mqtt_topic {
+ desc {
+ en: "MQTT topic or topic as data source (bridge input)."
+ zh: "指定 MQTT 主题作为桥接的数据源"
+ }
+ label {
+ en: "Source MQTT Topic"
+ zh: "源 MQTT 主题"
+ }
+ }
+ producer_kafka_opts {
+ desc {
+ en: "Kafka producer configs."
+ zh: "Kafka 生产者参数。"
+ }
+ label {
+ en: "Kafka Producer"
+ zh: "生产者参数"
+ }
+ }
+ bootstrap_hosts {
+ desc {
+ en: "A comma separated list of Kafka host:port endpoints to bootstrap the client."
+ zh: "用逗号分隔的 host:port 主机列表。"
+ }
+ label {
+ en: "Bootstrap Hosts"
+ zh: "主机列表"
+ }
+ }
+ connect_timeout {
+ desc {
+ en: "Maximum wait time for TCP connection establishment (including authentication time if enabled)."
+ zh: "建立 TCP 连接时的最大等待时长(若启用认证,这个等待时长也包含完成认证所需时间)。"
+ }
+ label {
+ en: "Connect Timeout"
+ zh: "连接超时"
+ }
+ }
+ min_metadata_refresh_interval {
+ desc {
+ en: "Minimum time interval the client has to wait before refreshing Kafka broker and topic metadata. "
+ "Setting too small value may add extra load on Kafka."
+ zh: "刷新 Kafka broker 和 Kafka 主题元数据段最短时间间隔。设置太小可能会增加 Kafka 压力。"
+ }
+ label {
+ en: "Min Metadata Refresh Interval"
+ zh: "元数据刷新最小间隔"
+ }
+ }
+ metadata_request_timeout {
+ desc {
+ en: "Maximum wait time when fetching metadata from Kafka."
+ zh: "刷新元数据时最大等待时长。"
+ }
+ label {
+ en: "Metadata Request Timeout"
+ zh: "元数据请求超时"
+ }
+ }
+ authentication {
+ desc {
+ en: "Authentication configs."
+ zh: "认证参数。"
+ }
+ label {
+ en: "Authentication"
+ zh: "认证"
+ }
+ }
+ socket_opts {
+ desc {
+ en: "Extra socket options."
+ zh: "更多 Socket 参数设置。"
+ }
+ label {
+ en: "Socket Options"
+ zh: "Socket 参数"
+ }
+ }
+ auth_sasl_mechanism {
+ desc {
+ en: "SASL authentication mechanism."
+ zh: "SASL 认证方法名称。"
+ }
+ label {
+ en: "Mechanism"
+ zh: "认证方法"
+ }
+ }
+ auth_sasl_username {
+ desc {
+ en: "SASL authentication username."
+ zh: "SASL 认证的用户名。"
+ }
+ label {
+ en: "Username"
+ zh: "用户名"
+ }
+ }
+ auth_sasl_password {
+ desc {
+ en: "SASL authentication password."
+ zh: "SASL 认证的密码。"
+ }
+ label {
+ en: "Password"
+ zh: "密码"
+ }
+ }
+ auth_kerberos_principal {
+ desc {
+ en: "SASL GSSAPI authentication Kerberos principal. "
+ "For example client_name@MY.KERBEROS.REALM.MYDOMAIN.COM, "
+ "NOTE: The realm in use has to be configured in /etc/krb5.conf in EMQX nodes."
+ zh: "SASL GSSAPI 认证方法的 Kerberos principal,"
+ "例如 client_name@MY.KERBEROS.REALM.MYDOMAIN.COM"
+ "注意:这里使用的 realm 需要配置在 EMQX 服务器的 /etc/krb5.conf 中"
+ }
+ label {
+ en: "Kerberos Principal"
+ zh: "Kerberos Principal"
+ }
+ }
+ auth_kerberos_keytab_file {
+ desc {
+ en: "SASL GSSAPI authentication Kerberos keytab file path. "
+ "NOTE: This file has to be placed in EMQX nodes, and the EMQX service runner user requires read permission."
+ zh: "SASL GSSAPI 认证方法的 Kerberos keytab 文件。"
+ "注意:该文件需要上传到 EMQX 服务器中,且运行 EMQX 服务的系统账户需要有读取权限。"
+ }
+ label {
+ en: "Kerberos keytab file"
+ zh: "Kerberos keytab 文件"
+ }
+ }
+ socket_send_buffer {
+ desc {
+ en: "Fine tune the socket send buffer. The default value is tuned for high throughput."
+ zh: "TCP socket 的发送缓存调优。默认值是针对高吞吐量的一个推荐值。"
+ }
+ label {
+ en: "Socket Send Buffer Size"
+ zh: "Socket 发送缓存大小"
+ }
+ }
+ socket_receive_buffer {
+ desc {
+ en: "Fine tune the socket receive buffer. The default value is tuned for high throughput."
+ zh: "TCP socket 的收包缓存调优。默认值是针对高吞吐量的一个推荐值。"
+ }
+ label {
+ en: "Socket Receive Buffer Size"
+ zh: "Socket 收包缓存大小"
+ }
+ }
+ socket_nodelay {
+ desc {
+ en: "When set to 'true', TCP buffer sent as soon as possible. "
+ "Otherwise, the OS kernel may buffer small TCP packets for a while (40 ms by default)."
+ zh: "设置 ‘true' 让系统内核立即发送。否则当需要发送当内容很少时,可能会有一定延迟(默认 40 毫秒)。"
+ }
+ label {
+ en: "No Delay"
+ zh: "是否延迟发送"
+ }
+ }
+ kafka_topic {
+ desc {
+ en: "Kafka topic name"
+ zh: "Kafka 主题名称"
+ }
+ label {
+ en: "Kafka Topic Name"
+ zh: "Kafka 主题名称"
+ }
+ }
+ kafka_message {
+ desc {
+ en: "Template to render a Kafka message."
+ zh: "用于生成 Kafka 消息的模版。"
+ }
+ label {
+ en: "Kafka Message Template"
+ zh: "Kafka 消息模版"
+ }
+ }
+ kafka_message_key {
+ desc {
+ en: "Template to render Kafka message key. "
+ "If the desired variable for this template is not found in the input data "
+ "NULL is used."
+ zh: "生成 Kafka 消息 Key 的模版。当所需要的输入没有时,会使用 NULL。"
+ }
+ label {
+ en: "Message Key"
+ zh: "消息的 Key"
+ }
+ }
+ kafka_message_value {
+ desc {
+ en: "Template to render Kafka message value. "
+ "If the desired variable for this template is not found in the input data "
+ "NULL is used."
+ zh: "生成 Kafka 消息 Value 的模版。当所需要的输入没有时,会使用 NULL。"
+ }
+ label {
+ en: "Message Value"
+ zh: "消息的 Value"
+ }
+ }
+ kafka_message_timestamp {
+ desc {
+ en: "Which timestamp to use. "
+ "The timestamp is expected to be a millisecond precision Unix epoch "
+ "which can be in string format, e.g. 1661326462115 or "
+ "'1661326462115'. "
+ "When the desired data field for this template is not found, "
+ "or if the found data is not a valid integer, "
+ "the current system timestamp will be used."
+ zh: "生成 Kafka 消息时间戳的模版。"
+ "该时间必需是一个整型数值(可以是字符串格式)例如 1661326462115 "
+ "或 '1661326462115'。"
+ "当所需的输入字段不存在,或不是一个整型时,"
+ "则会使用当前系统时间。"
+ }
+ label {
+ en: "Message Timestamp"
+ zh: "消息的时间戳"
+ }
+ }
+ max_batch_bytes {
+ desc {
+ en: "Maximum bytes to collect in a Kafka message batch. "
+ "Most of the Kafka brokers default to a limit of 1 MB batch size. "
+ "EMQX's default value is less than 1 MB in order to compensate "
+ "Kafka message encoding overheads (especially when each individual message is very small). "
+ "When a single message is over the limit, it is still sent (as a single element batch)."
+ zh: "最大消息批量字节数。"
+ "大多数 Kafka 环境的默认最低值是 1 MB,EMQX 的默认值比 1 MB 更小是因为需要"
+ "补偿 Kafka 消息编码索需要的额外字节(尤其是当每条消息都很小的情况下)。"
+ "当单个消息的大小超过该限制时,它仍然会被发送,(相当于该批量中只有单个消息)。"
+ }
+ label {
+ en: "Max Batch Bytes"
+ zh: "最大批量字节数"
+ }
+ }
+ compression {
+ desc {
+ en: "Compression method."
+ zh: "压缩方法。"
+ }
+ label {
+ en: "Compression"
+ zh: "压缩"
+ }
+ }
+ partition_strategy {
+ desc {
+ en: "Partition strategy is to tell the producer how to dispatch messages to Kafka partitions.\n\n"
+ "random: Randomly pick a partition for each message\n"
+ "key_dispatch: Hash Kafka message key to a partition number\n"
+ zh: "设置消息发布时应该如何选择 Kafka 分区。\n\n"
+ "random: 为每个消息随机选择一个分区。\n"
+ "key_dispatch: Hash Kafka message key to a partition number\n"
+ }
+ label {
+ en: "Partition Strategy"
+ zh: "分区选择策略"
+ }
+ }
+ required_acks {
+ desc {
+ en: "Required acknowledgements for Kafka partition leader to wait for its followers "
+ "before it sends back the acknowledgement to EMQX Kafka producer\n\n"
+ "all_isr: Require all in-sync replicas to acknowledge.\n"
+ "leader_only: Require only the partition-leader's acknowledgement.\n"
+ "none: No need for Kafka to acknowledge at all.\n"
+ zh: "设置 Kafka leader 在返回给 EMQX 确认之前需要等待多少个 follower 的确认。\n\n"
+ "all_isr: 需要所有的在线复制者都确认。\n"
+ "leader_only: 仅需要分区 leader 确认。\n"
+ "none: 无需 Kafka 回复任何确认。\n"
+ }
+ label {
+ en: "Required Acks"
+ zh: "Kafka 确认数量"
+ }
+ }
+ partition_count_refresh_interval {
+ desc {
+ en: "The time interval for Kafka producer to discover increased number of partitions.\n"
+ "After the number of partitions is increased in Kafka, EMQX will start taking the \n"
+ "discovered partitions into account when dispatching messages per partition_strategy."
+ zh: "配置 Kafka 刷新分区数量的时间间隔。\n"
+ "EMQX 发现 Kafka 分区数量增加后,会开始按 partition_strategy 配置,把消息发送到新的分区中。"
+ }
+ label {
+ en: "Partition Count Refresh Interval"
+ zh: "分区数量刷新间隔"
+ }
+ }
+ max_inflight {
+ desc {
+ en: "Maximum number of batches allowed for Kafka producer (per-partition) to send before receiving acknowledgement from Kafka. "
+ "Greater value typically means better throughput. However, there can be a risk of message reordering when this "
+ "value is greater than 1."
+ zh: "设置 Kafka 生产者(每个分区一个)在收到 Kafka 的确认前最多发送多少个请求(批量)。"
+ "调大这个值通常可以增加吞吐量,但是,当该值设置大于 1 是存在消息乱序的风险。"
+ }
+ label {
+ en: "Max Inflight"
+ zh: "飞行窗口"
+ }
+ }
+ producer_buffer {
+ desc {
+ en: "Configure producer message buffer.\n\n"
+ "Tell Kafka producer how to buffer messages when EMQX has more messages to send than "
+ "Kafka can keep up, or when Kafka is down.\n\n"
+ zh: "配置消息缓存的相关参数。\n\n"
+ "当 EMQX 需要发送的消息超过 Kafka 处理能力,或者当 Kafka 临时下线时,EMQX 内部会将消息缓存起来。"
+ }
+ label {
+ en: "Message Buffer"
+ zh: "消息缓存"
+ }
+ }
+ buffer_mode {
+ desc {
+ en: "Message buffer mode.\n\n"
+ "memory: Buffer all messages in memory. The messages will be lost in case of EMQX node restart\n"
+ "disc: Buffer all messages on disk. The messages on disk are able to survive EMQX node restart.\n"
+ "hybrid: Buffer message in memory first, when up to certain limit "
+ "(see segment_bytes config for more information), then start offloading "
+ "messages to disk, Like memory mode, the messages will be lost in case of "
+ "EMQX node restart."
+ zh: "消息缓存模式。\n"
+ "memory: 所有的消息都缓存在内存里。如果 EMQX 服务重启,缓存的消息会丢失。\n"
+ "disc: 缓存到磁盘上。EMQX 重启后会继续发送重启前未发送完成的消息。\n"
+ "hybrid: 先将消息缓存在内存中,当内存中的消息堆积超过一定限制"
+ "(配置项 segment_bytes 描述了该限制)后,后续的消息会缓存到磁盘上。"
+ "与 memory 模式一样,如果 EMQX 服务重启,缓存的消息会丢失。"
+ }
+ label {
+ en: "Buffer Mode"
+ zh: "缓存模式"
+ }
+ }
+ buffer_per_partition_limit {
+ desc {
+ en: "Number of bytes allowed to buffer for each Kafka partition. "
+ "When this limit is exceeded, old messages will be dropped in a trade for credits "
+ "for new messages to be buffered."
+ zh: "为每个 Kafka 分区设置的最大缓存字节数。当超过这个上限之后,老的消息会被丢弃,"
+ "为新的消息腾出空间。"
+ }
+ label {
+ en: "Per-partition Buffer Limit"
+ zh: "Kafka 分区缓存上限"
+ }
+ }
+ buffer_segment_bytes {
+ desc {
+ en: "Applicable when buffer mode is set to disk or hybrid.\n"
+ "This value is to specify the size of each on-disk buffer file."
+ zh: "当缓存模式是 disk 或 hybrid 时适用。"
+ "该配置用于指定缓存到磁盘上的文件的大小。"
+ }
+ label {
+ en: "Segment File Bytes"
+ zh: "缓存文件大小"
+ }
+ }
+ buffer_memory_overload_protection {
+ desc {
+ en: "Applicable when buffer mode is set to memory or hybrid.\n"
+ "EMQX will drop old cached messages under high memory pressure. "
+ "The high memory threshold is defined in config sysmon.os.sysmem_high_watermark."
+ zh: "缓存模式是 memory 或 hybrid 时适用。"
+ "当系统处于高内存压力时,从队列中丢弃旧的消息以减缓内存增长。"
+ "内存压力值由配置项 sysmon.os.sysmem_high_watermark 决定。"
+ }
+ label {
+ en: "Memory Overload Protection"
+ zh: "内存过载保护"
+ }
+ }
+ auth_username_password {
+ desc {
+ en: "Username/password based authentication."
+ zh: "基于用户名密码的认证。"
+ }
+ label {
+ en: "Username/password Auth"
+ zh: "用户名密码认证"
+ }
+ }
+ auth_gssapi_kerberos {
+ desc {
+ en: "Use GSSAPI/Kerberos authentication."
+ zh: "使用 GSSAPI/Kerberos 认证。"
+ }
+ label {
+ en: "GSSAPI/Kerberos"
+ zh: "GSSAPI/Kerberos"
+ }
+ }
+}
diff --git a/lib-ee/emqx_ee_bridge/rebar.config b/lib-ee/emqx_ee_bridge/rebar.config
index e986d7983..8c79e7274 100644
--- a/lib-ee/emqx_ee_bridge/rebar.config
+++ b/lib-ee/emqx_ee_bridge/rebar.config
@@ -1,5 +1,9 @@
{erl_opts, [debug_info]}.
{deps, [ {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.30.0"}}}
+ , {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.6.4"}}}
+ , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.0"}}}
+ , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}}
+ , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.4"}}}
, {emqx_connector, {path, "../../apps/emqx_connector"}}
, {emqx_resource, {path, "../../apps/emqx_resource"}}
, {emqx_bridge, {path, "../../apps/emqx_bridge"}}
diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src
index a578b7d0d..97c884fe9 100644
--- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src
+++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src
@@ -3,7 +3,8 @@
{registered, []},
{applications, [
kernel,
- stdlib
+ stdlib,
+ emqx_ee_connector
]},
{env, []},
{modules, []},
diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl
index 47925bb36..96efee066 100644
--- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl
+++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl
@@ -14,6 +14,7 @@
api_schemas(Method) ->
[
+ ref(emqx_ee_bridge_kafka, Method),
ref(emqx_ee_bridge_mysql, Method),
ref(emqx_ee_bridge_mongodb, Method ++ "_rs"),
ref(emqx_ee_bridge_mongodb, Method ++ "_sharded"),
@@ -26,6 +27,7 @@ api_schemas(Method) ->
schema_modules() ->
[
+ emqx_ee_bridge_kafka,
emqx_ee_bridge_hstreamdb,
emqx_ee_bridge_influxdb,
emqx_ee_bridge_mongodb,
@@ -45,6 +47,7 @@ examples(Method) ->
lists:foldl(Fun, #{}, schema_modules()).
resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8));
+resource_type(kafka) -> emqx_bridge_impl_kafka;
resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb;
resource_type(mongodb_rs) -> emqx_connector_mongo;
resource_type(mongodb_sharded) -> emqx_connector_mongo;
@@ -56,6 +59,11 @@ resource_type(influxdb_api_v2) -> emqx_ee_connector_influxdb.
fields(bridges) ->
[
+ {kafka,
+ mk(
+ hoconsc:map(name, ref(emqx_ee_bridge_kafka, "config")),
+ #{desc => <<"EMQX Enterprise Config">>}
+ )},
{hstreamdb,
mk(
hoconsc:map(name, ref(emqx_ee_bridge_hstreamdb, "config")),
@@ -66,8 +74,9 @@ fields(bridges) ->
hoconsc:map(name, ref(emqx_ee_bridge_mysql, "config")),
#{desc => <<"EMQX Enterprise Config">>}
)}
- ] ++ fields(mongodb) ++ fields(influxdb);
-fields(mongodb) ->
+ ] ++ mongodb_structs() ++ influxdb_structs().
+
+mongodb_structs() ->
[
{Type,
mk(
@@ -75,8 +84,9 @@ fields(mongodb) ->
#{desc => <<"EMQX Enterprise Config">>}
)}
|| Type <- [mongodb_rs, mongodb_sharded, mongodb_single]
- ];
-fields(influxdb) ->
+ ].
+
+influxdb_structs() ->
[
{Protocol,
mk(
diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl
new file mode 100644
index 000000000..ac5177f6e
--- /dev/null
+++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl
@@ -0,0 +1,273 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_ee_bridge_kafka).
+
+-include_lib("emqx_bridge/include/emqx_bridge.hrl").
+-include_lib("emqx_connector/include/emqx_connector.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+%% allow atoms like scram_sha_256 and scram_sha_512
+%% i.e. the _256 part does not start with a-z
+-elvis([
+ {elvis_style, atom_naming_convention, #{
+ regex => "^([a-z][a-z0-9]*_?)([a-z0-9]*_?)*$",
+ enclosed_atoms => ".*"
+ }}
+]).
+-import(hoconsc, [mk/2, enum/1, ref/2]).
+
+-export([
+ conn_bridge_examples/1
+]).
+
+-export([
+ namespace/0,
+ roots/0,
+ fields/1,
+ desc/1
+]).
+
+%% -------------------------------------------------------------------------------------------------
+%% api
+
+conn_bridge_examples(Method) ->
+ [
+ #{
+ <<"kafka">> => #{
+ summary => <<"Kafka Bridge">>,
+ value => values(Method)
+ }
+ }
+ ].
+
+values(get) ->
+ maps:merge(values(post), ?METRICS_EXAMPLE);
+values(post) ->
+ #{
+ bootstrap_hosts => <<"localhost:9092">>
+ };
+values(put) ->
+ values(post).
+
+%% -------------------------------------------------------------------------------------------------
+%% Hocon Schema Definitions
+
+namespace() -> "bridge_kafka".
+
+roots() -> ["config"].
+
+fields("post") ->
+ [type_field(), name_field() | fields("config")];
+fields("put") ->
+ fields("config");
+fields("get") ->
+ emqx_bridge_schema:metrics_status_fields() ++ fields("post");
+fields("config") ->
+ [
+ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
+ {bootstrap_hosts, mk(binary(), #{required => true, desc => ?DESC(bootstrap_hosts)})},
+ {connect_timeout,
+ mk(emqx_schema:duration_ms(), #{
+ default => "5s",
+ desc => ?DESC(connect_timeout)
+ })},
+ {min_metadata_refresh_interval,
+ mk(
+ emqx_schema:duration_ms(),
+ #{
+ default => "3s",
+ desc => ?DESC(min_metadata_refresh_interval)
+ }
+ )},
+ {metadata_request_timeout,
+ mk(emqx_schema:duration_ms(), #{
+ default => "5s",
+ desc => ?DESC(metadata_request_timeout)
+ })},
+ {authentication,
+ mk(hoconsc:union([none, ref(auth_username_password), ref(auth_gssapi_kerberos)]), #{
+ default => none, desc => ?DESC("authentication")
+ })},
+ {producer, mk(hoconsc:union([none, ref(producer_opts)]), #{desc => ?DESC(producer_opts)})},
+ %{consumer, mk(hoconsc:union([none, ref(consumer_opts)]), #{desc => ?DESC(consumer_opts)})},
+ {socket_opts, mk(ref(socket_opts), #{required => false, desc => ?DESC(socket_opts)})}
+ ] ++ emqx_connector_schema_lib:ssl_fields();
+fields(auth_username_password) ->
+ [
+ {mechanism,
+ mk(enum([plain, scram_sha_256, scram_sha_512]), #{
+ required => true, desc => ?DESC(auth_sasl_mechanism)
+ })},
+ {username, mk(binary(), #{required => true, desc => ?DESC(auth_sasl_username)})},
+ {password,
+ mk(binary(), #{required => true, sensitive => true, desc => ?DESC(auth_sasl_password)})}
+ ];
+fields(auth_gssapi_kerberos) ->
+ [
+ {kerberos_principal,
+ mk(binary(), #{
+ required => true,
+ desc => ?DESC(auth_kerberos_principal)
+ })},
+ {kerberos_keytab_file,
+ mk(binary(), #{
+ required => true,
+ desc => ?DESC(auth_kerberos_keytab_file)
+ })}
+ ];
+fields(socket_opts) ->
+ [
+ {sndbuf,
+ mk(
+ emqx_schema:bytesize(),
+ #{default => "1024KB", desc => ?DESC(socket_send_buffer)}
+ )},
+ {recbuf,
+ mk(
+ emqx_schema:bytesize(),
+ #{default => "1024KB", desc => ?DESC(socket_receive_buffer)}
+ )},
+ {nodelay,
+ mk(
+ boolean(),
+ #{default => true, desc => ?DESC(socket_nodelay)}
+ )}
+ ];
+fields(producer_opts) ->
+ [
+ {mqtt, mk(ref(producer_mqtt_opts), #{desc => ?DESC(producer_mqtt_opts)})},
+ {kafka,
+ mk(ref(producer_kafka_opts), #{
+ required => true,
+ desc => ?DESC(producer_kafka_opts)
+ })}
+ ];
+fields(producer_mqtt_opts) ->
+ [{topic, mk(string(), #{desc => ?DESC(mqtt_topic)})}];
+fields(producer_kafka_opts) ->
+ [
+ {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})},
+ {message, mk(ref(kafka_message), #{required => false, desc => ?DESC(kafka_message)})},
+ {max_batch_bytes,
+ mk(emqx_schema:bytesize(), #{default => "896KB", desc => ?DESC(max_batch_bytes)})},
+ {compression,
+ mk(enum([no_compression, snappy, gzip]), #{
+ default => no_compression, desc => ?DESC(compression)
+ })},
+ {partition_strategy,
+ mk(
+ enum([random, key_dispatch]),
+ #{default => random, desc => ?DESC(partition_strategy)}
+ )},
+ {required_acks,
+ mk(
+ enum([all_isr, leader_only, none]),
+ #{
+ default => all_isr,
+ desc => ?DESC(required_acks)
+ }
+ )},
+ {partition_count_refresh_interval,
+ mk(
+ emqx_schema:duration_s(),
+ #{
+ default => "60s",
+ desc => ?DESC(partition_count_refresh_interval)
+ }
+ )},
+ {max_inflight,
+ mk(
+ pos_integer(),
+ #{
+ default => 10,
+ desc => ?DESC(max_inflight)
+ }
+ )},
+ {buffer,
+ mk(ref(producer_buffer), #{
+ required => false,
+ desc => ?DESC(producer_buffer)
+ })}
+ ];
+fields(kafka_message) ->
+ [
+ {key, mk(string(), #{default => "${clientid}", desc => ?DESC(kafka_message_key)})},
+ {value, mk(string(), #{default => "${payload}", desc => ?DESC(kafka_message_value)})},
+ {timestamp,
+ mk(string(), #{
+ default => "${timestamp}", desc => ?DESC(kafka_message_timestamp)
+ })}
+ ];
+fields(producer_buffer) ->
+ [
+ {mode,
+ mk(
+ enum([memory, disk, hybrid]),
+ #{default => memory, desc => ?DESC(buffer_mode)}
+ )},
+ {per_partition_limit,
+ mk(
+ emqx_schema:bytesize(),
+ #{default => "2GB", desc => ?DESC(buffer_per_partition_limit)}
+ )},
+ {segment_bytes,
+ mk(
+ emqx_schema:bytesize(),
+ #{default => "100MB", desc => ?DESC(buffer_segment_bytes)}
+ )},
+ {memory_overload_protection,
+ mk(boolean(), #{
+ %% different from 4.x
+ default => true,
+ desc => ?DESC(buffer_memory_overload_protection)
+ })}
+ ].
+
+% fields(consumer_opts) ->
+% [
+% {kafka, mk(ref(consumer_kafka_opts), #{required => true, desc => ?DESC(consumer_kafka_opts)})},
+% {mqtt, mk(ref(consumer_mqtt_opts), #{required => true, desc => ?DESC(consumer_mqtt_opts)})}
+% ];
+% fields(consumer_mqtt_opts) ->
+% [ {topic, mk(string(), #{desc => ?DESC(consumer_mqtt_topic)})}
+% ];
+
+% fields(consumer_mqtt_opts) ->
+% [ {topic, mk(string(), #{desc => ?DESC(consumer_mqtt_topic)})}
+% ];
+% fields(consumer_kafka_opts) ->
+% [ {topic, mk(string(), #{desc => ?DESC(consumer_kafka_topic)})}
+% ].
+
+desc("config") ->
+ ?DESC("desc_config");
+desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
+ ["Configuration for Kafka using `", string:to_upper(Method), "` method."];
+desc(Name) ->
+ lists:member(Name, struct_names()) orelse throw({missing_desc, Name}),
+ ?DESC(Name).
+
+struct_names() ->
+ [
+ auth_gssapi_kerberos,
+ auth_username_password,
+ kafka_message,
+ producer_buffer,
+ producer_kafka_opts,
+ producer_mqtt_opts,
+ socket_opts,
+ producer_opts
+ ].
+
+%% -------------------------------------------------------------------------------------------------
+%% internal
+type_field() ->
+ {type, mk(enum([kafka]), #{required => true, desc => ?DESC("desc_type")})}.
+
+name_field() ->
+ {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
+
+ref(Name) ->
+ hoconsc:ref(?MODULE, Name).
diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka.erl
new file mode 100644
index 000000000..d1fad4765
--- /dev/null
+++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka.erl
@@ -0,0 +1,33 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% Kafka connection configuration
+-module(emqx_bridge_impl_kafka).
+-behaviour(emqx_resource).
+
+%% callbacks of behaviour emqx_resource
+-export([
+ callback_mode/0,
+ on_start/2,
+ on_stop/2,
+ on_query/3,
+ on_get_status/2,
+ is_buffer_supported/0
+]).
+
+is_buffer_supported() -> true.
+
+callback_mode() -> async_if_possible.
+
+on_start(InstId, Config) ->
+ emqx_bridge_impl_kafka_producer:on_start(InstId, Config).
+
+on_stop(InstId, State) ->
+ emqx_bridge_impl_kafka_producer:on_stop(InstId, State).
+
+on_query(InstId, Msg, State) ->
+ emqx_bridge_impl_kafka_producer:on_query(InstId, Msg, State).
+
+on_get_status(InstId, State) ->
+ emqx_bridge_impl_kafka_producer:on_get_status(InstId, State).
diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl
new file mode 100644
index 000000000..ce82dbe2d
--- /dev/null
+++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl
@@ -0,0 +1,270 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_impl_kafka_producer).
+
+%% callbacks of behaviour emqx_resource
+-export([
+ callback_mode/0,
+ on_start/2,
+ on_stop/2,
+ on_query/3,
+ on_get_status/2
+]).
+
+-export([on_kafka_ack/3]).
+
+-include_lib("emqx/include/logger.hrl").
+
+callback_mode() -> async_if_possible.
+
+%% @doc Config schema is defined in emqx_ee_bridge_kafka.
+on_start(InstId, Config) ->
+ #{
+ bridge_name := BridgeName,
+ bootstrap_hosts := Hosts0,
+ connect_timeout := ConnTimeout,
+ metadata_request_timeout := MetaReqTimeout,
+ min_metadata_refresh_interval := MinMetaRefreshInterval,
+ socket_opts := SocketOpts,
+ authentication := Auth,
+ ssl := SSL
+ } = Config,
+ %% it's a bug if producer config is not found
+ %% the caller should not try to start a producer if
+ %% there is no producer config
+ ProducerConfigWrapper = get_required(producer, Config, no_kafka_producer_config),
+ ProducerConfig = get_required(kafka, ProducerConfigWrapper, no_kafka_producer_parameters),
+ MessageTemplate = get_required(message, ProducerConfig, no_kafka_message_template),
+ Hosts = hosts(Hosts0),
+ ClientId = make_client_id(BridgeName),
+ ClientConfig = #{
+ min_metadata_refresh_interval => MinMetaRefreshInterval,
+ connect_timeout => ConnTimeout,
+ client_id => ClientId,
+ request_timeout => MetaReqTimeout,
+ extra_sock_opts => socket_opts(SocketOpts),
+ sasl => sasl(Auth),
+ ssl => ssl(SSL)
+ },
+ #{
+ topic := KafkaTopic
+ } = ProducerConfig,
+ case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of
+ {ok, _} ->
+ ?SLOG(info, #{
+ msg => "kafka_client_started",
+ instance_id => InstId,
+ kafka_hosts => Hosts
+ });
+ {error, Reason} ->
+ ?SLOG(error, #{
+ msg => "failed_to_start_kafka_client",
+ instance_id => InstId,
+ kafka_hosts => Hosts,
+ reason => Reason
+ }),
+ throw(failed_to_start_kafka_client)
+ end,
+ WolffProducerConfig = producers_config(BridgeName, ClientId, ProducerConfig),
+ case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of
+ {ok, Producers} ->
+ {ok, #{
+ message_template => compile_message_template(MessageTemplate),
+ client_id => ClientId,
+ producers => Producers
+ }};
+ {error, Reason2} ->
+ ?SLOG(error, #{
+ msg => "failed_to_start_kafka_producer",
+ instance_id => InstId,
+ kafka_hosts => Hosts,
+ kafka_topic => KafkaTopic,
+ reason => Reason2
+ }),
+ throw(failed_to_start_kafka_producer)
+ end.
+
+on_stop(_InstId, #{client_id := ClientID, producers := Producers}) ->
+ with_log_at_error(
+ fun() -> wolff:stop_and_delete_supervised_producers(Producers) end,
+ #{
+ msg => "failed_to_delete_kafka_producer",
+ client_id => ClientID
+ }
+ ),
+ with_log_at_error(
+ fun() -> wolff:stop_and_delete_supervised_client(ClientID) end,
+ #{
+ msg => "failed_to_delete_kafka_client",
+ client_id => ClientID
+ }
+ ).
+
+%% @doc The callback API for rule-engine (or bridge without rules)
+%% The input argument `Message' is an enriched format (as a map())
+%% of the original #message{} record.
+%% The enrichment is done by rule-engine or by the data bridge framework.
+%% E.g. the output of rule-engine process chain
+%% or the direct mapping from an MQTT message.
+on_query(_InstId, {send_message, Message}, #{message_template := Template, producers := Producers}) ->
+ KafkaMessage = render_message(Template, Message),
+ %% The retuned information is discarded here.
+ %% If the producer process is down when sending, this function would
+ %% raise an error exception which is to be caught by the caller of this callback
+ {_Partition, _Pid} = wolff:send(Producers, [KafkaMessage], {fun ?MODULE:on_kafka_ack/3, [#{}]}),
+ ok.
+
+compile_message_template(#{
+ key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate
+}) ->
+ #{
+ key => emqx_plugin_libs_rule:preproc_tmpl(KeyTemplate),
+ value => emqx_plugin_libs_rule:preproc_tmpl(ValueTemplate),
+ timestamp => emqx_plugin_libs_rule:preproc_tmpl(TimestampTemplate)
+ }.
+
+render_message(
+ #{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate}, Message
+) ->
+ #{
+ key => render(KeyTemplate, Message),
+ value => render(ValueTemplate, Message),
+ ts => render_timestamp(TimestampTemplate, Message)
+ }.
+
+render(Template, Message) ->
+ emqx_plugin_libs_rule:proc_tmpl(Template, Message).
+
+render_timestamp(Template, Message) ->
+ try
+ binary_to_integer(render(Template, Message))
+ catch
+ _:_ ->
+ erlang:system_time(millisecond)
+ end.
+
+on_kafka_ack(_Partition, _Offset, _Extra) ->
+ %% Do nothing so far.
+ %% Maybe need to bump some counters?
+ ok.
+
+on_get_status(_InstId, _State) ->
+ connected.
+
+%% Parse comma separated host:port list into a [{Host,Port}] list
+hosts(Hosts) when is_binary(Hosts) ->
+ hosts(binary_to_list(Hosts));
+hosts(Hosts) when is_list(Hosts) ->
+ kpro:parse_endpoints(Hosts).
+
+%% Extra socket options, such as sndbuf size etc.
+socket_opts(Opts) when is_map(Opts) ->
+ socket_opts(maps:to_list(Opts));
+socket_opts(Opts) when is_list(Opts) ->
+ socket_opts_loop(Opts, []).
+
+socket_opts_loop([], Acc) ->
+ lists:reverse(Acc);
+socket_opts_loop([{T, Bytes} | Rest], Acc) when
+ T =:= sndbuf orelse T =:= recbuf orelse T =:= buffer
+->
+ Acc1 = [{T, Bytes} | adjust_socket_buffer(Bytes, Acc)],
+ socket_opts_loop(Rest, Acc1);
+socket_opts_loop([Other | Rest], Acc) ->
+ socket_opts_loop(Rest, [Other | Acc]).
+
+%% https://www.erlang.org/doc/man/inet.html
+%% For TCP it is recommended to have val(buffer) >= val(recbuf)
+%% to avoid performance issues because of unnecessary copying.
+adjust_socket_buffer(Bytes, Opts) ->
+ case lists:keytake(buffer, 1, Opts) of
+ false ->
+ [{buffer, Bytes} | Opts];
+ {value, {buffer, Bytes1}, Acc1} ->
+ [{buffer, max(Bytes1, Bytes)} | Acc1]
+ end.
+
+sasl(none) ->
+ undefined;
+sasl(#{mechanism := Mechanism, username := Username, password := Password}) ->
+ {Mechanism, Username, emqx_secret:wrap(Password)};
+sasl(#{
+ kerberos_principal := Principal,
+ kerberos_keytab_file := KeyTabFile
+}) ->
+ {callback, brod_gssapi, {gssapi, KeyTabFile, Principal}}.
+
+ssl(#{enable := true} = SSL) ->
+ emqx_tls_lib:to_client_opts(SSL);
+ssl(_) ->
+ [].
+
+producers_config(BridgeName, ClientId, Input) ->
+ #{
+ max_batch_bytes := MaxBatchBytes,
+ compression := Compression,
+ partition_strategy := PartitionStrategy,
+ required_acks := RequiredAcks,
+ partition_count_refresh_interval := PCntRefreshInterval,
+ max_inflight := MaxInflight,
+ buffer := #{
+ mode := BufferMode,
+ per_partition_limit := PerPartitionLimit,
+ segment_bytes := SegmentBytes,
+ memory_overload_protection := MemOLP
+ }
+ } = Input,
+
+ {OffloadMode, ReplayqDir} =
+ case BufferMode of
+ memory -> {false, false};
+ disk -> {false, replayq_dir(ClientId)};
+ hybrid -> {true, replayq_dir(ClientId)}
+ end,
+ #{
+ name => make_producer_name(BridgeName),
+ partitioner => PartitionStrategy,
+ partition_count_refresh_interval_seconds => PCntRefreshInterval,
+ replayq_dir => ReplayqDir,
+ replayq_offload_mode => OffloadMode,
+ replayq_max_total_bytes => PerPartitionLimit,
+ replayq_seg_bytes => SegmentBytes,
+ drop_if_highmem => MemOLP,
+ required_acks => RequiredAcks,
+ max_batch_bytes => MaxBatchBytes,
+ max_send_ahead => MaxInflight - 1,
+ compression => Compression
+ }.
+
+replayq_dir(ClientId) ->
+ filename:join([emqx:data_dir(), "kafka", ClientId]).
+
+%% Client ID is better to be unique to make it easier for Kafka side trouble shooting.
+make_client_id(BridgeName) when is_atom(BridgeName) ->
+ make_client_id(atom_to_list(BridgeName));
+make_client_id(BridgeName) ->
+ iolist_to_binary([BridgeName, ":", atom_to_list(node())]).
+
+%% Producer name must be an atom which will be used as a ETS table name for
+%% partition worker lookup.
+make_producer_name(BridgeName) when is_atom(BridgeName) ->
+ make_producer_name(atom_to_list(BridgeName));
+make_producer_name(BridgeName) ->
+ list_to_atom("kafka_producer_" ++ BridgeName).
+
+with_log_at_error(Fun, Log) ->
+ try
+ Fun()
+ catch
+ C:E ->
+ ?SLOG(error, Log#{
+ exception => C,
+ reason => E
+ })
+ end.
+
+get_required(Field, Config, Throw) ->
+ Value = maps:get(Field, Config, none),
+ Value =:= none andalso throw(Throw),
+ Value.
diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl
new file mode 100644
index 000000000..fb929e692
--- /dev/null
+++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl
@@ -0,0 +1,559 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_impl_kafka_producer_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("brod/include/brod.hrl").
+
+-define(PRODUCER, emqx_bridge_impl_kafka).
+
+%%------------------------------------------------------------------------------
+%% Things for REST API tests
+%%------------------------------------------------------------------------------
+
+-import(
+ emqx_common_test_http,
+ [
+ request_api/3,
+ request_api/5,
+ get_http_data/1
+ ]
+).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("emqx/include/emqx.hrl").
+-include("emqx_dashboard.hrl").
+
+-define(CONTENT_TYPE, "application/x-www-form-urlencoded").
+
+-define(HOST, "http://127.0.0.1:18083").
+
+%% -define(API_VERSION, "v5").
+
+-define(BASE_PATH, "/api/v5").
+
+-define(APP_DASHBOARD, emqx_dashboard).
+-define(APP_MANAGEMENT, emqx_management).
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+ emqx_common_test_helpers:all(?MODULE).
+
+wait_until_kafka_is_up() ->
+ wait_until_kafka_is_up(0).
+
+wait_until_kafka_is_up(300) ->
+ ct:fail("Kafka is not up even though we have waited for a while");
+wait_until_kafka_is_up(Attempts) ->
+ KafkaTopic = "test-topic-one-partition",
+ case resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0) of
+ {ok, _} ->
+ ok;
+ _ ->
+ timer:sleep(1000),
+ wait_until_kafka_is_up(Attempts + 1)
+ end.
+
+init_per_suite(Config) ->
+ %% Need to unload emqx_authz. See emqx_machine_SUITE:init_per_suite for
+ %% more info.
+ application:unload(emqx_authz),
+ emqx_common_test_helpers:start_apps(
+ [emqx_conf, emqx_rule_engine, emqx_bridge, emqx_management, emqx_dashboard],
+ fun set_special_configs/1
+ ),
+ application:set_env(emqx_machine, applications, [
+ emqx_prometheus,
+ emqx_modules,
+ emqx_dashboard,
+ emqx_gateway,
+ emqx_statsd,
+ emqx_resource,
+ emqx_rule_engine,
+ emqx_bridge,
+ emqx_ee_bridge,
+ emqx_plugin_libs,
+ emqx_management,
+ emqx_retainer,
+ emqx_exhook,
+ emqx_authn,
+ emqx_authz,
+ emqx_plugin
+ ]),
+ {ok, _} = application:ensure_all_started(emqx_machine),
+ wait_until_kafka_is_up(),
+ %% Wait until bridges API is up
+ (fun WaitUntilRestApiUp() ->
+ case show(http_get(["bridges"])) of
+ {ok, 200, _Res} ->
+ ok;
+ Val ->
+ ct:pal("REST API for bridges not up. Wait and try again. Response: ~p", [Val]),
+ timer:sleep(1000),
+ WaitUntilRestApiUp()
+ end
+ end)(),
+ Config.
+
+end_per_suite(Config) ->
+ emqx_common_test_helpers:stop_apps([
+ emqx_prometheus,
+ emqx_modules,
+ emqx_dashboard,
+ emqx_gateway,
+ emqx_statsd,
+ emqx_resource,
+ emqx_rule_engine,
+ emqx_bridge,
+ emqx_ee_bridge,
+ emqx_plugin_libs,
+ emqx_management,
+ emqx_retainer,
+ emqx_exhook,
+ emqx_authn,
+ emqx_authz,
+ emqx_plugin,
+ emqx_conf,
+ emqx_bridge,
+ emqx_management,
+ emqx_dashboard,
+ emqx_machine
+ ]),
+ mria:stop(),
+ Config.
+
+set_special_configs(emqx_management) ->
+ Listeners = #{http => #{port => 8081}},
+ Config = #{
+ listeners => Listeners,
+ applications => [#{id => "admin", secret => "public"}]
+ },
+ emqx_config:put([emqx_management], Config),
+ ok;
+set_special_configs(emqx_dashboard) ->
+ emqx_dashboard_api_test_helpers:set_default_config(),
+ ok;
+set_special_configs(_) ->
+ ok.
+%%------------------------------------------------------------------------------
+%% Test cases for all combinations of SSL, no SSL and authentication types
+%%------------------------------------------------------------------------------
+
+t_publish_no_auth(_CtConfig) ->
+ publish_with_and_without_ssl("none").
+
+t_publish_sasl_plain(_CtConfig) ->
+ publish_with_and_without_ssl(valid_sasl_plain_settings()).
+
+t_publish_sasl_scram256(_CtConfig) ->
+ publish_with_and_without_ssl(valid_sasl_scram256_settings()).
+
+t_publish_sasl_scram512(_CtConfig) ->
+ publish_with_and_without_ssl(valid_sasl_scram512_settings()).
+
+t_publish_sasl_kerberos(_CtConfig) ->
+ publish_with_and_without_ssl(valid_sasl_kerberos_settings()).
+
+%%------------------------------------------------------------------------------
+%% Test cases for REST api
+%%------------------------------------------------------------------------------
+
+show(X) ->
+ % erlang:display('______________ SHOW ______________:'),
+ % erlang:display(X),
+ X.
+
+t_kafka_bridge_rest_api_plain_text(_CtConfig) ->
+ kafka_bridge_rest_api_all_auth_methods(false).
+
+t_kafka_bridge_rest_api_ssl(_CtConfig) ->
+ kafka_bridge_rest_api_all_auth_methods(true).
+
+kafka_bridge_rest_api_all_auth_methods(UseSSL) ->
+ NormalHostsString =
+ case UseSSL of
+ true -> kafka_hosts_string_ssl();
+ false -> kafka_hosts_string()
+ end,
+ kafka_bridge_rest_api_helper(#{
+ <<"bootstrap_hosts">> => NormalHostsString,
+ <<"authentication">> => <<"none">>
+ }),
+ SASLHostsString =
+ case UseSSL of
+ true -> kafka_hosts_string_ssl_sasl();
+ false -> kafka_hosts_string_sasl()
+ end,
+ BinifyMap = fun(Map) ->
+ maps:from_list([
+ {erlang:iolist_to_binary(K), erlang:iolist_to_binary(V)}
+ || {K, V} <- maps:to_list(Map)
+ ])
+ end,
+ SSLSettings =
+ case UseSSL of
+ true -> #{<<"ssl">> => BinifyMap(valid_ssl_settings())};
+ false -> #{}
+ end,
+ kafka_bridge_rest_api_helper(
+ maps:merge(
+ #{
+ <<"bootstrap_hosts">> => SASLHostsString,
+ <<"authentication">> => BinifyMap(valid_sasl_plain_settings())
+ },
+ SSLSettings
+ )
+ ),
+ kafka_bridge_rest_api_helper(
+ maps:merge(
+ #{
+ <<"bootstrap_hosts">> => SASLHostsString,
+ <<"authentication">> => BinifyMap(valid_sasl_scram256_settings())
+ },
+ SSLSettings
+ )
+ ),
+ kafka_bridge_rest_api_helper(
+ maps:merge(
+ #{
+ <<"bootstrap_hosts">> => SASLHostsString,
+ <<"authentication">> => BinifyMap(valid_sasl_scram512_settings())
+ },
+ SSLSettings
+ )
+ ),
+ kafka_bridge_rest_api_helper(
+ maps:merge(
+ #{
+ <<"bootstrap_hosts">> => SASLHostsString,
+ <<"authentication">> => BinifyMap(valid_sasl_kerberos_settings())
+ },
+ SSLSettings
+ )
+ ),
+ ok.
+
+kafka_bridge_rest_api_helper(Config) ->
+ UrlEscColon = "%3A",
+ BridgeIdUrlEnc = "kafka" ++ UrlEscColon ++ "my_kafka_bridge",
+ BridgesParts = ["bridges"],
+ BridgesPartsId = ["bridges", BridgeIdUrlEnc],
+ OpUrlFun = fun(OpName) -> ["bridges", BridgeIdUrlEnc, "operation", OpName] end,
+ BridgesPartsOpDisable = OpUrlFun("disable"),
+ BridgesPartsOpEnable = OpUrlFun("enable"),
+ BridgesPartsOpRestart = OpUrlFun("restart"),
+ BridgesPartsOpStop = OpUrlFun("stop"),
+ %% List bridges
+ MyKafkaBridgeExists = fun() ->
+ {ok, _Code, BridgesData} = show(http_get(BridgesParts)),
+ Bridges = show(json(BridgesData)),
+ lists:any(
+ fun
+ (#{<<"name">> := <<"my_kafka_bridge">>}) -> true;
+ (_) -> false
+ end,
+ Bridges
+ )
+ end,
+ %% Delete if my_kafka_bridge exists
+ case MyKafkaBridgeExists() of
+ true ->
+ %% Delete the bridge my_kafka_bridge
+ show(
+ '========================================== DELETE ========================================'
+ ),
+ {ok, 204, <<>>} = show(http_delete(BridgesPartsId));
+ false ->
+ ok
+ end,
+ false = MyKafkaBridgeExists(),
+ %% Create new Kafka bridge
+ CreateBodyTmp = #{
+ <<"type">> => <<"kafka">>,
+ <<"name">> => <<"my_kafka_bridge">>,
+ <<"bootstrap_hosts">> => maps:get(<<"bootstrap_hosts">>, Config),
+ <<"enable">> => true,
+ <<"authentication">> => maps:get(<<"authentication">>, Config),
+ <<"producer">> => #{
+ <<"mqtt">> => #{
+ topic => <<"t/#">>
+ },
+ <<"kafka">> => #{
+ <<"topic">> => <<"test-topic-one-partition">>
+ }
+ }
+ },
+ CreateBody =
+ case maps:is_key(<<"ssl">>, Config) of
+ true -> CreateBodyTmp#{<<"ssl">> => maps:get(<<"ssl">>, Config)};
+ false -> CreateBodyTmp
+ end,
+ {ok, 201, _Data} = show(http_post(BridgesParts, show(CreateBody))),
+ %% Check that the new bridge is in the list of bridges
+ true = MyKafkaBridgeExists(),
+ %% Perform operations
+ {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})),
+ {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})),
+ {ok, 200, _} = show(http_post(show(BridgesPartsOpEnable), #{})),
+ {ok, 200, _} = show(http_post(show(BridgesPartsOpEnable), #{})),
+ {ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})),
+ {ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})),
+ {ok, 200, _} = show(http_post(show(BridgesPartsOpRestart), #{})),
+ %% Cleanup
+ {ok, 204, _} = show(http_delete(BridgesPartsId)),
+ false = MyKafkaBridgeExists(),
+ ok.
+
+%%------------------------------------------------------------------------------
+%% Helper functions
+%%------------------------------------------------------------------------------
+
+publish_with_and_without_ssl(AuthSettings) ->
+ publish_helper(#{
+ auth_settings => AuthSettings,
+ ssl_settings => #{}
+ }),
+ publish_helper(#{
+ auth_settings => AuthSettings,
+ ssl_settings => valid_ssl_settings()
+ }).
+
+publish_helper(#{
+ auth_settings := AuthSettings,
+ ssl_settings := SSLSettings
+}) ->
+ HostsString =
+ case {AuthSettings, SSLSettings} of
+ {"none", Map} when map_size(Map) =:= 0 ->
+ kafka_hosts_string();
+ {"none", Map} when map_size(Map) =/= 0 ->
+ kafka_hosts_string_ssl();
+ {_, Map} when map_size(Map) =:= 0 ->
+ kafka_hosts_string_sasl();
+ {_, _} ->
+ kafka_hosts_string_ssl_sasl()
+ end,
+ Hash = erlang:phash2([HostsString, AuthSettings, SSLSettings]),
+ Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
+ InstId = emqx_bridge_resource:resource_id("kafka", Name),
+ KafkaTopic = "test-topic-one-partition",
+ Conf = config(#{
+ "authentication" => AuthSettings,
+ "kafka_hosts_string" => HostsString,
+ "kafka_topic" => KafkaTopic,
+ "instance_id" => InstId,
+ "ssl" => SSLSettings
+ }),
+ %% To make sure we get unique value
+ timer:sleep(1),
+ Time = erlang:monotonic_time(),
+ BinTime = integer_to_binary(Time),
+ Msg = #{
+ clientid => BinTime,
+ payload => <<"payload">>,
+ timestamp => Time
+ },
+ {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
+ ct:pal("base offset before testing ~p", [Offset]),
+ StartRes = ?PRODUCER:on_start(InstId, Conf),
+ {ok, State} = StartRes,
+ OnQueryRes = ?PRODUCER:on_query(InstId, {send_message, Msg}, State),
+ ok = OnQueryRes,
+ {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
+ ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
+ ok = ?PRODUCER:on_stop(InstId, State),
+ ok.
+
+config(Args) ->
+ ConfText = hocon_config(Args),
+ ct:pal("Running tests with conf:\n~s", [ConfText]),
+ {ok, Conf} = hocon:binary(ConfText),
+ #{config := Parsed} = hocon_tconf:check_plain(
+ emqx_ee_bridge_kafka,
+ #{<<"config">> => Conf},
+ #{atom_key => true}
+ ),
+ InstId = maps:get("instance_id", Args),
+ <<"bridge:", BridgeId/binary>> = InstId,
+ Parsed#{bridge_name => erlang:element(2, emqx_bridge_resource:parse_bridge_id(BridgeId))}.
+
+hocon_config(Args) ->
+ AuthConf = maps:get("authentication", Args),
+ AuthTemplate = iolist_to_binary(hocon_config_template_authentication(AuthConf)),
+ AuthConfRendered = bbmustache:render(AuthTemplate, AuthConf),
+ SSLConf = maps:get("ssl", Args, #{}),
+ SSLTemplate = iolist_to_binary(hocon_config_template_ssl(SSLConf)),
+ SSLConfRendered = bbmustache:render(SSLTemplate, SSLConf),
+ Hocon = bbmustache:render(
+ iolist_to_binary(hocon_config_template()),
+ Args#{
+ "authentication" => AuthConfRendered,
+ "ssl" => SSLConfRendered
+ }
+ ),
+ Hocon.
+
+%% erlfmt-ignore
+hocon_config_template() ->
+"""
+bootstrap_hosts = \"{{ kafka_hosts_string }}\"
+enable = true
+authentication = {{{ authentication }}}
+ssl = {{{ ssl }}}
+producer = {
+ mqtt {
+ topic = \"t/#\"
+ }
+ kafka = {
+ topic = \"{{ kafka_topic }}\"
+ }
+}
+""".
+
+%% erlfmt-ignore
+hocon_config_template_authentication("none") ->
+ "none";
+hocon_config_template_authentication(#{"mechanism" := _}) ->
+"""
+{
+ mechanism = {{ mechanism }}
+ password = {{ password }}
+ username = {{ username }}
+}
+""";
+hocon_config_template_authentication(#{"kerberos_principal" := _}) ->
+"""
+{
+ kerberos_principal = \"{{ kerberos_principal }}\"
+ kerberos_keytab_file = \"{{ kerberos_keytab_file }}\"
+}
+""".
+
+%% erlfmt-ignore
+hocon_config_template_ssl(Map) when map_size(Map) =:= 0 ->
+"""
+{
+ enable = false
+}
+""";
+hocon_config_template_ssl(_) ->
+"""
+{
+ enable = true
+ cacertfile = \"{{{cacertfile}}}\"
+ certfile = \"{{{certfile}}}\"
+ keyfile = \"{{{keyfile}}}\"
+}
+""".
+
+kafka_hosts_string() ->
+ "kafka-1.emqx.net:9092,".
+
+kafka_hosts_string_sasl() ->
+ "kafka-1.emqx.net:9093,".
+
+kafka_hosts_string_ssl() ->
+ "kafka-1.emqx.net:9094,".
+
+kafka_hosts_string_ssl_sasl() ->
+ "kafka-1.emqx.net:9095,".
+
+valid_ssl_settings() ->
+ #{
+ "cacertfile" => <<"/var/lib/secret/ca.crt">>,
+ "certfile" => <<"/var/lib/secret/client.crt">>,
+ "keyfile" => <<"/var/lib/secret/client.key">>,
+ "enable" => <<"true">>
+ }.
+
+valid_sasl_plain_settings() ->
+ #{
+ "mechanism" => "plain",
+ "username" => "emqxuser",
+ "password" => "password"
+ }.
+
+valid_sasl_scram256_settings() ->
+ (valid_sasl_plain_settings())#{
+ "mechanism" => "scram_sha_256"
+ }.
+
+valid_sasl_scram512_settings() ->
+ (valid_sasl_plain_settings())#{
+ "mechanism" => "scram_sha_512"
+ }.
+
+valid_sasl_kerberos_settings() ->
+ #{
+ "kerberos_principal" => "rig@KDC.EMQX.NET",
+ "kerberos_keytab_file" => "/var/lib/secret/rig.keytab"
+ }.
+
+kafka_hosts() ->
+ kpro:parse_endpoints(kafka_hosts_string()).
+
+resolve_kafka_offset(Hosts, Topic, Partition) ->
+ brod:resolve_offset(Hosts, Topic, Partition, latest).
+
+%%------------------------------------------------------------------------------
+%% Internal functions rest API helpers
+%%------------------------------------------------------------------------------
+
+bin(X) -> iolist_to_binary(X).
+
+random_num() ->
+ erlang:system_time(nanosecond).
+
+http_get(Parts) ->
+ request_api(get, api_path(Parts), auth_header_()).
+
+http_delete(Parts) ->
+ request_api(delete, api_path(Parts), auth_header_()).
+
+http_post(Parts, Body) ->
+ request_api(post, api_path(Parts), [], auth_header_(), Body).
+
+http_put(Parts, Body) ->
+ request_api(put, api_path(Parts), [], auth_header_(), Body).
+
+request_dashboard(Method, Url, Auth) ->
+ Request = {Url, [Auth]},
+ do_request_dashboard(Method, Request).
+request_dashboard(Method, Url, QueryParams, Auth) ->
+ Request = {Url ++ "?" ++ QueryParams, [Auth]},
+ do_request_dashboard(Method, Request).
+do_request_dashboard(Method, Request) ->
+ ct:pal("Method: ~p, Request: ~p", [Method, Request]),
+ case httpc:request(Method, Request, [], []) of
+ {error, socket_closed_remotely} ->
+ {error, socket_closed_remotely};
+ {ok, {{"HTTP/1.1", Code, _}, _Headers, Return}} when
+ Code >= 200 andalso Code =< 299
+ ->
+ {ok, Return};
+ {ok, {Reason, _, _}} ->
+ {error, Reason}
+ end.
+
+auth_header_() ->
+ auth_header_(<<"admin">>, <<"public">>).
+
+auth_header_(Username, Password) ->
+ {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
+ {"Authorization", "Bearer " ++ binary_to_list(Token)}.
+
+api_path(Parts) ->
+ ?HOST ++ filename:join([?BASE_PATH | Parts]).
+
+json(Data) ->
+ {ok, Jsx} = emqx_json:safe_decode(Data, [return_maps]),
+ Jsx.
diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src
index 675a934aa..c1b86d20b 100644
--- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src
+++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src
@@ -5,7 +5,9 @@
kernel,
stdlib,
hstreamdb_erl,
- influxdb
+ influxdb,
+ wolff,
+ brod
]},
{env, []},
{modules, []},
diff --git a/mix.exs b/mix.exs
index 1274eb329..8a764bccd 100644
--- a/mix.exs
+++ b/mix.exs
@@ -44,7 +44,7 @@ defmodule EMQXUmbrella.MixProject do
# we need several overrides here because dependencies specify
# other exact versions, and not ranges.
[
- {:lc, github: "emqx/lc", tag: "0.3.1"},
+ {:lc, github: "emqx/lc", tag: "0.3.2", override: true},
{:redbug, "2.0.7"},
{:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
{:ehttpc, github: "emqx/ehttpc", tag: "0.4.0", override: true},
@@ -57,7 +57,7 @@ defmodule EMQXUmbrella.MixProject do
{:grpc, github: "emqx/grpc-erl", tag: "0.6.6", override: true},
{:minirest, github: "emqx/minirest", tag: "1.3.7", override: true},
{:ecpool, github: "emqx/ecpool", tag: "0.5.2", override: true},
- {:replayq, "0.3.4", override: true},
+ {:replayq, github: "emqx/replayq", tag: "0.3.4", override: true},
{:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},
{:emqtt, github: "emqx/emqtt", tag: "1.7.0-rc.2", override: true},
{:rulesql, github: "emqx/rulesql", tag: "0.1.4"},
@@ -129,7 +129,13 @@ defmodule EMQXUmbrella.MixProject do
defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do
[
{:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"},
- {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.3", override: true}
+ {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.3", override: true},
+ {:wolff, github: "kafka4beam/wolff", tag: "1.6.4"},
+ {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.0", override: true},
+ {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"},
+ {:brod, github: "kafka4beam/brod", tag: "3.16.4"},
+ {:snappyer, "1.2.8", override: true},
+ {:supervisor3, "1.1.11", override: true}
]
end
diff --git a/rebar.config b/rebar.config
index 4a02104f0..ed13a192b 100644
--- a/rebar.config
+++ b/rebar.config
@@ -44,7 +44,7 @@
{post_hooks,[]}.
{deps,
- [ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.1"}}}
+ [ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}}
, {redbug, "2.0.7"}
, {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
, {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
@@ -59,7 +59,7 @@
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.6"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.7"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
- , {replayq, "0.3.4"}
+ , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.4"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.7.0-rc.2"}}}
, {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.4"}}}
diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh
index 2c87bb0cf..45d32767c 100755
--- a/scripts/ct/run.sh
+++ b/scripts/ct/run.sh
@@ -10,12 +10,20 @@ cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/../.."
help() {
echo
echo "-h|--help: To display this usage info"
- echo "--app lib_dir/app_name: Print apps in json"
+ echo "--app lib_dir/app_name: For which app to run start docker-compose, and run common tests"
+ echo "--suites SUITE1,SUITE2: Comma separated SUITE names to run. e.g. apps/emqx/test/emqx_SUITE.erl"
echo "--console: Start EMQX in console mode"
+ echo "--attach: Attach to the Erlang docker container without running any test case"
+ echo "--only-up: Only start the testbed but do not run CT"
+ echo "--keep-up: Keep the testbed running after CT"
}
WHICH_APP='novalue'
CONSOLE='no'
+KEEP_UP='no'
+ONLY_UP='no'
+SUITES=''
+ATTACH='no'
while [ "$#" -gt 0 ]; do
case $1 in
-h|--help)
@@ -26,10 +34,26 @@ while [ "$#" -gt 0 ]; do
WHICH_APP="$2"
shift 2
;;
+ --only-up)
+ ONLY_UP='yes'
+ shift 1
+ ;;
+ --keep-up)
+ KEEP_UP='yes'
+ shift 1
+ ;;
+ --attach)
+ ATTACH='yes'
+ shift 1
+ ;;
--console)
CONSOLE='yes'
shift 1
;;
+ --suites)
+ SUITES="$2"
+ shift 2
+ ;;
*)
echo "unknown option $1"
exit 1
@@ -45,6 +69,16 @@ fi
ERLANG_CONTAINER='erlang24'
DOCKER_CT_ENVS_FILE="${WHICH_APP}/docker-ct"
+case "${WHICH_APP}" in
+ lib-ee*)
+ ## ensure enterprise profile when testing lib-ee applications
+ export PROFILE='emqx-enterprise'
+ ;;
+ *)
+ true
+ ;;
+esac
+
if [ -f "$DOCKER_CT_ENVS_FILE" ]; then
# shellcheck disable=SC2002
CT_DEPS="$(cat "$DOCKER_CT_ENVS_FILE" | xargs)"
@@ -80,6 +114,9 @@ for dep in ${CT_DEPS}; do
FILES+=( '.ci/docker-compose-file/docker-compose-pgsql-tcp.yaml'
'.ci/docker-compose-file/docker-compose-pgsql-tls.yaml' )
;;
+ kafka)
+ FILES+=( '.ci/docker-compose-file/docker-compose-kafka.yaml' )
+ ;;
*)
echo "unknown_ct_dependency $dep"
exit 1
@@ -104,13 +141,23 @@ if [[ -t 1 ]]; then
fi
docker exec -i $TTY "$ERLANG_CONTAINER" bash -c 'git config --global --add safe.directory /emqx'
-if [ "$CONSOLE" = 'yes' ]; then
+if [ "$ONLY_UP" = 'yes' ]; then
+ exit 0
+fi
+
+if [ "$ATTACH" = 'yes' ]; then
+ docker exec -it "$ERLANG_CONTAINER" bash
+elif [ "$CONSOLE" = 'yes' ]; then
docker exec -i $TTY "$ERLANG_CONTAINER" bash -c "make run"
else
set +e
- docker exec -i $TTY "$ERLANG_CONTAINER" bash -c "make ${WHICH_APP}-ct"
+ docker exec -i $TTY -e EMQX_CT_SUITES="$SUITES" "$ERLANG_CONTAINER" bash -c "BUILD_WITHOUT_QUIC=1 make ${WHICH_APP}-ct"
RESULT=$?
- # shellcheck disable=2086 # no quotes for F_OPTIONS
- docker-compose $F_OPTIONS down
- exit $RESULT
+ if [ "$KEEP_UP" = 'yes' ]; then
+ exit $RESULT
+ else
+ # shellcheck disable=2086 # no quotes for F_OPTIONS
+ docker-compose $F_OPTIONS down
+ exit $RESULT
+ fi
fi
diff --git a/scripts/find-suites.sh b/scripts/find-suites.sh
index 4d2fd3bee..e7c1b422e 100755
--- a/scripts/find-suites.sh
+++ b/scripts/find-suites.sh
@@ -8,5 +8,9 @@ set -euo pipefail
# ensure dir
cd -P -- "$(dirname -- "$0")/.."
-TESTDIR="$1/test"
-find "${TESTDIR}" -name "*_SUITE.erl" -print0 2>/dev/null | xargs -0 | tr ' ' ','
+if [ -z "${EMQX_CT_SUITES:-}" ]; then
+ TESTDIR="$1/test"
+ find "${TESTDIR}" -name "*_SUITE.erl" -print0 2>/dev/null | xargs -0 | tr ' ' ','
+else
+ echo "${EMQX_CT_SUITES}"
+fi