stasher::manager
->manage_subscription
(): a managed subscription#include <iostream> #include <stasher/client.H> #include <stasher/manager.H> #include <stasher/managedsubscriber.H> #include <x/mpobj.H> #include <queue> class mySubscriberObj : public stasher::managedsubscriberObj { public: mySubscriberObj() {} ~mySubscriberObj() { } typedef x::mpcobj<std::queue<std::string> > queue_t; queue_t queue; // Invoked when the connection gets established, or breaks. void connection_update(stasher::req_stat_t status) override { std::cout << ("Connection update: " + x::tostring(status) + "\n") << std::flush; } void updated(const std::string &objname, const std::string &suffix) override { queue_t::lock lock(queue); lock->push(objname+suffix); lock.notify_one(); } std::string get() { queue_t::lock lock(queue); while (lock->empty()) lock.wait(); std::string s=lock->front(); lock->pop(); return s; } }; void simplesubscribe(int argc, char **argv) { if (argc < 2) throw EXCEPTION("Usage: simplesubscribe {object}+"); stasher::client client=stasher::client::base::connect(); auto manager=stasher::manager::create(L"", "10 seconds"); auto subscriber=x::ref<mySubscriberObj>::create(); std::list<x::ref<x::obj> > mcguffins; for (int i=1; i<argc; ++i) { mcguffins.push_back(manager->manage_subscription(client, argv[i], subscriber)); } while (1) { std::string object=subscriber->get(); stasher::client::base::getreq req =stasher::client::base::getreq::create(); req->objects.insert(object); req->openobjects=true; stasher::contents contents=client->get(req)->objects; if (!contents->succeeded) throw EXCEPTION(contents->errmsg); auto iter=contents->find(object); if (iter == contents->end()) { std::cout << object << " removed" << std::endl; continue; } std::string line; std::getline(*iter->second.fd->getistream(), line); if (line == "stop") break; std::cout << object << ": " << line << std::endl; } } int main(int argc, char **argv) { try { simplesubscribe(argc, argv); } catch (const x::exception &e) { std::cerr << e << std::endl; exit(1); } return 0; }
A
()
gets cancelled if the client connection to the server drops, for some
reason. An application needs to detect that using the subscription's
cancellation mcguffin, and make periodic resubscription attempts.
As described in the section called “A simple C++ client: getting and modifying objects”, when a connection with
the stasher server breaks, the next request results in an automatic attempt
to reconnect with the server.
stasher::client
->subscribe
The manager object takes care of that. When a managed subscriber's
connection breaks, the manager object makes periodic attempts to
resubscribe.
To use a managed subscription, subclass
stasher::managedsubscriberObj
instead of stasher::client::base::subscriberObj
,
and implement updated
() and
connection_update
(), then
invoke
().
The first parameter to stasher::manager
->manage_subscriptionmanage_subscription
() is the
client connection handle, the second parameter is the name of an object
or a hierarchy to subscribe to, and the third parameter is an
x::ref
to an instance of the
stasher::managedsubscriberObj
subclass.
The same subscriber object can be a callback for multiple subscriptions, or each subscription can have a separate callback object instance. For each subscription, the manager holds a strong reference on the callback object, and its client connection.
There is no explicit un-manage method.
manage_subscription
() returns a mcguffin.
The manager keeps a strong reference on the subscriber callback object,
and the client connection handle, as long as the manager object and the
mcguffin exists. The manager stops managing the
subscription when the last reference to the mcguffin goes out of
scope and gets destroyed.
The updated
() callback works just like it does
with
().
A new method,
stasher::client
->subscribeconnection_update
(), gets called
with a stasher::req_processed_stat
when the
subscription gets opened. This happens shortly after,
or maybe even prior to,
manage_subscription
() returning to the caller.
When then connection with the server disconnects, for any reason,
connection_update
() gets called
with a stasher::req_disconnected_stat
.
connection_update
() gets called with
a stasher::req_processed_stat
again when the manager
reestablishes a subscription after reconnecting with the server.
Other status values for connection_update
()
indicate an error with opening the subscription.
$ ./updatethread fruits/ ""
Connection update: Transaction/request processed
Connection update: Transaction/request processed
pi: 3.1415926
fruits/apple removed
Connection update: Connection to the server failed
Connection update: Connection to the server failed
Connection update: Transaction/request processed
Connection update: Transaction/request processed
fruits/grape: juicy
This example subscribes to two hierarchies, “” (the top level
hierarchy), and the “fruits” hierarchy (note the trailing
“/”).
Both of them log a
connection_update(
.
After both subscriptions are opened, one object gets added or updated in
the top level hierarchy (“pi”), and one object gets removed
from “fruits”. The connection with the server drops
afterwards
stasher::req_processed_stat
)connection_update(
,
and the connection manager reestablishes it some time later
stasher::req_disconnected_stat
)connection_update(
.
After the connection gets reestablished, the
stasher::req_processed_stat
)fruits/grape
objects gets created or updated.
The connection manager tries to reestablish the connection and the
subscription periodically. Application may see additional
connection_update(
before a
stasher::req_disconnected_stat
)connection_update(
.
stasher::req_processed_stat
)
These callbacks have certain limitations, see the section called “What asynchronous C++ API methods can and cannot do” for more information. Each managed subscription counts towards the client's connection limits.
After a subscription gets opened or reopened,
stasher::managedsubscriberObj
's
updated
() gets called only to report subsequent
changes to the object (if the subscription was for an individual object);
or to some object in the hierarchy (if the subscription was for a
hierarchy) from that point going forward.
Particularly, changes could've occured between a
connection_update(
and a
stasher::req_disconnected_stat
)connection_update(
.
stasher::req_processed_stat
)
An application should respond to a
connection_update(
by retrieving the contents of the subscribed object, or the hierarchy,
at that time.
In the example in this chapter, this can be done by having
stasher::req_processed_stat
)connection_update(
put the subscriber-to object's name into the queue. Note that
this requires
using a separate, individual subscriber class instance for each object
instead of a single class instance for all subscriptions, because each one
must know which object it is for. This results in the main execution
thread using the same code to check for the object's current value,
and refresh it.
stasher::req_processed_stat
)
But it's easier to use
manage_object
(),
for individual objects, or
manage_hierarchymonitor
().