Onboarding Data from a Database Incrementally

When onboarding data from a database, Anzo provides the option to create a schema that is configured to import only data that has been added to the data source since the last time the data was onboarded. This topic provides instructions for writing a schema query to import data incrementally and gives an overview of the workflow to onboard the data.

Important: Anzo onboards new source data only; it does not process data that was updated or deleted in the source database. Running the ETL job for an incremental schema query replaces the existing data with the newly onboarded data.

Writing an Incremental Schema Query

  1. In the Anzo console, expand the Onboard menu and click Structured Data. Anzo displays the Data Sources screen, which lists any existing data sources.

  2. On the Data Sources screen, click the data source for which you want to create a schema. Anzo displays the Overview screen for that source.
  3. Click the Schema tab, and then click the Create Schemas From Query button. Anzo displays the Create Schemas dialog box.

  4. In the Create Schemas dialog box, specify a name for the schema in the Schema Name field.
  5. In the Table Name field, specify a name for this schema table.
  6. At the bottom of the screen, enable the Include increment data option by sliding the slider to the right. Anzo displays additional settings.

  7. Populate the following fields so that you can use the values as a guide for writing the schema query:
    • Incremental Column Name: The source column whose values are compared against the increment value to determine which rows to import.
    • Value: The value in the column to use as the stopping point for the current import process and the starting point for the next import.
    • Comparator: The operator to use for comparing source values against the value above.
  8. In the query text field, type the SQL statement that will target the appropriate source data. The WHERE clause must include the incremental column name, the comparison operator, and an {INCREMENTVALUE} parameter that is substituted with the Value at runtime. For example:
    SELECT EmployeeID, FirstName, LastName, Title, Salary, BirthDate, HireDate, Region, Country
    FROM northwind.Employees
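    -- {INCREMENTVALUE} is replaced at runtime with the configured Value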
    WHERE EmployeeID > {INCREMENTVALUE}

    Make sure that the query includes the {INCREMENTVALUE} parameter and uses the same Incremental Column Name and Comparator values as the settings above. A variant that increments on a date column is sketched after these steps.

  9. Click Save to save the query. Anzo creates the new schema and adds it to the list of schemas on the screen.
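
For reference, the settings that match the example query above would be an Incremental Column Name of EmployeeID, a Comparator of >, and a starting Value such as 5 (the value used in the pipeline example below). The incremental column does not have to be numeric. The following sketch assumes you increment on the HireDate column from the same table instead; the column choice and date literal format are illustrative and depend on your source database:

    SELECT EmployeeID, FirstName, LastName, Title, Salary, BirthDate, HireDate, Region, Country
    FROM northwind.Employees
    -- Incremental Column Name: HireDate, Comparator: >
    -- {INCREMENTVALUE} is replaced with the stored Value, e.g. a date such as '2015-01-01'
    WHERE HireDate > {INCREMENTVALUE}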

Once the schema is created, the source data that the schema query targets can be onboarded to Anzo. For instructions on onboarding the data by letting Anzo automatically generate the mapping, model, and ETL pipeline, see Auto-Ingesting Imported Data. For information about manually creating mappings, models, and pipelines, see Working with Mappings, Modeling Data, and Working with Pipelines.

Running an Incremental Pipeline

When an incremental schema is added to an ETL job, a clock icon is displayed when you hover over the component in the job.

Clicking the clock icon opens the Incremental Load dialog box, which lists the Incremental Column Name, Value, and Comparator from the schema query.

Publishing the job for this example will onboard only the records for which the EmployeeID is greater than 5. When the job is finished, Anzo adjusts the incremental load value to the last value that was onboarded for the incremental column. Every time the pipeline is published, Anzo updates the incremental load value to the highest or lowest onboarded value for the column, depending on the Comparator.

For example, viewing the Incremental Load dialog box after the job above has run shows that the last onboarded EmployeeID value was 14.
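
To make the substitution concrete, the query that Anzo would effectively issue on the next run, with the stored value of 14 in place of the parameter, is sketched below:

    SELECT EmployeeID, FirstName, LastName, Title, Salary, BirthDate, HireDate, Region, Country
    FROM northwind.Employees
    -- {INCREMENTVALUE} resolved to the last onboarded value
    WHERE EmployeeID > 14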

The next time this job is run, Anzo will onboard only the records where EmployeeID is greater than 14. To view the number of rows processed after running a job, you can search the System Datasource for the following predicate on the Find tab in the Query Builder:

<http://cambridgesemantics.com/ontologies/2015/08/SDIService#rowsProcessed>

The Object column shows the number of rows processed each time the pipeline was run.
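
If you prefer to run the lookup as a SPARQL query instead of using the Find tab, a minimal sketch is shown below; only the predicate URI comes from this topic, and the variable names are illustrative:

    SELECT ?run ?rowsProcessed
    WHERE {
      # Each ETL run records the number of processed rows as the object of this predicate
      ?run <http://cambridgesemantics.com/ontologies/2015/08/SDIService#rowsProcessed> ?rowsProcessed .
    }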
