diff --git a/.ci/docker-compose-file/docker-compose-greptimedb.yaml b/.ci/docker-compose-file/docker-compose-greptimedb.yaml new file mode 100644 index 000000000..8980c946d --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-greptimedb.yaml @@ -0,0 +1,22 @@ +version: '3.9' + +services: + greptimedb: + container_name: greptimedb + hostname: greptimedb + image: greptime/greptimedb:0.3.2 + expose: + - "4000" + - "4001" + # uncomment for local testing + # ports: + # - "4000:4000" + # - "4001:4001" + restart: always + networks: + - emqx_bridge + command: + standalone start + --user-provider=static_user_provider:cmd:greptime_user=greptime_pwd + --http-addr="0.0.0.0:4000" + --rpc-addr="0.0.0.0:4001" diff --git a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml index 74d2583c9..d648d9d78 100644 --- a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml +++ b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml @@ -51,6 +51,9 @@ services: - 15670:5670 # Kinesis - 4566:4566 + # GreptimeDB + - 4000:4000 + - 4001:4001 command: - "-host=0.0.0.0" - "-config=/config/toxiproxy.json" diff --git a/.ci/docker-compose-file/toxiproxy.json b/.ci/docker-compose-file/toxiproxy.json index c9590354b..f5df5a853 100644 --- a/.ci/docker-compose-file/toxiproxy.json +++ b/.ci/docker-compose-file/toxiproxy.json @@ -162,6 +162,18 @@ "upstream": "hstreamdb:6570", "enabled": true }, + { + "name": "greptimedb_http", + "listen": "0.0.0.0:4000", + "upstream": "greptimedb:4000", + "enabled": true + }, + { + "name": "greptimedb_grpc", + "listen": "0.0.0.0:4001", + "upstream": "greptimedb:4001", + "enabled": true + }, { "name": "kinesis", "listen": "0.0.0.0:4566", diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src index 47f1ae4b4..5ee4e2688 100644 --- a/apps/emqx/src/emqx.app.src +++ b/apps/emqx/src/emqx.app.src @@ -2,7 +2,7 @@ {application, emqx, [ {id, "emqx"}, {description, "EMQX Core"}, - {vsn, "5.1.2"}, + {vsn, "5.1.3"}, {modules, []}, {registered, []}, {applications, [ diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index 11d199c9d..fabf4d334 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "EMQX bridges"}, - {vsn, "0.1.23"}, + {vsn, "0.1.24"}, {registered, [emqx_bridge_sup]}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index d5fc42ade..612481663 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -89,7 +89,8 @@ T == pulsar_producer; T == oracle; T == iotdb; - T == kinesis_producer + T == kinesis_producer; + T == greptimedb ). -define(ROOT_KEY, bridges). diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl index e4ef94c9e..c23ffb6df 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl @@ -49,7 +49,8 @@ api_schemas(Method) -> api_ref(emqx_bridge_oracle, <<"oracle">>, Method), api_ref(emqx_bridge_iotdb, <<"iotdb">>, Method), api_ref(emqx_bridge_rabbitmq, <<"rabbitmq">>, Method), - api_ref(emqx_bridge_kinesis, <<"kinesis_producer">>, Method ++ "_producer") + api_ref(emqx_bridge_kinesis, <<"kinesis_producer">>, Method ++ "_producer"), + api_ref(emqx_bridge_greptimedb, <<"greptimedb">>, Method ++ "_grpc_v1") ]. schema_modules() -> @@ -75,7 +76,8 @@ schema_modules() -> emqx_bridge_oracle, emqx_bridge_iotdb, emqx_bridge_rabbitmq, - emqx_bridge_kinesis + emqx_bridge_kinesis, + emqx_bridge_greptimedb ]. examples(Method) -> @@ -121,7 +123,8 @@ resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer; resource_type(oracle) -> emqx_oracle; resource_type(iotdb) -> emqx_bridge_iotdb_impl; resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector; -resource_type(kinesis_producer) -> emqx_bridge_kinesis_impl_producer. +resource_type(kinesis_producer) -> emqx_bridge_kinesis_impl_producer; +resource_type(greptimedb) -> emqx_bridge_greptimedb_connector. fields(bridges) -> [ @@ -202,7 +205,8 @@ fields(bridges) -> influxdb_structs() ++ redis_structs() ++ pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs() ++ rabbitmq_structs() ++ - kinesis_structs(). + kinesis_structs() ++ + greptimedb_structs(). mongodb_structs() -> [ @@ -287,6 +291,21 @@ influxdb_structs() -> ] ]. +greptimedb_structs() -> + [ + {Protocol, + mk( + hoconsc:map(name, ref(emqx_bridge_greptimedb, Protocol)), + #{ + desc => <<"GreptimeDB Bridge Config">>, + required => false + } + )} + || Protocol <- [ + greptimedb + ] + ]. + redis_structs() -> [ {Type, diff --git a/apps/emqx_bridge_greptimedb/.gitignore b/apps/emqx_bridge_greptimedb/.gitignore new file mode 100644 index 000000000..45f82dfcd --- /dev/null +++ b/apps/emqx_bridge_greptimedb/.gitignore @@ -0,0 +1,19 @@ +.rebar3 +_* +.eunit +*.o +*.beam +*.plt +*.swp +*.swo +.erlang.cookie +ebin + log +erl_crash.dump +.rebar +logs +_build +.idea +*.iml +rebar3.crashdump +*~ diff --git a/apps/emqx_bridge_greptimedb/BSL.txt b/apps/emqx_bridge_greptimedb/BSL.txt new file mode 100644 index 000000000..0acc0e696 --- /dev/null +++ b/apps/emqx_bridge_greptimedb/BSL.txt @@ -0,0 +1,94 @@ +Business Source License 1.1 + +Licensor: Hangzhou EMQ Technologies Co., Ltd. +Licensed Work: EMQX Enterprise Edition + The Licensed Work is (c) 2023 + Hangzhou EMQ Technologies Co., Ltd. +Additional Use Grant: Students and educators are granted right to copy, + modify, and create derivative work for research + or education. +Change Date: 2027-02-01 +Change License: Apache License, Version 2.0 + +For information about alternative licensing arrangements for the Software, +please contact Licensor: https://www.emqx.com/en/contact + +Notice + +The Business Source License (this document, or the “License”) is not an Open +Source license. However, the Licensed Work will eventually be made available +under an Open Source License, as stated in this License. + +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. +“Business Source License” is a trademark of MariaDB Corporation Ab. + +----------------------------------------------------------------------------- + +Business Source License 1.1 + +Terms + +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited +production use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph +above terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or +modified form from a third party, the terms and conditions set forth in this +License apply to your use of that work. + +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). + +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. + +MariaDB hereby grants you permission to use this License’s text to license +your works, and to refer to it using the trademark “Business Source License”, +as long as you comply with the Covenants of Licensor below. + +Covenants of Licensor + +In consideration of the right to use this License’s text and the “Business +Source License” name and trademark, Licensor covenants to MariaDB, and to all +other recipients of the licensed work to be provided by Licensor: + +1. To specify as the Change License the GPL Version 2.0 or any later version, + or a license that is compatible with GPL Version 2.0 or a later version, + where “compatible” means that software provided under the Change License can + be included in a program with software provided under GPL Version 2.0 or a + later version. Licensor may specify additional Change Licenses without + limitation. + +2. To either: (a) specify an additional grant of rights to use that does not + impose any additional restriction on the right granted in this License, as + the Additional Use Grant; or (b) insert the text “None”. + +3. To specify a Change Date. + +4. Not to modify this License in any other way. diff --git a/apps/emqx_bridge_greptimedb/README.md b/apps/emqx_bridge_greptimedb/README.md new file mode 100644 index 000000000..b92538c66 --- /dev/null +++ b/apps/emqx_bridge_greptimedb/README.md @@ -0,0 +1,27 @@ +# emqx_bridge_greptimedb +This application houses the GreptimeDB data integration to EMQX. +It provides the means to connect to GreptimeDB and publish messages to it. + +It implements connection management and interaction without the need for a + separate connector app, since it's not used for authentication and authorization + applications. + +## Docs + +For more information about GreptimeDB, please refer to [official + document](https://docs.greptime.com/). + +## Configurations + +Just like the InfluxDB data bridge but have some different parameters. Below are several important parameters: + - `server`: The IPv4 or IPv6 address or the hostname to connect to. + - `dbname`: The GreptimeDB database name. + - `write_syntax`: Like the `write_syntax` in `InfluxDB` conf, it's the conf of InfluxDB line protocol to write data points. It is a text-based format that provides the measurement, tag set, field set, and timestamp of a data point, and placeholder supported. + + +# Contributing - [Mandatory] +Please see our [contributing.md](../../CONTRIBUTING.md). + +# License + +See [BSL](./BSL.txt). diff --git a/apps/emqx_bridge_greptimedb/docker-ct b/apps/emqx_bridge_greptimedb/docker-ct new file mode 100644 index 000000000..1a9647132 --- /dev/null +++ b/apps/emqx_bridge_greptimedb/docker-ct @@ -0,0 +1,2 @@ +toxiproxy +greptimedb diff --git a/apps/emqx_bridge_greptimedb/rebar.config b/apps/emqx_bridge_greptimedb/rebar.config new file mode 100644 index 000000000..f0942f910 --- /dev/null +++ b/apps/emqx_bridge_greptimedb/rebar.config @@ -0,0 +1,12 @@ +{erl_opts, [ + debug_info +]}. + +{deps, [ + {emqx_connector, {path, "../../apps/emqx_connector"}}, + {emqx_resource, {path, "../../apps/emqx_resource"}}, + {emqx_bridge, {path, "../../apps/emqx_bridge"}}, + {greptimedb, {git, "https://github.com/GreptimeTeam/greptimedb-client-erl", {tag, "v0.1.2"}}} +]}. +{plugins, [rebar3_path_deps]}. +{project_plugins, [erlfmt]}. diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src new file mode 100644 index 000000000..c048a0d0c --- /dev/null +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src @@ -0,0 +1,14 @@ +{application, emqx_bridge_greptimedb, [ + {description, "EMQX GreptimeDB Bridge"}, + {vsn, "0.1.0"}, + {registered, []}, + {applications, [ + kernel, + stdlib, + emqx_resource, + greptimedb + ]}, + {env, []}, + {modules, []}, + {links, []} +]}. diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.erl new file mode 100644 index 000000000..877e464dd --- /dev/null +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.erl @@ -0,0 +1,299 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_greptimedb). + +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + conn_bridge_examples/1 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +-type write_syntax() :: list(). +-reflect_type([write_syntax/0]). +-typerefl_from_string({write_syntax/0, ?MODULE, to_influx_lines}). +-export([to_influx_lines/1]). + +%% ------------------------------------------------------------------------------------------------- +%% api + +conn_bridge_examples(Method) -> + [ + #{ + <<"greptimedb">> => #{ + summary => <<"Greptimedb HTTP API V2 Bridge">>, + value => values("greptimedb", Method) + } + } + ]. + +values(Protocol, get) -> + values(Protocol, post); +values("greptimedb", post) -> + SupportUint = <<"uint_value=${payload.uint_key}u,">>, + TypeOpts = #{ + bucket => <<"example_bucket">>, + org => <<"examlpe_org">>, + token => <<"example_token">>, + server => <<"127.0.0.1:4001">> + }, + values(common, "greptimedb", SupportUint, TypeOpts); +values(Protocol, put) -> + values(Protocol, post). + +values(common, Protocol, SupportUint, TypeOpts) -> + CommonConfigs = #{ + type => list_to_atom(Protocol), + name => <<"demo">>, + enable => true, + local_topic => <<"local/topic/#">>, + write_syntax => + <<"${topic},clientid=${clientid}", " ", "payload=${payload},", + "${clientid}_int_value=${payload.int_key}i,", SupportUint/binary, + "bool=${payload.bool}">>, + precision => ms, + resource_opts => #{ + batch_size => 100, + batch_time => <<"20ms">> + }, + server => <<"127.0.0.1:4001">>, + ssl => #{enable => false} + }, + maps:merge(TypeOpts, CommonConfigs). + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions +namespace() -> "bridge_greptimedb". + +roots() -> []. + +fields("post_grpc_v1") -> + method_fields(post, greptimedb); +fields("put_grpc_v1") -> + method_fields(put, greptimedb); +fields("get_grpc_v1") -> + method_fields(get, greptimedb); +fields(Type) when + Type == greptimedb +-> + greptimedb_bridge_common_fields() ++ + connector_fields(Type). + +method_fields(post, ConnectorType) -> + greptimedb_bridge_common_fields() ++ + connector_fields(ConnectorType) ++ + type_name_fields(ConnectorType); +method_fields(get, ConnectorType) -> + greptimedb_bridge_common_fields() ++ + connector_fields(ConnectorType) ++ + type_name_fields(ConnectorType) ++ + emqx_bridge_schema:status_fields(); +method_fields(put, ConnectorType) -> + greptimedb_bridge_common_fields() ++ + connector_fields(ConnectorType). + +greptimedb_bridge_common_fields() -> + emqx_bridge_schema:common_bridge_fields() ++ + [ + {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, + {write_syntax, fun write_syntax/1} + ] ++ + emqx_resource_schema:fields("resource_opts"). + +connector_fields(Type) -> + emqx_bridge_greptimedb_connector:fields(Type). + +type_name_fields(Type) -> + [ + {type, mk(Type, #{required => true, desc => ?DESC("desc_type")})}, + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})} + ]. + +desc("config") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for Greptimedb using `", string:to_upper(Method), "` method."]; +desc(greptimedb) -> + ?DESC(emqx_bridge_greptimedb_connector, "greptimedb"); +desc(_) -> + undefined. + +write_syntax(type) -> + ?MODULE:write_syntax(); +write_syntax(required) -> + true; +write_syntax(validator) -> + [?NOT_EMPTY("the value of the field 'write_syntax' cannot be empty")]; +write_syntax(converter) -> + fun to_influx_lines/1; +write_syntax(desc) -> + ?DESC("write_syntax"); +write_syntax(format) -> + <<"sql">>; +write_syntax(_) -> + undefined. + +to_influx_lines(RawLines) -> + try + influx_lines(str(RawLines), []) + catch + _:Reason:Stacktrace -> + Msg = lists:flatten( + io_lib:format("Unable to parse Greptimedb line protocol: ~p", [RawLines]) + ), + ?SLOG(error, #{msg => Msg, error_reason => Reason, stacktrace => Stacktrace}), + throw(Msg) + end. + +-define(MEASUREMENT_ESC_CHARS, [$,, $\s]). +-define(TAG_FIELD_KEY_ESC_CHARS, [$,, $=, $\s]). +-define(FIELD_VAL_ESC_CHARS, [$", $\\]). +% Common separator for both tags and fields +-define(SEP, $\s). +-define(MEASUREMENT_TAG_SEP, $,). +-define(KEY_SEP, $=). +-define(VAL_SEP, $,). +-define(NON_EMPTY, [_ | _]). + +influx_lines([] = _RawLines, Acc) -> + ?NON_EMPTY = lists:reverse(Acc); +influx_lines(RawLines, Acc) -> + {Acc1, RawLines1} = influx_line(string:trim(RawLines, leading, "\s\n"), Acc), + influx_lines(RawLines1, Acc1). + +influx_line([], Acc) -> + {Acc, []}; +influx_line(Line, Acc) -> + {?NON_EMPTY = Measurement, Line1} = measurement(Line), + {Tags, Line2} = tags(Line1), + {?NON_EMPTY = Fields, Line3} = influx_fields(Line2), + {Timestamp, Line4} = timestamp(Line3), + { + [ + #{ + measurement => Measurement, + tags => Tags, + fields => Fields, + timestamp => Timestamp + } + | Acc + ], + Line4 + }. + +measurement(Line) -> + unescape(?MEASUREMENT_ESC_CHARS, [?MEASUREMENT_TAG_SEP, ?SEP], Line, []). + +tags([?MEASUREMENT_TAG_SEP | Line]) -> + tags1(Line, []); +tags(Line) -> + {[], Line}. + +%% Empty line is invalid as fields are required after tags, +%% need to break recursion here and fail later on parsing fields +tags1([] = Line, Acc) -> + {lists:reverse(Acc), Line}; +%% Matching non empty Acc treats lines like "m, field=field_val" invalid +tags1([?SEP | _] = Line, ?NON_EMPTY = Acc) -> + {lists:reverse(Acc), Line}; +tags1(Line, Acc) -> + {Tag, Line1} = tag(Line), + tags1(Line1, [Tag | Acc]). + +tag(Line) -> + {?NON_EMPTY = Key, Line1} = key(Line), + {?NON_EMPTY = Val, Line2} = tag_val(Line1), + {{Key, Val}, Line2}. + +tag_val(Line) -> + {Val, Line1} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?VAL_SEP, ?SEP], Line, []), + {Val, strip_l(Line1, ?VAL_SEP)}. + +influx_fields([?SEP | Line]) -> + fields1(string:trim(Line, leading, "\s"), []). + +%% Timestamp is optional, so fields may be at the very end of the line +fields1([Ch | _] = Line, Acc) when Ch =:= ?SEP; Ch =:= $\n -> + {lists:reverse(Acc), Line}; +fields1([] = Line, Acc) -> + {lists:reverse(Acc), Line}; +fields1(Line, Acc) -> + {Field, Line1} = field(Line), + fields1(Line1, [Field | Acc]). + +field(Line) -> + {?NON_EMPTY = Key, Line1} = key(Line), + {Val, Line2} = field_val(Line1), + {{Key, Val}, Line2}. + +field_val([$" | Line]) -> + {Val, [$" | Line1]} = unescape(?FIELD_VAL_ESC_CHARS, [$"], Line, []), + %% Quoted val can be empty + {Val, strip_l(Line1, ?VAL_SEP)}; +field_val(Line) -> + %% Unquoted value should not be un-escaped according to Greptimedb protocol, + %% as it can only hold float, integer, uinteger or boolean value. + %% However, as templates are possible, un-escaping is applied here, + %% which also helps to detect some invalid lines, e.g.: "m,tag=1 field= ${timestamp}" + {Val, Line1} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?VAL_SEP, ?SEP, $\n], Line, []), + {?NON_EMPTY = Val, strip_l(Line1, ?VAL_SEP)}. + +timestamp([?SEP | Line]) -> + Line1 = string:trim(Line, leading, "\s"), + %% Similarly to unquoted field value, un-escape a timestamp to validate and handle + %% potentially escaped characters in a template + {T, Line2} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?SEP, $\n], Line1, []), + {timestamp1(T), Line2}; +timestamp(Line) -> + {undefined, Line}. + +timestamp1(?NON_EMPTY = Ts) -> Ts; +timestamp1(_Ts) -> undefined. + +%% Common for both tag and field keys +key(Line) -> + {Key, Line1} = unescape(?TAG_FIELD_KEY_ESC_CHARS, [?KEY_SEP], Line, []), + {Key, strip_l(Line1, ?KEY_SEP)}. + +%% Only strip a character between pairs, don't strip it(and let it fail) +%% if the char to be stripped is at the end, e.g.: m,tag=val, field=val +strip_l([Ch, Ch1 | Str], Ch) when Ch1 =/= ?SEP -> + [Ch1 | Str]; +strip_l(Str, _Ch) -> + Str. + +unescape(EscapeChars, SepChars, [$\\, Char | T], Acc) -> + ShouldEscapeBackslash = lists:member($\\, EscapeChars), + Acc1 = + case lists:member(Char, EscapeChars) of + true -> [Char | Acc]; + false when not ShouldEscapeBackslash -> [Char, $\\ | Acc] + end, + unescape(EscapeChars, SepChars, T, Acc1); +unescape(EscapeChars, SepChars, [Char | T] = L, Acc) -> + IsEscapeChar = lists:member(Char, EscapeChars), + case lists:member(Char, SepChars) of + true -> {lists:reverse(Acc), L}; + false when not IsEscapeChar -> unescape(EscapeChars, SepChars, T, [Char | Acc]) + end; +unescape(_EscapeChars, _SepChars, [] = L, Acc) -> + {lists:reverse(Acc), L}. + +str(A) when is_atom(A) -> + atom_to_list(A); +str(B) when is_binary(B) -> + binary_to_list(B); +str(S) when is_list(S) -> + S. diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl new file mode 100644 index 000000000..4be100594 --- /dev/null +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl @@ -0,0 +1,636 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_greptimedb_connector). + +-include_lib("emqx_connector/include/emqx_connector.hrl"). + +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-behaviour(emqx_resource). + +%% callbacks of behaviour emqx_resource +-export([ + callback_mode/0, + on_start/2, + on_stop/2, + on_query/3, + on_batch_query/3, + on_get_status/2 +]). + +-export([ + roots/0, + namespace/0, + fields/1, + desc/1 +]). + +%% only for test +-ifdef(TEST). +-export([is_unrecoverable_error/1]). +-endif. + +-type ts_precision() :: ns | us | ms | s. + +%% Allocatable resources +-define(greptime_client, greptime_client). + +-define(GREPTIMEDB_DEFAULT_PORT, 4001). + +-define(DEFAULT_DB, <<"public">>). + +-define(GREPTIMEDB_HOST_OPTIONS, #{ + default_port => ?GREPTIMEDB_DEFAULT_PORT +}). + +-define(DEFAULT_TIMESTAMP_TMPL, "${timestamp}"). + +-define(AUTO_RECONNECT_S, 1). + +%% ------------------------------------------------------------------------------------------------- +%% resource callback +callback_mode() -> always_sync. + +on_start(InstId, Config) -> + %% InstID as pool would be handled by greptimedb client + %% so there is no need to allocate pool_name here + %% See: greptimedb:start_client/1 + start_client(InstId, Config). + +on_stop(InstId, _State) -> + case emqx_resource:get_allocated_resources(InstId) of + #{?greptime_client := Client} -> + greptimedb:stop_client(Client); + _ -> + ok + end. + +on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, client := Client}) -> + case data_to_points(Data, SyntaxLines) of + {ok, Points} -> + ?tp( + greptimedb_connector_send_query, + #{points => Points, batch => false, mode => sync} + ), + do_query(InstId, Client, Points); + {error, ErrorPoints} -> + ?tp( + greptimedb_connector_send_query_error, + #{batch => false, mode => sync, error => ErrorPoints} + ), + log_error_points(InstId, ErrorPoints), + {error, ErrorPoints} + end. + +%% Once a Batched Data trans to points failed. +%% This batch query failed +on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client := Client}) -> + case parse_batch_data(InstId, BatchData, SyntaxLines) of + {ok, Points} -> + ?tp( + greptimedb_connector_send_query, + #{points => Points, batch => true, mode => sync} + ), + do_query(InstId, Client, Points); + {error, Reason} -> + ?tp( + greptimedb_connector_send_query_error, + #{batch => true, mode => sync, error => Reason} + ), + {error, {unrecoverable_error, Reason}} + end. + +on_get_status(_InstId, #{client := Client}) -> + case greptimedb:is_alive(Client) of + true -> + connected; + false -> + disconnected + end. + +%% ------------------------------------------------------------------------------------------------- +%% schema +namespace() -> connector_greptimedb. + +roots() -> + [ + {config, #{ + type => hoconsc:union( + [ + hoconsc:ref(?MODULE, greptimedb) + ] + ) + }} + ]. + +fields(common) -> + [ + {server, server()}, + {precision, + %% The greptimedb only supports these 4 precision + mk(enum([ns, us, ms, s]), #{ + required => false, default => ms, desc => ?DESC("precision") + })} + ]; +fields(greptimedb) -> + fields(common) ++ + [ + {dbname, mk(binary(), #{required => true, desc => ?DESC("dbname")})}, + {username, mk(binary(), #{desc => ?DESC("username")})}, + {password, + mk(binary(), #{ + desc => ?DESC("password"), + format => <<"password">>, + sensitive => true, + converter => fun emqx_schema:password_converter/2 + })} + ] ++ emqx_connector_schema_lib:ssl_fields(). + +server() -> + Meta = #{ + required => false, + default => <<"127.0.0.1:4001">>, + desc => ?DESC("server"), + converter => fun convert_server/2 + }, + emqx_schema:servers_sc(Meta, ?GREPTIMEDB_HOST_OPTIONS). + +desc(common) -> + ?DESC("common"); +desc(greptimedb) -> + ?DESC("greptimedb"). + +%% ------------------------------------------------------------------------------------------------- +%% internal functions + +start_client(InstId, Config) -> + ClientConfig = client_config(InstId, Config), + ?SLOG(info, #{ + msg => "starting greptimedb connector", + connector => InstId, + config => emqx_utils:redact(Config), + client_config => emqx_utils:redact(ClientConfig) + }), + try do_start_client(InstId, ClientConfig, Config) of + Res = {ok, #{client := Client}} -> + ok = emqx_resource:allocate_resource(InstId, ?greptime_client, Client), + Res; + {error, Reason} -> + {error, Reason} + catch + E:R:S -> + ?tp(greptimedb_connector_start_exception, #{error => {E, R}}), + ?SLOG(warning, #{ + msg => "start greptimedb connector error", + connector => InstId, + error => E, + reason => emqx_utils:redact(R), + stack => emqx_utils:redact(S) + }), + {error, R} + end. + +do_start_client( + InstId, + ClientConfig, + Config = #{write_syntax := Lines} +) -> + Precision = maps:get(precision, Config, ms), + case greptimedb:start_client(ClientConfig) of + {ok, Client} -> + case greptimedb:is_alive(Client, true) of + true -> + State = #{ + client => Client, + dbname => proplists:get_value(dbname, ClientConfig, ?DEFAULT_DB), + write_syntax => to_config(Lines, Precision) + }, + ?SLOG(info, #{ + msg => "starting greptimedb connector success", + connector => InstId, + client => redact_auth(Client), + state => redact_auth(State) + }), + {ok, State}; + {false, Reason} -> + ?tp(greptimedb_connector_start_failed, #{ + error => greptimedb_client_not_alive, reason => Reason + }), + ?SLOG(warning, #{ + msg => "failed_to_start_greptimedb_connector", + connector => InstId, + client => redact_auth(Client), + reason => Reason + }), + %% no leak + _ = greptimedb:stop_client(Client), + {error, greptimedb_client_not_alive} + end; + {error, {already_started, Client0}} -> + ?tp(greptimedb_connector_start_already_started, #{}), + ?SLOG(info, #{ + msg => "restarting greptimedb connector, found already started client", + connector => InstId, + old_client => redact_auth(Client0) + }), + _ = greptimedb:stop_client(Client0), + do_start_client(InstId, ClientConfig, Config); + {error, Reason} -> + ?tp(greptimedb_connector_start_failed, #{error => Reason}), + ?SLOG(warning, #{ + msg => "failed_to_start_greptimedb_connector", + connector => InstId, + reason => Reason + }), + {error, Reason} + end. + +client_config( + InstId, + Config = #{ + server := Server + } +) -> + #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?GREPTIMEDB_HOST_OPTIONS), + [ + {endpoints, [{http, str(Host), Port}]}, + {pool_size, erlang:system_info(schedulers)}, + {pool, InstId}, + {pool_type, random}, + {auto_reconnect, ?AUTO_RECONNECT_S}, + {timeunit, maps:get(precision, Config, ms)} + ] ++ protocol_config(Config). + +protocol_config( + #{ + dbname := DbName, + ssl := SSL + } = Config +) -> + [ + {dbname, str(DbName)} + ] ++ auth(Config) ++ + ssl_config(SSL). + +ssl_config(#{enable := false}) -> + [ + {https_enabled, false} + ]; +ssl_config(SSL = #{enable := true}) -> + [ + {https_enabled, true}, + {transport, ssl}, + {transport_opts, emqx_tls_lib:to_client_opts(SSL)} + ]. + +auth(#{username := Username, password := Password}) -> + [ + {auth, {basic, #{username => str(Username), password => str(Password)}}} + ]; +auth(_) -> + []. + +redact_auth(Term) -> + emqx_utils:redact(Term, fun is_auth_key/1). + +is_auth_key(Key) when is_binary(Key) -> + string:equal("authorization", Key, true); +is_auth_key(_) -> + false. + +%% ------------------------------------------------------------------------------------------------- +%% Query +do_query(InstId, Client, Points) -> + case greptimedb:write_batch(Client, Points) of + {ok, #{response := {affected_rows, #{value := Rows}}}} -> + ?SLOG(debug, #{ + msg => "greptimedb write point success", + connector => InstId, + points => Points + }), + {ok, {affected_rows, Rows}}; + {error, {unauth, _, _}} -> + ?tp(greptimedb_connector_do_query_failure, #{error => <<"authorization failure">>}), + ?SLOG(error, #{ + msg => "greptimedb_authorization_failed", + client => redact_auth(Client), + connector => InstId + }), + {error, {unrecoverable_error, <<"authorization failure">>}}; + {error, Reason} = Err -> + ?tp(greptimedb_connector_do_query_failure, #{error => Reason}), + ?SLOG(error, #{ + msg => "greptimedb write point failed", + connector => InstId, + reason => Reason + }), + case is_unrecoverable_error(Err) of + true -> + {error, {unrecoverable_error, Reason}}; + false -> + {error, {recoverable_error, Reason}} + end + end. + +%% ------------------------------------------------------------------------------------------------- +%% Tags & Fields Config Trans + +to_config(Lines, Precision) -> + to_config(Lines, [], Precision). + +to_config([], Acc, _Precision) -> + lists:reverse(Acc); +to_config([Item0 | Rest], Acc, Precision) -> + Ts0 = maps:get(timestamp, Item0, ?DEFAULT_TIMESTAMP_TMPL), + {Ts, FromPrecision, ToPrecision} = preproc_tmpl_timestamp(Ts0, Precision), + Item = #{ + measurement => emqx_placeholder:preproc_tmpl(maps:get(measurement, Item0)), + timestamp => Ts, + precision => {FromPrecision, ToPrecision}, + tags => to_kv_config(maps:get(tags, Item0)), + fields => to_kv_config(maps:get(fields, Item0)) + }, + to_config(Rest, [Item | Acc], Precision). + +%% pre-process the timestamp template +%% returns a tuple of three elements: +%% 1. The timestamp template itself. +%% 2. The source timestamp precision (ms if the template ${timestamp} is used). +%% 3. The target timestamp precision (configured for the client). +preproc_tmpl_timestamp(undefined, Precision) -> + %% not configured, we default it to the message timestamp + preproc_tmpl_timestamp(?DEFAULT_TIMESTAMP_TMPL, Precision); +preproc_tmpl_timestamp(Ts, Precision) when is_integer(Ts) -> + %% a const value is used which is very much unusual, but we have to add a special handling + {Ts, Precision, Precision}; +preproc_tmpl_timestamp(Ts, Precision) when is_list(Ts) -> + preproc_tmpl_timestamp(iolist_to_binary(Ts), Precision); +preproc_tmpl_timestamp(<> = Ts, Precision) -> + {emqx_placeholder:preproc_tmpl(Ts), ms, Precision}; +preproc_tmpl_timestamp(Ts, Precision) when is_binary(Ts) -> + %% a placehold is in use. e.g. ${payload.my_timestamp} + %% we can only hope it the value will be of the same precision in the configs + {emqx_placeholder:preproc_tmpl(Ts), Precision, Precision}. + +to_kv_config(KVfields) -> + lists:foldl( + fun({K, V}, Acc) -> to_maps_config(K, V, Acc) end, + #{}, + KVfields + ). + +to_maps_config(K, V, Res) -> + NK = emqx_placeholder:preproc_tmpl(bin(K)), + NV = emqx_placeholder:preproc_tmpl(bin(V)), + Res#{NK => NV}. + +%% ------------------------------------------------------------------------------------------------- +%% Tags & Fields Data Trans +parse_batch_data(InstId, BatchData, SyntaxLines) -> + {Points, Errors} = lists:foldl( + fun({send_message, Data}, {ListOfPoints, ErrAccIn}) -> + case data_to_points(Data, SyntaxLines) of + {ok, Points} -> + {[Points | ListOfPoints], ErrAccIn}; + {error, ErrorPoints} -> + log_error_points(InstId, ErrorPoints), + {ListOfPoints, ErrAccIn + 1} + end + end, + {[], 0}, + BatchData + ), + case Errors of + 0 -> + {ok, lists:flatten(Points)}; + _ -> + ?SLOG(error, #{ + msg => io_lib:format("Greptimedb trans point failed, count: ~p", [Errors]), + connector => InstId, + reason => points_trans_failed + }), + {error, points_trans_failed} + end. + +-spec data_to_points(map(), [ + #{ + fields := [{binary(), binary()}], + measurement := binary(), + tags := [{binary(), binary()}], + timestamp := emqx_placeholder:tmpl_token() | integer(), + precision := {From :: ts_precision(), To :: ts_precision()} + } +]) -> {ok, [map()]} | {error, term()}. +data_to_points(Data, SyntaxLines) -> + lines_to_points(Data, SyntaxLines, [], []). + +%% When converting multiple rows data into Greptimedb Line Protocol, they are considered to be strongly correlated. +%% And once a row fails to convert, all of them are considered to have failed. +lines_to_points(_, [], Points, ErrorPoints) -> + case ErrorPoints of + [] -> + {ok, Points}; + _ -> + %% ignore trans succeeded points + {error, ErrorPoints} + end; +lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, ErrorPointsAcc) when + is_list(Ts) +-> + TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, + case parse_timestamp(emqx_placeholder:proc_tmpl(Ts, Data, TransOptions)) of + {ok, TsInt} -> + Item1 = Item#{timestamp => TsInt}, + continue_lines_to_points(Data, Item1, Rest, ResultPointsAcc, ErrorPointsAcc); + {error, BadTs} -> + lines_to_points(Data, Rest, ResultPointsAcc, [ + {error, {bad_timestamp, BadTs}} | ErrorPointsAcc + ]) + end; +lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, ErrorPointsAcc) when + is_integer(Ts) +-> + continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc). + +parse_timestamp([TsInt]) when is_integer(TsInt) -> + {ok, TsInt}; +parse_timestamp([TsBin]) -> + try + {ok, binary_to_integer(TsBin)} + catch + _:_ -> + {error, TsBin} + end. + +continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc) -> + case line_to_point(Data, Item) of + {_, [#{fields := Fields}]} when map_size(Fields) =:= 0 -> + %% greptimedb client doesn't like empty field maps... + ErrorPointsAcc1 = [{error, no_fields} | ErrorPointsAcc], + lines_to_points(Data, Rest, ResultPointsAcc, ErrorPointsAcc1); + Point -> + lines_to_points(Data, Rest, [Point | ResultPointsAcc], ErrorPointsAcc) + end. + +line_to_point( + Data, + #{ + measurement := Measurement, + tags := Tags, + fields := Fields, + timestamp := Ts, + precision := Precision + } = Item +) -> + {_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags), + {_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields), + TableName = emqx_placeholder:proc_tmpl(Measurement, Data), + {TableName, [ + maps:without([precision, measurement], Item#{ + tags => EncodedTags, + fields => EncodedFields, + timestamp => maybe_convert_time_unit(Ts, Precision) + }) + ]}. + +maybe_convert_time_unit(Ts, {FromPrecision, ToPrecision}) -> + erlang:convert_time_unit(Ts, time_unit(FromPrecision), time_unit(ToPrecision)). + +time_unit(s) -> second; +time_unit(ms) -> millisecond; +time_unit(us) -> microsecond; +time_unit(ns) -> nanosecond. + +maps_config_to_data(K, V, {Data, Res}) -> + KTransOptions = #{return => rawlist, var_trans => fun key_filter/1}, + VTransOptions = #{return => rawlist, var_trans => fun data_filter/1}, + NK0 = emqx_placeholder:proc_tmpl(K, Data, KTransOptions), + NV = emqx_placeholder:proc_tmpl(V, Data, VTransOptions), + case {NK0, NV} of + {[undefined], _} -> + {Data, Res}; + %% undefined value in normal format [undefined] or int/uint format [undefined, <<"i">>] + {_, [undefined | _]} -> + {Data, Res}; + _ -> + NK = list_to_binary(NK0), + {Data, Res#{NK => value_type(NV)}} + end. + +value_type([Int, <<"i">>]) when + is_integer(Int) +-> + greptimedb_values:int64_value(Int); +value_type([UInt, <<"u">>]) when + is_integer(UInt) +-> + greptimedb_values:uint64_value(UInt); +value_type([<<"t">>]) -> + greptimedb_values:boolean_value(true); +value_type([<<"T">>]) -> + greptimedb_values:boolean_value(true); +value_type([true]) -> + greptimedb_values:boolean_value(true); +value_type([<<"TRUE">>]) -> + greptimedb_values:boolean_value(true); +value_type([<<"True">>]) -> + greptimedb_values:boolean_value(true); +value_type([<<"f">>]) -> + greptimedb_values:boolean_value(false); +value_type([<<"F">>]) -> + greptimedb_values:boolean_value(false); +value_type([false]) -> + greptimedb_values:boolean_value(false); +value_type([<<"FALSE">>]) -> + greptimedb_values:boolean_value(false); +value_type([<<"False">>]) -> + greptimedb_values:boolean_value(false); +value_type([Float]) when is_float(Float) -> + Float; +value_type(Val) -> + #{values => #{string_values => Val}, datatype => 'STRING'}. + +key_filter(undefined) -> undefined; +key_filter(Value) -> emqx_utils_conv:bin(Value). + +data_filter(undefined) -> undefined; +data_filter(Int) when is_integer(Int) -> Int; +data_filter(Number) when is_number(Number) -> Number; +data_filter(Bool) when is_boolean(Bool) -> Bool; +data_filter(Data) -> bin(Data). + +bin(Data) -> emqx_utils_conv:bin(Data). + +%% helper funcs +log_error_points(InstId, Errs) -> + lists:foreach( + fun({error, Reason}) -> + ?SLOG(error, #{ + msg => "greptimedb trans point failed", + connector => InstId, + reason => Reason + }) + end, + Errs + ). + +convert_server(<<"http://", Server/binary>>, HoconOpts) -> + convert_server(Server, HoconOpts); +convert_server(<<"https://", Server/binary>>, HoconOpts) -> + convert_server(Server, HoconOpts); +convert_server(Server, HoconOpts) -> + emqx_schema:convert_servers(Server, HoconOpts). + +str(A) when is_atom(A) -> + atom_to_list(A); +str(B) when is_binary(B) -> + binary_to_list(B); +str(S) when is_list(S) -> + S. + +is_unrecoverable_error({error, {unrecoverable_error, _}}) -> + true; +is_unrecoverable_error(_) -> + false. + +%%=================================================================== +%% eunit tests +%%=================================================================== + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +is_auth_key_test_() -> + [ + ?_assert(is_auth_key(<<"Authorization">>)), + ?_assertNot(is_auth_key(<<"Something">>)), + ?_assertNot(is_auth_key(89)) + ]. + +%% for coverage +desc_test_() -> + [ + ?_assertMatch( + {desc, _, _}, + desc(common) + ), + ?_assertMatch( + {desc, _, _}, + desc(greptimedb) + ), + ?_assertMatch( + {desc, _, _}, + hocon_schema:field_schema(server(), desc) + ), + ?_assertMatch( + connector_greptimedb, + namespace() + ) + ]. +-endif. diff --git a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl new file mode 100644 index 000000000..d4bc5b01e --- /dev/null +++ b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl @@ -0,0 +1,939 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_greptimedb_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/logger.hrl"). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + [ + {group, with_batch}, + {group, without_batch} + ]. + +groups() -> + TCs = emqx_common_test_helpers:all(?MODULE), + [ + {with_batch, [ + {group, sync_query} + ]}, + {without_batch, [ + {group, sync_query} + ]}, + {sync_query, [ + {group, grpcv1_tcp} + %% uncomment tls when we are ready + %% {group, grpcv1_tls} + ]}, + {grpcv1_tcp, TCs} + %%{grpcv1_tls, TCs} + ]. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + delete_all_bridges(), + emqx_mgmt_api_test_util:end_suite(), + ok = emqx_connector_test_helpers:stop_apps([ + emqx_conf, emqx_bridge, emqx_resource, emqx_rule_engine + ]), + _ = application:stop(emqx_connector), + ok. + +init_per_group(GreptimedbType, Config0) when + GreptimedbType =:= grpcv1_tcp; + GreptimedbType =:= grpcv1_tls +-> + #{ + host := GreptimedbHost, + port := GreptimedbPort, + http_port := GreptimedbHttpPort, + use_tls := UseTLS, + proxy_name := ProxyName + } = + case GreptimedbType of + grpcv1_tcp -> + #{ + host => os:getenv("GREPTIMEDB_GRPCV1_TCP_HOST", "toxiproxy"), + port => list_to_integer(os:getenv("GREPTIMEDB_GRPCV1_TCP_PORT", "4001")), + http_port => list_to_integer(os:getenv("GREPTIMEDB_HTTP_PORT", "4000")), + use_tls => false, + proxy_name => "greptimedb_grpc" + }; + grpcv1_tls -> + #{ + host => os:getenv("GREPTIMEDB_GRPCV1_TLS_HOST", "toxiproxy"), + port => list_to_integer(os:getenv("GREPTIMEDB_GRPCV1_TLS_PORT", "4001")), + http_port => list_to_integer(os:getenv("GREPTIMEDB_HTTP_PORT", "4000")), + use_tls => true, + proxy_name => "greptimedb_tls" + } + end, + case emqx_common_test_helpers:is_tcp_server_available(GreptimedbHost, GreptimedbHttpPort) of + true -> + ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), + ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + ok = start_apps(), + {ok, _} = application:ensure_all_started(emqx_connector), + {ok, _} = application:ensure_all_started(greptimedb), + emqx_mgmt_api_test_util:init_suite(), + Config = [{use_tls, UseTLS} | Config0], + {Name, ConfigString, GreptimedbConfig} = greptimedb_config( + grpcv1, GreptimedbHost, GreptimedbPort, Config + ), + EHttpcPoolNameBin = <<(atom_to_binary(?MODULE))/binary, "_http">>, + EHttpcPoolName = binary_to_atom(EHttpcPoolNameBin), + {EHttpcTransport, EHttpcTransportOpts} = + case UseTLS of + true -> {tls, [{verify, verify_none}]}; + false -> {tcp, []} + end, + EHttpcPoolOpts = [ + {host, GreptimedbHost}, + {port, GreptimedbHttpPort}, + {pool_size, 1}, + {transport, EHttpcTransport}, + {transport_opts, EHttpcTransportOpts} + ], + {ok, _} = ehttpc_sup:start_pool(EHttpcPoolName, EHttpcPoolOpts), + [ + {proxy_host, ProxyHost}, + {proxy_port, ProxyPort}, + {proxy_name, ProxyName}, + {greptimedb_host, GreptimedbHost}, + {greptimedb_port, GreptimedbPort}, + {greptimedb_http_port, GreptimedbHttpPort}, + {greptimedb_type, grpcv1}, + {greptimedb_config, GreptimedbConfig}, + {greptimedb_config_string, ConfigString}, + {ehttpc_pool_name, EHttpcPoolName}, + {greptimedb_name, Name} + | Config + ]; + false -> + {skip, no_greptimedb} + end; +init_per_group(sync_query, Config) -> + [{query_mode, sync} | Config]; +init_per_group(with_batch, Config) -> + [{batch_size, 100} | Config]; +init_per_group(without_batch, Config) -> + [{batch_size, 1} | Config]; +init_per_group(_Group, Config) -> + Config. + +end_per_group(Group, Config) when + Group =:= grpcv1_tcp; + Group =:= grpcv1_tls +-> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + EHttpcPoolName = ?config(ehttpc_pool_name, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + ehttpc_sup:stop_pool(EHttpcPoolName), + delete_bridge(Config), + _ = application:stop(greptimedb), + ok; +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(_Testcase, Config) -> + delete_all_rules(), + delete_all_bridges(), + Config. + +end_per_testcase(_Testcase, Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + ok = snabbkaffe:stop(), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + delete_all_rules(), + delete_all_bridges(), + ok. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +start_apps() -> + %% some configs in emqx_conf app are mandatory + %% we want to make sure they are loaded before + %% ekka start in emqx_common_test_helpers:start_apps/1 + emqx_common_test_helpers:render_and_load_app_config(emqx_conf), + ok = emqx_common_test_helpers:start_apps([emqx_conf]), + ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]). + +example_write_syntax() -> + %% N.B.: this single space character is relevant + <<"${topic},clientid=${clientid}", " ", "payload=${payload},", + "${clientid}_int_value=${payload.int_key}i,", + "uint_value=${payload.uint_key}u," + "float_value=${payload.float_key},", "undef_value=${payload.undef},", + "${undef_key}=\"hard-coded-value\",", "bool=${payload.bool}">>. + +greptimedb_config(grpcv1 = Type, GreptimedbHost, GreptimedbPort, Config) -> + BatchSize = proplists:get_value(batch_size, Config, 100), + QueryMode = proplists:get_value(query_mode, Config, sync), + UseTLS = proplists:get_value(use_tls, Config, false), + Name = atom_to_binary(?MODULE), + WriteSyntax = example_write_syntax(), + ConfigString = + io_lib:format( + "bridges.greptimedb.~s {\n" + " enable = true\n" + " server = \"~p:~b\"\n" + " dbname = public\n" + " username = greptime_user\n" + " password = greptime_pwd\n" + " precision = ns\n" + " write_syntax = \"~s\"\n" + " resource_opts = {\n" + " request_ttl = 1s\n" + " query_mode = ~s\n" + " batch_size = ~b\n" + " }\n" + " ssl {\n" + " enable = ~p\n" + " verify = verify_none\n" + " }\n" + "}\n", + [ + Name, + GreptimedbHost, + GreptimedbPort, + WriteSyntax, + QueryMode, + BatchSize, + UseTLS + ] + ), + {Name, ConfigString, parse_and_check(ConfigString, Type, Name)}. + +parse_and_check(ConfigString, Type, Name) -> + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + TypeBin = greptimedb_type_bin(Type), + hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), + #{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf, + Config. + +greptimedb_type_bin(grpcv1) -> + <<"greptimedb">>. + +create_bridge(Config) -> + create_bridge(Config, _Overrides = #{}). + +create_bridge(Config, Overrides) -> + Type = greptimedb_type_bin(?config(greptimedb_type, Config)), + Name = ?config(greptimedb_name, Config), + GreptimedbConfig0 = ?config(greptimedb_config, Config), + GreptimedbConfig = emqx_utils_maps:deep_merge(GreptimedbConfig0, Overrides), + emqx_bridge:create(Type, Name, GreptimedbConfig). + +delete_bridge(Config) -> + Type = greptimedb_type_bin(?config(greptimedb_type, Config)), + Name = ?config(greptimedb_name, Config), + emqx_bridge:remove(Type, Name). + +delete_all_bridges() -> + lists:foreach( + fun(#{name := Name, type := Type}) -> + emqx_bridge:remove(Type, Name) + end, + emqx_bridge:list() + ). + +delete_all_rules() -> + lists:foreach( + fun(#{id := RuleId}) -> + ok = emqx_rule_engine:delete_rule(RuleId) + end, + emqx_rule_engine:get_rules() + ). + +create_rule_and_action_http(Config) -> + create_rule_and_action_http(Config, _Overrides = #{}). + +create_rule_and_action_http(Config, Overrides) -> + GreptimedbName = ?config(greptimedb_name, Config), + Type = greptimedb_type_bin(?config(greptimedb_type, Config)), + BridgeId = emqx_bridge_resource:bridge_id(Type, GreptimedbName), + Params0 = #{ + enable => true, + sql => <<"SELECT * FROM \"t/topic\"">>, + actions => [BridgeId] + }, + Params = emqx_utils_maps:deep_merge(Params0, Overrides), + Path = emqx_mgmt_api_test_util:api_path(["rules"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of + {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])}; + Error -> Error + end. + +send_message(Config, Payload) -> + Name = ?config(greptimedb_name, Config), + Type = greptimedb_type_bin(?config(greptimedb_type, Config)), + BridgeId = emqx_bridge_resource:bridge_id(Type, Name), + Resp = emqx_bridge:send_message(BridgeId, Payload), + Resp. + +query_by_clientid(Topic, ClientId, Config) -> + GreptimedbHost = ?config(greptimedb_host, Config), + GreptimedbPort = ?config(greptimedb_http_port, Config), + EHttpcPoolName = ?config(ehttpc_pool_name, Config), + UseTLS = ?config(use_tls, Config), + Path = <<"/v1/sql?db=public">>, + Scheme = + case UseTLS of + true -> <<"https://">>; + false -> <<"http://">> + end, + URI = iolist_to_binary([ + Scheme, + list_to_binary(GreptimedbHost), + ":", + integer_to_binary(GreptimedbPort), + Path + ]), + Headers = [ + {"Authorization", "Basic Z3JlcHRpbWVfdXNlcjpncmVwdGltZV9wd2Q="}, + {"Content-Type", "application/x-www-form-urlencoded"} + ], + Body = <<"sql=select * from \"", Topic/binary, "\" where clientid='", ClientId/binary, "'">>, + {ok, 200, _Headers, RawBody0} = + ehttpc:request( + EHttpcPoolName, + post, + {URI, Headers, Body}, + _Timeout = 10_000, + _Retry = 0 + ), + + case emqx_utils_json:decode(RawBody0, [return_maps]) of + #{ + <<"code">> := 0, + <<"output">> := [ + #{ + <<"records">> := #{ + <<"rows">> := Rows, + <<"schema">> := Schema + } + } + ] + } -> + make_row(Schema, Rows); + #{ + <<"code">> := Code, + <<"error">> := Error + } -> + GreptimedbName = ?config(greptimedb_name, Config), + Type = greptimedb_type_bin(?config(greptimedb_type, Config)), + BridgeId = emqx_bridge_resource:bridge_id(Type, GreptimedbName), + + ?SLOG(error, #{ + msg => io_lib:format("Failed to query: ~p, ~p", [Code, Error]), + connector => BridgeId, + reason => Error + }), + %% TODO(dennis): check the error by code + case binary:match(Error, <<"Table not found">>) of + nomatch -> + {error, Error}; + _ -> + %% Table not found + #{} + end + end. + +make_row(null, _Rows) -> + #{}; +make_row(_Schema, []) -> + #{}; +make_row(#{<<"column_schemas">> := ColumnsSchemas}, [Row]) -> + Columns = lists:map(fun(#{<<"name">> := Name}) -> Name end, ColumnsSchemas), + maps:from_list(lists:zip(Columns, Row)). + +assert_persisted_data(ClientId, Expected, PersistedData) -> + ClientIdIntKey = <>, + maps:foreach( + fun + (int_value, ExpectedValue) -> + ?assertMatch( + ExpectedValue, + maps:get(ClientIdIntKey, PersistedData) + ); + (Key, ExpectedValue) -> + ?assertMatch( + ExpectedValue, + maps:get(atom_to_binary(Key), PersistedData), + #{expected => ExpectedValue} + ) + end, + Expected + ), + ok. + +resource_id(Config) -> + Type = greptimedb_type_bin(?config(greptimedb_type, Config)), + Name = ?config(greptimedb_name, Config), + emqx_bridge_resource:resource_id(Type, Name). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_start_ok(Config) -> + QueryMode = ?config(query_mode, Config), + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + Payload = #{ + int_key => -123, + bool => true, + float_key => 24.5, + uint_key => 123 + }, + SentData = #{ + <<"clientid">> => ClientId, + <<"topic">> => atom_to_binary(?FUNCTION_NAME), + <<"payload">> => Payload, + <<"timestamp">> => erlang:system_time(millisecond) + }, + ?check_trace( + begin + case QueryMode of + sync -> + ?assertMatch({ok, _}, send_message(Config, SentData)) + end, + PersistedData = query_by_clientid(atom_to_binary(?FUNCTION_NAME), ClientId, Config), + Expected = #{ + bool => true, + int_value => -123, + uint_value => 123, + float_value => 24.5, + payload => emqx_utils_json:encode(Payload) + }, + assert_persisted_data(ClientId, Expected, PersistedData), + ok + end, + fun(Trace0) -> + Trace = ?of_kind(greptimedb_connector_send_query, Trace0), + ?assertMatch([#{points := [_]}], Trace), + [#{points := [Point0]}] = Trace, + {Measurement, [Point]} = Point0, + ct:pal("sent point: ~p", [Point]), + ?assertMatch( + <<_/binary>>, + Measurement + ), + ?assertMatch( + #{ + fields := #{}, + tags := #{}, + timestamp := TS + } when is_integer(TS), + Point + ), + #{fields := Fields} = Point, + ?assert(lists:all(fun is_binary/1, maps:keys(Fields))), + ?assertNot(maps:is_key(<<"undefined">>, Fields)), + ?assertNot(maps:is_key(<<"undef_value">>, Fields)), + ok + end + ), + ok. + +t_start_already_started(Config) -> + Type = greptimedb_type_bin(?config(greptimedb_type, Config)), + Name = ?config(greptimedb_name, Config), + GreptimedbConfigString = ?config(greptimedb_config_string, Config), + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + ResourceId = resource_id(Config), + TypeAtom = binary_to_atom(Type), + NameAtom = binary_to_atom(Name), + {ok, #{bridges := #{TypeAtom := #{NameAtom := GreptimedbConfigMap}}}} = emqx_hocon:check( + emqx_bridge_schema, GreptimedbConfigString + ), + ?check_trace( + emqx_bridge_greptimedb_connector:on_start(ResourceId, GreptimedbConfigMap), + fun(Result, Trace) -> + ?assertMatch({ok, _}, Result), + ?assertMatch([_], ?of_kind(greptimedb_connector_start_already_started, Trace)), + ok + end + ), + ok. + +t_start_ok_timestamp_write_syntax(Config) -> + GreptimedbType = ?config(greptimedb_type, Config), + GreptimedbName = ?config(greptimedb_name, Config), + GreptimedbConfigString0 = ?config(greptimedb_config_string, Config), + GreptimedbTypeCfg = + case GreptimedbType of + grpcv1 -> "greptimedb" + end, + WriteSyntax = + %% N.B.: this single space characters are relevant + <<"${topic},clientid=${clientid}", " ", "payload=${payload},", + "${clientid}_int_value=${payload.int_key}i,", + "uint_value=${payload.uint_key}u," + "bool=${payload.bool}", " ", "${timestamp}">>, + %% append this to override the config + GreptimedbConfigString1 = + io_lib:format( + "bridges.~s.~s {\n" + " write_syntax = \"~s\"\n" + "}\n", + [GreptimedbTypeCfg, GreptimedbName, WriteSyntax] + ), + GreptimedbConfig1 = parse_and_check( + GreptimedbConfigString0 ++ GreptimedbConfigString1, + GreptimedbType, + GreptimedbName + ), + Config1 = [{greptimedb_config, GreptimedbConfig1} | Config], + ?assertMatch( + {ok, _}, + create_bridge(Config1) + ), + ok. + +t_start_ok_no_subject_tags_write_syntax(Config) -> + GreptimedbType = ?config(greptimedb_type, Config), + GreptimedbName = ?config(greptimedb_name, Config), + GreptimedbConfigString0 = ?config(greptimedb_config_string, Config), + GreptimedbTypeCfg = + case GreptimedbType of + grpcv1 -> "greptimedb" + end, + WriteSyntax = + %% N.B.: this single space characters are relevant + <<"${topic}", " ", "payload=${payload},", "${clientid}_int_value=${payload.int_key}i,", + "uint_value=${payload.uint_key}u," + "bool=${payload.bool}", " ", "${timestamp}">>, + %% append this to override the config + GreptimedbConfigString1 = + io_lib:format( + "bridges.~s.~s {\n" + " write_syntax = \"~s\"\n" + "}\n", + [GreptimedbTypeCfg, GreptimedbName, WriteSyntax] + ), + GreptimedbConfig1 = parse_and_check( + GreptimedbConfigString0 ++ GreptimedbConfigString1, + GreptimedbType, + GreptimedbName + ), + Config1 = [{greptimedb_config, GreptimedbConfig1} | Config], + ?assertMatch( + {ok, _}, + create_bridge(Config1) + ), + ok. + +t_const_timestamp(Config) -> + QueryMode = ?config(query_mode, Config), + Const = erlang:system_time(nanosecond), + ConstBin = integer_to_binary(Const), + ?assertMatch( + {ok, _}, + create_bridge( + Config, + #{ + <<"write_syntax">> => + <<"mqtt,clientid=${clientid} foo=${payload.foo}i,bar=5i ", ConstBin/binary>> + } + ) + ), + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + Payload = #{<<"foo">> => 123}, + SentData = #{ + <<"clientid">> => ClientId, + <<"topic">> => atom_to_binary(?FUNCTION_NAME), + <<"payload">> => Payload, + <<"timestamp">> => erlang:system_time(millisecond) + }, + case QueryMode of + sync -> + ?assertMatch({ok, _}, send_message(Config, SentData)) + end, + PersistedData = query_by_clientid(<<"mqtt">>, ClientId, Config), + Expected = #{foo => 123}, + assert_persisted_data(ClientId, Expected, PersistedData), + TimeReturned = maps:get(<<"greptime_timestamp">>, PersistedData), + ?assertEqual(Const, TimeReturned). + +t_boolean_variants(Config) -> + QueryMode = ?config(query_mode, Config), + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + BoolVariants = #{ + true => true, + false => false, + <<"t">> => true, + <<"f">> => false, + <<"T">> => true, + <<"F">> => false, + <<"TRUE">> => true, + <<"FALSE">> => false, + <<"True">> => true, + <<"False">> => false + }, + maps:foreach( + fun(BoolVariant, Translation) -> + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + Payload = #{ + int_key => -123, + bool => BoolVariant, + uint_key => 123 + }, + SentData = #{ + <<"clientid">> => ClientId, + <<"topic">> => atom_to_binary(?FUNCTION_NAME), + <<"timestamp">> => erlang:system_time(millisecond), + <<"payload">> => Payload + }, + case QueryMode of + sync -> + ?assertMatch({ok, _}, send_message(Config, SentData)) + end, + case QueryMode of + sync -> ok + end, + PersistedData = query_by_clientid(atom_to_binary(?FUNCTION_NAME), ClientId, Config), + Expected = #{ + bool => Translation, + int_value => -123, + uint_value => 123, + payload => emqx_utils_json:encode(Payload) + }, + assert_persisted_data(ClientId, Expected, PersistedData), + ok + end, + BoolVariants + ), + ok. + +t_bad_timestamp(Config) -> + GreptimedbType = ?config(greptimedb_type, Config), + GreptimedbName = ?config(greptimedb_name, Config), + QueryMode = ?config(query_mode, Config), + BatchSize = ?config(batch_size, Config), + GreptimedbConfigString0 = ?config(greptimedb_config_string, Config), + GreptimedbTypeCfg = + case GreptimedbType of + grpcv1 -> "greptimedb" + end, + WriteSyntax = + %% N.B.: this single space characters are relevant + <<"${topic}", " ", "payload=${payload},", "${clientid}_int_value=${payload.int_key}i,", + "uint_value=${payload.uint_key}u," + "bool=${payload.bool}", " ", "bad_timestamp">>, + %% append this to override the config + GreptimedbConfigString1 = + io_lib:format( + "bridges.~s.~s {\n" + " write_syntax = \"~s\"\n" + "}\n", + [GreptimedbTypeCfg, GreptimedbName, WriteSyntax] + ), + GreptimedbConfig1 = parse_and_check( + GreptimedbConfigString0 ++ GreptimedbConfigString1, + GreptimedbType, + GreptimedbName + ), + Config1 = [{greptimedb_config, GreptimedbConfig1} | Config], + ?assertMatch( + {ok, _}, + create_bridge(Config1) + ), + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + Payload = #{ + int_key => -123, + bool => false, + uint_key => 123 + }, + SentData = #{ + <<"clientid">> => ClientId, + <<"topic">> => atom_to_binary(?FUNCTION_NAME), + <<"timestamp">> => erlang:system_time(millisecond), + <<"payload">> => Payload + }, + ?check_trace( + ?wait_async_action( + send_message(Config1, SentData), + #{?snk_kind := greptimedb_connector_send_query_error}, + 10_000 + ), + fun(Result, _Trace) -> + ?assertMatch({_, {ok, _}}, Result), + {Return, {ok, _}} = Result, + IsBatch = BatchSize > 1, + case {QueryMode, IsBatch} of + {sync, false} -> + ?assertEqual( + {error, [ + {error, {bad_timestamp, <<"bad_timestamp">>}} + ]}, + Return + ); + {sync, true} -> + ?assertEqual({error, {unrecoverable_error, points_trans_failed}}, Return) + end, + ok + end + ), + ok. + +t_get_status(Config) -> + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyName = ?config(proxy_name, Config), + {ok, _} = create_bridge(Config), + ResourceId = resource_id(Config), + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)), + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)) + end), + ok. + +t_create_disconnected(Config) -> + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyName = ?config(proxy_name, Config), + ?check_trace( + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + ?assertMatch({ok, _}, create_bridge(Config)) + end), + fun(Trace) -> + ?assertMatch( + [#{error := greptimedb_client_not_alive, reason := _SomeReason}], + ?of_kind(greptimedb_connector_start_failed, Trace) + ), + ok + end + ), + ok. + +t_start_error(Config) -> + %% simulate client start error + ?check_trace( + emqx_common_test_helpers:with_mock( + greptimedb, + start_client, + fun(_Config) -> {error, some_error} end, + fun() -> + ?wait_async_action( + ?assertMatch({ok, _}, create_bridge(Config)), + #{?snk_kind := greptimedb_connector_start_failed}, + 10_000 + ) + end + ), + fun(Trace) -> + ?assertMatch( + [#{error := some_error}], + ?of_kind(greptimedb_connector_start_failed, Trace) + ), + ok + end + ), + ok. + +t_start_exception(Config) -> + %% simulate client start exception + ?check_trace( + emqx_common_test_helpers:with_mock( + greptimedb, + start_client, + fun(_Config) -> error(boom) end, + fun() -> + ?wait_async_action( + ?assertMatch({ok, _}, create_bridge(Config)), + #{?snk_kind := greptimedb_connector_start_exception}, + 10_000 + ) + end + ), + fun(Trace) -> + ?assertMatch( + [#{error := {error, boom}}], + ?of_kind(greptimedb_connector_start_exception, Trace) + ), + ok + end + ), + ok. + +t_write_failure(Config) -> + ProxyName = ?config(proxy_name, Config), + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + QueryMode = ?config(query_mode, Config), + {ok, _} = create_bridge(Config), + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + Payload = #{ + int_key => -123, + bool => true, + float_key => 24.5, + uint_key => 123 + }, + SentData = #{ + <<"clientid">> => ClientId, + <<"topic">> => atom_to_binary(?FUNCTION_NAME), + <<"timestamp">> => erlang:system_time(millisecond), + <<"payload">> => Payload + }, + ?check_trace( + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + case QueryMode of + sync -> + ?wait_async_action( + ?assertMatch( + {error, {resource_error, #{reason := timeout}}}, + send_message(Config, SentData) + ), + #{?snk_kind := greptimedb_connector_do_query_failure, action := nack}, + 16_000 + ) + end + end), + fun(Trace) -> + case QueryMode of + sync -> + ?assertMatch( + [#{error := _} | _], + ?of_kind(greptimedb_connector_do_query_failure, Trace) + ) + end, + ok + end + ), + ok. + +t_missing_field(Config) -> + BatchSize = ?config(batch_size, Config), + IsBatch = BatchSize > 1, + {ok, _} = + create_bridge( + Config, + #{ + <<"resource_opts">> => #{<<"worker_pool_size">> => 1}, + <<"write_syntax">> => <<"${clientid} foo=${foo}i">> + } + ), + %% note: we don't select foo here, but we interpolate it in the + %% fields, so it'll become undefined. + {ok, _} = create_rule_and_action_http(Config, #{sql => <<"select * from \"t/topic\"">>}), + ClientId0 = emqx_guid:to_hexstr(emqx_guid:gen()), + ClientId1 = emqx_guid:to_hexstr(emqx_guid:gen()), + %% Message with the field that we "forgot" to select in the rule + Msg0 = emqx_message:make(ClientId0, <<"t/topic">>, emqx_utils_json:encode(#{foo => 123})), + %% Message without any fields + Msg1 = emqx_message:make(ClientId1, <<"t/topic">>, emqx_utils_json:encode(#{})), + ?check_trace( + begin + emqx:publish(Msg0), + emqx:publish(Msg1), + NEvents = 1, + {ok, _} = + snabbkaffe:block_until( + ?match_n_events(NEvents, #{ + ?snk_kind := greptimedb_connector_send_query_error + }), + _Timeout1 = 16_000 + ), + ok + end, + fun(Trace) -> + PersistedData0 = query_by_clientid(ClientId0, ClientId0, Config), + PersistedData1 = query_by_clientid(ClientId1, ClientId1, Config), + case IsBatch of + true -> + ?assertMatch( + [#{error := points_trans_failed} | _], + ?of_kind(greptimedb_connector_send_query_error, Trace) + ); + false -> + ?assertMatch( + [#{error := [{error, no_fields}]} | _], + ?of_kind(greptimedb_connector_send_query_error, Trace) + ) + end, + %% nothing should have been persisted + ?assertEqual(#{}, PersistedData0), + ?assertEqual(#{}, PersistedData1), + ok + end + ), + ok. + +t_authentication_error_on_send_message(Config0) -> + ResourceId = resource_id(Config0), + QueryMode = proplists:get_value(query_mode, Config0, sync), + GreptimedbType = ?config(greptimedb_type, Config0), + GreptimeConfig0 = proplists:get_value(greptimedb_config, Config0), + GreptimeConfig = + case GreptimedbType of + grpcv1 -> GreptimeConfig0#{<<"password">> => <<"wrong_password">>} + end, + Config = lists:keyreplace(greptimedb_config, 1, Config0, {greptimedb_config, GreptimeConfig}), + + % Fake initialization to simulate credential update after bridge was created. + emqx_common_test_helpers:with_mock( + greptimedb, + check_auth, + fun(_) -> + ok + end, + fun() -> + {ok, _} = create_bridge(Config), + ?retry( + _Sleep = 1_000, + _Attempts = 10, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ) + end + ), + + % Now back to wrong credentials + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + Payload = #{ + int_key => -123, + bool => true, + float_key => 24.5, + uint_key => 123 + }, + SentData = #{ + <<"clientid">> => ClientId, + <<"topic">> => atom_to_binary(?FUNCTION_NAME), + <<"timestamp">> => erlang:system_time(millisecond), + <<"payload">> => Payload + }, + case QueryMode of + sync -> + ?assertMatch( + {error, {unrecoverable_error, <<"authorization failure">>}}, + send_message(Config, SentData) + ) + end, + ok. diff --git a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_connector_SUITE.erl b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_connector_SUITE.erl new file mode 100644 index 000000000..a4acf5b4e --- /dev/null +++ b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_connector_SUITE.erl @@ -0,0 +1,155 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_greptimedb_connector_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(GREPTIMEDB_RESOURCE_MOD, emqx_bridge_greptimedb_connector). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +groups() -> + []. + +init_per_suite(Config) -> + GreptimedbTCPHost = os:getenv("GREPTIMEDB_GRPCV1_TCP_HOST", "toxiproxy"), + GreptimedbTCPPort = list_to_integer(os:getenv("GREPTIMEDB_GRPCV1_TCP_PORT", "4001")), + Servers = [{GreptimedbTCPHost, GreptimedbTCPPort}], + case emqx_common_test_helpers:is_all_tcp_servers_available(Servers) of + true -> + ok = emqx_common_test_helpers:start_apps([emqx_conf]), + ok = emqx_connector_test_helpers:start_apps([emqx_resource]), + {ok, _} = application:ensure_all_started(emqx_connector), + {ok, _} = application:ensure_all_started(greptimedb), + [ + {greptimedb_tcp_host, GreptimedbTCPHost}, + {greptimedb_tcp_port, GreptimedbTCPPort} + | Config + ]; + false -> + case os:getenv("IS_CI") of + "yes" -> + throw(no_greptimedb); + _ -> + {skip, no_greptimedb} + end + end. + +end_per_suite(_Config) -> + ok = emqx_common_test_helpers:stop_apps([emqx_conf]), + ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), + _ = application:stop(emqx_connector), + _ = application:stop(greptimedb), + ok. + +init_per_testcase(_, Config) -> + Config. + +end_per_testcase(_, _Config) -> + ok. + +% %%------------------------------------------------------------------------------ +% %% Testcases +% %%------------------------------------------------------------------------------ + +t_lifecycle(Config) -> + Host = ?config(greptimedb_tcp_host, Config), + Port = ?config(greptimedb_tcp_port, Config), + perform_lifecycle_check( + <<"emqx_bridge_greptimedb_connector_SUITE">>, + greptimedb_config(Host, Port) + ). + +perform_lifecycle_check(PoolName, InitialConfig) -> + {ok, #{config := CheckedConfig}} = + emqx_resource:check_config(?GREPTIMEDB_RESOURCE_MOD, InitialConfig), + % We need to add a write_syntax to the config since the connector + % expects this + FullConfig = CheckedConfig#{write_syntax => greptimedb_write_syntax()}, + {ok, #{ + state := #{client := #{pool := ReturnedPoolName}} = State, + status := InitialStatus + }} = emqx_resource:create_local( + PoolName, + ?CONNECTOR_RESOURCE_GROUP, + ?GREPTIMEDB_RESOURCE_MOD, + FullConfig, + #{} + ), + ?assertEqual(InitialStatus, connected), + % Instance should match the state and status of the just started resource + {ok, ?CONNECTOR_RESOURCE_GROUP, #{ + state := State, + status := InitialStatus + }} = + emqx_resource:get_instance(PoolName), + ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), + % % Perform query as further check that the resource is working as expected + ?assertMatch({ok, _}, emqx_resource:query(PoolName, test_query())), + ?assertEqual(ok, emqx_resource:stop(PoolName)), + % Resource will be listed still, but state will be changed and healthcheck will fail + % as the worker no longer exists. + {ok, ?CONNECTOR_RESOURCE_GROUP, #{ + state := State, + status := StoppedStatus + }} = + emqx_resource:get_instance(PoolName), + ?assertEqual(stopped, StoppedStatus), + ?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)), + % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. + ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), + % Can call stop/1 again on an already stopped instance + ?assertEqual(ok, emqx_resource:stop(PoolName)), + % Make sure it can be restarted and the healthchecks and queries work properly + ?assertEqual(ok, emqx_resource:restart(PoolName)), + % async restart, need to wait resource + timer:sleep(500), + {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = + emqx_resource:get_instance(PoolName), + ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), + ?assertMatch({ok, _}, emqx_resource:query(PoolName, test_query())), + % Stop and remove the resource in one go. + ?assertEqual(ok, emqx_resource:remove_local(PoolName)), + ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), + % Should not even be able to get the resource data out of ets now unlike just stopping. + ?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)). + +% %%------------------------------------------------------------------------------ +% %% Helpers +% %%------------------------------------------------------------------------------ + +greptimedb_config(Host, Port) -> + Server = list_to_binary(io_lib:format("~s:~b", [Host, Port])), + ResourceConfig = #{ + <<"dbname">> => <<"public">>, + <<"server">> => Server, + <<"username">> => <<"greptime_user">>, + <<"password">> => <<"greptime_pwd">> + }, + #{<<"config">> => ResourceConfig}. + +greptimedb_write_syntax() -> + [ + #{ + measurement => "${topic}", + tags => [{"clientid", "${clientid}"}], + fields => [{"payload", "${payload}"}], + timestamp => undefined + } + ]. + +test_query() -> + {send_message, #{ + <<"clientid">> => <<"something">>, + <<"payload">> => #{bool => true}, + <<"topic">> => <<"connector_test">>, + <<"timestamp">> => 1678220316257 + }}. diff --git a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_tests.erl b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_tests.erl new file mode 100644 index 000000000..a07ccd92d --- /dev/null +++ b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_tests.erl @@ -0,0 +1,348 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_greptimedb_tests). + +-include_lib("eunit/include/eunit.hrl"). + +-define(INVALID_LINES, [ + " ", + " \n", + " \n\n\n ", + "\n", + " \n\n \n \n", + "measurement", + "measurement ", + "measurement,tag", + "measurement field", + "measurement,tag field", + "measurement,tag field ${timestamp}", + "measurement,tag=", + "measurement,tag=tag1", + "measurement,tag =", + "measurement field=", + "measurement field= ", + "measurement field = ", + "measurement, tag = field = ", + "measurement, tag = field = ", + "measurement, tag = tag_val field = field_val", + "measurement, tag = tag_val field = field_val ${timestamp}", + "measurement,= = ${timestamp}", + "measurement,t=a, f=a, ${timestamp}", + "measurement,t=a,t1=b, f=a,f1=b, ${timestamp}", + "measurement,t=a,t1=b, f=a,f1=b,", + "measurement,t=a, t1=b, f=a,f1=b,", + "measurement,t=a,,t1=b, f=a,f1=b,", + "measurement,t=a,,t1=b f=a,,f1=b", + "measurement,t=a,,t1=b f=a,f1=b ${timestamp}", + "measurement, f=a,f1=b", + "measurement, f=a,f1=b ${timestamp}", + "measurement,, f=a,f1=b ${timestamp}", + "measurement,, f=a,f1=b", + "measurement,, f=a,f1=b,, ${timestamp}", + "measurement f=a,f1=b,, ${timestamp}", + "measurement,t=a f=a,f1=b,, ${timestamp}", + "measurement,t=a f=a,f1=b,, ", + "measurement,t=a f=a,f1=b,,", + "measurement, t=a f=a,f1=b", + "measurement,t=a f=a, f1=b", + "measurement,t=a f=a, f1=b ${timestamp}", + "measurement, t=a f=a, f1=b ${timestamp}", + "measurement,t= a f=a,f1=b ${timestamp}", + "measurement,t= a f=a,f1 =b ${timestamp}", + "measurement, t = a f = a,f1 = b ${timestamp}", + "measurement,t=a f=a,f1=b \n ${timestamp}", + "measurement,t=a \n f=a,f1=b \n ${timestamp}", + "measurement,t=a \n f=a,f1=b \n ", + "\n measurement,t=a \n f=a,f1=b \n ${timestamp}", + "\n measurement,t=a \n f=a,f1=b \n", + %% not escaped backslash in a quoted field value is invalid + "measurement,tag=1 field=\"val\\1\"" +]). + +-define(VALID_LINE_PARSED_PAIRS, [ + {"m1,tag=tag1 field=field1 ${timestamp1}", #{ + measurement => "m1", + tags => [{"tag", "tag1"}], + fields => [{"field", "field1"}], + timestamp => "${timestamp1}" + }}, + {"m2,tag=tag2 field=field2", #{ + measurement => "m2", + tags => [{"tag", "tag2"}], + fields => [{"field", "field2"}], + timestamp => undefined + }}, + {"m3 field=field3 ${timestamp3}", #{ + measurement => "m3", + tags => [], + fields => [{"field", "field3"}], + timestamp => "${timestamp3}" + }}, + {"m4 field=field4", #{ + measurement => "m4", + tags => [], + fields => [{"field", "field4"}], + timestamp => undefined + }}, + {"m5,tag=tag5,tag_a=tag5a,tag_b=tag5b field=field5,field_a=field5a,field_b=field5b ${timestamp5}", + #{ + measurement => "m5", + tags => [{"tag", "tag5"}, {"tag_a", "tag5a"}, {"tag_b", "tag5b"}], + fields => [{"field", "field5"}, {"field_a", "field5a"}, {"field_b", "field5b"}], + timestamp => "${timestamp5}" + }}, + {"m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=field6,field_a=field6a,field_b=field6b", #{ + measurement => "m6", + tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}], + fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}], + timestamp => undefined + }}, + {"m7,tag=tag7,tag_a=\"tag7a\",tag_b=tag7b field=\"field7\",field_a=field7a,field_b=\"field7b\"", + #{ + measurement => "m7", + tags => [{"tag", "tag7"}, {"tag_a", "\"tag7a\""}, {"tag_b", "tag7b"}], + fields => [{"field", "field7"}, {"field_a", "field7a"}, {"field_b", "field7b"}], + timestamp => undefined + }}, + {"m8,tag=tag8,tag_a=\"tag8a\",tag_b=tag8b field=\"field8\",field_a=field8a,field_b=\"field8b\" ${timestamp8}", + #{ + measurement => "m8", + tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}], + fields => [{"field", "field8"}, {"field_a", "field8a"}, {"field_b", "field8b"}], + timestamp => "${timestamp8}" + }}, + {"m9,tag=tag9,tag_a=\"tag9a\",tag_b=tag9b field=\"field9\",field_a=field9a,field_b=\"\" ${timestamp9}", + #{ + measurement => "m9", + tags => [{"tag", "tag9"}, {"tag_a", "\"tag9a\""}, {"tag_b", "tag9b"}], + fields => [{"field", "field9"}, {"field_a", "field9a"}, {"field_b", ""}], + timestamp => "${timestamp9}" + }}, + {"m10 field=\"\" ${timestamp10}", #{ + measurement => "m10", + tags => [], + fields => [{"field", ""}], + timestamp => "${timestamp10}" + }} +]). + +-define(VALID_LINE_EXTRA_SPACES_PARSED_PAIRS, [ + {"\n m1,tag=tag1 field=field1 ${timestamp1} \n", #{ + measurement => "m1", + tags => [{"tag", "tag1"}], + fields => [{"field", "field1"}], + timestamp => "${timestamp1}" + }}, + {" m2,tag=tag2 field=field2 ", #{ + measurement => "m2", + tags => [{"tag", "tag2"}], + fields => [{"field", "field2"}], + timestamp => undefined + }}, + {" m3 field=field3 ${timestamp3} ", #{ + measurement => "m3", + tags => [], + fields => [{"field", "field3"}], + timestamp => "${timestamp3}" + }}, + {" \n m4 field=field4\n ", #{ + measurement => "m4", + tags => [], + fields => [{"field", "field4"}], + timestamp => undefined + }}, + {" \n m5,tag=tag5,tag_a=tag5a,tag_b=tag5b field=field5,field_a=field5a,field_b=field5b ${timestamp5} \n", + #{ + measurement => "m5", + tags => [{"tag", "tag5"}, {"tag_a", "tag5a"}, {"tag_b", "tag5b"}], + fields => [{"field", "field5"}, {"field_a", "field5a"}, {"field_b", "field5b"}], + timestamp => "${timestamp5}" + }}, + {" m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=field6,field_a=field6a,field_b=field6b\n ", #{ + measurement => "m6", + tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}], + fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}], + timestamp => undefined + }} +]). + +-define(VALID_LINE_PARSED_ESCAPED_CHARS_PAIRS, [ + {"m\\ =1\\,,\\,tag\\ \\==\\=tag\\ 1\\, \\,fie\\ ld\\ =\\ field\\,1 ${timestamp1}", #{ + measurement => "m =1,", + tags => [{",tag =", "=tag 1,"}], + fields => [{",fie ld ", " field,1"}], + timestamp => "${timestamp1}" + }}, + {"m2,tag=tag2 field=\"field \\\"2\\\",\n\"", #{ + measurement => "m2", + tags => [{"tag", "tag2"}], + fields => [{"field", "field \"2\",\n"}], + timestamp => undefined + }}, + {"m\\ 3 field=\"field3\" ${payload.timestamp\\ 3}", #{ + measurement => "m 3", + tags => [], + fields => [{"field", "field3"}], + timestamp => "${payload.timestamp 3}" + }}, + {"m4 field=\"\\\"field\\\\4\\\"\"", #{ + measurement => "m4", + tags => [], + fields => [{"field", "\"field\\4\""}], + timestamp => undefined + }}, + { + "m5\\,mA,tag=\\=tag5\\=,\\,tag_a\\,=tag\\ 5a,tag_b=tag5b \\ field\\ =field5," + "field\\ _\\ a=field5a,\\,field_b\\ =\\=\\,\\ field5b ${timestamp5}", + #{ + measurement => "m5,mA", + tags => [{"tag", "=tag5="}, {",tag_a,", "tag 5a"}, {"tag_b", "tag5b"}], + fields => [ + {" field ", "field5"}, {"field _ a", "field5a"}, {",field_b ", "=, field5b"} + ], + timestamp => "${timestamp5}" + } + }, + {"m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=\"field6\",field_a=\"field6a\",field_b=\"field6b\"", + #{ + measurement => "m6", + tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}], + fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}], + timestamp => undefined + }}, + { + "\\ \\ m7\\ \\ ,tag=\\ tag\\,7\\ ,tag_a=\"tag7a\",tag_b\\,tag1=tag7b field=\"field7\"," + "field_a=field7a,field_b=\"field7b\\\\\n\"", + #{ + measurement => " m7 ", + tags => [{"tag", " tag,7 "}, {"tag_a", "\"tag7a\""}, {"tag_b,tag1", "tag7b"}], + fields => [{"field", "field7"}, {"field_a", "field7a"}, {"field_b", "field7b\\\n"}], + timestamp => undefined + } + }, + { + "m8,tag=tag8,tag_a=\"tag8a\",tag_b=tag8b field=\"field8\",field_a=field8a," + "field_b=\"\\\"field\\\" = 8b\" ${timestamp8}", + #{ + measurement => "m8", + tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}], + fields => [{"field", "field8"}, {"field_a", "field8a"}, {"field_b", "\"field\" = 8b"}], + timestamp => "${timestamp8}" + } + }, + {"m\\9,tag=tag9,tag_a=\"tag9a\",tag_b=tag9b field\\=field=\"field9\",field_a=field9a,field_b=\"\" ${timestamp9}", + #{ + measurement => "m\\9", + tags => [{"tag", "tag9"}, {"tag_a", "\"tag9a\""}, {"tag_b", "tag9b"}], + fields => [{"field=field", "field9"}, {"field_a", "field9a"}, {"field_b", ""}], + timestamp => "${timestamp9}" + }}, + {"m\\,10 \"field\\\\\"=\"\" ${timestamp10}", #{ + measurement => "m,10", + tags => [], + %% backslash should not be un-escaped in tag key + fields => [{"\"field\\\\\"", ""}], + timestamp => "${timestamp10}" + }} +]). + +-define(VALID_LINE_PARSED_ESCAPED_CHARS_EXTRA_SPACES_PAIRS, [ + {" \n m\\ =1\\,,\\,tag\\ \\==\\=tag\\ 1\\, \\,fie\\ ld\\ =\\ field\\,1 ${timestamp1} ", #{ + measurement => "m =1,", + tags => [{",tag =", "=tag 1,"}], + fields => [{",fie ld ", " field,1"}], + timestamp => "${timestamp1}" + }}, + {" m2,tag=tag2 field=\"field \\\"2\\\",\n\" ", #{ + measurement => "m2", + tags => [{"tag", "tag2"}], + fields => [{"field", "field \"2\",\n"}], + timestamp => undefined + }}, + {" m\\ 3 field=\"field3\" ${payload.timestamp\\ 3} ", #{ + measurement => "m 3", + tags => [], + fields => [{"field", "field3"}], + timestamp => "${payload.timestamp 3}" + }}, + {" m4 field=\"\\\"field\\\\4\\\"\" ", #{ + measurement => "m4", + tags => [], + fields => [{"field", "\"field\\4\""}], + timestamp => undefined + }}, + { + " m5\\,mA,tag=\\=tag5\\=,\\,tag_a\\,=tag\\ 5a,tag_b=tag5b \\ field\\ =field5," + "field\\ _\\ a=field5a,\\,field_b\\ =\\=\\,\\ field5b ${timestamp5} ", + #{ + measurement => "m5,mA", + tags => [{"tag", "=tag5="}, {",tag_a,", "tag 5a"}, {"tag_b", "tag5b"}], + fields => [ + {" field ", "field5"}, {"field _ a", "field5a"}, {",field_b ", "=, field5b"} + ], + timestamp => "${timestamp5}" + } + }, + {" m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=\"field6\",field_a=\"field6a\",field_b=\"field6b\" ", + #{ + measurement => "m6", + tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}], + fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}], + timestamp => undefined + }} +]). + +invalid_write_syntax_line_test_() -> + [?_assertThrow(_, to_influx_lines(L)) || L <- ?INVALID_LINES]. + +invalid_write_syntax_multiline_test_() -> + LinesList = [ + join("\n", ?INVALID_LINES), + join("\n\n\n", ?INVALID_LINES), + join("\n\n", lists:reverse(?INVALID_LINES)) + ], + [?_assertThrow(_, to_influx_lines(Lines)) || Lines <- LinesList]. + +valid_write_syntax_test_() -> + test_pairs(?VALID_LINE_PARSED_PAIRS). + +valid_write_syntax_with_extra_spaces_test_() -> + test_pairs(?VALID_LINE_EXTRA_SPACES_PARSED_PAIRS). + +valid_write_syntax_escaped_chars_test_() -> + test_pairs(?VALID_LINE_PARSED_ESCAPED_CHARS_PAIRS). + +valid_write_syntax_escaped_chars_with_extra_spaces_test_() -> + test_pairs(?VALID_LINE_PARSED_ESCAPED_CHARS_EXTRA_SPACES_PAIRS). + +test_pairs(PairsList) -> + {Lines, AllExpected} = lists:unzip(PairsList), + JoinedLines = join("\n", Lines), + JoinedLines1 = join("\n\n\n", Lines), + JoinedLines2 = join("\n\n", lists:reverse(Lines)), + SingleLineTests = + [ + ?_assertEqual([Expected], to_influx_lines(Line)) + || {Line, Expected} <- PairsList + ], + JoinedLinesTests = + [ + ?_assertEqual(AllExpected, to_influx_lines(JoinedLines)), + ?_assertEqual(AllExpected, to_influx_lines(JoinedLines1)), + ?_assertEqual(lists:reverse(AllExpected), to_influx_lines(JoinedLines2)) + ], + SingleLineTests ++ JoinedLinesTests. + +join(Sep, LinesList) -> + lists:flatten(lists:join(Sep, LinesList)). + +to_influx_lines(RawLines) -> + OldLevel = emqx_logger:get_primary_log_level(), + try + %% mute error logs from this call + emqx_logger:set_primary_log_level(none), + emqx_bridge_greptimedb:to_influx_lines(RawLines) + after + emqx_logger:set_primary_log_level(OldLevel) + end. diff --git a/apps/emqx_machine/priv/reboot_lists.eterm b/apps/emqx_machine/priv/reboot_lists.eterm index 500a47d8f..92f6b4bbd 100644 --- a/apps/emqx_machine/priv/reboot_lists.eterm +++ b/apps/emqx_machine/priv/reboot_lists.eterm @@ -85,6 +85,7 @@ emqx_bridge_opents, emqx_bridge_clickhouse, emqx_bridge_dynamo, + emqx_bridge_greptimedb, emqx_bridge_hstreamdb, emqx_bridge_influxdb, emqx_bridge_iotdb, diff --git a/apps/emqx_machine/src/emqx_machine.app.src b/apps/emqx_machine/src/emqx_machine.app.src index e81d4b53f..9a9dedc28 100644 --- a/apps/emqx_machine/src/emqx_machine.app.src +++ b/apps/emqx_machine/src/emqx_machine.app.src @@ -3,7 +3,7 @@ {id, "emqx_machine"}, {description, "The EMQX Machine"}, % strict semver, bump manually! - {vsn, "0.2.8"}, + {vsn, "0.2.9"}, {modules, []}, {registered, []}, {applications, [kernel, stdlib, emqx_ctl]}, diff --git a/changes/ee/feat-10647.en.md b/changes/ee/feat-10647.en.md new file mode 100644 index 000000000..b42ef1f94 --- /dev/null +++ b/changes/ee/feat-10647.en.md @@ -0,0 +1 @@ +Add enterprise data bridge for [GreptimeDB](https://github.com/GreptimeTeam/greptimedb). diff --git a/mix.exs b/mix.exs index c6b685893..21a238f22 100644 --- a/mix.exs +++ b/mix.exs @@ -171,6 +171,7 @@ defmodule EMQXUmbrella.MixProject do :emqx_bridge_cassandra, :emqx_bridge_opents, :emqx_bridge_dynamo, + :emqx_bridge_greptimedb, :emqx_bridge_hstreamdb, :emqx_bridge_influxdb, :emqx_bridge_iotdb, @@ -208,6 +209,7 @@ defmodule EMQXUmbrella.MixProject do {:crc32cer, "0.1.8", override: true}, {:supervisor3, "1.1.12", override: true}, {:opentsdb, github: "emqx/opentsdb-client-erl", tag: "v0.5.1", override: true}, + {:greptimedb, github: "GreptimeTeam/greptimedb-client-erl", tag: "v0.1.2", override: true}, # The following two are dependencies of rabbit_common. They are needed here to # make mix not complain about conflicting versions {:thoas, github: "emqx/thoas", tag: "v1.0.0", override: true}, diff --git a/rel/i18n/emqx_bridge_greptimedb.hocon b/rel/i18n/emqx_bridge_greptimedb.hocon new file mode 100644 index 000000000..939ed48d3 --- /dev/null +++ b/rel/i18n/emqx_bridge_greptimedb.hocon @@ -0,0 +1,50 @@ +emqx_bridge_greptimedb { + +config_enable.desc: +"""Enable or disable this bridge.""" + +config_enable.label: +"""Enable Or Disable Bridge""" + +desc_config.desc: +"""Configuration for an GreptimeDB bridge.""" + +desc_config.label: +"""GreptimeDB Bridge Configuration""" + +desc_name.desc: +"""Bridge name.""" + +desc_name.label: +"""Bridge Name""" + +desc_type.desc: +"""The Bridge Type.""" + +desc_type.label: +"""Bridge Type""" + +local_topic.desc: +"""The MQTT topic filter to be forwarded to the GreptimeDB. All MQTT 'PUBLISH' messages with the topic +matching the local_topic will be forwarded.
+NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is +configured, then both the data got from the rule and the MQTT messages that match local_topic +will be forwarded.""" + +local_topic.label: +"""Local Topic""" + +write_syntax.desc: +"""Conf of GreptimeDB gRPC protocol to write data points.The write syntax is a text-based format that provides the measurement, tag set, field set, and timestamp of a data point, and placeholder supported, which is the same as InfluxDB line protocol. +See also [InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2.3/reference/syntax/line-protocol/) and +[GreptimeDB 1.8 Line Protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/)
+TLDR:
+``` +[,=[,=]] =[,=] [] +``` +Please note that a placeholder for an integer value must be annotated with a suffix `i`. For example `${payload.int_value}i`.""" + +write_syntax.label: +"""Write Syntax""" + +} diff --git a/rel/i18n/emqx_bridge_greptimedb_connector.hocon b/rel/i18n/emqx_bridge_greptimedb_connector.hocon new file mode 100644 index 000000000..9cb10951f --- /dev/null +++ b/rel/i18n/emqx_bridge_greptimedb_connector.hocon @@ -0,0 +1,47 @@ +emqx_bridge_greptimedb_connector { + +dbname.desc: +"""GreptimeDB database.""" + +dbname.label: +"""Database""" + +greptimedb.desc: +"""GreptimeDB's protocol. Support GreptimeDB v1.8 and before.""" + +greptimedb.label: +"""HTTP API Protocol""" + +password.desc: +"""GreptimeDB password.""" + +password.label: +"""Password""" + +precision.desc: +"""GreptimeDB time precision.""" + +precision.label: +"""Time Precision""" + +protocol.desc: +"""GreptimeDB's protocol. gRPC API.""" + +protocol.label: +"""Protocol""" + +server.desc: +"""The IPv4 or IPv6 address or the hostname to connect to.
+A host entry has the following form: `Host[:Port]`.
+The GreptimeDB default port 8086 is used if `[:Port]` is not specified.""" + +server.label: +"""Server Host""" + +username.desc: +"""GreptimeDB username.""" + +username.label: +"""Username""" + +} diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index 785d4065d..578b9c4de 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -222,6 +222,9 @@ for dep in ${CT_DEPS}; do kinesis) FILES+=( '.ci/docker-compose-file/docker-compose-kinesis.yaml' ) ;; + greptimedb) + FILES+=( '.ci/docker-compose-file/docker-compose-greptimedb.yaml' ) + ;; *) echo "unknown_ct_dependency $dep" exit 1 diff --git a/scripts/spellcheck/dicts/emqx.txt b/scripts/spellcheck/dicts/emqx.txt index 953b0b762..b515a0010 100644 --- a/scripts/spellcheck/dicts/emqx.txt +++ b/scripts/spellcheck/dicts/emqx.txt @@ -29,6 +29,7 @@ EPMD ERL ETS FIXME +GreptimeDB GCM HMAC HOCON