Skip to main content

FTP Sync implementation

S3 bucket structure

At root level, there'll be a directory for each provider we want to connect with. This directory contains two subdirectories:

  • IN used to store all files that come from the external source
  • OUT used to store all files that were generated in Arc and need to by copied to the external destination

The decission to have two subdirectories, one for incoming files and one for outgoing, is based on the idea of simplifying the organization and making it easier for the synchronization process to understand what needs to be copied and where.

[root]
└── ProviderName2
├── IN
│ ├── Rosters
│ │ ├── roster1.csv
│ │ └── roster2.csv
│ └── Reports
│ ├── report1.csv
│ └── report2.csv
└── OUT
├── Claims
│ ├── claims1.csv
│ └── claims2.csv
└── Reports
├── report1.csv
└── report2.csv

Syncronization issues

  1. How do we know which files we have to sync?
  2. What if the provider uploads a new version of a file that was already synced? (same name)
  3. What if the provider deletes a file we have already synced?

Using the DB

This solution uses a new table to keep track of which files were synced from the provider to S3. The only real way of knowing whether we already synced a file is by checking its contents and comparing it to the contents of the files we have in S3. Because this solution does not scale well, we have to use the file's metadata to make the comparisons:

  • path (full path of the file in the FTP server)
  • size (size of the file in bytes)
  • mtime (last modified time)

The SyncedFile model stores all this information for every file we copy from an FTP server into S3.

How do we know which files we have to sync?

We list all the available files in the FTP and query the DB to see which ones we have already synced. We only copy the files that are not in the DB (meaning, there's no entry with same path, size and mtime).

What happens if the provider uploads a new version of a file that was already synced? (same name)

This is already solved by the previous answer. If a new version of the same file was uploaded, the size and mtime won't match. The file will be copied again. To avoid collisions, when a file is copied to S3 the name is changed to include the timestamp in the filename.

What happens if the provider deletes a file we have synced?

This is a no-op for us. If a file was deleted on the FTP server, we won't do nothing. The file will remain in S3.

Using only S3

This solution uses only S3 to keep track of which files were synced from the provider to S3. It leverages the S3 Object's metadata attribute to keep track of the mtime of the file in the FTP server.

Unfortunately, S3 API only allows to list files filtering by the key prefix, so we have to list all the files in the bucket. The metadata field is not included in the List response, so we have to make an additional GET request for each file to get its metadata.

How do we know which files we have to sync?

We list all the available files in the FTP. We also list all the files in S3 (filtering by prefix). For each file in the FTP, we check if there's a file in S3 with a similar key (1), and the same size. If we found a candidate, we also need to check the mtime, but to do this we have to make an additional request to AWS API because Objects' metadata are not included in the List response.

(1): To avoid name collisions, when a file is copied to S3 the name is changed to include the timestamp in the filename. We cannot make an exact comparison of the key because of this.

What happens if the provider uploads a new version of a file that was already synced? (same name)

Solved by previous answer. Files with same name are renamed to include the timestamp so no collisions happen.

What happens if the provider deletes a file we have synced?

This is a no-op for us.

Tracking synced files

To keep track of which files were synced (in or out), we use a model called SyncedFile. This model has a remote_path attribute that stores the full path of the file in the remote FTP server, a state indicating if the file was synced or not, and a s3_file_id referencing an instance of S3File model.

# s3_file_id    UUID
# state String [in_progress, synced, failed]
# remote_server String [ProviderName]
# remote_path String
# remote_mtime DateTime
# size Integer
# synced_at DateTime
# direction String [in, out]
class SyncedFile < ApplicationRecord
belongs_to :s3_file

# This is just a high level reference, not a real implementation
def self.create_out(remote_server, remote_path, s3_path, file)
Aws::S3.upload_file("#{s3_path}/#{file.path}", file)

id = create!(
remote_server: remote_server,
remote_path: remote_path,
s3_file_id: S3File.create(file),
direction: 'out',
).id

FilesSyncOutJob.perform_later(id)
end
end

Configuration of external sources/destinations

The solution should be configuration-based, so that adding a new source is as easy and straightforward as possible.

Propossed configuration structure:

ProviderName2:
authCredentials:
client: ftp # the client determines the rest of the fields
user: XXXX
password: XXXX
endpoint: xxxxx.xxx/xxxxx/xxxx
syncIn:
- id: root # identify this sync process `ProviderName2.root`
source: /To PairTeam/
dest: /IN/
afterSync:
- NotifySlackJob
- ProcessRosterJob

afterSync is a list of jobs that will be executed after the sync process finishes. This is useful, for example, to trigger the processing of the files that were just synced, or to notify the Care Team that new files are available.

The jobs will be enqueued in the order they are defined in the configuration file, and will receive the list of files that were synced as arguments. If no files were synced, the job will not be enqueued.

E.g. if ProviderName2 rosters were synced, the following code will be executed:

NotifySlackJob.perform_later(['SyncedFileId1', 'SyncedFileId2'])

Note: The configuration snippet is a draft and is subject to change.

Schedule sync

# Schedulable job for syncing files
class FilesSyncInJob < ApplicationJob
def perform(*ids); end
end

# Invoke providing IDs of the sync we want to run
FilesSyncInJob.perform_now('ProviderName2.rosters')

Schedule example:

SyncProviderName2Reports:
cron: 0 3 * * * # once a day at 3am
class: FilesSyncInJob
description: Syncs reports for ProviderName2
args:
- ProviderName2.reports

SyncProviderName2Rosters:
cron: 0 0 1 * * # once a month
class: FilesSyncInJob
description: Syncs rosters for ProviderName2
args:
- ProviderName2.rosters