diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 8ce54e6f584b..76be5953a6c3 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -862,6 +862,12 @@ rabbitmq_integration_suite( rabbitmq_integration_suite( name = "topic_permission_SUITE", size = "medium", + additional_beam = [ + ":test_amqp_utils_beam", + ], + runtime_deps = [ + "//deps/rabbitmq_amqp_client:erlang_app", + ], ) rabbitmq_integration_suite( diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index dca277a2ab00..9d6f7fab563f 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -1559,7 +1559,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"): outs = ["test/topic_permission_SUITE.beam"], app_name = "rabbit", erlc_opts = "//:test_erlc_opts", - deps = ["//deps/amqp_client:erlang_app"], + deps = ["//deps/amqp10_common:erlang_app", "//deps/amqp_client:erlang_app"], ) erlang_bytecode( name = "transactions_SUITE_beam_files", diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index 3352f26185de..b3c51dca3976 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -26,6 +26,7 @@ priority/1, set_ttl/2, x_header/2, + x_headers/1, routing_headers/2, exchange/1, routing_keys/1, @@ -88,6 +89,7 @@ {timestamp, non_neg_integer()} | {list, [tagged_value()]} | {map, [{tagged_value(), tagged_value()}]} | + {array, atom(), [tagged_value()]} | null | undefined. @@ -104,11 +106,16 @@ {MetadataSize :: non_neg_integer(), PayloadSize :: non_neg_integer()}. -%% retrieve and x- header from the protocol data +%% retrieve an x- header from the protocol data %% the return value should be tagged with an AMQP 1.0 type -callback x_header(binary(), proto_state()) -> tagged_value(). +%% retrieve x- headers from the protocol data +%% the return values should be tagged with an AMQP 1.0 type +-callback x_headers(proto_state()) -> + #{binary() => tagged_value()}. + %% retrieve a property field from the protocol data %% e.g. message_id, correlation_id -callback property(atom(), proto_state()) -> @@ -148,7 +155,7 @@ init(Proto, Data, Anns) -> -spec init(protocol(), term(), annotations(), environment()) -> state(). init(Proto, Data, Anns0, Env) -> {ProtoData, ProtoAnns} = Proto:init(Data), - Anns1 = case map_size(Env) == 0 of + Anns1 = case map_size(Env) =:= 0 of true -> Anns0; false -> Anns0#{env => Env} end, @@ -214,6 +221,25 @@ x_header(Key, #?MODULE{protocol = Proto, x_header(Key, BasicMsg) -> mc_compat:x_header(Key, BasicMsg). +-spec x_headers(state()) -> + #{binary() => tagged_value()}. +x_headers(#?MODULE{protocol = Proto, + annotations = Anns, + data = Data}) -> + %% x-headers may be have been added to the annotations map. + New = maps:filtermap( + fun(Key, Val) -> + case mc_util:is_x_header(Key) of + true -> + {true, mc_util:infer_type(Val)}; + false -> + false + end + end, Anns), + maps:merge(Proto:x_headers(Data), New); +x_headers(BasicMsg) -> + mc_compat:x_headers(BasicMsg). + -spec routing_headers(state(), [x_headers | complex_types]) -> #{binary() => property_value()}. routing_headers(#?MODULE{protocol = Proto, diff --git a/deps/rabbit/src/mc_amqp.erl b/deps/rabbit/src/mc_amqp.erl index ed6c4b4145d6..06a923763da9 100644 --- a/deps/rabbit/src/mc_amqp.erl +++ b/deps/rabbit/src/mc_amqp.erl @@ -8,6 +8,7 @@ init/1, size/1, x_header/2, + x_headers/1, property/2, routing_headers/2, convert_to/3, @@ -125,6 +126,9 @@ size(#v1{message_annotations = MA, x_header(Key, Msg) -> message_annotation(Key, Msg, undefined). +x_headers(Msg) -> + #{K => V || {{_T, K}, V} <- message_annotations(Msg)}. + property(_Prop, #msg_body_encoded{properties = undefined}) -> undefined; property(Prop, #msg_body_encoded{properties = Props}) -> @@ -618,41 +622,16 @@ encode_deaths(Deaths) -> {map, Map} end, Deaths). -essential_properties(#msg_body_encoded{message_annotations = MA} = Msg) -> +essential_properties(Msg) -> Durable = get_property(durable, Msg), Priority = get_property(priority, Msg), Timestamp = get_property(timestamp, Msg), Ttl = get_property(ttl, Msg), - Anns0 = #{?ANN_DURABLE => Durable}, - Anns = maps_put_truthy( - ?ANN_PRIORITY, Priority, - maps_put_truthy( - ?ANN_TIMESTAMP, Timestamp, - maps_put_truthy( - ttl, Ttl, - Anns0))), - case MA of - [] -> - Anns; - _ -> - lists:foldl( - fun ({{symbol, <<"x-routing-key">>}, - {utf8, Key}}, Acc) -> - maps:update_with(?ANN_ROUTING_KEYS, - fun(L) -> [Key | L] end, - [Key], - Acc); - ({{symbol, <<"x-cc">>}, - {list, CCs0}}, Acc) -> - CCs = [CC || {_T, CC} <- CCs0], - maps:update_with(?ANN_ROUTING_KEYS, - fun(L) -> L ++ CCs end, - CCs, - Acc); - ({{symbol, <<"x-exchange">>}, - {utf8, Exchange}}, Acc) -> - Acc#{?ANN_EXCHANGE => Exchange}; - (_, Acc) -> - Acc - end, Anns, MA) - end. + Anns = #{?ANN_DURABLE => Durable}, + maps_put_truthy( + ?ANN_PRIORITY, Priority, + maps_put_truthy( + ?ANN_TIMESTAMP, Timestamp, + maps_put_truthy( + ttl, Ttl, + Anns))). diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index 723e60cd3f79..936a1b130d89 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -11,6 +11,7 @@ init/1, size/1, x_header/2, + x_headers/1, routing_headers/2, convert_to/3, convert_from/3, @@ -273,6 +274,23 @@ x_header(Key, #content{properties = none} = Content0) -> Content = rabbit_binary_parser:ensure_content_decoded(Content0), x_header(Key, Content). +x_headers(#content{properties = #'P_basic'{headers = undefined}}) -> + #{}; +x_headers(#content{properties = #'P_basic'{headers = Headers}}) -> + L = lists:filtermap( + fun({Name, Type, Val}) -> + case mc_util:is_x_header(Name) of + true -> + {true, {Name, from_091(Type, Val)}}; + false -> + false + end + end, Headers), + maps:from_list(L); +x_headers(#content{properties = none} = Content0) -> + Content = rabbit_binary_parser:ensure_content_decoded(Content0), + x_headers(Content). + property(Prop, Content) -> mc_util:infer_type(mc_compat:get_property(Prop, Content)). @@ -707,7 +725,6 @@ supported_header_value_type(table) -> supported_header_value_type(_) -> true. - amqp10_map_get(_K, []) -> undefined; amqp10_map_get(K, Tuples) -> diff --git a/deps/rabbit/src/mc_compat.erl b/deps/rabbit/src/mc_compat.erl index 056905239d96..5fce91b202a4 100644 --- a/deps/rabbit/src/mc_compat.erl +++ b/deps/rabbit/src/mc_compat.erl @@ -20,6 +20,7 @@ priority/1, set_ttl/2, x_header/2, + x_headers/1, routing_headers/2, %%% convert_to/2, @@ -138,6 +139,9 @@ set_ttl(Value, #basic_message{content = Content0} = Msg) -> x_header(Key,#basic_message{content = Content}) -> mc_amqpl:x_header(Key, Content). +x_headers(#basic_message{content = Content}) -> + mc_amqpl:x_headers(Content). + routing_headers(#basic_message{content = Content}, Opts) -> mc_amqpl:routing_headers(Content, Opts). diff --git a/deps/rabbit/src/mc_util.erl b/deps/rabbit/src/mc_util.erl index 1f20d15699db..9ec7928de9b7 100644 --- a/deps/rabbit/src/mc_util.erl +++ b/deps/rabbit/src/mc_util.erl @@ -61,7 +61,7 @@ utf8_string_is_ascii(UTF8String) -> amqp_map_get(Key, {map, List}, Default) -> amqp_map_get(Key, List, Default); amqp_map_get(Key, List, Default) when is_list(List) -> - case lists:search(fun ({{_, K}, _}) -> K == Key end, List) of + case lists:search(fun ({{_, K}, _}) -> K =:= Key end, List) of {value, {_K, V}} -> V; false -> diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index c9d505647eb5..81e4d88d071d 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -154,6 +154,7 @@ %% The routing key is either defined in the ATTACH frame and static for %% the life time of the link or dynamically provided in each message's %% "to" field (address v2) or "subject" field (address v1). + %% (A publisher can set additional routing keys via the x-cc message annotation.) routing_key :: rabbit_types:routing_key() | to | subject, %% queue_name_bin is only set if the link target address refers to a queue. queue_name_bin :: undefined | rabbit_misc:resource_name(), @@ -2369,11 +2370,11 @@ incoming_link_transfer( Mc0 = mc:init(mc_amqp, PayloadBin, #{}), case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of - {ok, X, RoutingKey, Mc1, PermCache} -> + {ok, X, RoutingKeys, Mc1, PermCache} -> Mc2 = rabbit_message_interceptor:intercept(Mc1), check_user_id(Mc2, User), - TopicPermCache = check_write_permitted_on_topic( - X, User, RoutingKey, TopicPermCache0), + TopicPermCache = check_write_permitted_on_topics( + X, User, RoutingKeys, TopicPermCache0), QNames = rabbit_exchange:route(X, Mc2, #{return_binding_keys => true}), rabbit_trace:tap_in(Mc2, QNames, ConnName, ChannelNum, Username, Trace), Opts = #{correlation => {HandleInt, DeliveryId}}, @@ -2408,14 +2409,14 @@ incoming_link_transfer( "delivery_tag=~p, delivery_id=~p, reason=~p", [DeliveryTag, DeliveryId, Reason]) end; - {error, #'v1_0.error'{} = Err} -> + {error, {anonymous_terminus, false}, #'v1_0.error'{} = Err} -> Disposition = case Settled of true -> []; false -> [released(DeliveryId)] end, Detach = [detach(HandleInt, Link0, Err)], {error, Disposition ++ Detach}; - {error, anonymous_terminus, #'v1_0.error'{} = Err} -> + {error, {anonymous_terminus, true}, #'v1_0.error'{} = Err} -> %% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-routingerrors case Settled of true -> @@ -2440,13 +2441,13 @@ incoming_link_transfer( end. lookup_target(#exchange{} = X, LinkRKey, Mc, _, _, PermCache) -> - lookup_routing_key(X, LinkRKey, Mc, PermCache); + lookup_routing_key(X, LinkRKey, Mc, false, PermCache); lookup_target(#resource{} = XName, LinkRKey, Mc, _, _, PermCache) -> case rabbit_exchange:lookup(XName) of {ok, X} -> - lookup_routing_key(X, LinkRKey, Mc, PermCache); + lookup_routing_key(X, LinkRKey, Mc, false, PermCache); {error, not_found} -> - {error, error_not_found(XName)} + {error, {anonymous_terminus, false}, error_not_found(XName)} end; lookup_target(to, to, Mc, Vhost, User, PermCache0) -> case mc:property(to, Mc) of @@ -2458,25 +2459,26 @@ lookup_target(to, to, Mc, Vhost, User, PermCache0) -> case rabbit_exchange:lookup(XName) of {ok, X} -> check_internal_exchange(X), - lookup_routing_key(X, RKey, Mc, PermCache); + lookup_routing_key(X, RKey, Mc, true, PermCache); {error, not_found} -> - {error, anonymous_terminus, error_not_found(XName)} + {error, {anonymous_terminus, true}, error_not_found(XName)} end; {error, bad_address} -> - {error, anonymous_terminus, + {error, {anonymous_terminus, true}, #'v1_0.error'{ condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED, description = {utf8, <<"bad 'to' address string: ", String/binary>>}}} end; undefined -> - {error, anonymous_terminus, + {error, {anonymous_terminus, true}, #'v1_0.error'{ condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED, description = {utf8, <<"anonymous terminus requires 'to' address to be set">>}}} end. lookup_routing_key(X = #exchange{name = #resource{name = XNameBin}}, - RKey0, Mc0, PermCache) -> + RKey0, Mc0, AnonTerm, PermCache) -> + Mc1 = mc:set_annotation(?ANN_EXCHANGE, XNameBin, Mc0), RKey = case RKey0 of subject -> case mc:property(subject, Mc0) of @@ -2488,9 +2490,31 @@ lookup_routing_key(X = #exchange{name = #resource{name = XNameBin}}, _ when is_binary(RKey0) -> RKey0 end, - Mc1 = mc:set_annotation(?ANN_EXCHANGE, XNameBin, Mc0), - Mc = mc:set_annotation(?ANN_ROUTING_KEYS, [RKey], Mc1), - {ok, X, RKey, Mc, PermCache}. + case mc:x_header(<<"x-cc">>, Mc0) of + undefined -> + RKeys = [RKey], + Mc = mc:set_annotation(?ANN_ROUTING_KEYS, RKeys, Mc1), + {ok, X, RKeys, Mc, PermCache}; + {list, CCs0} = L -> + try lists:map(fun({utf8, CC}) -> CC end, CCs0) of + CCs -> + RKeys = [RKey | CCs], + Mc = mc:set_annotation(?ANN_ROUTING_KEYS, RKeys, Mc1), + {ok, X, RKeys, Mc, PermCache} + catch error:function_clause -> + {error, {anonymous_terminus, AnonTerm}, bad_x_cc(L)} + end; + BadValue -> + {error, {anonymous_terminus, AnonTerm}, bad_x_cc(BadValue)} + end. + +bad_x_cc(Value) -> + Desc = unicode:characters_to_binary( + lists:flatten( + io_lib:format( + "bad value for 'x-cc' message-annotation: ~tp", [Value]))), + #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD, + description = {utf8, Desc}}. process_routing_confirm([], _SenderSettles = true, _, U) -> rabbit_global_counters:messages_unroutable_dropped(?PROTOCOL, 1), @@ -3445,14 +3469,20 @@ check_resource_access(Resource, Perm, User, Cache) -> end end. --spec check_write_permitted_on_topic( +-spec check_write_permitted_on_topics( rabbit_types:exchange(), rabbit_types:user(), - rabbit_types:routing_key(), + [rabbit_types:routing_key(),...], topic_permission_cache()) -> topic_permission_cache(). -check_write_permitted_on_topic(Resource, User, RoutingKey, TopicPermCache) -> - check_topic_authorisation(Resource, User, RoutingKey, write, TopicPermCache). +check_write_permitted_on_topics(#exchange{type = topic} = Resource, + User, RoutingKeys, TopicPermCache) -> + lists:foldl( + fun(RoutingKey, Cache) -> + check_topic_authorisation(Resource, User, RoutingKey, write, Cache) + end, TopicPermCache, RoutingKeys); +check_write_permitted_on_topics(_, _, _, TopicPermCache) -> + TopicPermCache. -spec check_read_permitted_on_topic( rabbit_types:exchange(), diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index a011dc09a650..111b7d8b7df0 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -1272,17 +1272,36 @@ parse_uncompressed_subbatch( entry_to_msg(Entry, Offset, #resource{kind = queue, name = QName}, Name, LocalPid, Filter) -> Mc0 = mc:init(mc_amqp, Entry, #{}), - %% If exchange or routing_keys annotation isn't present the entry most likely came + %% If exchange or routing keys annotation isn't present the entry most likely came %% from the rabbitmq-stream plugin so we'll choose defaults that simulate use %% of the direct exchange. - Mc1 = case mc:exchange(Mc0) of - undefined -> mc:set_annotation(?ANN_EXCHANGE, <<>>, Mc0); - _ -> Mc0 - end, - Mc2 = case mc:routing_keys(Mc1) of - [] -> mc:set_annotation(?ANN_ROUTING_KEYS, [QName], Mc1); - _ -> Mc1 - end, + XHeaders = mc:x_headers(Mc0), + Exchange = case XHeaders of + #{<<"x-exchange">> := {utf8, X}} -> + X; + _ -> + <<>> + end, + RKeys0 = case XHeaders of + #{<<"x-cc">> := {list, CCs}} -> + [CC || {utf8, CC} <- CCs]; + _ -> + [] + end, + RKeys1 = case XHeaders of + #{<<"x-routing-key">> := {utf8, RK}} -> + [RK | RKeys0]; + _ -> + RKeys0 + end, + RKeys = case RKeys1 of + [] -> + [QName]; + _ -> + RKeys1 + end, + Mc1 = mc:set_annotation(?ANN_EXCHANGE, Exchange, Mc0), + Mc2 = mc:set_annotation(?ANN_ROUTING_KEYS, RKeys, Mc1), Mc = mc:set_annotation(<<"x-stream-offset">>, Offset, Mc2), case rabbit_amqp_filtex:filter(Filter, Mc) of true -> diff --git a/deps/rabbit/test/amqp_address_SUITE.erl b/deps/rabbit/test/amqp_address_SUITE.erl index f5a0f74b8932..607aa11473aa 100644 --- a/deps/rabbit/test/amqp_address_SUITE.erl +++ b/deps/rabbit/test/amqp_address_SUITE.erl @@ -304,10 +304,9 @@ target_per_message_exchange_routing_key(Config) -> Tag1 = Body1 = <<1>>, Tag2 = Body2 = <<2>>, - %% Although mc_amqp:essential_properties/1 parses these annotations, they should be ignored. + %% Although mc_amqp:essential_properties/1 parses the x-exchange annotation, it should be ignored. Msg1 = amqp10_msg:set_message_annotations( - #{<<"x-exchange">> => <<"ignored">>, - <<"x-routing-key">> => <<"ignored">>}, + #{<<"x-exchange">> => <<"ignored">>}, amqp10_msg:set_properties(#{to => To1}, amqp10_msg:new(Tag1, Body1))), Msg2 = amqp10_msg:set_properties(#{to => To2}, amqp10_msg:new(Tag2, Body2)), ok = amqp10_client:send_msg(Sender, Msg1), diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index f192a0c309f8..91fa3abdc687 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -116,7 +116,8 @@ groups() -> available_messages_quorum_queue, available_messages_stream, incoming_message_interceptors, - trace, + trace_classic_queue, + trace_stream, user_id, message_ttl, plugin, @@ -156,7 +157,12 @@ groups() -> tcp_back_pressure_rabbitmq_internal_flow_quorum_queue, session_max_per_connection, link_max_per_session, - reserved_annotation + reserved_annotation, + x_cc_annotation_exchange, + x_cc_annotation_exchange_routing_key_empty, + x_cc_annotation_queue, + x_cc_annotation_null, + bad_x_cc_annotation_exchange ]}, {cluster_size_3, [shuffle], @@ -4393,16 +4399,26 @@ incoming_message_interceptors(Config) -> ok = amqp10_client:close_connection(Connection), true = rpc(Config, persistent_term, erase, [Key]). -trace(Config) -> +trace_classic_queue(Config) -> + trace(atom_to_binary(?FUNCTION_NAME), <<"classic">>, Config). + +trace_stream(Config) -> + trace(atom_to_binary(?FUNCTION_NAME), <<"stream">>, Config). + +trace(Q, QType, Config) -> Node = atom_to_binary(get_node_config(Config, 0, nodename)), TraceQ = <<"my trace queue">>, - Q = <<"my queue">>, Qs = [Q, TraceQ], RoutingKey = <<"my routing key">>, Payload = <<"my payload">>, CorrelationId = <<"my correlation 👀"/utf8>>, Ch = rabbit_ct_client_helpers:open_channel(Config), - [#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = Q0}) || Q0 <- Qs], + #'queue.declare_ok'{} = amqp_channel:call( + Ch, #'queue.declare'{ + queue = Q, + durable = true, + arguments = [{<<"x-queue-type">>, longstr, QType}]}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = TraceQ}), #'queue.bind_ok'{} = amqp_channel:call( Ch, #'queue.bind'{queue = TraceQ, exchange = <<"amq.rabbitmq.trace">>, @@ -4420,16 +4436,21 @@ trace(Config) -> {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["trace_on"]), {ok, SessionReceiver} = amqp10_client:begin_session_sync(Connection), + {ok, Receiver} = amqp10_client:attach_receiver_link(SessionReceiver, + <<"test-receiver">>, + rabbitmq_amqp_address:queue(Q)), + receive {amqp10_event, {link, Receiver, attached}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, {ok, Sender} = amqp10_client:attach_sender_link( SessionSender, <<"test-sender">>, rabbitmq_amqp_address:exchange(<<"amq.direct">>, RoutingKey)), ok = wait_for_credit(Sender), - {ok, Receiver} = amqp10_client:attach_receiver_link(SessionReceiver, - <<"test-receiver">>, - rabbitmq_amqp_address:queue(Q)), Msg0 = amqp10_msg:new(<<"tag 1">>, Payload, true), - Msg = amqp10_msg:set_properties(#{correlation_id => CorrelationId}, Msg0), + Msg = amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {list, [{utf8, <<"my CC key">>}]}}, + amqp10_msg:set_properties(#{correlation_id => CorrelationId}, Msg0)), ok = amqp10_client:send_msg(Sender, Msg), {ok, _} = amqp10_client:get_msg(Receiver), @@ -4439,7 +4460,7 @@ trace(Config) -> payload = Payload}} = amqp_channel:call(Ch, #'basic.get'{queue = TraceQ}), ?assertMatch(#{<<"exchange_name">> := <<"amq.direct">>, - <<"routing_keys">> := [RoutingKey], + <<"routing_keys">> := [RoutingKey, <<"my CC key">>], <<"connection">> := <<"127.0.0.1:", _/binary>>, <<"node">> := Node, <<"vhost">> := <<"/">>, @@ -4454,7 +4475,7 @@ trace(Config) -> payload = Payload}} = amqp_channel:call(Ch, #'basic.get'{queue = TraceQ}), ?assertMatch(#{<<"exchange_name">> := <<"amq.direct">>, - <<"routing_keys">> := [RoutingKey], + <<"routing_keys">> := [RoutingKey, <<"my CC key">>], <<"connection">> := <<"127.0.0.1:", _/binary>>, <<"node">> := Node, <<"vhost">> := <<"/">>, @@ -5956,6 +5977,239 @@ reserved_annotation(Config) -> end, ok = close_connection_sync(Connection). +%% Test that x-cc routing keys work together with target address +%% /exchanges/:exchange/:routing-key +x_cc_annotation_exchange(Config) -> + QName1 = <<"queue 1">>, + QName2 = <<"queue 2">>, + {Connection, Session, LinkPair} = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName1, #{}), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName1, <<"amq.direct">>, <<"key 1">>, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.direct">>, <<"key 2">>, #{}), + Address = rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"key 1">>), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + Payload = <<"my message">>, + ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {list, [{utf8, <<"key 2">>}]}}, + amqp10_msg:new(<<"tag">>, Payload))), + ok = wait_for_accepted(<<"tag">>), + ok = amqp10_client:detach_link(Sender), + + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, rabbitmq_amqp_address:queue(QName1), settled), + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, rabbitmq_amqp_address:queue(QName2), settled), + {ok, Msg1} = amqp10_client:get_msg(Receiver1), + {ok, Msg2} = amqp10_client:get_msg(Receiver2), + ?assertEqual([Payload], amqp10_msg:body(Msg1)), + ?assertEqual([Payload], amqp10_msg:body(Msg2)), + + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + +%% Test that x-cc routing keys work together with target address +%% /exchanges/:exchange +x_cc_annotation_exchange_routing_key_empty(Config) -> + QName1 = <<"queue 1">>, + QName2 = <<"queue 2">>, + {Connection, Session, LinkPair} = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName1, #{}), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName1, <<"amq.direct">>, <<"key 1">>, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.direct">>, <<"key 2">>, #{}), + AddressEmptyRoutingKey = rabbitmq_amqp_address:exchange(<<"amq.direct">>), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, AddressEmptyRoutingKey), + ok = wait_for_credit(Sender), + + Payload = <<"my message">>, + ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {list, [{utf8, <<"key 1">>}, + {utf8, <<"key 2">>}]}}, + amqp10_msg:new(<<"tag">>, Payload))), + ok = wait_for_accepted(<<"tag">>), + ok = amqp10_client:detach_link(Sender), + + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, rabbitmq_amqp_address:queue(QName1), settled), + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, rabbitmq_amqp_address:queue(QName2), settled), + {ok, Msg1} = amqp10_client:get_msg(Receiver1), + {ok, Msg2} = amqp10_client:get_msg(Receiver2), + ?assertEqual([Payload], amqp10_msg:body(Msg1)), + ?assertEqual([Payload], amqp10_msg:body(Msg2)), + + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + +%% Test that x-cc routing keys work together with target address +%% /queues/:queue +x_cc_annotation_queue(Config) -> + QName1 = <<"queue 1">>, + QName2 = <<"queue 2">>, + Address1 = rabbitmq_amqp_address:queue(QName1), + Address2 = rabbitmq_amqp_address:queue(QName2), + {Connection, Session, LinkPair} = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName1, #{}), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address1), + ok = wait_for_credit(Sender), + + Payload = <<"my message">>, + ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {list, [{utf8, QName2}]}}, + amqp10_msg:new(<<"tag">>, Payload))), + ok = wait_for_accepted(<<"tag">>), + ok = amqp10_client:detach_link(Sender), + + {ok, Receiver1} = amqp10_client:attach_receiver_link(Session, <<"receiver 1">>, Address1, settled), + {ok, Receiver2} = amqp10_client:attach_receiver_link(Session, <<"receiver 2">>, Address2, settled), + {ok, Msg1} = amqp10_client:get_msg(Receiver1), + {ok, Msg2} = amqp10_client:get_msg(Receiver2), + ?assertEqual([Payload], amqp10_msg:body(Msg1)), + ?assertEqual([Payload], amqp10_msg:body(Msg2)), + + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + +%% Test that x-cc routing keys work together with target address 'null' +x_cc_annotation_null(Config) -> + QName1 = <<"queue 1">>, + QName2 = <<"queue 2">>, + QAddress1 = rabbitmq_amqp_address:queue(QName1), + QAddress2 = rabbitmq_amqp_address:queue(QName2), + {Connection, Session, LinkPair} = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName1, #{}), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName1, <<"amq.direct">>, <<"key-1">>, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.direct">>, <<"🗝️-2"/utf8>>, #{}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null), + ok = wait_for_credit(Sender), + {ok, Receiver1} = amqp10_client:attach_receiver_link(Session, <<"receiver 1">>, QAddress1, settled), + {ok, Receiver2} = amqp10_client:attach_receiver_link(Session, <<"receiver 2">>, QAddress2, settled), + + Msg1 = amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {list, [{utf8, <<"key-1">>}, + {utf8, <<"key-3">>}]}}, + amqp10_msg:set_properties( + #{to => rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"🗝️-2"/utf8>>)}, + amqp10_msg:new(<<"t1">>, <<"m1">>))), + ok = amqp10_client:send_msg(Sender, Msg1), + ok = wait_for_accepted(<<"t1">>), + {ok, R1M1} = amqp10_client:get_msg(Receiver1), + {ok, R2M1} = amqp10_client:get_msg(Receiver2), + ?assertEqual([<<"m1">>], amqp10_msg:body(R1M1)), + ?assertEqual([<<"m1">>], amqp10_msg:body(R2M1)), + + Msg2 = amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {list, [{utf8, <<"🗝️-2"/utf8>>}, + {utf8, <<"key-1">>}]}}, + amqp10_msg:set_properties( + #{to => rabbitmq_amqp_address:exchange(<<"amq.direct">>)}, + amqp10_msg:new(<<"t2">>, <<"m2">>))), + ok = amqp10_client:send_msg(Sender, Msg2), + ok = wait_for_accepted(<<"t2">>), + {ok, R1M2} = amqp10_client:get_msg(Receiver1), + {ok, R2M2} = amqp10_client:get_msg(Receiver2), + ?assertEqual([<<"m2">>], amqp10_msg:body(R1M2)), + ?assertEqual([<<"m2">>], amqp10_msg:body(R2M2)), + + Msg3 = amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {list, [{utf8, QName1}]}}, + amqp10_msg:set_properties( + #{to => rabbitmq_amqp_address:queue(QName2)}, + amqp10_msg:new(<<"t3">>, <<"m3">>))), + ok = amqp10_client:send_msg(Sender, Msg3), + ok = wait_for_accepted(<<"t3">>), + {ok, R1M3} = amqp10_client:get_msg(Receiver1), + {ok, R2M3} = amqp10_client:get_msg(Receiver2), + ?assertEqual([<<"m3">>], amqp10_msg:body(R1M3)), + ?assertEqual([<<"m3">>], amqp10_msg:body(R2M3)), + + Msg4 = amqp10_msg:set_message_annotations( + %% We send a symbol instead of utf8.. + #{<<"x-cc">> => {list, [{symbol, QName1}]}}, + amqp10_msg:set_properties( + #{to => rabbitmq_amqp_address:queue(QName2)}, + amqp10_msg:new(<<"t4">>, <<"m4">>))), + ok = amqp10_client:send_msg(Sender, Msg4), + %% "If the source of the link supports the rejected outcome, and the message has not + %% already been settled by the sender, then the routing node MUST reject the message. + %% In this case the error field of rejected MUST contain the error which would have been communicated + %% in the detach which would have be sent if a link to the same address had been attempted." + %% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-routingerrors + receive {amqp10_disposition, {{rejected, Error}, <<"t4">>}} -> + ?assertMatch( + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD, + description = {utf8, <<"bad value for 'x-cc' message-annotation:", _/binary>>}}, + Error) + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = amqp10_client:detach_link(Sender), + ok = amqp10_client:detach_link(Receiver1), + ok = amqp10_client:detach_link(Receiver2), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + +bad_x_cc_annotation_exchange(Config) -> + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session(Connection), + + Address = rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"key-1">>), + {ok, Sender1} = amqp10_client:attach_sender_link(Session, <<"sender 1">>, Address), + ok = wait_for_credit(Sender1), + ok = amqp10_client:send_msg( + Sender1, + amqp10_msg:set_message_annotations( + %% We send an array instead of a list. + #{<<"x-cc">> => {array, utf8, [{utf8, <<"🗝️-2"/utf8>>}]}}, + amqp10_msg:new(<<"t1">>, <<"m1">>))), + ok = wait_for_settlement(<<"t1">>, released), + receive {amqp10_event, {link, Sender1, {detached, Error1}}} -> + ?assertMatch( + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD, + description = {utf8, <<"bad value for 'x-cc' message-annotation: " + "{array,utf8,[{utf8,<<\"🗝️-2"/utf8, _Rest/binary>>}}, + Error1) + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + {ok, Sender2} = amqp10_client:attach_sender_link(Session, <<"sender 2">>, Address), + ok = wait_for_credit(Sender2), + ok = amqp10_client:send_msg( + Sender2, + amqp10_msg:set_message_annotations( + %% We include a non-utf8 type in the list. + #{<<"x-cc">> => {list, [{symbol, <<"key-3">>}]}}, + amqp10_msg:new(<<"t2">>, <<"m2">>))), + ok = wait_for_settlement(<<"t2">>, released), + receive {amqp10_event, {link, Sender2, {detached, Error2}}} -> + ?assertEqual( + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD, + description = {utf8, <<"bad value for 'x-cc' message-annotation: " + "{list,[{symbol,<<\"key-3\">>}]}">>}}, + Error2) + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + %% internal %% diff --git a/deps/rabbit/test/dead_lettering_SUITE.erl b/deps/rabbit/test/dead_lettering_SUITE.erl index 6d0ad63b13d8..b793cb3abebd 100644 --- a/deps/rabbit/test/dead_lettering_SUITE.erl +++ b/deps/rabbit/test/dead_lettering_SUITE.erl @@ -177,15 +177,11 @@ end_per_group(Group, Config) -> init_per_testcase(T, Config) when T =:= dead_letter_reject_expire_expire orelse T =:= stream -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers_deaths_v2) of - ok -> - init_per_testcase0(T, Config); - {skip, _} = Skip -> - %% With feature flag message_containers_deaths_v2 disabled, test case: - %% * dead_letter_reject_expire_expire is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11159 - %% * stream is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11173 - Skip - end; + %% With feature flag message_containers_deaths_v2 disabled, test case: + %% * dead_letter_reject_expire_expire is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11159 + %% * stream is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11173 + ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers_deaths_v2), + init_per_testcase0(T, Config); init_per_testcase(Testcase, Config) -> init_per_testcase0(Testcase, Config). @@ -1860,6 +1856,10 @@ stream(Config) -> {timestamp, T2} = rabbit_misc:table_lookup(Death2, <<"time">>), ?assert(T1 < T2), + ?assertEqual({array, [{longstr, <<"cc 1">>}, + {longstr, <<"cc 2">>}]}, + rabbit_misc:table_lookup(Headers, <<"CC">>)), + ok = rabbit_ct_client_helpers:close_channel(Ch0), ok = rabbit_ct_client_helpers:close_channel(Ch1). diff --git a/deps/rabbit/test/mc_unit_SUITE.erl b/deps/rabbit/test/mc_unit_SUITE.erl index acc9ea69adfe..f8d10462e629 100644 --- a/deps/rabbit/test/mc_unit_SUITE.erl +++ b/deps/rabbit/test/mc_unit_SUITE.erl @@ -42,7 +42,9 @@ all_tests() -> amqp_amqpl_message_id_binary, amqp_amqpl_unsupported_values_not_converted, amqp_to_amqpl_data_body, - amqp_amqpl_amqp_bodies + amqp_amqpl_amqp_bodies, + amqp_x_headers, + amqpl_x_headers ]. %%%=================================================================== @@ -195,10 +197,7 @@ amqpl_table_x_header_array_of_tbls(_Config) -> [{{symbol, <<"type">>}, {utf8, <<"orange">>}}, {{symbol, <<"count">>}, {long, 45}}]} ]}, - mc:x_header(<<"x-fruit">>, Msg)), - - - ok. + mc:x_header(<<"x-fruit">>, Msg)). amqpl_death_v1_records(_Config) -> ok = amqpl_death_records(#{?FF_MC_DEATHS_V2 => false}). @@ -364,8 +363,9 @@ amqpl_amqp_bin_amqpl(_Config) -> Msg10Pre = mc:convert(mc_amqp, Msg), Payload = iolist_to_binary(mc:protocol_state(Msg10Pre)), Msg10 = mc:init(mc_amqp, Payload, #{}), - ?assertEqual(<<"exch">>, mc:exchange(Msg10)), - ?assertEqual([<<"apple">>], mc:routing_keys(Msg10)), + ?assertMatch(#{<<"x-exchange">> := {utf8, <<"exch">>}, + <<"x-routing-key">> := {utf8, <<"apple">>}}, + mc:x_headers(Msg10)), ?assertEqual(98, mc:priority(Msg10)), ?assertEqual(true, mc:is_persistent(Msg10)), ?assertEqual(99000, mc:timestamp(Msg10)), @@ -422,8 +422,6 @@ amqpl_amqp_bin_amqpl(_Config) -> MsgL2 = mc:convert(mc_amqpl, Msg10), - ?assertEqual(<<"exch">>, mc:exchange(MsgL2)), - ?assertEqual([<<"apple">>], mc:routing_keys(MsgL2)), ?assertEqual(98, mc:priority(MsgL2)), ?assertEqual(true, mc:is_persistent(MsgL2)), ?assertEqual(99000, mc:timestamp(MsgL2)), @@ -450,9 +448,17 @@ amqpl_cc_amqp_bin_amqpl(_Config) -> Msg10Pre = mc:convert(mc_amqp, Msg), Sections = iolist_to_binary(mc:protocol_state(Msg10Pre)), Msg10 = mc:init(mc_amqp, Sections, #{}), - ?assertEqual(RoutingKeys, mc:routing_keys(Msg10)), + ?assertMatch(#{<<"x-exchange">> := {utf8, <<"exch">>}, + <<"x-routing-key">> := {utf8, <<"apple">>}, + <<"x-cc">> := {list, [{utf8, <<"q1">>}, + {utf8, <<"q2">>}]}}, + mc:x_headers(Msg10)), - MsgL2 = mc:convert(mc_amqpl, Msg10), + %% Here, we simulate what rabbit_stream_queue does: + Msg10b = mc:set_annotation(?ANN_EXCHANGE, <<"exch">>, Msg10), + Msg10c = mc:set_annotation(?ANN_ROUTING_KEYS, [<<"apple">>, <<"q1">>, <<"q2">>], Msg10b), + + MsgL2 = mc:convert(mc_amqpl, Msg10c), ?assertEqual(RoutingKeys, mc:routing_keys(MsgL2)), ?assertMatch(#content{properties = #'P_basic'{headers = Headers}}, mc:protocol_state(MsgL2)). @@ -751,6 +757,52 @@ amqp_amqpl_amqp_bodies(_Config) -> end || Body <- Bodies], ok. +amqp_x_headers(_Config) -> + MAC = [ + {{symbol, <<"x-stream-filter">>}, {utf8, <<"apple">>}}, + thead2('x-list', list, [utf8(<<"l">>)]), + thead2('x-map', map, [{utf8(<<"k">>), utf8(<<"v">>)}]) + ], + M = #'v1_0.message_annotations'{content = MAC}, + AC = [thead(long, 5)], + A = #'v1_0.application_properties'{content = AC}, + D = #'v1_0.data'{content = <<"data">>}, + + Payload = serialize_sections([M, A, D]), + Msg0 = mc:init(mc_amqp, Payload, annotations()), + Msg1 = mc:set_annotation(<<"x-1">>, {byte, -2}, Msg0), + ?assertEqual(#{<<"x-1">> => {byte, -2}, + <<"x-list">> => {list,[{utf8,<<"l">>}]}, + <<"x-map">> => {map,[{{utf8,<<"k">>},{utf8,<<"v">>}}]}, + <<"x-stream-filter">> => {utf8,<<"apple">>}}, + mc:x_headers(Msg1)). + +amqpl_x_headers(_Config) -> + Props = #'P_basic'{headers = [{<<"a-string">>, longstr, <<"a string">>}, + {<<"x-1">>, binary, <<"v1">>}, + {<<"x-stream-filter">>, longstr, <<"apple">>}]}, + Payload = [<<"data">>], + Content = #content{properties = Props, + payload_fragments_rev = Payload}, + + Msg0 = mc:init(mc_amqpl, Content, annotations()), + Msg1 = mc:set_annotation(delivery_count, 1, Msg0), + Msg = mc:set_annotation(<<"x-delivery-count">>, 2, Msg1), + ?assertEqual(#{<<"x-1">> => {binary, <<"v1">>}, + <<"x-stream-filter">> => {utf8,<<"apple">>}, + <<"x-delivery-count">> => {long, 2}}, + mc:x_headers(Msg)), + + XName = <<"exch">>, + RoutingKey = <<"apple">>, + {ok, BasicMsg0} = rabbit_basic:message_no_id(XName, RoutingKey, Content), + BasicMsg1 = mc:set_annotation(delivery_count, 1, BasicMsg0), + BasicMsg = mc:set_annotation(<<"x-delivery-count">>, 2, BasicMsg1), + ?assertEqual(#{<<"x-1">> => {binary, <<"v1">>}, + <<"x-stream-filter">> => {utf8,<<"apple">>}, + <<"x-delivery-count">> => {long, 2}}, + mc:x_headers(BasicMsg)). + %% Utility amqp10_encode_bin(L) when is_list(L) -> diff --git a/deps/rabbit/test/topic_permission_SUITE.erl b/deps/rabbit/test/topic_permission_SUITE.erl index 2849b76fd3b9..b7c2e10b2421 100644 --- a/deps/rabbit/test/topic_permission_SUITE.erl +++ b/deps/rabbit/test/topic_permission_SUITE.erl @@ -8,6 +8,7 @@ -module(topic_permission_SUITE). -include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp10_common/include/amqp10_framing.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). -compile([export_all, nowarn_export_all]). @@ -21,6 +22,7 @@ groups() -> [ {sequential_tests, [], [ + amqp_x_cc_annotation, amqpl_cc_headers, amqpl_bcc_headers, topic_permission_database_access, @@ -29,6 +31,7 @@ groups() -> ]. init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(amqp10_client), rabbit_ct_helpers:log_environment(), Config1 = rabbit_ct_helpers:set_config( Config, @@ -56,6 +59,91 @@ init_per_testcase(Testcase, Config) -> end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). +amqp_x_cc_annotation(Config) -> + ok = set_topic_permissions(Config, "^a", ".*"), + + QName1 = <<"queue 1">>, + QName2 = <<"queue 2">>, + {Connection, Session1, LinkPair} = amqp_utils:init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName1, #{}), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName1, <<"amq.topic">>, <<"a.1">>, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.topic">>, <<"a.2">>, #{}), + + {ok, Sender1} = amqp10_client:attach_sender_link( + Session1, + <<"sender 1">>, + rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"a.1">>)), + ok = amqp_utils:wait_for_credit(Sender1), + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session1, <<"receiver 1">>, rabbitmq_amqp_address:queue(QName1), settled), + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session1, <<"receiver 2">>, rabbitmq_amqp_address:queue(QName2), settled), + %% We have permissions to send to both topics. + %% Therefore, m1 should be sent to both queues. + ok = amqp10_client:send_msg(Sender1, amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {list, [{utf8, <<"a.2">>}]}}, + amqp10_msg:new(<<"t1">>, <<"m1">>, true))), + {ok, Msg1} = amqp10_client:get_msg(Receiver1), + {ok, Msg2} = amqp10_client:get_msg(Receiver2), + ?assertEqual([<<"m1">>], amqp10_msg:body(Msg1)), + ?assertEqual([<<"m1">>], amqp10_msg:body(Msg2)), + ok = amqp_utils:detach_link_sync(Sender1), + ok = amqp_utils:detach_link_sync(Receiver1), + ok = amqp_utils:detach_link_sync(Receiver2), + + {ok, Session2} = amqp10_client:begin_session_sync(Connection), + {ok, Sender2} = amqp10_client:attach_sender_link( + Session2, + <<"sender 2">>, + rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"x.1">>)), + ok = amqp_utils:wait_for_credit(Sender2), + ok = amqp10_client:send_msg(Sender2, amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {list, [{utf8, <<"a.2">>}]}}, + amqp10_msg:new(<<"t2">>, <<"m2">>, true))), + receive + {amqp10_event, + {session, Session2, + {ended, + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + description = {utf8, Description1}}}}} -> + ?assertEqual( + <<"write access to topic 'x.1' in exchange 'amq.topic' in vhost '/' refused for user 'guest'">>, + Description1) + after 5000 -> amqp_utils:flush(missing_ended), + ct:fail({missing_event, ?LINE}) + end, + + {ok, Session3} = amqp10_client:begin_session_sync(Connection), + {ok, Sender3} = amqp10_client:attach_sender_link( + Session3, + <<"sender 3">>, + rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"a.1">>)), + ok = amqp_utils:wait_for_credit(Sender3), + ok = amqp10_client:send_msg(Sender3, amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {list, [{utf8, <<"x.2">>}]}}, + amqp10_msg:new(<<"t3">>, <<"m3">>, true))), + receive + {amqp10_event, + {session, Session3, + {ended, + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + description = {utf8, Description2}}}}} -> + ?assertEqual( + <<"write access to topic 'x.2' in exchange 'amq.topic' in vhost '/' refused for user 'guest'">>, + Description2) + after 5000 -> amqp_utils:flush(missing_ended), + ct:fail({missing_event, ?LINE}) + end, + + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), + ok = amqp_utils:end_session_sync(Session1), + ok = amqp10_client:close_connection(Connection), + ok = clear_topic_permissions(Config). + amqpl_cc_headers(Config) -> amqpl_headers(<<"CC">>, Config). diff --git a/deps/rabbitmq_mqtt/src/mc_mqtt.erl b/deps/rabbitmq_mqtt/src/mc_mqtt.erl index b6cae214c8c3..656b44dd8b7b 100644 --- a/deps/rabbitmq_mqtt/src/mc_mqtt.erl +++ b/deps/rabbitmq_mqtt/src/mc_mqtt.erl @@ -14,6 +14,7 @@ init/1, size/1, x_header/2, + x_headers/1, property/2, routing_headers/2, convert_to/3, @@ -390,6 +391,11 @@ x_header(Key, #mqtt_msg{props = #{'User-Property' := UserProp}}) -> x_header(_Key, #mqtt_msg{}) -> undefined. +x_headers(#mqtt_msg{props = #{'User-Property' := UserProp}}) -> + #{Key => {utf8, Val} || {<<"x-", _/binary>> = Key, Val} <- UserProp}; +x_headers(#mqtt_msg{}) -> + #{}. + property(correlation_id, #mqtt_msg{props = #{'Correlation-Data' := Corr}}) -> case mc_util:urn_string_to_uuid(Corr) of {ok, UUId} -> diff --git a/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl b/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl index 14d88f357602..c6d1308e9ad2 100644 --- a/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl @@ -61,6 +61,10 @@ roundtrip_amqp(_Config) -> PayloadSize = 10, ExpectedSize = {MetaDataSize, PayloadSize}, ?assertEqual(ExpectedSize, mc:size(Mc0)), + ?assertEqual(#{<<"x-key-1">> => {utf8, <<"val-1">>}, + <<"x-key-2">> => {utf8, <<"val-2">>}, + <<"x-key-3">> => {utf8, <<"val-3">>}}, + mc:x_headers(Mc0)), Env = #{}, ?assertEqual(Msg, mc_mqtt:convert_to(mc_mqtt, Msg, Env)), @@ -310,6 +314,7 @@ mqtt_amqpl_alt(_Config) -> }, Anns = #{?ANN_ROUTING_KEYS => [rabbit_mqtt_util:mqtt_to_amqp(Msg#mqtt_msg.topic)]}, Mc = mc:init(mc_mqtt, Msg, Anns), + ?assertEqual(#{}, mc:x_headers(Mc)), MsgL = mc:convert(mc_amqpl, Mc), #content{properties = #'P_basic'{headers = HL} = Props} = diff --git a/release-notes/4.1.0.md b/release-notes/4.1.0.md index ca80cfa59630..b4fe0f8b56cc 100644 --- a/release-notes/4.1.0.md +++ b/release-notes/4.1.0.md @@ -18,6 +18,12 @@ This feature: This feature allows operators to gain insights into the message sizes being published to RabbitMQ, such as average message size, number of messages per pre-defined bucket (which can both be computed accurately), and percentiles (which will be approximated). Each metric is labelled by protocol (AMQP 1.0, AMQP 0.9.1, MQTT 5.0, MQTT 3.1.1, and MQTT 3.1). +## New Features + +### Support for Multiple Routing Keys in AMQP 1.0 via `x-cc` Message Annotation +[PR #12559](https://github.com/rabbitmq/rabbitmq-server/pull/12559) enables AMQP 1.0 publishers to set multiple routing keys by using the `x-cc` message annotation. +This annotation allows publishers to specify a [list](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-list) of routing keys ([strings](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-string)) for more flexible message distribution, similar to the [CC](https://www.rabbitmq.com/docs/sender-selected) header in AMQP 0.9.1. + ## Potential incompatibilities * The default MQTT [Maximum Packet Size](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901086) changed from 256 MiB to 16 MiB. This default can be overridden by [configuring](https://www.rabbitmq.com/docs/configure#config-file) `mqtt.max_packet_size_authenticated`. Note that this value must not be greater than `max_message_size` (which also defaults to 16 MiB).