stasher::versioned_put: updating multiple versioned objects

Extending the process of updating a versioned current object to multiple versioned current objects is deceptively simple:
Lock each current object. Always use a specific locking order to prevent deadlocks against other threads.
Collect each object's version mcguffin, then create a transaction that updates the current objects.
If the transaction gets rejected with a stasher::req_rejected_stat, wait until one of the version mcguffins goes out of scope and gets destroyed before assessing what the next step should be (a new transaction, or something else).
A stasher::req_rejected_stat indicates that at least one of the objects involved in the transaction was updated and is no longer the current object in the repository. When a transaction updates multiple objects, the stasher::req_rejected_stat does not identify which one it was; therefore, it's necessary to wait until the version mcguffin of any one of them gets destroyed.
This can be implemented manually by using the
onAnyDestroyed
() template, as described in
LIBCXX's documentation, but the following templates and classes implement
this more conveniently.
stasher::versionscollected: a container for current object version mcguffins

#include <stasher/versionscollected.H>

typedef stasher::current<inventoryptr, stasher::versionedptr<inventoryptr>
                         > warehouse_t;

std::pair<stasher::client::base::transaction,
          stasher::versionscollected>
create_versioned_put(const warehouse_t &a,
                     const warehouse_t &b)
{
    auto transaction=stasher::client::base::transaction::create();
    auto versions=stasher::versionscollected::create();

    warehouse_t::base::current_value_t::lock
        alock(a->current_value), block(b->current_value);

    versions->add(alock->value);
    versions->add(block->value);

    // create the transaction here...

    return std::make_pair(transaction, versions);
}
stasher::versionscollected
is an x::ref
for a container of version mcguffins
of objects that form a transaction. The above example takes two
versioned current objects,
and locks their current_value
s.
As noted previously, when locking multiple objects they must get locked
in a specific order, in order to avoid deadlocks; but this is outside
of the scope of this example, which presumes that the two
warehouse_t
s are different and always get given
in the right order.
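The ordering requirement can be illustrated with plain standard library mutexes. The following is only a sketch and not stasher code: warehouse_sketch and lock_both are hypothetical names, and a std::mutex stands in for a current_value lock. Both locks always get acquired in alphabetical order of the warehouse names, regardless of which warehouse is the source and which is the destination.

```cpp
#include <map>
#include <mutex>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-in for a versioned current object: a mutex guarding
// some value. This is not a stasher class.
struct warehouse_sketch {
    std::mutex mutex;
    int stock=0;
};

typedef std::map<std::string, warehouse_sketch> warehouses_sketch_t;

// Lock two different warehouses, always acquiring the locks in alphabetical
// order of their names. The returned pair holds the "from" lock first and
// the "to" lock second, no matter which one was acquired first.
std::pair<std::unique_lock<std::mutex>, std::unique_lock<std::mutex>>
lock_both(warehouses_sketch_t &warehouses,
          const std::string &from, const std::string &to)
{
    if (from == to)
        throw std::invalid_argument("from and to must be different");

    std::unique_lock<std::mutex> from_lock(warehouses.at(from).mutex,
                                           std::defer_lock);
    std::unique_lock<std::mutex> to_lock(warehouses.at(to).mutex,
                                         std::defer_lock);

    if (from < to) {
        from_lock.lock();
        to_lock.lock();
    } else {
        to_lock.lock();
        from_lock.lock();
    }
    return std::make_pair(std::move(from_lock), std::move(to_lock));
}
```

Two threads that transfer in opposite directions between the same two warehouses both lock the alphabetically first warehouse before the second one, so neither can hold one lock while waiting for the other.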
After locking the current values of both objects, each current_value.value, which is a stasher::versionedptr, gets passed to stasher::versionscollected->add(). This method invokes the stasher::versionedptr's getversion() and stores a weak reference to the version mcguffin in the stasher::versionscollected.
stasher::versionscollected
does not need to store
a strong reference on the version mcguffin. Checking whether the
weakly-referenced mcguffin went out of scope and got destroyed is
sufficient for the purpose of determining whether the versioned object
was updated already.
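Why a weak reference is sufficient can be shown with std::shared_ptr and std::weak_ptr. This is a sketch under stated assumptions: LIBCXX's references and weak pointers are not the standard library's smart pointers, and mcguffin_sketch and versions_sketch are hypothetical stand-ins, not the actual stasher::versionscollected implementation.

```cpp
#include <memory>
#include <vector>

// Hypothetical stand-in: any shared object plays the part of a version
// mcguffin. Its owner replaces it whenever the versioned object is updated.
typedef std::shared_ptr<int> mcguffin_sketch;

class versions_sketch {
    std::vector<std::weak_ptr<int>> versions;

public:
    // Keep only a weak reference. This does not keep the mcguffin alive.
    void add(const mcguffin_sketch &m)
    {
        versions.emplace_back(m);
    }

    // True if at least one collected mcguffin has been destroyed already.
    bool any_destroyed() const
    {
        for (const auto &v : versions)
            if (v.expired())
                return true;
        return false;
    }
};
```

Because the container holds no strong reference, the mcguffin gets destroyed as soon as its owner replaces it, and the expired weak reference is all the evidence needed that the object has a newer version.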
For convenience
create_from
() takes
a variadic list of stasher::versionedptr
as
parameters:
stasher::versionscollected versions= stasher::versionscollected::base::create_from(alock->value, block->value);
This is equivalent to invoking
stasher::versionscollected::create
(), then
add
()ing
both stasher::versionedptr
s.
Similarly,
create_fromiter
()
takes a beginning and an ending input iterator over
stasher::versionedptr
s, then constructs a new
stasher::versionscollected
,
add
()ing
version mcguffins from the input sequence.
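The relationship between add(), the variadic form, and the iterator form can be sketched as follows. This is an illustration only, using standard library weak pointers and a C++17 fold expression; collected_sketch, create_from_sketch and create_fromiter_sketch are hypothetical names, and nothing here is taken from the stasher headers.

```cpp
#include <memory>
#include <vector>

// Hypothetical stand-in for a container of weakly-referenced version
// mcguffins.
typedef std::vector<std::weak_ptr<int>> collected_sketch;

// Variadic form: create an empty container, then add each argument,
// left to right.
template<typename ...Args>
collected_sketch create_from_sketch(const Args &...args)
{
    collected_sketch c;

    (c.emplace_back(args), ...);
    return c;
}

// Iterator form: create an empty container, then add everything in the
// input sequence.
template<typename Iter>
collected_sketch create_fromiter_sketch(Iter b, Iter e)
{
    collected_sketch c;

    for (; b != e; ++b)
        c.emplace_back(*b);
    return c;
}
```

Both helpers are nothing more than a create step followed by repeated adds, which is the equivalence described above.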
stasher::versioned_put
()
xferinventory.C
is a modified version of
adjinventory.C
that updates multiple versioned objects in the same transaction, and
shows how to handle stasher::req_rejected_stat
with stasher::versioned_put
():
#include <x/destroycallback.H>
#include <x/destroycallbackflag.H>
#include <stasher/client.H>
#include <stasher/manager.H>
#include <stasher/current.H>
#include <stasher/versionedptr.H>
#include <stasher/versionedput.H>
#include "inventory.H"
#include "warehouses.H"

#include <iostream>
#include <iterator>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
#include <list>
#include <map>

// Inventory transfer.

// The parameters to xferinventory are groups of four:
// from, to, what, howmany.
//
// "from" and "to" are names of inventoryptr objects.
//
// "what" is the inventory name. "howmany" is how many of "what" to be
// transferred from the "from" inventory object to the "to" one.
//
// Note that the number of inventory objects, called warehouses here, that
// can be processed is limited by the maximum number of subscriptions, however
// there's no limit (within reason) as to the actual number of transfers of
// same or different inventory items between the different warehouses. The
// same warehouse may be a source or a destination of multiple transfers.
//
// All transfers get processed at the same time. There is no guarantee of the
// order of transfers, so some of them may be rejected due to insufficient
// inventory in the "from" warehouse (even if there's another incoming transfer
// of this inventory item into this warehouse from another warehouse, since the
// transfer order is not guaranteed).
//
// This is an example of a versioned put, with an automatic retry in case of
// a collision: all transfers get kicked off at the same time; when the same
// warehouse is involved in multiple transfers, it's likely that multiple
// updates of the same object go out together, and one of them is going to
// get stasher::req_rejected_stat, in which case it's simply tried again.

// The versioned put makes sure that for a req_rejected_stat transaction, at
// least one participating object's version has changed.

// Some forward declarations

class xferinfoObj;
typedef x::ref<xferinfoObj> xferinfo;

int adjust(const inventoryptr &existing,
           const std::string &objectname,
           const std::string &what,
           int howmuch,
           const stasher::client::base::transaction &transaction);

void do_transfers(const stasher::client &client,
                  const warehouses_t &warehouses,
                  std::list<xferinfo> &transfers);

std::pair<std::string, int> do_adjust(const inventory &existing,
                                      const std::string &what,
                                      int howmuch);

// Information about a transfer: the name of the warehouses where something
// gets transferred from and to, what it is, and how many of them.

class xferinfoObj : virtual public x::obj {

public:
    std::string from, to;
    std::string what;
    int howmuch;

    // create_versioned_put sets this to false if there's not enough
    // inventory to perform this transfer.
    bool isenough;

    // The processing status gets placed here
    stasher::req_stat_t processed;

    xferinfoObj(const std::string &fromArg,
                const std::string &toArg,
                const std::string &whatArg,
                int howmuchArg)
        : from(fromArg), to(toArg), what(whatArg), howmuch(howmuchArg),
          isenough(true) // Be optimistic
    {
    }

    ~xferinfoObj()
    {
    }

    std::string descr() const
    {
        std::ostringstream o;

        o << "transfer " << howmuch << " " << what
          << " from " << from << " to " << to;
        return o.str();
    }
};

////////////////////////////////////////////////////////////////////////////
//
// Take our warehouses, and one transfer.
//
// Create a transaction effecting the transfer, and collect the original
// versions of each warehouse's inventory that were used to build the
// transaction that updates both warehouses' inventory objects.

std::pair<stasher::client::base::transaction,
          stasher::versionscollected>
create_versioned_put(const warehouses_t &warehouses,
                     const xferinfo &xfer)
{
    // Lock the from and the to warehouse.

    // Note: in this example, create_versioned_put() always gets called
    // from the same thread. If this were a multithreaded context, we would
    // have to lock xfer->from and xfer->to in alphabetical order, in
    // order to avoid a potential deadlock against a thread that's doing
    // a transfer in the opposite "direction" (can be the same or a
    // different inventory item).
    //
    // But, since this is a single thread, this is ok. The manager
    // updates each current object one at a time, so there's no
    // possibility of a deadlock.
    //
    // Doing this in alphabetical order would've been ugly. Thankfully,
    // I don't have to do it.

    warehouse_t::base::current_value_t::lock
        from(warehouses->warehouses.find(xfer->from)
             ->second->current_value);

    warehouse_t::base::current_value_t::lock
        to(warehouses->warehouses.find(xfer->to)
           ->second->current_value);

    // Create a transaction, and collect the current versions of the
    // objects that go into the transaction.

    auto transaction=stasher::client::base::transaction::create();
    auto versions=stasher::versionscollected::base
        ::create_from(from->value, to->value);

    if (adjust(from->value, xfer->from, xfer->what, -xfer->howmuch,
               transaction) < 0)
    {
        // Not enough in the from inventory
        xfer->isenough=false;
    }
    else
    {
        adjust(to->value, xfer->to, xfer->what, xfer->howmuch,
               transaction);
    }

    return std::make_pair(transaction, versions);
}

// Apply a transfer to an inventory, and update the transaction object,
// accordingly.

// Returns the new inventory level of the selected item.

int adjust(const inventoryptr &existing,
           const std::string &objectname,
           const std::string &what,
           int howmuch,
           const stasher::client::base::transaction &transaction)
{
    if (existing.null())
    {
        // New inventory object

        inventory dummy=inventory::create();

        auto result=do_adjust(dummy, what, howmuch);

        if (result.first.size() == 0)
        {
            // Marginal: no inventory before and after. We give up.
            return result.second;
        }

        transaction->newobj(objectname, result.first);
        return result.second;
    }

    auto result=do_adjust(existing, what, howmuch);

    if (result.first.size() == 0) // Empty inventory!
    {
        transaction->delobj(objectname, existing->uuid);
    }
    else
    {
        transaction->updobj(objectname, existing->uuid, result.first);
    }
    return result.second;
}

// Ok, the task is now reduced to taking this inventory object, updating
// the inventory level, and then serializing it back, and returning the
// new level.

std::pair<std::string, int> do_adjust(const inventory &existing,
                                      const std::string &what,
                                      int howmuch)
{
    // Clone the object

    auto cpy=inventory::create(*existing);

    // Find this object in the inventory map.

    auto iter=cpy->stock.find(what);

    if (iter == cpy->stock.end())
    {
        // Doesn't exist, create it.
        iter=cpy->stock.insert(std::make_pair(what, 0)).first;
    }

    iter->second += howmuch;

    std::pair<std::string, int> ret;

    ret.second=iter->second;

    // Inventory of 0 removes this item from the inventory, completely.

    if (ret.second == 0)
        cpy->stock.erase(iter);

    // Return an empty string if the inventory is empty. This results
    // in the object getting deleted.

    if (!cpy->stock.empty())
    {
        typedef std::back_insert_iterator<std::string> insert_iter_t;

        insert_iter_t insert_iter(ret.first);

        x::serialize::iterator<insert_iter_t> ser_iter(insert_iter);

        cpy->serialize(ser_iter);
    }

    return ret;
}

void xferinventory(int argc, char **argv)
{
    // The list of transfers parsed from the command line.

    std::list<xferinfo> transfers;

    // All the warehouses elicited from the transfers, combined:
    warehouses_t warehouses=warehouses_t::create();

    // Parse command line options.

    for (int i=1; i+3 < argc; i += 4)
    {
        int n=0;

        std::istringstream(argv[i+3]) >> n;

        if (n <= 0)
        {
            std::cerr << "Eh?" << std::endl;
            return;
        }

        auto xferinfo=xferinfo::create(argv[i], argv[i+1],
                                       argv[i+2], n);

        warehouses->createwarehouse(xferinfo->from);
        warehouses->createwarehouse(xferinfo->to);
        transfers.push_back(xferinfo);
    }

    auto client=stasher::client::base::connect();
    auto manager=stasher::manager::create();

    // Load the existing inventory, start subscriptions

    std::list<x::ref<x::obj> > mcguffins;

    load_warehouses(warehouses, client, manager, mcguffins);

    std::cout << "Transfering between:" << std::endl;

    warehouses->inventory();

    // Perform the transfers. If any of them were req_rejected_stat-ed,
    // repeat them.

    do
    {
        // Submit the transfers, wait for them to get processed.
        do_transfers(client, warehouses, transfers);

        for (auto b=transfers.begin(), e=transfers.end(), p=b;
             b != e; )
        {
            p=b;
            ++b;

            if ((*p)->processed != stasher::req_rejected_stat)
                transfers.erase(p);
        }
    } while (!transfers.empty());
}

void do_transfers(const stasher::client &client,
                  const warehouses_t &warehouses,
                  std::list<xferinfo> &transfers)
{
    // We could very well call versioned_put() inside the first for loop.
    // But, for this example to demonstrate req_rejected_stat handling,
    // we must go out of our way to cause a version conflict.
    //
    // We'll create all transactions before processing them. This way,
    // when there are multiple transactions that hit the same warehouse,
    // there will be a req_rejected_stat.

    class transaction_list {
    public:
        stasher::client::base::transaction tran;
        stasher::versionscollected vers;
        xferinfo xfer;

        transaction_list(const stasher::client::base::transaction
                         &tranArg,
                         const stasher::versionscollected &versArg,
                         const xferinfo &xferArg)
            : tran(tranArg), vers(versArg), xfer(xferArg)
        {
        }
    };

    std::vector<transaction_list> transactions;

    for (auto &transfer : transfers)
    {
        // Be optimistic.
        transfer->processed=stasher::req_processed_stat;

        if (transfer->from == transfer->to)
        {
            std::cout << "Very funny: " << transfer->descr()
                      << std::endl;
            continue;
        }

        std::pair<stasher::client::base::transaction,
                  stasher::versionscollected>
            transaction=create_versioned_put(warehouses, transfer);

        if (!transfer->isenough)
        {
            std::cout << "Insufficient inventory: "
                      << transfer->descr() << std::endl;
            continue;
        }

        transactions.emplace_back(transaction.first,
                                  transaction.second,
                                  transfer);
    }

    for (auto &t : transactions)
    {
        std::cout << "Processing: " << t.xfer->descr() << std::endl;

        stasher::putresults res=
            stasher::versioned_put(client, t.tran, t.vers);

        std::cout << x::tostring(res->status)
                  << ": " << t.xfer->descr() << std::endl;

        t.xfer->processed=res->status;
    }
}

int main(int argc, char **argv)
{
    try {
        xferinventory(argc, argv);
    } catch (const x::exception &e)
    {
        std::cerr << e << std::endl;
        return 1;
    }
    return 0;
}
xferinventory takes a list of transfers on the
command line. Each transfer consists of the name of the
“from” warehouse object, the name of the
“to” warehouse object, the name of an inventory item, and
a positive count.
xferinventory updates both warehouse objects,
adjusting each one's inventory accordingly.
xferinventory accepts more than one transfer, and
does them all. Here's the output of transferring four bananas from
east
to west
, and four
apples from west
to east
:
$ ./xferinventory east west bananas 4 west east apples 4
Waiting for objects to get loaded
Transfering between:
east:
Item Count
------------------------------ --------
apples 2
bananas 8
===========================================================================
west:
Item Count
------------------------------ --------
apples 8
bananas 2
===========================================================================
Processing: transfer 4 bananas from east to west
Transaction/request processed: transfer 4 bananas from east to west
Processing: transfer 4 apples from west to east
Collision detected - object uuid mismatch: transfer 4 apples from west to east
Processing: transfer 4 apples from west to east
Transaction/request processed: transfer 4 apples from west to east
In xferinventory.C
,
create_versioned_put
() acquires a lock on both
warehouse objects, collects both objects' version mcguffins into a
stasher::versionscollected
, then prepares a
transaction to update those objects, and releases the lock.
do_transfers
() processes the transactions.
They all get purposefully created in advance, so that all transactions
get created for the initial contents of the warehouse objects.
stasher::versioned_put
() then processes each
transaction, one at a time. This guarantees a
stasher::req_rejected_stat
when the same warehouse is
a part of more than one transfer, as in this example.
stasher::versioned_put
() takes a
client connection handle, a transaction object, and
a stasher::versionscollected
.
This synchronous function waits for
the transaction to get processed, then returns its
stasher::putresults
.
Additionally, if the transaction's status is
stasher::req_rejected_stat
,
stasher::versioned_put
() waits until at least one of the
version mcguffins goes out of scope and gets destroyed, indicating that
its versioned object has been updated.
The above example simply proceeds and immediately reprocesses all
transfers that failed with a stasher::req_rejected_stat
,
using the updated contents of the warehouse objects as a new starting
point, for this go-around.
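The reject-and-retry pattern itself does not depend on stasher, and can be sketched with an in-memory map. Everything in the following example is a hypothetical stand-in: a version counter plays the part of an object's uuid, commit() plays the part of processing a transaction, and a false return value plays the part of stasher::req_rejected_stat. There is no waiting in this single-threaded sketch, because the updated contents are available immediately.

```cpp
#include <map>
#include <string>
#include <vector>

// Hypothetical in-memory stand-in for the object repository: each object
// has a value, and a version number that changes with every update.
struct object_sketch {
    int value=0;
    unsigned version=0;
};

typedef std::map<std::string, object_sketch> repo_sketch;

// One prepared update: the new value, and the version it was computed from.
struct update_sketch {
    std::string name;
    int new_value;
    unsigned based_on_version;
};

typedef std::vector<update_sketch> transaction_sketch;

// Build a transfer of "howmany" units from one object to another, based on
// the current contents of the repository.
transaction_sketch prepare(const repo_sketch &repo, const std::string &from,
                           const std::string &to, int howmany)
{
    const object_sketch &f=repo.at(from), &t=repo.at(to);

    return {
        {from, f.value-howmany, f.version},
        {to, t.value+howmany, t.version},
    };
}

// All-or-nothing commit. Returns false, the analogue of a rejected
// transaction, if any object changed since the transaction was prepared.
bool commit(repo_sketch &repo, const transaction_sketch &tran)
{
    for (const auto &u : tran)
        if (repo.at(u.name).version != u.based_on_version)
            return false;

    for (const auto &u : tran) {
        object_sketch &o=repo.at(u.name);

        o.value=u.new_value;
        ++o.version;
    }
    return true;
}
```

Preparing two transfers from the same starting contents and committing them one after the other rejects the second one. Calling prepare() again, with the updated contents as the new starting point, produces a transaction that goes through.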
All current object value locks must be released before calling
stasher::versioned_put
(). The correct sequence of
events is:
Lock all current_value
s, in a consistent order,
to avoid deadlocks.
Collect each value's version mcguffin, into a
stasher::versionscollected
, and prepare
a transaction updating the same objects.
Release current_value
locks.
Call stasher::versioned_put
().
In the event of a collision,
stasher::versioned_put
() waits until one of the
versioned objects gets updated. The callbacks that update versioned
current objects acquire their own lock on their
current_value
, resulting in a deadlock unless
all locks get released prior to a
stasher::versioned_put
().
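In C++ the sequence above maps naturally onto a nested scope. The following sketch uses a std::mutex as a stand-in for a current_value lock; current_sketch, blocking_put_sketch and update_value_sketch are hypothetical names, and blocking_put_sketch only imitates a call whose completion depends on a callback acquiring the same lock.

```cpp
#include <mutex>

// Hypothetical stand-in for a current object: a value and its lock.
struct current_sketch {
    std::mutex mutex;
    int value=0;
};

// Hypothetical stand-in for a call that cannot complete until an update
// callback runs. The callback's work is imitated here: it acquires the
// object's lock itself, so the caller must not be holding it.
int blocking_put_sketch(current_sketch &obj, int new_value)
{
    std::lock_guard<std::mutex> lock(obj.mutex);

    obj.value=new_value;
    return obj.value;
}

int update_value_sketch(current_sketch &obj, int delta)
{
    int new_value;

    {
        // Lock the value, read it, and prepare the update.
        std::lock_guard<std::mutex> lock(obj.mutex);

        new_value=obj.value+delta;
    } // The lock gets released here, at the end of the scope.

    // Only now make the call that may have to wait for the callback.
    return blocking_put_sketch(obj, new_value);
}
```

Moving the call to blocking_put_sketch() inside the braces would have the same thread lock the mutex twice, which is the sketch's equivalent of the deadlock described above.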
stasher::versioned_put_request
()
xferinventory2.C
is a version of
xferinventory.C
that demonstrates how to use
stasher::versioned_put_request
(),
an asynchronous version
of stasher::versioned_put
().
stasher::versioned_put_request
() takes
a client connection handle, a transaction object, a functor, and
a stasher::versionscollected
.
The client connection handle, the transaction object, and the
stasher::versionscollected
serve the same purpose
as with stasher::versioned_put
().
stasher::versioned_put_request
()
returns immediately.
The functor gets invoked, with a
stasher::putresults
parameter, when the
transaction gets processed.
Just like with stasher::versioned_put
(),
if the transaction's status is
stasher::req_rejected_stat
,
the functor does not get invoked until at least
one of the version mcguffins goes out of scope and gets destroyed,
indicating that its versioned object has been updated.
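The calling convention, where the request returns immediately and the functor receives the result later, can be sketched without any threads. This is not the stasher API: client_sketch, put_request and run_pending are hypothetical names, the outcome string is supplied by the caller purely for illustration, and a queue drained by run_pending() stands in for the client connection thread.

```cpp
#include <functional>
#include <queue>
#include <string>
#include <utility>

// Hypothetical stand-in for a client connection: requests get queued, and
// get completed later, when run_pending() is called.
class client_sketch {
    std::queue<std::function<void ()>> pending;

public:
    // Returns immediately. The functor receives the outcome later.
    void put_request(std::string outcome,
                     std::function<void (const std::string &)> functor)
    {
        pending.push([outcome=std::move(outcome),
                      functor=std::move(functor)]
                     {
                         functor(outcome);
                     });
    }

    // Complete all queued requests, in order, invoking their functors.
    void run_pending()
    {
        while (!pending.empty()) {
            auto request=std::move(pending.front());

            pending.pop();
            request();
        }
    }
};
```

A caller that needs the results collects them in the functor, starts all of its requests, and only then waits for the functors to be invoked.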
$ ./xferinventory2 east west apples 2 east west bananas 1 west east apples 1 west east bananas 2
Waiting for objects to get loaded
Transfering between:
east:
Item Count
------------------------------ --------
apples 6
bananas 4
===========================================================================
west:
Item Count
------------------------------ --------
apples 4
bananas 6
===========================================================================
Processing: transfer 2 apples from east to west
Processing: transfer 1 bananas from east to west
Processing: transfer 1 apples from west to east
Processing: transfer 2 bananas from west to east
Transaction/request processed: transfer 2 apples from east to west
Collision detected - object uuid mismatch: transfer 1 apples from west to east
Collision detected - object uuid mismatch: transfer 1 bananas from east to west
Collision detected - object uuid mismatch: transfer 2 bananas from west to east
Processing: transfer 1 bananas from east to west
Processing: transfer 1 apples from west to east
Processing: transfer 2 bananas from west to east
Transaction/request processed: transfer 1 bananas from east to west
Collision detected - object uuid mismatch: transfer 1 apples from west to east
Collision detected - object uuid mismatch: transfer 2 bananas from west to east
Processing: transfer 1 apples from west to east
Processing: transfer 2 bananas from west to east
Transaction/request processed: transfer 1 apples from west to east
Collision detected - object uuid mismatch: transfer 2 bananas from west to east
Processing: transfer 2 bananas from west to east
Transaction/request processed: transfer 2 bananas from west to east
xferinventory2.C
does not prepare all the
transactions in advance, before processing them. Since they're
asynchronous,
xferinventory2.C
starts them all together, then
waits until they've all been processed, before redoing all the
stasher::req_rejected_stat
failures.
The requirement to release all versioned current object locks,
mentioned in
xferinventory.C
is slightly relaxed due to
stasher::versioned_put_request
()'s asynchronous
nature, as long as the locks are released shortly after
stasher::versioned_put_request
() returns.
Because of that, two convenience templates are also available.
stasher::versioned_put_request_from
is an
alternative version where the last
stasher::versionscollected
parameter gets
replaced by a variadic list of
stasher::versionedptr
s (or something similar
that implements a suitable getversion
());
and stasher::versioned_put_request_fromiter
where
the last
stasher::versionscollected
parameter gets
replaced by a beginning and an ending iterator, over
stasher::versionedptr
s.
These alternative functions construct a
stasher::versionscollected
from
their custom parameters, then invoke
stasher::versioned_put_request
().