Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination connector implementation. #1

Open
wants to merge 23 commits into
base: feat/source
Choose a base branch
from

Conversation

parikshitg
Copy link
Collaborator

@parikshitg parikshitg commented Dec 12, 2024

Description

Destination implementation for SFTP connector.

Quick checks:

  • There is no other pull request for the same update/change.
  • I have written unit tests.
  • I have made sure that the PR is of reasonable size and can be easily reviewed.

@parikshitg parikshitg self-assigned this Dec 12, 2024
@parikshitg parikshitg marked this pull request as ready for review December 18, 2024 06:16
@parikshitg parikshitg linked an issue Dec 18, 2024 that may be closed by this pull request
2 tasks

filename, ok := record.Metadata["filename"]
if !ok {
filename = string(record.Key.Bytes())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the key contains some characters that would make an invalid file name? E.g. the key could be a structured ket.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This(filename missing from metadata) should never happen, record should have the filename metadata, but just to handle this we can use hash for filename?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true, the filename should not be missing, if the record came from an SFTP source. But Conduit should be able to combine different sources, and so, for example, it should be possible to sync files from S3 to an SFTP destination.

The key could actually be the file name?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, so do you mean if record.Key is of type opencdc.StructuredData which is basically a map, then there should be a key "filename" which could be used here?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant that the key can be a string (i.e. opencdc.RawData), similar to what we have in the S3 source.

destination/destination.go Outdated Show resolved Hide resolved
destination/destination.go Outdated Show resolved Hide resolved
filename = string(record.Key.Bytes())
}

err = d.sftpClient.Rename(path, fmt.Sprintf("%s/%s", d.config.DirectoryPath, filename))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, this won't keep the original directory structure?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not the original one like in source, but it will use the directoryPath in the config.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO, if we're syncing directories (and we do, because the SFTP source has a dir parameter, and the SFTP destination has a dir parameter too), then it would be expected that the dir. structure is kept.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay,
We have "directoryPath" in both source and destination so that we can read from one dir in source and write to another dir in destination. The connector just syncs the files inside the source directory into destination directory.

So dir structure is kept as per destination dir parameter.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, then I misunderstood something about the source. I thought that the source directory is traversed recursively, but that doesn't appear to be the case, right? E.g. if I have:

/source/dir1/file1
/source/file2

then only file2 will be read?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. In that case, the destination code makes sense, but we should call this out in the README, that the connector will sync only those files that are in the source dir itself.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

destination/destination.go Outdated Show resolved Hide resolved
destination/destination_integration_test.go Show resolved Hide resolved
})
}

func TestDestination_Write(t *testing.T) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be really good if we had a test for chunking too.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

@parikshitg parikshitg changed the base branch from main to feat/source January 8, 2025 10:47
README.md Outdated
Comment on lines 35 to 36
Destination connects to a remote server. It takes an `opencdc.Record`, extracts filename from the metadata and upload the file to the remote server. The connector supports both password and private key authentication methods.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Chunking is a nice feature and it would be good to mention in the readme that we support it and in which way (what metadata is needed, and so on), so that other sources (not only SFTP) can use it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mentioned.

@hariso
Copy link

hariso commented Jan 10, 2025

LGTM, thanks @parikshitg!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Connector: SFTP [Source/Destination]
2 participants