stasher::versioned_put: updating multiple versioned objects
Extending the process of updating a versioned current object to multiple versioned current objects is deceptively simple:
Lock each current object. Always use a specific locking order to prevent deadlocks against other threads.
Collect each object's version mcguffin, then create a transaction that updates the current objects.
	If the transaction gets rejected with a
	stasher::req_rejected_stat, wait until one of the
	version mcguffins goes out of scope and gets destroyed, before
	assessing what the next step should be (a new transaction, or
	something else).
      
    A stasher::req_rejected_stat indicates that at least one
    of the objects involved in the transaction was updated and is no
    longer the current object in the repository. When a transaction updates
    multiple objects, the stasher::req_rejected_stat does
    not identify which one it was; therefore, it's necessary to wait until
    any one of them does.
  
    This can be implemented manually by using the
    onAnyDestroyed() template, as described in
    LIBCXX's documentation, but the following templates and classes implement
    this more conveniently.
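    The idea of reacting to the first of several mcguffins getting destroyed
    can be illustrated with standard C++ only. This is a minimal sketch, not
    LIBCXX's onAnyDestroyed(): once_callback and
    make_watched_mcguffin() are hypothetical names, the mcguffin is modeled as
    a std::shared_ptr with a custom deleter, and the sketch is not thread-safe.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical helper: runs fn once, when the first watched mcguffin dies.
struct once_callback {
    std::function<void()> fn;
    bool fired = false;

    void fire()
    {
        if (fired)
            return;          // Later destructions are ignored.
        fired = true;
        if (fn)
            fn();
    }
};

// Hypothetical stand-in for a version mcguffin: an otherwise meaningless
// object whose destruction notifies the shared callback.
std::shared_ptr<int> make_watched_mcguffin(const std::shared_ptr<once_callback> &cb)
{
    return std::shared_ptr<int>(new int(0), [cb](int *p) {
        delete p;
        cb->fire();
    });
}
```

    Dropping the last reference to any one of the watched mcguffins invokes
    the callback; destroying the remaining ones has no further effect.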
  
stasher::versionscollected: a container for current object version mcguffins
#include <stasher/versionscollected.H>

typedef stasher::current<inventoryptr, stasher::versionedptr<inventoryptr>
			 > warehouse_t;

std::pair<stasher::client::base::transaction, stasher::versionscollected>
create_versioned_put(const warehouse_t &a,
		     const warehouse_t &b)
{
	auto transaction=stasher::client::base::transaction::create();
	auto versions=stasher::versionscollected::create();

	warehouse_t::base::current_value_t::lock
		alock(a->current_value),
		block(b->current_value);

	versions->add(alock->value);
	versions->add(block->value);

	// create the transaction here...

	return std::make_pair(transaction, versions);
}
      stasher::versionscollected
      is an x::ref for a container of version mcguffins
      of objects that form a transaction. The above example takes two
      versioned current objects,
      and locks their current_values.
      As noted previously, when locking multiple objects they must get locked
      in a specific order, in order to avoid deadlocks; but this is outside
      of the scope of this example, which presumes that the two
      warehouse_ts are different and always get given
      in the right order.
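      One common way to get a consistent locking order is to sort by some
      stable key, such as the object's name. The following is a sketch using
      only the standard library; named_warehouse, lock_order() and transfer()
      are hypothetical and are not part of stasher. It assumes that the two
      warehouses are different objects.

```cpp
#include <cassert>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical stand-in for a lockable current object.
struct named_warehouse {
    std::string name;
    std::mutex lock;
    int stock;

    named_warehouse(std::string n, int s) : name(std::move(n)), stock(s) {}
};

// Order the two warehouses alphabetically, so that every caller acquires
// the locks in the same order regardless of the transfer's direction.
std::pair<named_warehouse *, named_warehouse *>
lock_order(named_warehouse &a, named_warehouse &b)
{
    return a.name < b.name ? std::make_pair(&a, &b) : std::make_pair(&b, &a);
}

// Precondition: from and to are different objects.
void transfer(named_warehouse &from, named_warehouse &to, int n)
{
    auto order = lock_order(from, to);

    std::lock_guard<std::mutex> first(order.first->lock);
    std::lock_guard<std::mutex> second(order.second->lock);

    from.stock -= n;
    to.stock += n;
}
```

      With plain mutexes, std::scoped_lock (C++17) is an alternative that
      avoids deadlocks without a manual ordering.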
    
      After locking the current values of both objects, each
      current_value.value, which is a
      stasher::versionedptr, gets passed to
      stasher::versionscollected->add().
      This method invokes stasher::versionedptr's
      getversion() and stores a weak reference to the
      version mcguffin in the stasher::versionscollected.
      stasher::versionscollected does not need to store
      a strong reference on the version mcguffin. Checking whether the
      weakly-referenced mcguffin went out of scope and got destroyed is
      sufficient for the purpose of determining whether the versioned object
      was updated already.
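      Why a weak reference is enough can be shown with std::weak_ptr. This is
      only an analogy built from standard types, assuming that a version
      mcguffin behaves like a std::shared_ptr that the versioned object
      replaces on every update; versions_sketch and any_updated() are
      hypothetical names, not the stasher::versionscollected API.

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical stand-in for a version mcguffin.
using version_mcguffin = std::shared_ptr<int>;

class versions_sketch {
    std::vector<std::weak_ptr<int>> versions;

public:
    // Remember the version without keeping it alive.
    void add(const version_mcguffin &v)
    {
        versions.push_back(v);
    }

    // True if at least one collected version has been destroyed, which in
    // this model means that its object was updated.
    bool any_updated() const
    {
        for (const auto &w : versions)
            if (w.expired())
                return true;
        return false;
    }
};
```

      Because the container holds no strong references, it never delays the
      destruction of a mcguffin, so an update is always observable.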
    
      For convenience,
      create_from() takes
      a variadic list of stasher::versionedptr as
      parameters:
    
stasher::versionscollected versions= stasher::versionscollected::base::create_from(alock->value, block->value);
      This is equivalent to invoking
      stasher::versionscollected::create(), then
      add()ing
      both stasher::versionedptrs.
    
      create_fromiter()
      takes a beginning and an ending input iterator over
      stasher::versionedptrs, then constructs a new
      stasher::versionscollected,
      add()ing
      version mcguffins from the input sequence.
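      The relationship between a plain add(), a variadic factory, and an
      iterator-based factory can be sketched with standard C++ templates.
      Everything here is hypothetical (collected_sketch, make_from(),
      make_from_iter()); it only mirrors the shape of the functions described
      above and says nothing about how stasher implements them.

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical container of weakly referenced versions.
struct collected_sketch {
    std::vector<std::weak_ptr<int>> versions;

    void add(const std::shared_ptr<int> &v) { versions.push_back(v); }
};

// End of the variadic recursion: nothing left to add.
inline void add_all(collected_sketch &) {}

// Add the first argument, then recurse on the rest.
template<typename First, typename... Rest>
void add_all(collected_sketch &c, const First &first, const Rest &...rest)
{
    c.add(first);
    add_all(c, rest...);
}

// Variadic factory: create an empty container, then add() each argument.
template<typename... Args>
collected_sketch make_from(const Args &...args)
{
    collected_sketch c;
    add_all(c, args...);
    return c;
}

// Iterator factory: add() each element of the input sequence.
template<typename Iter>
collected_sketch make_from_iter(Iter b, Iter e)
{
    collected_sketch c;
    for (; b != e; ++b)
        c.add(*b);
    return c;
}
```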
    
stasher::versioned_put()
      xferinventory.C is a modified version of
      adjinventory.C
      that updates multiple versioned objects in the same transaction, and
      shows how to handle stasher::req_rejected_stat
      with stasher::versioned_put():
    
#include <x/destroycallback.H>
#include <x/destroycallbackflag.H>
#include <stasher/client.H>
#include <stasher/manager.H>
#include <stasher/current.H>
#include <stasher/versionedptr.H>
#include <stasher/versionedput.H>
#include "inventory.H"
#include "warehouses.H"

#include <iostream>
#include <iterator>
#include <sstream>
#include <string>
#include <utility>
#include <list>
#include <map>
#include <vector>

// Inventory transfer.

// The parameters to xferinventory are groups of four:
// from, to, what, howmany.
//
// "from" and "to" are names of inventoryptr objects.
//
// "what" is the inventory name. "howmany" is how many of "what" to be
// transferred from the "from" inventory object to the "to" one.
//
// Note that the number of inventory objects, called warehouses here, that
// can be processed is limited by the maximum number of subscriptions, however
// there's no limit (within reason) as to the actual number of transfers of
// same or different inventory items between the different warehouses. The
// same warehouse may be a source or a destination of multiple transfers.
//
// All transfers get processed at the same time. There is no guarantee of the
// order of transfers, so some of them may be rejected due to insufficient
// inventory in the "from" warehouse (even if there's another incoming transfer
// of this inventory item into this warehouse from another warehouse, since the
// transfer order is not guaranteed).
//
// This is an example of a versioned put, with an automatic retry in case of
// a collision: all transfers get kicked off at the same time; when the same
// warehouse is involved in multiple transfers, it's likely that multiple
// updates of the same object go out together, and one of them is going to
// get stasher::req_rejected_stat, in which case it's simply tried again.

// The versioned put makes sure that for a req_rejected_stat transaction, at
// least one participating object's version has changed.

// Some forward declarations

class xferinfoObj;
typedef x::ref<xferinfoObj> xferinfo;

int adjust(const inventoryptr &existing,
	   const std::string &objectname,
	   const std::string &what,
	   int howmuch,
	   const stasher::client::base::transaction &transaction);

void do_transfers(const stasher::client &client,
		  const warehouses_t &warehouses,
		  std::list<xferinfo> &transfers);

std::pair<std::string, int> do_adjust(const inventory &existing,
				      const std::string &what,
				      int howmuch);

// Information about a transfer: the name of the warehouses where something
// gets transferred from and to, what it is, and how many of them.

class xferinfoObj : virtual public x::obj {

public:
	std::string from, to;
	std::string what;
	int howmuch;

	// create_versioned_put() sets this to false if there's not enough
	// inventory to perform this transfer.
	bool isenough;

	// The processing status gets placed here
	stasher::req_stat_t processed;

	xferinfoObj(const std::string &fromArg,
		    const std::string &toArg,
		    const std::string &whatArg,
		    int howmuchArg)
		: from(fromArg), to(toArg), what(whatArg), howmuch(howmuchArg),
		  isenough(true) // Be optimistic
	{
	}

	~xferinfoObj()
	{
	}

	std::string descr() const
	{
		std::ostringstream o;

		o << "transfer " << howmuch << " " << what
		  << " from " << from << " to " << to;
		return o.str();
	}
};

////////////////////////////////////////////////////////////////////////////
//
// Take our warehouses, and one transfer.
//
// Create a transaction effecting the transfer, and collect the original
// versions of each warehouse's inventory that were used to build the
// transaction that updates both warehouses' inventory objects.

std::pair<stasher::client::base::transaction, stasher::versionscollected>
create_versioned_put(const warehouses_t &warehouses,
		     const xferinfo &xfer)
{
	// Lock the from and the to warehouse.

	// Note: in this example, create_versioned_put() always gets called
	// from the same thread. If this were a multithreaded context, we would
	// have to lock xfer->from and xfer->to in alphabetical order, in
	// order to avoid a potential deadlock against a thread that's doing
	// a transfer in the opposite "direction" (can be the same or a
	// different inventory item).
	//
	// But, since this is a single thread, this is ok. The manager
	// updates each current object one at a time, so there's no
	// possibility of a deadlock.
	//
	// Doing this in alphabetical order would've been ugly. Thankfully,
	// I don't have to do it.

	warehouse_t::base::current_value_t::lock
		from(warehouses->warehouses.find(xfer->from)
		     ->second->current_value);

	warehouse_t::base::current_value_t::lock
		to(warehouses->warehouses.find(xfer->to)
		   ->second->current_value);

	// Create a transaction, and collect the current versions of the
	// objects that go into the transaction.

	auto transaction=stasher::client::base::transaction::create();
	auto versions=stasher::versionscollected::base
		::create_from(from->value, to->value);

	if (adjust(from->value, xfer->from, xfer->what, -xfer->howmuch,
		   transaction) < 0)
	{
		// Not enough in the from inventory

		xfer->isenough=false;
	}
	else
	{
		adjust(to->value, xfer->to, xfer->what, xfer->howmuch,
		       transaction);
	}

	return std::make_pair(transaction, versions);
}

// Apply a transfer to an inventory, and update the transaction object,
// accordingly.

// Returns the new inventory level of the selected item.

int adjust(const inventoryptr &existing,
	   const std::string &objectname,
	   const std::string &what,
	   int howmuch,
	   const stasher::client::base::transaction &transaction)
{
	if (existing.null())
	{
		// New inventory object

		inventory dummy=inventory::create();

		auto result=do_adjust(dummy, what, howmuch);

		if (result.first.size() == 0)
		{
			// Marginal: no inventory before and after. We give up.
			return result.second;
		}

		transaction->newobj(objectname, result.first);
		return result.second;
	}

	auto result=do_adjust(existing, what, howmuch);

	if (result.first.size() == 0) // Empty inventory!
	{
		transaction->delobj(objectname, existing->uuid);
	}
	else
	{
		transaction->updobj(objectname, existing->uuid, result.first);
	}

	return result.second;
}

// Ok, the task is now reduced to taking this inventory object, updating
// the inventory level, and then serializing it back, and returning the
// new level.

std::pair<std::string, int> do_adjust(const inventory &existing,
				      const std::string &what,
				      int howmuch)
{
	// Clone the object

	auto cpy=inventory::create(*existing);

	// Find this object in the inventory map.

	auto iter=cpy->stock.find(what);

	if (iter == cpy->stock.end())
	{
		// Doesn't exist, create it.
		iter=cpy->stock.insert(std::make_pair(what, 0)).first;
	}

	iter->second += howmuch;

	std::pair<std::string, int> ret;

	ret.second=iter->second;

	// Inventory of 0 removes this item from the inventory, completely.

	if (ret.second == 0)
		cpy->stock.erase(iter);

	// Return an empty string if the inventory is empty. This results
	// in the object getting deleted.

	if (!cpy->stock.empty())
	{
		typedef std::back_insert_iterator<std::string> insert_iter_t;

		insert_iter_t insert_iter(ret.first);

		x::serialize::iterator<insert_iter_t> ser_iter(insert_iter);

		cpy->serialize(ser_iter);
	}

	return ret;
}

void xferinventory(int argc, char **argv)
{
	// The list of transfers parsed from the command line.

	std::list<xferinfo> transfers;

	// All the warehouses elicited from the transfers, combined:
	warehouses_t warehouses=warehouses_t::create();

	// Parse command line options.

	for (int i=1; i+3 < argc; i += 4)
	{
		int n=0;

		std::istringstream(argv[i+3]) >> n;

		if (n <= 0)
		{
			std::cerr << "Eh?" << std::endl;
			return;
		}

		auto xferinfo=xferinfo::create(argv[i], argv[i+1], argv[i+2],
					       n);

		warehouses->createwarehouse(xferinfo->from);
		warehouses->createwarehouse(xferinfo->to);
		transfers.push_back(xferinfo);
	}

	auto client=stasher::client::base::connect();
	auto manager=stasher::manager::create();

	// Load the existing inventory, start subscriptions

	std::list<x::ref<x::obj> > mcguffins;

	load_warehouses(warehouses, client, manager, mcguffins);

	std::cout << "Transfering between:" << std::endl;

	warehouses->inventory();

	// Perform the transfers. If any of them were req_rejected_stat-ed,
	// repeat them.

	do
	{
		// Submit the transfers, wait for them to get processed.

		do_transfers(client, warehouses, transfers);

		for (auto b=transfers.begin(), e=transfers.end(), p=b;
		     b != e; )
		{
			p=b;
			++b;

			if ((*p)->processed != stasher::req_rejected_stat)
				transfers.erase(p);
		}
	} while (!transfers.empty());
}

void do_transfers(const stasher::client &client,
		  const warehouses_t &warehouses,
		  std::list<xferinfo> &transfers)
{
	// We could very well call versioned_put() inside the first for loop.
	// But, for this example to demonstrate req_rejected_stat handling,
	// we must go out of our way to cause a version conflict.
	//
	// We'll create all transactions before processing them. This way,
	// when there are multiple transactions that hit the same warehouse,
	// there will be a req_rejected_stat.

	class transaction_list {
	public:
		stasher::client::base::transaction tran;
		stasher::versionscollected vers;
		xferinfo xfer;

		transaction_list(const stasher::client::base::transaction
				 &tranArg,
				 const stasher::versionscollected &versArg,
				 const xferinfo &xferArg)
			: tran(tranArg), vers(versArg), xfer(xferArg)
		{
		}
	};

	std::vector<transaction_list> transactions;

	for (auto &transfer : transfers)
	{
		// Be optimistic.
		transfer->processed=stasher::req_processed_stat;

		if (transfer->from == transfer->to)
		{
			std::cout << "Very funny: " << transfer->descr()
				  << std::endl;
			continue;
		}

		std::pair<stasher::client::base::transaction,
			  stasher::versionscollected>
			transaction=create_versioned_put(warehouses, transfer);

		if (!transfer->isenough)
		{
			std::cout << "Insufficient inventory: "
				  << transfer->descr() << std::endl;
			continue;
		}

		transactions.emplace_back(transaction.first,
					  transaction.second,
					  transfer);
	}

	for (auto &t : transactions)
	{
		std::cout << "Processing: " << t.xfer->descr() << std::endl;

		stasher::putresults res=
			stasher::versioned_put(client, t.tran, t.vers);

		std::cout << x::tostring(res->status)
			  << ": " << t.xfer->descr() << std::endl;

		t.xfer->processed=res->status;
	}
}

int main(int argc, char **argv)
{
	try {
		xferinventory(argc, argv);
	} catch (const x::exception &e)
	{
		std::cerr << e << std::endl;
		return 1;
	}
	return 0;
}
      xferinventory takes a list of transfers on the
      command line. Each transfer consists of the name of the
      “from” warehouse object, the name of the
      “to” warehouse object, the name of an inventory item, and
      a positive count.
      xferinventory updates both warehouse objects,
      adjusting each one's inventory accordingly.
      xferinventory accepts more than one transfer, and
      does them all. Here's the output of transferring four bananas from
      east to west, and four
      apples from west to east:
    
	$ ./xferinventory east west bananas 4 west east apples 4
Waiting for objects to get loaded
Transfering between:
east:
    Item                                Count
    ------------------------------   --------
    apples                                  2
    bananas                                 8
===========================================================================
west:
    Item                                Count
    ------------------------------   --------
    apples                                  8
    bananas                                 2
===========================================================================
Processing: transfer 4 bananas from east to west
Transaction/request processed: transfer 4 bananas from east to west
Processing: transfer 4 apples from west to east
Collision detected - object uuid mismatch: transfer 4 apples from west to east
Processing: transfer 4 apples from west to east
Transaction/request processed: transfer 4 apples from west to east
      
      In xferinventory.C,
      create_versioned_put() acquires a lock on both
      warehouse objects, collects both objects' version mcguffins into a
      stasher::versionscollected, then prepares a
      transaction to update those objects, and releases the lock.
    
      do_transfers() processes the transactions.
      They all get purposefully created in advance, so that all transactions
      get created for the initial contents of the warehouse objects.
      stasher::versioned_put() then processes each
      transaction, one at a time. This guarantees a
      stasher::req_rejected_stat when the same warehouse is
      a part of more than one transfer, as in this example.
    
      stasher::versioned_put() takes a
      client connection handle, a transaction object, and
      a stasher::versionscollected.
      This synchronous function waits for
      the transaction to get processed, then returns its
      stasher::putresults.
      Additionally, if the transaction's status is
      stasher::req_rejected_stat,
      stasher::versioned_put() waits until at least one of the
      version mcguffins goes out of scope and gets destroyed, indicating that
      its versioned object has been updated.
    
      The above example simply proceeds and immediately reprocesses all
      transfers that failed with a stasher::req_rejected_stat,
      using the updated contents of the warehouse objects as a new starting
      point, for this go-around.
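      The prepare-everything, submit, then redo-the-rejected pattern can be
      modeled without stasher. The sketch below is a simplified single-object
      model that uses an integer version number in place of a uuid and a
      mcguffin; versioned_count, pending, prepare(), submit() and run_all()
      are all hypothetical names.

```cpp
#include <cassert>
#include <vector>

enum class put_status { processed, rejected };

// Hypothetical versioned object: every successful update bumps the version.
struct versioned_count {
    int version = 0;
    int count = 0;
};

// An update prepared from a particular snapshot of the object.
struct pending {
    int seen_version;
    int base_count;
    int delta;
};

pending prepare(const versioned_count &obj, int delta)
{
    return {obj.version, obj.count, delta};
}

// Rejects the update if the object changed since the snapshot was taken.
put_status submit(versioned_count &obj, const pending &p)
{
    if (obj.version != p.seen_version)
        return put_status::rejected;
    obj.count = p.base_count + p.delta;
    ++obj.version;
    return put_status::processed;
}

// Prepare all updates from the same snapshot, submit them one at a time,
// and repeat the rejected ones. Returns the total number of rejections.
int run_all(versioned_count &obj, std::vector<int> deltas)
{
    int rejections = 0;

    while (!deltas.empty())
    {
        std::vector<pending> batch;
        for (int d : deltas)
            batch.push_back(prepare(obj, d));

        std::vector<int> retry;
        for (const auto &p : batch)
            if (submit(obj, p) == put_status::rejected)
            {
                ++rejections;
                retry.push_back(p.delta);
            }
        deltas = retry;
    }
    return rejections;
}
```

      In each round only the first update of the batch succeeds, because it
      invalidates the snapshot that the others were built from; the rest get
      rebuilt from the updated object on the next round.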
    
	All current object value locks must be released before calling
	stasher::versioned_put(). The correct sequence of
	events is:
      
	    Lock all current_values, in a consistent order,
	    to avoid deadlocks.
	  
	    Collect each value's version mcguffin, into a
	    stasher::versionscollected, and prepare
	    a transaction updating the same objects.
	  
	    Release current_value locks.
	  
	    Call stasher::versioned_put().
	  
	In the event of a collision,
	stasher::versioned_put() waits until one of the
	versioned objects gets updated. The callbacks that update versioned
	current objects acquire their own lock on their
	current_value, resulting in a deadlock unless
	all locks get released prior to a
	stasher::versioned_put().
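	A scope that ends before the blocking call is the simplest way to
	follow this sequence. The sketch below models it with std::mutex;
	guarded_value, prepared_put, prepare_transfer() and commit() are
	hypothetical, and commit() merely stands in for whatever later needs
	its own lock on the same values.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical lockable value with a version counter.
struct guarded_value {
    std::mutex lock;
    int version = 0;
    int value = 0;
};

// What was collected while the locks were held.
struct prepared_put {
    int seen_a, seen_b;
    int new_a, new_b;
};

// Steps 1 to 3: lock (callers always pass a and b in the same order),
// collect versions and prepare the update, then release both locks on return.
prepared_put prepare_transfer(guarded_value &a, guarded_value &b, int n)
{
    std::lock_guard<std::mutex> la(a.lock);
    std::lock_guard<std::mutex> lb(b.lock);

    return prepared_put{a.version, b.version, a.value - n, b.value + n};
}

// Step 4: runs with no locks held by the caller, and takes its own.
// Calling this while still holding a.lock or b.lock would self-deadlock.
bool commit(guarded_value &a, guarded_value &b, const prepared_put &p)
{
    std::lock_guard<std::mutex> la(a.lock);
    std::lock_guard<std::mutex> lb(b.lock);

    if (a.version != p.seen_a || b.version != p.seen_b)
        return false;    // Stale: something else updated a or b.

    a.value = p.new_a;
    b.value = p.new_b;
    ++a.version;
    ++b.version;
    return true;
}
```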
      
stasher::versioned_put_request()
      xferinventory2.C
      is a version of
      xferinventory.C
      that demonstrates how to use
      stasher::versioned_put_request(),
      an asynchronous version
      of stasher::versioned_put().
      stasher::versioned_put_request() takes a
      client connection handle, a transaction object, a functor, and
      a stasher::versionscollected.
      The client connection handle, the transaction object, and the
      stasher::versionscollected serve the same purpose
      as with stasher::versioned_put().
    
      stasher::versioned_put_request()
      returns immediately.
      The functor gets invoked, with a
      stasher::putresults parameter, when the
      transaction gets processed.
      Just like with stasher::versioned_put(),
      if the transaction's status is
      stasher::req_rejected_stat,
      the functor does not get invoked until at least
      one of the version mcguffins goes out of scope and gets destroyed,
      indicating that its versioned object has been updated.
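      The calling convention of a request that returns immediately and
      reports its result to a functor later can be sketched with a plain
      queue of deferred calls. This single-threaded model is an assumption
      made for illustration only; request_queue_sketch, put_request() and
      run() are hypothetical and do not reflect how stasher dispatches
      callbacks.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <utility>

enum class status_sketch { processed, rejected };

class request_queue_sketch {
    std::queue<std::function<void()>> pending;

public:
    // Returns immediately; the functor runs later, from run().
    // The outcome is passed in here only to keep the model self-contained.
    void put_request(status_sketch outcome,
                     std::function<void(status_sketch)> functor)
    {
        pending.push([outcome, functor] { functor(outcome); });
    }

    // Stand-in for the point in time when results become available.
    void run()
    {
        while (!pending.empty())
        {
            auto f = std::move(pending.front());
            pending.pop();
            f();
        }
    }
};
```

      The caller can start several requests back to back, then collect the
      statuses in the functor and decide which ones to redo once all of them
      have reported.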
    
	$ ./xferinventory2 east west apples 2 east west bananas 1 west east apples 1 west east bananas 2
Waiting for objects to get loaded
Transfering between:
east:
    Item                                Count
    ------------------------------   --------
    apples                                  6
    bananas                                 4
===========================================================================
west:
    Item                                Count
    ------------------------------   --------
    apples                                  4
    bananas                                 6
===========================================================================
Processing: transfer 2 apples from east to west
Processing: transfer 1 bananas from east to west
Processing: transfer 1 apples from west to east
Processing: transfer 2 bananas from west to east
Transaction/request processed: transfer 2 apples from east to west
Collision detected - object uuid mismatch: transfer 1 apples from west to east
Collision detected - object uuid mismatch: transfer 1 bananas from east to west
Collision detected - object uuid mismatch: transfer 2 bananas from west to east
Processing: transfer 1 bananas from east to west
Processing: transfer 1 apples from west to east
Processing: transfer 2 bananas from west to east
Transaction/request processed: transfer 1 bananas from east to west
Collision detected - object uuid mismatch: transfer 1 apples from west to east
Collision detected - object uuid mismatch: transfer 2 bananas from west to east
Processing: transfer 1 apples from west to east
Processing: transfer 2 bananas from west to east
Transaction/request processed: transfer 1 apples from west to east
Collision detected - object uuid mismatch: transfer 2 bananas from west to east
Processing: transfer 2 bananas from west to east
Transaction/request processed: transfer 2 bananas from west to east
      
      xferinventory2.C does not prepare all the
      transactions in advance, before processing them. Since they're
      asynchronous,
      xferinventory2.C starts them all together, then
      waits until they've all been processed, before redoing all the
      stasher::req_rejected_stat failures.
    
	The requirement to release all versioned current object locks,
	mentioned in
	xferinventory.C,
	is slightly relaxed due to
	stasher::versioned_put_request()'s asynchronous
	nature, as long as the locks are released shortly after
	stasher::versioned_put_request() returns.
      
	Because of that, two convenience templates are also available.
	stasher::versioned_put_request_from is an
	alternative version where the last
	stasher::versionscollected parameter gets
	replaced by a variadic list of
	stasher::versionedptrs (or something similar
	that implements a suitable getversion());
	and stasher::versioned_put_request_fromiter where
	the last
	stasher::versionscollected parameter gets
	replaced by a beginning and an ending iterator, over
	stasher::versionedptrs.
      
	These alternative functions construct a
	stasher::versionscollected from
	their custom parameters, then invoke
	stasher::versioned_put_request().