#include <stasher/heartbeat.H> #include <stasher/client.H> #include <stasher/manager.H> #include <x/threads/run.H> #include <x/mpobj.H> #include <x/ymdhms.H> #include <iostream> #include <deque> // The identifier for each instance of the application. // The identifier can be any class that: // - Implements strict weak ordering using operator<(). Depending on other // uses, it might also need to implement other comparison operators. // - Has a default constructor // - Serializable, using x::serialize/x::deserialize (this implies a default // constructor) // - Is stringable, using x::tostring class application_id { public: // For this example, the identifier consists of a node name and a pid. std::string nodename; pid_t pid; application_id() { } application_id(const std::string &nodenameArg, pid_t pidArg) : nodename(nodenameArg), pid(pidArg) { } // Implement the serialization requirement template<typename iter_type> void serialize(iter_type &iter) { iter(nodename); iter(pid); } // Implement strict weak ordering bool operator<(const application_id &o) const { return nodename < o.nodename || (nodename == o.nodename && pid < o.pid); } bool operator==(const application_id &o) const { return nodename == o.nodename && pid == o.pid; } // Implement stringability static const x::stringable_t stringable=x::class_tostring; template<typename OutputIterator> OutputIterator to_string(OutputIterator iter, const x::const_locale &localeRef) const { std::ostringstream o; o << nodename << ", pid " << pid; std::string s=o.str(); return std::copy(s.begin(), s.end(), iter); } }; // Our heartbeat status is a plain std::string, but it can also be any // arbitrary class that meets the same requirements as the node identifier, // except for strict weak ordering. typedef stasher::heartbeat<application_id, std::string> heartbeat; // The heartbeat update thread class updatethrObj : virtual public x::obj { public: // Mutex-protected thread metadata class input_t { public: // Queue for heartbeat update requests. std::deque<heartbeat::base::update_type_t> update; // Main thread has terminated. Time to wrap things up bool eof; // Most recently posted heartbeat from all instances. // Keyed by instance name, value is the instance's metadata. // When this changes, we print it. std::map<application_id, std::string> current; // Our official heartbeat status, posted to the repository std::string my_status; input_t() : eof(false) {} }; typedef x::mpcobj<input_t> input_meta_t; input_meta_t meta; // Retrieve the posted heartbeat status from all instances. std::map<application_id, std::string> getstatus(const heartbeat &hb) { std::map<application_id, std::string> m; heartbeat::base::lock lock(*hb); if (!lock->value.null()) // Empty, if no heartbeat object posted yet. for (auto ×tamp: lock->value->timestamps) { m.insert(std::make_pair(timestamp.first, timestamp.second.meta)); } return m; } // The update thread void run(const heartbeat &hb) { while (1) { auto cur_value=getstatus(hb); input_meta_t::lock lock(meta); if (lock->eof) break; if (cur_value != lock->current) { lock->current=cur_value; std::cout << std::setw(79) << std::setfill('-') << "" << std::setw(0) << std::endl; // Posted heartbeat status has changed. Show it. for (auto &status:lock->current) { std::cout << x::tostring(status.first) << ": " << status.second << std::endl; } std::cout << std::setw(79) << std::setfill('-') << "" << std::setw(0) << std::endl << std::endl; } // Wait for an update request. if (lock->update.empty()) { lock.wait(); continue; } auto update_type=lock->update.front(); lock->update.pop_front(); // Pass the update request, and my current status, to // the heartbeat template. hb->update(update_type, // Including the current timestamp in the // message, below, results in the posted // heartbeat status of each instance being // different with every periodic refresh // (ten seconds). (lock->my_status.size() ? lock->my_status:"(none)")); } } // The heartbeat template wants our current heartbeat status. void push(heartbeat::base::update_type_t type) { input_meta_t::lock lock(meta); lock->update.push_back(type); lock.notify_all(); } // The main thread posts new official status. void push(const std::string &new_status) { input_meta_t::lock lock(meta); lock->update.push_back(heartbeat::base::app_update); lock->my_status=new_status; lock.notify_all(); } }; // A container for the heartbeat update thread's instance. This makes sure // that the thread terminates before it goes out of scope. // // As explained, the heartbeat template instance must go out of scope and // get destroyed before the client connect object, otherwise a deadlock may // occur. class thr_instance { public: // The running thread x::runthread<void> run; // The object the thread is running. x::ref<updatethrObj> thr; // Start the thread thr_instance(const x::ref<updatethrObj> &thrArg, const heartbeat &heartbeat) : run(x::run(thrArg, heartbeat)), thr(thrArg) { } // The destructor stops the thread. ~thr_instance() { { updatethrObj::input_meta_t::lock lock(thr->meta); lock->eof=true; lock.notify_all(); } run->get(); // Wait for the thread to stop } }; void post_heartbeat() { auto client=stasher::client::base::connect(); std::string name=client->gethelo().nodename; if (name.empty()) throw EXCEPTION("Not connected"); std::cout << "Type (blindly) then ENTER to update this instance status, empty line to quit" << std::endl; auto manager=stasher::manager::create(); auto thr=x::ref<updatethrObj>::create(); auto hb=heartbeat::create(manager, client, "heartbeat", // Name of the object application_id(name, getpid()), // Our refresh interval L"refresh", std::chrono::seconds(10), // Interval for purging out instances that // no longer update. L"stale", std::chrono::seconds(30), // Heartbeat template callback, requesting // a posted update. [thr] (heartbeat::base::update_type_t update_type) { thr->push(update_type); }); thr_instance run1(thr, hb); // Starts everything, loops, posting the update. std::string line; while (!std::getline(std::cin, line).eof()) { if (line.empty()) break; thr->push(line); } } int main(int argc, char **argv) { try { post_heartbeat(); } catch (const x::exception &e) { std::cerr << e << std::endl; exit(1); } return 0; }
This example demonstrates the recommended design pattern for implementing heartbeats:
The update functor sends a message to a separate thread, consisting
of the update_type_t
token. This example
implements a thread queue manually, using
std::deque
. A real application will
typically use LIBCXX's message dispatching framework.
The heartbeat update thread gets the heartbeat object as a parameter
to its run()
, which is stored on the
executing thread's stack. This ensures that when the thread
terminates, its reference on the heartbeat object goes out of scope
and gets destroyed.
Upon receiving the message, the thread invokes the heartbeat
object's update
() method, passing it the
update_type_t
handle, and the instance's
current status.
Aside from the periodically-scheduled updates of the instance's
heartbeat status, the application requests an explicit update to
its posted heartbeat status by invoking
update
() and passing
stasher::heartbeat<
for the application_id
,application_status
>::base::app_updateupdate_type_t
parameter.
In this example, the main execution thread arranges that by
passing it as a message to the heartbeat thread, and have the
heartbeat thread handle it like all the other update messages.
Not every call to update
() results
in the heartbeat object getting updated in the stasher
object repository. This is transparent to the application. The
application should invoke update
() in
response to every invocation of the update functor, and let
stasher::heartbeat
figure out what needs to be
done.
The heartbeat object must go out of scope and get destroyed
before
the client connection handle.
In this example, post_update
() does that by:
Constructing the client connection handle and the manager object first, then the thread object, then the heartbeat object.
Constructing a manual guard object whose destructor waits for the thread to stop.
Starting the thread as the last order of business.
When these objects go out of scope in reverse order, the guard object stops the thread first. The thread object maintains its reference on the heartbeat object on its execution stack. So with the thread stopped, once the thread and the heartbeat object go out of scope, there cannot be any other references to the heartbeat object. Next, the manager object and the client connection handle can safely go otu of scope and get destroyed.
Here's the expected output from running this example:
./heartbeat
Type (blindly) then ENTER to update this instance status, empty line to quit
-------------------------------------------------------------------------------
octopus.objrepo.example.com, pid 28837: First instance's status
-------------------------------------------------------------------------------
-------------------------------------------------------------------------------
octopus.objrepo.example.com, pid 28837: First instance's status
octopus.objrepo.example.com, pid 28916: (none)
-------------------------------------------------------------------------------
Second instance
-------------------------------------------------------------------------------
octopus.objrepo.example.com, pid 28837: First instance's status
octopus.objrepo.example.com, pid 28916: Second instance
-------------------------------------------------------------------------------
-------------------------------------------------------------------------------
octopus.objrepo.example.com, pid 28916: Second instance
-------------------------------------------------------------------------------
When this process starts, another instance already runs, process 28837, and the heartbeat object contains its existing status. Immediately afterward, the heartbeat object posts its first update, an initial, empty status.
This is a documented function of the update functor:
it gets invoked any time the heartbeat object changes. The update functor
also gets invoked at other times, too. This
includes the initial contents of the heartbeat object, and this also
includes any time the application instance updates its heartbeat.
Invoking update
() results in another invocation
of the update functor,
and another call to update
(), but the second
call to update
() does not result in another
update to the heartbeat object. The application
does not need to implement that logic, but should just pass along
the update_type_t
to
update
().
Therefore, each time the heartbeat thread gets an update message, it checks the current contents of the heartbeat object, and if it's different than the previous value, it gets displayed.
In the above example, “Second instance” gets read by the main thread from standard input, then posted as this instance's new status, which gets immediately shown as a result of the heartbeat object update.
Some time later, the other instance of the application, process 28837 stops, and this instance's periodic refresh of the heartbeat object removes its status.