
Snowflake Connector for PostgreSQL

Overview

The Snowflake Connector for PostgreSQL replicates data from PostgreSQL into Snowflake. This document collects the SQL commands and best practices for managing data sources, tables, and schedules, monitoring replication, troubleshooting, and managing grants.


📨 1. Subscribing Tables for Replication (PostgreSQL Side)

Before the Snowflake Connector can sync a table, the table must be subscribed in the source PostgreSQL database (ARC Postgres), that is, added to the snowflake_main publication.

🔔 Requesting Table Subscription

To request a subscription for a table in the ARC PostgreSQL database:

  • Reach out to the Engineering Team
    📧 Primary Contact: Martin "Tincho" Montenegro
    ✉️ Email: tincho@pairteam.com

The Engineering Team runs the following command on the PostgreSQL side:

ALTER PUBLICATION snowflake_main ADD TABLE <your_table_name>;

Example:

ALTER PUBLICATION snowflake_main ADD TABLE
  phone_communication_ai_summary_notes,
  chart_events,
  messages,
  ecm_communications;
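Before you add the tables in Snowflake, the Engineering Team can confirm the publication change with PostgreSQL's built-in pg_publication_tables view. The following is a minimal check. It assumes the tables live in the public schema, as in the example above.

```sql
-- Run on the ARC PostgreSQL side: list every table currently in the snowflake_main publication
SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'snowflake_main'
ORDER BY schemaname, tablename;
```

Every table you plan to pass to ADD_TABLES should appear in the result.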

Once the table has been added to the publication, add it to replication in Snowflake with the connector's ADD_TABLES procedure:

CALL PUBLIC.ADD_TABLES('ARCPOSTGRESPROD', 'public', ARRAY_CONSTRUCT('<your_table_name>'));
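To follow the progress of a newly added table, you can filter the connector's REPLICATION_STATE view, which is also used under Monitoring & Logs. The sketch below uses only columns that appear elsewhere in this guide. <your_table_name> is the same placeholder as above. This guide names FAILED but no other status values.

```sql
-- Snapshot and incremental status for one table of the ARCPOSTGRESPROD data source
SELECT DATA_SOURCE_NAME, SCHEMA_NAME, TABLE_NAME,
       SNAPSHOT_REPLICATION_STATUS,
       INCREMENTAL_REPLICATION_STATUS
FROM REPLICATION_STATE
WHERE DATA_SOURCE_NAME = 'ARCPOSTGRESPROD'
  AND SCHEMA_NAME = 'public'
  AND TABLE_NAME = '<your_table_name>';
```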

🔁 2. Configuring Replication

✅ Add a Data Source

CALL PUBLIC.ADD_DATA_SOURCE('<data_source_name>', '<dest_db>');
-- Example:
CALL PUBLIC.ADD_DATA_SOURCE('ARCPOSTGRESPROD', 'SOURCE_ARC');

📝 Note: To create a new data source and configure a new agent for a new database, follow the instructions in the Snowflake Connector Tutorial.

➕ Add Tables for Replication

CALL PUBLIC.ADD_TABLES('<data_source_name>', '<schema_name>', ARRAY_CONSTRUCT('<table1>', '<table2>', ...));
-- Example:
CALL PUBLIC.ADD_TABLES('ARCPOSTGRESPROD', 'public', ARRAY_CONSTRUCT('messages'));

-- Example: Adding multiple tables at once
CALL PUBLIC.ADD_TABLES('ARCPOSTGRESPROD', 'public',
  ARRAY_CONSTRUCT('channel_participants', 'chart_clinics', 'message_channels', 'memberships', 'diagnoses'));
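After the initial snapshot has run, you can confirm that the tables landed in the destination database with Snowflake's SHOW TABLES command and RESULT_SCAN. This is a sketch. It assumes the destination database is SOURCE_ARC, taken from the data source example. It also assumes the replicated tables keep their lowercase PostgreSQL names; if they do not, adjust the filter.

```sql
-- List all tables in the destination database
SHOW TABLES IN DATABASE SOURCE_ARC;

-- Narrow the SHOW output to the tables just added, with their row counts
-- (SHOW output column names are lowercase, so they must be double-quoted)
SELECT "schema_name", "name", "rows"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
WHERE "name" IN ('channel_participants', 'chart_clinics', 'message_channels', 'memberships', 'diagnoses');
```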

➖ Remove Tables from Replication

CALL PUBLIC.REMOVE_TABLE('<data_source_name>', '<schema_name>', '<table_name>');
-- Example:
CALL PUBLIC.REMOVE_TABLE('ARCPOSTGRESPROD', 'public', 'ecm_communications');

-- Multiple tables: REMOVE_TABLE takes a single table name, so call it once per table
CALL PUBLIC.REMOVE_TABLE('ARCPOSTGRESPROD', 'public', 'ecm_communications');
CALL PUBLIC.REMOVE_TABLE('ARCPOSTGRESPROD', 'public', 'message_channels');

📝 Note: If you permanently remove a table from the sync process but the table still exists in the ARC PostgreSQL source database, you must also remove it from the publication on the Postgres side. Otherwise the replication slot can bloat.

-- Example:
ALTER PUBLICATION snowflake_main DROP TABLE
  finance_claims,
  finance_claim_versions;
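To verify the cleanup, the standard PostgreSQL catalog views can confirm that the tables left the publication. They can also show how much WAL each replication slot is holding back, which is the bloat the note above refers to. The sketch below targets PostgreSQL 10 or later, where pg_current_wal_lsn() and pg_wal_lsn_diff() have these names. Run it on the primary, because pg_current_wal_lsn() is not available on a standby.

```sql
-- Should return no rows once the DROP TABLE above has been applied
SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'snowflake_main'
  AND tablename IN ('finance_claims', 'finance_claim_versions');

-- WAL retained by each replication slot; a value that keeps growing suggests slot bloat
SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;
```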

⏱️ 3. Schedule Replication

The connector replicates data in one of two modes: continuous (the default) or scheduled. In the current setup, the connector runs on an hourly schedule.

Enable Schedule

CALL PUBLIC.ENABLE_SCHEDULED_REPLICATION('<data_source>', '<interval/cron>');
-- Examples:
CALL PUBLIC.ENABLE_SCHEDULED_REPLICATION('ARCPOSTGRESPROD', '60 MINUTE');
CALL PUBLIC.ENABLE_SCHEDULED_REPLICATION('ARCPOSTGRESPROD', '30 MINUTE');
CALL PUBLIC.ENABLE_SCHEDULED_REPLICATION('ARCPOSTGRESPROD', 'USING CRON 0 9 * * 4 UTC'); -- Thursdays 9 AM UTC
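Cron expressions give finer control than fixed intervals. For example, you could pin an hourly run to the top of the hour, or run every 30 minutes during weekday working hours. These schedules are illustrative, not the current production setting. They assume the procedure accepts any standard five-field cron expression in the 'USING CRON <expr> <timezone>' form shown above.

```sql
-- Every hour at minute 0, UTC
CALL PUBLIC.ENABLE_SCHEDULED_REPLICATION('ARCPOSTGRESPROD', 'USING CRON 0 * * * * UTC');

-- Every 30 minutes from 08:00 to 17:30 UTC, Monday through Friday
CALL PUBLIC.ENABLE_SCHEDULED_REPLICATION('ARCPOSTGRESPROD', 'USING CRON */30 8-17 * * 1-5 UTC');
```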

Disable Schedule

CALL PUBLIC.DISABLE_SCHEDULED_REPLICATION('ARCPOSTGRESPROD');

📝 Note: The minimum allowed interval is 15 minutes. Disabling the schedule switches the connector back to continuous mode.

📈 4. Monitoring & Logs

Connector Metadata

SELECT * FROM CONNECTOR_CONFIGURATION;
SELECT * FROM DATA_SOURCES;
SELECT * FROM DATA_SOURCE_REPLICATION_STATE;
SELECT * FROM REPLICATION_STATE;
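For a quick health summary of one data source, you can aggregate the REPLICATION_STATE view by status. The query uses only the columns referenced elsewhere in this guide.

```sql
-- Number of ARCPOSTGRESPROD tables in each snapshot/incremental status combination
SELECT SNAPSHOT_REPLICATION_STATUS,
       INCREMENTAL_REPLICATION_STATUS,
       COUNT(*) AS table_count
FROM REPLICATION_STATE
WHERE DATA_SOURCE_NAME = 'ARCPOSTGRESPROD'
GROUP BY SNAPSHOT_REPLICATION_STATUS, INCREMENTAL_REPLICATION_STATUS
ORDER BY table_count DESC;
```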

Logs & Telemetry

-- Connector logs
SELECT * FROM snowflake.telemetry.events
WHERE RECORD_TYPE = 'LOG'
AND RESOURCE_ATTRIBUTES:"snow.database.name" = 'SNOWFLAKE_CONNECTOR_FOR_POSTGRESQL';

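-- Only errors/warnings from the connector (the event table is account-wide, so keep the database filter)
SELECT * FROM snowflake.telemetry.events
WHERE RECORD_TYPE = 'LOG'
AND RESOURCE_ATTRIBUTES:"snow.database.name" = 'SNOWFLAKE_CONNECTOR_FOR_POSTGRESQL'
AND RECORD:"severity_text" IN ('ERROR', 'WARN');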

-- Agent logs
SELECT * FROM AGENT_LOGS;
SELECT * FROM AGENT_LOGS WHERE LEVEL IN ('ERROR', 'WARN');
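When you investigate a recent incident, it helps to limit the event table to a time window and read the newest entries first. The sketch below keeps the connector database filter and reads the log message from the event table's VALUE column. The event table's TIMESTAMP column is in UTC, so the window is computed from SYSDATE(). The 24-hour window and 100-row limit are arbitrary choices.

```sql
-- Connector errors and warnings from the last 24 hours, newest first
SELECT TIMESTAMP,
       RECORD:"severity_text"::STRING AS severity,
       VALUE::STRING AS message
FROM snowflake.telemetry.events
WHERE RECORD_TYPE = 'LOG'
  AND RESOURCE_ATTRIBUTES:"snow.database.name" = 'SNOWFLAKE_CONNECTOR_FOR_POSTGRESQL'
  AND RECORD:"severity_text" IN ('ERROR', 'WARN')
  AND TIMESTAMP >= DATEADD(HOUR, -24, SYSDATE())
ORDER BY TIMESTAMP DESC
LIMIT 100;
```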

🛠️ 5. Troubleshooting / Maintenance

Failed Replication

SELECT DATA_SOURCE_NAME, SCHEMA_NAME, TABLE_NAME
FROM REPLICATION_STATE
WHERE INCREMENTAL_REPLICATION_STATUS = 'FAILED'
OR SNAPSHOT_REPLICATION_STATUS = 'FAILED'
OR SCHEMA_INTROSPECTION_STATUS IS NULL;
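To see which phase failed for each flagged table, the same filter can return the status columns themselves. You can then search the connector error logs for those table names. The sketch below changes only the selected columns and adds an ordering.

```sql
-- Same failure filter, showing which phase failed for each table
SELECT DATA_SOURCE_NAME, SCHEMA_NAME, TABLE_NAME,
       SCHEMA_INTROSPECTION_STATUS,
       SNAPSHOT_REPLICATION_STATUS,
       INCREMENTAL_REPLICATION_STATUS
FROM REPLICATION_STATE
WHERE INCREMENTAL_REPLICATION_STATUS = 'FAILED'
   OR SNAPSHOT_REPLICATION_STATUS = 'FAILED'
   OR SCHEMA_INTROSPECTION_STATUS IS NULL
ORDER BY DATA_SOURCE_NAME, SCHEMA_NAME, TABLE_NAME;
```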

Connector Repair

CALL PUBLIC.REPAIR_CONNECTOR();