@@ -416,7 +416,8 @@ apply(#{index := Index,
416
416
{State , {dequeue , empty }, Effects }
417
417
end
418
418
end ;
419
- apply (#{index := _Idx } = Meta ,
419
+ apply (#{index := _Idx ,
420
+ machine_version := Vsn } = Meta ,
420
421
# checkout {spec = Spec ,
421
422
consumer_id = ConsumerId }, State0 )
422
423
when Spec == cancel orelse
@@ -425,19 +426,19 @@ apply(#{index := _Idx} = Meta,
425
426
{ok , ConsumerKey } ->
426
427
{State1 , Effects1 } = activate_next_consumer (
427
428
cancel_consumer (Meta , ConsumerKey , State0 , [],
428
- Spec )),
429
+ Spec ), Vsn ),
429
430
Reply = {ok , consumer_cancel_info (ConsumerKey , State1 )},
430
431
{State , _ , Effects } = checkout (Meta , State0 , State1 , Effects1 ),
431
432
{State , Reply , Effects };
432
433
error ->
433
434
{State0 , {error , consumer_not_found }, []}
434
435
end ;
435
- apply (#{index := Idx } = Meta ,
436
+ apply (#{index := Idx ,
437
+ machine_version := Vsn } = Meta ,
436
438
# checkout {spec = Spec0 ,
437
439
meta = ConsumerMeta ,
438
440
consumer_id = {_ , Pid } = ConsumerId }, State0 ) ->
439
- % % might be better to check machine_version
440
- IsV4 = tuple_size (Spec0 ) == 2 ,
441
+ IsV4 = Vsn >= 4 ,
441
442
% % normalise spec format
442
443
Spec = case Spec0 of
443
444
{_ , _ } ->
@@ -461,7 +462,7 @@ apply(#{index := Idx} = Meta,
461
462
end ,
462
463
{Consumer , State1 } = update_consumer (Meta , ConsumerKey , ConsumerId ,
463
464
ConsumerMeta , Spec , Priority , State0 ),
464
- {State2 , Effs } = activate_next_consumer (State1 , []),
465
+ {State2 , Effs } = activate_next_consumer (State1 , [], Vsn ),
465
466
# consumer {checked_out = Checked ,
466
467
credit = Credit ,
467
468
delivery_count = DeliveryCount ,
@@ -472,7 +473,7 @@ apply(#{index := Idx} = Meta,
472
473
credit => Credit ,
473
474
key => ConsumerKey ,
474
475
delivery_count => DeliveryCount ,
475
- is_active => is_active (ConsumerKey , State2 ),
476
+ is_active => is_active (ConsumerKey , State2 , Vsn ),
476
477
num_checked_out => map_size (Checked )}},
477
478
checkout (Meta , State0 , State2 , [{monitor , process , Pid } | Effs ], Reply );
478
479
apply (#{index := Index }, # purge {},
@@ -553,7 +554,7 @@ apply(#{machine_version := Vsn,
553
554
554
555
% % select a new consumer from the waiting queue and run a checkout
555
556
State2 = State1 #? STATE {waiting_consumers = WaitingConsumers },
556
- {State , Effects1 } = activate_next_consumer (State2 , Effects0 ),
557
+ {State , Effects1 } = activate_next_consumer (State2 , Effects0 , Vsn ),
557
558
558
559
% % mark any enquers as suspected
559
560
Enqs = maps :map (fun (P , E ) when node (P ) =:= Node ->
@@ -602,8 +603,9 @@ apply(#{machine_version := Vsn,
602
603
Effects = [{monitor , node , Node } | Effects1 ],
603
604
checkout (Meta , State0 , State #? STATE {enqueuers = Enqs ,
604
605
last_active = Ts }, Effects );
605
- apply (Meta , {down , Pid , _Info }, State0 ) ->
606
- {State1 , Effects1 } = activate_next_consumer (handle_down (Meta , Pid , State0 )),
606
+ apply (#{machine_version := Vsn } = Meta , {down , Pid , _Info }, State0 ) ->
607
+ {State1 , Effects1 } = activate_next_consumer (handle_down (Meta , Pid , State0 ),
608
+ Vsn ),
607
609
checkout (Meta , State0 , State1 , Effects1 );
608
610
apply (#{machine_version := Vsn } = Meta ,
609
611
{nodeup , Node },
@@ -639,7 +641,7 @@ apply(#{machine_version := Vsn} = Meta,
639
641
Waiting = update_waiting_consumer_status (Node , State1 , up ),
640
642
State2 = State1 #? STATE {enqueuers = Enqs1 ,
641
643
waiting_consumers = Waiting },
642
- {State , Effects } = activate_next_consumer (State2 , Effects1 ),
644
+ {State , Effects } = activate_next_consumer (State2 , Effects1 , Vsn ),
643
645
checkout (Meta , State0 , State , Effects );
644
646
apply (_ , {nodedown , _Node }, State ) ->
645
647
{State , ok };
@@ -872,7 +874,7 @@ overview(#?STATE{consumers = Cons,
872
874
msg_ttl => Cfg # cfg .msg_ttl ,
873
875
delivery_limit => Cfg # cfg .delivery_limit
874
876
},
875
- SacOverview = case active_consumer (Cons ) of
877
+ SacOverview = case active_consumer (Cons , version () ) of
876
878
{SacConsumerKey , SacCon } ->
877
879
SacConsumerId = consumer_id (SacCon ),
878
880
NumWaiting = length (WaitingConsumers ),
@@ -1305,7 +1307,7 @@ query_consumers(#?STATE{consumers = Consumers,
1305
1307
1306
1308
query_single_active_consumer (#? STATE {cfg = # cfg {consumer_strategy = single_active },
1307
1309
consumers = Consumers }) ->
1308
- case active_consumer (Consumers ) of
1310
+ case active_consumer (Consumers , version () ) of
1309
1311
undefined ->
1310
1312
{error , no_value };
1311
1313
{_CKey , ? CONSUMER_TAG_PID (Tag , Pid )} ->
@@ -1476,15 +1478,15 @@ cancel_consumer0(Meta, ConsumerKey,
1476
1478
{S0 , Effects0 }
1477
1479
end .
1478
1480
1479
- activate_next_consumer ({State , Effects }) ->
1480
- activate_next_consumer (State , Effects ).
1481
+ activate_next_consumer ({State , Effects }, MacVsn ) ->
1482
+ activate_next_consumer (State , Effects , MacVsn ).
1481
1483
1482
1484
activate_next_consumer (#? STATE {cfg = # cfg {consumer_strategy = competing }} = State ,
1483
- Effects ) ->
1485
+ Effects , _MacVsn ) ->
1484
1486
{State , Effects };
1485
1487
activate_next_consumer (#? STATE {consumers = Cons0 ,
1486
1488
waiting_consumers = Waiting0 } = State0 ,
1487
- Effects0 ) ->
1489
+ Effects0 , MacVsn ) ->
1488
1490
% % invariant, the waiting list always need to be sorted by consumers that are
1489
1491
% % up - then by priority
1490
1492
NextConsumer =
@@ -1495,7 +1497,7 @@ activate_next_consumer(#?STATE{consumers = Cons0,
1495
1497
undefined
1496
1498
end ,
1497
1499
1498
- case {active_consumer (Cons0 ), NextConsumer } of
1500
+ case {active_consumer (Cons0 , MacVsn ), NextConsumer } of
1499
1501
{undefined , {NextCKey , # consumer {cfg = NextCCfg } = NextC }} ->
1500
1502
Remaining = tl (Waiting0 ),
1501
1503
% % TODO: can this happen?
@@ -1564,17 +1566,19 @@ active_consumer({CKey, #consumer{status = Status} = Consumer, _I})
1564
1566
active_consumer ({_CKey , # consumer {status = _ }, I }) ->
1565
1567
active_consumer (maps :next (I ));
1566
1568
active_consumer (none ) ->
1567
- undefined ;
1568
- active_consumer (M ) when is_map (M ) ->
1569
- I = maps :iterator (M ),
1569
+ undefined .
1570
+
1571
+ active_consumer (M , MacVsn ) when is_map (M ) ->
1572
+ I = rabbit_fifo_maps :iterator (M , MacVsn ),
1570
1573
active_consumer (maps :next (I )).
1571
1574
1572
- is_active (_ConsumerKey , #? STATE {cfg = # cfg {consumer_strategy = competing }}) ->
1575
+ is_active (_ConsumerKey ,
1576
+ #? STATE {cfg = # cfg {consumer_strategy = competing }}, _MacVsn ) ->
1573
1577
% % all competing consumers are potentially active
1574
1578
true ;
1575
1579
is_active (ConsumerKey , #? STATE {cfg = # cfg {consumer_strategy = single_active },
1576
- consumers = Consumers }) ->
1577
- ConsumerKey == active_consumer (Consumers ).
1580
+ consumers = Consumers }, MacVsn ) ->
1581
+ ConsumerKey == active_consumer (Consumers , MacVsn ).
1578
1582
1579
1583
maybe_return_all (#{system_time := Ts } = Meta , ConsumerKey ,
1580
1584
# consumer {cfg = CCfg } = Consumer , S0 ,
@@ -1835,26 +1839,26 @@ increase_credit(#consumer{cfg = #consumer_cfg{credit_mode =
1835
1839
increase_credit (# consumer {credit = Current }, Credit ) ->
1836
1840
Current + Credit .
1837
1841
1838
- complete_and_checkout (#{} = Meta , MsgIds , ConsumerKey ,
1842
+ complete_and_checkout (#{machine_version : = Vsn } = Meta , MsgIds , ConsumerKey ,
1839
1843
# consumer {} = Con0 ,
1840
1844
Effects0 , State0 ) ->
1841
1845
State1 = complete (Meta , ConsumerKey , MsgIds , Con0 , State0 ),
1842
1846
% % a completion could have removed the active/quiescing consumer
1843
- Effects1 = add_active_effect (Con0 , State1 , Effects0 ),
1844
- {State2 , Effects2 } = activate_next_consumer (State1 , Effects1 ),
1847
+ Effects1 = add_active_effect (Con0 , State1 , Effects0 , Vsn ),
1848
+ {State2 , Effects2 } = activate_next_consumer (State1 , Effects1 , Vsn ),
1845
1849
checkout (Meta , State0 , State2 , Effects2 ).
1846
1850
1847
1851
add_active_effect (# consumer {status = quiescing } = Consumer ,
1848
1852
#? STATE {cfg = # cfg {consumer_strategy = single_active },
1849
1853
consumers = Consumers } = State ,
1850
- Effects ) ->
1851
- case active_consumer (Consumers ) of
1854
+ Effects , MacVsn ) ->
1855
+ case active_consumer (Consumers , MacVsn ) of
1852
1856
undefined ->
1853
1857
consumer_update_active_effects (State , Consumer , false , waiting , Effects );
1854
1858
_ ->
1855
1859
Effects
1856
1860
end ;
1857
- add_active_effect (_ , _ , Effects ) ->
1861
+ add_active_effect (_ , _ , Effects , _ ) ->
1858
1862
Effects .
1859
1863
1860
1864
cancel_consumer_effects (ConsumerId ,
@@ -2340,16 +2344,16 @@ update_consumer(Meta, ConsumerKey, {Tag, Pid}, ConsumerMeta,
2340
2344
delivery_count = DeliveryCount }
2341
2345
end ,
2342
2346
{Consumer , update_or_remove_con (Meta , ConsumerKey , Consumer , State0 )};
2343
- update_consumer (Meta , ConsumerKey , {Tag , Pid }, ConsumerMeta ,
2344
- {Life , Mode } = Spec , Priority ,
2347
+ update_consumer (#{ machine_version : = Vsn } = Meta , ConsumerKey , {Tag , Pid },
2348
+ ConsumerMeta , {Life , Mode } = Spec , Priority ,
2345
2349
#? STATE {cfg = # cfg {consumer_strategy = single_active },
2346
2350
consumers = Cons0 ,
2347
2351
waiting_consumers = Waiting0 ,
2348
2352
service_queue = _ServiceQueue0 } = State ) ->
2349
2353
% % if it is the current active consumer, just update
2350
2354
% % if it is a cancelled active consumer, add to waiting unless it is the only
2351
2355
% % one, then merge
2352
- case active_consumer (Cons0 ) of
2356
+ case active_consumer (Cons0 , Vsn ) of
2353
2357
{ConsumerKey , # consumer {status = up } = Consumer0 } ->
2354
2358
Consumer = merge_consumer (Meta , Consumer0 , ConsumerMeta ,
2355
2359
Spec , Priority ),
0 commit comments