Title: | Four Process Assessment Database and Dispatcher |
Description: | Utilities for working with messages in the four four process architecture as json objects. |
Authors: | Russell Almond |
Maintainer: | Russell Almond <[email protected]> |
License: | Artistic-2.0 |
Version: | 0.8-5 |
Built: | 2025-02-12 06:01:31 UTC |
Source: | https://github.com/ralmond/Proc4 |
Utilities for working with messages in the four four process architecture as json objects.
Package: | Proc4 |
Version: | 0.8-5 |
Date: | 2023/06/23 |
Title: | Four Process Assessment Database and Dispatcher |
Authors@R: | person(given = "Russell", family = "Almond", role = c("aut", "cre"), email = "[email protected]", comment = c(ORCID = "0000-0002-8876-9337")) |
Author: | Russell Almond |
Maintainer: | Russell Almond <[email protected]> |
Depends: | R (>= 3.0), methods, |
Imports: | futile.logger, mongo, jsonlite |
Suggests: | utils, mongolite, knitr, rmarkdown, tidyr, CPTtools, rlang, bookdown, devtools, withr, testthat (>= 3.0.0), Peanut |
Description: | Utilities for working with messages in the four four process architecture as json objects. |
Collate: | ErrorHandling.R Message.R MessageQueue.R Listeners.R CaptureListener.R InjectionListener.R TableListener.R UpdateListener.R UpsertListener.R |
License: | Artistic-2.0 |
URL: | https://pluto.coe.fsu.edu/Proc4 |
VignetteBuilder: | knitr |
Support: | c( 'Bill & Melinda Gates Foundation grant "Games as Learning/Assessment: Stealth Assessment" (#0PP1035331, Val Shute, PI)', 'National Science Foundation grant "DIP: Game-based Assessment and Support of STEM-related Competencies" (#1628937, Val Shute, PI)', 'National Science Foundation grant "Mathematical Learning via Architectual Design and Modeling Using E-Rebuild." (\#1720533, Fengfeng Ke, PI)', 'Institute of Educational Statistics Grant: "Exploring adaptive cognitive and affective learning support for next-generation STEM learning games." (#R305A170376-20, Val Shute and Russell Almond, PIs') |
Config/testthat/edition: | 3 |
Config/pak/sysreqs: | libssl-dev libsasl2-dev |
Repository: | https://ralmond.r-universe.dev |
RemoteUrl: | https://github.com/ralmond/Proc4 |
RemoteRef: | HEAD |
RemoteSha: | b45286c302d5e2e795c41387d1c85ccabcbec0a2 |
Index of help topics:
CaptureListener Constructors for Listener Classes CaptureListener-class Class '"CaptureListener"' InjectionListener-class Class '"InjectionListener"' ListQueue-class Class '"ListQueue"' Listener A listener is an object which can recieve a message. ListenerSet-class Class '"ListenerSet"' MessageQueue-class Class '"MessageQueue"' MongoQueue-class Class '"MongoQueue"' P4Message Constructor and accessors for P4 Messages P4Message-class Class '"P4Message"' Proc4-package Four Process Assessment Database and Dispatcher TableListener-class Class '"TableListener"' UpdateListener-class Class '"UpdateListener"' UpsertListener-class Class '"UpsertListener"' buildListener Builds a listener from a JSON description. buildListenerSet Builds Listener Set from a a JSON configuration buildMessage Converts a JSON object into a P4 Message cleanMessageQueue Removes messages matching query from queue. fetchNextMessage Returns the next unprocessed message from a message queue. generateListenerExports Build tables from messages saved by the listner importMessages Imports a file full of messages into a message queue. listenerDataTable Fetches a data frame containing information captured by listener markAsProcessed Functions for manipulating entries in a message queue. mongoAppender-class Class '"mongoAppender"' notifyListeners Notifies listeners that a new message is available. registerOutput Registers a file used for output information from an engine. resetListeners Clears messages caches associated with listeners resetProcessedMessages Clears the processed flags on the matching messages serializeData Produces a string with a JSON representation of an R object withFlogging Invoke expression with errors logged and traced
This package exists to supply core functionality to other processes implementing processes in the four process architecture (Almond, Steinberg and Mislevy, 2002). In particular, it contains low level code dealing with implementing message queues in a document database (mongo) and reading/writing messages from JSON.
There are five major features of this package documented below:
The P4Message
object and the protocol for
converting messages to JSON and saving them in the mongo database.
A withFlogging
function which wraps the
A number of Listener
objects which implement an
observer protocol for messages.
The config
directory contains a number of javascript
files for building database schemas and indexes.
The dongle
directory contains a number of PHP scripts for
exposing the database via a web server.
Earlier verisons included a number of tools which wrap funcions in the
packages. In particular, this
included the as.json
, and buildObject
generic functions to manage the conversion from S4 object to JSON and
back, the saveRec
, and getManyRecs
methods for saving and restoring objects from a database, and the
for building Mongo queries from R-like
syntax. These have been moved to the
The extended four process architecture defines a message object
) with the following fields:
:Used for internal database ID.
:Object of class "character"
specifies the application in which the messages exit.
:Object of class "character"
identifies the user (student).
:Object of class "character"
identifies the context, task, or item.
:Object of class "character"
identifies the sender. This is usually one of
"Presentation Process", "Evidence Identification Process",
"Evidence Accumulation Process", or "Activity Selection Process".
:Object of class "character"
a general
title for the message context.
:Object of class "POSIXt"
which gives
the time at which the message was generated.
:Object of class "list"
which contains the
data to be transmitted with the message.
:A logical value: true if the message has
been processed, and false if the message is still in queue to be
processed. This field is set with markAsProcessed
:If a error occurs while processing this event,
information about the error can be stored here, either as an R
object, or as an R object of class error (or any class). This
field is accessed with processingError
and set with
Other classes can extend this message protocol by adding additional fields, but the header fields of the message object allow it to be routed.
In particular, the processed
field allows a database
collection of messages to be used as queue. Simply search for
unprocessed message and begin processing them oldest first, using
to mark the complete process and
to mark errors.
The mongo::as.json
functions build JSON
representations of S classes. In general, this process needs explicit
instructions on how to code/decode the fields of the object. Methods
of the inner mongo::as.jlist
provide this functionality.
Note that classes which extend P4Message
will need to use these methods. The cleanMessageJlist
does the common processing for the P4Message
parent class.
Finally, buildMessage
is a more specific version of the
generic builder.
The functions saveRec
facilitate saving and loading message
objects from the database. The function buildJQuery
gives R-like syntactic sugar to building mongo (JSON) queries.
The logging system for the Proc4
processes is mostly just the
protocol. Aside from
importing the futile.logger
package, Proc4
adds the
function withFlogging
executes a series of statements in
an environment in which the error messages will be logged, and at
higher logging levels, stack traces for errors and warnings are given.
The intention is that most message handling functions will be wrapped
in withFlogging
, so that information about the message
causing the error/warning will be available for debugging.
The package also supplies a mongoAppender
class, which
provides a way of logging messages to a database.
The Proc4
package implements an observer protocol called
. A listener is an abstract class which
implements the receiveMessage
function. The argument of
this function is a P4Message
object, which the
listener then does something with. (In most of the implemented
examples, this is to save it in a database.) Note that listeners
should also define a isListener
method to indicate that
it is a listener.
Four listeners are currently implemented (see
or the individal listener classes):
Creates an object of class
which stores the messages in
a list.
Creates an object of class
which inserts the message into
the designated database.
Creates an object of class
which updates the designated
Creates an object of class
which insert or replaces the
message in the designated collection.
Creates an object of class
which adds details from message
to rows of a data frame.
The RefListener
is an abstract class which
provides methods for the other classes (in particular, promoting the
class-based methods to true S4 methods. These include
, listenerName
, listeningFor
, and clearMessage
. Note that the default
methods for the latter two
functions rely on internal $receiveMessage()
class-based methods, which must be implemented in the
The ListenerSet
class is a mixin to associate a
collection of listeners with an object (the
and BNEngine
classes use this). The generic function notifyListeners
can be called. This logs information about the message (see logging
system above), save a copy of the message in a “Messages”
database, and calls the receiveMessage
method on all of
the listener objects in its collection.
Using the mongo database, both security (user IDs and passwords) is optional. Running mongo without security turned on is probably okay as long as the installation is (a) behing a firewall, and (b) the firewall is configured to not allow connections on the mongo port except from localhost. However, other users may want to turn on security.
The recommended security setup is to create four users, “EIP”,
“EAP”, “ASP”, and “C4” for the four processes and
to assign a password to each. The URI's of the database connections
then need to be modified to include the username and passwords. Each
process would have an ini.R
file which contains its password
which is stored in an appropriate configuration directory. (On *nix
systems, the recommend location is /usr/local/share/Proc4
The files Proc4.ini
(PHP format) and Proc4.js
(javascript format) can be used for saving the key usernames and
passwords. These files are located in the directory file.path(
library(help="Proc4")$path, "config")
. To install these files it is
necessary to copy the files to the configuration directory and edit
them so that the password reflects local preferences.
The file setupDatabases.js
in the config
creates databases for each of the processes and stores the appropriate
login credentials. (Note that this calls Proc4.js
to get these
credentials so that file must be established first.) This is a
javascript file designed to be run directly in mongo, i.e.,
mongo setupDatabases.js
. Note that it must be run by a user
which has the appropriate priveleges to create databases and modify
their security (a “root” user).
The file setupProc4.js
in the config
directory sets up
schemas and indexes for collections in the Proc4
database which
are used by the dongle process. Schemas are optional in mongo, but
the indexes should speed up operations.
The directory file.path(library(help="Proc4")$path, "config")
contains files that facilitate direct communciation with the mongo
database. In particular, there are a number of PHP scripts which if put
in a directory available to the web server will allow remote processes
to get information about users in the system. The scripts are:
Called when player logs in on a given day. As data returns information needed to restore gaming session (currently bank balance and list of trophies earned). Note that player details are updated by the EI process.
Called when player logs out. Currently not used. It is designed to help automatically shut down unneeded processed.
Called when current player competency estimates are required, e.g., when displaying player scores. It returns a list of statistics and their values in the data field; the exact statistics returned depend on the configuration of the EA process. This database collection is updated by the EA process after each game level is processed.
Called when the game wants the next level. The message data should contain information about what topic the player is currently addressing and a list of played and unplayed levels, with the unplayed levels sorted so the next level according to protocol is first on the list. The complete list of levels should be returned so that if levels on the list have already been completed, a new level would be entered. Although the PHP script has been built, the AS process to feed it has not.
In addition, there is a file called LLtoP4
in that directory
which is a bash
script for translating between xAPI and Proc4
message formats. The function LLtoP4Loop
repeatedly downloads
xAPI statements from the learning locker database, translates them to P4
format, and uploads them to the EI process database.
The vingette file Dongle.pdf
describes the dongle and database
structure in more detail.
Work on the Proc4, EIEvent and EABN packages has been supported by the following grants:
Bill and Melinda Gates Foundation grant “Games as Learning/Assessment: Stealth Assessment” (no. 0PP1035331, Val Shute, PI)
National Science Foundation grant “DIP: Game-based Assessment and Support of STEM-related Competencies” (no. 1628937, Val Shute, PI)
National Scient Foundation grant “Mathematical Learning via Architectual Design and Modeling Using E-Rebuild.” (no. 1720533, Fengfeng Ke, PI)
Institute of Educational Statistics Grant: “Exploring adaptive cognitive and affective learning support for next-generation STEM learning games.” (no. R305A170376-20, Val Shute and Russell Almond, PIs')
The Proc4 package developement was led by Russell Almond (Co-PI).
Russell Almond
Maintainer: Russell Almond <[email protected]>
Almond, R. G., Steinberg, L. S., and Mislevy, R.J. (2002). Enhancing the design and delivery of Assessment Systems: A Four-Process Architecture. Journal of Technology, Learning, and Assessment, 1, http://ejournals.bc.edu/ojs/index.php/jtla/article/view/1671.
The source code, and issues database can be found at https://github.com/ralmond/Proc4
This is used in configuration, it will build a listener from a JSON description of the listener. The “name” and “type” fields are required. The other fields should match the arguments for the constructor, with the exceptions noted below:
buildListener(specs, app, dburi, defaultDB="Proc4", ssl_options=mongolite::ssl_options(), noMongo = !missing(dburi) && length(dburi) > 0L && nchar(dburi) > 0L)
buildListener(specs, app, dburi, defaultDB="Proc4", ssl_options=mongolite::ssl_options(), noMongo = !missing(dburi) && length(dburi) > 0L && nchar(dburi) > 0L)
specs |
A named list (from the JSON) containing the instructions for building the listener. |
app |
A character value that will get substituted for the string “<app>” in the “name” and “sender” fields. |
dburi |
If a database is used for this listener, then this is the uri for the connection. Note that this is specified in the code and not in the JSON. |
defaultDB |
The name of the database with which the Listener will interact,
only used if no |
ssl_options |
Options used for an SSL connection to the database.
noMongo |
A logical value. If true, then the connection to the Mongo database will not be made, and CRUD operations will basically become no-ops. |
The input to this function is a list that comes from JSON (or some
other input method that returns a named list). The specs$type
field should be the name of a Listener
class. This
means that specs$type
is the name of a constructor function,
and the rest of the spec
argument are the arguments.
Currently, the following fields are used.
The name of the listener, required. The string
“<app>” is substituted for app
Required, the name of the constructor for the desired class. The function will generate an error if this does not correspond to the name of a class.
A string insterted into logged messages. The string
“<app>” is substituted for app
The name of the database in which the messages will be
recorded. If not present, then the defaultdb
will be used.
The name of the database collection in which the messages will be recorded.
A character vector giving the names of the messages the listener will pay attention to. Note that this maps to the field “messSet” in the listener object.
Used in the UpdateListener
and UpsertListener
to indicate the field to be
The name of a function used to encode the field
value to be modified as JSON. See stats2json
A character vector giving the names of the fields
used as the key for finding the message to replace. Usually
should contain c("uid","app")
This should be a named character vector (or list)
whose names indicate the names of the observables/statistics to
collect, and whose values are the types. See
; this field maps to the
“fieldlist” field of that class.
Other fields in specs
are ignored.
An object of the virtual class Listener
(i.e., something
for which isListener
should return true.
The field name “messages” maps to the internal field
. The field name “fields” maps to the internal
field fieldlist
Russell Almond
jspecs <- '[ { "name":"ppLS<app>", "type":"TableListener", "messages":["Coins Earned","Coins Spent", "LS Watched"], "fields":{ "uid":"character", "context":"character", "timestamp":"character", "currentMoney":"numeric", "appId":"numeric", "mess":"character", "money":"numeric", "onWhat":"character", "LS_duration":"difftime", "learningSupportType":"character" } }, { "name":"ToEA", "type":"InjectionListener", "dbname":"EARecords", "colname":"EvidenceSets", "messages":["New Observables"] }, { "name":"PPPersistantData", "type":"UpdateListener", "dbname":"Proc4", "colname":"Players", "targetField":"data", "jsonEncoder":"trophy2json", "messages":["Money Earned", "Money Spent"] } ]' speclist <- jsonlite::fromJSON(jspecs,FALSE) l1 <- buildListener(speclist[[1]],"test",mongo::makeDBuri()) l2 <- buildListener(speclist[[2]],"test",mongo::makeDBuri()) l3 <- buildListener(speclist[[3]],"test",mongo::makeDBuri())
jspecs <- '[ { "name":"ppLS<app>", "type":"TableListener", "messages":["Coins Earned","Coins Spent", "LS Watched"], "fields":{ "uid":"character", "context":"character", "timestamp":"character", "currentMoney":"numeric", "appId":"numeric", "mess":"character", "money":"numeric", "onWhat":"character", "LS_duration":"difftime", "learningSupportType":"character" } }, { "name":"ToEA", "type":"InjectionListener", "dbname":"EARecords", "colname":"EvidenceSets", "messages":["New Observables"] }, { "name":"PPPersistantData", "type":"UpdateListener", "dbname":"Proc4", "colname":"Players", "targetField":"data", "jsonEncoder":"trophy2json", "messages":["Money Earned", "Money Spent"] } ]' speclist <- jsonlite::fromJSON(jspecs,FALSE) l1 <- buildListener(speclist[[1]],"test",mongo::makeDBuri()) l2 <- buildListener(speclist[[2]],"test",mongo::makeDBuri()) l3 <- buildListener(speclist[[3]],"test",mongo::makeDBuri())
This method builds a ListenerSet
for an engine.
In particular, the config
is list which come from reading a
JSON file (see fromJSON
) which contains the rules for
building the Listener
s in the set.
buildListenerSet(sender, config, appid, lscol, dbname, dburi, sslops, registrycol, registrydbname, mongoverbose = FALSE)
buildListenerSet(sender, config, appid, lscol, dbname, dburi, sslops, registrycol, registrydbname, mongoverbose = FALSE)
sender |
A character scalar identifying the message sender. |
config |
A named list providing details of the contained listeners. |
appid |
A character scalar giving the application ID for the application being built. |
lscol |
A character scalar giving the name of the collection used for logging messages by the message set. |
dbname |
A character scalar giving the name of the database for the message log, as well as the default database for listeners. |
dburi |
A character scalar giving the URI of the mongo collection. |
sslops |
A list giving options for a SSL connection. See |
registrycol |
A character scalar giving the name of the colleciton for registering output. |
registrydbname |
A character scalar giving the name of the database in which the output registriation collection |
mongoverbose |
A flag for adding debugging information to Mongo
calls (see |
This method builds the listener set starting by calling
for each element of the
list. This then becomes the listeners
to the
Note that the appid
, dburi
, dbnmae
(mapped to
), and sslops
are passed to
to use for defaults.
An object of class ListenerSet
Russell Almond
## Not run: jspecs <- '{ "listeners":[ { "name":"ToAS", "type":"InjectionListener", "dbname":"ASRecords", "colname":"Statistics", "messages":["Statistics"] }, { "name":"PPStats", "type":"UpdateListener", "dbname":"Proc4", "colname":"Statistics", "targetField":"data", "jsonEncoder":"stats2json", "messages":["Statistics"] } ]}' speclist <- jsonlite::fromJSON(jspecs,FALSE) lset <- buildListenerSet("TestEngine",speclist$listeners, "ecd://pluto.coe.fsu.edu/P4Test", lscol="Messages",dbname="test", dburi="", sslops=mongolite::ssl_options(), registrycol="OutputFiles", registrydbname="test") ## End(Not run)
## Not run: jspecs <- '{ "listeners":[ { "name":"ToAS", "type":"InjectionListener", "dbname":"ASRecords", "colname":"Statistics", "messages":["Statistics"] }, { "name":"PPStats", "type":"UpdateListener", "dbname":"Proc4", "colname":"Statistics", "targetField":"data", "jsonEncoder":"stats2json", "messages":["Statistics"] } ]}' speclist <- jsonlite::fromJSON(jspecs,FALSE) lset <- buildListenerSet("TestEngine",speclist$listeners, "ecd://pluto.coe.fsu.edu/P4Test", lscol="Messages",dbname="test", dburi="", sslops=mongolite::ssl_options(), registrycol="OutputFiles", registrydbname="test") ## End(Not run)
The buildMessage
function is a parser to use with the
and getManyRecs
database query
functions. This function will convert the documents fetched from the
database into P4Message
buildMessage(rec,class="P4Message") cleanMessageJlist(rec) ## S4 method for signature 'P4Message,list' as.jlist(obj, ml, serialize = TRUE) ## S4 method for signature 'P4Message,list' parse.jlist(class, rec)
buildMessage(rec,class="P4Message") cleanMessageJlist(rec) ## S4 method for signature 'P4Message,list' as.jlist(obj, ml, serialize = TRUE) ## S4 method for signature 'P4Message,list' parse.jlist(class, rec)
rec |
A named list containing JSON data. |
class |
The class of the object being built. In the case of
obj |
The object being converted. This is mostly used for message dispatch. |
ml |
A named list containing JSON data. |
serialize |
If true, then the
The mdbIterate
method object returns a list
containing the fields of the JSON object with a
name=value format (see jlist). This is
the rec
argument to buildMessage
. In particular,
this is a builder function (see buildObject
which can be passed as the builder
argument to
or getOneRec()
when the object to be built is a P4Message
To facilitate the building subclasses, the (e.g., to check the
argument types and insert default values). The function
does that cleaning for the common fields of
the P4Message
object, so subclasses
can inheret the parsing for the commond message
fields. The as.jlist
method is a helper function
for the as.json
method. The
method (which calls
is a helper function for the
The data
field needs extra care as it could contain arbitrary R
objects. There are two strategies for handling the data field.
First, use serializeJSON
to turn the data
field into a slob (string large object), and
to decode it. This strategy
should cover most special cases, but does not result in easily edited
JSON output. Second, recursively apply unboxer
and use the function parseSimpleData
to undo the
coding. This results in output which should be more human readable,
but does not handle objects (either S3 or S4). It also may fail on
more complex list structures.
The function buildMessage
returns a
object populated with fields from the
argument. The function cleanMessageJlist
and the
method returns the cleaned rec
(suitable for passing to the P4Message
The function as.jlist
method returns the processed ml
object (ready to be converted to JSON).
I hit the barrier pretty quickly with trying to unparse the data manually. In particular, it was impossible to tell the difference between a list of integers and a vector of integers (or any other storage type). So, I went with the serialize solution.
The downside of the serial solution is that it stores the data field as a slob. This means that data values cannot be indexed. If this becomes a problem, a more complex implementation may be needed.
Russell Almond
, getOneRec
, P4Message
m1 <- P4Message("Fred","Task1","PP","Task Done", details=list("Selection"="B")) m2 <- P4Message("Fred","Task1","EI","New Obs", details=list("isCorrect"=TRUE,"Selection"="B")) m3 <- P4Message("Fred","Task1","EA","New Stats", details=list("score"=1,"theta"=0.12345,"noitems"=1)) ev1 <- P4Message("Phred","Level 1","PP","Task Done", timestamp=as.POSIXct("2018-12-21 00:01:01"), details=list("list"=list("one"=1,"two"=1:2),"vector"=(1:3))) m1a <- buildMessage(mongo::ununboxer(as.jlist(m1,attributes(m1)))) m2a <- buildMessage(mongo::ununboxer(as.jlist(m2,attributes(m2)))) m3a <- buildMessage(mongo::ununboxer(as.jlist(m3,attributes(m3)))) ev1a <- buildMessage(mongo::ununboxer(as.jlist(ev1,attributes(ev1))))
m1 <- P4Message("Fred","Task1","PP","Task Done", details=list("Selection"="B")) m2 <- P4Message("Fred","Task1","EI","New Obs", details=list("isCorrect"=TRUE,"Selection"="B")) m3 <- P4Message("Fred","Task1","EA","New Stats", details=list("score"=1,"theta"=0.12345,"noitems"=1)) ev1 <- P4Message("Phred","Level 1","PP","Task Done", timestamp=as.POSIXct("2018-12-21 00:01:01"), details=list("list"=list("one"=1,"two"=1:2),"vector"=(1:3))) m1a <- buildMessage(mongo::ununboxer(as.jlist(m1,attributes(m1)))) m2a <- buildMessage(mongo::ununboxer(as.jlist(m2,attributes(m2)))) m3a <- buildMessage(mongo::ununboxer(as.jlist(m3,attributes(m3)))) ev1a <- buildMessage(mongo::ununboxer(as.jlist(ev1,attributes(ev1))))
This listener simply takes its messages and adds them to a list. It is is mainly used for testing the message system.
This listener simply takes all messages and pushes them onto the
field. The messages
field is the complete
list of received messages, most recent to most ancient. The method
returns the most recent message.
This class implements the Listener
All reference classes extend and inherit methods from
signature(x = "CaptureListener")
: returns
signature(x = "CaptureListener")
: If
the message is in the messSet
, it adds the message to the
message list. (See details)
signature(x= "InjectionListener")
: Returns the name assigned to the listener.
signature(listener =
"CaptureListener", appid )
: Builds a data datable from the messages.
When the listenerDataTable
method is called, the table
is made by applying the attributes
function to the
list. As these are presumably
objects, this will expose the fields as
a database.
:Object of class list
the list of
messages in reverse chronological order.
:Returns the most recent message.
:Does the work of inserting the message. See Details.
:Empties the message list.
initialize(messages, ...)
:Sets the default values for the fields.
Russell Almond
This is an example of the observer design pattern. https://en.wikipedia.org/wiki/Observer_pattern.
, P4Message
mess1 <- P4Message(app="default",uid="Phred",context="Down Hill", sender="EABN",mess="Statistics", details=list("Physics_EAP"=0.5237,"Physics_Mode"="High")) cl <- CaptureListener() receiveMessage(cl,mess1) stopifnot(all.equal(mess1,cl$lastMessage()))
mess1 <- P4Message(app="default",uid="Phred",context="Down Hill", sender="EABN",mess="Statistics", details=list("Physics_EAP"=0.5237,"Physics_Mode"="High")) cl <- CaptureListener() receiveMessage(cl,mess1) stopifnot(all.equal(mess1,cl$lastMessage()))
Often a queue will contain a number of messages which do not get processed. This function
cleans out messages from the queue. This is typically called both
before (called “cleaning”) importing new messages (see
) and after (called “purging”).
cleanMessageQueue(queue, query, appid) ## S4 method for signature 'MongoQueue' cleanMessageQueue(queue, query, appid)
cleanMessageQueue(queue, query, appid) ## S4 method for signature 'MongoQueue' cleanMessageQueue(queue, query, appid)
queue |
An object of class |
query |
A list which forms a Mongo query for
selecting the messages to be removed. See
appid |
A character scalar giving the name of the application to be cleaned. |
Return value is undefined. Called for its side effects (removing messages from database collection).
Will log and throw database errors.
Generates log entries using 'futile.logger'.
Currently no method for 'ListQueue' objects.
Russell Almond
mq <- new("MongoQueue","QueueTest",mongo::MongoDB("Messages",noMongo=TRUE), builder=buildMessage) ## Remove Fred's messages from the database. cleanMessageQueue(mq,list(c(uid="Fred")),"QueueTest") ## Purge NO-OP messages from the imported data. cleanMessageQueue(mq,list(c(mess="NO-OP")),"QueueTest")
mq <- new("MongoQueue","QueueTest",mongo::MongoDB("Messages",noMongo=TRUE), builder=buildMessage) ## Remove Fred's messages from the database. cleanMessageQueue(mq,list(c(uid="Fred")),"QueueTest") ## Purge NO-OP messages from the imported data. cleanMessageQueue(mq,list(c(mess="NO-OP")),"QueueTest")
Searchers through the messages for the first unprocessed message. Return NULL
if none is found.
fetchNextMessage(queue) ## S4 method for signature 'MessageQueue' fetchNextMessage(queue)
fetchNextMessage(queue) ## S4 method for signature 'MessageQueue' fetchNextMessage(queue)
queue |
The |
The ListQueue
message iterates through its internal collection
until it finds an unprocessed message, or it runs out of messages.
The MongoQueue
message searches the collection.
Either an object of class P4Message
if there are no remaining unprocessed messages.
The ListQueue
method returns the current message if it has not
been processed. Otherwise, it increments to pointer until if either
finds an unprocessed messages or runs out of messages.
The MongoQueue
method sorts the unprocessed messages by
timestamp, and returns the one with the earliest message.
In both cases, markAsProcessed
must be called on the
processed message to advance the queue.
Russell Almond
, markAsProcessed
messy <- list( P4Message("test","Test 1","Tester","Test Message"), P4Message("test","Test 2","Tester","Test Message",processed=TRUE), P4Message("test","Test 3","Tester","Test Message")) messq <- new("ListQueue","Qtest",messy) mess1 <- fetchNextMessage(messq) mess1 fetchNextMessage(messq) markAsProcessed(messq,mess1) mess2 <- fetchNextMessage(messq) mess2 markAsProcessed(messq,mess2) mess3 <- fetchNextMessage(messq)
messy <- list( P4Message("test","Test 1","Tester","Test Message"), P4Message("test","Test 2","Tester","Test Message",processed=TRUE), P4Message("test","Test 3","Tester","Test Message")) messq <- new("ListQueue","Qtest",messy) mess1 <- fetchNextMessage(messq) mess1 fetchNextMessage(messq) markAsProcessed(messq,mess1) mess2 <- fetchNextMessage(messq) mess2 markAsProcessed(messq,mess2) mess3 <- fetchNextMessage(messq)
The function updateTable
extracts a data table from the
listener named by which
, saves it into the named file. It then
registers the generated file using registerOutput
The function generateListenerExports
calls the
for each element in the export list, which should
be a list of arguments to updateTable
generateListenerExports(ls, exportlist, appid, outdir, process = ls$sender) updateTable(ls, which, type, appid, outdir, fname = "<app>_<name>.csv", process = ls$sender, flattener = jsonlite::flatten, doc="", name=which)
generateListenerExports(ls, exportlist, appid, outdir, process = ls$sender) updateTable(ls, which, type, appid, outdir, fname = "<app>_<name>.csv", process = ls$sender, flattener = jsonlite::flatten, doc="", name=which)
ls |
The |
exportlist |
A list of lists of arguments to |
appid |
A character scalar giving the name of the application. This should be the long name (e.g., “ecd://org/unit/assessment” not the short name (“assessment”). |
outdir |
The path to the directory where the output should be stored. |
process |
A character scalare giving the name of the generating
process. Passed to |
which |
An itentifier for which listener will generate the table, in other words, the name of one of the listeners. |
type |
A character string identifying the type of the output.
Passed to |
fname |
A character vector giving a pattern for a file name. The
string “<app>” is substituted for |
flattener |
A function or string naming a function which is used to flatten nested data. See details. |
name |
Used to label the table in the registrity. |
doc |
A doc string added to the registrty. |
The updateTable
function calls the
on the listener
. As the details
fields of
the messages, could be nested, it might need to be flattened so that
it can be exported as a CSV file, so the flattener
function is
called. Then the resulting data table is written out to
The generateListenerExports
is fed a list of arguments for
. The idea is that this information can be included
in the config.json
file. Each element should be a list with
the following components:
Required, the name of the listener.
Optional, the type of the output (for the registry); defaults to “data”.
Optional, the name of table in the registry. Defaults
to which
Optional, the file name. This is actually a pattern,
and “<app>” is replaced with basename(appid)
“<name>” is replaced with name
. Default is
Optional, The name of the flatterner function.
Defaults to flatten
Optional, a character string describing the table in the registry.
Note that the appid
, outdir
and process
are taken from the call to generateListenerExports
These functions are mainly used for their side effects. The
function returns the generated table invisibly, or
if listenerDataTable
returns NULL
The generateListenerExports
returns the last exported table.
The data stored in the messages can in fact be nested deeply. So the
raw dataframe returned by listenerDataTable
could have
columns that are themselves data frames. The function
function unrolls these
columns into individual components.
Another frequenly used function is
In particular, the PnodeMargin
statistic returns
a labeled vector as output. This function splits it into columns with
headers name.state. Note that to call a function from
another package, that package must be named, so a call to
is in order.
Russell Almond
, registerOutput
## Not run: config.json <- '"listeners":[ {"name":"ToAS", "type":"InjectionListener", "dbname":"ASRecords", "colname":"Statistics", "jsonEncoder":"unparseData", "jsonDecoder":"parseData", "messages":["Statistics"] }, {"name":"PPStats", "type":"UpdateListener", "targetField":"data", "jsonEncoder":"stats2json", "colname":"Statistics", "messages":["Statistics"] } ], "listenerExports":[ {"which":"PPStats", "type": "data", "fname":"stats-<app>.csv", "flattener":"flattenStats", "doc": "Reporting statistics" }, {"which":"ToAS", "type": "hist", "fname":"hist-<app>.csv", "flattener":"flattenStats", "doc": "History of history variables." } ]' config <- jsonlite::fromJSON(config.json,FALSE) appid <- "ecd://example.edu/testgroup/test" outdir <- tempdir() ls <- buildListenerSet("EA",config$listeners, appid, lscol="Messages",dbname="test", dburi=mongo::makeDBuri(), sslops=mongolite::ssl_options(), registrycol="files",registrydbname="test") ## Need to make sure Peanut::flattenStats is recognized require(Peanut) updateTable(ls,"PPstats","data",appid,outdir) generateListenerExports(ls,config$listenerExports,appid,outdir) ## End(Not run)
## Not run: config.json <- '"listeners":[ {"name":"ToAS", "type":"InjectionListener", "dbname":"ASRecords", "colname":"Statistics", "jsonEncoder":"unparseData", "jsonDecoder":"parseData", "messages":["Statistics"] }, {"name":"PPStats", "type":"UpdateListener", "targetField":"data", "jsonEncoder":"stats2json", "colname":"Statistics", "messages":["Statistics"] } ], "listenerExports":[ {"which":"PPStats", "type": "data", "fname":"stats-<app>.csv", "flattener":"flattenStats", "doc": "Reporting statistics" }, {"which":"ToAS", "type": "hist", "fname":"hist-<app>.csv", "flattener":"flattenStats", "doc": "History of history variables." } ]' config <- jsonlite::fromJSON(config.json,FALSE) appid <- "ecd://example.edu/testgroup/test" outdir <- tempdir() ls <- buildListenerSet("EA",config$listeners, appid, lscol="Messages",dbname="test", dburi=mongo::makeDBuri(), sslops=mongolite::ssl_options(), registrycol="files",registrydbname="test") ## Need to make sure Peanut::flattenStats is recognized require(Peanut) updateTable(ls,"PPstats","data",appid,outdir) generateListenerExports(ls,config$listenerExports,appid,outdir) ## End(Not run)
Interts the contents of a JSON file full of messages into the message queue.
importMessages(queue, filelist, data.dir) ## S4 method for signature 'MongoQueue' importMessages(queue, filelist, data.dir)
importMessages(queue, filelist, data.dir) ## S4 method for signature 'MongoQueue' importMessages(queue, filelist, data.dir)
queue |
An object of class |
filelist |
A list of filenames of files containing data to be loaded. |
data.dir |
A character scalar giving the pathname of the directory containing the data files. |
No particular return message. Used for its side effects.
Current implementation uses the shell function 'mongoimport' which may not be the best implementation if the Mongo server is on a different machine.
Russell Almond
## Not run: mq <- MongoQueue("Test",mongo::MongoDB("TestMessages","test")) importMessages(mq,c("PretestResults.json","TestResults.json"),"/usr/local/share/Proc4/data/") ## End(Not run)
## Not run: mq <- MongoQueue("Test",mongo::MongoDB("TestMessages","test")) importMessages(mq,c("PretestResults.json","TestResults.json"),"/usr/local/share/Proc4/data/") ## End(Not run)
This listener takes messages that match its incomming set and inject them into another Mongo database (presumably a queue for another service).
The database is a mongo
collection identified
by dburi
, dbname
and colname
(collection within
the database). The mess
field of the P4Message
is checked against the applicable messages in messSet
. If it
is there, then the message is inserted into the collection.
This class implements the Listener
All reference classes extend and inherit methods from "envRefClass"
signature(x = "InjectionListener")
: returns
signature(x = "InjectionListener", message)
: If
the message is in the messSet
, it saves the message to the
database. (See details)
signature(x= "InjectionListener")
: Returns the name assigned to the listener.
signature(listener =
"InjectionListener", appid )
: Builds a data datable from the messages.
When the listenerDataTable
method is called,
a general find query (mdbFind
on the backing
collection. The app
, uid
, context
fields are selected, and the data
field is unpackaged and added as additional columns.
:Object of class character
which is used
as the sender field for the message.
:Object of class character
giving the
name of the Mongo database
:Object of class character
giving the url
of the Mongo database.
:Object of class character
giving the
column of the Mongo database.
:A vector of class character
giving the
name of messages which are sent to the database. Only messages
for which mess(message)
is an element of messSet
be inserted.
:Object of class MongoDB
giving the
database. Use messdb()
to access this field to makes sure
it has been set up.
:Accessor for the database collection. Initializes the connection if it has not been set up.
:Does the work of inserting the message. See Details.
:Empties the database collection of messages with this app id.
initialize(sender, dbname, dburi, colname, messSet,
:Sets default values for fields.
Russell Almond
This is an example of the observer design pattern. https://en.wikipedia.org/wiki/Observer_pattern.
, P4Message
## Not run: mess1 <- P4Message(app="default",uid="Phred",context="Down Hill", sender="EIEvent",mess="New Observables", details=list(trophy="gold",solvedtime=10)) ilwind <- InjectionListener(sender="EIEvent",messSet="New Observables") receiveMessage(ilwind,mess1) ## End(Not run)
## Not run: mess1 <- P4Message(app="default",uid="Phred",context="Down Hill", sender="EIEvent",mess="New Observables", details=list(trophy="gold",solvedtime=10)) ilwind <- InjectionListener(sender="EIEvent",messSet="New Observables") receiveMessage(ilwind,mess1) ## End(Not run)
A listener an an object that takes on the observer or listerner
role in the the listener (or observer) design pattern. A listener
will register itself with a speaker, and when the speaker sends a
message it will act accordingly. The receiveMessage
function must be implemented by a listener. It is called when the
speaker wants to send a message.
receiveMessage(x, message) isListener(x) ## S4 method for signature 'ANY' isListener(x) clearMessages(x, app) listenerName(x) listeningFor(x, newSet)
receiveMessage(x, message) isListener(x) ## S4 method for signature 'ANY' isListener(x) clearMessages(x, app) listenerName(x) listeningFor(x, newSet)
x |
A object of the virtual class |
message |
A |
app |
A character scalar identifying the application served by the listener. |
newSet |
A character vector giving the messages the listener is listening for. If empty, the listener processes all messages it recieves. |
The RefListener
class is an abstract class. Any object can become a
listener by giving it a method for receiveMessage
. The message
is intended to be a subclass of P4Message
, but in
practice, no restriction is placed on the type of the message.
As RefListener
an abstract class, it means
definition. Instead the generic function isListner
is used to
test if the object is a proper listener or not. The default method
checks for the presence of a receiveMessage
method. As this
might not work properly with S3 objects, an object can also register
itself directly by setting a method for isListner
which returns
Typically, a lister will register itself with the speaker objects.
For example the ListenerSet$addListener
method adds
itself to a list of listeners maintained by the object. When the
method is called, the
method is called on each listener in the list.
The isListener
function should return TRUE
, according to whether or not the object follows the
listner protocol.
The listenerName
returns a character scalar with the name of
the listener.
The receiveMessage
and clearMessages
functions are typically
invoked for side effects and it may have any return value.
The listeningFor
function returns a character vector giving the
messages used by the listener.
An identifier for the listener, mainly used in error messages.
A character vector giving the messages list
listener will process. Messages whose mess
field are
not in the list are not processed. As a special case, if
has length 0, then all messages are processed.
An object of class MongoDB
which contains a database to contain the messages. Note: if the
subclass does not use this, then the connection to the database will
not be made.
signature(x = "RefListener")
: Returns true,
as subclasses of RefListener
follow the listener protocol.
signature(x =
: This first checks to see if
is in the messList
field. If
so, it delegates the processing of the message to the
method. This class-based method must be
implemented in subclasses.
signature(sender = "RefListener",
: This delegates the process of cleaning the
message collection to the $reset()
class method.
: Returns the
name of the listener.
Returns the names of the messages this listener is listening for.
If newSet
is supplied, the message set is updated.
:Provides default values for various fields.
: Returns the MongoDB
object in the db
: Does the message processing.
Note that the RefListener
method returns and error, so
subclasses must implement this. Note, also that the filtering of
which messages to handle is done by the S4 method.
: This method clears out the old messages.
Again, subclasses must implement this method as the
class implementation raises an error. The
argument is because several different implementations
may store messages for more than one application in the same place.
:This method returns, or if the
second argument in present, sets the messSet
Russell Almond
Implementing Classes:
, UpdateListener
Related Classes:
, P4Message
setRefClass("FileListener",fields=c(file="character"), contains="RefListener", methods=c( receiveMessage = function (message) { cat("I (",listenerName(.self), ") just got the message ", mess(message), file=file,append=TRUE) }, reset = function(app) { cat("\f",file=file,append=TRUE) } )) myListener <- new("FileListener",name="Test",file="", messSet="Scored Response", db=mongo::MongoDB(noMongo=TRUE)) mess1 <- P4Message("Fred","Task 1","Evidence ID","Scored Response", as.POSIXct("2018-11-04 21:15:25 EST"), list(correct=TRUE,seletion="D")) mess2 <- P4Message("Fred","Task 2","Evidence ID","Raw Response", as.POSIXct("2018-11-04 21:16:45 EST"), list(seletion="C")) isListener(myListener) listenerName(myListener) receiveMessage(myListener,mess1) ## This one is processed. receiveMessage(myListener,mess2) ## This one is ignored. clearMessages(myListener,"")
setRefClass("FileListener",fields=c(file="character"), contains="RefListener", methods=c( receiveMessage = function (message) { cat("I (",listenerName(.self), ") just got the message ", mess(message), file=file,append=TRUE) }, reset = function(app) { cat("\f",file=file,append=TRUE) } )) myListener <- new("FileListener",name="Test",file="", messSet="Scored Response", db=mongo::MongoDB(noMongo=TRUE)) mess1 <- P4Message("Fred","Task 1","Evidence ID","Scored Response", as.POSIXct("2018-11-04 21:15:25 EST"), list(correct=TRUE,seletion="D")) mess2 <- P4Message("Fred","Task 2","Evidence ID","Raw Response", as.POSIXct("2018-11-04 21:16:45 EST"), list(seletion="C")) isListener(myListener) listenerName(myListener) receiveMessage(myListener,mess1) ## This one is processed. receiveMessage(myListener,mess2) ## This one is ignored. clearMessages(myListener,"")
These functions create objects of class
, UpdateListener
, and
CaptureListener(name="Capture",messages = list(), messSet=character(), ...) InjectionListener(name="Injection", db = mongo::MongoDB(noMongo=TRUE), messSet = character(), ...) UpdateListener(name="Update",db = mongo::MongoDB(noMongo=TRUE), targetField = "data", qfields = c("app", "uid"), jsonEncoder = "unparseData", jsonDecoder="parseData", messSet=character(), ...) UpsertListener(name="Upsert", messSet = character(), db = mongo::MongoDB(noMongo=TRUE), qfields = c("app", "uid"), ...) TableListener(name = "ppData", fieldlist = c(uid = "character", context = "character"), messSet = character(), ...)
CaptureListener(name="Capture",messages = list(), messSet=character(), ...) InjectionListener(name="Injection", db = mongo::MongoDB(noMongo=TRUE), messSet = character(), ...) UpdateListener(name="Update",db = mongo::MongoDB(noMongo=TRUE), targetField = "data", qfields = c("app", "uid"), jsonEncoder = "unparseData", jsonDecoder="parseData", messSet=character(), ...) UpsertListener(name="Upsert", messSet = character(), db = mongo::MongoDB(noMongo=TRUE), qfields = c("app", "uid"), ...) TableListener(name = "ppData", fieldlist = c(uid = "character", context = "character"), messSet = character(), ...)
messages |
A list into which to add the messages. |
messSet |
A character vector giving the message values of the messages
that will be processed. Messages whose |
db |
A |
targetField |
The name of the field that will be modified in the
database by the |
jsonEncoder |
A function that will be used to encode the data
object as JSON before it is set. See |
jsonDecoder |
A function that will be used to decode the data
object from JSON when building tables. See
qfields |
The fields that will be used as a key when trying to
find matching messages in the database for the
name |
An object of class |
fieldlist |
A named |
... |
Other arguments passed to the constructor. |
The functions are as follows:
Creates an object of class
which stores the messages in
a list.
Creates an object of class
which inserts the message into
the designated database.
Creates an object of class
which updates the designated
Creates an object of class
which insert or replaces the
message in the designated collection.
Creates an object of class
which adds details from message
to rows of a data frame.
See the class descriptions for more information.
An object of the virtual class Listener
Russell Almond
This is an example of the observer design pattern. https://en.wikipedia.org/wiki/Observer_pattern.
, P4Message
cl <- CaptureListener() il <- InjectionListener("Evidence Collector", db=mongo::MongoDB(collection="EvidenceSets", db="EARecords", url = "mongodb://localhost", noMongo=TRUE), messSet="New Observables") upsl <- UpsertListener("Save Observables", db=mongo::MongoDB(collection="LatestEvidence", db="EARecords", url = "mongodb://localhost", noMongo=TRUE), messSet="New Observables", qfields=c("app","uid")) trophy2json <- function(dat) { paste('{', '"trophyHall"', ':','[', paste( paste('{"',names(dat$trophyHall),'":"',dat$trophyHall,'"}', sep=""), collapse=", "), '],', '"bankBalance"', ':', dat$bankBalance, '}') } ul <- UpdateListener("Player Data", db=mongo::MongoDB(collection="Players", db="Proc4", url = "mongodb://localhost", noMongo=TRUE), targetField="data", messSet=c("Money Earned","Money Spent"), jsonEncoder="trophy2json") tabMaker <- TableListener(name="Trophy Table", messSet="New Observables", fieldlist=c(uid="character", context="character", timestamp="character", solvedtime="numeric", trophy="ordered(none,silver,gold)"))
cl <- CaptureListener() il <- InjectionListener("Evidence Collector", db=mongo::MongoDB(collection="EvidenceSets", db="EARecords", url = "mongodb://localhost", noMongo=TRUE), messSet="New Observables") upsl <- UpsertListener("Save Observables", db=mongo::MongoDB(collection="LatestEvidence", db="EARecords", url = "mongodb://localhost", noMongo=TRUE), messSet="New Observables", qfields=c("app","uid")) trophy2json <- function(dat) { paste('{', '"trophyHall"', ':','[', paste( paste('{"',names(dat$trophyHall),'":"',dat$trophyHall,'"}', sep=""), collapse=", "), '],', '"bankBalance"', ':', dat$bankBalance, '}') } ul <- UpdateListener("Player Data", db=mongo::MongoDB(collection="Players", db="Proc4", url = "mongodb://localhost", noMongo=TRUE), targetField="data", messSet=c("Money Earned","Money Spent"), jsonEncoder="trophy2json") tabMaker <- TableListener(name="Trophy Table", messSet="New Observables", fieldlist=c(uid="character", context="character", timestamp="character", solvedtime="numeric", trophy="ordered(none,silver,gold)"))
A number of listerner capture information. This method extracts the data as a data frame for further processing.
listenerDataTable(listener, appid = character()) ## S4 method for signature 'RefListener' listenerDataTable(listener, appid = character())
listenerDataTable(listener, appid = character()) ## S4 method for signature 'RefListener' listenerDataTable(listener, appid = character())
listener |
A |
appid |
The name of the application whose data is to be extracted. (In case data from more than one application is stored in the same collection.) |
A data.frame
giving the requested data.
Note that this data frame could in fact contain columns which are
themselves data frames. Consider calling
on the output.
Russell Almond
, registerOutput
## Not run: jspecs <- '{ "listeners":[ { "name":"ToAS", "type":"InjectionListener", "dbname":"ASRecords", "colname":"Statistics", "messages":["Statistics"] } ]}' speclist <- jsonlite::fromJSON(jspecs,FALSE) appid <- "ecd://pluto.coe.fsu.edu/P4Test" outdir <- "/usr/local/share/Proc4/data" lset <- buildListenerSet("TestEngine",speclist$listeners, appid=appid, lscol="Messages",dbname="test", dburi="", sslops=mongolite::ssl_options(), registrycol="OutputFiles", registrydbname="Proc4") ## After engine running. sl <- lset$listeners[["ToAS"]] sdat <- listenerDataTable(sl,NULL,appid) registerOutput(ls,"PP Statistics", file.path(outdir,"PPstats.csv"), appid,"EA") ## End(Not run)
## Not run: jspecs <- '{ "listeners":[ { "name":"ToAS", "type":"InjectionListener", "dbname":"ASRecords", "colname":"Statistics", "messages":["Statistics"] } ]}' speclist <- jsonlite::fromJSON(jspecs,FALSE) appid <- "ecd://pluto.coe.fsu.edu/P4Test" outdir <- "/usr/local/share/Proc4/data" lset <- buildListenerSet("TestEngine",speclist$listeners, appid=appid, lscol="Messages",dbname="test", dburi="", sslops=mongolite::ssl_options(), registrycol="OutputFiles", registrydbname="Proc4") ## After engine running. sl <- lset$listeners[["ToAS"]] sdat <- listenerDataTable(sl,NULL,appid) registerOutput(ls,"PP Statistics", file.path(outdir,"PPstats.csv"), appid,"EA") ## End(Not run)
This is a “mix-in” class that adds a speaker protocol to an
object, which is complementary to the Listener
protocol. This object maintains a list of listeners. When the
method is called, it notifies each of the
listeners by calling the receiveMessage
method on the
All reference classes extend and inherit methods from
. The class union
is either a ListenerSet
signature(x = "ListenerSet")
: Returns true,
as the ListenerSet follows the listener protocol.
signature(x = "ListenerSet")
: A synonym
for notifyListeners
signature(sender = "ListenerSet")
: A synonym
for the notifyListeners
internal method.
The key to this class is the notifyListeners
method. This
method should receive as its argument a P4Message
(The protocol is fairly robust to the type of message and the
type is not enforced. In fact, any object which has a
method should work.)
When the notifier is called it performs the following functions:
It saves the message to the collection represented by
. If messdb()
is the empty string) then the messages is not saved.
It calls the receiveMessage
method on each of
the objects in the listener list.
It logs the messages sent using the
, in the "Proc4"
The sending of the messages is logged a the “INFO” level,
and the actual message at the “DEBUG” level.
In addition, the ListenerSet
maintains a named list of
objects (that is, objects that have a
method). The methods addListener
maintain this list.
:Object of class character
:the name of
the source of the messages.
:Object of class character
: the URI for
the mongo
database. If null, then no
recording of messages to a database is done (except possibly in
the listeners).
:Object of class character
: the name of
the database in which messages should be logged.
:Object of class character
: the name of
the collection in which messages should be logged.
:A named list
objects, that is objects for which
is true.
:Object of class MongoDB
is a handle
to the collection where messages are logged, or NULL
if the
log database has not been initialized. As the database may have
not been initialized, programs should call the messdb()
method which will open the database connection if it is not yet
:This method calls
on all of the listeners. See
Protocol section above.
$addListener(name, listener)
:This method addes a lsitener to the list.
$initialize(sender, dburi, listeners, colname, ...)
creates the listener. Note, this does not initialize the database
collection. Call messdb()
to initialize the collection.
:This removes a listener from the collection by its name.
:Empties the database collection of messages with this app id.
: Returns the
database collection to which to log
messages. Creates the column if it has not been initialized.
: Returns the
database collection in which output files will be registered.
signature(name, filename, app, process,
type="data", doc="")
: Adds/updates a field in the database
collection of output files. This allows processes looking at the
database to find output summaries.
The notifyListeners
method uses the
protocol. In particular, it
logs sending the message at the “INFO” level, and the actual
message sent at the “DEBUG” level. In particular, setting
turn on logging of the actual message and
turn off logging of the message sent messages.
It is often useful to redirect the Proc4 logger to a log file. In
addition, changing the logging format to JSON, will allow the message
to be recovered. Thus, try
to activate logging in JSON format.
Russell Almond
, receiveMessage
, P4Message
Listener Classes.
, UpdateListener
, InjectionListener
This is a minimal implementation of the
abstract class. In this case, the
messages are just held in an internal array. It probably works well
for short queues, and does not require a database or other external
connection, so it useful for testing.
The queue is implemented with a list and a pointer to the current
position in the list. The $hasNext()
and $nextMessage()
methods implement a typical iterator paradigm.
Note that the MessageQueue
paradigm is slightly
different. Here the current message is the one returned by
until is is marked as processed
), which will then cause the
method to be called advancing the postition.
Class "MessageQueue"
, directly.
All reference classes extend and inherit methods from
signature(queue = "MessageQueue")
Returns the next unprocessed message from the Queue, or 'NULL' if
there are no processed messages in the queue.
signature(col = "ListQueue", mess = "ANY")
Marks a message as an error, and saves error message in
signature(col = "ListQueue", mess = "ANY")
Marks a message as processed.
signature(queue =
: Clears the processed flag for messages matching
the query.
Note, that currently there is no implementing method for
or importMessages
:Object of class character
giving ID of application.
:Object of class list
giving the messages.
:Object of class integer
giving the current
position of the queue.
:Advances the position, and returns the next message (or 'NULL' if all messages have been returned.
:Returns the current message.
$initialize(app, messages, ...)
:Resets the position to the beginnnig of the queue.
:Updates the message at the current position.
:Returns true if there are more messages in the queue.
:Fetches the next unprocessed
message. This is either the current message, if not processed, or
the $nextMessage()
method is called until the first
unprocessed message is found.
:Returns the number of messages remaining in queue. Note, count includes both processed and unprocessed messages.
This is an experimental implementation, and details may change in future release.
Russell Almond
A collection of message objects can serve as a queue: they can be
sorted by their timestamp
and then processed one at a
time. The function markAsProcessed
sets the processed flag on
the message and then saves it back to the database. The function
returns the processed flag.
The function markAsError
attaches an error to the message and
saves it. The function processingError
returns the error (if
it exists).
markAsProcessed(col, mess) ## S4 method for signature 'JSONDB,P4Message' markAsProcessed(col, mess) ## S4 method for signature 'ListQueue' markAsProcessed(col, mess) ## S4 method for signature 'MongoQueue,ANY' markAsProcessed(col, mess) ## S4 method for signature 'NULL,P4Message' markAsProcessed(col, mess) markAsError(col, mess, e) ## S4 method for signature 'JSONDB,P4Message' markAsError(col, mess, e) ## S4 method for signature 'ListQueue' markAsError(col, mess, e) ## S4 method for signature 'MongoQueue,ANY' markAsError(col, mess, e) ## S4 method for signature 'NULL,P4Message' markAsError(col, mess, e) processed(x) processingError(x)
markAsProcessed(col, mess) ## S4 method for signature 'JSONDB,P4Message' markAsProcessed(col, mess) ## S4 method for signature 'ListQueue' markAsProcessed(col, mess) ## S4 method for signature 'MongoQueue,ANY' markAsProcessed(col, mess) ## S4 method for signature 'NULL,P4Message' markAsProcessed(col, mess) markAsError(col, mess, e) ## S4 method for signature 'JSONDB,P4Message' markAsError(col, mess, e) ## S4 method for signature 'ListQueue' markAsError(col, mess, e) ## S4 method for signature 'MongoQueue,ANY' markAsError(col, mess, e) ## S4 method for signature 'NULL,P4Message' markAsError(col, mess, e) processed(x) processingError(x)
mess |
An object of class |
col |
A |
e |
An object indicating the error occurred. Note this could be either a string giving the error message of an object of an error class. In either case, it is converted to a string before saving. |
x |
A message object to be queried. |
A MongoDB
collection of messages can serve as a
queue (see MongoQueue
As messages are added into the queue, the processed
flag is set to false. The handler then fetches them one at a time
(sorting by the timestamp). It then does whatever action is required
to handle the message. Then the function markAsProcessed
called to set the processed
flag to true and update the entry
in the database.
Some thought needs to be given as to how to handle errors. The
function markAsError
attaches an error object to the message
and then updates it in the collection. The error object is turned
into a string (using toString
) before saving, so
it can be any type of R object (in particular, it could be either the
error message or the actual error object thrown by the function).
The functions markAsProcessed
and markAsError
return the modified message.
The function processed
returns a logical value indicating
whether or not the message has been processed.
The function processingError
returns the error object attached
to the message, or NULL
if no error object is returned. Note
that the error object could be of any type.
The functions markAsProcessed
and markAsError
do not
save the complete record, they just update the processed or error
There was a bug in early version of this function, which caused the error to be put into a list when it was saved. This needs to be carefully checked.
Russell Almond
, getOneRec
, timestamp
, MessageQueue
## Not run: col <- mongolite::mongo("TestMessages") col$remove('{}') # Clear out anything else in queue. mess1 <- P4Message("One","Adder","Tester","Add me",app="adder", details=list(x=1,y=1)) mess2 <- P4Message("Two","Adder","Tester","Add me",app="adder", details=list(x="two",y=2)) mess1 <- saveRec(mess1,col,FALSE) mess2 <- saveRec(mess2,col,FALSE) mess <- getOneRec(buildJQuery(app="adder", processed=FALSE), col, parseMessage, sort = c(timestamp = 1)) iterations <- 0 while (!is.null(mess)) { if (iterations > 4L) stop ("Test not terminating, flag not being set?") iterations <- iterations + 1 print(mess) print(details(mess)) out <- try(print(details(mess)$x+details(mess)$y)) if (is(out,'try-error')) mess <- markAsError(mess,col,out) mess <- markAsProcessed(mess,col) mess <- getOneRec(buildJQuery(app="adder", processed=FALSE), col, parseMessage, sort = c(timestamp = 1)) } mess1a <- getOneRec(buildJQuery(app="adder",uid="One"),col,parseMessage) mess2a <- getOneRec(buildJQuery(app="adder",uid="Two"),col,parseMessage) stopifnot(processed(mess1a),processed(mess2a), is.null(processingError(mess1a)), grepl("Error",processingError(mess2a))) ## End(Not run)
## Not run: col <- mongolite::mongo("TestMessages") col$remove('{}') # Clear out anything else in queue. mess1 <- P4Message("One","Adder","Tester","Add me",app="adder", details=list(x=1,y=1)) mess2 <- P4Message("Two","Adder","Tester","Add me",app="adder", details=list(x="two",y=2)) mess1 <- saveRec(mess1,col,FALSE) mess2 <- saveRec(mess2,col,FALSE) mess <- getOneRec(buildJQuery(app="adder", processed=FALSE), col, parseMessage, sort = c(timestamp = 1)) iterations <- 0 while (!is.null(mess)) { if (iterations > 4L) stop ("Test not terminating, flag not being set?") iterations <- iterations + 1 print(mess) print(details(mess)) out <- try(print(details(mess)$x+details(mess)$y)) if (is(out,'try-error')) mess <- markAsError(mess,col,out) mess <- markAsProcessed(mess,col) mess <- getOneRec(buildJQuery(app="adder", processed=FALSE), col, parseMessage, sort = c(timestamp = 1)) } mess1a <- getOneRec(buildJQuery(app="adder",uid="One"),col,parseMessage) mess2a <- getOneRec(buildJQuery(app="adder",uid="Two"),col,parseMessage) stopifnot(processed(mess1a),processed(mess2a), is.null(processingError(mess1a)), grepl("Error",processingError(mess2a))) ## End(Not run)
A message queue is an ordered collection of
objects. The principle idea is that
the fetchNextMessage()
function will fetch the next
unprocessed message, and consequently, this can be used to schedule
the work for a scoring engine.
The general queue functions are determined by two generic functions:
, and markAsProcessed()
The fetchNextMessage()
returns the “first” (the
meaning of first is defined by the implementing Queue object)
unprocssed message (i.e., processed(mess)=FALSE
Note that the fetchNextMessage()
function will continue
to return the same message until it is marked as processed using
. Note that simply setting
processed(mess) <- FALSE
is not sufficient because the change
is not stored in the queue.
All reference classes extend and inherit methods from
:Object of class character
giving the name
of the application.
The following generic functions are designed to work with subclasses of
message queues, however, currently only the
has all of the methods.
Removes messages matching query from queue.
Returns the next unprocessed message from the Queue, or 'NULL' if there are no processed messages in the queue.
Imports messages into a queue from a file.
Marks a message as an error, and saves error message in queue.
Marks a message as processed.
Clears the processed flag for messages matching the query.
$initialize(app, ...)
:Returns the number of messages remaining in queue.
The current implementations are MongoQueue
which uses a
database collection for the queue, and a partially implemented
which just uses an array of messages. This is
not fully implemented.
Some other alternatives would be to link to a formal queuing system, like Kafka, or to some kind of RPC server.
Russell Almond
This implements the appender
protocol logging to a database.
Note that flog.appender
expects a function as its argument. The $logger()
method returns
a function which can be passed to flog.appender
All reference classes extend and inherit methods from "envRefClass"
:Object of class JSONDB
the refernce to the column where the log will be stored.
:Object of class character
The application identifier for which we are logging errors.
(See app()
:Object of class character
giving the name of the processes (in the 4 Process sense)
that is generating the messages.
:Object of class character
if this has length greater than zero, it should be a file
to which the log is also sent. If it is ""
, then the log message is sent to standard output.
:This does the work of logging a line.
:This returns a function which does the logging.
Russell Almond
col <- mongo::MongoDB("ErrorLog","Admin",noMongo=TRUE) logfile <- tempfile("testlog","/tmp",fileext=".log") apnd <- mongoAppender(db=col,app="p4test",engine="Tester",tee=logfile) futile.logger::flog.appender(apnd$logger(),"TEST")
col <- mongo::MongoDB("ErrorLog","Admin",noMongo=TRUE) logfile <- tempfile("testlog","/tmp",fileext=".log") apnd <- mongoAppender(db=col,app="p4test",engine="Tester",tee=logfile) futile.logger::flog.appender(apnd$logger(),"TEST")
This is a message queue implemented as a database collection.
This wraps a collection in a Mongo (or other JSON-based) database.
The fetchNextMessage
looks for the first (earliest
timestamp) message which is not processed.
Class "MessageQueue"
, directly.
All reference classes extend and inherit methods from
signature(queue = "MongoQueue")
Removes messages matching query from queue.
signature(queue = "MessageQueue")
Returns the next unprocessed message from the Queue, or 'NULL' if
there are no processed messages in the queue.
signature(queue = "MongoQueue")
Imports messages into a queue from a file.
signature(col = "MongoQueue", mess =
: Marks a message as an error, and saves error message in
signature(col = "MongoQueue", mess =
: Marks a message as processed.
signature(queue =
: Clears the processed flag for messages matching
the query.
:Object of class character
giving the
identifier of the application. This is used to restrict the
field of the message to match the current
:Object of class
that provides a
reference to the database collection storing the messages.
:Object of class function
which is used
to reconstruct the messages from the data, see
and getOneRec()
:Returns the JSONDB
$initialize(app, messDB, builder, ...)
:Internal implementation of the fetch method.
:This method builds an index in the collection. Generally only needs to be done once.
:Returns the number of unprocessed messages remaining in queue.
It is probably a good idea to build an index on this database using
the “processed” and “timestamp” fields. The
method does this.
Russell Almond
This is a generic function for objects that send
objects. When this function is called, the
message is sent to the listeners; that is, the
function is called on the listener
objects. Often, this protocol is implemented by having the
include a ListenerSet
notifyListeners(sender, message)
notifyListeners(sender, message)
sender |
An object which sends messages. |
message |
A |
Function is invoked for its side effect, so return value may be anything.
Russell Almond
, Listener
## Not run: ## Requires Mongo database set up. MyListener <- setClass("MyListener",slots=c("name"="character")) setMethod("receiveMessage","MyListener", function(x,mess) cat("I (",x@name,") just got the message ",mess(mess),"\n")) lset <- ListenerSet$new(sender="Other",dburi="mongodb://localhost", colname="messages") lset$addListener("me",MyListener()) mess1 <- P4Message("Fred","Task 1","Evidence ID","Scored Response", as.POSIXct("2018-11-04 21:15:25 EST"), list(correct=TRUE,seletion="D")) mess2 <- P4Message("Fred","Task 2","Evidence ID","Scored Response", as.POSIXct("2018-11-04 21:17:25 EST"), list(correct=FALSE,seletion="D")) lset$notifyListeners(mess1) lset$removeListener("me") notifyListeners(lset,mess2) ## End(Not run)
## Not run: ## Requires Mongo database set up. MyListener <- setClass("MyListener",slots=c("name"="character")) setMethod("receiveMessage","MyListener", function(x,mess) cat("I (",x@name,") just got the message ",mess(mess),"\n")) lset <- ListenerSet$new(sender="Other",dburi="mongodb://localhost", colname="messages") lset$addListener("me",MyListener()) mess1 <- P4Message("Fred","Task 1","Evidence ID","Scored Response", as.POSIXct("2018-11-04 21:15:25 EST"), list(correct=TRUE,seletion="D")) mess2 <- P4Message("Fred","Task 2","Evidence ID","Scored Response", as.POSIXct("2018-11-04 21:17:25 EST"), list(correct=FALSE,seletion="D")) lset$notifyListeners(mess1) lset$removeListener("me") notifyListeners(lset,mess2) ## End(Not run)
The function P4Message()
creates an object of class
. The other functions access fields
of the messages.
P4Message(uid, context, sender, mess, timestamp = Sys.time(), details = list(), app = "default", processed=FALSE) app(x) app(x) <- value uid(x) uid(x) <- value mess(x) mess(x) <- value context(x) context(x) <- value sender(x) sender(x) <- value timestamp(x) timestamp(x) <- value details(x) details(x) <- value ## S4 method for signature 'P4Message' toString(x,...) ## S4 method for signature 'P4Message' show(object) ## S3 method for class 'P4Message' all.equal(target, current, ..., checkTimestamp = FALSE, check_ids = TRUE)
P4Message(uid, context, sender, mess, timestamp = Sys.time(), details = list(), app = "default", processed=FALSE) app(x) app(x) <- value uid(x) uid(x) <- value mess(x) mess(x) <- value context(x) context(x) <- value sender(x) sender(x) <- value timestamp(x) timestamp(x) <- value details(x) details(x) <- value ## S4 method for signature 'P4Message' toString(x,...) ## S4 method for signature 'P4Message' show(object) ## S3 method for class 'P4Message' all.equal(target, current, ..., checkTimestamp = FALSE, check_ids = TRUE)
uid |
A character object giving an identifier for the user or student. |
context |
A character object giving an identifier for the context, task, or item. |
sender |
A character object giving an identifier for the sender. In the four-process architecture, this should be one of “Activity Selection Process”, “Presentation Process”, “Evidnece Identification Process”, or “Evidence Accumulation Process”. |
mess |
A character object giving a message to be sent. |
timestamp |
The time the message was sent. |
details |
A list giving the data to be sent with the message. |
app |
An identifier for the application using the message. |
processed |
A logical flag: true if the message has been processed and false otherwise. |
x |
A message object to be queried, or converted to a string. |
... |
object |
A message object to be converted to a string. |
target |
A P4Message to compare. |
current |
A P4Message to compare. |
checkTimestamp |
Logical flag. If true, the timestamps are compared as part of the equality test. |
check_ids |
Logical flag. If true, the database ids are compared as part of the equality test. |
value |
A new value for the field, type varies, but usually character. |
This class represents a semi-structured data object with certain
header fields which can be indexed plus the free-form details()
field which contains the body of the message. It can be serielized in
JSON format (using as.json
in the Mongo database (using the
Using the public methods, the fields can be read but not set. The
generic functions are exported so that other object can extend the
class. The m_id
function accesses the mongo
ID of the object (the _id
The function all.equal.P4Message
checks two messages for
identical contents. The flags checkTimestamp
can be used to suppress the checking of those
fields. If timestamps are checked, they must be within .1 seconds to
be considered equal.
An object of class P4Message
The app()
, uid()
, context()
, sender()
, and
functions all return a character scalar. The
, function returns an object of type POSIXt
and the details()
function returns a list.
The function all.equal.P4Message
returns either TRUE
or a
vector of mode “character” describing the differences between
and current
Russell G. Almond
Almond, R. G., Steinberg, L. S., and Mislevy, R.J. (2002). Enhancing the design and delivery of Assessment Systems: A Four-Process Architecture. Journal of Technology, Learning, and Assessment, 1, http://ejournals.bc.edu/ojs/index.php/jtla/article/view/1671.
— class
, saveRec
mess1 <- P4Message("Fred","Task 1","Evidence ID","Scored Response", as.POSIXct("2018-11-04 21:15:25 EST"), list(correct=TRUE,selection="D")) stopifnot( app(mess1) == "default", uid(mess1) == "Fred", context(mess1) == "Task 1", sender(mess1) == "Evidence ID", mess(mess1) == "Scored Response", timestamp(mess1) == as.POSIXct("2018-11-04 21:15:25 EST"), details(mess1)$correct==TRUE, details(mess1)$selection=="D" ) mess2 <- P4Message("Fred","Task 1","Evidence ID","Scored Response", as.POSIXct("2018-11-04 21:15:25 EST"), list(correct=FALSE,selection="E")) all.equal(mess1,mess2) stopifnot(!isTRUE(all.equal(mess1,mess2)))
mess1 <- P4Message("Fred","Task 1","Evidence ID","Scored Response", as.POSIXct("2018-11-04 21:15:25 EST"), list(correct=TRUE,selection="D")) stopifnot( app(mess1) == "default", uid(mess1) == "Fred", context(mess1) == "Task 1", sender(mess1) == "Evidence ID", mess(mess1) == "Scored Response", timestamp(mess1) == as.POSIXct("2018-11-04 21:15:25 EST"), details(mess1)$correct==TRUE, details(mess1)$selection=="D" ) mess2 <- P4Message("Fred","Task 1","Evidence ID","Scored Response", as.POSIXct("2018-11-04 21:15:25 EST"), list(correct=FALSE,selection="E")) all.equal(mess1,mess2) stopifnot(!isTRUE(all.equal(mess1,mess2)))
This is a message which is sent from one process to another in the four process architecture. There are certain header fields with are used to route the message and the details field which is an arbitrary list of data which will can be used by the receiver.
This class represents a semi-structured data object with certain
header fields which can be indexed plus the free-form details()
field which contains the body of the message. It can be serielized in
JSON format (using as.json
) or saved
in the Mongo database (using the mongolite
Objects can be created by calls to the P4Message()
Because all messages have a processed flag and a timestamp, a message
collection becomes a queue. Simply search for the message with the
earliest timestamp with processed(mess)==FALSE
excute that. Then sets processed equal to true using
If an error occurs during processing, the error can be associated with
the message by setting the pError field using
:Used for internal database ID.
:Object of class "character"
specifies the application in which the messages exit.
:Object of class "character"
identifies the user (student).
:Object of class "character"
identifies the context, task, or item.
:Object of class "character"
identifies the sender. This is usually one of
"Presentation Process", "Evidence Identification Process",
"Evidence Accumulation Process", or "Activity Selection Process".
:Object of class "character"
a general
title for the message context.
:Object of class "POSIXt"
which gives
the time at which the message was generated.
:Object of class "list"
which contains the
data to be transmitted with the message.
:A logical value: true if the message has
been processed, and false if the message is still in queue to be
processed. This field is set with markAsProcessed
:If a error occurs while processing this event,
information about the error can be stored here, either as an R
object, or as an R object of class error (or any class). This
field is accessed with processingError
and set with
signature(x = "ANY")
: returns the _id
field, the database ID.
signature(x = "P4Message")
: returns the app field.
signature(obj = "P4Message", ml = "list")
coerces the object into a list to be processed by
signature(x = "P4Message")
: Coerces the
message into a JSON string.
signature(x = "P4Message")
: returns the
context field.
signature(x = "P4Message")
: returns the data
associated with the message as a list.
signature(x = "P4Message")
: returns the message field.
signature(x = "P4Message")
: returns the sender
signature(x = "P4Message")
: returns the timestamp.
signature(x = "P4Message")
: returns the user ID.
signature(x = "P4Message")
: returns a
logical value indicated whether or not the message has been marked
as processed.
signature(x = "P4Message")
: if an
error occurred while processing this message, returns a value
describing the error. Otherwise, returns NULL.
Russell G. Almond
Almond, R. G., Steinberg, L. S., and Mislevy, R.J. (2002). Enhancing the design and delivery of Assessment Systems: A Four-Process Architecture. Journal of Technology, Learning, and Assessment, 1, http://ejournals.bc.edu/ojs/index.php/jtla/article/view/1671.
— constructor
, saveRec
Many scoring engines provide output data files, or log file. This function registers the output files in a database collection, so that other functions can find them.
registerOutput(registrar, name, filename, app, process, type = "data", doc = "")
registerOutput(registrar, name, filename, app, process, type = "data", doc = "")
registrar |
The |
name |
A character scalar identifying the data file. |
filename |
A character scalar giving the path to the file. |
app |
A character scalar identifying the application |
process |
A character scalar identifying the name of the process (engine) generating the data). |
type |
A character scalar identify the data type. Currently supported values are “data” for data files in csv format, and “log” for log files. |
doc |
An object of type character describing the file. |
The file system.file("dongle/Status.php",package="Proc4")
provides a web interface listing the output files. It generates this
by looking at the “OutputFile” collection in the “Proc4”
database. It then builds links to the files, so they can be
The registerOutput
method is used to add, or update the date on
files in the collection.
Mostly used for is side-effects. Returns information about the success of the database operation.
Russell Almond
, listenerDataTable
## Not run: jspecs <- '{ "listeners":[ { "name":"ToAS", "type":"InjectionListener", "dbname":"ASRecords", "colname":"Statistics", "messages":["Statistics"] } ]}' speclist <- jsonlite::fromJSON(jspecs,FALSE) appid <- "ecd://pluto.coe.fsu.edu/P4Test" outdir <- "/usr/local/share/Proc4/data" lset <- buildListenerSet("TestEngine",speclist$listeners, appid=appid, lscol="Messages",dbname="test", dburi="", sslops=mongolite::ssl_options(), registrycol="OutputFiles", registrydbname="Proc4") ## After engine running. sl <- lset$listeners[["ToAS"]] sdat <- listenerDataTable(sl,NULL,appid) registerOutput(ls,"PP Statistics", file.path(outdir,"PPstats.csv"), appid,"EA") ## End(Not run)
## Not run: jspecs <- '{ "listeners":[ { "name":"ToAS", "type":"InjectionListener", "dbname":"ASRecords", "colname":"Statistics", "messages":["Statistics"] } ]}' speclist <- jsonlite::fromJSON(jspecs,FALSE) appid <- "ecd://pluto.coe.fsu.edu/P4Test" outdir <- "/usr/local/share/Proc4/data" lset <- buildListenerSet("TestEngine",speclist$listeners, appid=appid, lscol="Messages",dbname="test", dburi="", sslops=mongolite::ssl_options(), registrycol="OutputFiles", registrydbname="Proc4") ## After engine running. sl <- lset$listeners[["ToAS"]] sdat <- listenerDataTable(sl,NULL,appid) registerOutput(ls,"PP Statistics", file.path(outdir,"PPstats.csv"), appid,"EA") ## End(Not run)
Listeners often cache the messages in some way. This causes the
message cache to be cleared, and operation which is often useful
before a rerun. The which
argument is used to control which
listeners should have their cache cleared.
resetListeners(x, which, app) ## S4 method for signature 'ListenerSet' resetListeners(x, which, app) ## S4 method for signature 'NULL' resetListeners(x, which, app)
resetListeners(x, which, app) ## S4 method for signature 'ListenerSet' resetListeners(x, which, app) ## S4 method for signature 'NULL' resetListeners(x, which, app)
x |
A |
which |
A character vector containing the names of the listeners to reset. The special keyword “ALL” means all listeners will be reset. The special keyword “Self” means that the cache associated with the listener set will be reset. |
app |
A global applicaiton identifier. The reset operation should only be applied to messages from this application. |
Each Listener
object (including the listener set)
has a $reset()
method which empties the cache of messages. This
method calls the $reset()
method for each of the listeners
named in which
. The special keyword “ALL” is used to
reset all listeners and the special keyword “Self” is used to
refer to the ListenerSet
object itself (which may have a
database colleciton).
The ListenerSet
object is returned.
Russell Almond
## Not run: ## Requires Mongo database set up. data2json <- function(dat) { toJSON(sapply(dat,unboxer)) } listeners <- list( cl = CaptureListener(name="cl"), upd = UpdateListener(name="upd",messSet="New Observables", dburi="mongodb://localhost",dbname="test", targetField="data",jsonEncode="data2json", colname="Updated"), ups = UpsertListener(name="ups",sender="EIEvent",messSet="New Observables", dburi="mongodb://localhost",dbname="test", colname="Upserted", qfields=c("app","uid")), il = InjectionListener(name="il",sender="EIEvent",messSet="New Observables", dburi="mongodb://localhost",dbname="test", colname="Injected"), tl = TableListener(name="tl", messSet="New Observables", fieldlist=c(uid="character", context="character", timestamp="character", solvedtime="numeric", trophy="ordered(none,silver,gold)")) ) lset <- ListenerSet$new(sender="Other",dburi="mongodb://localhost", colname="messages",dbname="test",listeners=listeners) mess1 <- P4Message(app="default",uid="Phred",context="Down Hill", sender="EIEvent",mess="New Observables", details=list(trophy="gold",solvedtime=10)) resetListeners(lset,"ALL","default") receiveMessage(lset,mess1) ## Check recieved messages. stopifnot(lset$messdb()$count(buildJQuery(app="default"))==1L, length(listeners$cl$messages)==1L, listeners$upd$messdb()$count(buildJQuery(app="default"))==1L, listeners$ups$messdb()$count(buildJQuery(app="default"))==1L, listeners$il$messdb()$count(buildJQuery(app="default"))==1L, nrow(listeners$tl$returnDF())==1L) resetListeners(lset,c("Self","cl","il","tl"),"default") stopifnot(lset$messdb()$count(buildJQuery(app="default"))==0L, length(listeners$cl$messages)==0L, listeners$upd$messdb()$count(buildJQuery(app="default"))==1L, listeners$ups$messdb()$count(buildJQuery(app="default"))==1L, listeners$il$messdb()$count(buildJQuery(app="default"))==0L, nrow(listeners$tl$returnDF())==0L) resetListeners(lset,"ALL","default") stopifnot(lset$messdb()$count(buildJQuery(app="default"))==0L, length(listeners$cl$messages)==0L, listeners$upd$messdb()$count(buildJQuery(app="default"))==0L, listeners$ups$messdb()$count(buildJQuery(app="default"))==0L, listeners$il$messdb()$count(buildJQuery(app="default"))==0L, nrow(listeners$tl$returnDF())==0L) ## End(Not run)
## Not run: ## Requires Mongo database set up. data2json <- function(dat) { toJSON(sapply(dat,unboxer)) } listeners <- list( cl = CaptureListener(name="cl"), upd = UpdateListener(name="upd",messSet="New Observables", dburi="mongodb://localhost",dbname="test", targetField="data",jsonEncode="data2json", colname="Updated"), ups = UpsertListener(name="ups",sender="EIEvent",messSet="New Observables", dburi="mongodb://localhost",dbname="test", colname="Upserted", qfields=c("app","uid")), il = InjectionListener(name="il",sender="EIEvent",messSet="New Observables", dburi="mongodb://localhost",dbname="test", colname="Injected"), tl = TableListener(name="tl", messSet="New Observables", fieldlist=c(uid="character", context="character", timestamp="character", solvedtime="numeric", trophy="ordered(none,silver,gold)")) ) lset <- ListenerSet$new(sender="Other",dburi="mongodb://localhost", colname="messages",dbname="test",listeners=listeners) mess1 <- P4Message(app="default",uid="Phred",context="Down Hill", sender="EIEvent",mess="New Observables", details=list(trophy="gold",solvedtime=10)) resetListeners(lset,"ALL","default") receiveMessage(lset,mess1) ## Check recieved messages. stopifnot(lset$messdb()$count(buildJQuery(app="default"))==1L, length(listeners$cl$messages)==1L, listeners$upd$messdb()$count(buildJQuery(app="default"))==1L, listeners$ups$messdb()$count(buildJQuery(app="default"))==1L, listeners$il$messdb()$count(buildJQuery(app="default"))==1L, nrow(listeners$tl$returnDF())==1L) resetListeners(lset,c("Self","cl","il","tl"),"default") stopifnot(lset$messdb()$count(buildJQuery(app="default"))==0L, length(listeners$cl$messages)==0L, listeners$upd$messdb()$count(buildJQuery(app="default"))==1L, listeners$ups$messdb()$count(buildJQuery(app="default"))==1L, listeners$il$messdb()$count(buildJQuery(app="default"))==0L, nrow(listeners$tl$returnDF())==0L) resetListeners(lset,"ALL","default") stopifnot(lset$messdb()$count(buildJQuery(app="default"))==0L, length(listeners$cl$messages)==0L, listeners$upd$messdb()$count(buildJQuery(app="default"))==0L, listeners$ups$messdb()$count(buildJQuery(app="default"))==0L, listeners$il$messdb()$count(buildJQuery(app="default"))==0L, nrow(listeners$tl$returnDF())==0L) ## End(Not run)
The MessageQueue
class uses the processed field of the P4Message
object to
indicate which messages have been processed. This method clears the processed flag, so that messages
can be reprocessed.
resetProcessedMessages(queue, repquery) ## S4 method for signature 'MongoQueue' resetProcessedMessages(queue, repquery) ## S4 method for signature 'ListQueue' resetProcessedMessages(queue, repquery)
resetProcessedMessages(queue, repquery) ## S4 method for signature 'MongoQueue' resetProcessedMessages(queue, repquery) ## S4 method for signature 'ListQueue' resetProcessedMessages(queue, repquery)
queue |
An object of class |
repquery |
A list giving a mongo query (see |
When operating on a MongoQueue
, an update query is run which sets the processed
field of the messages to FALSE
. The repquery
is used to unmark a subset of messages.
For the ListQueue
method, all messages are unmarked regardless of the query.
Function run for side effects, result is status information.
The current ListQueue
implementation is pretty minimal, and will probably get updated.
Russell Almond
, markAsProcessed
, fetchNextMessage
## Writeme
## Writeme
This function wraps the serializeJSON
with encodeString
to properly quote internal quotes. This slob (string large object) can be stored in a database.
In particular, it is the default method for the jsonEncoder
field of the UpdateListener
jlist |
A list containing the data to be serialized. |
A quoted string containing the JSON representation of the argument.
Russell Almond
, encodeString
dat <- list(response="b", score=1) serializeData(dat)
dat <- list(response="b", score=1) serializeData(dat)
A listener that captures data from a P4Message
and puts it into a dataframe.
This listener builds up a data frame with selected data from the
messages. What data is captured is controlled by the fieldlist
object. This is a named character vector whose names correspond to
field names and whose values correspond to type names (see
. The type can also be one of the two
special types, ordered
or factor
. The following is a
summary of the most common types:
, "logical"
, "integer"
:These are numeric values.
:These are character values. They are not converted to factors (see factor types below).
, other values returned by
:These are usuable, but should be used with caution because the output data frame may not be easy to export to other program.
, "factor(...)":
These produce
objects of type ordered
with the comma separated values
between the parenthesis passed as the levels
For example, "ordered(Low,Medium,High)"
will produces
an ordered factor with three levels. (Note that levels should
be in increasing order for ordered factors, but this doesn't
matter for unordered factors.)
For most fields, the field name is matched to the corresponding
element of the details
of the messages. The exceptions
are the names app
, context
, mess
, sender
, which return the value of the corresponding
header fields of the message. Note that
This class implements the Listener
All reference classes extend and inherit methods from "envRefClass"
signature(x = "TableListener")
signature(x = "TableListener")
: If
the message is in the messSet
, it adds a row to its
internal table using the fields specified in fieldlist
(See details.)
signature(x = "TableListener")
: returns the name
of the table. This is usually also the filename where the table
will be stored.
signature(listener =
"TableListener", appid )
: Builds a data datable from the messages.
When the listenerDataTable
method is called, the table
just returns the internal table.
:Object of class character
naming the listener.
:A named character
vector giving the
names and types of the columns of the output matrix. See details.
:Object of class data.frame
this is the
output data frame. Note that the first line is blank line. Use
the function $returnDF()
to get the valid rows.
:A vector of class character
giving the
name of messages which are sent to the database. Only messages
for which mess(mess)
is an element of messSet
be added to the table..
:Processes the message argument.
: An internal function that sets up the first
row of the table as a blank line of the proper types. Called by
initialize(name, fieldlist, messSet, ...)
:Initializes the fields.
: Calls initDF()
to reset the table.
: Returns the part of the df
has data (e.g., omits first line which is used to set the types.)
Russell Almond, Lukas Liu, Nan Wang
This is an example of the observer design pattern. https://en.wikipedia.org/wiki/Observer_pattern.
, P4Message
mess1 <- P4Message(app="default",uid="Phred",context="Down Hill", sender="EIEvent",mess="New Observables", as.POSIXct("2018-11-04 21:15:25 EST"), details=list(trophy="gold",solvedtime=10)) mess2 <- P4Message(app="default",uid="Phred",context="Around the Tree", sender="EIEvent",mess="New Observables", as.POSIXct("2018-11-04 21:16:35 EST"), details=list(trophy="silver",solvedtime=25)) tabMaker <- TableListener(name="Trophy Table", messSet="New Observables", fieldlist=c(uid="character", context="character", timestamp="character", solvedtime="numeric", trophy="ordered(none,silver,gold)")) receiveMessage(tabMaker,mess1) tabMaker$returnDF()
mess1 <- P4Message(app="default",uid="Phred",context="Down Hill", sender="EIEvent",mess="New Observables", as.POSIXct("2018-11-04 21:15:25 EST"), details=list(trophy="gold",solvedtime=10)) mess2 <- P4Message(app="default",uid="Phred",context="Around the Tree", sender="EIEvent",mess="New Observables", as.POSIXct("2018-11-04 21:16:35 EST"), details=list(trophy="silver",solvedtime=25)) tabMaker <- TableListener(name="Trophy Table", messSet="New Observables", fieldlist=c(uid="character", context="character", timestamp="character", solvedtime="numeric", trophy="ordered(none,silver,gold)")) receiveMessage(tabMaker,mess1) tabMaker$returnDF()
This Listener
updates an existing record (in a Mongo
collection) for the student
), with the contents of the data (details) field of the
The database is a mongo
collection identified
by dburi
, dbname
and colname
(collection within
the database). The mess
field of the P4Message
is checked against the applicable messages in messSet
. If it
is there, then the record in the database corresponding to the
(by default app(mess)
and uid(mess)
) is
updated. Specifically, the field targetField
is set to
. The function jsonEncoder
is called to
encode the target field as a JSON object for injection into the
If targetField=""
, then the behavior is slightly different. Instead the fields in
(labeled by their names) are updated.
This class implements the Listener
All reference classes extend and inherit methods from
signature(x = "UpdateListener")
signature(x = "UpdateListener", message)
: If
the message is in the messSet
, it updates the record
corresponding to app(mess)
and uid(mess)
in the
database with the contents of details(mess)
. (See details.)
signature(x = "UpdateListener")
: Returns
the name assigned to the listener.
signature(listener =
"UpdateListener", appid )
: Builds a data datable from the messages.
When the listenerDataTable
method is called, the table
is made by applying mdbFind
to the target column.
The behavior is different depending on whether or not a
is specified. If there is no target field, then
all fields of the column are returned.
If there is a targetField
, then the jsonDecoder
is applied to its value, and it is joined with the app
, context
, timestamp
fields from the header to
make the data table.
:Object of class character
giving the
name of the Mongo database
:Object of class character
giving the url
of the Mongo database.
:Object of class character
giving the
column of the Mongo database.
:A vector of class character
giving the
name of messages which are sent to the database. Only messages
for which mess(mess)
is an element of messSet
be inserted.
:Object of class MongoDB
giving the
database. Use messdb()
to access this field to makes sure
it has been set up.
:Object of class character
giving the
names of the fields which should be considered a key for the
:Object of class character
the field which is to be set.
:A function
or a non-empty
scalar naming a function which will be used to encode
as a JSON object. The default is
:A function
or character
scalar naming a function which will be used to decode
the target field when building a data table.
The default is parseData
:Accessor for the database collection. Initializes the connection if it has not been set up.
:Does the work of updating the database. See Details.
:Empties the database collection of messages with this app id.
initialize(sender, dbname, dburi, colname, messSet,
:Sets default values for fields.
Russell Almond
This is an example of the observer design pattern. https://en.wikipedia.org/wiki/Observer_pattern.
, P4Message
The function unparseData
is the default encoder.
## Updating the data field. fm <- mongo::fake_mongo(count=list(0L,1L)) ul <- UpdateListener("tester", db=fm,messSet=c("Scored Response")) ## New message, insert mess1 <- P4Message("Fred","Task 1","Evidence ID","Scored Response", as.POSIXct("2018-11-04 21:15:25 EST"), list(correct=TRUE,selection="D")) receiveMessage(ul,mess1) ## Message is update, update. mess1a <- P4Message("Fred","Task 1","Evidence ID","Scored Response", as.POSIXct("2018-11-04 21:15:40 EST"), list(correct=TRUE,selection="D",key="D")) receiveMessage(ul,mess1a) ### No target field, details are added to record. data2json <- function(dat) { jsonlite::toJSON(mongo::unboxer(dat)) } upwind <- UpdateListener(messSet=c("Money Earned","Money Spent"), db=mongo::MongoDB("Players",noMongo=TRUE), targetField="", jsonEncoder="data2json") mess2 <- P4Message(app="default",uid="Phred",context="Down Hill", sender="EIEvent",mess="Money Earned", details=list(trophyHall=list(list("Down Hill"="gold"), list("Stairs"="silver")), bankBalance=10)) receiveMessage(upwind,mess2)
## Updating the data field. fm <- mongo::fake_mongo(count=list(0L,1L)) ul <- UpdateListener("tester", db=fm,messSet=c("Scored Response")) ## New message, insert mess1 <- P4Message("Fred","Task 1","Evidence ID","Scored Response", as.POSIXct("2018-11-04 21:15:25 EST"), list(correct=TRUE,selection="D")) receiveMessage(ul,mess1) ## Message is update, update. mess1a <- P4Message("Fred","Task 1","Evidence ID","Scored Response", as.POSIXct("2018-11-04 21:15:40 EST"), list(correct=TRUE,selection="D",key="D")) receiveMessage(ul,mess1a) ### No target field, details are added to record. data2json <- function(dat) { jsonlite::toJSON(mongo::unboxer(dat)) } upwind <- UpdateListener(messSet=c("Money Earned","Money Spent"), db=mongo::MongoDB("Players",noMongo=TRUE), targetField="", jsonEncoder="data2json") mess2 <- P4Message(app="default",uid="Phred",context="Down Hill", sender="EIEvent",mess="Money Earned", details=list(trophyHall=list(list("Down Hill"="gold"), list("Stairs"="silver")), bankBalance=10)) receiveMessage(upwind,mess2)
This listener takes messages that match its incomming set and inject them into another Mongo database (presumably a queue for another service). If a matching message exists, it is replaced instead.
The database is a mongo
collection identified
by dburi
, dbname
and colname
(collection within
the database). The mess
field of the P4Message
is checked against the applicable messages in messSet
. If it
is there, then the message is saved in the collection.
Before the message is saved, the collection is checked to see if
another message exits which matches on the fields listed in
. If this is true, the message in the database is
replaced. If not, the message is inserted.
This class implements the Listener
All reference classes extend and inherit methods from "envRefClass"
signature(x = "UpsertListener")
: returns
signature(x = "UpsertListener", message)
: If
the message is in the messSet
, it saves or replaces the
message inthe database. (See details)
signature(x = "UpsertListener")
: Returns
the name assigned to the listener.
signature(listener =
"UpsertListener", appid )
: Builds a data datable from the messages.
When the listenerDataTable
method is called,
a general find query (mdbFind
on the backing
collection. The app
, uid
, context
fields are selected, and the data
field is unpackaged and added as additional columns.
:Object of class character
which is used
as the sender field for the message.
:Object of class character
giving the
name of the Mongo database
:Object of class character
giving the url
of the Mongo database.
:Object of class character
giving the
column of the Mongo database.
:Object of class character
giving the
names of the fields which should be considered a key for the
:A vector of class character
giving the
name of messages which are sent to the database. Only messages
for which mess(mess)
is an element of messSet
be inserted.
:Object of class MongoDB
giving the
database. Use messdb()
to access this field to makes sure
it has been set up.
:Accessor for the database collection. Initializes the connection if it has not been set up.
:Does the work of inserting the message. See Details.
:Empties the database collection of messages with this app id.
initialize(sender, dbname, dburi, colname, messSet,
qfields, ...)
:Sets the default values for the fields.
Russell Almond
This is an example of the observer design pattern. https://en.wikipedia.org/wiki/Observer_pattern.
, P4Message
## Not run: mess1 <- P4Message(app="default",uid="Phred",context="Down Hill", sender="EABN",mess="Statistics", details=list("Physics_EAP"=0.5237,"Physics_Mode"="High")) ul <- UpsertListener(colname="Statistics",qfields=c("app","uid"), messSet=c("Statistics")) receiveMessage(ul,mess1) ## End(Not run)
## Not run: mess1 <- P4Message(app="default",uid="Phred",context="Down Hill", sender="EABN",mess="Statistics", details=list("Physics_EAP"=0.5237,"Physics_Mode"="High")) ul <- UpsertListener(colname="Statistics",qfields=c("app","uid"), messSet=c("Statistics")) receiveMessage(ul,mess1) ## End(Not run)
This is a version of try
with a couple of
important differences. First, error messages are redirected to the
log, using the flog.logger
Second, extra context information can be provided to aid with
debugging. Third, stack traces are added to the logs to assist with
later debugging.
withFlogging(expr, ..., context = deparse(substitute(expr)), loggername = flog.namespace(), tracelevel = c("WARN", "ERROR", "FATAL"))
withFlogging(expr, ..., context = deparse(substitute(expr)), loggername = flog.namespace(), tracelevel = c("WARN", "ERROR", "FATAL"))
expr |
The expression which will be exectued. |
... |
Additional context arguments. Each additional argument should have an explicit name. In the case of an error or warning, the additional context details will be added to the log. |
context |
A string identifying the context in which the error occurred. For example, it can identify the case which is being processed. |
loggername |
This is passed as the |
tracelevel |
A character vector giving the levels of conditions for which stack traces should be added to the log. Should be strings with values “TRACE”, “DEBUG”, “INFO”, “WARN”, “ERROR” or “FATAL”. |
The various processes of the four process assessment design are meant to run as servers. So when errors occur, it is important that they get logged with sufficient detail that they can be reproduced, fixed and added to the test suite to prevent recurrance.
First, signals are caught and redirected to the appropriate
handler. This has several
important advantages. First, the output can be directed to various
files depending on the origin package. In general, the name of the
package should be the name of the logger. So,
would log error from the EIEvent package to the named
file. Furthermore, flog.layout(layout.json,name="EIEvent")
will cause the log to be in JSON format.
Second, additional context information is logged at the “DEBUG”
level when an condition is signaled. The context
string is
printed along with the error or warning message. This can be used,
for example, to provide information about the user and task that was
being processed when the condition was signaled. In addition, any of
the ...
arguments are printed. This can be used to print
information about the message being processed and the initial state of
the system, so that the error condition can be reproduced.
Third, if the class of the exception is in the tracelevel
then a stack trace will be logged (at the “DEBUG” level) along
with the error. This should aid debugging.
Fourth, in the case of an error or fatal error, an object of class
(see try
). Among other things,
this guarentees that withFlogging
will always return control to
the next statement.
If expr
executes successfully (with no errors or fatal errors)
then the value of expr
will be returned. If an error occurs
during execution, then an object of class try-error
will be
Russell Almond
The code for executing the stack trace was taken from https://stackoverflow.com/questions/1975110/printing-stack-trace-and-continuing-after-error-occurs-in-r
, flog.logger
## Not run: ## Setup to log to file in json format. flog.appender(appender.file("/var/log/Proc4/Proc4_log.json"), name="Proc4") flog.layout(layout.json,name="EIEvent") ## End(Not run) xy <- withFlogging(stop("shoes untied"),context="walking",foot="left") stopifnot(is(xy,"try-error")) xx <- withFlogging(log(-1)) stopifnot(is.nan(xx)) withFlogging(log(-1),tracelevel=c("ERROR","FATAL"))
## Not run: ## Setup to log to file in json format. flog.appender(appender.file("/var/log/Proc4/Proc4_log.json"), name="Proc4") flog.layout(layout.json,name="EIEvent") ## End(Not run) xy <- withFlogging(stop("shoes untied"),context="walking",foot="left") stopifnot(is(xy,"try-error")) xx <- withFlogging(log(-1)) stopifnot(is.nan(xx)) withFlogging(log(-1),tracelevel=c("ERROR","FATAL"))