From b30bcf32bd93d359b89fed1f7ed12c1027bc8ff4 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 14 Sep 2023 15:15:21 -0300 Subject: [PATCH] feat(ds): introduce keyspace concept Fixes https://emqx.atlassian.net/browse/EMQX-10579 This introduces the concept of "keyspaces" to our durable storage (DS) implementation, and also refactors some places where "shard" and "keyspace" would be mixed up. We might want to tune the storage options differently for distinct sets of topics, the keyspaces. The keyspace is composed by one or more shards. - Keyspaces are identified simply by binary strings. - DS configuration is scoped by keyspaces instead of shards. - Starting a new DS shard requires definining to which keyspace the shard belongs. --- apps/emqx/src/emqx_persistent_session_ds.erl | 11 ++++-- apps/emqx_durable_storage/src/emqx_ds.erl | 10 +++-- .../emqx_durable_storage/src/emqx_ds_conf.erl | 39 +++++++++++-------- .../src/emqx_ds_message_storage_bitmask.erl | 9 +++-- .../src/emqx_ds_storage_layer.erl | 38 +++++++++++------- .../src/emqx_ds_storage_layer_sup.erl | 14 +++---- .../test/emqx_ds_storage_layer_SUITE.erl | 16 +++++--- .../props/prop_replay_message_storage.erl | 5 ++- 8 files changed, 87 insertions(+), 55 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index aed3ece82..a61bf37f9 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -46,6 +46,7 @@ %% FIXME -define(DS_SHARD, <<"local">>). +-define(DEFAULT_KEYSPACE, <<"#">>). -define(WHEN_ENABLED(DO), case is_store_enabled() of @@ -58,9 +59,13 @@ init() -> ?WHEN_ENABLED(begin - ok = emqx_ds:ensure_shard(?DS_SHARD, #{ - dir => filename:join([emqx:data_dir(), ds, messages, ?DS_SHARD]) - }), + ok = emqx_ds:ensure_shard( + ?DS_SHARD, + ?DEFAULT_KEYSPACE, + #{ + dir => filename:join([emqx:data_dir(), ds, messages, ?DS_SHARD]) + } + ), ok = emqx_persistent_session_ds_router:init_tables(), ok end). diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 0a61cad43..8e6f5535d 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -19,7 +19,7 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). %% API: --export([ensure_shard/2]). +-export([ensure_shard/3]). %% Messages: -export([message_store/2, message_store/1, message_stats/0]). %% Iterator: @@ -39,6 +39,7 @@ -export([]). -export_type([ + keyspace/0, message_id/0, message_stats/0, message_store_opts/0, @@ -77,6 +78,7 @@ %% Parsed topic: -type topic() :: list(binary()). +-type keyspace() :: binary(). -type shard() :: binary(). %% Timestamp @@ -96,10 +98,10 @@ %% API funcions %%================================================================================ --spec ensure_shard(shard(), emqx_ds_storage_layer:options()) -> +-spec ensure_shard(shard(), keyspace(), emqx_ds_storage_layer:options()) -> ok | {error, _Reason}. -ensure_shard(Shard, Options) -> - case emqx_ds_storage_layer_sup:start_shard(Shard, Options) of +ensure_shard(Shard, Keyspace, Options) -> + case emqx_ds_storage_layer_sup:start_shard(Shard, Keyspace, Options) of {ok, _Pid} -> ok; {error, {already_started, _Pid}} -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_conf.erl b/apps/emqx_durable_storage/src/emqx_ds_conf.erl index db8b14b45..5633cdf58 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_conf.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_conf.erl @@ -6,9 +6,9 @@ %% TODO: make a proper HOCON schema and all... %% API: --export([shard_config/1, db_options/0]). +-export([keyspace_config/1, db_options/1]). --export([shard_iteration_options/1]). +-export([iteration_options/1]). -export([default_iteration_options/0]). -type backend_config() :: @@ -23,16 +23,20 @@ -define(APP, emqx_ds). --spec shard_config(emqx_ds:shard()) -> backend_config(). -shard_config(Shard) -> - DefaultShardConfig = application:get_env(?APP, default_shard_config, default_shard_config()), - Shards = application:get_env(?APP, shard_config, #{}), - maps:get(Shard, Shards, DefaultShardConfig). +-spec keyspace_config(emqx_ds:keyspace()) -> backend_config(). +keyspace_config(Keyspace) -> + DefaultKeyspaceConfig = application:get_env( + ?APP, + default_keyspace_config, + default_keyspace_config() + ), + Keyspaces = application:get_env(?APP, keyspace_config, #{}), + maps:get(Keyspace, Keyspaces, DefaultKeyspaceConfig). --spec shard_iteration_options(emqx_ds:shard()) -> +-spec iteration_options(emqx_ds:keyspace()) -> emqx_ds_message_storage_bitmask:iteration_options(). -shard_iteration_options(Shard) -> - case shard_config(Shard) of +iteration_options(Keyspace) -> + case keyspace_config(Keyspace) of {emqx_ds_message_storage_bitmask, Config} -> maps:get(iteration, Config, default_iteration_options()); {_Module, _} -> @@ -41,12 +45,13 @@ shard_iteration_options(Shard) -> -spec default_iteration_options() -> emqx_ds_message_storage_bitmask:iteration_options(). default_iteration_options() -> - {emqx_ds_message_storage_bitmask, Config} = default_shard_config(), + {emqx_ds_message_storage_bitmask, Config} = default_keyspace_config(), maps:get(iteration, Config). --spec default_shard_config() -> backend_config(). -default_shard_config() -> +-spec default_keyspace_config() -> backend_config(). +default_keyspace_config() -> {emqx_ds_message_storage_bitmask, #{ + db_options => [], timestamp_bits => 64, topic_bits_per_level => [8, 8, 8, 32, 16], epoch => 5, @@ -55,6 +60,8 @@ default_shard_config() -> } }}. --spec db_options() -> emqx_ds_storage_layer:db_options(). -db_options() -> - application:get_env(?APP, db_options, []). +-spec db_options(emqx_ds:keyspace()) -> emqx_ds_storage_layer:db_options(). +db_options(Keyspace) -> + DefaultDBOptions = application:get_env(?APP, default_db_options, []), + Keyspaces = application:get_env(?APP, keyspace_config, #{}), + emqx_utils_maps:deep_get([Keyspace, db_options], Keyspaces, DefaultDBOptions). diff --git a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl index a97b89580..ae292452e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl @@ -80,7 +80,7 @@ -behaviour(emqx_ds_storage_layer). %% API: --export([create_new/3, open/5]). +-export([create_new/3, open/6]). -export([make_keymapper/1]). -export([store/5]). @@ -174,6 +174,7 @@ -record(db, { shard :: emqx_ds:shard(), + keyspace :: emqx_ds:keyspace(), handle :: rocksdb:db_handle(), cf :: rocksdb:cf_handle(), keymapper :: keymapper(), @@ -236,16 +237,18 @@ create_new(DBHandle, GenId, Options) -> %% Reopen the database -spec open( emqx_ds:shard(), + emqx_ds:keyspace(), rocksdb:db_handle(), emqx_ds_storage_layer:gen_id(), emqx_ds_storage_layer:cf_refs(), schema() ) -> db(). -open(Shard, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) -> +open(Shard, Keyspace, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) -> {value, {_, CFHandle}} = lists:keysearch(data_cf(GenId), 1, CFs), #db{ shard = Shard, + keyspace = Keyspace, handle = DBHandle, cf = CFHandle, keymapper = Keymapper @@ -289,7 +292,7 @@ delete(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic -spec make_iterator(db(), emqx_ds:replay()) -> {ok, iterator()} | {error, _TODO}. make_iterator(DB, Replay) -> - Options = emqx_ds_conf:shard_iteration_options(DB#db.shard), + Options = emqx_ds_conf:iteration_options(DB#db.keyspace), make_iterator(DB, Replay, Options). -spec make_iterator(db(), emqx_ds:replay(), iteration_options()) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index a16c9b476..f34d508d9 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -6,7 +6,7 @@ -behaviour(gen_server). %% API: --export([start_link/2]). +-export([start_link/3]). -export([create_generation/3]). -export([store/5]). @@ -64,6 +64,7 @@ -record(s, { shard :: emqx_ds:shard(), + keyspace :: emqx_ds:keyspace(), db :: rocksdb:db_handle(), cf_iterator :: rocksdb:cf_handle(), cf_generations :: cf_refs() @@ -107,7 +108,14 @@ -callback create_new(rocksdb:db_handle(), gen_id(), _Options :: term()) -> {_Schema, cf_refs()}. --callback open(emqx_ds:shard(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) -> +-callback open( + emqx_ds:shard(), + emqx_ds:keyspace(), + rocksdb:db_handle(), + gen_id(), + cf_refs(), + _Schema +) -> term(). -callback store( @@ -135,9 +143,10 @@ %% API funcions %%================================================================================ --spec start_link(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> {ok, pid()}. -start_link(Shard, Options) -> - gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []). +-spec start_link(emqx_ds:shard(), emqx_ds:keyspace(), emqx_ds_storage_layer:options()) -> + {ok, pid()}. +start_link(Shard, Keyspace, Options) -> + gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Keyspace, Options}, []). -spec create_generation(emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()) -> {ok, gen_id()} | {error, nonmonotonic}. @@ -249,9 +258,9 @@ foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) -> %% behaviour callbacks %%================================================================================ -init({Shard, Options}) -> +init({Shard, Keyspace, Options}) -> process_flag(trap_exit, true), - {ok, S0} = open_db(Shard, Options), + {ok, S0} = open_db(Shard, Keyspace, Options), S = ensure_current_generation(S0), ok = populate_metadata(S), {ok, S}. @@ -294,10 +303,10 @@ populate_metadata(GenId, S = #s{shard = Shard, db = DBHandle}) -> meta_register_gen(Shard, GenId, Gen). -spec ensure_current_generation(state()) -> state(). -ensure_current_generation(S = #s{shard = Shard, db = DBHandle}) -> +ensure_current_generation(S = #s{keyspace = Keyspace, db = DBHandle}) -> case schema_get_current(DBHandle) of undefined -> - Config = emqx_ds_conf:shard_config(Shard), + Config = emqx_ds_conf:keyspace_config(Keyspace), {ok, _, NS} = create_new_gen(0, Config, S), NS; _GenId -> @@ -333,13 +342,13 @@ create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations }, {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}. --spec open_db(emqx_ds:shard(), options()) -> {ok, state()} | {error, _TODO}. -open_db(Shard, Options) -> +-spec open_db(emqx_ds:shard(), emqx_ds:keyspace(), options()) -> {ok, state()} | {error, _TODO}. +open_db(Shard, Keyspace, Options) -> DBDir = unicode:characters_to_list(maps:get(dir, Options, Shard)), DBOptions = [ {create_if_missing, true}, {create_missing_column_families, true} - | emqx_ds_conf:db_options() + | emqx_ds_conf:db_options(Keyspace) ], _ = filelib:ensure_dir(DBDir), ExistingCFs = @@ -360,6 +369,7 @@ open_db(Shard, Options) -> {CFNames, _} = lists:unzip(ExistingCFs), {ok, #s{ shard = Shard, + keyspace = Keyspace, db = DBHandle, cf_iterator = CFIterator, cf_generations = lists:zip(CFNames, CFRefs) @@ -372,9 +382,9 @@ open_db(Shard, Options) -> open_gen( GenId, Gen = #{module := Mod, data := Data}, - #s{shard = Shard, db = DBHandle, cf_generations = CFs} + #s{shard = Shard, keyspace = Keyspace, db = DBHandle, cf_generations = CFs} ) -> - DB = Mod:open(Shard, DBHandle, GenId, CFs, Data), + DB = Mod:open(Shard, Keyspace, DBHandle, GenId, CFs, Data), Gen#{data := DB}. -spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none. diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl index 56c8c760a..cf84c905c 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl @@ -6,7 +6,7 @@ -behaviour(supervisor). %% API: --export([start_link/0, start_shard/2, stop_shard/1]). +-export([start_link/0, start_shard/3, stop_shard/1]). %% behaviour callbacks: -export([init/1]). @@ -25,10 +25,10 @@ start_link() -> supervisor:start_link({local, ?SUP}, ?MODULE, []). --spec start_shard(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> +-spec start_shard(emqx_ds:shard(), emqx_ds:keyspace(), emqx_ds_storage_layer:options()) -> supervisor:startchild_ret(). -start_shard(Shard, Options) -> - supervisor:start_child(?SUP, shard_child_spec(Shard, Options)). +start_shard(Shard, Keyspace, Options) -> + supervisor:start_child(?SUP, shard_child_spec(Shard, Keyspace, Options)). -spec stop_shard(emqx_ds:shard()) -> ok | {error, _}. stop_shard(Shard) -> @@ -52,12 +52,12 @@ init([]) -> %% Internal functions %%================================================================================ --spec shard_child_spec(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> +-spec shard_child_spec(emqx_ds:shard(), emqx_ds:keyspace(), emqx_ds_storage_layer:options()) -> supervisor:child_spec(). -shard_child_spec(Shard, Options) -> +shard_child_spec(Shard, Keyspace, Options) -> #{ id => Shard, - start => {emqx_ds_storage_layer, start_link, [Shard, Options]}, + start => {emqx_ds_storage_layer, start_link, [Shard, Keyspace, Options]}, shutdown => 5_000, restart => permanent, type => worker diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl index c5c227333..e19c83dc1 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl @@ -10,6 +10,7 @@ -include_lib("stdlib/include/assert.hrl"). -define(SHARD, shard(?FUNCTION_NAME)). +-define(KEYSPACE, keyspace(?FUNCTION_NAME)). -define(DEFAULT_CONFIG, {emqx_ds_message_storage_bitmask, #{ @@ -33,7 +34,7 @@ %% Smoke test for opening and reopening the database t_open(_Config) -> ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD), - {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}). + {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, ?KEYSPACE, #{}). %% Smoke test of store function t_store(_Config) -> @@ -262,15 +263,18 @@ end_per_suite(_Config) -> ok = application:stop(emqx_durable_storage). init_per_testcase(TC, Config) -> - ok = set_shard_config(shard(TC), ?DEFAULT_CONFIG), - {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC), #{}), + ok = set_keyspace_config(keyspace(TC), ?DEFAULT_CONFIG), + {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC), keyspace(TC), #{}), Config. end_per_testcase(TC, _Config) -> ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)). -shard(TC) -> +keyspace(TC) -> list_to_binary(lists:concat([?MODULE, "_", TC])). -set_shard_config(Shard, Config) -> - ok = application:set_env(emqx_ds, shard_config, #{Shard => Config}). +shard(TC) -> + <<(keyspace(TC))/binary, "_shard">>. + +set_keyspace_config(Keyspace, Config) -> + ok = application:set_env(emqx_ds, keyspace_config, #{Keyspace => Config}). diff --git a/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl b/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl index 7452906b8..e1981888b 100644 --- a/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl +++ b/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl @@ -10,7 +10,8 @@ -define(WORK_DIR, ["_build", "test"]). -define(RUN_ID, {?MODULE, testrun_id}). --define(ZONE, ?MODULE). +-define(KEYSPACE, atom_to_binary(?MODULE)). +-define(SHARD, <<(?KEYSPACE)/binary, "_shard">>). -define(GEN_ID, 42). %%-------------------------------------------------------------------- @@ -255,7 +256,7 @@ iterate_shim(Shim, Iteration) -> open_db(Filepath, Options) -> {ok, Handle} = rocksdb:open(Filepath, [{create_if_missing, true}]), {Schema, CFRefs} = emqx_ds_message_storage_bitmask:create_new(Handle, ?GEN_ID, Options), - DB = emqx_ds_message_storage_bitmask:open(?ZONE, Handle, ?GEN_ID, CFRefs, Schema), + DB = emqx_ds_message_storage_bitmask:open(?SHARD, ?KEYSPACE, Handle, ?GEN_ID, CFRefs, Schema), {DB, Handle}. close_db(Handle) ->