Skip to content

Avoid memory leaks in RMQConnection dependency graph (take 4) #198

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
638 changes: 637 additions & 1 deletion RMQClient.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion RMQClient/RMQAllocatedChannel.m
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ @interface RMQAllocatedChannel ()
@property (nonatomic, readwrite) NSNumber *prefetchCountPerChannel;
@property (nonatomic, readwrite) id<RMQConnectionDelegate> delegate;
@property (nonatomic, readwrite) id<RMQNameGenerator> nameGenerator;
@property (nonatomic, readwrite) id<RMQChannelAllocator> allocator;
@property (nonatomic, weak, readwrite) id<RMQChannelAllocator> allocator;
@end

@implementation RMQAllocatedChannel
Expand Down
4 changes: 3 additions & 1 deletion RMQClient/RMQChannelAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@
#import "RMQSender.h"

@protocol RMQChannelAllocator <NSObject>
@property (nonatomic, readwrite) id<RMQSender> sender;
@property (nonatomic,weak, readwrite) id<RMQSender> sender;
- (id<RMQChannel>)allocate;
- (void)releaseChannelNumber:(NSNumber *)channelNumber;
- (NSArray<id<RMQChannel>> *)allocatedUserChannels;
- (void)cleanupOnClose;

@end
42 changes: 31 additions & 11 deletions RMQClient/RMQConnection.m
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ @interface RMQConnection ()
@property (nonatomic, readwrite) id <RMQFrameHandler> frameHandler;
@property (nonatomic, readwrite) id<RMQLocalSerialQueue> commandQueue;
@property (nonatomic, readwrite) id<RMQWaiterFactory> waiterFactory;
@property (nonatomic, readwrite) id<RMQHeartbeatSender> heartbeatSender;
@property (nonatomic, weak, readwrite) id<RMQHeartbeatSender> heartbeatSender;
@property (nonatomic, weak, readwrite) id<RMQConnectionDelegate> delegate;
@property (nonatomic, readwrite) id <RMQChannel> channelZero;
@property (nonatomic, weak, readwrite) id <RMQChannel> channelZero;
@property (nonatomic, readwrite) RMQConnectionConfig *config;
@property (nonatomic, readwrite) NSMutableDictionary *userChannels;
@property (nonatomic, readwrite) NSNumber *frameMax;
Expand Down Expand Up @@ -559,22 +559,24 @@ - (void)start:(void (^)(void))completionHandler {
[self.delegate connection:self failedToConnectWithError:connectError];
} else {
[self.transport write:[RMQProtocolHeader new].amqEncoded];
__weak id this = self;

[self.commandQueue enqueue:^{
id<RMQWaiter> handshakeCompletion = [self.waiterFactory makeWithTimeout:self.handshakeTimeout];
__strong typeof(self) strongThis = this;
id<RMQWaiter> handshakeCompletion = [strongThis.waiterFactory makeWithTimeout:strongThis.handshakeTimeout];

RMQHandshaker *handshaker = [[RMQHandshaker alloc] initWithSender:self
config:self.config
RMQHandshaker *handshaker = [[RMQHandshaker alloc] initWithSender:strongThis
config:strongThis.config
completionHandler:^(NSNumber *heartbeatTimeout,
RMQTable *serverProperties) {
[self.heartbeatSender startWithInterval:@(heartbeatTimeout.integerValue / 2)];
self.handshakeComplete = YES;
[strongThis.heartbeatSender startWithInterval:@(heartbeatTimeout.integerValue / 2)];
strongThis.handshakeComplete = YES;
[handshakeCompletion done];
[self.reader run];
self.serverProperties = serverProperties;
[strongThis.reader run];
strongThis.serverProperties = serverProperties;
completionHandler();
}];
RMQReader *handshakeReader = [[RMQReader alloc] initWithTransport:self.transport
RMQReader *handshakeReader = [[RMQReader alloc] initWithTransport:strongThis.transport
frameHandler:handshaker];
handshaker.reader = handshakeReader;
[handshakeReader run];
Expand All @@ -583,7 +585,7 @@ - (void)start:(void (^)(void))completionHandler {
NSError *error = [NSError errorWithDomain:RMQErrorDomain
code:RMQErrorConnectionHandshakeTimedOut
userInfo:@{NSLocalizedDescriptionKey: @"Handshake timed out."}];
[self.delegate connection:self failedToConnectWithError:error];
[strongThis.delegate connection:strongThis failedToConnectWithError:error];
}
}];
}
Expand Down Expand Up @@ -704,7 +706,16 @@ - (NSArray *)closeOperationsWithoutBlock {
^{[self.heartbeatSender stop];},
^{
self.transport.delegate = nil;
[self.transport cleanup];
[self.transport close];
},
^{
[self.channelAllocator cleanupOnClose];
self.channelAllocator = nil;
},
^{
self.reader = nil;
self.frameHandler = nil;
}];
}

Expand All @@ -716,7 +727,16 @@ - (NSArray *)closeOperations {
^{[self.heartbeatSender stop];},
^{
self.transport.delegate = nil;
[self.transport cleanup];
[self.transport close];
},
^{
[self.channelAllocator cleanupOnClose];
self.channelAllocator = nil;
},
^{
self.reader = nil;
self.frameHandler = nil;
}];
}

