Sources

Last Updated: 2021-01-05

Grouparoo imports data into customer records using sources. This type of Plugin defines an App and one or more Connections with the import direction. The same Plugin can also make destinations, but this document focuses on how to make a source.

A source defines how Grouparoo should import properties into records. For example, a Property might be firstName, and the postgres Plugin knows how to fill it in for a user by querying the database.

Sources can also define schedules. A schedule knows how to connect to the data and determine what has changed. A change triggers an update of the affected records and their properties. Schedules are often recurring.

App

An App represents the ability to communicate with a source. For example, this could mean server address and credentials for a data warehouse.

The properties (PluginApp) of an App are:

  • name: The user-facing name of the App. For example, snowflake or postgres.

  • options: Options needed to configure and connect to your App. This might commonly be an apiKey or username and password. Options have key (string), displayName (string), description (string), placeholder (string), and required (boolean) attributes.

  • icon: The path to the App's icon file (SVG or PNG). Icons should be stored in the /public directory of your Plugin.

  • App Methods

    These methods are used to configure and interact with the App.

    • test TestPluginMethod This method will be called when an App is created to ensure that the App can be reached. The appOptions and other data will be passed to it. The test method should return whether it can connect with those options or not, along with a message. This will often use the connect result to verify success.
    • connect ConnectPluginAppMethod If defined, this method is called to get an instance of a client to the source. It uses the appOptions passed to it to return an object that knows how to make queries. Grouparoo will cache the client and give it to Connection methods. If it is not defined, the Connection (import) methods will be expected to connect each time. Using connect results in more efficient sources. For example, the postgres Plugin uses this to keep a socket open with the database server.
    • disconnect DisconnectPluginAppMethod When defining connect, this method is used to disconnect the client. For example, to close the Postgres socket when the server is shutting down.
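
As a rough illustration of how these three methods relate, the sketch below defines an App whose test method reuses connect and disconnect. The types (AppOptions, FakeClient), the function names, the in-memory client, and the { success, message } return shape are assumptions made for this example, not the actual @grouparoo/core definitions. A real Plugin would open a database or API connection instead.

```typescript
// Hypothetical, simplified shapes; the real types live in @grouparoo/core.
type AppOptions = Record<string, string | undefined>;

interface FakeClient {
  connected: boolean;
  query(sql: string): Promise<number>;
  end(): Promise<void>;
}

// connect: build a client from the App options so it can be cached and reused.
async function connectApp({ appOptions }: { appOptions: AppOptions }): Promise<FakeClient> {
  if (!appOptions.database) throw new Error("database is required");
  const client: FakeClient = {
    connected: true,
    // Stand-in for a round trip to the server.
    query: async (_sql: string) => 1,
    end: async () => {
      client.connected = false;
    },
  };
  return client;
}

// disconnect: release whatever connect opened.
async function disconnectApp({ connection }: { connection: FakeClient }): Promise<void> {
  await connection.end();
}

// test: report whether the options work, along with a message.
async function testApp({
  appOptions,
}: {
  appOptions: AppOptions;
}): Promise<{ success: boolean; message: string }> {
  try {
    const connection = await connectApp({ appOptions });
    await connection.query("SELECT 1");
    await disconnectApp({ connection });
    return { success: true, message: "connected" };
  } catch (error) {
    return { success: false, message: String(error) };
  }
}

// The methods are passed as function references, not as string names.
const app = {
  name: "example-db",
  options: [{ key: "database", displayName: "Database", required: true }],
  methods: { test: testApp, connect: connectApp, disconnect: disconnectApp },
};
```

Having test call connect keeps the connection logic in one place, so a passing test exercises the same code path the Connection methods will rely on.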

Connections

Your Plugin can provide multiple types of Connections for use within Grouparoo. Connections use Apps to import or export data. In the source case, this is about the import direction.

The properties (PluginConnection) of a source Connection are:

  • name: The user-facing name of the connection, like snowflake-query-import or postgres-table-import.

  • direction: Use import for sources.

  • description: The user-facing description of the connection.

  • app: The name of the App created before. For example, your snowflake-query-import connection might require the snowflake App, also defined by your Plugin.

  • options: Options needed to configure this connection. These differ from the options of an App, and likely depend on the App options. For example, a postgres-table-import connection requires a table option to be set by the Grouparoo user, and it uses the name, host, password and database options from the postgres App to get the list of table names.

  • propertyOptions: Options needed to configure each Property. Each has these fields: key, displayName, required, description, type. You can also define an options method to define the option dynamically. For example, the postgres-table-import connection needs a column name to import the data for the Property.

  • scheduleOptions: Options needed to configure a schedule. Each has these fields: key, required, description, type. You can also define an options method to define the option dynamically. For example, the postgres-table-import connection needs a column name to know which column is the "updated_at" timestamp.

  • Source Methods

    These methods are used in the creation of a source with properties.

    • sourceOptions SourceOptionsMethod This allows more dynamic options to be specified. It can use code to provide a set of choices for a list selection. For example, the postgres-table-import connection lists out table names.
    • sourcePreview SourcePreviewMethod Show the user a preview of the data. Given the current options, if possible return a preview. For example, the postgres-table-import returns 10 rows from the selected table.
    • sourceFilters SourceFilterMethod During Property creation, this allows the Plugin to determine what filters are available. For example, the postgres-table-import notes that integer columns have filters such as greater than, while string columns have different filters such as like.
    • uniquePropertyBootstrapOptions UniquePropertyBootstrapOptions There is a special case when the first source is created. The first source (for example, a Postgres users table) might need one of its own properties (user_id) to bootstrap the rest. This allows a source to define the options that such a Property would use.
  • Property Methods

    One of these should be defined. The choice depends on whether the Plugin can import a Property for many records at once or must do it one Record at a time.

    • recordProperty RecordPropertyPluginMethod Given a Record and the Property definition, get the value of the Property for that Record.
    • recordProperties RecordPropertiesPluginMethod Given an array of records and the Property definition, get the value of the Property for each Record.
  • Schedule Methods

    These methods are used when the source can have a Schedule.

    • records RecordsPluginMethod This method is the core method to implement a Schedule. It uses a high-water mark and the given options to return a collection of records to import. It might also return properties to set on the records. The main goal of this method is to call await plugin.createImport(schedule, run, row) for each row of data, where row is an object like {user_id: 9, email: "person@example.com", first_name: "person"}.
    • sourceRunPercentComplete SourceRunPercentCompleteMethod If implemented, this method helps the user by reporting what percentage of an import session is complete. SQL sources tend to divide how many imports have been done by the total number of records since the last change.
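
The two Schedule methods can be sketched roughly as follows. This is a minimal illustration over an in-memory array, not the real method signatures: the Row shape, the table data, importChangedRows, percentComplete, and the createImport callback (a stand-in for plugin.createImport(schedule, run, row)) are all hypothetical names chosen for this example.

```typescript
// Hypothetical row shape and in-memory "table"; a real source would query its App.
interface Row {
  user_id: number;
  email: string;
  updated_at: number; // epoch milliseconds
}

const table: Row[] = [
  { user_id: 1, email: "a@example.com", updated_at: 100 },
  { user_id: 2, email: "b@example.com", updated_at: 200 },
  { user_id: 3, email: "c@example.com", updated_at: 300 },
];

// Stand-in for plugin.createImport(schedule, run, row).
type CreateImport = (row: Row) => Promise<void>;

// Import up to `limit` rows changed after the high-water mark and
// return the new mark so the next call can resume from there.
async function importChangedRows(
  highWaterMark: number,
  limit: number,
  createImport: CreateImport
): Promise<{ importsCount: number; highWaterMark: number }> {
  const changed = table
    .filter((row) => row.updated_at > highWaterMark)
    .sort((a, b) => a.updated_at - b.updated_at)
    .slice(0, limit);

  let nextMark = highWaterMark;
  for (const row of changed) {
    await createImport(row);
    nextMark = row.updated_at; // rows are sorted, so the last one is the newest
  }
  return { importsCount: changed.length, highWaterMark: nextMark };
}

// Percent complete: imports done so far divided by the rows changed
// since the mark the run started from.
function percentComplete(importsDone: number, startingMark: number): number {
  const total = table.filter((row) => row.updated_at > startingMark).length;
  if (total === 0) return 100;
  return Math.min(100, Math.round((100 * importsDone) / total));
}
```

Because the comparison is strictly greater than the mark, rows that share a timestamp across a batch boundary would be skipped in this sketch; a production source would need extra handling for ties.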

When defining methods, reference the function directly; do not use a string name.
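
To make the difference between the two Property methods concrete, here is a minimal sketch that sums purchases per user. Everything in it is an assumption for illustration: SketchRecord, the purchases array, the simplified method arguments, and the return shape keyed by Record id are not the actual @grouparoo/core signatures.

```typescript
// Hypothetical, simplified shapes; the real method signatures come from @grouparoo/core.
interface SketchRecord {
  id: string;
  userId: number;
}

// In-memory stand-in for a "purchases" table.
const purchases = [
  { userId: 1, price: 5 },
  { userId: 1, price: 7 },
  { userId: 2, price: 3 },
];

// recordProperty: compute the value for a single Record.
async function recordProperty({ record }: { record: SketchRecord }): Promise<number> {
  return purchases
    .filter((purchase) => purchase.userId === record.userId)
    .reduce((sum, purchase) => sum + purchase.price, 0);
}

// recordProperties: compute the value for many Records in one pass,
// keyed by Record id (the keyed return shape is an assumption).
async function recordProperties({
  records,
}: {
  records: SketchRecord[];
}): Promise<Record<string, number>> {
  const totalsByUser = new Map<number, number>();
  for (const purchase of purchases) {
    totalsByUser.set(purchase.userId, (totalsByUser.get(purchase.userId) ?? 0) + purchase.price);
  }
  const values: Record<string, number> = {};
  for (const record of records) {
    values[record.id] = totalsByUser.get(record.userId) ?? 0;
  }
  return values;
}

// Only one of the two needs to be defined. The function itself is referenced,
// not a string such as "recordProperties".
const connection = {
  name: "example-table-import",
  direction: "import",
  methods: { recordProperties },
};
```

The batched version reads the data once no matter how many Records are passed in, which is why it tends to be preferred when the source supports it.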

SQL Sources

Most SQL-based sources implement two connections: table and query.

The Table Source is optimized for the common case where the data to import is a table with a foreign key to a unique Property. An example might be a "purchases" table with a "user_id" column. A simple "users" table with an "id" column also works. In these cases, this source allows the user to sum, count, and perform other operations without writing any SQL. This also makes it easy to make schedules and optimize the import speed.
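
As an illustration of why no hand-written SQL is needed, a Table Source can assemble the query from a few options. The sketch below is an assumption about how such a builder might look, not the code in @grouparoo/app-templates; Aggregation, quoteIdentifier, and buildAggregateQuery are hypothetical names, and the output uses Postgres-style identifier quoting and positional parameters.

```typescript
// Hypothetical subset of aggregations a Table Source might offer.
type Aggregation = "sum" | "count" | "min" | "max" | "avg";

// Quote an identifier Postgres-style, doubling any embedded double quotes.
function quoteIdentifier(name: string): string {
  return `"${name.replace(/"/g, '""')}"`;
}

// Build a parameterized query for one Property of one Record.
// The foreign key value is bound later as $1 rather than interpolated.
function buildAggregateQuery(options: {
  table: string;
  column: string;
  foreignKey: string;
  aggregation: Aggregation;
}): string {
  const fn = options.aggregation.toUpperCase();
  const column = quoteIdentifier(options.column);
  const table = quoteIdentifier(options.table);
  const foreignKey = quoteIdentifier(options.foreignKey);
  return `SELECT ${fn}(${column}) AS value FROM ${table} WHERE ${foreignKey} = $1`;
}

// Example: total purchase price for one user.
const query = buildAggregateQuery({
  table: "purchases",
  column: "price",
  foreignKey: "user_id",
  aggregation: "sum",
});
```

Because the table, column, and aggregation are structured options rather than free-form SQL, the same information can also drive batched queries and schedules.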

The Query Source allows someone to write any SQL. It uses a mustache templating system so the query can reference any other Property. For example: SELECT SUM(price) FROM purchases INNER JOIN users ON ... WHERE users.id = {userId}. This flexibility makes it less performant on large datasets because it fetches Property values one Record at a time.
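
The substitution step can be pictured with the simplified stand-in below. It is not the mustache implementation the Query Source actually uses; bindTemplate is a hypothetical helper that handles only the {name} placeholder form shown above, and it assumes the database driver accepts Postgres-style positional parameters ($1, $2, ...). Binding values as parameters, rather than pasting them into the SQL string, avoids injection problems.

```typescript
type PropertyValue = string | number;

// Replace each {propertyKey} placeholder with a positional parameter and
// collect the matching values in order. A key used twice reuses its position.
function bindTemplate(
  template: string,
  values: Record<string, PropertyValue>
): { text: string; params: PropertyValue[] } {
  const params: PropertyValue[] = [];
  const positions = new Map<string, number>();

  const text = template.replace(/\{(\w+)\}/g, (_match: string, key: string) => {
    const value = values[key];
    if (value === undefined) throw new Error(`unknown Property: ${key}`);

    let position = positions.get(key);
    if (position === undefined) {
      params.push(value);
      position = params.length; // parameters are 1-indexed
      positions.set(key, position);
    }
    return `$${position}`;
  });

  return { text, params };
}

// Example: the Record's userId Property fills the placeholder.
const bound = bindTemplate(
  "SELECT SUM(price) FROM purchases WHERE user_id = {userId}",
  { userId: 9 }
);
```

Since the values differ for every Record, the query has to run once per Record, which is the performance cost mentioned above.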

Both of these SQL sources tend to recalculate their properties each time a change to a Record is detected and the Record is recalculated. This keeps the values up to date.

Schedule Sources

Another class of source relies more on data pulled during the Schedule phase than on fetching properties on demand. For example, the Google Sheets source will connect to a spreadsheet and read the cell values. Those values get written directly into Record Property values for each Property ("column" in this case) defined.

This differs from the SQL sources because the spreadsheet is not contacted every time a Record is being recalculated.
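
A minimal sketch of that idea: during the Schedule run, each spreadsheet row is turned into Property values once, and those values are what get stored. The SheetRow and ColumnMapping types and the rowToPropertyValues helper are hypothetical and are not taken from the Google Sheets Plugin.

```typescript
// Hypothetical shapes: a sheet row keyed by column header, and a mapping
// from column header to Grouparoo Property key.
type SheetRow = Record<string, string>;
type ColumnMapping = Record<string, string>;

// Convert one spreadsheet row into the Property values to store on a Record.
// Columns without a mapped Property are ignored, as are mapped columns
// that are missing from the row.
function rowToPropertyValues(row: SheetRow, mapping: ColumnMapping): Record<string, string> {
  const values: Record<string, string> = {};
  for (const [column, propertyKey] of Object.entries(mapping)) {
    const cell = row[column];
    if (cell !== undefined) values[propertyKey] = cell;
  }
  return values;
}

// Example row as it might be read during a Schedule run.
const propertyValues = rowToPropertyValues(
  { Email: "person@example.com", "First Name": "person", Notes: "ignored" },
  { Email: "email", "First Name": "firstName" }
);
```

Later recalculations of the Record can then use the stored values without another request to the spreadsheet.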

Iteration and Testing

Read more about Plugin development to understand the best way to work on your new Plugin.

Example

Here is an example source plugin.ts file.

import { Initializer } from "actionhero";
import { plugin } from "@grouparoo/core";

// App methods: verify, open, and close the connection to Postgres.
import { test } from "../lib/test";
import { connect } from "../lib/connect";
import { disconnect } from "../lib/disconnect";

// Source Connections (import direction), one per SQL source type.
import { getConnection as getTableConnection } from "../lib/table-import/connection";
import { getConnection as getQueryConnection } from "../lib/query-import/connection";

const packageJSON = require("./../../package.json");

export class Plugins extends Initializer {
  constructor() {
    super();
    this.name = packageJSON.name;
  }

  async initialize() {
    plugin.registerPlugin({
      name: packageJSON.name,
      icon: "/public/@grouparoo/postgres/postgres.png",
      apps: [
        {
          name: "postgres",
          options: [
            {
              key: "host",
              displayName: "Host",
              required: false,
              description: "The Postgres host.",
              placeholder: "localhost",
            },
            {
              key: "port",
              displayName: "Port",
              required: false,
              description: "The Postgres port.",
              placeholder: "5432",
            },
            {
              key: "database",
              displayName: "Database",
              required: true,
              description: "The Postgres database.",
            },
            {
              key: "schema",
              displayName: "Schema",
              required: false,
              description: "The Postgres schema (default: public).",
              placeholder: "public",
            },
            {
              key: "user",
              displayName: "User",
              required: false,
              description: "The Postgres user.",
            },
            {
              key: "password",
              displayName: "Password",
              required: false,
              description: "The Postgres user's password.",
            },
            {
              key: "ssl",
              displayName: "SSL",
              required: false,
              description:
                "Require the use of an SSL connection (default: false). If you need custom SSL certificates, paste in their values below.",
            },
            {
              key: "ssl_cert",
              displayName: "SSL Certificate",
              required: false,
              description: "The ssl certificate.",
            },
            {
              key: "ssl_key",
              displayName: "SSL Key",
              required: false,
              description: "The ssl certificate's key.",
            },
            {
              key: "ssl_ca",
              displayName: "SSL Certificate Authority",
              required: false,
              description: "The ssl certificate authority (CA).",
            },
          ],
          methods: { test, connect, disconnect },
        },
      ],
      connections: [getTableConnection(), getQueryConnection()],
    });
  }

  async start() {
    plugin.mountModels();
  }
}

Many SQL plugins use a helper from @grouparoo/app-templates to build the query and table sources. It helps standardize how things work across all the SQL types.

// table-import/connection
import { buildConnection } from "@grouparoo/app-templates/dist/source/table";
import { getSampleRows } from "./getSampleRows";
import { getColumns } from "./getColumns";
import { getTables } from "./getTables";
import { getChangedRows } from "./getChangedRows";
import { getPropertyValue } from "./getPropertyValue";
import { getPropertyValues } from "./getPropertyValues";
import { getChangedRowCount } from "./getChangedRowCount";

export function getConnection() {
  return buildConnection({
    app: "postgres",
    name: "postgres-table-import",
    description: "Import or update Records from a Postgres database table.",
    tableOptionDescription: "The table to scan",
    getSampleRows,
    getColumns,
    getTables,
    getChangedRows,
    getPropertyValue,
    getPropertyValues,
    getChangedRowCount,
  });
}

// query-import/connection
import { buildConnection } from "@grouparoo/app-templates/dist/source/query";
import { executeQuery } from "./executeQuery";
import { getChangedRows } from "./getChangedRows";

export function getConnection() {
  return buildConnection({
    app: "postgres",
    name: "postgres-query-import",
    description: "Import or update Records via a custom Postgres query.",
    executeQuery,
    getChangedRows,
  });
}