Snowflake Connector for PostgreSQL
Overview
The Snowflake Connector for PostgreSQL allows replication of data from PostgreSQL into Snowflake. This document provides the necessary SQL commands and best practices for managing data sources, tables, schedules, monitoring replication, troubleshooting, and performing grants.
1. Subscribing Tables for Replication (PostgreSQL Side)
Before the Snowflake Connector can sync a table, the table must first be added to the snowflake_main publication in the source PostgreSQL database (ARC Postgres).
Requesting Table Subscription
To subscribe a table in the ARC PostgreSQL database, reach out to the Engineering Team:
- Primary contact: Martin "Tincho" Montenegro
- Email: tincho@pairteam.com
The engineering team should run the following command on the PostgreSQL side:
ALTER PUBLICATION snowflake_main ADD TABLE <your_table_name>;
Example:
ALTER PUBLICATION snowflake_main ADD TABLE
    phone_communication_ai_summary_notes,
    chart_events,
    messages,
    ecm_communications;
Once the table is part of the publication, add it to replication in Snowflake through the connector:
CALL PUBLIC.ADD_TABLES('ARCPOSTGRESPROD', 'public', ARRAY_CONSTRUCT('<your_table_name>'));
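Before calling ADD_TABLES, it can help to confirm on the PostgreSQL side that the table really is in the publication. A minimal check to run against ARC Postgres, using the standard pg_publication_tables catalog view; the table names in the IN list are only the example tables from the ALTER PUBLICATION command above:

```sql
-- Run on ARC Postgres: which of the requested tables are published through snowflake_main?
SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'snowflake_main'
  AND tablename IN ('phone_communication_ai_summary_notes', 'chart_events', 'messages', 'ecm_communications')
ORDER BY schemaname, tablename;
```

A table missing from the result is not yet published and should be added to the publication before it is added in Snowflake.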
2. Configuring Replication
Add a Data Source
CALL PUBLIC.ADD_DATA_SOURCE('<data_source_name>', '<dest_db>');
-- Example:
CALL PUBLIC.ADD_DATA_SOURCE('ARCPOSTGRESPROD', 'SOURCE_ARC');
Note: To create a new data source and configure a new agent for a new database, follow the instructions in the Snowflake Connector Tutorial.
Add Tables for Replication
CALL PUBLIC.ADD_TABLES('<data_source_name>', '<schema_name>', ARRAY_CONSTRUCT('<table1>', '<table2>', ...));
-- Example:
CALL PUBLIC.ADD_TABLES('ARCPOSTGRESPROD', 'public', ARRAY_CONSTRUCT('messages'));
-- Example: Adding multiple tables at once
CALL PUBLIC.ADD_TABLES('ARCPOSTGRESPROD', 'public',
    ARRAY_CONSTRUCT('channel_participants', 'chart_clinics', 'message_channels', 'memberships', 'diagnoses'));
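After ADD_TABLES returns, each new table goes through an initial snapshot before incremental replication starts. One way to follow their progress is to filter REPLICATION_STATE using the same columns as the Failed Replication query in the Troubleshooting section. A sketch, assuming the data source name is stored exactly as passed to ADD_TABLES; LOWER() is used on schema and table names so the filter does not depend on how the connector stores their case:

```sql
-- Snapshot and incremental status for the tables added in the multi-table example
SELECT DATA_SOURCE_NAME, SCHEMA_NAME, TABLE_NAME,
       SNAPSHOT_REPLICATION_STATUS, INCREMENTAL_REPLICATION_STATUS
FROM REPLICATION_STATE
WHERE DATA_SOURCE_NAME = 'ARCPOSTGRESPROD'
  AND LOWER(SCHEMA_NAME) = 'public'
  AND LOWER(TABLE_NAME) IN ('channel_participants', 'chart_clinics', 'message_channels', 'memberships', 'diagnoses')
ORDER BY TABLE_NAME;
```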
Remove Tables from Replication
CALL PUBLIC.REMOVE_TABLE('<data_source_name>', '<schema>', '<table>');
-- Example:
CALL PUBLIC.REMOVE_TABLE('ARCPOSTGRESPROD', 'public', 'ecm_communications');
-- Multiple tables: REMOVE_TABLE takes a single table name, so call it once per table
CALL PUBLIC.REMOVE_TABLE('ARCPOSTGRESPROD', 'public', 'ecm_communications');
CALL PUBLIC.REMOVE_TABLE('ARCPOSTGRESPROD', 'public', 'message_channels');
Note: If a table is permanently removed from replication but still exists in the ARC PostgreSQL source database, it must also be dropped from the publication on the PostgreSQL side to prevent replication slot bloat.
-- Example:
ALTER PUBLICATION snowflake_main DROP TABLE
finance_claims, finance_claim_versions;
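To see whether replication slot bloat is actually building up, the Engineering Team can check how much WAL each logical slot is holding back on ARC Postgres. A minimal sketch using the built-in pg_replication_slots view and pg_wal_lsn_diff (PostgreSQL 10 or later, run on the primary); which slot belongs to the connector depends on how the agent was configured, so no slot name is assumed here:

```sql
-- Run on the ARC Postgres primary: WAL retained by each logical replication slot
SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots
WHERE slot_type = 'logical'
ORDER BY pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) DESC;
```

A retained_wal value that keeps growing between checks suggests the slot's consumer is falling behind.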
3. Schedule Replication
The connector can replicate data in two modes: continuous or scheduled. The default is continuous mode. In the current setup, scheduled mode is enabled with a 60-minute interval, so the connector runs hourly.
Enable Schedule
CALL PUBLIC.ENABLE_SCHEDULED_REPLICATION('<data_source>', '<interval/cron>');
-- Examples:
CALL PUBLIC.ENABLE_SCHEDULED_REPLICATION('ARCPOSTGRESPROD', '60 MINUTE');
CALL PUBLIC.ENABLE_SCHEDULED_REPLICATION('ARCPOSTGRESPROD', '30 MINUTE');
CALL PUBLIC.ENABLE_SCHEDULED_REPLICATION('ARCPOSTGRESPROD', 'USING CRON 0 9 * * 4 UTC'); -- Thursdays 9 AM UTC
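The cron form appears to follow the same syntax as Snowflake task schedules: five cron fields followed by a time zone, as in the Thursday example. As an illustration only (this schedule is hypothetical and not part of the current setup), the following would run replication at the top of every hour from 08:00 to 18:00 UTC, Monday through Friday. Any cron schedule should still respect the 15-minute minimum interval.

```sql
-- Hypothetical schedule: hourly during weekday business hours (UTC); day-of-week 1-5 = Mon-Fri
CALL PUBLIC.ENABLE_SCHEDULED_REPLICATION('ARCPOSTGRESPROD', 'USING CRON 0 8-18 * * 1-5 UTC');
```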
Disable Schedule
CALL PUBLIC.DISABLE_SCHEDULED_REPLICATION('ARCPOSTGRESPROD');
Note: The shortest allowed interval is 15 minutes. Disabling the schedule returns the connector to continuous mode.
4. Monitoring & Logs
Connector Metadata
SELECT * FROM CONNECTOR_CONFIGURATION;
SELECT * FROM DATA_SOURCES;
SELECT * FROM DATA_SOURCE_REPLICATION_STATE;
SELECT * FROM REPLICATION_STATE;
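For a quick health overview instead of the full REPLICATION_STATE dump, the status columns used in the Failed Replication query can be aggregated per data source. A sketch:

```sql
-- Number of tables in each snapshot/incremental status combination, per data source
SELECT DATA_SOURCE_NAME,
       SNAPSHOT_REPLICATION_STATUS,
       INCREMENTAL_REPLICATION_STATUS,
       COUNT(*) AS table_count
FROM REPLICATION_STATE
GROUP BY DATA_SOURCE_NAME, SNAPSHOT_REPLICATION_STATUS, INCREMENTAL_REPLICATION_STATUS
ORDER BY DATA_SOURCE_NAME, table_count DESC;
```

Any row with a FAILED status is a cue to run the Failed Replication query in the Troubleshooting section.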
Logs & Telemetry
-- Connector logs
SELECT * FROM snowflake.telemetry.events
WHERE RECORD_TYPE = 'LOG'
AND RESOURCE_ATTRIBUTES:"snow.database.name" = 'SNOWFLAKE_CONNECTOR_FOR_POSTGRESQL';
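-- Only errors/warnings from the connector
SELECT * FROM snowflake.telemetry.events
WHERE RECORD_TYPE = 'LOG'
AND RESOURCE_ATTRIBUTES:"snow.database.name" = 'SNOWFLAKE_CONNECTOR_FOR_POSTGRESQL'
AND RECORD:"severity_text" IN ('ERROR', 'WARN');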
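When investigating a recent incident, it is usually more useful to see only the newest connector errors with their message text. A sketch that narrows the query to the last 24 hours and reads the standard event table columns (TIMESTAMP, RECORD, VALUE); the 24-hour window and 100-row limit are arbitrary choices:

```sql
-- Connector errors/warnings from the last 24 hours, newest first
SELECT TIMESTAMP,
       RECORD:"severity_text"::STRING AS severity,
       VALUE::STRING AS message
FROM snowflake.telemetry.events
WHERE RECORD_TYPE = 'LOG'
  AND RESOURCE_ATTRIBUTES:"snow.database.name" = 'SNOWFLAKE_CONNECTOR_FOR_POSTGRESQL'
  AND RECORD:"severity_text" IN ('ERROR', 'WARN')
  -- event table TIMESTAMP is UTC, so compare against SYSDATE() (UTC) rather than CURRENT_TIMESTAMP()
  AND TIMESTAMP >= DATEADD(hour, -24, SYSDATE())
ORDER BY TIMESTAMP DESC
LIMIT 100;
```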
-- Agent logs
SELECT * FROM AGENT_LOGS;
SELECT * FROM AGENT_LOGS WHERE LEVEL IN ('ERROR', 'WARN');
5. Troubleshooting / Maintenance
Failed Replication
SELECT DATA_SOURCE_NAME, SCHEMA_NAME, TABLE_NAME
FROM REPLICATION_STATE
WHERE INCREMENTAL_REPLICATION_STATUS = 'FAILED'
OR SNAPSHOT_REPLICATION_STATUS = 'FAILED'
OR SCHEMA_INTROSPECTION_STATUS IS NULL;
Connector Repair
CALL PUBLIC.REPAIR_CONNECTOR();