Skip to content

Commit 30d2267

Browse files
committed
Change consumer listener to use original consumer pointer
1 parent 5f3d0a3 commit 30d2267

File tree

4 files changed

+63
-91
lines changed

4 files changed

+63
-91
lines changed

src/Consumer.cc

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -43,19 +43,20 @@ void Consumer::Init(Napi::Env env, Napi::Object exports) {
4343
constructor.SuppressDestruct();
4444
}
4545

46-
void Consumer::SetCConsumer(pulsar_consumer_t *cConsumer) { this->cConsumer = cConsumer; }
46+
void Consumer::SetCConsumer(std::shared_ptr<CConsumerWrapper> cConsumer) { this->wrapper = cConsumer; }
4747
void Consumer::SetListenerCallback(ListenerCallback *listener) { this->listener = listener; }
4848

4949
Consumer::Consumer(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Consumer>(info) {}
5050

5151
class ConsumerNewInstanceWorker : public Napi::AsyncWorker {
5252
public:
5353
ConsumerNewInstanceWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient,
54-
ConsumerConfig *consumerConfig)
54+
ConsumerConfig *consumerConfig, std::shared_ptr<CConsumerWrapper> consumerWrapper)
5555
: AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
5656
deferred(deferred),
5757
cClient(cClient),
58-
consumerConfig(consumerConfig) {}
58+
consumerConfig(consumerConfig),
59+
consumerWrapper(consumerWrapper) {}
5960
~ConsumerNewInstanceWorker() {}
6061
void Execute() {
6162
const std::string &topic = this->consumerConfig->GetTopic();
@@ -77,9 +78,9 @@ class ConsumerNewInstanceWorker : public Napi::AsyncWorker {
7778
return;
7879
}
7980

80-
pulsar_result result =
81-
pulsar_client_subscribe(this->cClient, topic.c_str(), subscription.c_str(),
82-
this->consumerConfig->GetCConsumerConfig(), &this->cConsumer);
81+
pulsar_result result = pulsar_client_subscribe(this->cClient, topic.c_str(), subscription.c_str(),
82+
this->consumerConfig->GetCConsumerConfig(),
83+
&this->consumerWrapper->cConsumer);
8384
if (result != pulsar_result_Ok) {
8485
SetError(std::string("Failed to create consumer: ") + pulsar_result_str(result));
8586
} else {
@@ -91,7 +92,7 @@ class ConsumerNewInstanceWorker : public Napi::AsyncWorker {
9192
void OnOK() {
9293
Napi::Object obj = Consumer::constructor.New({});
9394
Consumer *consumer = Consumer::Unwrap(obj);
94-
consumer->SetCConsumer(this->cConsumer);
95+
consumer->SetCConsumer(this->consumerWrapper);
9596
consumer->SetListenerCallback(this->listener);
9697
this->deferred.Resolve(obj);
9798
}
@@ -103,13 +104,16 @@ class ConsumerNewInstanceWorker : public Napi::AsyncWorker {
103104
pulsar_consumer_t *cConsumer;
104105
ConsumerConfig *consumerConfig;
105106
ListenerCallback *listener;
107+
std::shared_ptr<CConsumerWrapper> consumerWrapper;
106108
};
107109

108110
Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient) {
109111
Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
110112
Napi::Object config = info[0].As<Napi::Object>();
111-
ConsumerConfig *consumerConfig = new ConsumerConfig(config);
112-
ConsumerNewInstanceWorker *wk = new ConsumerNewInstanceWorker(deferred, cClient, consumerConfig);
113+
std::shared_ptr<CConsumerWrapper> consumerWrapper = std::make_shared<CConsumerWrapper>();
114+
ConsumerConfig *consumerConfig = new ConsumerConfig(config, consumerWrapper);
115+
ConsumerNewInstanceWorker *wk =
116+
new ConsumerNewInstanceWorker(deferred, cClient, consumerConfig, consumerWrapper);
113117
wk->Queue();
114118
return deferred.Promise();
115119
}
@@ -151,11 +155,12 @@ class ConsumerReceiveWorker : public Napi::AsyncWorker {
151155
Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) {
152156
Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
153157
if (info[0].IsUndefined()) {
154-
ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer);
158+
ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->wrapper->cConsumer);
155159
wk->Queue();
156160
} else {
157161
Napi::Number timeout = info[0].As<Napi::Object>().ToNumber();
158-
ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer, timeout.Int64Value());
162+
ConsumerReceiveWorker *wk =
163+
new ConsumerReceiveWorker(deferred, this->wrapper->cConsumer, timeout.Int64Value());
159164
wk->Queue();
160165
}
161166
return deferred.Promise();
@@ -164,25 +169,26 @@ Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) {
164169
void Consumer::Acknowledge(const Napi::CallbackInfo &info) {
165170
Napi::Object obj = info[0].As<Napi::Object>();
166171
Message *msg = Message::Unwrap(obj);
167-
pulsar_consumer_acknowledge_async(this->cConsumer, msg->GetCMessage(), NULL, NULL);
172+
pulsar_consumer_acknowledge_async(this->wrapper->cConsumer, msg->GetCMessage(), NULL, NULL);
168173
}
169174

170175
void Consumer::AcknowledgeId(const Napi::CallbackInfo &info) {
171176
Napi::Object obj = info[0].As<Napi::Object>();
172177
MessageId *msgId = MessageId::Unwrap(obj);
173-
pulsar_consumer_acknowledge_async_id(this->cConsumer, msgId->GetCMessageId(), NULL, NULL);
178+
pulsar_consumer_acknowledge_async_id(this->wrapper->cConsumer, msgId->GetCMessageId(), NULL, NULL);
174179
}
175180

176181
void Consumer::AcknowledgeCumulative(const Napi::CallbackInfo &info) {
177182
Napi::Object obj = info[0].As<Napi::Object>();
178183
Message *msg = Message::Unwrap(obj);
179-
pulsar_consumer_acknowledge_cumulative_async(this->cConsumer, msg->GetCMessage(), NULL, NULL);
184+
pulsar_consumer_acknowledge_cumulative_async(this->wrapper->cConsumer, msg->GetCMessage(), NULL, NULL);
180185
}
181186

182187
void Consumer::AcknowledgeCumulativeId(const Napi::CallbackInfo &info) {
183188
Napi::Object obj = info[0].As<Napi::Object>();
184189
MessageId *msgId = MessageId::Unwrap(obj);
185-
pulsar_consumer_acknowledge_cumulative_async_id(this->cConsumer, msgId->GetCMessageId(), NULL, NULL);
190+
pulsar_consumer_acknowledge_cumulative_async_id(this->wrapper->cConsumer, msgId->GetCMessageId(), NULL,
191+
NULL);
186192
}
187193

188194
class ConsumerCloseWorker : public Napi::AsyncWorker {
@@ -210,14 +216,13 @@ class ConsumerCloseWorker : public Napi::AsyncWorker {
210216

211217
Napi::Value Consumer::Close(const Napi::CallbackInfo &info) {
212218
Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
213-
ConsumerCloseWorker *wk = new ConsumerCloseWorker(deferred, this->cConsumer);
219+
ConsumerCloseWorker *wk = new ConsumerCloseWorker(deferred, this->wrapper->cConsumer);
214220
wk->Queue();
215221
return deferred.Promise();
216222
}
217223

218224
Consumer::~Consumer() {
219-
pulsar_consumer_pause_message_listener(this->cConsumer);
220-
pulsar_consumer_free(this->cConsumer);
225+
pulsar_consumer_pause_message_listener(this->wrapper->cConsumer);
221226
if (this->listener) {
222227
this->listener->callback.Release();
223228
}

src/Consumer.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,11 @@ class Consumer : public Napi::ObjectWrap<Consumer> {
3131
static Napi::FunctionReference constructor;
3232
Consumer(const Napi::CallbackInfo &info);
3333
~Consumer();
34-
void SetCConsumer(pulsar_consumer_t *cConsumer);
34+
void SetCConsumer(std::shared_ptr<CConsumerWrapper> cConsumer);
3535
void SetListenerCallback(ListenerCallback *listener);
36-
pulsar_consumer_t *GetCConsumer();
3736

3837
private:
39-
pulsar_consumer_t *cConsumer;
38+
std::shared_ptr<CConsumerWrapper> wrapper;
4039
ListenerCallback *listener;
4140

4241
Napi::Value Receive(const Napi::CallbackInfo &info);

src/ConsumerConfig.cc

Lines changed: 30 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -41,105 +41,55 @@ static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
4141
{"Failover", pulsar_ConsumerFailover}};
4242

4343
struct MessageListenerProxyData {
44-
pulsar_consumer_t *cConsumer;
44+
std::shared_ptr<CConsumerWrapper> consumerWrapper;
4545
pulsar_message_t *cMessage;
46-
std::mutex mutex;
47-
std::condition_variable cv;
48-
bool finished;
4946

50-
MessageListenerProxyData(pulsar_consumer_t *cConsumer, pulsar_message_t *cMessage)
51-
: cConsumer(cConsumer), cMessage(cMessage), finished(false) {}
52-
53-
void Wait() {
54-
std::unique_lock<std::mutex> lk(this->mutex);
55-
this->cv.wait(lk, [=]() { return this->finished; });
56-
}
57-
58-
void Notify() {
59-
std::lock_guard<std::mutex> lk(this->mutex);
60-
this->finished = true;
61-
this->cv.notify_all();
62-
}
63-
64-
bool Finished() {
65-
std::lock_guard<std::mutex> lk(this->mutex);
66-
return this->finished;
67-
}
47+
MessageListenerProxyData(std::shared_ptr<CConsumerWrapper> consumerWrapper, pulsar_message_t *cMessage)
48+
: consumerWrapper(consumerWrapper), cMessage(cMessage) {}
6849
};
6950

7051
void Acknowledge(const Napi::CallbackInfo &info) {
7152
Napi::Object obj = info[0].As<Napi::Object>();
7253
Message *msg = Message::Unwrap(obj);
73-
std::shared_ptr<MessageListenerProxyData> *listenerData =
74-
(std::shared_ptr<MessageListenerProxyData> *)info.Data();
54+
MessageListenerProxyData *listenerData = (MessageListenerProxyData *)info.Data();
7555
// We can't use the consumer after it has been notified, the consumer will be destrotyed after the first
7656
// time we call notify
77-
if (!(*listenerData)->Finished()) {
78-
pulsar_consumer_acknowledge_async((*listenerData)->cConsumer, msg->GetCMessage(), NULL, NULL);
79-
}
80-
(*listenerData)->Notify();
81-
}
82-
83-
void NegativeAcknowledge(const Napi::CallbackInfo &info) {
84-
std::shared_ptr<MessageListenerProxyData> *listenerData =
85-
(std::shared_ptr<MessageListenerProxyData> *)info.Data();
86-
// Nack isn't available on the C client version 2.3.0
87-
// This just releases the message listener thread
88-
(*listenerData)->Notify();
57+
pulsar_consumer_acknowledge_async(listenerData->consumerWrapper->cConsumer, msg->GetCMessage(), NULL, NULL);
8958
}
9059

9160
void FinalizeMessageListenerProxy(napi_env env, void *data, void *) {
92-
std::shared_ptr<MessageListenerProxyData> *listenerData = (std::shared_ptr<MessageListenerProxyData> *)data;
93-
(*listenerData)->Notify();
61+
MessageListenerProxyData *listenerData = (MessageListenerProxyData *)data;
9462
delete listenerData;
9563
}
9664

97-
void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback,
98-
std::shared_ptr<MessageListenerProxyData> *data) {
99-
Napi::Object obj = Message::NewInstance({}, (*data)->cMessage);
100-
std::shared_ptr<MessageListenerProxyData> *dataCopy = new std::shared_ptr<MessageListenerProxyData>(*data);
65+
void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback, MessageListenerProxyData *data) {
66+
Napi::Object obj = Message::NewInstance({}, data->cMessage);
10167

10268
Napi::Function acknowledge = Napi::Function::New(env, &Acknowledge, "Acknowledge", data);
103-
Napi::Function negativeAcknowledge =
104-
Napi::Function::New(env, &NegativeAcknowledge, "NegativeAcknowledge", dataCopy);
105-
10669
napi_status ackFinalizerStatus =
10770
napi_wrap(env, acknowledge, data, &FinalizeMessageListenerProxy, NULL, NULL);
10871
if (ackFinalizerStatus != napi_ok) {
109-
(*data)->Notify();
110-
delete dataCopy;
11172
delete data;
11273
return;
11374
}
114-
115-
napi_status nackFinalizerStatus =
116-
napi_wrap(env, negativeAcknowledge, dataCopy, &FinalizeMessageListenerProxy, NULL, NULL);
117-
if (nackFinalizerStatus != napi_ok) {
118-
(*dataCopy)->Notify();
119-
delete dataCopy;
120-
return;
121-
}
122-
jsCallback.Call({obj, acknowledge, negativeAcknowledge});
75+
jsCallback.Call({obj, acknowledge});
12376
}
12477

12578
void MessageListener(pulsar_consumer_t *cConsumer, pulsar_message_t *cMessage, void *ctx) {
126-
Napi::ThreadSafeFunction *listenerCallback = (Napi::ThreadSafeFunction *)ctx;
127-
if (listenerCallback->Acquire() != napi_ok) {
79+
ListenerCallback *listenerCallback = (ListenerCallback *)ctx;
80+
if (listenerCallback->callback.Acquire() != napi_ok) {
12881
return;
12982
};
130-
std::shared_ptr<MessageListenerProxyData> data =
131-
std::make_shared<MessageListenerProxyData>(cConsumer, cMessage);
132-
std::shared_ptr<MessageListenerProxyData> *dataPtr = new std::shared_ptr<MessageListenerProxyData>(data);
133-
*dataPtr = data;
134-
listenerCallback->BlockingCall(dataPtr, MessageListenerProxy);
135-
listenerCallback->Release();
136-
137-
data->Wait();
83+
MessageListenerProxyData *dataPtr =
84+
new MessageListenerProxyData(listenerCallback->consumerWrapper, cMessage);
85+
listenerCallback->callback.BlockingCall(dataPtr, MessageListenerProxy);
86+
listenerCallback->callback.Release();
13887
}
13988

14089
void FinalizeListenerCallback(Napi::Env env, ListenerCallback *cb, void *) { delete cb; }
14190

142-
ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig)
91+
ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig,
92+
std::shared_ptr<CConsumerWrapper> consumerWrapper)
14393
: topic(""), subscription(""), ackTimeoutMs(0), listener(nullptr) {
14494
this->cConsumerConfig = pulsar_consumer_configuration_create();
14595

@@ -201,18 +151,21 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig)
201151

202152
if (consumerConfig.Has(CFG_LISTENER) && consumerConfig.Get(CFG_LISTENER).IsFunction()) {
203153
this->listener = new ListenerCallback();
154+
this->listener->consumerWrapper = consumerWrapper;
204155
Napi::ThreadSafeFunction callback = Napi::ThreadSafeFunction::New(
205156
consumerConfig.Env(), consumerConfig.Get(CFG_LISTENER).As<Napi::Function>(), "Listener Callback", 1,
206157
1, (void *)NULL, FinalizeListenerCallback, listener);
207158
this->listener->callback = std::move(callback);
208159
pulsar_consumer_configuration_set_message_listener(this->cConsumerConfig, &MessageListener,
209-
&this->listener->callback);
160+
this->listener);
210161
}
211162
}
212163

