From 715d421e629a9d74848cbc988a4108b6ffeab375 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 22 Oct 2024 11:05:55 +0200 Subject: [PATCH 1/2] Support x-cc message annotation Support an `x-cc` message annotation in AMQP 1.0 similar to the [CC](https://www.rabbitmq.com/docs/sender-selected) header in AMQP 0.9.1. The value of the `x-cc` message annotation must by an array of strings. A message annotation is used since application properties allow only simple types. --- deps/rabbit/BUILD.bazel | 6 + deps/rabbit/app.bzl | 2 +- deps/rabbit/src/mc.erl | 30 ++- deps/rabbit/src/mc_amqp.erl | 47 +--- deps/rabbit/src/mc_amqpl.erl | 59 ++++- deps/rabbit/src/mc_compat.erl | 4 + deps/rabbit/src/mc_util.erl | 2 +- deps/rabbit/src/rabbit_amqp_session.erl | 64 +++-- deps/rabbit/src/rabbit_stream_queue.erl | 37 ++- deps/rabbit/test/amqp_address_SUITE.erl | 5 +- deps/rabbit/test/amqp_client_SUITE.erl | 258 +++++++++++++++++++- deps/rabbit/test/dead_lettering_SUITE.erl | 30 ++- deps/rabbit/test/mc_unit_SUITE.erl | 87 +++++-- deps/rabbit/test/topic_permission_SUITE.erl | 88 +++++++ deps/rabbitmq_mqtt/src/mc_mqtt.erl | 6 + deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl | 5 + release-notes/4.1.0.md | 6 + 17 files changed, 621 insertions(+), 115 deletions(-) diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 8ce54e6f584b..76be5953a6c3 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -862,6 +862,12 @@ rabbitmq_integration_suite( rabbitmq_integration_suite( name = "topic_permission_SUITE", size = "medium", + additional_beam = [ + ":test_amqp_utils_beam", + ], + runtime_deps = [ + "//deps/rabbitmq_amqp_client:erlang_app", + ], ) rabbitmq_integration_suite( diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index dca277a2ab00..9d6f7fab563f 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -1559,7 +1559,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"): outs = ["test/topic_permission_SUITE.beam"], app_name = "rabbit", erlc_opts = "//:test_erlc_opts", - deps = ["//deps/amqp_client:erlang_app"], + deps = ["//deps/amqp10_common:erlang_app", "//deps/amqp_client:erlang_app"], ) erlang_bytecode( name = "transactions_SUITE_beam_files", diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index 3352f26185de..b3c51dca3976 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -26,6 +26,7 @@ priority/1, set_ttl/2, x_header/2, + x_headers/1, routing_headers/2, exchange/1, routing_keys/1, @@ -88,6 +89,7 @@ {timestamp, non_neg_integer()} | {list, [tagged_value()]} | {map, [{tagged_value(), tagged_value()}]} | + {array, atom(), [tagged_value()]} | null | undefined. @@ -104,11 +106,16 @@ {MetadataSize :: non_neg_integer(), PayloadSize :: non_neg_integer()}. -%% retrieve and x- header from the protocol data +%% retrieve an x- header from the protocol data %% the return value should be tagged with an AMQP 1.0 type -callback x_header(binary(), proto_state()) -> tagged_value(). +%% retrieve x- headers from the protocol data +%% the return values should be tagged with an AMQP 1.0 type +-callback x_headers(proto_state()) -> + #{binary() => tagged_value()}. + %% retrieve a property field from the protocol data %% e.g. message_id, correlation_id -callback property(atom(), proto_state()) -> @@ -148,7 +155,7 @@ init(Proto, Data, Anns) -> -spec init(protocol(), term(), annotations(), environment()) -> state(). init(Proto, Data, Anns0, Env) -> {ProtoData, ProtoAnns} = Proto:init(Data), - Anns1 = case map_size(Env) == 0 of + Anns1 = case map_size(Env) =:= 0 of true -> Anns0; false -> Anns0#{env => Env} end, @@ -214,6 +221,25 @@ x_header(Key, #?MODULE{protocol = Proto, x_header(Key, BasicMsg) -> mc_compat:x_header(Key, BasicMsg). +-spec x_headers(state()) -> + #{binary() => tagged_value()}. +x_headers(#?MODULE{protocol = Proto, + annotations = Anns, + data = Data}) -> + %% x-headers may be have been added to the annotations map. + New = maps:filtermap( + fun(Key, Val) -> + case mc_util:is_x_header(Key) of + true -> + {true, mc_util:infer_type(Val)}; + false -> + false + end + end, Anns), + maps:merge(Proto:x_headers(Data), New); +x_headers(BasicMsg) -> + mc_compat:x_headers(BasicMsg). + -spec routing_headers(state(), [x_headers | complex_types]) -> #{binary() => property_value()}. routing_headers(#?MODULE{protocol = Proto, diff --git a/deps/rabbit/src/mc_amqp.erl b/deps/rabbit/src/mc_amqp.erl index ed6c4b4145d6..06a923763da9 100644 --- a/deps/rabbit/src/mc_amqp.erl +++ b/deps/rabbit/src/mc_amqp.erl @@ -8,6 +8,7 @@ init/1, size/1, x_header/2, + x_headers/1, property/2, routing_headers/2, convert_to/3, @@ -125,6 +126,9 @@ size(#v1{message_annotations = MA, x_header(Key, Msg) -> message_annotation(Key, Msg, undefined). +x_headers(Msg) -> + #{K => V || {{_T, K}, V} <- message_annotations(Msg)}. + property(_Prop, #msg_body_encoded{properties = undefined}) -> undefined; property(Prop, #msg_body_encoded{properties = Props}) -> @@ -618,41 +622,16 @@ encode_deaths(Deaths) -> {map, Map} end, Deaths). -essential_properties(#msg_body_encoded{message_annotations = MA} = Msg) -> +essential_properties(Msg) -> Durable = get_property(durable, Msg), Priority = get_property(priority, Msg), Timestamp = get_property(timestamp, Msg), Ttl = get_property(ttl, Msg), - Anns0 = #{?ANN_DURABLE => Durable}, - Anns = maps_put_truthy( - ?ANN_PRIORITY, Priority, - maps_put_truthy( - ?ANN_TIMESTAMP, Timestamp, - maps_put_truthy( - ttl, Ttl, - Anns0))), - case MA of - [] -> - Anns; - _ -> - lists:foldl( - fun ({{symbol, <<"x-routing-key">>}, - {utf8, Key}}, Acc) -> - maps:update_with(?ANN_ROUTING_KEYS, - fun(L) -> [Key | L] end, - [Key], - Acc); - ({{symbol, <<"x-cc">>}, - {list, CCs0}}, Acc) -> - CCs = [CC || {_T, CC} <- CCs0], - maps:update_with(?ANN_ROUTING_KEYS, - fun(L) -> L ++ CCs end, - CCs, - Acc); - ({{symbol, <<"x-exchange">>}, - {utf8, Exchange}}, Acc) -> - Acc#{?ANN_EXCHANGE => Exchange}; - (_, Acc) -> - Acc - end, Anns, MA) - end. + Anns = #{?ANN_DURABLE => Durable}, + maps_put_truthy( + ?ANN_PRIORITY, Priority, + maps_put_truthy( + ?ANN_TIMESTAMP, Timestamp, + maps_put_truthy( + ttl, Ttl, + Anns))). diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index 723e60cd3f79..9e62d60b65fe 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -11,6 +11,7 @@ init/1, size/1, x_header/2, + x_headers/1, routing_headers/2, convert_to/3, convert_from/3, @@ -273,6 +274,23 @@ x_header(Key, #content{properties = none} = Content0) -> Content = rabbit_binary_parser:ensure_content_decoded(Content0), x_header(Key, Content). +x_headers(#content{properties = #'P_basic'{headers = undefined}}) -> + #{}; +x_headers(#content{properties = #'P_basic'{headers = Headers}}) -> + L = lists:filtermap( + fun({Name, Type, Val}) -> + case mc_util:is_x_header(Name) of + true -> + {true, {Name, from_091(Type, Val)}}; + false -> + false + end + end, Headers), + maps:from_list(L); +x_headers(#content{properties = none} = Content0) -> + Content = rabbit_binary_parser:ensure_content_decoded(Content0), + x_headers(Content). + property(Prop, Content) -> mc_util:infer_type(mc_compat:get_property(Prop, Content)). @@ -690,10 +708,23 @@ from_091(binary, V) -> {binary, V}; from_091(timestamp, V) -> {timestamp, V * 1000}; from_091(byte, V) -> {byte, V}; from_091(void, _V) -> null; -from_091(array, L) -> - {list, [from_091(T, V) || {T, V} <- L]}; from_091(table, L) -> - {map, [{wrap(symbol, K), from_091(T, V)} || {K, T, V} <- L]}. + {map, [{wrap(symbol, K), from_091(T, V)} || {K, T, V} <- L]}; +from_091(array, []) -> + {list, []}; +from_091(array, L0 = [{T0, _} | _]) -> + {L = [{T1, _} | _], {Monomorphic, _}} = + lists:mapfoldl(fun({T, V}, {Mono0, PrevType}) -> + Mono = case Mono0 of + false -> false; + true -> T =:= PrevType + end, + {from_091(T, V), {Mono, T}} + end, {true, T0}, L0), + case Monomorphic of + true -> {array, T1, L}; + false -> {list, L} + end. map_add(_T, _Key, _Type, undefined, Acc) -> Acc; @@ -707,7 +738,6 @@ supported_header_value_type(table) -> supported_header_value_type(_) -> true. - amqp10_map_get(_K, []) -> undefined; amqp10_map_get(K, Tuples) -> @@ -857,3 +887,24 @@ amqp10_section_header(Header, Headers) -> amqp_encoded_binary(Section) -> iolist_to_binary(amqp10_framing:encode_bin(Section)). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +from_091_array_test() -> + {list, []} = from_091(array, []), + {array, utf8, [{utf8, <<"e1">>}]} = from_091(array, [{longstr, <<"e1">>}]), + {array, utf8, [{utf8, <<"e1">>}, + {utf8, <<"e2">>}]} = from_091(array, [{longstr, <<"e1">>}, + {longstr, <<"e2">>}]), + {list, [{utf8, <<"e1">>}, + {binary, <<"e2">>}]} = from_091(array, [{longstr, <<"e1">>}, + {binary, <<"e2">>}]), + {list, [{utf8, <<"e1">>}, + {binary, <<"e2">>}, + {utf8, <<"e3">>}, + {utf8, <<"e4">>}]} = from_091(array, [{longstr, <<"e1">>}, + {binary, <<"e2">>}, + {longstr, <<"e3">>}, + {longstr, <<"e4">>}]). +-endif. diff --git a/deps/rabbit/src/mc_compat.erl b/deps/rabbit/src/mc_compat.erl index 056905239d96..5fce91b202a4 100644 --- a/deps/rabbit/src/mc_compat.erl +++ b/deps/rabbit/src/mc_compat.erl @@ -20,6 +20,7 @@ priority/1, set_ttl/2, x_header/2, + x_headers/1, routing_headers/2, %%% convert_to/2, @@ -138,6 +139,9 @@ set_ttl(Value, #basic_message{content = Content0} = Msg) -> x_header(Key,#basic_message{content = Content}) -> mc_amqpl:x_header(Key, Content). +x_headers(#basic_message{content = Content}) -> + mc_amqpl:x_headers(Content). + routing_headers(#basic_message{content = Content}, Opts) -> mc_amqpl:routing_headers(Content, Opts). diff --git a/deps/rabbit/src/mc_util.erl b/deps/rabbit/src/mc_util.erl index 1f20d15699db..9ec7928de9b7 100644 --- a/deps/rabbit/src/mc_util.erl +++ b/deps/rabbit/src/mc_util.erl @@ -61,7 +61,7 @@ utf8_string_is_ascii(UTF8String) -> amqp_map_get(Key, {map, List}, Default) -> amqp_map_get(Key, List, Default); amqp_map_get(Key, List, Default) when is_list(List) -> - case lists:search(fun ({{_, K}, _}) -> K == Key end, List) of + case lists:search(fun ({{_, K}, _}) -> K =:= Key end, List) of {value, {_K, V}} -> V; false -> diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index c9d505647eb5..16c84db24706 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -154,6 +154,7 @@ %% The routing key is either defined in the ATTACH frame and static for %% the life time of the link or dynamically provided in each message's %% "to" field (address v2) or "subject" field (address v1). + %% (A publisher can set additional routing keys via the x-cc message annotation.) routing_key :: rabbit_types:routing_key() | to | subject, %% queue_name_bin is only set if the link target address refers to a queue. queue_name_bin :: undefined | rabbit_misc:resource_name(), @@ -2369,11 +2370,11 @@ incoming_link_transfer( Mc0 = mc:init(mc_amqp, PayloadBin, #{}), case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of - {ok, X, RoutingKey, Mc1, PermCache} -> + {ok, X, RoutingKeys, Mc1, PermCache} -> Mc2 = rabbit_message_interceptor:intercept(Mc1), check_user_id(Mc2, User), - TopicPermCache = check_write_permitted_on_topic( - X, User, RoutingKey, TopicPermCache0), + TopicPermCache = check_write_permitted_on_topics( + X, User, RoutingKeys, TopicPermCache0), QNames = rabbit_exchange:route(X, Mc2, #{return_binding_keys => true}), rabbit_trace:tap_in(Mc2, QNames, ConnName, ChannelNum, Username, Trace), Opts = #{correlation => {HandleInt, DeliveryId}}, @@ -2408,14 +2409,14 @@ incoming_link_transfer( "delivery_tag=~p, delivery_id=~p, reason=~p", [DeliveryTag, DeliveryId, Reason]) end; - {error, #'v1_0.error'{} = Err} -> + {error, {anonymous_terminus, false}, #'v1_0.error'{} = Err} -> Disposition = case Settled of true -> []; false -> [released(DeliveryId)] end, Detach = [detach(HandleInt, Link0, Err)], {error, Disposition ++ Detach}; - {error, anonymous_terminus, #'v1_0.error'{} = Err} -> + {error, {anonymous_terminus, true}, #'v1_0.error'{} = Err} -> %% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-routingerrors case Settled of true -> @@ -2440,13 +2441,13 @@ incoming_link_transfer( end. lookup_target(#exchange{} = X, LinkRKey, Mc, _, _, PermCache) -> - lookup_routing_key(X, LinkRKey, Mc, PermCache); + lookup_routing_key(X, LinkRKey, Mc, false, PermCache); lookup_target(#resource{} = XName, LinkRKey, Mc, _, _, PermCache) -> case rabbit_exchange:lookup(XName) of {ok, X} -> - lookup_routing_key(X, LinkRKey, Mc, PermCache); + lookup_routing_key(X, LinkRKey, Mc, false, PermCache); {error, not_found} -> - {error, error_not_found(XName)} + {error, {anonymous_terminus, false}, error_not_found(XName)} end; lookup_target(to, to, Mc, Vhost, User, PermCache0) -> case mc:property(to, Mc) of @@ -2458,25 +2459,26 @@ lookup_target(to, to, Mc, Vhost, User, PermCache0) -> case rabbit_exchange:lookup(XName) of {ok, X} -> check_internal_exchange(X), - lookup_routing_key(X, RKey, Mc, PermCache); + lookup_routing_key(X, RKey, Mc, true, PermCache); {error, not_found} -> - {error, anonymous_terminus, error_not_found(XName)} + {error, {anonymous_terminus, true}, error_not_found(XName)} end; {error, bad_address} -> - {error, anonymous_terminus, + {error, {anonymous_terminus, true}, #'v1_0.error'{ condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED, description = {utf8, <<"bad 'to' address string: ", String/binary>>}}} end; undefined -> - {error, anonymous_terminus, + {error, {anonymous_terminus, true}, #'v1_0.error'{ condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED, description = {utf8, <<"anonymous terminus requires 'to' address to be set">>}}} end. lookup_routing_key(X = #exchange{name = #resource{name = XNameBin}}, - RKey0, Mc0, PermCache) -> + RKey0, Mc0, AnonTerm, PermCache) -> + Mc1 = mc:set_annotation(?ANN_EXCHANGE, XNameBin, Mc0), RKey = case RKey0 of subject -> case mc:property(subject, Mc0) of @@ -2488,9 +2490,25 @@ lookup_routing_key(X = #exchange{name = #resource{name = XNameBin}}, _ when is_binary(RKey0) -> RKey0 end, - Mc1 = mc:set_annotation(?ANN_EXCHANGE, XNameBin, Mc0), - Mc = mc:set_annotation(?ANN_ROUTING_KEYS, [RKey], Mc1), - {ok, X, RKey, Mc, PermCache}. + case mc:x_header(<<"x-cc">>, Mc0) of + undefined -> + RKeys = [RKey], + Mc = mc:set_annotation(?ANN_ROUTING_KEYS, RKeys, Mc1), + {ok, X, RKeys, Mc, PermCache}; + {array, utf8, CCs0} -> + CCs = lists:map(fun({utf8, CC}) -> CC end, CCs0), + RKeys = [RKey | CCs], + Mc = mc:set_annotation(?ANN_ROUTING_KEYS, RKeys, Mc1), + {ok, X, RKeys, Mc, PermCache}; + BadValue -> + Desc = unicode:characters_to_binary( + lists:flatten( + io_lib:format( + "bad value for 'x-cc' message-annotation: ~tp", [BadValue]))), + {error, {anonymous_terminus, AnonTerm}, + #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD, + description = {utf8, Desc}}} + end. process_routing_confirm([], _SenderSettles = true, _, U) -> rabbit_global_counters:messages_unroutable_dropped(?PROTOCOL, 1), @@ -3445,14 +3463,20 @@ check_resource_access(Resource, Perm, User, Cache) -> end end. --spec check_write_permitted_on_topic( +-spec check_write_permitted_on_topics( rabbit_types:exchange(), rabbit_types:user(), - rabbit_types:routing_key(), + [rabbit_types:routing_key(),...], topic_permission_cache()) -> topic_permission_cache(). -check_write_permitted_on_topic(Resource, User, RoutingKey, TopicPermCache) -> - check_topic_authorisation(Resource, User, RoutingKey, write, TopicPermCache). +check_write_permitted_on_topics(#exchange{type = topic} = Resource, + User, RoutingKeys, TopicPermCache) -> + lists:foldl( + fun(RoutingKey, Cache) -> + check_topic_authorisation(Resource, User, RoutingKey, write, Cache) + end, TopicPermCache, RoutingKeys); +check_write_permitted_on_topics(_, _, _, TopicPermCache) -> + TopicPermCache. -spec check_read_permitted_on_topic( rabbit_types:exchange(), diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index a011dc09a650..fa9b39bdb971 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -1272,17 +1272,36 @@ parse_uncompressed_subbatch( entry_to_msg(Entry, Offset, #resource{kind = queue, name = QName}, Name, LocalPid, Filter) -> Mc0 = mc:init(mc_amqp, Entry, #{}), - %% If exchange or routing_keys annotation isn't present the entry most likely came + %% If exchange or routing keys annotation isn't present the entry most likely came %% from the rabbitmq-stream plugin so we'll choose defaults that simulate use %% of the direct exchange. - Mc1 = case mc:exchange(Mc0) of - undefined -> mc:set_annotation(?ANN_EXCHANGE, <<>>, Mc0); - _ -> Mc0 - end, - Mc2 = case mc:routing_keys(Mc1) of - [] -> mc:set_annotation(?ANN_ROUTING_KEYS, [QName], Mc1); - _ -> Mc1 - end, + XHeaders = mc:x_headers(Mc0), + Exchange = case XHeaders of + #{<<"x-exchange">> := {utf8, X}} -> + X; + _ -> + <<>> + end, + RKeys0 = case XHeaders of + #{<<"x-cc">> := {array, utf8, CCs}} -> + [CC || {utf8, CC} <- CCs]; + _ -> + [] + end, + RKeys1 = case XHeaders of + #{<<"x-routing-key">> := {utf8, RK}} -> + [RK | RKeys0]; + _ -> + RKeys0 + end, + RKeys = case RKeys1 of + [] -> + [QName]; + _ -> + RKeys1 + end, + Mc1 = mc:set_annotation(?ANN_EXCHANGE, Exchange, Mc0), + Mc2 = mc:set_annotation(?ANN_ROUTING_KEYS, RKeys, Mc1), Mc = mc:set_annotation(<<"x-stream-offset">>, Offset, Mc2), case rabbit_amqp_filtex:filter(Filter, Mc) of true -> diff --git a/deps/rabbit/test/amqp_address_SUITE.erl b/deps/rabbit/test/amqp_address_SUITE.erl index f5a0f74b8932..607aa11473aa 100644 --- a/deps/rabbit/test/amqp_address_SUITE.erl +++ b/deps/rabbit/test/amqp_address_SUITE.erl @@ -304,10 +304,9 @@ target_per_message_exchange_routing_key(Config) -> Tag1 = Body1 = <<1>>, Tag2 = Body2 = <<2>>, - %% Although mc_amqp:essential_properties/1 parses these annotations, they should be ignored. + %% Although mc_amqp:essential_properties/1 parses the x-exchange annotation, it should be ignored. Msg1 = amqp10_msg:set_message_annotations( - #{<<"x-exchange">> => <<"ignored">>, - <<"x-routing-key">> => <<"ignored">>}, + #{<<"x-exchange">> => <<"ignored">>}, amqp10_msg:set_properties(#{to => To1}, amqp10_msg:new(Tag1, Body1))), Msg2 = amqp10_msg:set_properties(#{to => To2}, amqp10_msg:new(Tag2, Body2)), ok = amqp10_client:send_msg(Sender, Msg1), diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index f192a0c309f8..f80d59b07caa 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -116,7 +116,8 @@ groups() -> available_messages_quorum_queue, available_messages_stream, incoming_message_interceptors, - trace, + trace_classic_queue, + trace_stream, user_id, message_ttl, plugin, @@ -156,7 +157,12 @@ groups() -> tcp_back_pressure_rabbitmq_internal_flow_quorum_queue, session_max_per_connection, link_max_per_session, - reserved_annotation + reserved_annotation, + x_cc_annotation_exchange, + x_cc_annotation_exchange_routing_key_empty, + x_cc_annotation_queue, + x_cc_annotation_null, + bad_x_cc_annotation_exchange ]}, {cluster_size_3, [shuffle], @@ -4393,16 +4399,26 @@ incoming_message_interceptors(Config) -> ok = amqp10_client:close_connection(Connection), true = rpc(Config, persistent_term, erase, [Key]). -trace(Config) -> +trace_classic_queue(Config) -> + trace(atom_to_binary(?FUNCTION_NAME), <<"classic">>, Config). + +trace_stream(Config) -> + trace(atom_to_binary(?FUNCTION_NAME), <<"stream">>, Config). + +trace(Q, QType, Config) -> Node = atom_to_binary(get_node_config(Config, 0, nodename)), TraceQ = <<"my trace queue">>, - Q = <<"my queue">>, Qs = [Q, TraceQ], RoutingKey = <<"my routing key">>, Payload = <<"my payload">>, CorrelationId = <<"my correlation 👀"/utf8>>, Ch = rabbit_ct_client_helpers:open_channel(Config), - [#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = Q0}) || Q0 <- Qs], + #'queue.declare_ok'{} = amqp_channel:call( + Ch, #'queue.declare'{ + queue = Q, + durable = true, + arguments = [{<<"x-queue-type">>, longstr, QType}]}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = TraceQ}), #'queue.bind_ok'{} = amqp_channel:call( Ch, #'queue.bind'{queue = TraceQ, exchange = <<"amq.rabbitmq.trace">>, @@ -4420,16 +4436,21 @@ trace(Config) -> {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["trace_on"]), {ok, SessionReceiver} = amqp10_client:begin_session_sync(Connection), + {ok, Receiver} = amqp10_client:attach_receiver_link(SessionReceiver, + <<"test-receiver">>, + rabbitmq_amqp_address:queue(Q)), + receive {amqp10_event, {link, Receiver, attached}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, {ok, Sender} = amqp10_client:attach_sender_link( SessionSender, <<"test-sender">>, rabbitmq_amqp_address:exchange(<<"amq.direct">>, RoutingKey)), ok = wait_for_credit(Sender), - {ok, Receiver} = amqp10_client:attach_receiver_link(SessionReceiver, - <<"test-receiver">>, - rabbitmq_amqp_address:queue(Q)), Msg0 = amqp10_msg:new(<<"tag 1">>, Payload, true), - Msg = amqp10_msg:set_properties(#{correlation_id => CorrelationId}, Msg0), + Msg = amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {array, utf8, [{utf8, <<"my CC key">>}]}}, + amqp10_msg:set_properties(#{correlation_id => CorrelationId}, Msg0)), ok = amqp10_client:send_msg(Sender, Msg), {ok, _} = amqp10_client:get_msg(Receiver), @@ -4439,7 +4460,7 @@ trace(Config) -> payload = Payload}} = amqp_channel:call(Ch, #'basic.get'{queue = TraceQ}), ?assertMatch(#{<<"exchange_name">> := <<"amq.direct">>, - <<"routing_keys">> := [RoutingKey], + <<"routing_keys">> := [RoutingKey, <<"my CC key">>], <<"connection">> := <<"127.0.0.1:", _/binary>>, <<"node">> := Node, <<"vhost">> := <<"/">>, @@ -4454,7 +4475,7 @@ trace(Config) -> payload = Payload}} = amqp_channel:call(Ch, #'basic.get'{queue = TraceQ}), ?assertMatch(#{<<"exchange_name">> := <<"amq.direct">>, - <<"routing_keys">> := [RoutingKey], + <<"routing_keys">> := [RoutingKey, <<"my CC key">>], <<"connection">> := <<"127.0.0.1:", _/binary>>, <<"node">> := Node, <<"vhost">> := <<"/">>, @@ -5956,6 +5977,221 @@ reserved_annotation(Config) -> end, ok = close_connection_sync(Connection). +%% Test that x-cc routing keys work together with target address +%% /exchanges/:exchange/:routing-key +x_cc_annotation_exchange(Config) -> + QName1 = <<"queue 1">>, + QName2 = <<"queue 2">>, + {Connection, Session, LinkPair} = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName1, #{}), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName1, <<"amq.direct">>, <<"key 1">>, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.direct">>, <<"key 2">>, #{}), + Address = rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"key 1">>), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + Payload = <<"my message">>, + ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {array, utf8, [{utf8, <<"key 2">>}]}}, + amqp10_msg:new(<<"tag">>, Payload))), + ok = wait_for_accepted(<<"tag">>), + ok = amqp10_client:detach_link(Sender), + + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, rabbitmq_amqp_address:queue(QName1), settled), + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, rabbitmq_amqp_address:queue(QName2), settled), + {ok, Msg1} = amqp10_client:get_msg(Receiver1), + {ok, Msg2} = amqp10_client:get_msg(Receiver2), + ?assertEqual([Payload], amqp10_msg:body(Msg1)), + ?assertEqual([Payload], amqp10_msg:body(Msg2)), + + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + +%% Test that x-cc routing keys work together with target address +%% /exchanges/:exchange +x_cc_annotation_exchange_routing_key_empty(Config) -> + QName1 = <<"queue 1">>, + QName2 = <<"queue 2">>, + {Connection, Session, LinkPair} = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName1, #{}), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName1, <<"amq.direct">>, <<"key 1">>, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.direct">>, <<"key 2">>, #{}), + AddressEmptyRoutingKey = rabbitmq_amqp_address:exchange(<<"amq.direct">>), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, AddressEmptyRoutingKey), + ok = wait_for_credit(Sender), + + Payload = <<"my message">>, + ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {array, utf8, [{utf8, <<"key 1">>}, + {utf8, <<"key 2">>}]}}, + amqp10_msg:new(<<"tag">>, Payload))), + ok = wait_for_accepted(<<"tag">>), + ok = amqp10_client:detach_link(Sender), + + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, rabbitmq_amqp_address:queue(QName1), settled), + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, rabbitmq_amqp_address:queue(QName2), settled), + {ok, Msg1} = amqp10_client:get_msg(Receiver1), + {ok, Msg2} = amqp10_client:get_msg(Receiver2), + ?assertEqual([Payload], amqp10_msg:body(Msg1)), + ?assertEqual([Payload], amqp10_msg:body(Msg2)), + + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + +%% Test that x-cc routing keys work together with target address +%% /queues/:queue +x_cc_annotation_queue(Config) -> + QName1 = <<"queue 1">>, + QName2 = <<"queue 2">>, + Address1 = rabbitmq_amqp_address:queue(QName1), + Address2 = rabbitmq_amqp_address:queue(QName2), + {Connection, Session, LinkPair} = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName1, #{}), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address1), + ok = wait_for_credit(Sender), + + Payload = <<"my message">>, + ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {array, utf8, [{utf8, QName2}]}}, + amqp10_msg:new(<<"tag">>, Payload))), + ok = wait_for_accepted(<<"tag">>), + ok = amqp10_client:detach_link(Sender), + + {ok, Receiver1} = amqp10_client:attach_receiver_link(Session, <<"receiver 1">>, Address1, settled), + {ok, Receiver2} = amqp10_client:attach_receiver_link(Session, <<"receiver 2">>, Address2, settled), + {ok, Msg1} = amqp10_client:get_msg(Receiver1), + {ok, Msg2} = amqp10_client:get_msg(Receiver2), + ?assertEqual([Payload], amqp10_msg:body(Msg1)), + ?assertEqual([Payload], amqp10_msg:body(Msg2)), + + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + +%% Test that x-cc routing keys work together with target address 'null' +x_cc_annotation_null(Config) -> + QName1 = <<"queue 1">>, + QName2 = <<"queue 2">>, + QAddress1 = rabbitmq_amqp_address:queue(QName1), + QAddress2 = rabbitmq_amqp_address:queue(QName2), + {Connection, Session, LinkPair} = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName1, #{}), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName1, <<"amq.direct">>, <<"key-1">>, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.direct">>, <<"🗝️-2"/utf8>>, #{}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null), + ok = wait_for_credit(Sender), + {ok, Receiver1} = amqp10_client:attach_receiver_link(Session, <<"receiver 1">>, QAddress1, settled), + {ok, Receiver2} = amqp10_client:attach_receiver_link(Session, <<"receiver 2">>, QAddress2, settled), + + Msg1 = amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {array, utf8, [{utf8, <<"key-1">>}, + {utf8, <<"key-3">>}]}}, + amqp10_msg:set_properties( + #{to => rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"🗝️-2"/utf8>>)}, + amqp10_msg:new(<<"t1">>, <<"m1">>))), + ok = amqp10_client:send_msg(Sender, Msg1), + ok = wait_for_accepted(<<"t1">>), + {ok, R1M1} = amqp10_client:get_msg(Receiver1), + {ok, R2M1} = amqp10_client:get_msg(Receiver2), + ?assertEqual([<<"m1">>], amqp10_msg:body(R1M1)), + ?assertEqual([<<"m1">>], amqp10_msg:body(R2M1)), + + Msg2 = amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {array, utf8, [{utf8, <<"🗝️-2"/utf8>>}, + {utf8, <<"key-1">>}]}}, + amqp10_msg:set_properties( + #{to => rabbitmq_amqp_address:exchange(<<"amq.direct">>)}, + amqp10_msg:new(<<"t2">>, <<"m2">>))), + ok = amqp10_client:send_msg(Sender, Msg2), + ok = wait_for_accepted(<<"t2">>), + {ok, R1M2} = amqp10_client:get_msg(Receiver1), + {ok, R2M2} = amqp10_client:get_msg(Receiver2), + ?assertEqual([<<"m2">>], amqp10_msg:body(R1M2)), + ?assertEqual([<<"m2">>], amqp10_msg:body(R2M2)), + + Msg3 = amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {array, utf8, [{utf8, QName1}]}}, + amqp10_msg:set_properties( + #{to => rabbitmq_amqp_address:queue(QName2)}, + amqp10_msg:new(<<"t3">>, <<"m3">>))), + ok = amqp10_client:send_msg(Sender, Msg3), + ok = wait_for_accepted(<<"t3">>), + {ok, R1M3} = amqp10_client:get_msg(Receiver1), + {ok, R2M3} = amqp10_client:get_msg(Receiver2), + ?assertEqual([<<"m3">>], amqp10_msg:body(R1M3)), + ?assertEqual([<<"m3">>], amqp10_msg:body(R2M3)), + + Msg4 = amqp10_msg:set_message_annotations( + %% We send a symbol array instead of utf8 array. + #{<<"x-cc">> => {array, symbol, [{symbol, QName1}]}}, + amqp10_msg:set_properties( + #{to => rabbitmq_amqp_address:queue(QName2)}, + amqp10_msg:new(<<"t4">>, <<"m4">>))), + ok = amqp10_client:send_msg(Sender, Msg4), + %% "If the source of the link supports the rejected outcome, and the message has not + %% already been settled by the sender, then the routing node MUST reject the message. + %% In this case the error field of rejected MUST contain the error which would have been communicated + %% in the detach which would have be sent if a link to the same address had been attempted." + %% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-routingerrors + receive {amqp10_disposition, {{rejected, Error}, <<"t4">>}} -> + ?assertMatch( + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD, + description = {utf8, <<"bad value for 'x-cc' message-annotation:", _/binary>>}}, + Error) + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = amqp10_client:detach_link(Sender), + ok = amqp10_client:detach_link(Receiver1), + ok = amqp10_client:detach_link(Receiver2), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + +bad_x_cc_annotation_exchange(Config) -> + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session(Connection), + + Address = rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"key-1">>), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_message_annotations( + %% We send a list instead of an array. + #{<<"x-cc">> => {list, [{utf8, <<"🗝️-2"/utf8>>}]}}, + amqp10_msg:new(<<"tag">>, <<"msg">>))), + ok = wait_for_settlement(<<"tag">>, released), + receive {amqp10_event, {link, Sender, {detached, Error}}} -> + ?assertMatch( + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD, + description = {utf8, <<"bad value for 'x-cc' message-annotation: " + "{list,[{utf8,<<\"🗝️-2"/utf8, _Rest/binary>>}}, + Error) + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + %% internal %% diff --git a/deps/rabbit/test/dead_lettering_SUITE.erl b/deps/rabbit/test/dead_lettering_SUITE.erl index 6d0ad63b13d8..63d49fd5d184 100644 --- a/deps/rabbit/test/dead_lettering_SUITE.erl +++ b/deps/rabbit/test/dead_lettering_SUITE.erl @@ -174,17 +174,21 @@ end_per_group(Group, Config) -> Config end. -init_per_testcase(T, Config) - when T =:= dead_letter_reject_expire_expire orelse - T =:= stream -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers_deaths_v2) of - ok -> - init_per_testcase0(T, Config); - {skip, _} = Skip -> - %% With feature flag message_containers_deaths_v2 disabled, test case: - %% * dead_letter_reject_expire_expire is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11159 - %% * stream is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11173 - Skip +init_per_testcase(T = dead_letter_reject_expire_expire, Config) -> + %% With feature flag message_containers_deaths_v2 disabled, this test is known to fail due to + %% https://github.com/rabbitmq/rabbitmq-server/issues/11159 + ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers_deaths_v2), + init_per_testcase0(T, Config); +init_per_testcase(T = stream, Config) -> + %% With feature flag message_containers_deaths_v2 disabled, this test is known to fail due to + %% https://github.com/rabbitmq/rabbitmq-server/issues/11173 + ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers_deaths_v2), + case rabbit_ct_helpers:is_mixed_versions() of + true -> + {skip, "TODO unskip this test when the lower version got bumped to v4.0.3 " + "which includes https://github.com/rabbitmq/rabbitmq-server/pull/12571"}; + false -> + init_per_testcase0(T, Config) end; init_per_testcase(Testcase, Config) -> init_per_testcase0(Testcase, Config). @@ -1860,6 +1864,10 @@ stream(Config) -> {timestamp, T2} = rabbit_misc:table_lookup(Death2, <<"time">>), ?assert(T1 < T2), + ?assertEqual({array, [{longstr, <<"cc 1">>}, + {longstr, <<"cc 2">>}]}, + rabbit_misc:table_lookup(Headers, <<"CC">>)), + ok = rabbit_ct_client_helpers:close_channel(Ch0), ok = rabbit_ct_client_helpers:close_channel(Ch1). diff --git a/deps/rabbit/test/mc_unit_SUITE.erl b/deps/rabbit/test/mc_unit_SUITE.erl index acc9ea69adfe..5f9ccc592c68 100644 --- a/deps/rabbit/test/mc_unit_SUITE.erl +++ b/deps/rabbit/test/mc_unit_SUITE.erl @@ -42,7 +42,9 @@ all_tests() -> amqp_amqpl_message_id_binary, amqp_amqpl_unsupported_values_not_converted, amqp_to_amqpl_data_body, - amqp_amqpl_amqp_bodies + amqp_amqpl_amqp_bodies, + amqp_x_headers, + amqpl_x_headers ]. %%%=================================================================== @@ -187,18 +189,12 @@ amqpl_table_x_header_array_of_tbls(_Config) -> Content = #content{properties = Props, payload_fragments_rev = Payload}, Msg = mc:init(mc_amqpl, Content, annotations()), - ?assertMatch({list, - [{map, - [{{symbol, <<"type">>}, {utf8, <<"apple">>}}, - {{symbol, <<"count">>}, {long, 99}}]}, - {map, - [{{symbol, <<"type">>}, {utf8, <<"orange">>}}, - {{symbol, <<"count">>}, {long, 45}}]} - ]}, - mc:x_header(<<"x-fruit">>, Msg)), - - - ok. + ?assertMatch({array, map, + [{map, [{{symbol, <<"type">>}, {utf8, <<"apple">>}}, + {{symbol, <<"count">>}, {long, 99}}]}, + {map, [{{symbol, <<"type">>}, {utf8, <<"orange">>}}, + {{symbol, <<"count">>}, {long, 45}}]}]}, + mc:x_header(<<"x-fruit">>, Msg)). amqpl_death_v1_records(_Config) -> ok = amqpl_death_records(#{?FF_MC_DEATHS_V2 => false}). @@ -364,8 +360,9 @@ amqpl_amqp_bin_amqpl(_Config) -> Msg10Pre = mc:convert(mc_amqp, Msg), Payload = iolist_to_binary(mc:protocol_state(Msg10Pre)), Msg10 = mc:init(mc_amqp, Payload, #{}), - ?assertEqual(<<"exch">>, mc:exchange(Msg10)), - ?assertEqual([<<"apple">>], mc:routing_keys(Msg10)), + ?assertMatch(#{<<"x-exchange">> := {utf8, <<"exch">>}, + <<"x-routing-key">> := {utf8, <<"apple">>}}, + mc:x_headers(Msg10)), ?assertEqual(98, mc:priority(Msg10)), ?assertEqual(true, mc:is_persistent(Msg10)), ?assertEqual(99000, mc:timestamp(Msg10)), @@ -422,8 +419,6 @@ amqpl_amqp_bin_amqpl(_Config) -> MsgL2 = mc:convert(mc_amqpl, Msg10), - ?assertEqual(<<"exch">>, mc:exchange(MsgL2)), - ?assertEqual([<<"apple">>], mc:routing_keys(MsgL2)), ?assertEqual(98, mc:priority(MsgL2)), ?assertEqual(true, mc:is_persistent(MsgL2)), ?assertEqual(99000, mc:timestamp(MsgL2)), @@ -450,9 +445,17 @@ amqpl_cc_amqp_bin_amqpl(_Config) -> Msg10Pre = mc:convert(mc_amqp, Msg), Sections = iolist_to_binary(mc:protocol_state(Msg10Pre)), Msg10 = mc:init(mc_amqp, Sections, #{}), - ?assertEqual(RoutingKeys, mc:routing_keys(Msg10)), + ?assertMatch(#{<<"x-exchange">> := {utf8, <<"exch">>}, + <<"x-routing-key">> := {utf8, <<"apple">>}, + <<"x-cc">> := {array, utf8, [{utf8, <<"q1">>}, + {utf8, <<"q2">>}]}}, + mc:x_headers(Msg10)), - MsgL2 = mc:convert(mc_amqpl, Msg10), + %% Here, we simulate what rabbit_stream_queue does: + Msg10b = mc:set_annotation(?ANN_EXCHANGE, <<"exch">>, Msg10), + Msg10c = mc:set_annotation(?ANN_ROUTING_KEYS, [<<"apple">>, <<"q1">>, <<"q2">>], Msg10b), + + MsgL2 = mc:convert(mc_amqpl, Msg10c), ?assertEqual(RoutingKeys, mc:routing_keys(MsgL2)), ?assertMatch(#content{properties = #'P_basic'{headers = Headers}}, mc:protocol_state(MsgL2)). @@ -751,6 +754,52 @@ amqp_amqpl_amqp_bodies(_Config) -> end || Body <- Bodies], ok. +amqp_x_headers(_Config) -> + MAC = [ + {{symbol, <<"x-stream-filter">>}, {utf8, <<"apple">>}}, + thead2('x-list', list, [utf8(<<"l">>)]), + thead2('x-map', map, [{utf8(<<"k">>), utf8(<<"v">>)}]) + ], + M = #'v1_0.message_annotations'{content = MAC}, + AC = [thead(long, 5)], + A = #'v1_0.application_properties'{content = AC}, + D = #'v1_0.data'{content = <<"data">>}, + + Payload = serialize_sections([M, A, D]), + Msg0 = mc:init(mc_amqp, Payload, annotations()), + Msg1 = mc:set_annotation(<<"x-1">>, {byte, -2}, Msg0), + ?assertEqual(#{<<"x-1">> => {byte, -2}, + <<"x-list">> => {list,[{utf8,<<"l">>}]}, + <<"x-map">> => {map,[{{utf8,<<"k">>},{utf8,<<"v">>}}]}, + <<"x-stream-filter">> => {utf8,<<"apple">>}}, + mc:x_headers(Msg1)). + +amqpl_x_headers(_Config) -> + Props = #'P_basic'{headers = [{<<"a-string">>, longstr, <<"a string">>}, + {<<"x-1">>, binary, <<"v1">>}, + {<<"x-stream-filter">>, longstr, <<"apple">>}]}, + Payload = [<<"data">>], + Content = #content{properties = Props, + payload_fragments_rev = Payload}, + + Msg0 = mc:init(mc_amqpl, Content, annotations()), + Msg1 = mc:set_annotation(delivery_count, 1, Msg0), + Msg = mc:set_annotation(<<"x-delivery-count">>, 2, Msg1), + ?assertEqual(#{<<"x-1">> => {binary, <<"v1">>}, + <<"x-stream-filter">> => {utf8,<<"apple">>}, + <<"x-delivery-count">> => {long, 2}}, + mc:x_headers(Msg)), + + XName = <<"exch">>, + RoutingKey = <<"apple">>, + {ok, BasicMsg0} = rabbit_basic:message_no_id(XName, RoutingKey, Content), + BasicMsg1 = mc:set_annotation(delivery_count, 1, BasicMsg0), + BasicMsg = mc:set_annotation(<<"x-delivery-count">>, 2, BasicMsg1), + ?assertEqual(#{<<"x-1">> => {binary, <<"v1">>}, + <<"x-stream-filter">> => {utf8,<<"apple">>}, + <<"x-delivery-count">> => {long, 2}}, + mc:x_headers(BasicMsg)). + %% Utility amqp10_encode_bin(L) when is_list(L) -> diff --git a/deps/rabbit/test/topic_permission_SUITE.erl b/deps/rabbit/test/topic_permission_SUITE.erl index 2849b76fd3b9..8e9866f10b5d 100644 --- a/deps/rabbit/test/topic_permission_SUITE.erl +++ b/deps/rabbit/test/topic_permission_SUITE.erl @@ -8,6 +8,7 @@ -module(topic_permission_SUITE). -include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp10_common/include/amqp10_framing.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). -compile([export_all, nowarn_export_all]). @@ -21,6 +22,7 @@ groups() -> [ {sequential_tests, [], [ + amqp_x_cc_annotation, amqpl_cc_headers, amqpl_bcc_headers, topic_permission_database_access, @@ -29,6 +31,7 @@ groups() -> ]. init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(amqp10_client), rabbit_ct_helpers:log_environment(), Config1 = rabbit_ct_helpers:set_config( Config, @@ -56,6 +59,91 @@ init_per_testcase(Testcase, Config) -> end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). +amqp_x_cc_annotation(Config) -> + ok = set_topic_permissions(Config, "^a", ".*"), + + QName1 = <<"queue 1">>, + QName2 = <<"queue 2">>, + {Connection, Session1, LinkPair} = amqp_utils:init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName1, #{}), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName1, <<"amq.topic">>, <<"a.1">>, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.topic">>, <<"a.2">>, #{}), + + {ok, Sender1} = amqp10_client:attach_sender_link( + Session1, + <<"sender 1">>, + rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"a.1">>)), + ok = amqp_utils:wait_for_credit(Sender1), + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session1, <<"receiver 1">>, rabbitmq_amqp_address:queue(QName1), settled), + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session1, <<"receiver 2">>, rabbitmq_amqp_address:queue(QName2), settled), + %% We have permissions to send to both topics. + %% Therefore, m1 should be sent to both queues. + ok = amqp10_client:send_msg(Sender1, amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {array, utf8, [{utf8, <<"a.2">>}]}}, + amqp10_msg:new(<<"t1">>, <<"m1">>, true))), + {ok, Msg1} = amqp10_client:get_msg(Receiver1), + {ok, Msg2} = amqp10_client:get_msg(Receiver2), + ?assertEqual([<<"m1">>], amqp10_msg:body(Msg1)), + ?assertEqual([<<"m1">>], amqp10_msg:body(Msg2)), + ok = amqp_utils:detach_link_sync(Sender1), + ok = amqp_utils:detach_link_sync(Receiver1), + ok = amqp_utils:detach_link_sync(Receiver2), + + {ok, Session2} = amqp10_client:begin_session_sync(Connection), + {ok, Sender2} = amqp10_client:attach_sender_link( + Session2, + <<"sender 2">>, + rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"x.1">>)), + ok = amqp_utils:wait_for_credit(Sender2), + ok = amqp10_client:send_msg(Sender2, amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {array, utf8, [{utf8, <<"a.2">>}]}}, + amqp10_msg:new(<<"t2">>, <<"m2">>, true))), + receive + {amqp10_event, + {session, Session2, + {ended, + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + description = {utf8, Description1}}}}} -> + ?assertEqual( + <<"write access to topic 'x.1' in exchange 'amq.topic' in vhost '/' refused for user 'guest'">>, + Description1) + after 5000 -> amqp_utils:flush(missing_ended), + ct:fail({missing_event, ?LINE}) + end, + + {ok, Session3} = amqp10_client:begin_session_sync(Connection), + {ok, Sender3} = amqp10_client:attach_sender_link( + Session3, + <<"sender 3">>, + rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"a.1">>)), + ok = amqp_utils:wait_for_credit(Sender3), + ok = amqp10_client:send_msg(Sender3, amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {array, utf8, [{utf8, <<"x.2">>}]}}, + amqp10_msg:new(<<"t3">>, <<"m3">>, true))), + receive + {amqp10_event, + {session, Session3, + {ended, + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + description = {utf8, Description2}}}}} -> + ?assertEqual( + <<"write access to topic 'x.2' in exchange 'amq.topic' in vhost '/' refused for user 'guest'">>, + Description2) + after 5000 -> amqp_utils:flush(missing_ended), + ct:fail({missing_event, ?LINE}) + end, + + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), + ok = amqp_utils:end_session_sync(Session1), + ok = amqp10_client:close_connection(Connection), + ok = clear_topic_permissions(Config). + amqpl_cc_headers(Config) -> amqpl_headers(<<"CC">>, Config). diff --git a/deps/rabbitmq_mqtt/src/mc_mqtt.erl b/deps/rabbitmq_mqtt/src/mc_mqtt.erl index b6cae214c8c3..656b44dd8b7b 100644 --- a/deps/rabbitmq_mqtt/src/mc_mqtt.erl +++ b/deps/rabbitmq_mqtt/src/mc_mqtt.erl @@ -14,6 +14,7 @@ init/1, size/1, x_header/2, + x_headers/1, property/2, routing_headers/2, convert_to/3, @@ -390,6 +391,11 @@ x_header(Key, #mqtt_msg{props = #{'User-Property' := UserProp}}) -> x_header(_Key, #mqtt_msg{}) -> undefined. +x_headers(#mqtt_msg{props = #{'User-Property' := UserProp}}) -> + #{Key => {utf8, Val} || {<<"x-", _/binary>> = Key, Val} <- UserProp}; +x_headers(#mqtt_msg{}) -> + #{}. + property(correlation_id, #mqtt_msg{props = #{'Correlation-Data' := Corr}}) -> case mc_util:urn_string_to_uuid(Corr) of {ok, UUId} -> diff --git a/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl b/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl index 14d88f357602..c6d1308e9ad2 100644 --- a/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl @@ -61,6 +61,10 @@ roundtrip_amqp(_Config) -> PayloadSize = 10, ExpectedSize = {MetaDataSize, PayloadSize}, ?assertEqual(ExpectedSize, mc:size(Mc0)), + ?assertEqual(#{<<"x-key-1">> => {utf8, <<"val-1">>}, + <<"x-key-2">> => {utf8, <<"val-2">>}, + <<"x-key-3">> => {utf8, <<"val-3">>}}, + mc:x_headers(Mc0)), Env = #{}, ?assertEqual(Msg, mc_mqtt:convert_to(mc_mqtt, Msg, Env)), @@ -310,6 +314,7 @@ mqtt_amqpl_alt(_Config) -> }, Anns = #{?ANN_ROUTING_KEYS => [rabbit_mqtt_util:mqtt_to_amqp(Msg#mqtt_msg.topic)]}, Mc = mc:init(mc_mqtt, Msg, Anns), + ?assertEqual(#{}, mc:x_headers(Mc)), MsgL = mc:convert(mc_amqpl, Mc), #content{properties = #'P_basic'{headers = HL} = Props} = diff --git a/release-notes/4.1.0.md b/release-notes/4.1.0.md index ca80cfa59630..f3640cd5bb91 100644 --- a/release-notes/4.1.0.md +++ b/release-notes/4.1.0.md @@ -18,6 +18,12 @@ This feature: This feature allows operators to gain insights into the message sizes being published to RabbitMQ, such as average message size, number of messages per pre-defined bucket (which can both be computed accurately), and percentiles (which will be approximated). Each metric is labelled by protocol (AMQP 1.0, AMQP 0.9.1, MQTT 5.0, MQTT 3.1.1, and MQTT 3.1). +## New Features + +### Support for Multiple Routing Keys in AMQP 1.0 via `x-cc` Message Annotation +[PR #12559](https://github.com/rabbitmq/rabbitmq-server/pull/12559) enables AMQP 1.0 publishers to set multiple routing keys by using the `x-cc` message annotation. +This annotation allows publishers to specify an [array](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-array) of routing keys ([strings](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-string)) for more flexible message distribution, similar to the [CC](https://www.rabbitmq.com/docs/sender-selected) header in AMQP 0.9.1. + ## Potential incompatibilities * The default MQTT [Maximum Packet Size](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901086) changed from 256 MiB to 16 MiB. This default can be overridden by [configuring](https://www.rabbitmq.com/docs/configure#config-file) `mqtt.max_packet_size_authenticated`. Note that this value must not be greater than `max_message_size` (which also defaults to 16 MiB). From ca1367571b3f08b6732c9dbe8fede27b82238f0e Mon Sep 17 00:00:00 2001 From: David Ansari Date: Wed, 23 Oct 2024 15:17:14 +0200 Subject: [PATCH 2/2] Use list instead of array for x-cc message annotation --- deps/rabbit/src/mc_amqpl.erl | 40 +------------ deps/rabbit/src/rabbit_amqp_session.erl | 30 ++++++---- deps/rabbit/src/rabbit_stream_queue.erl | 2 +- deps/rabbit/test/amqp_client_SUITE.erl | 64 +++++++++++++-------- deps/rabbit/test/dead_lettering_SUITE.erl | 20 ++----- deps/rabbit/test/mc_unit_SUITE.erl | 17 +++--- deps/rabbit/test/topic_permission_SUITE.erl | 6 +- release-notes/4.1.0.md | 2 +- 8 files changed, 83 insertions(+), 98 deletions(-) diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index 9e62d60b65fe..936a1b130d89 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -708,23 +708,10 @@ from_091(binary, V) -> {binary, V}; from_091(timestamp, V) -> {timestamp, V * 1000}; from_091(byte, V) -> {byte, V}; from_091(void, _V) -> null; +from_091(array, L) -> + {list, [from_091(T, V) || {T, V} <- L]}; from_091(table, L) -> - {map, [{wrap(symbol, K), from_091(T, V)} || {K, T, V} <- L]}; -from_091(array, []) -> - {list, []}; -from_091(array, L0 = [{T0, _} | _]) -> - {L = [{T1, _} | _], {Monomorphic, _}} = - lists:mapfoldl(fun({T, V}, {Mono0, PrevType}) -> - Mono = case Mono0 of - false -> false; - true -> T =:= PrevType - end, - {from_091(T, V), {Mono, T}} - end, {true, T0}, L0), - case Monomorphic of - true -> {array, T1, L}; - false -> {list, L} - end. + {map, [{wrap(symbol, K), from_091(T, V)} || {K, T, V} <- L]}. map_add(_T, _Key, _Type, undefined, Acc) -> Acc; @@ -887,24 +874,3 @@ amqp10_section_header(Header, Headers) -> amqp_encoded_binary(Section) -> iolist_to_binary(amqp10_framing:encode_bin(Section)). - --ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). - -from_091_array_test() -> - {list, []} = from_091(array, []), - {array, utf8, [{utf8, <<"e1">>}]} = from_091(array, [{longstr, <<"e1">>}]), - {array, utf8, [{utf8, <<"e1">>}, - {utf8, <<"e2">>}]} = from_091(array, [{longstr, <<"e1">>}, - {longstr, <<"e2">>}]), - {list, [{utf8, <<"e1">>}, - {binary, <<"e2">>}]} = from_091(array, [{longstr, <<"e1">>}, - {binary, <<"e2">>}]), - {list, [{utf8, <<"e1">>}, - {binary, <<"e2">>}, - {utf8, <<"e3">>}, - {utf8, <<"e4">>}]} = from_091(array, [{longstr, <<"e1">>}, - {binary, <<"e2">>}, - {longstr, <<"e3">>}, - {longstr, <<"e4">>}]). --endif. diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 16c84db24706..81e4d88d071d 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -2495,21 +2495,27 @@ lookup_routing_key(X = #exchange{name = #resource{name = XNameBin}}, RKeys = [RKey], Mc = mc:set_annotation(?ANN_ROUTING_KEYS, RKeys, Mc1), {ok, X, RKeys, Mc, PermCache}; - {array, utf8, CCs0} -> - CCs = lists:map(fun({utf8, CC}) -> CC end, CCs0), - RKeys = [RKey | CCs], - Mc = mc:set_annotation(?ANN_ROUTING_KEYS, RKeys, Mc1), - {ok, X, RKeys, Mc, PermCache}; + {list, CCs0} = L -> + try lists:map(fun({utf8, CC}) -> CC end, CCs0) of + CCs -> + RKeys = [RKey | CCs], + Mc = mc:set_annotation(?ANN_ROUTING_KEYS, RKeys, Mc1), + {ok, X, RKeys, Mc, PermCache} + catch error:function_clause -> + {error, {anonymous_terminus, AnonTerm}, bad_x_cc(L)} + end; BadValue -> - Desc = unicode:characters_to_binary( - lists:flatten( - io_lib:format( - "bad value for 'x-cc' message-annotation: ~tp", [BadValue]))), - {error, {anonymous_terminus, AnonTerm}, - #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD, - description = {utf8, Desc}}} + {error, {anonymous_terminus, AnonTerm}, bad_x_cc(BadValue)} end. +bad_x_cc(Value) -> + Desc = unicode:characters_to_binary( + lists:flatten( + io_lib:format( + "bad value for 'x-cc' message-annotation: ~tp", [Value]))), + #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD, + description = {utf8, Desc}}. + process_routing_confirm([], _SenderSettles = true, _, U) -> rabbit_global_counters:messages_unroutable_dropped(?PROTOCOL, 1), {U, []}; diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index fa9b39bdb971..111b7d8b7df0 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -1283,7 +1283,7 @@ entry_to_msg(Entry, Offset, #resource{kind = queue, name = QName}, Name, LocalPi <<>> end, RKeys0 = case XHeaders of - #{<<"x-cc">> := {array, utf8, CCs}} -> + #{<<"x-cc">> := {list, CCs}} -> [CC || {utf8, CC} <- CCs]; _ -> [] diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index f80d59b07caa..91fa3abdc687 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -4449,7 +4449,7 @@ trace(Q, QType, Config) -> ok = wait_for_credit(Sender), Msg0 = amqp10_msg:new(<<"tag 1">>, Payload, true), Msg = amqp10_msg:set_message_annotations( - #{<<"x-cc">> => {array, utf8, [{utf8, <<"my CC key">>}]}}, + #{<<"x-cc">> => {list, [{utf8, <<"my CC key">>}]}}, amqp10_msg:set_properties(#{correlation_id => CorrelationId}, Msg0)), ok = amqp10_client:send_msg(Sender, Msg), {ok, _} = amqp10_client:get_msg(Receiver), @@ -5993,7 +5993,7 @@ x_cc_annotation_exchange(Config) -> Payload = <<"my message">>, ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations( - #{<<"x-cc">> => {array, utf8, [{utf8, <<"key 2">>}]}}, + #{<<"x-cc">> => {list, [{utf8, <<"key 2">>}]}}, amqp10_msg:new(<<"tag">>, Payload))), ok = wait_for_accepted(<<"tag">>), ok = amqp10_client:detach_link(Sender), @@ -6028,8 +6028,8 @@ x_cc_annotation_exchange_routing_key_empty(Config) -> Payload = <<"my message">>, ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations( - #{<<"x-cc">> => {array, utf8, [{utf8, <<"key 1">>}, - {utf8, <<"key 2">>}]}}, + #{<<"x-cc">> => {list, [{utf8, <<"key 1">>}, + {utf8, <<"key 2">>}]}}, amqp10_msg:new(<<"tag">>, Payload))), ok = wait_for_accepted(<<"tag">>), ok = amqp10_client:detach_link(Sender), @@ -6063,7 +6063,7 @@ x_cc_annotation_queue(Config) -> Payload = <<"my message">>, ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations( - #{<<"x-cc">> => {array, utf8, [{utf8, QName2}]}}, + #{<<"x-cc">> => {list, [{utf8, QName2}]}}, amqp10_msg:new(<<"tag">>, Payload))), ok = wait_for_accepted(<<"tag">>), ok = amqp10_client:detach_link(Sender), @@ -6097,8 +6097,8 @@ x_cc_annotation_null(Config) -> {ok, Receiver2} = amqp10_client:attach_receiver_link(Session, <<"receiver 2">>, QAddress2, settled), Msg1 = amqp10_msg:set_message_annotations( - #{<<"x-cc">> => {array, utf8, [{utf8, <<"key-1">>}, - {utf8, <<"key-3">>}]}}, + #{<<"x-cc">> => {list, [{utf8, <<"key-1">>}, + {utf8, <<"key-3">>}]}}, amqp10_msg:set_properties( #{to => rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"🗝️-2"/utf8>>)}, amqp10_msg:new(<<"t1">>, <<"m1">>))), @@ -6110,8 +6110,8 @@ x_cc_annotation_null(Config) -> ?assertEqual([<<"m1">>], amqp10_msg:body(R2M1)), Msg2 = amqp10_msg:set_message_annotations( - #{<<"x-cc">> => {array, utf8, [{utf8, <<"🗝️-2"/utf8>>}, - {utf8, <<"key-1">>}]}}, + #{<<"x-cc">> => {list, [{utf8, <<"🗝️-2"/utf8>>}, + {utf8, <<"key-1">>}]}}, amqp10_msg:set_properties( #{to => rabbitmq_amqp_address:exchange(<<"amq.direct">>)}, amqp10_msg:new(<<"t2">>, <<"m2">>))), @@ -6123,7 +6123,7 @@ x_cc_annotation_null(Config) -> ?assertEqual([<<"m2">>], amqp10_msg:body(R2M2)), Msg3 = amqp10_msg:set_message_annotations( - #{<<"x-cc">> => {array, utf8, [{utf8, QName1}]}}, + #{<<"x-cc">> => {list, [{utf8, QName1}]}}, amqp10_msg:set_properties( #{to => rabbitmq_amqp_address:queue(QName2)}, amqp10_msg:new(<<"t3">>, <<"m3">>))), @@ -6135,8 +6135,8 @@ x_cc_annotation_null(Config) -> ?assertEqual([<<"m3">>], amqp10_msg:body(R2M3)), Msg4 = amqp10_msg:set_message_annotations( - %% We send a symbol array instead of utf8 array. - #{<<"x-cc">> => {array, symbol, [{symbol, QName1}]}}, + %% We send a symbol instead of utf8.. + #{<<"x-cc">> => {list, [{symbol, QName1}]}}, amqp10_msg:set_properties( #{to => rabbitmq_amqp_address:queue(QName2)}, amqp10_msg:new(<<"t4">>, <<"m4">>))), @@ -6169,23 +6169,41 @@ bad_x_cc_annotation_exchange(Config) -> {ok, Session} = amqp10_client:begin_session(Connection), Address = rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"key-1">>), - {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), - ok = wait_for_credit(Sender), - + {ok, Sender1} = amqp10_client:attach_sender_link(Session, <<"sender 1">>, Address), + ok = wait_for_credit(Sender1), ok = amqp10_client:send_msg( - Sender, + Sender1, amqp10_msg:set_message_annotations( - %% We send a list instead of an array. - #{<<"x-cc">> => {list, [{utf8, <<"🗝️-2"/utf8>>}]}}, - amqp10_msg:new(<<"tag">>, <<"msg">>))), - ok = wait_for_settlement(<<"tag">>, released), - receive {amqp10_event, {link, Sender, {detached, Error}}} -> + %% We send an array instead of a list. + #{<<"x-cc">> => {array, utf8, [{utf8, <<"🗝️-2"/utf8>>}]}}, + amqp10_msg:new(<<"t1">>, <<"m1">>))), + ok = wait_for_settlement(<<"t1">>, released), + receive {amqp10_event, {link, Sender1, {detached, Error1}}} -> ?assertMatch( #'v1_0.error'{ condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD, description = {utf8, <<"bad value for 'x-cc' message-annotation: " - "{list,[{utf8,<<\"🗝️-2"/utf8, _Rest/binary>>}}, - Error) + "{array,utf8,[{utf8,<<\"🗝️-2"/utf8, _Rest/binary>>}}, + Error1) + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + {ok, Sender2} = amqp10_client:attach_sender_link(Session, <<"sender 2">>, Address), + ok = wait_for_credit(Sender2), + ok = amqp10_client:send_msg( + Sender2, + amqp10_msg:set_message_annotations( + %% We include a non-utf8 type in the list. + #{<<"x-cc">> => {list, [{symbol, <<"key-3">>}]}}, + amqp10_msg:new(<<"t2">>, <<"m2">>))), + ok = wait_for_settlement(<<"t2">>, released), + receive {amqp10_event, {link, Sender2, {detached, Error2}}} -> + ?assertEqual( + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD, + description = {utf8, <<"bad value for 'x-cc' message-annotation: " + "{list,[{symbol,<<\"key-3\">>}]}">>}}, + Error2) after 5000 -> ct:fail({missing_event, ?LINE}) end, diff --git a/deps/rabbit/test/dead_lettering_SUITE.erl b/deps/rabbit/test/dead_lettering_SUITE.erl index 63d49fd5d184..b793cb3abebd 100644 --- a/deps/rabbit/test/dead_lettering_SUITE.erl +++ b/deps/rabbit/test/dead_lettering_SUITE.erl @@ -174,22 +174,14 @@ end_per_group(Group, Config) -> Config end. -init_per_testcase(T = dead_letter_reject_expire_expire, Config) -> - %% With feature flag message_containers_deaths_v2 disabled, this test is known to fail due to - %% https://github.com/rabbitmq/rabbitmq-server/issues/11159 +init_per_testcase(T, Config) + when T =:= dead_letter_reject_expire_expire orelse + T =:= stream -> + %% With feature flag message_containers_deaths_v2 disabled, test case: + %% * dead_letter_reject_expire_expire is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11159 + %% * stream is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11173 ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers_deaths_v2), init_per_testcase0(T, Config); -init_per_testcase(T = stream, Config) -> - %% With feature flag message_containers_deaths_v2 disabled, this test is known to fail due to - %% https://github.com/rabbitmq/rabbitmq-server/issues/11173 - ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers_deaths_v2), - case rabbit_ct_helpers:is_mixed_versions() of - true -> - {skip, "TODO unskip this test when the lower version got bumped to v4.0.3 " - "which includes https://github.com/rabbitmq/rabbitmq-server/pull/12571"}; - false -> - init_per_testcase0(T, Config) - end; init_per_testcase(Testcase, Config) -> init_per_testcase0(Testcase, Config). diff --git a/deps/rabbit/test/mc_unit_SUITE.erl b/deps/rabbit/test/mc_unit_SUITE.erl index 5f9ccc592c68..f8d10462e629 100644 --- a/deps/rabbit/test/mc_unit_SUITE.erl +++ b/deps/rabbit/test/mc_unit_SUITE.erl @@ -189,11 +189,14 @@ amqpl_table_x_header_array_of_tbls(_Config) -> Content = #content{properties = Props, payload_fragments_rev = Payload}, Msg = mc:init(mc_amqpl, Content, annotations()), - ?assertMatch({array, map, - [{map, [{{symbol, <<"type">>}, {utf8, <<"apple">>}}, - {{symbol, <<"count">>}, {long, 99}}]}, - {map, [{{symbol, <<"type">>}, {utf8, <<"orange">>}}, - {{symbol, <<"count">>}, {long, 45}}]}]}, + ?assertMatch({list, + [{map, + [{{symbol, <<"type">>}, {utf8, <<"apple">>}}, + {{symbol, <<"count">>}, {long, 99}}]}, + {map, + [{{symbol, <<"type">>}, {utf8, <<"orange">>}}, + {{symbol, <<"count">>}, {long, 45}}]} + ]}, mc:x_header(<<"x-fruit">>, Msg)). amqpl_death_v1_records(_Config) -> @@ -447,8 +450,8 @@ amqpl_cc_amqp_bin_amqpl(_Config) -> Msg10 = mc:init(mc_amqp, Sections, #{}), ?assertMatch(#{<<"x-exchange">> := {utf8, <<"exch">>}, <<"x-routing-key">> := {utf8, <<"apple">>}, - <<"x-cc">> := {array, utf8, [{utf8, <<"q1">>}, - {utf8, <<"q2">>}]}}, + <<"x-cc">> := {list, [{utf8, <<"q1">>}, + {utf8, <<"q2">>}]}}, mc:x_headers(Msg10)), %% Here, we simulate what rabbit_stream_queue does: diff --git a/deps/rabbit/test/topic_permission_SUITE.erl b/deps/rabbit/test/topic_permission_SUITE.erl index 8e9866f10b5d..b7c2e10b2421 100644 --- a/deps/rabbit/test/topic_permission_SUITE.erl +++ b/deps/rabbit/test/topic_permission_SUITE.erl @@ -82,7 +82,7 @@ amqp_x_cc_annotation(Config) -> %% We have permissions to send to both topics. %% Therefore, m1 should be sent to both queues. ok = amqp10_client:send_msg(Sender1, amqp10_msg:set_message_annotations( - #{<<"x-cc">> => {array, utf8, [{utf8, <<"a.2">>}]}}, + #{<<"x-cc">> => {list, [{utf8, <<"a.2">>}]}}, amqp10_msg:new(<<"t1">>, <<"m1">>, true))), {ok, Msg1} = amqp10_client:get_msg(Receiver1), {ok, Msg2} = amqp10_client:get_msg(Receiver2), @@ -99,7 +99,7 @@ amqp_x_cc_annotation(Config) -> rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"x.1">>)), ok = amqp_utils:wait_for_credit(Sender2), ok = amqp10_client:send_msg(Sender2, amqp10_msg:set_message_annotations( - #{<<"x-cc">> => {array, utf8, [{utf8, <<"a.2">>}]}}, + #{<<"x-cc">> => {list, [{utf8, <<"a.2">>}]}}, amqp10_msg:new(<<"t2">>, <<"m2">>, true))), receive {amqp10_event, @@ -122,7 +122,7 @@ amqp_x_cc_annotation(Config) -> rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"a.1">>)), ok = amqp_utils:wait_for_credit(Sender3), ok = amqp10_client:send_msg(Sender3, amqp10_msg:set_message_annotations( - #{<<"x-cc">> => {array, utf8, [{utf8, <<"x.2">>}]}}, + #{<<"x-cc">> => {list, [{utf8, <<"x.2">>}]}}, amqp10_msg:new(<<"t3">>, <<"m3">>, true))), receive {amqp10_event, diff --git a/release-notes/4.1.0.md b/release-notes/4.1.0.md index f3640cd5bb91..b4fe0f8b56cc 100644 --- a/release-notes/4.1.0.md +++ b/release-notes/4.1.0.md @@ -22,7 +22,7 @@ Each metric is labelled by protocol (AMQP 1.0, AMQP 0.9.1, MQTT 5.0, MQTT 3.1.1, ### Support for Multiple Routing Keys in AMQP 1.0 via `x-cc` Message Annotation [PR #12559](https://github.com/rabbitmq/rabbitmq-server/pull/12559) enables AMQP 1.0 publishers to set multiple routing keys by using the `x-cc` message annotation. -This annotation allows publishers to specify an [array](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-array) of routing keys ([strings](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-string)) for more flexible message distribution, similar to the [CC](https://www.rabbitmq.com/docs/sender-selected) header in AMQP 0.9.1. +This annotation allows publishers to specify a [list](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-list) of routing keys ([strings](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-string)) for more flexible message distribution, similar to the [CC](https://www.rabbitmq.com/docs/sender-selected) header in AMQP 0.9.1. ## Potential incompatibilities