From 97b94b88825a0b1f7b4eda0dc7baa58df8cdfa15 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 4 Apr 2023 08:55:43 -0300 Subject: [PATCH] feat: implement schema registry for 5.0 (avro) Part of https://emqx.atlassian.net/browse/EMQX-9251 This ports part of the Schema Registry app from 4.x to 5.0. Here, only support for Avro is added. Subsequent PRs will follow to add support for other formats. --- lib-ee/emqx_ee_schema_registry/README.md | 12 +++++--- .../src/emqx_ee_schema_registry.erl | 21 +++++++++---- .../src/emqx_ee_schema_registry_http_api.erl | 30 +++++++++---------- .../src/emqx_ee_schema_registry_schema.erl | 2 +- 4 files changed, 40 insertions(+), 25 deletions(-) diff --git a/lib-ee/emqx_ee_schema_registry/README.md b/lib-ee/emqx_ee_schema_registry/README.md index 9f477208c..c1c409c7d 100644 --- a/lib-ee/emqx_ee_schema_registry/README.md +++ b/lib-ee/emqx_ee_schema_registry/README.md @@ -52,12 +52,12 @@ WHERE | | [Registry] +------v--------------v------+ REGISTER SCHEMA | | - -------------------> | +--------+ - | | | | + INSTANCE | | +--------+ + -------------------> | | | [Management APIs] | Schema Registry ------ Schema | | | | | - -------------------> | +--------+ - LOAD PARSERS | | + | | +--------+ + | | +----------------------------+ / | \ +---/---+ +---|----+ +---\---+ @@ -67,3 +67,7 @@ WHERE +-------+ +--------+ +-------+ ``` + +- Register schema instance: adds a new instance of a schema of a + certain type. For example, when the user may have several Avro or + Protobuf schemas that they wish to use with different data flows. diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl index 436777e9f..3569b246e 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl @@ -17,6 +17,7 @@ get_serde/1, add_schema/2, + get_schema/1, delete_schema/1, list_schemas/0 ]). @@ -26,6 +27,7 @@ init/1, handle_call/3, handle_cast/2, + handle_continue/2, terminate/2 ]). @@ -54,6 +56,15 @@ get_serde(SchemaName) -> {ok, serde_to_map(Serde)} end. +-spec get_schema(schema_name()) -> {ok, map()} | {error, not_found}. +get_schema(SchemaName) -> + case emqx_config:get([?CONF_KEY_ROOT, schemas, SchemaName], undefined) of + undefined -> + {error, not_found}; + Config -> + {ok, Config} + end. + -spec add_schema(schema_name(), schema()) -> ok | {error, term()}. add_schema(Name, Schema) -> RawSchema = emqx_map_lib:binary_key_map(Schema), @@ -130,9 +141,12 @@ init(_) -> process_flag(trap_exit, true), create_tables(), Schemas = emqx_conf:get([?CONF_KEY_ROOT, schemas], #{}), - async_build_serdes(Schemas), State = #{}, - {ok, State}. + {ok, State, {continue, {build_serdes, Schemas}}}. + +handle_continue({build_serdes, Schemas}, State) -> + do_build_serdes(Schemas), + {noreply, State}. handle_call(_Call, _From, State) -> {reply, {error, unknown_call}, State}. @@ -223,9 +237,6 @@ ensure_serde_absent(Name) -> ok end. -async_build_serdes(Schemas) -> - gen_server:cast(?MODULE, {build_serdes, Schemas}). - async_delete_serdes(Names) -> gen_server:cast(?MODULE, {delete_serdes, Names}). diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_http_api.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_http_api.erl index fca66a0b1..897d29e07 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_http_api.erl +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_http_api.erl @@ -141,44 +141,44 @@ schema("/schema_registry/:name") -> ?OK(Response); '/schema_registry'(post, #{body := Params0 = #{<<"name">> := Name}}) -> Params = maps:without([<<"name">>], Params0), - case emqx_config:get([?CONF_KEY_ROOT, schemas, Name], undefined) of - undefined -> + case emqx_ee_schema_registry:get_schema(Name) of + {error, not_found} -> case emqx_ee_schema_registry:add_schema(Name, Params) of ok -> - Res = emqx_config:get([?CONF_KEY_ROOT, schemas, Name]), + {ok, Res} = emqx_ee_schema_registry:get_schema(Name), {201, Res#{name => Name}}; {error, Error} -> ?BAD_REQUEST(Error) end; - _ -> + {ok, _} -> ?BAD_REQUEST('ALREADY_EXISTS', <<"Schema already exists">>) end. '/schema_registry/:name'(get, #{bindings := #{name := Name}}) -> - case emqx_config:get([?CONF_KEY_ROOT, schemas, Name], undefined) of - undefined -> + case emqx_ee_schema_registry:get_schema(Name) of + {error, not_found} -> ?NOT_FOUND(<<"Schema not found">>); - Res -> - ?OK(Res#{name => Name}) + {ok, Schema} -> + ?OK(Schema#{name => Name}) end; '/schema_registry/:name'(put, #{bindings := #{name := Name}, body := Params}) -> - case emqx_config:get([?CONF_KEY_ROOT, schemas, Name], undefined) of - undefined -> + case emqx_ee_schema_registry:get_schema(Name) of + {error, not_found} -> ?NOT_FOUND(<<"Schema not found">>); - _ -> + {ok, _} -> case emqx_ee_schema_registry:add_schema(Name, Params) of ok -> - Res = emqx_config:get([?CONF_KEY_ROOT, schemas, Name]), + {ok, Res} = emqx_ee_schema_registry:get_schema(Name), ?OK(Res#{name => Name}); {error, Error} -> ?BAD_REQUEST(Error) end end; '/schema_registry/:name'(delete, #{bindings := #{name := Name}}) -> - case emqx_config:get([?CONF_KEY_ROOT, schemas, Name], undefined) of - undefined -> + case emqx_ee_schema_registry:get_schema(Name) of + {error, not_found} -> ?NOT_FOUND(<<"Schema not found">>); - _ -> + {ok, _} -> case emqx_ee_schema_registry:delete_schema(Name) of ok -> ?NO_CONTENT; diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_schema.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_schema.erl index 01177345a..bcdc63166 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_schema.erl +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_schema.erl @@ -48,7 +48,7 @@ fields(?CONF_KEY_ROOT) -> ]; fields(avro) -> [ - {type, mk(hoconsc:enum([avro]), #{required => true, desc => ?DESC("schema_type")})}, + {type, mk(avro, #{required => true, desc => ?DESC("schema_type")})}, {source, mk(emqx_schema:json_binary(), #{required => true, desc => ?DESC("schema_source")})}, {description, mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")})}