1717#include " futurepublisher.hpp"
1818#include " sharedptr.hpp"
1919#include " processorcontainer.hpp"
20+ #include " graphstatestore.hpp"
21+ #include " resumefromsnapshottopicstate.hpp"
2022
2123#include < iostream>
24+ #include < fstream>
2225
2326using std::cout;
2427using std::endl;
@@ -27,16 +30,16 @@ using std::endl;
2730 * @file resuminggraph.cpp
2831 * @brief Basic Counter Graph with persistent storage.
2932 *
30- * @section Introduction
33+ * @section ex-rg-intro Introduction
3134 * This examples cover the most basic way to preserve state across Graph
3235 * instances.
3336 *
3437 * This covers how to resume state and a neat way to define initial state.
3538 *
36- * @section te Architecture
39+ * @section ex-rg-arch Graph Architecture
3740 *
38- * The example is composed of one Detector that counts `EventHappened`
39- * and publishes a total count of those events .
41+ * The example uses a minimal Graph with a single Detector and two topics.
42+ * The Detector counts `EventHappened` and publishes the total in `EventCount` .
4043 *
4144 * @dot "ResumingCounter"
4245digraph GraphAnalyzer {
@@ -51,59 +54,124 @@ digraph GraphAnalyzer {
5154}
5255 * @enddot
5356 *
54- * @section resume-from-snapshot ResumeFromSnapshotTopicState
55- * The framework offers this convenience TopicState that easily bridges output
56- * and input data without breaking the Topological sort.
57- * TODO(cscotti): more.
57+ * @section ex-rg-state-persistence State Persistence; Snapshots & ResumeFromSnapshotTopicState
5858 *
59- * @section Graph
60- * The graph instantiated and evaluation code is unaffected. Both
61- @snippet counterwithreset.cpp CounterWithResetGraph
62- * and
63- @snippet counterwithreset.cpp main
64- * work in the same way as with simple graphs.
59+ * This example shows how [GraphStateStore](@ref DetectorGraph::GraphStateStore
60+ * ) and [StateSnapshot](@ref DetectorGraph::StateSnapshot) can be used in
61+ * conjunction with [ResumeFromSnapshotTopicState](@ref DetectorGraph::ResumeFromSnapshotTopicState)
62+ * to provide an extensible, robust and transparent state persistence
63+ * mechanism.
6564 *
65+ * The diagram below shows the life cycle of StateSnapshots during boot.
66+ * @dot "State Lifetime FSM"
67+ digraph StateLifetime {
68+ node[fontname=Helvetica];
69+ edge[lp="r", fontname="Times-Italic", fontcolor=blue];
70+ "first_ever" [label="", shape=none];
71+ "first_ever" -> "PrimeSnapshot" [label="Start"];
72+ "PrimeSnapshot" [fontname=Monospace];
73+ "ResumeSnapshot" [fontname=Monospace];
74+ "PrimeSnapshot" -> "ResumeSnapshot" [label="Reads from Storage"];
75+ "ResumeSnapshot" -> "LatestSnapshot" [label="First Graph Evaluation"];
76+ subgraph clusterrunning {
77+ style=filled;
78+ color=lightskyblue;
79+ labeljust="r";
80+ label="Running & Updating GraphStateStore";
81+ "LatestSnapshot" [fontname=Monospace];
82+ "LatestSnapshot" -> "LatestSnapshot" [label="Subsequent Evaluations\n+\n Write to Storage"];
83+ }
84+ }
85+ * @enddot
86+ *
87+ * This life cycle can be seen in the `main` of this example:
88+ @snippetlineno resuminggraph.cpp main
89+ *
90+ * `PrimeSnapshot` is a StateSnapshot that contains the initial state/data that
91+ * should always be used - regardless of file-based state persistence.
92+ * In this example we synthesize `PrimeSnapshot` this way:
93+ @snippetlineno resuminggraph.cpp PrimeSnapshot
94+
95+ * `ResumeSnapshot` is the aggregation of TopicStates contained in
96+ * `PrimeSnapshot` and what is deserialized from storage in `ReadSnapshot()`:
97+ @snippetlineno resuminggraph.cpp Deserialization
98+
99+ * `ResumeSnapshot` is then used to construct a `ResumeFromSnapshotTopicState`
100+ * which is then posted to the graph to allow Detectors to resume/initialize
101+ * their state.
102+ @snippetlineno resuminggraph.cpp Evaluate-ResumeFromSnapshot
103+
104+ * From then on `mStateStore` in `ResumingGraph` is continually updated
105+ * from within `ResumingGraph::ProcessOutput()`:
106+ @snippetlineno resuminggraph.cpp UpdateStateStore
107+
108+ * The up-to-date `latestSnapshot` can then be flushed to disk as necessary
109+ * with `WriteSnapshot()`:
110+ @snippetlineno resuminggraph.cpp Serialization
111+
112+ @note
113+ Different applications may chose how to do this step differently. Depending on
114+ your chosen Serialization technology it can be too expensive to call
115+ `WriteSnapshot()` for each new processed input. Some applications may choose
116+ to only call `WriteSnapshot()` during shutdown. A more sophisticated approach
117+ is to call `WriteSnapshot()` only when a `TopicState` in a set of _critical_
118+ ones changes. This can be done by iterating through the list in
119+ `mGraph.GetOutputList()` as it only contains the changed `TopicStates`.
120+
121+ *
122+ * @section ex-rg-instance Running the Program
66123 * Running the program produces:
67124 \verbatim
125+ $ ./resuminggraph.out
68126DetectorGraph: Graph Initialized
69- EventCount.count = 1
70- EventCount.count = 2
71- EventCount.count = 3
72- EventCount.count = 4
73- EventCount.count = 5
74- EventCount.count = 0
75- EventCount.count = 1
76- EventCount.count = 2
127+ EventCount = 1001
128+ EventCount = 1002
129+ EventCount = 1003
130+ EventCount = 1004
131+ EventCount = 1005
132+ EventCount = 1006
133+ EventCount = 1007
134+
135+ $ ./resuminggraph.out
136+ DetectorGraph: Graph Initialized
137+ EventCount = 1008
138+ EventCount = 1009
139+ EventCount = 1010
140+ EventCount = 1011
141+ EventCount = 1012
142+ EventCount = 1013
143+ EventCount = 1014
144+
145+ $ cat resumingGraphSnapshot.txt
146+ 1014
77147 \endverbatim
78148 *
79- * One important thing to note in this case is that ProcessOutput is called 8
80- * times even though `main` only pushes 7 TopicStates into the graph - this is
81- * by design and allows all graph outputs to continue to be inspected exactly
82- * once per-evaluation.
83- * This is the case for all graphs with closed loops.
149+ * One important thing to note is
84150 *
85151 * @cond DO_NOT_DOCUMENT
86152 */
87153
88- // ! [EventHappened]
154+ enum class ResumingGraphTopicStateIds {
155+ kEventCount = 0 ,
156+ };
157+
89158struct EventHappened : public DetectorGraph ::TopicState
90159{
91160};
92- // ! [EventHappened]
93161
94- // ! [EventCount ]
162+ // ! [NamedEventCount ]
95163struct EventCount : public DetectorGraph ::TopicState
96164{
97165 EventCount (int aCount = 0 ) : count(aCount) {}
98166 int count;
99- };
100- // ! [EventCount]
101167
102- // ! [Reset]
103- struct Reset : public DetectorGraph ::TopicState
104- {
168+ DetectorGraph::TopicStateIdType GetId () const
169+ {
170+ return static_cast <DetectorGraph::TopicStateIdType>(
171+ ResumingGraphTopicStateIds::kEventCount );
172+ }
105173};
106- // ! [Reset ]
174+ // ! [NamedEventCount ]
107175
108176// ![ResumingCountDetector]
109177class ResumingCountDetector : public DetectorGraph ::Detector
@@ -119,11 +187,17 @@ class ResumingCountDetector : public DetectorGraph::Detector
119187 SetupPublishing<EventCount>(this );
120188 }
121189
122- void Evaluate (const DetectorGraph::ResumeFromSnapshotTopicState& aSnapshot)
190+ // ![Evaluate-ResumeFromSnapshot]
191+ void Evaluate (const DetectorGraph::ResumeFromSnapshotTopicState& aResumeFrom)
123192 {
124- // TODO
125- // if (aSnapshot.ResumeFromSnapshotTopicState...)
193+ const auto previousEventCount = aResumeFrom.snapshot .GetState <EventCount>();
194+ if (previousEventCount)
195+ {
196+ mEventCount = *previousEventCount;
197+ }
126198 }
199+ // ![Evaluate-ResumeFromSnapshot]
200+
127201 void Evaluate (const EventHappened&)
128202 {
129203 mEventCount .count ++;
@@ -142,29 +216,109 @@ class ResumingCountDetector : public DetectorGraph::Detector
142216class ResumingGraph : public DetectorGraph ::ProcessorContainer
143217{
144218public:
145- ResumingGraph ()
219+ ResumingGraph (const DetectorGraph::StateSnapshot& aInitialSnapshot )
146220 : mResumingCountDetector (&mGraph )
147221 {
148- // Post ResumeFromSnapshotTopicState
222+ ProcessData ( DetectorGraph:: ResumeFromSnapshotTopicState(aInitialSnapshot));
149223 }
150224
151225 ResumingCountDetector mResumingCountDetector ;
152226
153227 virtual void ProcessOutput ()
154228 {
155- // update snapshot.
156- // dump to file
229+ // ![UpdateStateStore]
230+ mStateStore .TakeNewSnapshot (mGraph .GetOutputList ());
231+ // ![UpdateStateStore]
232+ }
233+
234+ template <class TTopicState > ptr::shared_ptr<const TTopicState> GetLastTopicState () const
235+ {
236+ DG_ASSERT (DetectorGraph::TopicState::GetId<TTopicState>()
237+ != DetectorGraph::TopicState::kAnonymousTopicState );
238+
239+ return ptr::static_pointer_cast<const TTopicState>(
240+ mStateStore .GetLastState ()->GetState <TTopicState>());
241+ }
242+ ptr::shared_ptr<const DetectorGraph::StateSnapshot> GetLastState () const
243+ {
244+ return mStateStore .GetLastState ();
157245 }
246+
247+ private:
248+ DetectorGraph::GraphStateStore mStateStore ;
158249};
250+
159251// ![ResumingGraph]
160252
253+ constexpr char kSavedSnapshotFileName [] = " resumingGraphSnapshot.txt" ;
254+
255+ // ![PrimeSnapshot]
256+ DetectorGraph::StateSnapshot GetPrimeSnapshot ()
257+ {
258+ std::list< ptr::shared_ptr<const DetectorGraph::TopicState> > topicStatesList;
259+ topicStatesList.push_back (ptr::shared_ptr<const DetectorGraph::TopicState>(new EventCount (1000 )));
260+ // ... add any other TopicStates that need Prime states.
261+ return DetectorGraph::StateSnapshot (topicStatesList);
262+ }
263+ // ![PrimeSnapshot]
264+
265+ // ![Serialization]
266+ void WriteSnapshot (const DetectorGraph::StateSnapshot& aSnapshot)
267+ {
268+ std::ofstream snapshotOutStream (kSavedSnapshotFileName );
269+
270+ // Generally here you'd have a loop through a list of TopicState
271+ // serializers giving each one a chance to flush that topic's contents to
272+ // disk. And you'd normally use something fancier for Serialization (e.g.
273+ // protobufs).
274+ // For the purposes of this demo we'll mock that in a single method:
275+ {
276+ ptr::shared_ptr<const EventCount> countPtr = aSnapshot.GetState <EventCount>();
277+ if (countPtr)
278+ {
279+ snapshotOutStream << countPtr->count << endl;
280+ }
281+ // ... add serialization of any other TopicStates of interest.
282+ }
283+ }
284+ // ![Serialization]
285+
286+ // ![Deserialization]
287+ DetectorGraph::StateSnapshot ReadSnapshot (const DetectorGraph::StateSnapshot& primeSnapshot)
288+ {
289+ std::list< ptr::shared_ptr<const DetectorGraph::TopicState> > topicStatesList;
290+
291+ std::ifstream snapshotInStream (kSavedSnapshotFileName );
292+ if (snapshotInStream.is_open ())
293+ {
294+ // Generally here you'd have a loop through the TopicStates found on
295+ // Storage and adding each one to `topicStatesList`
296+ {
297+ int count;
298+ snapshotInStream >> count;
299+ topicStatesList.push_back (ptr::shared_ptr<const DetectorGraph::TopicState>(new EventCount (count)));
300+ }
301+ // ... add deserialization of any other TopicStates of interest.
302+ }
303+
304+ return DetectorGraph::StateSnapshot (primeSnapshot, topicStatesList);
305+ }
306+ // ![Deserialization]
307+
161308// ![main]
162309int main ()
163310{
164- ResumingGraph resumingGraph;
311+ DetectorGraph::StateSnapshot primeSnapshot = GetPrimeSnapshot ()
312+ DetectorGraph::StateSnapshot resumeSnapshot = ReadSnapshot (primeSnapshot);
313+
314+ ResumingGraph resumingGraph = ResumingGraph (resumeSnapshot);
165315 for (int i = 0 ; i < 7 ; ++i)
166316 {
167317 resumingGraph.ProcessData (EventHappened ());
318+ cout << " EventCount = " << resumingGraph.GetLastTopicState <EventCount>()->count << endl;
319+
320+ const auto latestSnapshot = resumingGraph.GetLastState ();
321+ WriteSnapshot (*latestSnapshot);
168322 }
169323}
170324// ![main]
0 commit comments