213164
ConsumerConfig::~ConsumerConfig() {
214165
pulsar_consumer_configuration_free(this->cConsumerConfig);
215-
if (this->listener) this->listener->callback.Release();
166+
if (this->listener) {
167+
this->listener->callback.Release();
168+
}
216169
}
217170

218171
pulsar_consumer_configuration_t *ConsumerConfig::GetCConsumerConfig() { return this->cConsumerConfig; }
@@ -226,3 +179,11 @@ ListenerCallback *ConsumerConfig::GetListenerCallback() {
226179
}
227180

228181
int64_t ConsumerConfig::GetAckTimeoutMs() { return this->ackTimeoutMs; }
182+
183+
CConsumerWrapper::CConsumerWrapper() : cConsumer(nullptr) {}
184+
185+
CConsumerWrapper::~CConsumerWrapper() {
186+
if (this->cConsumer) {
187+
pulsar_consumer_free(this->cConsumer);
188+
}
189+
}

src/ConsumerConfig.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,20 @@
2525

2626
#define MIN_ACK_TIMEOUT_MILLIS 10000
2727

28+
struct CConsumerWrapper {
29+
pulsar_consumer_t *cConsumer;
30+
CConsumerWrapper();
31+
~CConsumerWrapper();
32+
};
33+
2834
struct ListenerCallback {
2935
Napi::ThreadSafeFunction callback;
36+
std::shared_ptr<CConsumerWrapper> consumerWrapper;
3037
};
3138

3239
class ConsumerConfig {
3340
public:
34-
ConsumerConfig(const Napi::Object &consumerConfig);
41+
ConsumerConfig(const Napi::Object &consumerConfig, std::shared_ptr<CConsumerWrapper> consumerWrapper);
3542
~ConsumerConfig();
3643
pulsar_consumer_configuration_t *GetCConsumerConfig();
3744
std::string GetTopic();

0 commit comments

Comments
 (0)