Skip to content

Commit

Permalink
Merge pull request #390 from TEAMSchools/dagster-ssh-resource
Browse files Browse the repository at this point in the history
Dagster ssh resource
  • Loading branch information
cbini authored Nov 17, 2023
2 parents b33d841 + aad9f17 commit c6c0d82
Show file tree
Hide file tree
Showing 24 changed files with 129 additions and 172 deletions.
6 changes: 2 additions & 4 deletions src/teamster/core/achieve3k/sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from paramiko.ssh_exception import SSHException

from teamster.core.sftp.assets import listdir_attr_r
from teamster.core.ssh.resources import SSHConfigurableResource
from teamster.core.ssh.resources import SSHResource


def build_sftp_sensor(
Expand All @@ -30,9 +30,7 @@ def build_sftp_sensor(
minimum_interval_seconds=minimum_interval_seconds,
asset_selection=AssetSelection.assets(*asset_defs),
)
def _sensor(
context: SensorEvaluationContext, ssh_achieve3k: SSHConfigurableResource
):
def _sensor(context: SensorEvaluationContext, ssh_achieve3k: SSHResource):
cursor: dict = json.loads(context.cursor or "{}")
now = pendulum.now(tz=timezone)

Expand Down
6 changes: 2 additions & 4 deletions src/teamster/core/adp/sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from paramiko.ssh_exception import SSHException

from teamster.core.sftp.assets import listdir_attr_r
from teamster.core.ssh.resources import SSHConfigurableResource
from teamster.core.ssh.resources import SSHResource


def build_sftp_sensor(
Expand All @@ -28,9 +28,7 @@ def build_sftp_sensor(
minimum_interval_seconds=minimum_interval_seconds,
asset_selection=AssetSelection.assets(*asset_defs),
)
def _sensor(
context: SensorEvaluationContext, ssh_adp_workforce_now: SSHConfigurableResource
):
def _sensor(context: SensorEvaluationContext, ssh_adp_workforce_now: SSHResource):
cursor: dict = json.loads(context.cursor or "{}")
now = pendulum.now(tz=timezone)

Expand Down
6 changes: 2 additions & 4 deletions src/teamster/core/clever/sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from paramiko.ssh_exception import SSHException

from teamster.core.sftp.assets import listdir_attr_r
from teamster.core.ssh.resources import SSHConfigurableResource
from teamster.core.ssh.resources import SSHResource


def build_sftp_sensor(
Expand All @@ -31,9 +31,7 @@ def build_sftp_sensor(
minimum_interval_seconds=minimum_interval_seconds,
asset_selection=AssetSelection.assets(*asset_defs),
)
def _sensor(
context: SensorEvaluationContext, ssh_clever_reports: SSHConfigurableResource
):
def _sensor(context: SensorEvaluationContext, ssh_clever_reports: SSHResource):
cursor: dict = json.loads(context.cursor or "{}")
now = pendulum.now(tz=timezone)

Expand Down
4 changes: 2 additions & 2 deletions src/teamster/core/datagun/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from pandas import DataFrame
from sqlalchemy import literal_column, select, table, text

from teamster.core.ssh.resources import SSHConfigurableResource
from teamster.core.ssh.resources import SSHResource
from teamster.core.utils.classes import CustomJSONEncoder


Expand Down Expand Up @@ -74,7 +74,7 @@ def transform_data(data, file_suffix, file_encoding=None, file_format=None):

def load_sftp(
context: AssetExecutionContext,
ssh: SSHConfigurableResource,
ssh: SSHResource,
data,
file_name,
destination_path,
Expand Down
4 changes: 2 additions & 2 deletions src/teamster/core/edplan/sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from paramiko.ssh_exception import SSHException

from teamster.core.sftp.assets import listdir_attr_r
from teamster.core.ssh.resources import SSHConfigurableResource
from teamster.core.ssh.resources import SSHResource


def build_sftp_sensor(
Expand All @@ -28,7 +28,7 @@ def build_sftp_sensor(
minimum_interval_seconds=minimum_interval_seconds,
asset_selection=AssetSelection.assets(*asset_defs),
)
def _sensor(context: SensorEvaluationContext, ssh_edplan: SSHConfigurableResource):
def _sensor(context: SensorEvaluationContext, ssh_edplan: SSHResource):
cursor: dict = json.loads(context.cursor or "{}")
now = pendulum.now(tz=timezone)

Expand Down
4 changes: 2 additions & 2 deletions src/teamster/core/iready/sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from paramiko.ssh_exception import SSHException

from teamster.core.sftp.assets import listdir_attr_r
from teamster.core.ssh.resources import SSHConfigurableResource
from teamster.core.ssh.resources import SSHResource


def build_sftp_sensor(
Expand All @@ -30,7 +30,7 @@ def build_sftp_sensor(
minimum_interval_seconds=minimum_interval_seconds,
asset_selection=AssetSelection.assets(*asset_defs),
)
def _sensor(context: SensorEvaluationContext, ssh_iready: SSHConfigurableResource):
def _sensor(context: SensorEvaluationContext, ssh_iready: SSHResource):
cursor: dict = json.loads(context.cursor or "{}")
now = pendulum.now(tz=timezone)

Expand Down
4 changes: 2 additions & 2 deletions src/teamster/core/powerschool/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from sqlalchemy import literal_column, select, table, text

from teamster.core.sqlalchemy.resources import OracleResource
from teamster.core.ssh.resources import SSHConfigurableResource
from teamster.core.ssh.resources import SSHResource
from teamster.core.utils.classes import FiscalYearPartitionsDefinition


Expand All @@ -31,7 +31,7 @@ def build_powerschool_table_asset(
)
def _asset(
context: AssetExecutionContext,
ssh_powerschool: SSHConfigurableResource,
ssh_powerschool: SSHResource,
db_powerschool: OracleResource,
):
now = pendulum.now()
Expand Down
4 changes: 2 additions & 2 deletions src/teamster/core/powerschool/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from sqlalchemy import text

from teamster.core.sqlalchemy.resources import OracleResource
from teamster.core.ssh.resources import SSHConfigurableResource
from teamster.core.ssh.resources import SSHResource


def build_last_modified_schedule(
Expand All @@ -32,7 +32,7 @@ def build_last_modified_schedule(
)
def _schedule(
context: ScheduleEvaluationContext,
ssh_powerschool: SSHConfigurableResource,
ssh_powerschool: SSHResource,
db_powerschool: OracleResource,
):
ssh_tunnel = ssh_powerschool.get_tunnel(remote_port=1521, local_port=1521)
Expand Down
4 changes: 2 additions & 2 deletions src/teamster/core/powerschool/sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from sqlalchemy import text

from teamster.core.sqlalchemy.resources import OracleResource
from teamster.core.ssh.resources import SSHConfigurableResource
from teamster.core.ssh.resources import SSHResource
from teamster.core.utils.classes import FiscalYearPartitionsDefinition


Expand All @@ -27,7 +27,7 @@ def build_partition_sensor(
)
def _sensor(
context: SensorEvaluationContext,
ssh_powerschool: SSHConfigurableResource,
ssh_powerschool: SSHResource,
db_powerschool: OracleResource,
):
now = pendulum.now(timezone).start_of("minute")
Expand Down
6 changes: 2 additions & 4 deletions src/teamster/core/renlearn/sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from paramiko.ssh_exception import SSHException

from teamster.core.sftp.assets import listdir_attr_r
from teamster.core.ssh.resources import SSHConfigurableResource
from teamster.core.ssh.resources import SSHResource


def build_sftp_sensor(
Expand All @@ -34,9 +34,7 @@ def build_sftp_sensor(
minimum_interval_seconds=minimum_interval_seconds,
asset_selection=AssetSelection.assets(*asset_defs),
)
def _sensor(
context: SensorEvaluationContext, ssh_renlearn: SSHConfigurableResource
):
def _sensor(context: SensorEvaluationContext, ssh_renlearn: SSHResource):
cursor: dict = json.loads(context.cursor or "{}")
now = pendulum.now(tz=timezone)

Expand Down
6 changes: 3 additions & 3 deletions src/teamster/core/sftp/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from paramiko import SFTPClient
from slugify import slugify

from teamster.core.ssh.resources import SSHConfigurableResource
from teamster.core.ssh.resources import SSHResource
from teamster.core.utils.functions import regex_pattern_replace


Expand All @@ -37,7 +37,7 @@ def listdir_attr_r(sftp_client: SFTPClient, remote_dir: str, files: list = []):
return files


def match_sftp_files(ssh: SSHConfigurableResource, remote_dir, remote_file_regex):
def match_sftp_files(ssh: SSHResource, remote_dir, remote_file_regex):
# list files remote filepath
with ssh.get_connection() as conn:
with conn.open_sftp() as sftp_client:
Expand Down Expand Up @@ -105,7 +105,7 @@ def build_sftp_asset(
def _asset(context: AssetExecutionContext):
context.log.debug(requests.get(url="https://api.ipify.org").text)

ssh: SSHConfigurableResource = getattr(context.resources, ssh_resource_key)
ssh: SSHResource = getattr(context.resources, ssh_resource_key)

# find matching file for partition
remote_file_regex_composed = compose_regex(
Expand Down
64 changes: 13 additions & 51 deletions src/teamster/core/ssh/resources.py
Original file line number Diff line number Diff line change
@@ -1,59 +1,21 @@
from dagster import ConfigurableResource, InitResourceContext
from dagster_ssh import SSHResource
from pydantic import PrivateAttr
from sshtunnel import SSHTunnelForwarder


class SSHConfigurableResource(ConfigurableResource):
remote_host: str
remote_port: str = "22"
username: str = None
password: str = None
key_file: str = None
key_string: str = None
timeout: int = 10
keepalive_interval: int = 30
compress: bool = True
no_host_key_check: bool = True
allow_host_key_change: bool = False
tunnel_remote_host: str = "localhost"
class SSHResource(SSHResource):
remote_port = 22
tunnel_remote_host: str = None

_internal_resource: SSHResource = PrivateAttr()

def setup_for_execution(self, context: InitResourceContext) -> None:
self._internal_resource = SSHResource(
remote_host=self.remote_host,
remote_port=int(self.remote_port),
username=self.username,
password=self.password,
key_file=self.key_file,
key_string=self.key_string,
timeout=self.timeout,
keepalive_interval=self.keepalive_interval,
compress=self.compress,
no_host_key_check=self.no_host_key_check,
allow_host_key_change=self.allow_host_key_change,
logger=self.get_resource_context().log,
)

def get_connection(self):
return self._internal_resource.get_connection()

def get_tunnel(self, remote_port, remote_host=None, local_port=None):
if remote_host is None:
def get_tunnel(
self, remote_port, remote_host=None, local_port=None
) -> SSHTunnelForwarder:
if remote_host is not None:
pass
elif self.tunnel_remote_host is not None:
remote_host = self.tunnel_remote_host
else:
remote_host = "localhost"

return self._internal_resource.get_tunnel(
return super().get_tunnel(
remote_port=remote_port, remote_host=remote_host, local_port=local_port
)

def sftp_get(self, remote_filepath, local_filepath):
return self._internal_resource.sftp_get(
remote_filepath=remote_filepath, local_filepath=local_filepath
)

def sftp_put(self, remote_filepath, local_filepath, confirm=True):
return self._internal_resource.sftp_put(
remote_filepath=remote_filepath,
local_filepath=local_filepath,
confirm=confirm,
)
4 changes: 2 additions & 2 deletions src/teamster/core/titan/sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from paramiko.ssh_exception import SSHException

from teamster.core.sftp.assets import listdir_attr_r
from teamster.core.ssh.resources import SSHConfigurableResource
from teamster.core.ssh.resources import SSHResource


def build_sftp_sensor(
Expand All @@ -29,7 +29,7 @@ def build_sftp_sensor(
minimum_interval_seconds=minimum_interval_seconds,
asset_selection=AssetSelection.assets(*asset_defs),
)
def _sensor(context: SensorEvaluationContext, ssh_titan: SSHConfigurableResource):
def _sensor(context: SensorEvaluationContext, ssh_titan: SSHResource):
cursor: dict = json.loads(context.cursor or "{}")
now = pendulum.now(tz=timezone)

Expand Down
16 changes: 8 additions & 8 deletions src/teamster/kippcamden/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from teamster.core.deanslist.resources import DeansListResource
from teamster.core.google.io.resources import gcs_io_manager
from teamster.core.sqlalchemy.resources import OracleResource, SqlAlchemyEngineResource
from teamster.core.ssh.resources import SSHConfigurableResource
from teamster.core.ssh.resources import SSHResource

from . import (
CODE_LOCATION,
Expand Down Expand Up @@ -84,34 +84,34 @@
subdomain="kippnj",
api_key_map="/etc/secret-volume/deanslist_api_key_map_yaml",
),
"ssh_couchdrop": SSHConfigurableResource(
"ssh_couchdrop": SSHResource(
remote_host="kipptaf.couchdrop.io",
username=EnvVar("COUCHDROP_SFTP_USERNAME"),
password=EnvVar("COUCHDROP_SFTP_PASSWORD"),
),
"ssh_cpn": SSHConfigurableResource(
"ssh_cpn": SSHResource(
remote_host="sftp.careevolution.com",
username=EnvVar("CPN_SFTP_USERNAME"),
password=EnvVar("CPN_SFTP_PASSWORD"),
),
"ssh_edplan": SSHConfigurableResource(
"ssh_edplan": SSHResource(
remote_host="secureftp.easyiep.com",
username=EnvVar("KIPPCAMDEN_EDPLAN_SFTP_USERNAME"),
password=EnvVar("KIPPCAMDEN_EDPLAN_SFTP_PASSWORD"),
),
"ssh_powerschool": SSHConfigurableResource(
"ssh_powerschool": SSHResource(
remote_host="pskcna.kippnj.org",
remote_port=EnvVar("KIPPCAMDEN_PS_SSH_PORT"),
remote_port=EnvVar("KIPPCAMDEN_PS_SSH_PORT").get_value(),
username=EnvVar("KIPPCAMDEN_PS_SSH_USERNAME"),
password=EnvVar("KIPPCAMDEN_PS_SSH_PASSWORD"),
tunnel_remote_host=EnvVar("KIPPCAMDEN_PS_SSH_REMOTE_BIND_HOST"),
),
"ssh_pythonanywhere": SSHConfigurableResource(
"ssh_pythonanywhere": SSHResource(
remote_host="ssh.pythonanywhere.com",
username=EnvVar("PYTHONANYWHERE_SFTP_USERNAME"),
password=EnvVar("PYTHONANYWHERE_SFTP_PASSWORD"),
),
"ssh_titan": SSHConfigurableResource(
"ssh_titan": SSHResource(
remote_host="sftp.titank12.com",
username=EnvVar("KIPPCAMDEN_TITAN_SFTP_USERNAME"),
password=EnvVar("KIPPCAMDEN_TITAN_SFTP_PASSWORD"),
Expand Down
Loading

0 comments on commit c6c0d82

Please sign in to comment.