User-Defined Aggregate (UDA) Examples

User-Defined Aggregate (UDA) extensions let developers extend the set of aggregate functions built into AnzoGraph DB. A UDA maps multiple rows of input values to a single row of output values. For example, the following code snippet is a complete, minimal, working example of a UDA:

*
* implementation:
* 
  struct ALL : Aggregate {
    bool all = true;
    void accumulate(const Row &a) override { all &= a.getBoolean(0); }
    void save      (      Row &s) override { s.setBoolean(0, all);   }
    void merge     (const Row &s) override { all &= s.getBoolean(0); }
    void result    (      Row &r) override { r.setBoolean(0, all);   }
  };
*
* signature:
*
  extern "C" ExtensionFactory *udx_ALL() { return new FactoryFor<ALL>(); }
*
* meta-data:
*
  {
    "name"        : "http://example/all",
    "signature"   : "udx_ALL",
    "type"        : "aggregate",
    "arguments"   : "boolean",
    "states"      : "boolean",
    "results"     : "boolean",
    "description" : "Returns the logical conjunction of a list of booleans"
  }
*

The implementation of this example UDA extension uses the setBoolean and getBoolean routines to process the values of the input and output arguments and perform the operation of the function.

After compiling and registering the UDA in an extension library file, you can use the extension anywhere in a SPARQL query where the syntax allows. For example:

select (<http://example/all>(?a) as ?all) where ...

This example also includes meta-data information in the same file as the udx_ALL function extension, to register the function in AnzoGraph DB. In addition to the attributes common to every extension, aggregate extensions specify the following additional meta-data attributes.

Attribute Description
arguments An array of zero or more types that specifies the number and type of arguments required by any application of the UDA. Determines the shape of the input row passed as the "a" parameter of the accumulate method.
results An array of zero or more types that specifies the number and type of results that are returned by any application of the UDA. Determines the shape of the output row passed as the "r" parameter of the result method.
sorted A boolean value which, if true, indicates that the algorithm implemented by the aggregate is sensitive to the order in which values are supplied to the accumulate() method. If the aggregate is computed in a subquery that has an ORDER BY clause, the compiler normally discards that clause. If you specify sorted as true, the ORDER BY clause is preserved.
variadic A boolean value which, if true, indicates that the final type listed in the arguments array may be repeated one or more times in an application of the UDA. This attribute is optional.
states An array of zero or more types that specifies the number and type of states that are marshaled across the cluster when merging those intermediate results that accumulated on the slices during the accumulation phase. Determines the shape of the output row passed as the "s" parameter of the save method and determines the shape of the input row passed as the "s" parameter of the merge method. This attribute is optional.

When compiling a query containing an aggregate extension, the AnzoGraph DB server or the leader node of a cluster verifies that the number and type of the arguments passed to the UDA are consistent with its domain, as specified by the arguments and variadic attributes of the meta-data description.
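The check described above can be pictured with a small, self-contained sketch. This is an illustration only, not the server's validation code: the function name matchesSignature and the use of plain strings as type names are assumptions made for this example.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper: returns true when the actual argument types satisfy the
// declared "arguments" list. When variadic is true, the final declared type
// may repeat one or more times at the end of the actual list.
bool matchesSignature(const std::vector<std::string>& declared,
                      bool variadic,
                      const std::vector<std::string>& actual) {
  if (!variadic) return actual == declared;  // exact match required
  assert(!declared.empty());                 // a variadic UDA needs a type to repeat
  if (actual.size() < declared.size()) return false;
  for (std::size_t i = 0; i < actual.size(); ++i) {
    // Positions past the declared list must repeat the final declared type.
    const std::size_t d = i < declared.size() ? i : declared.size() - 1;
    if (actual[i] != declared[d]) return false;
  }
  return true;
}
```

With arguments declared as (string, long) and variadic set to true, a call that passes (string, long, long, long) is accepted, while (string) or (string, long, string) is rejected.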

Aggregation Methods

Because AnzoGraph DB takes advantage of parallel processing, it performs aggregation in multiple steps. Each data slice operates on the groups of values that are local to that slice and produces one result for each group. The results of those computations are stored in variables called aggregation states, and the aggregation state values are used to calculate the final result. Each UDA implements the following aggregation methods: accumulate(), save(), merge(), and result(). The UDA implements the save() method to transmit these partially aggregated local results, or states, to the leader, where they are "merged" into a fresh instance of the UDA that computes the final result.

Method Description
accumulate(const Row &a) Passes a row of values whose schema is specified by the "arguments" meta-data attribute. UDAs marked "variadic" may also receive additional values with type "a.shape().last()" at the end of the given row.
save(Row &s) Assigns the accumulated intermediate results to the given row, which is then transmitted across the cluster to the target slice and merged into the recipient instance of the UDA.
merge(const Row &s) Merges the intermediate results accumulated by another instance over on a remote slice with the internal state. The intermediate results of the remote slice instance are passed as a row whose shape is specified by the "states" meta-data attribute.
result(Row &r) Assigns the results of the aggregate to the given row, whose shape is specified by the "results" meta-data attribute.

When executing a query, each slice creates its own distinct instance of the UDA for every occurrence in the query by invoking the create method of the associated ExtensionFactory.

  • As each row of values streams through a slice, it is passed to the instance by calling its accumulate method, which responds by updating its internal state as necessary to record having processed the row in some appropriate way.
  • When all rows on the slice have been accumulated, the instance is now given a mutable row (of shape states) into which it serializes any intermediate results it has accumulated, and the instance is then destroyed.
  • The slice receiving the result now creates an instance of the UDA and a row (of shape states), and the system arranges for all intermediate states to be transmitted across the cluster and "merged" into the instance by passing each in turn to its merge method.

The factory, and the instances that it creates, are destroyed only when the query has eventually finished execution.
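The lifecycle described above can be simulated without a cluster. The following sketch is a simplified model, not the AnzoGraph DB runtime: MockRow is a stand-in for the API's Row type that holds only long cells, MeanSketch and meanAcrossSlices are hypothetical names, and result() returns a double directly instead of writing to a row.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Stand-in for the API's Row type: a fixed number of long cells.
struct MockRow {
  std::vector<long> cells;
  explicit MockRow(std::size_t n) : cells(n, 0) {}
  long getLong(std::size_t i) const { assert(i < cells.size()); return cells[i]; }
  void setLong(std::size_t i, long v) { assert(i < cells.size()); cells[i] = v; }
};

// The four UDA methods for an arithmetic mean, written against MockRow.
struct MeanSketch {
  long sum = 0, cnt = 0;
  void accumulate(const MockRow& a) { sum += a.getLong(0); ++cnt; }
  void save(MockRow& s) const { s.setLong(0, sum); s.setLong(1, cnt); }
  void merge(const MockRow& s) { sum += s.getLong(0); cnt += s.getLong(1); }
  // Simplification: returns 0.0 for empty input rather than leaving a cell unset.
  double result() const { return cnt ? double(sum) / cnt : 0.0; }
};

// Each inner vector models the values that happen to live on one slice.
double meanAcrossSlices(const std::vector<std::vector<long>>& slices) {
  std::vector<MockRow> states;
  for (const auto& values : slices) {  // phase 1: one instance per slice
    MeanSketch local;
    for (long v : values) {
      MockRow a(1);
      a.setLong(0, v);
      local.accumulate(a);
    }
    MockRow s(2);                      // shape of "states": (sum, count)
    local.save(s);
    states.push_back(s);
  }
  MeanSketch merged;                   // phase 2: a fresh instance merges all states
  for (const auto& s : states) merged.merge(s);
  return merged.result();
}
```

For slices holding (1, 2, 3), (10), and no values, the merged state is sum 16 and count 4, so the mean is 4. Averaging the per-slice means instead would give (2 + 10) / 2 = 6, which is why the state carries both the sum and the count.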

The Accumulate() Method

The accumulate() method (void accumulate(const Row& r) { }) accumulates values from the UDA's input arguments into the aggregation states. In the function, r is the input received by the UDA, and its type is Row. Use the get routine that matches the UDX data type of each cell you read. The following table lists the available get routines. The Read Cell with Default routines return a default value if the cell is not defined (empty).

Read Cell Read Cell with Default
defined(size_t) N/A
getBoolean(size_t) getBoolean(size_t, bool)
getByte(size_t) getByte(size_t, uint8_t)
getShort(size_t) getShort(size_t, short)
getInt(size_t) getInt(size_t, int)
getLong(size_t) getLong(size_t, long)
getFloat(size_t) getFloat(size_t, float)
getDouble(size_t) getDouble(size_t, double)
getDate(size_t) getDate(size_t, Date)
getTime(size_t) getTime(size_t, Time)
getDateTime(size_t) getDateTime(size_t, DateTime)
getDuration(size_t) getDuration(size_t, Duration)
getString(size_t) getString(size_t, String)
getLString(size_t) getLString(size_t, LString)
getUDT(size_t) getUDT(size_t, UDT)
getURI(size_t) getURI(size_t, URI)
getTag(size_t) getTag(size_t, String)
getType(size_t) N/A
getBlob(size_t) getBlob(size_t, Blob)

For example, the following accumulate() definition for arithmetic mean fetches the input long variable using getLong() and adds it into the summation. It also increments the count on each received input:

void accumulate(const Row& r) {
  m_sum += r.getLong(0);
  m_cnt += 1;
}
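The difference between testing defined() and using a Read Cell with Default routine matters when input cells can be empty. The sketch below contrasts the two choices. SparseRow is a stand-in for Row written for this example, and SumSketch and its two method names are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// Stand-in for Row: each cell either holds a long or is undefined (empty).
struct SparseRow {
  std::vector<std::optional<long>> cells;
  bool defined(std::size_t i) const {
    assert(i < cells.size());
    return cells[i].has_value();
  }
  // Two-argument form: falls back to dflt when the cell is undefined.
  long getLong(std::size_t i, long dflt) const {
    return defined(i) ? *cells[i] : dflt;
  }
};

struct SumSketch {
  long sum = 0, cnt = 0;
  // Variant A: ignore rows whose first cell is undefined.
  void accumulateSkipping(const SparseRow& r) {
    if (!r.defined(0)) return;
    sum += r.getLong(0, 0);
    ++cnt;
  }
  // Variant B: count every row, treating an undefined cell as 0.
  void accumulateDefaulting(const SparseRow& r) {
    sum += r.getLong(0, 0);
    ++cnt;
  }
};
```

Both variants produce the same sum, but the counts differ, so a mean computed from them differs as well. Which behavior is right depends on what the aggregate is meant to report.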

The Save() Method

The save() method (void save(Row& r) { }) saves each of the internal aggregation states into one of the UDX data types. In the function, r is the Row in which the accumulated result is saved so that it can be restored in the merge() function. Use the appropriate set routines to save the states built up by accumulate(); the matching get routines restore them in merge(). The following table lists the get and set routines available.

Save Internal State Restore Internal State Restore Internal State with Default Value
setBoolean(size_t, bool) getBoolean(size_t) getBoolean(size_t, bool)
setShort(size_t, short) getShort(size_t) getShort(size_t, short)
setInt(size_t, int) getInt(size_t) getInt(size_t, int)
setLong(size_t, long) getLong(size_t) getLong(size_t, long)
setFloat(size_t, float) getFloat(size_t) getFloat(size_t, float)
setDouble(size_t, double) getDouble(size_t) getDouble(size_t, double)
setDate(size_t, Date) getDate(size_t) getDate(size_t, Date)
setTime(size_t, Time) getTime(size_t) getTime(size_t, Time)
setDateTime(size_t, DateTime) getDateTime(size_t) getDateTime(size_t, DateTime)
setDuration(size_t, Duration) getDuration(size_t) getDuration(size_t, Duration)
setString(size_t, String) getString(size_t) getString(size_t, String)
setLString(size_t, LString) getLString(size_t) getLString(size_t, LString)
setUDT(size_t, UDT) getUDT(size_t) getUDT(size_t, UDT)
setURI(size_t, URI) getURI(size_t) getURI(size_t, URI)
setBlob(size_t, Blob) getBlob(size_t) getBlob(size_t, Blob)

For example, the following save() definition for the arithmetic mean accumulate() example above uses set routines to save the summation as a Long value in the first cell of the row and the total count as a Long value in the second cell of the row:

void save(Row& r) {
  r.setLong(0,m_sum)
  .setLong(1,m_cnt);
}

If the data structure of the internal aggregation state to save is more complex (such as a map or vector) than the fixed data types in the routines above, serialize the type in the save() step. It can then be de-serialized in the merge() step to get the original data. See the discEntropy.cpp example below for a sample UDA that employs the map data structure.
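One way to serialize a map into a single string cell is a length-prefixed text format, so that keys can contain any character, including the delimiter. The following is a minimal standalone sketch of that idea using only the standard library. The names Counts, packCounts, and unpackInto are hypothetical, and the layout (key length, '~', key, count, '~') is one possible choice.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <sstream>
#include <string>

using Counts = std::map<std::string, long>;

// save() side: flatten the map into one string, writing each entry as
// <key length>~<key><count>~
std::string packCounts(const Counts& counts) {
  std::ostringstream out;
  for (const auto& [key, val] : counts) {
    assert(val >= 0);  // occurrence counts are never negative
    out << key.size() << '~' << key << val << '~';
  }
  return out.str();
}

// merge() side: parse a packed string and add its counts into target.
void unpackInto(const std::string& packed, Counts& target) {
  std::istringstream in(packed);
  std::size_t len = 0;
  char delim = 0;
  while (in >> len >> delim) {
    if (len > packed.size()) break;     // malformed length prefix
    std::string key(len, '\0');
    in.read(key.data(), static_cast<std::streamsize>(len));
    long val = 0;
    if (!(in >> val >> delim)) break;   // truncated entry
    target[key] += val;                 // merging adds to any existing count
  }
}
```

Because the key is read by length rather than by searching for a delimiter, a key such as "a~b" or "with space" survives the round trip. This sketch requires C++17 for structured bindings and the non-const std::string::data().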

The Merge() Method

The merge() method (void merge(const Row& r) { }) merges the internal aggregation states from all of the data slices into the instance on the leader slice. In the function, r is the row that holds a previously saved state, whose cells can be read in the same sequence in which the save() method wrote them.

For example, in the following merge() definition for the arithmetic mean example above, the leader adds the summation from another slice, read from the first cell of the row, and that slice's total count, read from the second cell of the row:

void merge(const Row& r) {
  m_sum += r.getLong(0);
  m_cnt += r.getLong(1);
}

The Result() Method

The result() method (void result(Row& r) { }) returns the final computation of the aggregation states. In the function, "r" is the row where the computed result is returned by setting the values for the cells with the appropriate set routines:

  • undefined(size_t)
  • setBoolean(size_t, bool)
  • setByte(size_t, byte)
  • setShort(size_t, short)
  • setInt(size_t, int)
  • setLong(size_t, long)
  • setFloat(size_t, float)
  • setDouble(size_t, double)
  • setDate(size_t, Date)
  • setTime(size_t, Time)
  • setDateTime(size_t, DateTime)
  • setDuration(size_t, Duration)
  • setString(size_t, String)
  • setLString(size_t, LString)
  • setUDT(size_t, UDT)
  • setURI(size_t, URI)
  • setBlob(size_t, Blob)
  • clear()

For example:

void result(Row& r) {
  if (m_cnt)
    r.setDouble(0,double(m_sum)/m_cnt);
}

Error Handling

If an error occurs, an extension can signal an exception by invoking the azg_throw macro, which takes a format string and arguments in the same way as the printf() function in C. Specify the full URI for the aggregate as a string, followed by the message to display when the error occurs.

azg_throw("aggregate_URI", "error_message");

Where aggregate_URI is a prefix that you define, followed by the aggregate name. The URI must be globally unique. Cambridge Semantics recommends that you use a format such as http://mycompany.com/grouping/etc#aggregate_name. For example:

azg_throw("http://cambridgesemantics.com/udx/aggregate#mean", "Error message – code %d", m_code);
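To show what printf-style formatting means for an error message, the sketch below implements a comparable helper with the standard library. It is not the azg_throw implementation. The names throwFormatted and describeFailure, the ": " separator between URI and message, and the 256-byte buffer are all assumptions made for this example.

```cpp
#include <cassert>
#include <cstdarg>
#include <cstdio>
#include <stdexcept>
#include <string>

// Hypothetical helper: formats a message printf-style, prefixes the
// aggregate URI, and throws the result as an exception.
[[noreturn]] void throwFormatted(const char* uri, const char* fmt, ...) {
  assert(uri != nullptr && fmt != nullptr);
  char buf[256];
  va_list args;
  va_start(args, fmt);
  std::vsnprintf(buf, sizeof buf, fmt, args);  // truncates if the message is too long
  va_end(args);
  throw std::runtime_error(std::string(uri) + ": " + buf);
}

// Example call site: reports a hypothetical error code and returns the
// text that a caller catching the exception would see.
std::string describeFailure(int code) {
  std::string message;
  try {
    throwFormatted("http://example/aggregates#mean", "invalid state, code %d", code);
  } catch (const std::runtime_error& e) {
    message = e.what();
  }
  return message;
}
```

Including the aggregate URI in the message lets whoever reads the error identify which extension raised it.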

Instantiating Extensions

Instantiate the extension by defining an extern "C" factory function of the following form:

extern "C" ExtensionFactory* udx_aggregateName() { return new FactoryFor<aggregateName>(); }

Where aggregateName is the short name of the function, not the full URI. For example:

extern "C" ExtensionFactory* udx_mean() { return new FactoryFor<mean>(); }
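The factory pattern behind this line can be sketched with stand-in types. Everything in the sketch namespace below is an assumption written for illustration: the real ExtensionFactory and FactoryFor come from udx_api.hpp, and the signature of create() shown here may differ from the actual API.

```cpp
#include <cassert>
#include <memory>

namespace sketch {
// Stand-ins for the API's base types.
struct Extension {
  virtual ~Extension() = default;
};
struct ExtensionFactory {
  virtual ~ExtensionFactory() = default;
  virtual Extension* create() = 0;  // one call per UDA instance needed
};
// Generic factory that default-constructs a new T on every create() call.
template <class T>
struct FactoryFor : ExtensionFactory {
  Extension* create() override { return new T(); }
};
struct mean : Extension {
  long m_sum = 0, m_cnt = 0;
};
}  // namespace sketch

// C linkage keeps the symbol name unmangled, so a loader can find the
// function by the plain string "udx_mean_sketch".
extern "C" sketch::ExtensionFactory* udx_mean_sketch() {
  return new sketch::FactoryFor<sketch::mean>();
}

// Each create() call yields an independent instance with fresh state.
bool createsDistinctInstances() {
  std::unique_ptr<sketch::ExtensionFactory> factory(udx_mean_sketch());
  assert(factory != nullptr);
  std::unique_ptr<sketch::Extension> a(factory->create());
  std::unique_ptr<sketch::Extension> b(factory->create());
  return a.get() != b.get();
}
```

Returning a factory rather than an instance lets the caller create as many independent instances as it needs, for example one per slice.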

Once the aggregate is defined, see Registering a UDX in an Extension Library and Compiling UDX Source Files.

Additional Aggregate Examples

mean.cpp

The following example file, mean.cpp, defines an aggregate extension named average.

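#include "udx_api.hpp"
using namespace std;
using namespace udx2;

struct average : Aggregate {
  long m_sum = 0;  // running total of the input values
  long m_cnt = 0;  // number of values seen

  // Called once per input row on each slice.
  void accumulate(const Row& r) {
    m_sum += r.getLong(0);
    m_cnt += 1;
  }

  // Writes the slice-local state (sum, count) for transmission.
  void save(Row& r) {
    r.setLong(0,m_sum)
     .setLong(1,m_cnt);
  }

  // Folds another instance's saved state into this one.
  void merge(const Row& r) {
    m_sum += r.getLong(0);
    m_cnt += r.getLong(1);
  }

  // Sets no result when no rows were aggregated.
  void result(Row& r) {
    if (m_cnt)
      r.setDouble(0,double(m_sum)/m_cnt);
  }
};

extern "C" ExtensionFactory* udx_average() { return new FactoryFor<average>(); }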

discEntropy.cpp

The following example UDA definition, discEntropy.cpp, computes discrete entropy and uses a map as its internal aggregation state.

#include <cmath>
#include <map>
#include <sstream>
#include <string>
#include <vector>
#include "udx_api.hpp"

using namespace udx2;

struct DiscEntropy : Aggregate {
  long m_cnt = 0; // Total events
  std::map<std::string, long> m_map; // Occurrences of each distinct value

  void accumulate(const Row& r) {
    // NOTE: Supply input as string even for numbers using SPARQL expression
    if(!r.getString(0).empty()) {
      m_map[std::string(r.getString(0).begin(), r.getString(0).end())]++;
      m_cnt += 1;
    }
  }

  void save(Row& r) {
    if(m_cnt != 0) {
      // Serialize each entry as <key length>~<key><count>~
      std::ostringstream out;
      for(const auto& [key, val] : m_map)
        out << key.size() << '~' << key << val << '~';
      r.setLong(0, m_cnt)
       .setString(1, out.str());
    }
  }

  void merge(const Row& r) {
    if(r.defined(0)) {
      // Copy via begin()/end(), as accumulate() does, rather than relying
      // on data() being null-terminated.
      const auto packed = r.getString(1);
      std::istringstream in(std::string(packed.begin(), packed.end()));
      std::string key;
      long val;
      int len;
      char delimit;
      while(in.good()) {
        key.clear();
        val = len = 0;
        in >> len >> delimit;
        if(in && len) {
          // Read the key by length so it may contain any character.
          std::vector<char> tmp(len);
          in.read(tmp.data(), len);
          key.assign(tmp.data(), len);
          in >> val >> delimit;
          m_map[key] += val;
        }
      }
      m_cnt += r.getLong(0);
    }
  }

  void result(Row& r) {
    if(m_cnt == 0)
      azg_throw("http://example/aggregates#discentropy",": insufficient data");
    // Formula: Entropy = -Sum[p(x) * log(p(x))] for all classes of x events.
    double entropy = 0;
    double prob = 0;
    for(const auto& elem : m_map) {
      prob = double(elem.second) / m_cnt;
      entropy += prob * std::log2(prob);
    }
    // The accumulated sum is never positive, so flip its sign.
    r.setDouble(0, (entropy >= 0.0) ? entropy : -1 * entropy);
  }
};

extern "C" ExtensionFactory* udx_csi_statistics_discentropy() { return new FactoryFor<DiscEntropy>(); }
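
As a worked check of the formula used in result(), the following standalone sketch computes the entropy from per-value counts without the UDX API. The function name entropyBits is hypothetical, and the assert stands in for the "insufficient data" error.

```cpp
#include <cassert>
#include <cmath>
#include <map>
#include <string>

// Shannon entropy, in bits, of the distribution implied by per-value counts.
double entropyBits(const std::map<std::string, long>& counts) {
  long total = 0;
  for (const auto& kv : counts) total += kv.second;
  assert(total > 0);  // at least one event is required
  double sum = 0.0;
  for (const auto& kv : counts) {
    if (kv.second == 0) continue;  // a zero count contributes nothing
    const double p = double(kv.second) / total;
    sum += p * std::log2(p);       // each term is <= 0
  }
  return sum == 0.0 ? 0.0 : -sum;  // avoid returning negative zero
}
```

Two equally likely values give 1 bit. Counts of 1, 1, and 2 give probabilities 0.25, 0.25, and 0.5, so the entropy is 0.25 * 2 + 0.25 * 2 + 0.5 * 1 = 1.5 bits. A single value gives 0 bits.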