The message queue

msgqueue_auto represents the execution thread's internal message queue. It is constructed in automatic scope, so it exists when x::start_threadmsgdispatcher() returns. The new thread is immediately ready to process messages. Because the message queue exists in automatic scope it gets destroyed when the execution thread terminates. Any unprocessed messages in the queue get destroyed as well.

sendevent() takes no action if the execution thread is not running, and the message queue does not exist. In that situation sendevent() does nothing, and returns immediately. This is the expected design pattern for sending messages to the execution thread:

The mcguffin gets destroyed when the message dispatching thread's private method (which processes the message) returns. No more references to the mcguffin exist, at this point. If the execution thread is not running and sendevent() does nothing, the mcguffin gets destroyed immediately. If the execution thread terminates with the unprocessed message remaining in the message queue, the message and its mcguffin also gets destroyed. Examining the contents of the status object, therefore, indicates whether the message was processed, or not, in all possible outcomes.

Using the message queue

msgqueue_auto is just a wrapper for an event queue:

msgqueue_auto msgqueue(this, x::eventfd::create());

// ...

x::eventfd eventfd=msgqueue->get_eventfd();

// ...

if (!msgqueue->empty())
     msgqueue->pop()->dispatch();
	  

msgqueue_auto's constructor's optional second parameter specifies the event queue's file descriptor, and a new one gets created by default. msgqueue.event(); is just a shortcut for msgqueue->pop()->dispatch();

It is possible to set the event file descriptor to non-blocking mode, use empty()() to check for empty messages, or use poll() to wait for messages to get added to the message queue.

Sometimes it's convenient to create the execution thread's queue after the execution thread's object gets created, but before the execution thread starts:


auto clinstance = myClass::create();

x::threadmsgdispatcherObj::msgqueue_obj queue_arg=x::threadmsgdispatcherObj::msgqueue_obj::create(clinstance);

// Additional initialization

x::start_threadmsgdispatcher(clinstance, queue_arg);

// ......

void myThreadObj::run(x::ptr<x::obj> &threadmsgdispatcher_mcguffin
                     msgqueue_obj &queue_arg)
    {
        msgqueue_auto msgqueue=*queue_arg;

        threadmsgdispatcher_mcguffin=nullptr;

        // ...
	  

The msgqueue_obj class is inherited from x::threadmsgdispatcherObj, and is just a reference-counted wrapper for msgqueue_auto. In this case, it gets passed as a parameter to the execution thread's run() via x::start_threadmsgdispatcher().

With this approach, it is possible to invoke the new object's public sendevent()s before the execution thread gets started. Since the message queue exists, sendevent()s obediently fills it with messages even if the actual execution thread isn't running yet. After x::start_threadmsgdispatcher-ing, the parent execution thread gets rid of its queue_arg, so when the execution thread eventually terminates, that destroys the only reference to the message queue, preserving the expected behavior of destroying the message queue together with its execution thread.

x::threadmsgdispatcherObj::msgqueueptr_t msgqueue=clinstance->get_msgqueue();

if (!msgqueue.null())
{
    auto eventfd=msgqueue->get_eventfd();

    // ...
}
	  

The overall design pattern (with some variations), is that the internal message queue gets created and destroyed together with the execution thread. This is done (again with some variations) by constructing the message queue in automatic scope in the run() method.

Other class methods can use get_msgqueue() (inherited from x::threadmsgdispatcherObj). It returns an x::ptr to the message queue. Private class methods can safely assume that the message queue exists, since it exists while the execution thread is running, and the private class methods are presumably called only by the execution thread.

get_msgqueue() itself is thread-safe, and can be called by other execution threads. get_msgqueue() returns a null x::ptr if the message queue does not exist because the execution thread is not running.

Auxiliary message queues

A message-based thread class can have one or more auxiliary message queues. This is done by declaring a active_queue_t class name, and adding an additional msgqueue_auto declaration to the thread's run() method.

class myThreadObj : public x::threadmsgdispatcherObj {

    LOG_CLASS_SCOPE;

    // ...

    active_queue_t second_queue;

public:

    // ...
};


void myThreadObj::run(x::ptr<x::obj> &threadmsgdispatcher_mcguffin)
{
    msgqueue_auto msgqueue(this);
    msgqueue_auto msgqueue(this, second_queue);

    threadmsgdispatcher_mcguffin=nullptr;

    // ...
}

The first msgqueue_auto declaration constructs the main message dispatching queue, as usual. The second declaration constructs the auxiliary message queue. This should be done before releasing the startup mcguffin. The resulting auxiliary message queue has the same lifetime scope and semantics as the main thread message dispatching queue:

  • The auxiliary queue exists as long as the execution thread is running, and gets destroyed when the execution thread stops.

  • Any unprocessed messages in the auxiliary queue get destroyed when the auxiliary queue goes out of scope and gets destroyed.

void myThreadObj::queue_notify(int flag)
{
     this->sendeventaux(second_queue, &myThreadObj::dispatch_statusinquiry,
                        this, format);
}

void myThreadObj::process_notifications()
{
     this->process_events(second_queue);
}

sendeventaux()'s first parameter is an auxiliary message queue, the remaining parameters are the same as sendevent()'s. The resulting message does not immediately get processed by the execution thread. It gets added to the auxiliary queue.

process_events() takes any and all messages on the auxiliary queue, and moves them to the main message dispatching queue. The execution thread, if it's running should then proceed and immediately dispatch the messages.

If the execution thread is not running, sendeventaux() and process_events() return without taking any actions.