Set up the Openflow Connector for SQL Server¶

Note

The connector is subject to the Connector Terms.

This topic describes the steps to set up the Openflow Connector for SQL Server.

Prerequisites¶

  1. Ensure that you have reviewed About Openflow Connector for SQL Server.

  2. Ensure that you have reviewed Supported SQL Server versions.

  3. Ensure that you have set up Openflow.

  4. As a database administrator, perform the following tasks:

    1. Enable change tracking on the database and tables. The connector requires change tracking to be enabled on the database and on each table before replication starts. Make sure that change tracking is enabled on every table that you plan to replicate. You can also enable change tracking for additional tables while the connector is running. See the following code snippet:

      ALTER DATABASE <database>
      SET CHANGE_TRACKING = ON
      (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
      
      ALTER TABLE <schema>.<table>
      ENABLE CHANGE_TRACKING
      WITH (TRACK_COLUMNS_UPDATED = ON);
      
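      To confirm that change tracking is enabled where you expect, you can query the change tracking catalog views. The following is a minimal check, run against the source database:

      SELECT DB_NAME(database_id) AS database_name
      FROM sys.change_tracking_databases;

      SELECT OBJECT_SCHEMA_NAME(object_id) AS schema_name,
             OBJECT_NAME(object_id) AS table_name
      FROM sys.change_tracking_tables;
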
    2. Create a user for the connector. The connector requires a user with the SELECT and VIEW CHANGE TRACKING grants on the replicated tables. Assign that user a password, and note the credentials for the connector’s configuration.

      CREATE LOGIN <user_name> WITH PASSWORD = '<password>';
      CREATE USER <user_name> FOR LOGIN <user_name>;
      GRANT SELECT ON <schema>.<table> TO <user_name>;
      GRANT VIEW CHANGE TRACKING ON <schema>.<table> TO <user_name>;
      
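      If you plan to replicate many tables from a single schema, you can optionally grant the permissions at the schema level instead of per table. A minimal sketch, assuming the schema <schema>:

      GRANT SELECT ON SCHEMA::<schema> TO <user_name>;
      GRANT VIEW CHANGE TRACKING ON SCHEMA::<schema> TO <user_name>;
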
    3. Connect via SSL. If you’re planning to use an SSL connection to SQL Server, prepare the root certificate for your database server. It is required during configuration.

  5. As a Snowflake account administrator, perform the following tasks:

    1. Create a Snowflake user of type SERVICE. Create a database to store the replicated data, and grant the USAGE and CREATE SCHEMA privileges on that database to the connector's role so that the Snowflake user can create objects in it.

      CREATE DATABASE <destination_database>;
      CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
      CREATE ROLE <openflow_role>;
      GRANT ROLE <openflow_role> TO USER <openflow_user>;
      GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
      GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
      CREATE WAREHOUSE <openflow_warehouse>
           WITH
               WAREHOUSE_SIZE = 'MEDIUM'
               AUTO_SUSPEND = 300
               AUTO_RESUME = TRUE;
      GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
      
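      Optionally, you can also set the role and warehouse as defaults for the service user so that the connector does not have to select them explicitly. A minimal sketch, using the names created above:

      ALTER USER <openflow_user> SET DEFAULT_ROLE = <openflow_role>;
      ALTER USER <openflow_user> SET DEFAULT_WAREHOUSE = <openflow_warehouse>;
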
    2. Create a secure key pair (public and private). Store the private key in a file to supply in the connector’s configuration, and assign the public key to the Snowflake service user:

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = '<public_key>';
      

      For more information, see Key-pair authentication and key-pair rotation.
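
      To verify that the public key was registered, you can describe the service user and check the RSA_PUBLIC_KEY_FP (fingerprint) property. A minimal check:

      DESC USER <openflow_user>;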

    3. Designate a warehouse for the connector to use. Start with a MEDIUM warehouse size, then experiment with the size depending on the number of tables being replicated and the amount of data transferred. Replicating a large number of tables typically scales better with multi-cluster warehouses than with a larger warehouse size.
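
      If you expect to replicate a large number of tables, you can optionally convert the warehouse created above into a multi-cluster warehouse (requires Enterprise Edition or higher). A minimal sketch; the cluster counts are illustrative:

      ALTER WAREHOUSE <openflow_warehouse>
           SET MIN_CLUSTER_COUNT = 1
               MAX_CLUSTER_COUNT = 4;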

Set up the connector¶

As a data engineer, perform the following tasks to configure the connector:

Install the connector¶

  1. Navigate to the Openflow Overview page. In the Featured connectors section, select View more connectors.

  2. On the Openflow connectors page, find the connector and select Add to runtime.

  3. In the Select runtime dialog, select your runtime from the Available runtimes drop-down list.

  4. Select Add.

    Note

    Before you install the connector, ensure that you have created a database and schema in Snowflake for the connector to store ingested data.

  5. Authenticate to the deployment with your Snowflake account credentials and select Allow when prompted to allow the runtime application to access your Snowflake account. The connector installation process takes a few minutes to complete.

  6. Authenticate to the runtime with your Snowflake account credentials.

The Openflow canvas appears with the connector process group added to it.

Configure the connector¶

You can configure the connector for the following use cases:

Replicate a set of tables in real-time¶

  1. Right-click on the imported process group and select Parameters.

  2. Populate the required parameter values as described in Flow parameters.

Flow parameters¶

Start by setting the parameters of the SQLServer Source Parameters context, then the SQLServer Destination Parameters context. After this is done, you can enable the connector. The connector should connect to both SQL Server and Snowflake and start running. However, the connector does not replicate any data until the tables to be replicated are explicitly added to its configuration.

To configure specific tables for replication, edit the SQLServer Ingestion Parameters context. After you apply the changes to the SQLServer Ingestion Parameters context, the configuration is picked up by the connector, and the replication lifecycle starts for every table.

SQLServer Source Parameters context¶

SQL Server Connection URL
    The full JDBC URL to the source database.
    Example: jdbc:sqlserver://example.com:1433;encrypt=false;databaseName=<example_database>

SQL Server JDBC Driver
    Select the Reference asset checkbox to upload the SQL Server JDBC driver.

SQL Server SSL Mode
    Enable or disable SSL connections.

SQL Server Root SSL Certificate
    The full contents of the root certificate for the database. Optional if SSL is disabled.

SQL Server Username
    The username for the connector.

SQL Server Password
    The password for the connector.

SQLServer Destination Parameters context¶

Destination Database
    The database where data will be persisted. It must already exist in Snowflake.

Snowflake Account Identifier
    Snowflake account name, formatted as [organization-name]-[account-name], where data will be persisted.

Snowflake Authentication Strategy
    Strategy for authenticating to Snowflake. Possible values: SNOWFLAKE_SESSION_TOKEN when you are running the flow on SPCS, and KEY_PAIR when you want to set up access using a private key.

Snowflake Private Key
    The RSA private key used for authentication. The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either Snowflake Private Key or Snowflake Private Key File must be defined.

Snowflake Private Key File
    The file that contains the RSA private key used for authentication to Snowflake, formatted according to PKCS8 standards and with standard PEM headers and footers. The header line starts with -----BEGIN PRIVATE. Select the Reference asset checkbox to upload the private key file.

Snowflake Private Key Password
    The password associated with the Snowflake Private Key File.

Snowflake Role
    Snowflake role used during query execution.

Snowflake Username
    Username used to connect to the Snowflake instance.

Snowflake Warehouse
    Snowflake warehouse used to run queries.

SQLServer Ingestion Parameters context¶

Included Table Names
    A comma-separated list of table paths, including their databases and schemas. Example: database_public.public.my_table, other_database.other_schema.other_table

Included Table Regex
    A regular expression to match against table paths. Every path matching the expression is replicated, and new tables matching the pattern that are created later are also included automatically. Example: database_public.public\.auto_.*

Filter JSON
    A JSON document containing a list of fully qualified table names and a regex pattern for the column names that should be included in replication. Example: [ {"database":"database_public","schema":"public", "table":"table1", "includedPattern":".*name"} ] includes all columns whose names end with name in table1 from the database_public database and the public schema.

Merge Task Schedule CRON
    A CRON expression defining when merge operations from the journal table to the destination table are triggered. Set it to * * * * * ? for continuous merges, or to a time-based schedule to limit warehouse run time. Quartz cron expressions use the field order: seconds, minutes, hours, day of month, month, day of week.

    For example:

      • The string * 0 * * * ? schedules merges during the first minute of every hour.

      • The string * 20 14 ? * MON-FRI schedules merges during the minute starting at 2:20 PM, Monday through Friday.

    For additional information and examples, see the cron triggers tutorial in the Quartz documentation.

Remove and re-add a table to replication¶

To remove a table from replication, ensure that it is removed from the Included Table Names or Included Table Regex parameters in the SQLServer Ingestion Parameters context.

If you want to re-add the table to replication later, first delete the corresponding destination table in Snowflake. Afterward, add the table back to the Included Table Names or Included Table Regex parameters. This ensures that the replication process starts fresh for the table.
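
A minimal sketch of deleting the destination table before re-adding the source table, assuming the destination database and schema used in your configuration:

DROP TABLE IF EXISTS <destination_database>.<schema>.<table>;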

This approach can also be used to recover from a failed table replication scenario.

Replicate a subset of columns in a table¶

The connector can filter the data replicated per table to a subset of configured columns.

To apply filters to columns, modify the Filter JSON parameter in the SQLServer Ingestion Parameters context, adding an array of configurations with one entry for every table to which you want to apply a filter.

Columns can be included or excluded by name or pattern. You can apply a single condition per table, or combine multiple conditions, with exclusions always taking precedence over inclusions.

The following example shows the fields that are available. The schema and table fields are mandatory. One or more of included, excluded, includedPattern, excludedPattern is required.

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>",
    }
]
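For example, the following hypothetical configuration (table and column names are illustrative) replicates every column of public.customers except ssn, and only the columns ending in _id from public.orders:

[
    {
        "schema": "public",
        "table": "customers",
        "excluded": ["ssn"]
    },
    {
        "schema": "public",
        "table": "orders",
        "includedPattern": ".*_id"
    }
]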

Track data changes in tables¶

The connector replicates not only the current state of data from the source tables, but also the intermediate states of every row. However, due to the change tracking mechanism used, there is no guarantee that all intermediate states of every row are replicated.

This data is stored in journal tables created in the same schema as the destination table.

The journal table names are formatted as: <source table name>_JOURNAL_<schema generation>

Here, <schema generation> is an integer increasing with every schema change on the source table. This means a source table that undergoes schema changes will have multiple journal tables.

Important

Snowflake recommends that you do not alter the journal tables or the data in them in any way. They are used by the connector to update the destination table as part of the replication process.

Configure scheduling of merge tasks¶

The connector uses a warehouse to merge change data capture (CDC) data into destination tables. This operation is triggered by the MergeSnowflakeJournalTable processor. If there are no new changes or if no new flow files are waiting in the MergeSnowflakeJournalTable queue, no merge is triggered and the warehouse auto-suspends.

To limit warehouse cost and restrict merges to scheduled times only, use the CRON expression in the Merge Task Schedule CRON parameter. It throttles the flow files coming to the MergeSnowflakeJournalTable processor so that merges are triggered only during the dedicated time period. For more information about scheduling, see Scheduling strategy.

Run the flow¶

  1. Right-click on the canvas and select Enable all Controller Services.

  2. Right-click on the imported process group and select Start. The connector starts the data ingestion.