stasher::client
->subscribe
(): object subscriptions#include <iostream> #include <stasher/client.H> #include <x/mpobj.H> #include <queue> class mySubscriberObj : public stasher::client::base::subscriberObj { public: mySubscriberObj() {} ~mySubscriberObj() { } typedef x::mpcobj<std::queue<std::string> > queue_t; queue_t queue; void updated(const std::string &objname, const std::string &suffix) override { queue_t::lock lock(queue); lock->push(objname+suffix); lock.notify_one(); } std::string get() { queue_t::lock lock(queue); while (lock->empty()) lock.wait(); std::string s=lock->front(); lock->pop(); return s; } }; void simplesubscribe(int argc, char **argv) { if (argc < 2) throw EXCEPTION("Usage: simplesubscribe {object}+"); stasher::client client=stasher::client::base::connect(); auto subscriber=x::ref<mySubscriberObj>::create(); std::list<x::ref<x::obj> > mcguffins; for (int i=1; i<argc; ++i) { stasher::subscriberesults res=client->subscribe(argv[i], subscriber); if (res->status != stasher::req_processed_stat) throw EXCEPTION(x::tostring(res->status)); mcguffins.push_back(res->mcguffin); auto cancel_mcguffin=res->cancel_mcguffin; // NOT USED std::cout << "Subscribed to " << argv[i] << std::endl; } while (1) { std::string object=subscriber->get(); stasher::client::base::getreq req =stasher::client::base::getreq::create(); req->objects.insert(object); req->openobjects=true; stasher::contents contents=client->get(req)->objects; if (!contents->succeeded) throw EXCEPTION(contents->errmsg); auto iter=contents->find(object); if (iter == contents->end()) { std::cout << object << " removed" << std::endl; continue; } std::string line; std::getline(*iter->second.fd->getistream(), line); if (line == "stop") break; std::cout << object << ": " << line << std::endl; } } int main(int argc, char **argv) { try { simplesubscribe(argc, argv); } catch (const x::exception &e) { std::cerr << e << std::endl; exit(1); } return 0; }
This is an example of a subscription. A subscription provides a notification mechanism when an object in the repository changes. This is done by installing a callback that gets invoked whenever the subscribed object gets created, updated, or deleted (a subscription can get installed for an object that does not exist at the time of the subscription).
It's possible to open more than one subscription, for different objects, at a time. This example subscribes to the list of objects given on the command line. Note, however that the server imposes a maximum limit on the number of opened subscriptions (this is not checked in this example).
The first parameter to
()
is the name of the object to open a subscription for. The second parameter
is an stasher::client
->subscribex::ref
to a subclass of
stasher::client::base::subscriberObj
that implements a updated
() method as shown
in this example.
After a subscription gets opened, updated
() gets
invoked every time the object named by
the first parameter to subscribe
() changes
– it gets created, updated, or deleted.
Passing an object name with a trailing “/” subscribes to an
entire object hierarchy, with a special notation of “”
referering to the top level hierarchy.
Example: opening a subscription for “constants” invokes
updated
() any time the object named
“constants” gets created, updated, or deleted.
Opening a subscription for “constants/” invokes
updated
() any time an object named
“constants/e”, “constants/pi”, or any
“constants/anything
” gets
created, updated, or deleted.
This also includes “constants/imaginary/i”, but not the
“constants” object itself. Opening a subscription for
“” invokes updated
() for any
object in the top level hierarchy, like
“e”, “pi”.
A subscription to a hierarchy counts as one subscription towards the connection limits.
()
returns a stasher::client
->subscribestasher::subscriberesults
, which is an
x::ref
to a reference-counted object with
the following members:
status
The subscription status status,
with stasher::req_processed_stat
when the
subscription is succesfully opened.
mcguffin
A mcguffin representing the open subscription. There is no formal unsubscribe(), rather than subscription remains open as long as the mcguffin exists. Stopping the subscription involves simply letting the mcguffin go out of scope and get destroyed.
The above example puts each subscription's mcguffin on a list kept
on the stack, so that the subscription remains in effect until
the simplesubscribe
() function returns.
While a subscription remains open, the client object holds a
strong reference on the subscriber callback. When the mcguffin
goes out of scope and gets destroyed, if the client connection thread
is in the middle of invoking the
updated
() callback, or is busy with
something else at the moment, there may be a slight delay before
the subscription gets wrapped up, and the connection thread
releases its reference on the subscriber callback object; and it's
remotely possible that updated
()
can get invoked at the same time, or just after, the mcguffin
goes out of scope and gets destroyed.
cancel_mcguffin
The flip side of the coin. This is a mcguffin that's owned by the client connection thread. When the subscription gets closed, for any reason, the client connection thread releases its reference on the mcguffin.
It's possible that an open subscription can get closed even before its mcguffin goes out of scope and gets destroyed, and the cancellation mcguffin provides the means for detecting this situation. The normal sequence of events when the subscription gets closed goes like this;
The application destroys the mcguffin.
The client connection thread stops the subscription.
The client connection thread releases its reference on the cancellation mcguffin and the subscriber callback object.
The last step also happens spontaneously in the event that the
client connection thread's connection to the server breaks for any reason. The
subscription does not get reopened automatically, when a new
client connection thread reconnects to the stasher server. Attaching a destructor
callback to the cancellation mcguffin (and releasing the reference
on the cancellation mcguffin, and the
stasher::subscriberesults
object) provides the
means for detecting and handling this situation.
The cancellation mcguffin's destructor callbacks also gets invoked when the subscription gets closed in the regular way, by destroying the subscription mcguffin.
The above example, prints the names of the created/updated/deleted objects until any object's contents are set to a single line with the word "stop".
updated
() callback
updated
() receives two
std::string
s as parameters.
The first parameter is always the same as the original parameter
to subscribe
(). For an open subscription for
a single object the second parameter is always an empty string.
For an open subscription for an object hierarchym, the second parameter
is the name of the object in the subscribed hierarchy that got created,
updated, or deleted.
Note that concatenating the two together gets the complete name of
the affected object, in all situations. The same subscriber callback
can get installed for multiple open subscriptions, and the
parameters to updated
() indicate which specific
object the callback gets invoked for.
The callback gets invoked by the client connection thread, so it should not engage in any
lengthy, prolonged activities, nor throw any exceptions. Furthermore,
it should not invoke stasher::client
methods itself.
Because it's the actual client connection thread, the outcome of such a lapse in judgement
is typically a thread deadlock. In this example, the callback puts the
full name of the object into a queue. The main thread keeps reading
the name of each object from the queue, and invokes
get
()
to retrieve the object's contents.
It's possible that the same object gets updated two times in a row, very quickly; and the second update completes before the application has an opportunity to respond to the first callback, and send the request for the object's new contents. The application ends up missing the first update, and gets the value of the object after the second update. The application then responds to the second update notification it receives, invoking the callback; then ends up getting the same value, the object's value after the second update.
Here, an “update” refers to the object getting created, updated, or deleted. Both the first and the subsequent “update” included. An application must be prepared to handle this possibility.
The limitations on callback methods described later, in the section called “What asynchronous C++ API methods can and cannot do”, apply to these callbacks too.
Some parts of the “etc” hierarchy in the root hierarchy
namespace, namely “etc/done” are for stasher's internal
use. The updated
() callback for this
hierarchy gets suppressed internally.