stasher::versionedptr<classptr
>
: associate a mcguffin with each object instance#include <x/ptr.H> #include <stasher/versionedptr.H> class inventoryObj; typedef x::ptr<inventoryObj> inventoryptr; stasher::versionedptr<inventoryptr> instock; // ... instock=inventoryptr::create(); if (instock.null()) { // ... } instock->method(); inventoryptr p=instock; // ... instock.getversion()->addOnDestroy( // ... );
stasher::versionedptr
's template parameter is an
x::ptr
.
The stasher::versionedptr
constains an instance of the x::ptr
.
null
(), the *
() and
->
() operators get forwarded to the
x::ptr
instance.
stasher::versionedptr
defines a
const x::ptr<classptr> &
operator,
and an assignment operator that replaces the x::ptr
.
Additionally, stasher::versionedptr
contains a
mcguffin object that gets replaced by the assignment operator.
Assigning any x::ptr
, including a
nullptr
, to a
stasher::versionedptr
creates a new mcguffin, which
replaces the existing one.
getversion
() returns the current
x::ptr
's mcguffin.
This is slightly different than using the object itself, that the
x::ptr
refers to, as a mcguffin.
This is because a mcguffin gets created even for a
nullptr
. Any assignment operation creates a new
mcguffin object, and the stasher::versionedptr
releases its reference on the previous mcguffin. This includes replacing
a nullptr
with another nullptr
, too.
The stasher::versionedptr
itself employs no locking,
and is not thread safe. Locking and mutex exclusion of an
stasher::versionedptr
must be separately implemented.
Conveniently, an
stasher::current<
will be happy to do it.
classptr
>
stasher::current<
implements managed object callbacks
for classptr
>
s,
where classptr
is an
classptr
x::ptr
to a class.
The callbacks construct a new
, which is
a classptr
nullptr
if the object was removed from the
objet repository.
Otherwise, the file descriptor of the object, and its
uuid, get passed to the object class's constructor, and the new
object instance becomes the
.
The resulting classptr
gets stored in classptr
stasher::current
's
current_value
member, which is a mutex protected
object that contains value
, an instance of the
classptr
that represents the current value
of the object in the object repository.
It's a nullable reference pointer because it can be a
nullptr
in the event that the object gets removed
from the object repository, or if it never existed in the first place.
It is also null
immediately after the
versioned current object gets instantiated, and before the current
contents of the object is retrieved from the object repository
The
stasher::current<
template class has an optional second template parameter, which defaults
to the classptr
>classptr
. If specified,
the second template parameter declares the class of
current_value.value
.
Giving a
“stasher::versionedptr<
”
for the second template parameter results in a
“versioned current object”.
Using a
classptr
>stasher::versionedptr
results in each update to the object's value installing an associated
mcguffin, replacing the previous value's mcguffin.
The examples in this part use a warehouseObj
object which contains a map of individual warehouses objects, keyed by
the name of the object in the stasher repository. The value of the map
is a versioned current object:
#ifndef warehouses_H #define warehouses_H #include <stasher/current.H> #include <stasher/client.H> #include <stasher/manager.H> #include "inventory.H" class warehousesObj; typedef x::ref<warehousesObj> warehouses_t; // A warehouse is a versioned current object. typedef stasher::current<inventoryptr, stasher::versionedptr<inventoryptr> > warehouse_t; // All of our warehouses are here. class warehousesObj : virtual public x::obj { public: warehousesObj() {} ~warehousesObj() {} // Warehouse, by name. std::map<std::string, warehouse_t> warehouses; // When parsing command line options, create a warehouse dynamically, // from the command line list. void createwarehouse(const std::string &s) { if (warehouses.find(s) == warehouses.end()) warehouses.insert(std::make_pair(s, warehouse_t ::create())); } void inventory() { for (auto &warehouse:warehouses) { std::cout << warehouse.first << ":" << std::endl; warehouse_t::base::current_value_t::lock lock(warehouse.second->current_value); if (lock->value.null()) { std::cout << " (none)" << std::endl; continue; } std::cout << " " << std::setw(30) << std::left << "Item" << " " << std::setw(8) << std::right << "Count" << std::setw(0) << std::endl; std::cout << " " << std::setfill('-') << std::setw(30) << "" << " " << std::setw(8) << "" << std::setw(0) << std::setfill(' ') << std::endl; for (auto &item:lock->value->stock) { std::cout << " " << std::setw(30) << std::left << item.first << " " << std::setw(8) << std::right << item.second << std::setw(0) << std::endl; } std::cout << std::setw(75) << std::setfill('=') << "" << std::setw(0) << std::setfill(' ') << std::endl; } } }; // Create the managed object subscriptions, and wait for the objects to get // loaded from the database. void load_warehouses(// Our warehouses const warehouses_t &warehouses, // The client const stasher::client &client, // The connection manager const stasher::manager &manager, // Mcguffins for managed object subscriptions are placed here std::list<x::ref<x::obj> > &mcguffins); #endif
One situation where a
stasher::versionedptr
's mcguffin comes in handy
is
load_warehouses
().
This function sets up current objects'
subscriptions, and waits until the application receives the initial
value of each object. load_warehouses
() gets called
after the map container of all warehouses gets initialized.
load_warehouses
() invokes each current object's
manage
().
Before opening a subscription, a destructor callback gets attached to
the version mcguffin for each current object's initial value.
As previously described,
the initial value of each object does not get retrieved immediately,
so load_warehouses
() waits until it happens.
Once the current value of each object is retrieved, the new version
mcguffin replaces the previous one. When all previous versions of all
objects get replaced, the destructor callback itself goes out of scope
and gets destroyed, so load_warehouses
() only needs
to wait until that happens, using a destructor guard. See LIBCXX for
more information.
If there was a problem establishing the subscription, the initial value
of an object does not get installed. To check for this possibility,
load_warehouses
() waits until each current object's
connection_update
() method gets called with
anything other that stasher::req_disconnected_stat
.
If the status is anything other than
stasher::req_processed_stat
this is a subscription
error. Once all subscriptions have a
stasher::req_processed_stat
,
load_warehouses
() waits for the destructor callback
to go out of scope and gets destroyed, and everything is ready:
#include <x/destroycallbackflag.H> #include <x/destroycallback.H> #include "warehouses.H" void load_warehouses(// Our warehouses const warehouses_t &warehouses, // The client const stasher::client &client, // The connection manager const stasher::manager &manager, // Mcguffins for managed object subscriptions are placed here std::list<x::ref<x::obj> > &mcguffins) { // We'll attach a destructor callback to the warehouses' current // version mcguffins. // // Each warehouse is constructed, and it has a value. It's a null // inventoryptr, of course. When the subscription gets opened and // managed, the current object in the repository gets loaded, and // the warehouse object initialized with that object. So the original // null inventoryptr gets replaced, even if the object does not exist // in the repository -- in that case a null inventoryptr gets // symbolically placed in current_value.value. // // So, in either case, the version mcguffin of the initial null // inventoryptr is going to get destroyed. // // Attach a dummy destructor callback to each warehouse's initial // version mcguffin. Then wait for the dummy destructor to get // destroyed. A mcguffin with destructor callbacks keeps a strong // reference on the callbacks, until the object gets destroyed, and the // callbacks get invoked. All the warehouses get this destructor // callback attached to them, which does nothing. When all of them get // destroyed, then this destructor mcguffin gets destroyed. So, we wait // wait for the destructor callback itself to get destroyed. x::destroyCallbackFlag::base::guard guard; auto destructor_cb=x::destroyCallback::create(); for (auto &warehouse:warehouses->warehouses) { warehouse_t::base::current_value_t::lock lock(warehouse.second->current_value); lock->value.getversion()->addOnDestroy(destructor_cb); mcguffins.push_back(warehouse.second->manage(manager, client, warehouse.first)); } std::cout << "Waiting for objects to get loaded" << std::endl; // We want to make sure that the subscriptions were set up. Wait for // the status to be anything other than req_disconnected_stat. for (auto &warehouse:warehouses->warehouses) { warehouse_t::base::current_value_t::lock lock(warehouse.second->current_value); lock.wait([&lock] { return lock->connection_status != stasher::req_disconnected_stat; }); if (lock->connection_status != stasher::req_processed_stat) // Some kind of a subscription error throw EXCEPTION(warehouse.first + ": " + x::tostring(lock->connection_status)); } // Ok, just because everyone connected, doesn't mean that all the // objects have been loaded yet. guard(destructor_cb); }
#include <x/destroycallback.H> #include <x/destroycallbackflag.H> #include <stasher/client.H> #include <stasher/manager.H> #include <stasher/current.H> #include <stasher/versionedptr.H> #include <stasher/process_request.H> #include "inventory.H" #include "warehouses.H" #include <sstream> #include <list> #include <map> #include <type_traits> // Inventory adjustment // The parameters to adjinventory are groups of three: // name, what, howmany. // // "name" gives the name of an inventoryptr in the object repository. // "what" is the inventory item's name. "howmany" is how many of "what" to be // adjusted in the "name" inventory object. // // Note that the number of inventory objects, called warehouses here, that // can be processed is limited by the maximum number of subscriptions, however // there's no limit (within reason) as to the actual number of adjustments to // same or different inventory items in different warehouses. The // same warehouse may have multiple adjustments of the same or different item. // // All adjustments get processed at the same time. There is no guarantee of the // order of adjustments, so some of them may be rejected due to insufficient // inventory in the warehouse (even if another adjustment was given of the same // inventory item in the same warehouse, that would've increased the inventory // level sufficiently). class adjinfoObj; typedef x::ref<adjinfoObj> adjinfo; void do_adjustments(const stasher::client &client, const warehouses_t &warehouses, std::list<adjinfo> &adjs); // Information about a transfer: the name of the warehouse, // what it is, and how many of them. class adjinfoObj : virtual public x::obj { public: std::string name; std::string what; int howmuch; // The processing status gets placed here stasher::req_stat_t processed; adjinfoObj(const std::string &nameArg, const std::string &whatArg, int howmuchArg) : name(nameArg), what(whatArg), howmuch(howmuchArg) { } ~adjinfoObj() { } std::string descr() const { std::ostringstream o; o << "adjust " << what << " by " << howmuch << " in " << name; return o.str(); } }; void adjinventory(int argc, char **argv) { // The list of transfers parsed from the command line. std::list<adjinfo> adjs; // All the warehouses elicited from the transfers, combined: warehouses_t warehouses=warehouses_t::create(); // Parse command line options. for (int i=1; i+2 < argc; i += 3) { int n=0; std::istringstream(argv[i+2]) >> n; auto adjinfo=adjinfo::create(argv[i], argv[i+1], n); warehouses->createwarehouse(adjinfo->name); adjs.push_back(adjinfo); } auto client=stasher::client::base::connect(); auto manager=stasher::manager::create(); // Load the existing inventory, start subscriptions std::list<x::ref<x::obj> > mcguffins; load_warehouses(warehouses, client, manager, mcguffins); std::cout << "Existing inventory:" << std::endl; warehouses->inventory(); // Perform the transfers. If any of them where req_rejected_stat-ed, // repeat them. do { // Submit the transfers, wait for them to get processed. do_adjustments(client, warehouses, adjs); for (auto b=adjs.begin(), e=adjs.end(), p=b; b != e; ) { p=b; ++b; if ((*p)->processed != stasher::req_rejected_stat) adjs.erase(p); } } while (!adjs.empty()); } // Helper object to invoke a functor, that takes a stasher::putresults as an // argument, from a destructor callback. template<typename Functor> class invokeProcessedFunctorObj : public x::destroyCallbackObj { public: Functor f; stasher::putresults res; template<typename FunctorArg> invokeProcessedFunctorObj(FunctorArg && fArg, const stasher::putresults &resArg) : f(std::forward<FunctorArg>(fArg)), res(resArg) { } ~invokeProcessedFunctorObj() { } void destroyed() noexcept { f(res); } }; // Lock the inventory object in a warehouse. Pass the inventory object to a // functor. The functor is expected to return a transactionptr object. If it's // not null, the functor doesn't want to do a transaction. // // Returns a std::pair of the transactionptr object, and the current inventory // object's version mcguffin. template<typename Functor> std::pair<stasher::client::base::transactionptr, x::ref<x::obj> > make_transaction(const warehouse_t &warehouse, Functor &&f) { warehouse_t::base::current_value_t::lock lock(warehouse->current_value); return std::make_pair(f((const inventoryptr &)lock->value), lock->value.getversion()); } // The first functor gets passed to make_transaction. The returned transaction // is submitted to the object repository. The second functor gets invoked with // the transaction's stasher::putresults as an argument. // // If the transaction fails with a req_rejected_stat, the functor gets called // only after the original inventory object has already been updated with // the new value of the inventory object in the object repository. This is done // by invoking it as part of the destructor callback of the original object's // version mcguffin. This is an immediate process if the inventory object has // already been updated, because our reference would be the last one on the // mcguffin, so its destructor callback will get invoked immediately, after // the destructor callback gets installed, and everything goes out of scope. template<typename MakeTransactionFunctor, typename ProcessTransactionFunctor> void do_adjustment(const stasher::client &client, const warehouse_t &warehouse, MakeTransactionFunctor && make_tran, ProcessTransactionFunctor && process_tran) { auto transaction=make_transaction(warehouse, std::forward<MakeTransactionFunctor> (make_tran)); if (transaction.first.null()) return; // The first functor did not want to do it. auto mcguffin=transaction.second; stasher::process_request(client->put_request(transaction.first), [process_tran, mcguffin] (const stasher::putresults &res) { auto callback=x::ref< invokeProcessedFunctorObj <ProcessTransactionFunctor> >::create(std::move (process_tran), res); if (res->status == stasher::req_rejected_stat) { mcguffin->addOnDestroy (callback); return; } // Invoke the functor now. callback->destroyed(); }); } // Create a transaction that adjusts the inventory // This is called by the first functor in do_adjustment(). stasher::client::base::transactionptr adjust_inventory(const inventoryptr &ptr, const adjinfo &adj) { inventoryptr i; if (adj->howmuch == 0) { std::cout << "Very funny: " + adj->descr() + "\n" << std::flush; return stasher::client::base::transactionptr(); } if (ptr.null()) { // No existing inventory object if (adj->howmuch < 0) { std::cout << "Insufficient stock: " + adj->descr() + "\n" << std::flush; return stasher::client::base::transactionptr(); } i=inventory::create(); i->stock[adj->what]=adj->howmuch; } else { i=inventoryptr::create(*ptr); // Clone the object inventoryObj &newi= *i; // Find existing item. Make it 0, if it does not exist. auto iter=newi.stock.find(adj->what); if (iter == newi.stock.end()) iter=newi.stock.insert(std::make_pair(adj->what, 0)) .first; if (adj->howmuch < 0 && -iter->second > adj->howmuch) { std::cout << "Insufficient stock: " + adj->descr() + "\n" << std::flush; return stasher::client::base::transactionptr(); } iter->second += adj->howmuch; // Delete 0-stocked item. If this is the last item in the // inventory, delete the entire object. if (iter->second == 0) newi.stock.erase(iter); if (newi.stock.empty()) { auto tran=stasher::client::base::transaction::create(); tran->delobj(adj->name, ptr->uuid); return tran; } } // Serialize the new inventory object, then insert or update it. std::string s; typedef std::back_insert_iterator<std::string> insert_iter_t; insert_iter_t insert_iter(s); x::serialize::iterator<insert_iter_t> ser_iter(insert_iter); i->serialize(ser_iter); auto tran=stasher::client::base::transaction::create(); if (ptr.null()) { // No existing object, insert it. tran->newobj(adj->name, s); } else { // Update it. tran->updobj(adj->name, ptr->uuid, s); } return tran; } void do_adjustments(const stasher::client &client, const warehouses_t &warehouses, std::list<adjinfo> &adjs) { // Each functor, below, captures this mcguffin by value. // Guard the mcguffin. When it is completely destroyed, it means that // all transactions have been processed. x::destroyCallbackFlag::base::guard guard; x::ref<x::obj> mcguffin=x::ref<x::obj>::create(); guard(mcguffin); for (auto &adj : adjs) { // Be optimistic. adj->processed=stasher::req_processed_stat; do_adjustment(client, warehouses->warehouses.find(adj->name) ->second, [adj] (const inventoryptr &ptr) { std::cout << "Adjusting: " + adj->descr() + "\n" << std::flush; return adjust_inventory(ptr, adj); }, [adj, mcguffin] (const stasher::putresults &res) { std::cout << x::tostring(res->status) + ": " + adj->descr() + "\n" << std::flush; adj->processed=res->status; }); } } int main(int argc, char **argv) { try { adjinventory(argc, argv); } catch (const x::exception &e) { std::cerr << e << std::endl; return 1; } return 0; }
This is an example of using stasher::versionedptr
to properly handle a stasher::req_rejected_stat
error.
adjinventory.C
takes a list of adjustments on the
command line. Each adjustment consists of a name of an inventory object,
the name of an inventory item, and an adjustment. All adjustments get
processed at once, and each adjustment that fails because of a
stasher::req_rejected_stat
gets tried again.
Here's what happens when there are two inventory objects, “east” with 10 bananas, and “west” with 10 apples, and four adjustments: reduce the number of bananas in east by two, add two bananas to west, reduce the number of apples in west by two, add two bananas to east:
$ ./adjinventory east bananas -2 west bananas 2 west apples -2 east apples 2
Waiting for objects to get loaded
Existing inventory:
east:
Item Count
------------------------------ --------
bananas 10
===========================================================================
west:
Item Count
------------------------------ --------
apples 10
===========================================================================
Adjusting: adjust bananas by -2 in east
Adjusting: adjust bananas by 2 in west
Adjusting: adjust apples by -2 in west
Adjusting: adjust apples by 2 in east
Transaction/request processed: adjust bananas by -2 in east
Transaction/request processed: adjust bananas by 2 in west
Collision detected - object uuid mismatch: adjust apples by -2 in west
Collision detected - object uuid mismatch: adjust apples by 2 in east
Adjusting: adjust apples by -2 in west
Adjusting: adjust apples by 2 in east
Transaction/request processed: adjust apples by -2 in west
Transaction/request processed: adjust apples by 2 in east
Here, make_transaction
() locks the current object
value. In this case, the object is a warehouse object, and the value is
an inventory object. make_transaction
() retrieves
the existing object's version mcguffin from
getversion()
, then creates a transaction
that updates this object.
Applications should not modify stasher::current
objects. As shown here, applications should create transactions and
process them, then leave it up to the managed
stasher::current
object to update itself
automatically, to reflect the updated object in the stasher object
repository. Specifically, the requirements are:
Invoking current object's manage
() method,
to open a subscription. The manager object and the subscription
mcguffin must remain in scope for the subscription to be active.
The versioned object, if it exists and is not a
nullptr
presumably holds the uuid of the
repository's object that it was constructed from.
The object's version mcguffin, and the transaction updating that object get used together.
Only under these circumstances, a contract exists in the event that the
transaction fails with a stasher::req_rejected_stat
that the versioned object is going to get updated by
the managed object subscription, and the version mcguffin will go out
of scope and get destroyed. The
stasher::req_rejected_stat
fail must be because
the object in the repository was simultaneously updated, but the
application has not yet received the updated object, and it's on the
way.
do_adjustment
() calls
make_transaction
() to prepare each transaction,
saves the original object's version mcguffin, then processes the
transaction, and invokes the
ProcessTransactionFunctor
with the transaction's results. If the result was a
stasher::req_rejected_stat
the functor gets invoked
from the destructor callback, otherwise it gets invoked immediately.
For simplicity, do_adjustment
() always constructs
a destructor callback that invokes the functor, but actually installs the
destructor callback on the version mcguffin only in the event of a
stasher::req_rejected_stat
; and invokes it itself in all
other cases.
The end result is that the functor that processes the results of the
transaction gets invoked with a stasher::req_rejected_stat
only after the original object got updated from the repository by an
updated object value, in the event of an update collission.
do_adjustments
() takes a list of all the adjustments
specified on the command line, and starts asynchronous transactions for
all of them, at the same time. As shown in the above example, when
multiple transactions affect the same object, they're likely to collide
(the actual output can vary, under heavy client or server load, due to
the natural variations that are inherent to multithreaded applications,
but the shown output is the typical result).
Finally, adjinventory
collects the results of all
transaction, and repeats the ones that failed with a
stasher::req_rejected_stat
.