Expand Down
2 changes: 1 addition & 1 deletion RMQClient/RMQConnectionRecover.m
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ @interface RMQConnectionRecover ()
@property (nonatomic, readwrite) NSUInteger attempts;
@property (nonatomic, readwrite) NSUInteger attemptLimit;
@property (nonatomic, readwrite) BOOL onlyErrors;
@property (nonatomic, readwrite) id<RMQHeartbeatSender> heartbeatSender;
@property (nonatomic, weak, readwrite) id<RMQHeartbeatSender> heartbeatSender;
@property (nonatomic, readwrite) id<RMQLocalSerialQueue> commandQueue;
@property (nonatomic, readwrite) id<RMQConnectionDelegate> delegate;
@end
Expand Down
8 changes: 7 additions & 1 deletion RMQClient/RMQMultipleChannelAllocator.m
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,13 @@ - (instancetype)init {
[self doesNotRecognizeSelector:_cmd];
return nil;
}

- (void)cleanupOnClose {
for (id key in self.channels) {
[[self.channels objectForKey:key] close];
}
[self.channels removeAllObjects];

}
- (id<RMQChannel>)allocate {
id<RMQChannel> ch;
@synchronized(self) {
Expand Down
21 changes: 12 additions & 9 deletions RMQClient/RMQReader.m
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
#import "RMQMethodDecoder.h"

@interface RMQReader ()
@property (nonatomic, readwrite) id<RMQTransport>transport;
@property (nonatomic,weak, readwrite) id<RMQTransport>transport;
@property (nonatomic, readwrite) id<RMQFrameHandler>frameHandler;
@end

Expand Down Expand Up @@ -78,20 +78,22 @@ - (void)run {

- (void)handleMethodFrame:(RMQFrame *)frame {
id<RMQMethod> method = (id<RMQMethod>)frame.payload;

__weak id this = self;
if (method.hasContent) {
[self.transport readFrame:^(NSData * _Nonnull headerData) {
RMQFrame *headerFrame = [self frameWithData:headerData];
__strong typeof(self) strongThis = this;

RMQFrame *headerFrame = [strongThis frameWithData:headerData];
RMQContentHeader *header = (RMQContentHeader *)headerFrame.payload;

RMQFrameset *frameset = [[RMQFrameset alloc] initWithChannelNumber:frame.channelNumber
method:method
contentHeader:header
contentBodies:@[]];
if ([header.bodySize isEqualToNumber:@0]) {
[self.frameHandler handleFrameset:frameset];
[strongThis.frameHandler handleFrameset:frameset];
} else {
[self readBodiesForIncompleteFrameset:frameset];
[strongThis readBodiesForIncompleteFrameset:frameset];
}
}];
} else {
Expand All @@ -102,17 +104,18 @@ - (void)handleMethodFrame:(RMQFrame *)frame {
}

- (void)readBodiesForIncompleteFrameset:(RMQFrameset *)contentFrameset {
__weak id this = self;
[self.transport readFrame:^(NSData * _Nonnull data) {
RMQFrame *frame = [self frameWithData:data];

__strong typeof(self) strongThis = this;
if ([frame.payload isKindOfClass:[RMQContentBody class]]) {
[self frameset:contentFrameset
[this frameset:contentFrameset
addBodyFrame:frame];
} else {
[self.frameHandler handleFrameset:contentFrameset];
[strongThis.frameHandler handleFrameset:contentFrameset];
RMQFrameset *nonContentFrameset = [[RMQFrameset alloc] initWithChannelNumber:contentFrameset.channelNumber
method:(id <RMQMethod>)frame.payload];
[self.frameHandler handleFrameset:nonContentFrameset];
[strongThis.frameHandler handleFrameset:nonContentFrameset];
}
}];
}
Expand Down
4 changes: 2 additions & 2 deletions RMQClient/RMQSuspendResumeDispatcher.m
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@

@interface RMQSuspendResumeDispatcher ()
@property (nonatomic, readwrite) id<RMQChannel> channel;
@property (nonatomic, readwrite) id<RMQSender> sender;
@property (nonatomic,weak, readwrite) id<RMQSender> sender;
@property (nonatomic, readwrite) RMQFramesetValidator *validator;
@property (nonatomic, readwrite) id<RMQLocalSerialQueue> commandQueue;
@property (nonatomic, readwrite) id<RMQLocalSerialQueue> enablementQueue;
@property (nonatomic, readwrite) NSNumber *enableDelay;
@property (nonatomic, readwrite) id<RMQConnectionDelegate> delegate;
@property (nonatomic,weak, readwrite) id<RMQConnectionDelegate> delegate;
@property (nonatomic, readwrite) DispatcherState state;
@property (nonatomic, readwrite) BOOL disabled;
@end
Expand Down
1 change: 1 addition & 0 deletions RMQClient/RMQSynchronizedMutableDictionary.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,5 @@
- (nullable id)objectForKeyedSubscript:(nonnull id)key;
- (void)setObject:(nonnull id)obj forKeyedSubscript:(nonnull id)key;
- (void)removeObjectForKey:(nonnull id)key;
- (void)removeAllObjects;
@end
6 changes: 6 additions & 0 deletions RMQClient/RMQSynchronizedMutableDictionary.m
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,11 @@ - (void)removeObjectForKey:(id)key {
self.count--;
}
}
- (void)removeAllObjects {
@synchronized (self.lock) {
[self.backingDictionary removeAllObjects];
self.count = 0;
}
}

@end
10 changes: 7 additions & 3 deletions RMQClient/RMQTCPSocketTransport.m
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,15 @@ struct __attribute__((__packed__)) AMQPHeader {
#define AMQP091_FINAL_OCTET_SIZE 1

- (void)readFrame:(void (^)(NSData * _Nonnull))complete {
__weak id this = self;
[self read:AMQP091_HEADER_SIZE complete:^(NSData * _Nonnull data) {
const struct AMQPHeader *header;
header = (const struct AMQPHeader *)data.bytes;

__strong typeof(self) strongThis = this;
UInt32 hostSize = CFSwapInt32BigToHost(header->size);

[self read:hostSize complete:^(NSData * _Nonnull payload) {
[self read:AMQP091_FINAL_OCTET_SIZE complete:^(NSData * _Nonnull frameEnd) {
[strongThis read:hostSize complete:^(NSData * _Nonnull payload) {
[strongThis read:AMQP091_FINAL_OCTET_SIZE complete:^(NSData * _Nonnull frameEnd) {
NSMutableData *allData = [data mutableCopy];
[allData appendData:payload];
complete(allData);
Expand Down Expand Up @@ -228,6 +229,9 @@ - (void)invokeZeroArityCallback:(long)tag {
foundCallback();
}
}
- (void)cleanup {
[self.callbacks removeAllObjects];
}

- (BOOL)tlsUpgradeWithError:(NSError **)error {
NSArray *certificates = [self.tlsOptions certificatesWithError:error];
Expand Down
1 change: 1 addition & 0 deletions RMQClient/RMQTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,5 @@
- (BOOL)isConnected;
/// @brief Returns true if the transport is not connected
- (BOOL)isDisconnected;
- (void)cleanup;
@end
7 changes: 7 additions & 0 deletions RMQClientTests/ChannelSpyAllocator.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
// ---------------------------------------------------------------------------

@objc class ChannelSpyAllocator: NSObject, RMQChannelAllocator {

var id = 0
var channels: [ChannelSpy] = []
var sender: RMQSender!
Expand All @@ -51,6 +52,12 @@
channels.append(ch)
return ch
}
func cleanupOnClose() {
for channel in channels {
channel.close()
}
channels.removeAll()
}

func releaseChannelNumber(_ channelNumber: NSNumber!) {
channels = channels.filter { ch -> Bool in
Expand Down
4 changes: 2 additions & 2 deletions RMQClientTests/ConnectionClosureTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class ConnectionClosureTest: XCTestCase {
let allocator = ChannelSpyAllocator()
let q = FakeSerialQueue()
let handshakeCount = 1
let expectedCloseProcedureCount = 5
let expectedCloseProcedureCount = 7
let channelsToCreateCount = 2
let conn = RMQConnection(transport: transport,
config: ConnectionWithFakesHelper.connectionConfig(),
Expand Down Expand Up @@ -187,7 +187,7 @@ class ConnectionClosureTest: XCTestCase {
let transport = ControlledInteractionTransport()
let allocator = ChannelSpyAllocator()
let q = FakeSerialQueue()
let expectedCloseProcedureCount = 5
let expectedCloseProcedureCount = 7
let channelsToCreateCount = 2
let heartbeatSender = HeartbeatSenderSpy()
let conn = RMQConnection(transport: transport,
Expand Down
3 changes: 3 additions & 0 deletions RMQClientTests/ControlledInteractionTransport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ enum TestDoubleTransportError: Error {
connected = false
delegate?.transport(self, disconnectedWithError: nil)
}
func cleanup() {

}

func write(_ data: Data) {
outboundData.append(data)
Expand Down