-
Notifications
You must be signed in to change notification settings - Fork 31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FEATURE] Http Request Transformation in UDF #151
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall, good job :) - left you some comments (nothing too major)
import socket | ||
import ssl | ||
|
||
_HttpRequestTransformation_chunk_size_column = "HttpRequestTransformation_chunk_size_column" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's make these constants, meaning:
- you don't have to start with an underscore (no need to mark them private)
- make them all caps: "CHUNK_SIZE_COLUMN" for example
- you can remove the "HttpRequestTransformation" part from them (for brevity)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
Deleted the statements anyway since we didn't use them.
default="application/json", | ||
description="The content type of the request.", | ||
) | ||
authorization_token: Union[str, None] = Field( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make this a SecretStr - we don't want tokens to be printed unnecessarily
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from koheesio.models import SecretStr
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
default=None, | ||
description="The authorization token for the request.", | ||
) | ||
body_column: Union[str, None] = Field( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right type would be Optional[str]
rather than Union[str, None]
(same for the other Fields where you have it like this)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check!
|
||
self.output.df = self.df | ||
|
||
@udf |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's move the udf to the top of the file, makes it a bit more readable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay!
if ":" in rest: | ||
base_url, rest = rest.split(":", 1) | ||
if "/" in rest: | ||
port, path = rest.split("/", 1) | ||
else: | ||
port = rest | ||
port = int(port) | ||
else: | ||
base_url, path = rest.split("/", 1) | ||
port = 80 if protocol == "http" else 443 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we do this with regex instead? Seems a bit cleaner than this imho.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for example:
import re
import pytest
url_parts = re.compile(
# protocol is a sequence of letters, digits, and characters followed by ://
r'^(?P<protocol>[^:]+)://'
# base_url is a sequence of characters until encountering a `/` or a `:` followed by 1 to 5 digits
r'(?P<base_url>\S+?(?=:\d{1,5}/|/|$))'
# port is an optional sequence of digits between 0 and 65535 preceded by a colon
r'(?::(?P<port>\d{1,5}))?'
# rest is the rest of the url
r'(?P<rest>\/.*)?$'
)
test_data = [
# url, expected_protocol, expected_base_url, expected_port, expected_rest
("https://example.com:443/some/random/url", "https", "example.com", "443", "/some/random/url"),
("http://something.a.bit.more.complicated:9999/foo/bar", "http", "something.a.bit.more.complicated", "9999", "/foo/bar"),
("https://no-port-example.ext/42/index.jsp?foo=bar&baz=bla", "https", "no-port-example.ext", None, "/42/index.jsp?foo=bar&baz=bla"),
("ftp://ftp.example.com/resource.txt", "ftp", "ftp.example.com", None, "/resource.txt"),
("http://localhost:8080/test", "http", "localhost", "8080", "/test"),
("https://sub.domain.example.com/path/to/resource?query=string&another=value", "https", "sub.domain.example.com", None, "/path/to/resource?query=string&another=value"),
("http://192.168.1.1:8080/admin", "http", "192.168.1.1", "8080", "/admin"),
("https://user:[email protected]:8443/path?query=param#fragment", "https", "user:[email protected]", "8443", "/path?query=param#fragment"),
("http://example.org", "http", "example.org", None, None),
("https://example.net/path/to/resource.html", "https", "example.net", None, "/path/to/resource.html"),
("http://example.com:80/path/to/resource?query=param", "http", "example.com", "80", "/path/to/resource?query=param"),
("custom_protocol://base_url:foo/rest", "custom_protocol", "base_url:foo", None, "/rest"),
("WEBDAV://example.com:8080/path/to/resource", "WEBDAV", "example.com", "8080", "/path/to/resource"),
]
@pytest.mark.parametrize("url, expected_protocol, expected_base_url, expected_port, expected_rest", test_data)
def test_url_parts(url, expected_protocol, expected_base_url, expected_port, expected_rest):
match = url_parts.match(url)
assert match is not None, f"Failed to match {url}"
assert match.group("protocol") == expected_protocol, f"Failed to match protocol: {match.groupdict()}"
assert match.group("port") == expected_port, f"Failed to match port: {match.groupdict()}"
assert match.group("base_url") == expected_base_url, f"Failed to match base_url: {match.groupdict()}"
assert match.group("rest") == expected_rest, f"Failed to match rest: : {match.groupdict()}"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would argue the other way around but Regex it is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You might be right.. (I am biased to regex, granted). Please run the above tests through your code and see if they pass as expected; if they do, I don't mind not using regex.
chunk = s.recv(chunk_size) | ||
if len(chunk) == 0: # No more data received, quitting | ||
break | ||
response = response + chunk |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
response += chunk
def test_downloading_files(self, input_df: DataFrame, download_path: Path) -> None: | ||
"""Test that the files are downloaded and the DataFrame is transformed correctly.""" | ||
# Arrange | ||
expected_data = [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm pretty sure I saved the expected data in a file already in the data folder of the tests right? Either way, I would like to take this hardcoding out of the code and refer to a file instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not 100% sure if it doesn't happen somewhere else, but the test_downloading_files is only checking the dataframe and if the files exist. not the contents of the downloaded file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm gonna put the content in test files btw. ;)
This PR makes it possible to execute a Http request from a node in the cluster rather than collecting the data on the driver and executing the request there.
Description
In this PR a UDF was created that can handle http requests. The implementation is based on the socket and ssl library because the requests library is not able to be pickeled. The UDF can handle multiple scenarios GET, POST, PUT, DELETE, ... With authorization and a body to be send.
Proper documentation still needs to be written. Waiting for any remarks before doing so.
download file transformation
Motivation and Context
This is a suggestion to make the download file transformation proces better. Looking for feedback on the usecase.
How Has This Been Tested?
A test is written.
Screenshots (if appropriate):
Types of changes
Checklist: