Scheduling Automated Updates
Many data update operations must be performed on a regular or periodic basis, for example, retrieving updates from external data sources, or performing repetitive operations that must run at inconvenient times and that users cannot reliably perform themselves. Examples of such database operations include:
- Refreshing underlying data in materialized views
- Loading or inserting new data
- Exporting data
AnzoGraph provides a CRON-like mechanism to automatically perform these operations. However, in AnzoGraph, these operations are managed entirely within the database itself, rather than being controlled by the configuration of external control files.
This topic provides instructions for setting up these automated database operations and describes all the configuration options and best practices available to control the scheduling, prioritization, error handling, and other aspects of performing these scheduled jobs.
There are two primary aspects to creating and configuring automated or scheduled database operations within AnzoGraph:
- Create and define the contents of one or more Cron graphs, each of which specifies the database operations in one or more individual jobs to be scheduled for execution. The Cron graph files also specify configuration settings that control other aspects of each scheduled job, such as a job's scheduled execution time (particular dates and times or intervals), retry options, error handling policies, and so on.
Each Cron graph is defined simply as a collection of regular RDF triples, with each triple in the Cron graph specifying a particular scheduled job attribute or parameter.
- Update the scheduled Cron graph job settings in the AnzoGraph settings.conf file to include the Cron graphs you want to specify for execution. The settings.conf file contains two settings to control scheduling and execution of selected Cron graphs, cron_graphs and cron_graphs_recheck.
Creating a New Cron Graph File
A Cron graph file simply defines a collection of RDF triples that specify configuration and scheduling information for one or more Cron jobs. A Cron graph can contain any number of Cron jobs, and each Cron job can have scheduling and error-handling policies that differ from those of other Cron jobs in the same graph.
The following content provides a simple example of a Cron graph file, for example, named cron1.ttl, which configures the scheduling of two separate data operation jobs within the same graph:
PREFIX azg: <http://www.anzograph.com/> .
<job1> azg:BaseTime "2020-04-07:11:32"^^xsd:dateTime .
<job1> azg:Schedule "1Day"^^xsd:duration .
<job1> azg:ErrorPolicy "AbortDatabase" .
<job1> azg:RetryInterval "1Hour"^^xsd:duration .
<job1> azg:RetryCount 23 .
<job1> azg:Statement "REFRESH VIEW <testView1>" .
<job2> azg:BaseTime "2020-07-08:00:00"^^xsd:dateTime .
<job2> azg:Schedule "1Day"^^xsd:duration .
<job2> azg:ErrorPolicy "Ignore" .
<job2> azg:Statement "REFRESH VIEW <testView2>" .
In this example, the subject position in Cron graph triples specifies individual job names, that is, job1 for scheduling and configuration of one scheduled job, and job2 for the scheduling and configuration of a second job. The predicate position of each triple specifies a particular attribute or parameter of a scheduled job. (The next section, Configuring Cron Graph Job Parameters, provides a more detailed description of each job attribute or parameter and the values you can specify for each job setting.)
The azg:Statement predicate keyword within each job specifies the database operation (any valid SPARQL statement or command) to be performed when the corresponding job is executed. You can specify multiple database operations by separating the statements with double semicolon characters (;;), that is, sparqlstatement1;;sparqlstatement2. Each individual SPARQL statement in a job is executed as a separate AnzoGraph transaction following ACID principles.
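As a minimal sketch of the double-semicolon convention described above, the following Python helper assembles one azg:Statement triple from several SPARQL statements. The function names are hypothetical and not part of AnzoGraph; the helper only produces text that you would place in a Cron graph file yourself.

```python
def turtle_literal(text: str) -> str:
    """Quote a string as a short Turtle literal, escaping backslashes, quotes, and newlines."""
    escaped = (
        text.replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\n", "\\n")
    )
    return f'"{escaped}"'


def statement_triple(job: str, statements: list[str]) -> str:
    """Build one azg:Statement triple; multiple statements are joined with ';;'."""
    joined = ";;".join(s.strip() for s in statements)
    return f"<{job}> azg:Statement {turtle_literal(joined)} ."


# Hypothetical usage: one job that refreshes two views in sequence.
print(statement_triple("job1", ["REFRESH VIEW <testView1>", "REFRESH VIEW <testView2>"]))
```

Generating the triple from a list keeps each statement readable in source form while still producing the single string value that the job expects.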
Once you have created a Cron graph file, you load the Cron graph file triples into an AnzoGraph graph. To load a Cron graph file into AnzoGraph, you can use the LOAD command. For example:
LOAD <file:/filepath/cron1.ttl> INTO GRAPH <CronGraph1>
In this example, the Cron job triples stored in the cron1.ttl file are loaded into AnzoGraph in a graph named CronGraph1. This is the name, CronGraph1, that you specify with the cron_graphs= setting in the AnzoGraph configuration file, installPath/config/settings.conf, to run the scheduled jobs defined within the CronGraph1 graph. With the cron_graphs setting, you can specify one or more Cron graph names, separating each name with a comma. For example:
cron_graphs=CronGraph1, CronGraph2
Each Cron graph is assigned a different Cron thread. The Cron thread acts as a "virtual user" that evaluates when to next run a Cron job defined within the same Cron graph. Each individual Cron thread runs only one job at a time. If two jobs are scheduled for the same time (or the current time is already later than the scheduled time), they are run sequentially.
To execute Cron jobs concurrently, define them in different Cron graphs, since jobs in different Cron graphs are run by different, independent Cron threads. For example, you could create one graph named "quickjobs" that defines many shorter jobs, and another graph named "batchjobs" that runs longer-executing jobs. Jobs from the two Cron graphs can then run concurrently, regardless of when a job in the other graph starts or finishes.
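The threading model described above can be illustrated with a small simulation. This is only a sketch of the idea (one thread per Cron graph, jobs run sequentially within a graph), not AnzoGraph's implementation; the job names are made up and no SPARQL is executed.

```python
import threading


def run_graph(graph_name, jobs, log, lock):
    """One simulated Cron thread: runs its graph's jobs strictly one at a time."""
    for job in jobs:
        # A real job would execute its azg:Statement here.
        with lock:
            log.append((graph_name, job))


def run_all(graphs):
    """Start one thread per Cron graph and wait for all of them to finish."""
    log, lock = [], threading.Lock()
    threads = [
        threading.Thread(target=run_graph, args=(name, jobs, log, lock))
        for name, jobs in graphs.items()
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return log


log = run_all({"quickjobs": ["q1", "q2", "q3"], "batchjobs": ["b1", "b2"]})
# Within each graph the order is preserved; across graphs the entries may interleave.
quick = [job for graph, job in log if graph == "quickjobs"]
batch = [job for graph, job in log if graph == "batchjobs"]
print(quick, batch)
```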
If AnzoGraph is stopped and subsequently restarted, Cron jobs return to their normal scheduled interval times. This can be critical, for example, for nightly jobs scheduled for execution at midnight: a skipped midnight job will not be performed until midnight of the next day.
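To make the restart behavior concrete, here is a hedged sketch that models scheduled times as a fixed grid of base time plus whole multiples of the interval. The grid model and the function name are assumptions for illustration; the database's actual scheduling logic may differ in its details.

```python
from datetime import datetime, timedelta


def next_run(base: datetime, interval: timedelta, now: datetime) -> datetime:
    """Return the first scheduled time strictly after `now` on the base + k*interval grid."""
    if interval <= timedelta(0):
        raise ValueError("interval must be greater than 0")
    if now < base:
        return base
    # Number of whole intervals that have already elapsed since the base time.
    elapsed_intervals = (now - base) // interval
    return base + (elapsed_intervals + 1) * interval


# A nightly job based at midnight; the database comes back up five minutes late.
base = datetime(2020, 7, 8, 0, 0)
restart = datetime(2020, 7, 10, 0, 5)
print(next_run(base, timedelta(days=1), restart))
```

In this model the job that was due at midnight on July 10 is not run late; the next slot is midnight on July 11, which matches the skipped-job behavior described above.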
Naming the Graph in a Cron Graph File
As an alternative to specifying the AnzoGraph graph name when you load the Cron graph triples, you can specify the name of the Cron graph within the triples file itself, for example:
PREFIX azg: <http://www.anzograph.com/> .
GRAPH <CronGraph1> {
<job1> azg:BaseTime "2020-04-07:11:32"^^xsd:dateTime .
...
<job1> azg:Statement "REFRESH VIEW <testView1>" .
<job2> azg:BaseTime "2020-07-08:00:00"^^xsd:dateTime .
...
<job2> azg:Statement "REFRESH VIEW <testView2>" .
}
You could then simply load the Cron graph file into AnzoGraph using the following LOAD command:
LOAD <file:/filepath/cron1.ttl>
Cron Graph Use Cases
This section describes example use cases for the automated update feature.
Streaming via Micro Batch Loading
One of the primary use cases for AnzoGraph Cron jobs is to schedule and regularly pull data from an external data or message streaming service into AnzoGraph. One prime example is Apache Kafka, which is an open source messaging platform that many customers have incorporated into their data pipeline architectures.
Customers are not limited to using Kafka for streaming data into AnzoGraph. Other available alternatives include services such as ActiveMQ, RabbitMQ, ZeroMQ, Amazon Kinesis, Apache Spark, and Apache Storm.
Using the Kafka managed service, users can create Cron jobs to ingest data into AnzoGraph using any arbitrary ETL query. To use a managed service such as Kafka, you first need to configure and start the Kafka server that provides the streaming service to AnzoGraph. Then, you can specify the SERVICE statement in the WHERE clause of the SPARQL INSERT statement configured with the Cron job's "Statement" parameter to ingest data from the Kafka server. For example:
azg:Statement """INSERT {
  graph <http://mygraph.com> {
    ?ins a <http://anzograph.com/Event> ;
         <timestamp> ?timestamp ;
         <eventid> ?eventid .
  }
}
WHERE {
  SERVICE <http://cambridgesemantics.com/anzograph/service#kafka_stream> (
    "localhost:9092",
    "AZG_STREAM",
    "timestamp:message:long eventid:message:long eventname:message:String channelid:message:String event_type:message:String firm_id:message:String session:message:String price:message:double latency01:message:long latency02:message:long latency03:message:long latency04:message:long latency05:message:long symbol:message:String protocol:message:String new_price:message:double",
    "kafka.data.format=JSON group.id=AZG_CGROUP7 kafka.poll.duration=100 auto.offset.reset=latest enable.auto.commit=false"
  ) {}
  BIND(IRI(CONCAT("http://anzograph.com/kafka/",STRUUID())) as ?ins)
}"""^^xsd:string .
In this example, the INSERT clause defines the triples created for each message ingested from the Kafka service: the event's type, its timestamp, and its event ID.
Following execution of a Cron job using the Kafka service, you can verify the ingested triples from the service with a query such as the following:
SELECT (COUNT(*) as ?cnt) FROM <http://mygraph.com>
WHERE { ?ins a <http://anzograph.com/Event>.}
Configuring Cron Graph Job Parameters
As shown in the earlier example, all configuration settings to define and schedule an AnzoGraph Cron graph job are specified via triples in a Cron graph. The predicate and object pairs you can define in a Cron graph triples file are the following:
Option | Description |
---|---|
azg:Schedule or azg:Delay | Either azg:Schedule or azg:Delay is required for each job. The object values available for either setting are those allowed for the xsd:duration data type. (See the W3C description of XSD datatypes: xsd:duration.) The azg:Schedule duration is added to the azg:BaseTime setting, if specified, to produce the job's next scheduled execution time. If azg:BaseTime is not specified for a job, the system uses AnzoGraph’s startup time instead. Thus, if AnzoGraph was started at 2 PM on Sunday, May 12, a job scheduled with a one-day interval would run every day at 2 PM. It is important to note that these options set "scheduled request times", not guaranteed start times. If the system is busy enough that a given job would have multiple outstanding requested start times, only the last one is executed. If AnzoGraph is stopped and subsequently restarted, Cron jobs return to their normal scheduled interval times. If azg:Delay is specified instead of azg:Schedule, the execution of the associated Cron job statement is delayed by the specified interval (in seconds) from the time the job statement last completed execution. |
azg:Statement | Required parameter. This parameter specifies the text string of a SPARQL statement or command to be executed as part of the job. Specifying SPARQL statements in a separate referenceable file, rather than as a text string, for example, <file:/filepath/job1.rq>, is not currently supported. |
azg:BaseTime | Optional parameter. The azg:Schedule duration is added to the azg:BaseTime setting in order to produce the job's next scheduled execution time. If azg:BaseTime is not specified, AnzoGraph's start time is used as the Cron job's base time. |
azg:ErrorPolicy | Required parameter. Values allowed include "Ignore", "Disable", "BlockUsers", or "AbortDatabase". (See Setting a Cron Job Error Policy for a description of each error policy.) |
azg:RetryInterval | Optional parameter. If specified, and the associated Cron job statement produces an error, this parameter specifies an xsd:duration value that defines an interval of time, after which the associated job statement is retried. The job statement will be continually retried until the first success, or until the azg:RetryCount is reached, after which, the job returns to its normal scheduled time. |
azg:RetryCount | Optional parameter. This parameter defines the number of retries that will be attempted when a Cron job's SPARQL statement execution is not successful. If azg:RetryCount is specified, azg:RetryInterval must also be specified. When the number of job execution retries reaches the retry count, the specified azg:ErrorPolicy for this job is performed. If the "Ignore" error policy is specified, the associated job simply returns to its normal scheduled time. |
azg:RunAfterStartup | Optional parameter. This object specifies a boolean true or false value. If set to true, initial execution of the job occurs shortly after AnzoGraph startup, irrespective of its designated azg:Schedule setting. |
If any required triple is missing, or any triple is invalid, the associated Cron graph job is rejected and an error is returned. (See Monitoring Job Execution and Reporting Errors.)
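The parameter rules in the table above can be checked before a Cron graph is loaded. The following sketch validates a job expressed as a Python dictionary keyed by parameter name (without the azg: prefix). The dictionary shape, the function name, and the problem messages are assumptions for illustration; they are not the database's own error messages, and the check covers only a subset of the conditions AnzoGraph may enforce.

```python
ERROR_POLICIES = {"Ignore", "Disable", "BlockUsers", "AbortDatabase"}


def validate_job(job: dict) -> list:
    """Return a list of problems found in a job's settings (empty if none)."""
    problems = []
    if "Statement" not in job:
        problems.append("missing Statement")
    # Exactly one of Schedule or Delay is expected.
    durations = [key for key in ("Schedule", "Delay") if key in job]
    if len(durations) == 0:
        problems.append("missing Schedule or Delay")
    elif len(durations) > 1:
        problems.append("both Schedule and Delay given")
    policy = job.get("ErrorPolicy")
    if policy is None:
        problems.append("missing ErrorPolicy")
    elif policy not in ERROR_POLICIES:
        problems.append("unknown ErrorPolicy")
    if "RetryCount" in job:
        if "RetryInterval" not in job:
            problems.append("RetryCount requires RetryInterval")
        if job["RetryCount"] <= 0:
            problems.append("RetryCount must be greater than 0")
    return problems


job1 = {
    "Schedule": "1Day",
    "ErrorPolicy": "AbortDatabase",
    "RetryInterval": "1Hour",
    "RetryCount": 23,
    "Statement": "REFRESH VIEW <testView1>",
}
print(validate_job(job1))
print(validate_job({"Statement": "REFRESH VIEW <testView2>", "RetryCount": 0}))
```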
Configuring AnzoGraph to Run Cron Jobs
AnzoGraph maintains a number of "Cron" scheduling threads for processing jobs concurrently. Each Cron thread processes all the scheduled jobs contained in a specific Cron graph.
To configure AnzoGraph to run the scheduled jobs within Cron graphs, edit the configuration file, installPath/config/settings.conf, to specify values for the following two settings:
cron_graphs=cronGraphList
cron_graphs_recheck=refreshGraphInterval
With the cron_graphs= setting, you can specify one or more Cron graph names, separating each name with a comma. For example:
cron_graphs=CronGraph1, CronGraph2
Each Cron graph is assigned a different Cron thread. The Cron thread acts as a "virtual user" that evaluates when to next run a Cron job defined within the same Cron graph. Each individual Cron thread runs only one job at a time.
When the Cron threads for specified Cron graphs are started, AnzoGraph reads the Cron graph job settings to obtain the configuration of any scheduled jobs each Cron graph contains. The second configuration setting, cron_graphs_recheck=, specifies an interval (in seconds) to "recheck" the Cron graphs included in the cron_graphs=cronGraphList for any changes. For example:
cron_graphs_recheck=10
If a Cron graph is non-existent or empty, the associated Cron thread periodically checks at the specified interval whether the named Cron graph is now loaded and has new jobs. If the contents of a Cron graph change, the new Cron graph job configuration settings are processed.
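If you generate settings.conf entries from a deployment script, the two settings above can be rendered with a small helper like the one below. This is only a sketch: the function name is hypothetical, and the requirement that the recheck interval be positive is an assumption rather than a documented rule.

```python
def render_cron_settings(graphs, recheck_seconds):
    """Return the two settings.conf lines for the given Cron graph names."""
    if not graphs:
        raise ValueError("at least one Cron graph name is required")
    if recheck_seconds <= 0:
        # Assumption: a recheck interval only makes sense as a positive number of seconds.
        raise ValueError("recheck interval must be greater than 0")
    return [
        f"cron_graphs={', '.join(graphs)}",
        f"cron_graphs_recheck={recheck_seconds}",
    ]


for line in render_cron_settings(["CronGraph1", "CronGraph2"], 10):
    print(line)
```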
Setting a Cron Job Error Policy
To specify what happens when AnzoGraph encounters an error in processing or executing any specific Cron job, users specify an error policy in the configuration of individual Cron jobs. Users may specify different error policies for individual Cron jobs, but each Cron job must contain exactly one error policy. Error Policy choices are the following:
Error Policy | Description |
---|---|
"AbortDatabase" | Most conservative policy. Any critical Cron job failure will produce a crash-dump Xray that can be read by azgdoctor. |
"Ignore" | Most liberal policy. The error information is still reported in extracted Xrays and recorded in the internal sth_errors system table. |
"Disable" | Directs AnzoGraph not to attempt to run the Cron job statement again. |
"BlockUsers" | Similar to "AbortDatabase", this error policy causes all subsequent user-generated SELECT queries to error out with a "Cron failed, contact your system administrator" message. To unblock users from performing SELECT statements, your AnzoGraph system administrator can restart the database or issue the "SET selects_blocked TO false" statement, for example: azgi -c "SET selects_blocked TO false" |
If a Retry Interval is specified in a Cron graph, Error Policy actions are postponed until the number of retries specified by a Retry Count is reached. No Error Policy actions are performed if you specify the "Ignore" error policy. However, regardless of a job's Error Policy and Retry Count configuration, each time a Cron job error occurs, AnzoGraph makes an entry in the "sth_errors" system table.
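The interaction between retries and error policies described above can be sketched as follows. This is a simplified model under stated assumptions, not the database's implementation: the wait for the retry interval is omitted, the returned outcome strings are invented for illustration, and the errors list merely stands in for entries in the sth_errors system table.

```python
def run_with_policy(execute, error_policy, retry_count=0):
    """Run a job once plus up to `retry_count` retries; return (outcome, errors)."""
    errors = []
    for attempt in range(1 + retry_count):
        try:
            execute()
            return "succeeded", errors
        except Exception as exc:
            # Every failure is recorded, regardless of the policy.
            errors.append(f"attempt {attempt + 1}: {exc}")
    # Retries are exhausted: only now does the error policy take effect.
    if error_policy == "Ignore":
        return "resume normal schedule", errors
    return f"apply {error_policy}", errors


def make_flaky(failures):
    """Build a stand-in job that fails the given number of times, then succeeds."""
    state = {"left": failures}

    def execute():
        if state["left"] > 0:
            state["left"] -= 1
            raise RuntimeError("statement failed")

    return execute


print(run_with_policy(make_flaky(2), "AbortDatabase", retry_count=3))
print(run_with_policy(make_flaky(5), "Disable", retry_count=2))
```

The first call succeeds on its third attempt, so the policy never fires; the second exhausts its retries and only then reports that the "Disable" policy applies.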
Monitoring Job Execution and Reporting Errors
All execution issues or errors arising from scheduled job execution are logged to AnzoGraph’s internal system tables. SPARQL statements executed from Cron graph jobs are registered in the internal "sth_queries" system table and any job execution errors are logged in the "sth_errors" system table.
System administrators can monitor the "sth_cron_events" system table for "event=failed" entries and take whatever corrective action is needed, based on those events. In addition to flagged events in the "sth_cron_events" system table, an administrator can diagnose failures by examining entries in the "sth_errors" table.
New entries in the "sth_cron_events" and "sth_errors" system tables are, by default, also spooled to disk to guarantee their full presence in crashdumps and operational Xrays.
You can query AnzoGraph's internal system tables using regular SPARQL queries, so you can query their contents just like that of any other database source:
- If an error is caused by one of a Cron graph's job configuration settings, the "sth_errors.basic_text" field value for a Cron error will begin with "Cron: ".
- For errors in execution of a Cron job statement itself, you can query the "sth_query" table joined with the "sth_errors" table. Entries in the "sth_queries" system table also have a label that makes it easier to identify the statements launched by a Cron job, so you can identify failed Cron statement queries as those system table entries where the sth_query.label begins with "cron:" and "sth_query.aborted=1".
- If AnzoGraph is stopped and subsequently restarted, Cron jobs return to their normal scheduled interval times. This can be critical, for example, for nightly jobs scheduled for execution at midnight: a skipped midnight job will not be performed until midnight of the next day. An administrator can verify that no critical jobs were skipped by tracking events labelled as sth_cron_events.event="finished".
The following sections provide examples of queries on several of the different internal database tables providing information on scheduled and executed Cron graph job statements.
Sth_cron_graph System Table
This table logs all scans of the Cron graphs (including Cron graph refreshes). The following query retrieves a tabular display of Sth_cron_graph entries.
azgi -c "select * where {table 'sth_cron_graphs'}"
Sth_cron_events System Table
This table logs activities related to execution of Cron jobs by their associated Cron threads. The following query retrieves a tabular display of Sth_cron_events entries.
azgi -c "select * where {table 'sth_cron_events'}"
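A monitoring script might wrap the query above and scan the results for failed events. The sketch below builds the azgi argument list from the command shown in this section and filters rows that have already been parsed into dictionaries. The row shape and the "job" key are hypothetical (only the "event" values "failed" and "finished" come from this topic), and parsing azgi's actual output is left out because its format is not described here.

```python
def azgi_command(query: str) -> list:
    """Build the argument list for running one query through azgi -c."""
    return ["azgi", "-c", query]


def failed_events(rows):
    """Keep only rows whose 'event' value is 'failed'."""
    return [row for row in rows if row.get("event") == "failed"]


command = azgi_command("select * where {table 'sth_cron_events'}")

# Hypothetical, already-parsed rows; real column names may differ.
sample_rows = [
    {"job": "job1", "event": "finished"},
    {"job": "job2", "event": "failed"},
]

print(command)
print(failed_events(sample_rows))
```

The argument list could be passed to subprocess.run on a host where azgi is installed; that step is omitted so the example stays runnable anywhere.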
Cron Graph Errors
Incorrect configuration of scheduled jobs and parameter settings in Cron graph triples may generate their own syntax or semantic errors and thus produce their own entries in the sth_errors table:
Cron Graph Errors | Error Messages |
---|---|
CronInvalidPredicate | "Cron: Invalid predicate" |
CronOneDuration | "Cron: Multiple durations are being requested" |
CronOneStatement | "Cron: Multiple statements are being requested" |
CronOneFirstTime | "Cron: Multiple base/first times are being requested" |
CronMissingStatement | "Cron: Missing Statement to execute" |
CronMissingErrorPolicy | "Cron: Missing ErrorPolicy to execute" |
CronConflictingErrorPolicy | "Cron: ErrorPolicy must be 'Ignore' if no RetryCount is specified" |
CronUnknownErrorPolicy | "Cron: Unknown ErrorPolicy" |
CronSingleErrorPolicy | "Cron: Only a single ErrorPolicy allowed per subject" |
CronSingleStatement | "Cron: Only a single Statement allowed per subject" |
CronSingleFirstTime | "Cron: Only a single FirstTime allowed per subject" |
CronMustBeLiteralNotIRI | "Cron: Object must be a literal, cannot be an IRI" |
CronWrongType | "Cron: Object wrong type" |
CronStatementFailed | "Cron: Statement failed to execute, see system table sth_errors for more information" |
CronRetryCountIncons | "Cron: Specifying an RetryCount requires a RetryInterval" |
CronRetryCountPos | "Cron: RetryCount must be greater than 0" |
CronIntervalPos | "Cron: Schedule, Delay, RetryInterval must be greater than 0" |
CronMissingRetryCount | "Cron: Retry requires a RetryCount unless ErrorPolicy is Ignore" |
CronBlockingUsers | "All SELECTS blocked, contact your system administrator" To unblock users, run: azgi -c "SET selects_blocked TO false" |