-
Notifications
You must be signed in to change notification settings - Fork 790
Open
Description
I would like to be able to use `zmq::poll()` on both normal ZMQ sockets and monitoring ZMQ sockets, so that they can all be handled within a single thread. However, this isn't currently possible with the `zmq::monitor_t` class, due to the blocking nature of the `zmq::monitor_t::monitor()` method and the lack of an accessor for the socket that is monitoring `socketPtr`.
Could we augment the `zmq::monitor_t` class with the functionality to allow polling? Perhaps something like the following would work (apologies in advance for any syntax errors)...
class monitor_t {
// ...
inline operator void* () ZMQ_NOTHROW
{
return monitorPtr;
}
inline operator void const* () const ZMQ_NOTHROW
{
return monitorPtr;
}
void monitor_start(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL)
{
int rc = zmq_socket_monitor(socket.ptr, addr_, events);
if (rc != 0)
throw error_t ();
socketPtr = socket.ptr;
monitorPtr = zmq_socket (socket.ctxptr, ZMQ_PAIR);
assert (monitorPtr);
rc = zmq_connect (monitorPtr, addr_);
assert (rc == 0);
on_monitor_started();
}
bool monitor_poll()
{
zmq_msg_t eventMsg;
zmq_msg_init (&eventMsg);
int rc = zmq_msg_recv (&eventMsg, s, 0);
if (rc == -1 && zmq_errno() == ETERM)
return false;
assert (rc != -1);
// ...
}
void monitor(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL)
{
monitor_start(socket, addr_, events);
bool keep_looping = monitor_poll();
while (keep_looping) {
keep_looping = monitor_poll();
}
}
// ...
private:
void* socketPtr;
void* monitorPtr;
};
This would allow a usage similar to the following...
// Example worker: polls a data socket and its monitor socket from a single
// thread. Takes ownership of raw_zmq_data_sock.
void Worker(zmq::socket_t* raw_zmq_data_sock)
{
    std::unique_ptr<zmq::socket_t> zmq_data_sock(raw_zmq_data_sock);
    std::unique_ptr<zmq::monitor_t> zmq_monitor_sock(new zmq::monitor_t);

    // Non-blocking setup step; the method is named monitor_start in the
    // proposed class.
    zmq_monitor_sock->monitor_start(*zmq_data_sock, "inproc://abc");

    // Index 0: data socket, index 1: monitor socket.
    std::vector<zmq::pollitem_t> poll_items = {
        { static_cast<void*>(*zmq_data_sock), 0, ZMQ_POLLIN | ZMQ_POLLERR, 0 },
        { static_cast<void*>(*zmq_monitor_sock), 0, ZMQ_POLLIN | ZMQ_POLLERR, 0 },
    };

    while (true) {
        poll_items[0].revents = 0;
        poll_items[1].revents = 0;

        int items = zmq::poll(poll_items);

        // ...

        if (poll_items[1].revents) {
            // monitor_poll() returns false on ETERM; stop polling then.
            if (!zmq_monitor_sock->monitor_poll())
                break;
        }
    }

    // ...
}
I can put a pull request together if it sounds like a reasonable idea.