From 552ba6b949f9c7daed5326ee2f70909258ec537a Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Thu, 7 Dec 2023 14:47:20 +0000 Subject: [PATCH 1/5] Presence server This adds a simple presence server on each rabbit node that sends heartbeats to all other configred rabbbit nodes and maintains a local view of which peers are present. This can be used to get the list of present nodes for functions where it is more important to return a result than to be 100% correct (which is very hard anyway). remove logs fix maybe maybe --- deps/rabbit/app.bzl | 3 + deps/rabbit/src/rabbit.erl | 7 ++ deps/rabbit/src/rabbit_presence.erl | 102 ++++++++++++++++++++++++++++ moduleindex.yaml | 1 + 4 files changed, 113 insertions(+) create mode 100644 deps/rabbit/src/rabbit_presence.erl diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index b8c2e83ccca8..231e44c2b1f9 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -196,6 +196,7 @@ def all_beam_files(name = "all_beam_files"): "src/rabbit_prelaunch_feature_flags.erl", "src/rabbit_prelaunch_logging.erl", "src/rabbit_prequeue.erl", + "src/rabbit_presence.erl", "src/rabbit_priority_queue.erl", "src/rabbit_process.erl", "src/rabbit_queue_consumers.erl", @@ -461,6 +462,7 @@ def all_test_beam_files(name = "all_test_beam_files"): "src/rabbit_prelaunch_feature_flags.erl", "src/rabbit_prelaunch_logging.erl", "src/rabbit_prequeue.erl", + "src/rabbit_presence.erl", "src/rabbit_priority_queue.erl", "src/rabbit_process.erl", "src/rabbit_queue_consumers.erl", @@ -745,6 +747,7 @@ def all_srcs(name = "all_srcs"): "src/rabbit_prelaunch_feature_flags.erl", "src/rabbit_prelaunch_logging.erl", "src/rabbit_prequeue.erl", + "src/rabbit_presence.erl", "src/rabbit_priority_queue.erl", "src/rabbit_process.erl", "src/rabbit_queue_consumers.erl", diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index c5d561e2bdf7..d4048d999df3 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -229,6 +229,13 @@ {requires, [core_initialized, recovery]}, {enables, routing_ready}]}). +-rabbit_boot_step({rabbit_presence, + [{description, "rabbit node presence server"}, + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_presence]}}, + {requires, [core_initialized, recovery]}, + {enables, routing_ready}]}). + -rabbit_boot_step({rabbit_looking_glass, [{description, "Looking Glass tracer and profiler"}, {mfa, {rabbit_looking_glass, boot, []}}, diff --git a/deps/rabbit/src/rabbit_presence.erl b/deps/rabbit/src/rabbit_presence.erl new file mode 100644 index 000000000000..916a7d19fa4e --- /dev/null +++ b/deps/rabbit/src/rabbit_presence.erl @@ -0,0 +1,102 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(rabbit_presence). + +-behaviour(gen_server). + +-export([list_present/0]). +-export([start_link/0]). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-define(SERVER, ?MODULE). +-define(INTERVAL, 1000). + +-record(?MODULE, {tbl :: ets:table(), + nodes = [] :: [node()]}). + +%%---------------------------------------------------------------------------- +%% A presence server that heartbeats all configured servers with the goal of +%% providing a very quickly accessible idea of node availability without +%% having to use rabbit_nodes:all_running/1 which can block for a long time. +%%---------------------------------------------------------------------------- + +-spec list_present() -> [node()]. +list_present() -> + case whereis(?MODULE) of + undefined -> + %% TODO: change return type to ok | error? + exit(presence_server_not_running); + _ -> + Cutoff = erlang:system_time(millisecond) - 5000, + [N || {N, SeenMs} <- ets:tab2list(?MODULE), + %% if it hasn't been seen since the cutoff + SeenMs > Cutoff, + %% if not in nodes() it is also considered not present + lists:member(N, nodes())] + end. + +-spec start_link() -> rabbit_types:ok_pid_or_error(). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +init([]) -> + process_flag(trap_exit, true), + Ref = ets:new(?MODULE, [set, named_table, public]), + Nodes = rabbit_nodes:list_members(), + beat_all(Nodes), + erlang:send_after(?INTERVAL, self(), beat), + {ok, #?MODULE{tbl = Ref, + nodes = Nodes}}. + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(beat, #?MODULE{tbl = _Tbl, + nodes = Nodes} = State) -> + _ = erlang:send_after(?INTERVAL, self(), beat), + _ = beat_all(Nodes), + {noreply, State}; +handle_info({hb, Node}, #?MODULE{tbl = Tbl, + nodes = _Nodes} = State) -> + ets:insert(Tbl, {Node, erlang:system_time(millisecond)}), + {noreply, State}; +handle_info({terminate, Node}, #?MODULE{tbl = Tbl, + nodes = _Nodes} = State) -> + ets:delete(Tbl, Node), + {noreply, State}; +handle_info(_Msg, State) -> + {noreply, State}. + +terminate(_Reason, #?MODULE{nodes = Nodes}) -> + %% only send terminate if reason is `shutdown`? + _ = send_terminate(Nodes), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +%% INTERNAL + +beat_all(Nodes) -> + [send(N, {hb, node()}) || N <- Nodes, N =/= node()]. + +send_terminate(Nodes) -> + [send(N, {terminate, node()}) || N <- Nodes, N =/= node()]. + +send(Node, Msg) -> + erlang:send({?SERVER, Node}, Msg, [noconnect, nosuspend]). diff --git a/moduleindex.yaml b/moduleindex.yaml index b350663ad98e..2f9b36a640f4 100755 --- a/moduleindex.yaml +++ b/moduleindex.yaml @@ -803,6 +803,7 @@ rabbit: - rabbit_prelaunch_feature_flags - rabbit_prelaunch_logging - rabbit_prequeue +- rabbit_presence - rabbit_priority_queue - rabbit_process - rabbit_queue_consumers From 6174cf954a1b508f7282dd081642ec3f2b5d6e68 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Wed, 10 Jan 2024 09:58:53 +0000 Subject: [PATCH 2/5] fixes --- deps/rabbit/src/rabbit_presence.erl | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/deps/rabbit/src/rabbit_presence.erl b/deps/rabbit/src/rabbit_presence.erl index 916a7d19fa4e..78e5b983a123 100644 --- a/deps/rabbit/src/rabbit_presence.erl +++ b/deps/rabbit/src/rabbit_presence.erl @@ -21,6 +21,7 @@ -define(SERVER, ?MODULE). -define(INTERVAL, 1000). +-define(CUTOFF, ?INTERVAL * 3). -record(?MODULE, {tbl :: ets:table(), nodes = [] :: [node()]}). @@ -38,7 +39,7 @@ list_present() -> %% TODO: change return type to ok | error? exit(presence_server_not_running); _ -> - Cutoff = erlang:system_time(millisecond) - 5000, + Cutoff = erlang:system_time(millisecond) - ?CUTOFF, [N || {N, SeenMs} <- ets:tab2list(?MODULE), %% if it hasn't been seen since the cutoff SeenMs > Cutoff, @@ -52,10 +53,10 @@ start_link() -> init([]) -> process_flag(trap_exit, true), - Ref = ets:new(?MODULE, [set, named_table, public]), + Ref = ets:new(?MODULE, [set, named_table, protected]), + _ = erlang:send_after(?INTERVAL, self(), beat), Nodes = rabbit_nodes:list_members(), - beat_all(Nodes), - erlang:send_after(?INTERVAL, self(), beat), + _ = beat_all(Nodes), {ok, #?MODULE{tbl = Ref, nodes = Nodes}}. @@ -69,27 +70,33 @@ handle_info(beat, #?MODULE{tbl = _Tbl, nodes = Nodes} = State) -> _ = erlang:send_after(?INTERVAL, self(), beat), _ = beat_all(Nodes), - {noreply, State}; + %% this will only be efficient to do this often once list_members + %% make use of the ra_leaderboard rather than calling into the local + %% khepri process + case rabbit_nodes:list_members() of + Nodes -> + {noreply, State}; + NewNodes -> + {noreply, State#?MODULE{nodes = NewNodes}} + end; handle_info({hb, Node}, #?MODULE{tbl = Tbl, nodes = _Nodes} = State) -> ets:insert(Tbl, {Node, erlang:system_time(millisecond)}), {noreply, State}; -handle_info({terminate, Node}, #?MODULE{tbl = Tbl, - nodes = _Nodes} = State) -> - ets:delete(Tbl, Node), +handle_info({terminate, Node}, #?MODULE{tbl = Tbl} = State) -> + _ = ets:delete(Tbl, Node), {noreply, State}; handle_info(_Msg, State) -> {noreply, State}. terminate(_Reason, #?MODULE{nodes = Nodes}) -> - %% only send terminate if reason is `shutdown`? + %% TODO: only send terminate if reason is `shutdown`? _ = send_terminate(Nodes), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. - %% INTERNAL beat_all(Nodes) -> From 77f694fcf7bf3ef842c810b07eb7fd7a19788046 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Wed, 10 Jan 2024 11:54:51 +0000 Subject: [PATCH 3/5] wip --- deps/rabbit/src/rabbit_channel.erl | 5 +- deps/rabbit/src/rabbit_queue_type_util.erl | 30 ++++++++++- deps/rabbit/src/rabbit_quorum_queue.erl | 32 ++---------- deps/rabbit/src/rabbit_stream_coordinator.erl | 50 +++++++++++++++---- .../rabbit/test/rabbit_stream_queue_SUITE.erl | 7 +-- .../src/rabbit_mgmt_db.erl | 2 +- .../src/rabbit_mgmt_wm_queues.erl | 7 ++- 7 files changed, 84 insertions(+), 49 deletions(-) diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 914ba83b2039..cf715b4d9809 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -366,8 +366,9 @@ declare_fast_reply_to_v1(EncodedBin) -> -spec list() -> [pid()]. list() -> - Nodes = rabbit_nodes:list_running(), - rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_channel, list_local, [], ?RPC_TIMEOUT). + Nodes = rabbit_presence:list_present(), + rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_channel, list_local, + [], ?RPC_TIMEOUT). -spec list_local() -> [pid()]. diff --git a/deps/rabbit/src/rabbit_queue_type_util.erl b/deps/rabbit/src/rabbit_queue_type_util.erl index e3138b970ad8..733d61a6540f 100644 --- a/deps/rabbit/src/rabbit_queue_type_util.erl +++ b/deps/rabbit/src/rabbit_queue_type_util.erl @@ -12,7 +12,8 @@ check_auto_delete/1, check_exclusive/1, check_non_durable/1, - run_checks/2]). + run_checks/2, + erpc_call/5]). -include_lib("rabbit_common/include/rabbit.hrl"). -include("amqqueue.hrl"). @@ -70,3 +71,30 @@ run_checks([C | Checks], Q) -> Err -> Err end. + +-spec erpc_call(node(), module(), atom(), list(), non_neg_integer()) -> + term() | {error, term()}. +erpc_call(Node, M, F, A, _Timeout) + when Node =:= node() -> + %% Only timeout 'infinity' optimises the local call in OTP 23-25 avoiding a new process being spawned: + %% https://github.com/erlang/otp/blob/47f121af8ee55a0dbe2a8c9ab85031ba052bad6b/lib/kernel/src/erpc.erl#L121 + try erpc:call(Node, M, F, A, infinity) of + Result -> + Result + catch + error:Err -> + {error, Err} + end; +erpc_call(Node, M, F, A, Timeout) -> + case lists:member(Node, nodes()) of + true -> + try erpc:call(Node, M, F, A, Timeout) of + Result -> + Result + catch + error:Err -> + {error, Err} + end; + false -> + {error, noconnection} + end. diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 377c8db23739..d34c881da968 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -78,7 +78,9 @@ force_all_queues_shrink_member_to_current_member/0]). -import(rabbit_queue_type_util, [args_policy_lookup/3, - qname_to_internal_name/1]). + qname_to_internal_name/1, + erpc_call/5 + ]). -include_lib("stdlib/include/qlc.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). @@ -1676,8 +1678,7 @@ format(Q, Ctx) when ?is_amqqueue(Q) -> #{running_nodes := Running0} -> Running0; _ -> - %% WARN: slow - rabbit_nodes:list_running() + rabbit_presence:list_present() end, Online = [N || N <- Nodes, lists:member(N, Running)], {_, LeaderNode} = amqqueue:get_pid(Q), @@ -1840,31 +1841,6 @@ notify_decorators(QName, F, A) -> ok end. -erpc_call(Node, M, F, A, _Timeout) - when Node =:= node() -> - %% Only timeout 'infinity' optimises the local call in OTP 23-25 avoiding a new process being spawned: - %% https://github.com/erlang/otp/blob/47f121af8ee55a0dbe2a8c9ab85031ba052bad6b/lib/kernel/src/erpc.erl#L121 - try erpc:call(Node, M, F, A, infinity) of - Result -> - Result - catch - error:Err -> - {error, Err} - end; -erpc_call(Node, M, F, A, Timeout) -> - case lists:member(Node, nodes()) of - true -> - try erpc:call(Node, M, F, A, Timeout) of - Result -> - Result - catch - error:Err -> - {error, Err} - end; - false -> - {error, noconnection} - end. - is_stateful() -> true. force_shrink_member_to_current_member(VHost, Name) -> diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 0118645dbbfd..28d21fb4d238 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -57,6 +57,11 @@ -export([eval_listeners/3, state/0]). + +-import(rabbit_queue_type_util, [ + erpc_call/5 + ]). + -rabbit_boot_step({?MODULE, [{description, "Restart stream coordinator"}, {mfa, {?MODULE, recover, []}}, @@ -396,17 +401,24 @@ process_command([Server | Servers], Cmd) -> ensure_coordinator_started() -> Local = {?MODULE, node()}, - AllNodes = all_coord_members(), + ExpectedMembers = expected_coord_members(), case whereis(?MODULE) of undefined -> global:set_lock(?STREAM_COORDINATOR_STARTUP), Nodes = case ra:restart_server(?RA_SYSTEM, Local) of {error, Reason} when Reason == not_started orelse Reason == name_not_registered -> - OtherNodes = all_coord_members() -- [Local], + OtherNodes = ExpectedMembers -- [Local], + %% this could potentially be slow if some expected + %% members are on nodes that have recently terminated + %% and have left a dangling TCP connection + %% I suspect this rarely happens as the local coordinator + %% server is started in recover/0 case lists:filter( fun({_, N}) -> - erpc:call(N, erlang, whereis, [?MODULE]) =/= undefined + is_pid(erpc_call(N, erlang, + whereis, [?MODULE], + 1000)) end, OtherNodes) of [] -> start_coordinator_cluster(); @@ -414,16 +426,19 @@ ensure_coordinator_started() -> OtherNodes end; ok -> - AllNodes; + %% TODO: it may be better to do a leader call + %% here as the local member will not have caught up + %% yet + locally_known_members(); {error, {already_started, _}} -> - AllNodes; + locally_known_members(); _ -> - AllNodes + locally_known_members() end, global:del_lock(?STREAM_COORDINATOR_STARTUP), Nodes; _ -> - AllNodes + locally_known_members() end. start_coordinator_cluster() -> @@ -440,9 +455,15 @@ start_coordinator_cluster() -> [] end. -all_coord_members() -> - Nodes = rabbit_nodes:list_running() -- [node()], - [{?MODULE, Node} || Node <- [node() | Nodes]]. +present_coord_members() -> + Local = {?MODULE, node()}, + Nodes = rabbit_presence:list_present(), + [Local] ++ [{?MODULE, Node} || Node <- [node() | Nodes]]. + +expected_coord_members() -> + Local = {?MODULE, node()}, + Nodes = rabbit_nodes:list_members(), + [Local] ++ [{?MODULE, Node} || Node <- [node() | Nodes]]. version() -> 4. @@ -681,6 +702,15 @@ all_member_nodes(Streams) -> tick(_Ts, _State) -> [{aux, maybe_resize_coordinator_cluster}]. +locally_known_members() -> + %% TODO: use ra_leaderboard and fallback if leaderboard not populated + case ra:members({local, {?MODULE, node()}}) of + {_, Members, _} -> + Members; + Err -> + exit({error_fetching_locally_known_coordinator_members, Err}) + end. + maybe_resize_coordinator_cluster() -> spawn(fun() -> case ra:members({?MODULE, node()}) of diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 6c186d70ca05..2a9c42c40e46 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -645,9 +645,9 @@ grow_coordinator_cluster(Config) -> ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), - ok = rabbit_control_helper:command(stop_app, Server1), + % ok = rabbit_control_helper:command(stop_app, Server1), ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []), - rabbit_control_helper:command(start_app, Server1), + % rabbit_control_helper:command(start_app, Server1), %% at this point there _probably_ won't be a stream coordinator member on %% Server1 @@ -673,7 +673,8 @@ grow_coordinator_cluster(Config) -> false end end, 60000), - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). + % rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]), + ok. shrink_coordinator_cluster(Config) -> [Server0, Server1, Server2] = diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_db.erl b/deps/rabbitmq_management/src/rabbit_mgmt_db.erl index 83c1b1f3e5e6..f8c38555c9da 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_db.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_db.erl @@ -258,7 +258,7 @@ handle_pre_hibernate(State) -> %% rabbit_mgmt_db is hibernating the odds are rabbit_event is %% quiescing in some way too). _ = rpc:multicall( - rabbit_nodes:list_running(), rabbit_mgmt_db_handler, gc, []), + rabbit_presence:list_present(), rabbit_mgmt_db_handler, gc, []), {hibernate, State}. format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_queues.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_queues.erl index 30962ca830ff..f14c9282375a 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_wm_queues.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_queues.erl @@ -72,10 +72,9 @@ is_authorized(ReqData, {Mode, Context}) -> %% Exported functions basic(ReqData) -> - %% rabbit_nodes:list_running/1 is a potentially slow function that performs - %% a cluster wide query with a reasonably long (10s) timeout. - %% TODO: replace with faster approximate function - Running = rabbit_nodes:list_running(), + %% rabbit_presence:list_present/1 is an approximate, "good enough" view of + %% the current active running cluster members. + Running = rabbit_presence:list_present(), Ctx = #{running_nodes => Running}, FmtQ = fun (Q) -> rabbit_mgmt_format:queue(Q, Ctx) end, case rabbit_mgmt_util:disable_stats(ReqData) of From 70c0d0c7e99ae94200eac578cb8cdcb8e51c1ae6 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Wed, 10 Jan 2024 14:39:22 +0000 Subject: [PATCH 4/5] Fixes --- deps/rabbit/src/rabbit.erl | 14 +++++++------- deps/rabbit/src/rabbit_stream_coordinator.erl | 2 ++ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index d4048d999df3..39b1a21f0f64 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -168,6 +168,13 @@ {requires, kernel_ready}, {enables, core_initialized}]}). +-rabbit_boot_step({rabbit_presence, + [{description, "rabbit node presence server"}, + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_presence]}}, + {requires, [database]}, + {enables, core_initialized}]}). + -rabbit_boot_step({rabbit_node_monitor, [{description, "node monitor"}, {mfa, {rabbit_sup, start_restartable_child, @@ -229,13 +236,6 @@ {requires, [core_initialized, recovery]}, {enables, routing_ready}]}). --rabbit_boot_step({rabbit_presence, - [{description, "rabbit node presence server"}, - {mfa, {rabbit_sup, start_restartable_child, - [rabbit_presence]}}, - {requires, [core_initialized, recovery]}, - {enables, routing_ready}]}). - -rabbit_boot_step({rabbit_looking_glass, [{description, "Looking Glass tracer and profiler"}, {mfa, {rabbit_looking_glass, boot, []}}, diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 28d21fb4d238..17f06e0dfbef 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -395,6 +395,8 @@ process_command([Server | Servers], Cmd) -> process_command(Servers, Cmd); {error, noproc} -> process_command(Servers, Cmd); + {error, nodedown} -> + process_command(Servers, Cmd); Reply -> Reply end. From 00ce4f467ca4604ff85835959cd3d9d8198ceec5 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Thu, 11 Jan 2024 12:08:29 +0000 Subject: [PATCH 5/5] Various stream coordinator cluster fixes. --- deps/rabbit/src/rabbit_amqqueue.erl | 12 +- deps/rabbit/src/rabbit_stream_coordinator.erl | 131 ++++++++++++------ deps/rabbit/src/rabbit_stream_queue.erl | 2 +- .../rabbit/test/rabbit_stream_queue_SUITE.erl | 61 +++++++- .../commands/forget_cluster_node_command.ex | 3 + 5 files changed, 158 insertions(+), 51 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index db20cf0d5f4b..3d58014c746f 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -40,7 +40,8 @@ -export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]). -export([has_synchronised_mirrors_online/1, is_match/2, is_in_virtual_host/2]). -export([is_replicated/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]). --export([list_local_quorum_queues/0, list_local_quorum_queue_names/0, list_local_stream_queues/0, +-export([list_local_quorum_queues/0, list_local_quorum_queue_names/0, + list_local_stream_queues/0, list_stream_queues_on/1, list_local_mirrored_classic_queues/0, list_local_mirrored_classic_names/0, list_local_leaders/0, list_local_followers/0, get_quorum_nodes/1, list_local_mirrored_classic_without_synchronised_mirrors/0, @@ -1220,9 +1221,12 @@ list_local_quorum_queues() -> -spec list_local_stream_queues() -> [amqqueue:amqqueue()]. list_local_stream_queues() -> - [ Q || Q <- list_by_type(stream), - amqqueue:get_state(Q) =/= crashed, - lists:member(node(), get_quorum_nodes(Q))]. + list_stream_queues_on(node()). + +-spec list_stream_queues_on(node()) -> [amqqueue:amqqueue()]. +list_stream_queues_on(Node) when is_atom(Node) -> + [Q || Q <- list_by_type(rabbit_stream_queue), + lists:member(Node, get_quorum_nodes(Q))]. -spec list_local_leaders() -> [amqqueue:amqqueue()]. list_local_leaders() -> diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 17f06e0dfbef..14420566eab2 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -11,6 +11,7 @@ -export([format_ra_event/2]). +%% machine callbacks -export([init/1, apply/3, state_enter/2, @@ -19,42 +20,47 @@ tick/2, version/0, which_module/1, - overview/1]). + overview/1, + policy_changed/1]). --export([recover/0, - stop/0, - add_replica/2, - delete_replica/2, - register_listener/1, - register_local_member_listener/1]). - --export([new_stream/2, +%% stream API +-export([ + new_stream/2, restart_stream/1, restart_stream/2, delete_stream/2, - transfer_leadership/1]). - --export([policy_changed/1]). - --export([local_pid/1, + add_replica/2, + delete_replica/2, + register_listener/1, + register_local_member_listener/1, + local_pid/1, writer_pid/1, members/1, - stream_overview/1]). + stream_overview/1 + ]). + +%% coordinator API +-export([process_command/1, + recover/0, + stop/0, + transfer_leadership/1, + forget_node/1 + ]). + +%% queries -export([query_local_pid/3, query_writer_pid/2, query_members/2, query_stream_overview/2]). - -export([log_overview/1]). --export([replay/1]). %% for SAC coordinator --export([process_command/1, - sac_state/1]). +-export([sac_state/1]). %% for testing and debugging -export([eval_listeners/3, + replay/1, state/0]). @@ -414,7 +420,7 @@ ensure_coordinator_started() -> %% this could potentially be slow if some expected %% members are on nodes that have recently terminated %% and have left a dangling TCP connection - %% I suspect this rarely happens as the local coordinator + %% I suspect this very rarely happens as the local coordinator %% server is started in recover/0 case lists:filter( fun({_, N}) -> @@ -429,7 +435,7 @@ ensure_coordinator_started() -> end; ok -> %% TODO: it may be better to do a leader call - %% here as the local member will not have caught up + %% here as the local member may not have caught up %% yet locally_known_members(); {error, {already_started, _}} -> @@ -457,10 +463,10 @@ start_coordinator_cluster() -> [] end. -present_coord_members() -> - Local = {?MODULE, node()}, - Nodes = rabbit_presence:list_present(), - [Local] ++ [{?MODULE, Node} || Node <- [node() | Nodes]]. +% present_coord_members() -> +% Local = {?MODULE, node()}, +% Nodes = rabbit_presence:list_present(), +% [Local] ++ [{?MODULE, Node} || Node <- [node() | Nodes]]. expected_coord_members() -> Local = {?MODULE, node()}, @@ -718,18 +724,22 @@ maybe_resize_coordinator_cluster() -> case ra:members({?MODULE, node()}) of {_, Members, _} -> MemberNodes = [Node || {_, Node} <- Members], - Running = rabbit_nodes:list_running(), - All = rabbit_nodes:list_members(), - case Running -- MemberNodes of + Present = rabbit_presence:list_present(), + RabbitNodes = rabbit_nodes:list_members(), + AddableNodes = [N || N <- RabbitNodes, + lists:member(N, Present)], + case AddableNodes -- MemberNodes of [] -> ok; - New -> + [New | _] -> + %% any remaining members will be added + %% next tick rabbit_log:info("~ts: New rabbit node(s) detected, " "adding : ~w", [?MODULE, New]), - add_members(Members, New) + add_member(Members, New) end, - case MemberNodes -- All of + case MemberNodes -- RabbitNodes of [] -> ok; Old -> @@ -742,30 +752,43 @@ maybe_resize_coordinator_cluster() -> end end). -add_members(_, []) -> +add_member(_, []) -> ok; -add_members(Members, [Node | Nodes]) -> +add_member(Members, Node) -> Conf = make_ra_conf(Node, [N || {_, N} <- Members]), case ra:start_server(?RA_SYSTEM, Conf) of ok -> case ra:add_member(Members, {?MODULE, Node}) of - {ok, NewMembers, _} -> - add_members(NewMembers, Nodes); - _ -> - add_members(Members, Nodes) + {ok, _, _} -> + ok; + {error, Err} -> + rabbit_log:warning("~ts: Failed to add member, reason ~w" + "deleting started server on ~w", + [?MODULE, Err, Node]), + case ra:force_delete_server(?RA_SYSTEM, {?MODULE, Node}) of + ok -> + ok; + Err -> + rabbit_log:warning("~ts: Failed to delete server " + "on ~w, reason ~w", + [?MODULE, Node, Err]), + ok + end end; Error -> - rabbit_log:warning("Stream coordinator failed to start on node ~ts : ~W", + %% TODO: there is a chance here that a server was started but never + %% added to the cluster + rabbit_log:warning("Stream coordinator server failed to start on node ~ts : ~W", [Node, Error, 10]), - add_members(Members, Nodes) + ok end. remove_members(_, []) -> ok; remove_members(Members, [Node | Nodes]) -> case ra:remove_member(Members, {?MODULE, Node}) of - {ok, NewMembers, _} -> - remove_members(NewMembers, Nodes); + {ok, _, _} -> + remove_members(Members, Nodes); _ -> remove_members(Members, Nodes) end. @@ -2134,5 +2157,31 @@ transfer_leadership([Destination | _] = _TransferCandidates) -> {ok, undefined} end. +-spec forget_node(node()) -> ok | {error, term()}. +forget_node(Node) when is_atom(Node) -> + IsRunning = rabbit_nodes:is_running(Node), + ExpectedMembers = expected_coord_members(), + ToRemove = {?MODULE, Node}, + case ra:members(ExpectedMembers) of + {ok, Members, Leader} -> + case lists:member(ToRemove, Members) of + true -> + case ra:remove_member(Leader, ToRemove) of + {ok, _, _} when IsRunning -> + _ = ra:force_delete_server(?RA_SYSTEM, ToRemove), + ok; + {ok, _, _} -> + ok; + {error, _} = Err -> + Err + end; + false -> + ok + end; + Err -> + Err + end. + + maps_to_list(M) -> lists:sort(maps:to_list(M)). diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index a489ff89ad78..96121e092394 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -980,7 +980,7 @@ delete_replica(VHost, Name, Node) -> delete_all_replicas(Node) -> rabbit_log:info("Asked to remove all stream replicas from node ~ts", [Node]), - Streams = rabbit_amqqueue:list_by_type(stream), + Streams = rabbit_amqqueue:list_stream_queues_on(Node), lists:map(fun(Q) -> QName = amqqueue:get_name(Q), rabbit_log:info("~ts: removing replica on node ~w", diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 2a9c42c40e46..4ad8ff667734 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -43,7 +43,8 @@ all() -> {group, cluster_size_3_parallel_5}, {group, unclustered_size_3_1}, {group, unclustered_size_3_2}, - {group, unclustered_size_3_3} + {group, unclustered_size_3_3}, + {group, unclustered_size_3_4} ]. groups() -> @@ -102,7 +103,8 @@ groups() -> {cluster_size_3_parallel_5, [parallel], all_tests_4()}, {unclustered_size_3_1, [], [add_replica]}, {unclustered_size_3_2, [], [consume_without_local_replica]}, - {unclustered_size_3_3, [], [grow_coordinator_cluster]} + {unclustered_size_3_3, [], [grow_coordinator_cluster]}, + {unclustered_size_3_4, [], [grow_then_shrink_coordinator_cluster]} ]. all_tests_1() -> @@ -213,12 +215,14 @@ init_per_group1(Group, Config) -> cluster_size_3_2 -> 3; unclustered_size_3_1 -> 3; unclustered_size_3_2 -> 3; - unclustered_size_3_3 -> 3 + unclustered_size_3_3 -> 3; + unclustered_size_3_4 -> 3 end, Clustered = case Group of unclustered_size_3_1 -> false; unclustered_size_3_2 -> false; unclustered_size_3_3 -> false; + unclustered_size_3_4 -> false; _ -> true end, Config1 = rabbit_ct_helpers:set_config(Config, @@ -227,7 +231,14 @@ init_per_group1(Group, Config) -> {tcp_ports_base}, {rmq_nodes_clustered, Clustered}]), Config1b = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]), - Ret = rabbit_ct_helpers:run_steps(Config1b, + Config1c = case Group of + unclustered_size_3_4 -> + rabbit_ct_helpers:merge_app_env( + Config1b, {rabbit, [{stream_tick_interval, 5000}]}); + _ -> + Config1b + end, + Ret = rabbit_ct_helpers:run_steps(Config1c, [fun merge_app_env/1 ] ++ rabbit_ct_broker_helpers:setup_steps()), case Ret of @@ -637,6 +648,46 @@ delete_last_replica(Config) -> check_leader_and_replicas(Config, [Server0]), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). +grow_then_shrink_coordinator_cluster(Config) -> + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []), + ok = rabbit_control_helper:command(join_cluster, Server2, [atom_to_list(Server0)], []), + + rabbit_ct_helpers:await_condition( + fun() -> + case rpc:call(Server0, ra, members, + [{rabbit_stream_coordinator, Server0}]) of + {_, Members, _} -> + Nodes = lists:sort([N || {_, N} <- Members]), + lists:sort([Server0, Server1, Server2]) == Nodes; + _ -> + false + end + end, 60000), + + ok = rabbit_control_helper:command(stop_app, Server1), + ok = rabbit_control_helper:command(forget_cluster_node, Server0, [atom_to_list(Server1)], []), + ok = rabbit_control_helper:command(stop_app, Server2), + ok = rabbit_control_helper:command(forget_cluster_node, Server0, [atom_to_list(Server2)], []), + rabbit_ct_helpers:await_condition( + fun() -> + case rpc:call(Server0, ra, members, + [{rabbit_stream_coordinator, Server0}]) of + {_, Members, _} -> + Nodes = lists:sort([N || {_, N} <- Members]), + lists:sort([Server0]) == Nodes; + _ -> + false + end + end, 60000), + ok. + grow_coordinator_cluster(Config) -> [Server0, Server1, _Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -673,7 +724,7 @@ grow_coordinator_cluster(Config) -> false end end, 60000), - % rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]), ok. shrink_coordinator_cluster(Config) -> diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/forget_cluster_node_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/forget_cluster_node_command.ex index 7c19deee5a1c..d85fa9455321 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/forget_cluster_node_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/forget_cluster_node_command.ex @@ -97,6 +97,9 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ForgetClusterNodeCommand do any end + stream_coord_result = + :rabbit_misc.rpc_call(node_name, :rabbit_stream_coordinator, :forget_node, [atom_name]) + is_error_fun = fn {_, {:ok, _}} -> false