This function loops through the processing of evidence sets.
Description
The mainLoop is used when the BNEngine is used as a server. It checks the queue (database or internal list) for unprocessed EvidenceSet objects, and calls handleEvidence on them in the order of their timestamps. As a server, this is potentially an infinite loop; see Details for ways of gracefully terminating the loop.
Usage
mainLoop(eng, N=NULL)
Arguments
eng | An object of class BNEngine. |
N | If supplied, this should be an integer. The loop will then handle that many cycles before quitting. |
Details
The evidenceQueue field of the BNEngine class is an object of type MessageQueue. All events have a processed field which is set to true when the evidence set is processed. The function fetchNextMessage fetches the oldest unprocessed evidence set, while markAsProcessed sets the processed flag.
The mainLoop function iterates over the following steps.
- Fetch the oldest unprocessed Event: eve <- fetchNextMessage(eng).
- Process the evidence set: out <- handleEvidence(eng,eve). (Note: this expression will always return. If it generates an error, the error will be logged and an object of class try-error will be returned.)
- Mark the event as processed: markAsProcessed(eng,eve).
At its simplest level, the function produces an infinite loop over these three statements, with some additional steps related to logging and control.
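The three statements above can be sketched as a plain R loop. This is only an illustration under stated assumptions: the queue is a simple list held in an environment, and next_unprocessed, handle_one and the inline flag update are hypothetical stand-ins for fetchNextMessage, handleEvidence and markAsProcessed, not the package's own implementations. Unlike the real loop, this toy version simply stops when the queue is empty.

```r
## Hypothetical in-memory queue: each event has a timestamp and a processed flag.
queue <- new.env()
queue$events <- list(
  list(id = "b", timestamp = 2, processed = FALSE),
  list(id = "a", timestamp = 1, processed = FALSE),
  list(id = "c", timestamp = 3, processed = FALSE)
)

## Stand-in for fetchNextMessage: index of the oldest unprocessed event, or NULL.
next_unprocessed <- function(q) {
  todo <- which(!vapply(q$events, function(e) e$processed, logical(1)))
  if (length(todo) == 0L) return(NULL)
  stamps <- vapply(q$events[todo], function(e) e$timestamp, numeric(1))
  todo[which.min(stamps)]
}

## Stand-in for handleEvidence: wrapped in try() so it always returns.
handle_one <- function(e) {
  try({
    if (e$id == "c") stop("bad evidence set")
    paste("handled", e$id)
  }, silent = TRUE)
}

handled <- character(0)
repeat {
  i <- next_unprocessed(queue)
  if (is.null(i)) break                   # toy version: stop on an empty queue
  out <- handle_one(queue$events[[i]])
  if (inherits(out, "try-error")) out <- "error logged"
  handled <- c(handled, out)
  queue$events[[i]]$processed <- TRUE     # stand-in for markAsProcessed
}
```

Events are taken in timestamp order regardless of their position in the list, and the failing event is still marked as processed because try() turns the error into a returned object.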
First, if the event queue is empty, the process sleeps for a time given by eng$waittime and then checks the queue again. At the same time, it checks the status of the active flag for the process using the eng$stopWhenFinished() call. If this returns true and the queue is empty, processing will terminate.
To facilitate testing, the field eng$processN can be set to a finite value. This number is decremented at every cycle, and when it reaches 0, the mainLoop is terminated, whether or not there are any remaining events to be processed. Setting eng$processN to an infinite value will result in an infinite loop that can only be stopped by using the active flag (or interrupting the process).
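The interaction between the cycle counter, the wait time and the finish flag can be sketched as follows. This is a minimal model, not the package code: run_cycles is a hypothetical function, pending counts queued events, process_n mirrors eng$processN, wait_time mirrors eng$waittime, and stop_when_finished stands in for eng$stopWhenFinished(). It assumes that idle cycles also count against the limit.

```r
## Hypothetical driver modelling the control flow only (no real evidence sets).
run_cycles <- function(pending, process_n = Inf, wait_time = 0,
                       stop_when_finished = function() TRUE) {
  handled <- 0L
  waits <- 0L
  while (process_n > 0) {
    process_n <- process_n - 1           # one cycle used, busy or idle (assumed)
    if (pending == 0L) {
      if (stop_when_finished()) break    # empty queue and told to finish
      Sys.sleep(wait_time)               # otherwise wait, then poll again
      waits <- waits + 1L
      next
    }
    pending <- pending - 1L              # "process" one event
    handled <- handled + 1L
  }
  list(handled = handled, waits = waits, left = pending)
}

## Finite limit: stops after two cycles even though events remain.
res_limited <- run_cycles(pending = 5L, process_n = 2)
## Infinite limit: runs until the queue is empty and the finish flag is set.
res_drain <- run_cycles(pending = 3L)
## Finish flag not set: idles on the empty queue until the limit runs out.
res_idle <- run_cycles(pending = 1L, process_n = 4,
                       stop_when_finished = function() FALSE)
```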
Value
There is no return value. The function is used entirely for its side effects.
Activation
When the loop begins, it calls the eng$activate() method to mark the engine as active. When the loop finishes (outside of the main try/catch block, so it should always return), it calls the eng$deactivate() method to signal that the engine has terminated.
External processes can signal the engine through the eng$shouldHalt() and eng$stopWhenFinished() methods. The former is checked every iteration, and the main loop halts when it becomes true. This allows for an immediate stop when needed. The latter is checked only when the queue is empty and determines whether or not the process should continue to wait for more messages in the queue.
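The activation protocol can be illustrated with a toy engine. Everything here is an assumption made for the sketch: make_engine and toy_loop are hypothetical, the engine is a plain environment with a signal field, the values "Idle" and "Stopped" are invented placeholders, and max_idle exists only to keep the toy finite. The method names mirror those described above, but the bodies are not the package's.

```r
## Toy engine: an environment whose `signal` field plays the role of the flag.
make_engine <- function(events) {
  eng <- new.env()
  eng$signal <- "Idle"                   # placeholder initial state (assumed)
  eng$events <- events
  eng$log <- character(0)
  eng$activate <- function() eng$signal <- "Running"
  eng$deactivate <- function() eng$signal <- "Stopped"  # placeholder value
  eng$shouldHalt <- function() identical(eng$signal, "Halt")
  eng$stopWhenFinished <- function() identical(eng$signal, "Finish")
  eng
}

toy_loop <- function(eng, max_idle = 3L) {
  eng$activate()
  on.exit(eng$deactivate())              # runs even if the loop body fails
  idle <- 0L
  repeat {
    if (eng$shouldHalt()) break          # checked on every iteration
    if (length(eng$events) == 0L) {
      ## checked only when the queue is empty
      if (eng$stopWhenFinished() || idle >= max_idle) break
      idle <- idle + 1L
      next
    }
    ev <- eng$events[[1]]
    eng$events <- eng$events[-1]
    eng$log <- c(eng$log, ev)
    ## simulate an external process setting the halt signal mid-run
    if (identical(ev, "halt-now")) eng$signal <- "Halt"
  }
  invisible(NULL)
}

eng <- make_engine(list("e1", "halt-now", "e3"))
toy_loop(eng)
```

After the call, the third event is still in the queue: the halt signal stops the loop before the next evidence set is fetched, and the deactivate step still runs on exit.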
Database Engine. For the Mongo engine (BNEngineMongo) the communication channel is the AuthorizedApps collection in the administrative database. In particular, the EAsignal field is read by both methods. The eng$activate() method changes the value of that field to “Running”. Changing the value of the field to “Halt” will cause eng$shouldHalt() to be true, triggering a halt before processing the next evidence set. Changing the value of that field to “Finish” will cause eng$stopWhenFinished() to be true, causing the loop to stop when the queue is empty.
The following command, issued from the Mongo shell, will shut down the server for an application containing the string "appName" as part of its name (note “Halt” could be replaced with “Finish”).
db.AuthorizedApps.update({app:{$regex:"appName"}},
{$set:{"EAsignal":"Halt"}});
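The same update could plausibly be sent from R using the mongolite package. The sketch below is an assumption-laden illustration: signal_json and send_signal are hypothetical helpers, and the db and url defaults are placeholders that must be replaced with the actual administrative database name and connection string. The pattern is pasted into the query as-is, so regular expression metacharacters in it are not escaped.

```r
## Build the JSON query and update documents for a given signal.
signal_json <- function(app_pattern, signal = c("Halt", "Finish")) {
  signal <- match.arg(signal)
  list(
    query  = sprintf('{"app": {"$regex": "%s"}}', app_pattern),
    update = sprintf('{"$set": {"EAsignal": "%s"}}', signal)
  )
}

## Hypothetical helper; `db` and `url` are placeholders, not known values.
## Requires the mongolite package and a reachable MongoDB server.
send_signal <- function(app_pattern, signal = "Halt",
                        db = "adminDbName", url = "mongodb://localhost") {
  js <- signal_json(app_pattern, signal)
  apps <- mongolite::mongo(collection = "AuthorizedApps", db = db, url = url)
  apps$update(js$query, js$update)
}

## Only the JSON is built here; nothing is sent to a database.
js <- signal_json("appName", "Finish")
```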
No Database Engine. For the no-database engine (BNEngineNDB) the communication channel is a file named activeTest. The name (extension) of this file is changed to produce the signals. The eng$activate() method creates it with the extension .running. Changing the extension to .finish or .halt will send the appropriate signal. The eng$deactivate() method removes the file.
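The file-based signal can be tried out with base R file functions alone. This sketch works in a scratch directory under tempdir(); signal_file and current_signal are hypothetical helpers, and where the real engine keeps its activeTest file is not stated here, so the location is an assumption.

```r
## Work in a scratch directory so nothing real is touched.
demo_dir <- file.path(tempdir(), "ndb-signal-demo")
dir.create(demo_dir, showWarnings = FALSE)
unlink(list.files(demo_dir, pattern = "^activeTest\\.", full.names = TRUE))

## Hypothetical helpers: build the file path and read the current extension.
signal_file <- function(ext) file.path(demo_dir, paste0("activeTest.", ext))
current_signal <- function() {
  f <- list.files(demo_dir, pattern = "^activeTest\\.")
  if (length(f) == 0L) NA_character_ else tools::file_ext(f[1])
}

file.create(signal_file("running"))      # what activate() is described as doing
before <- current_signal()

## An external process requests a halt by renaming the file.
file.rename(signal_file("running"), signal_file("halt"))
after <- current_signal()

unlink(signal_file("halt"))              # what deactivate() is described as doing
gone <- current_signal()
```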
Note
Currently, when running in server mode (i.e., with eng$processN set to infinity), there are two ways of stopping the engine: a clean stop after all events are processed using the active flag, and an immediate stop, possibly mid-cycle, by killing the server process. It became apparent during testing that there was a need for a graceful but immediate stop, i.e., a stop after processing the current event. This should appear in later versions.
Author(s)
Russell Almond
See Also
BNEngine, BNEngineMongo, BNEngineNDB, MessageQueue
fetchNextMessage, handleEvidence, markAsProcessed
Examples
## Not run:
## From EABN.R script
app <- "ecd://epls.coe.fsu.edu/P4test"
loglevel <- "DEBUG"

source("/usr/local/share/Proc4/EAini.R")
futile.logger::flog.appender(appender.file(logfile))
futile.logger::flog.threshold(loglevel)

sess <- NeticaSession(LicenseKey=NeticaLicenseKey)
startSession(sess)

listeners <- lapply(names(EA.listenerSpecs),
                    function (ll) do.call(ll,EA.listenerSpecs[[ll]]))
names(listeners) <- names(EA.listenerSpecs)
eng <- do.call(BNEngineMongo,
               c(EAeng.params,list(session=sess,listeners=listeners),
                 EAeng.common))

loadManifest(eng)
configStats(eng)
setupDefaultSR(eng)

## Activate engine (if not already activated.)
eng$activate()
mainLoop(eng)
## Wait for cows to come home.

## End(Not run)