Skip to content

Commit

Permalink
chore: Add a bunch of docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias committed Feb 19, 2025
1 parent f4ad635 commit 7554212
Showing 1 changed file with 129 additions and 2 deletions.
131 changes: 129 additions & 2 deletions posthog/temporal/batch_exports/destination_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,19 @@ class Status(enum.StrEnum):

@dataclasses.dataclass
class DestinationTestStepResult:
"""The result of a test step.
Attributes:
status: Whether the test passed or failed.
message: An optional message, only included on failure, describing the
potential cause for the `status`.
"""

status: Status
message: str | None = None

def as_dict(self) -> DestinationTestStepResultDict:
"""Serialize this as a dictionary."""
return {
"status": str(self.status),
"message": self.message,
Expand All @@ -35,6 +44,7 @@ class DestinationTestStep:
Attributes:
name: A short (ideally) string used to identify this step.
description: A longer string with more details about this step.
result: After running this test step, the result will be populated.
"""

def __init__(self, name: str, description: str) -> None:
Expand All @@ -44,9 +54,15 @@ def __init__(self, name: str, description: str) -> None:

@abc.abstractmethod
async def run(self) -> DestinationTestStepResult:
"""Method called to run this test step.
Subclasses should override this method and implement their concrete running
operations.
"""
raise NotImplementedError

def as_dict(self) -> DestinationTestStepDict:
"""Serialize this as a dictionary."""
base: dict[str, str | DestinationTestStepResultDict | None] = {
"name": self.name,
"description": self.description,
Expand All @@ -59,27 +75,70 @@ def as_dict(self) -> DestinationTestStepDict:


class DestinationTest:
"""Interface representing a test executed for a particular destination.
A test is composed of multiple test steps organized in a linear hierarchy.
This is used to represent that a parent test step should pass before allowing
the next test steps (its children) to run. As a concrete example, if we have
a test to check whether we can connect to a database, and a second test step
to check whether we can create a table, it makes no sense to run the second
test step if the first one fails. Future revisions of this interface could
expand the hierarchy to allow for multiple paths (a tree), but for now a simple
list is sufficient.
Attributes:
steps: A property returning a sequence of steps to run.
"""

@abc.abstractmethod
def configure(self, **kwargs):
"""Method to configure a concrete test.
By "configure" I mean setting any attributes required to initialize and/or
run test steps.
Subclasses should override this to set any attributes. This decoupling of
configuration from initialization allows us to serialize a `DestinationTest`
without needing to configure it.
"""
raise NotImplementedError

@property
@abc.abstractmethod
def steps(self) -> collections.abc.Sequence[DestinationTestStep]:
"""Property returning a sequence of steps to run.
Subclasses should override this with their required test steps.
"""
raise NotImplementedError

def run_step(self, step: int) -> DestinationTestStep:
"""Run the test step at index `step`."""
test_step = self.steps[step]
step_result = async_to_sync(test_step.run)()

test_step.result = step_result
return test_step

def as_dict(self) -> dict[str, list[DestinationTestStepDict]]:
"""Serialize this as a dictionary."""
return {"steps": [step.as_dict() for step in self.steps]}


class S3CheckBucketExistsTestStep(DestinationTestStep):
"""Test whether an S3 bucket exists and we can access it.
This test could not be broken into two as the bucket not existing and not having
permissions to access it looks the same from our perspective.
Attributes:
bucket_name: The bucket we are checking.
region: Region where the bucket is supposed to be.
endpoint_url: Set for S3-compatible destinations.
aws_access_key_id: Access key ID for the bucket.
aws_secret_access_key: Secret access key for the bucket.
"""

def __init__(
self,
bucket_name: str | None = None,
Expand All @@ -98,6 +157,7 @@ def __init__(
self.aws_secret_access_key = aws_secret_access_key

async def run(self) -> DestinationTestStepResult:
"""Run this test step."""
import aioboto3
from botocore.exceptions import ClientError

Expand Down Expand Up @@ -141,22 +201,34 @@ async def run(self) -> DestinationTestStepResult:


class S3DestinationTest(DestinationTest):
"""A concrete implementation of a `DestinationTest` for S3.
Attributes:
bucket_name: The bucket we are batch exporting to.
region: Region where the bucket is supposed to be.
endpoint_url: Set for S3-compatible destinations.
aws_access_key_id: Access key ID for the bucket.
aws_secret_access_key: Secret access key for the bucket.
"""

def __init__(self):
self.bucket_name = None
self.region = None
self.endpoint_url = None
self.aws_access_key_id = None
self.aws_secret_access_key = None
self.endpoint_url = None

def configure(self, **kwargs):
"""Configure this test with necessary attributes."""
self.bucket_name = kwargs.get("bucket_name", None)
self.region = kwargs.get("region", None)
self.endpoint_url = kwargs.get("endpoint_url", None)
self.aws_access_key_id = kwargs.get("aws_access_key_id", None)
self.aws_secret_access_key = kwargs.get("aws_secret_access_key", None)
self.endpoint_url = kwargs.get("endpoint_url", None)

@property
def steps(self) -> collections.abc.Sequence[DestinationTestStep]:
"""Sequence of test steps that make up this destination test."""
return [
S3CheckBucketExistsTestStep(
bucket_name=self.bucket_name,
Expand All @@ -169,12 +241,24 @@ def steps(self) -> collections.abc.Sequence[DestinationTestStep]:


class BigQueryCheckProjectExistsTestStep(DestinationTestStep):
"""Test whether a BigQuery project exists and we can access it.
This test could not be broken into two as the project not existing and us not
having permissions to access it looks the same from our perspective.
Attributes:
project_id: ID of the BigQuery project we are checking.
service_account_info: Service account credentials used to access the
project.
"""

def __init__(self, project_id: str | None = None, service_account_info: dict[str, str] | None = None) -> None:
super().__init__(name="Check project exists", description="Verify the configured project exists")
self.project_id = project_id
self.service_account_info = service_account_info

async def run(self) -> DestinationTestStepResult:
"""Run this test step."""
from posthog.temporal.batch_exports.bigquery_batch_export import BigQueryClient

if self.project_id is None or self.service_account_info is None:
Expand All @@ -193,6 +277,18 @@ async def run(self) -> DestinationTestStepResult:


class BigQueryCheckDatasetExistsTestStep(DestinationTestStep):
"""Test whether a BigQuery dataset exists and we can access it.
This test could not be broken into two as the dataset not existing and us not
having permissions to access it looks the same from our perspective.
Attributes:
project_id: ID of the BigQuery project containing the dataset.
dataset_id: The ID of the dataset we are checking.
service_account_info: Service account credentials used to access the
project and dataset.
"""

def __init__(
self,
project_id: str | None = None,
Expand All @@ -205,6 +301,7 @@ def __init__(
self.service_account_info = service_account_info

async def run(self) -> DestinationTestStepResult:
"""Run this test step."""
from google.cloud.exceptions import NotFound

from posthog.temporal.batch_exports.bigquery_batch_export import BigQueryClient
Expand All @@ -226,6 +323,23 @@ async def run(self) -> DestinationTestStepResult:


class BigQueryCheckTableTestStep(DestinationTestStep):
"""Test whether a BigQuery table exists or we can create it.
A batch export will export data to an existing table or attempt to create
a new one if a table doesn't exist. In the second case, we should have
permissions to create a table.
We also check for permissions to delete a table, although more as a side-effect
of needing to clean-up after ourselves.
Attributes:
project_id: ID of the BigQuery project containing the dataset.
dataset_id: The ID of the dataset containing the table.
table_id: The ID of the table we are checking.
service_account_info: Service account credentials used to access the
project and dataset.
"""

def __init__(
self,
project_id: str | None = None,
Expand All @@ -242,6 +356,7 @@ def __init__(
self.service_account_info = service_account_info

async def run(self) -> DestinationTestStepResult:
"""Run this test step."""
from google.api_core.exceptions import BadRequest
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
Expand Down Expand Up @@ -293,13 +408,23 @@ async def run(self) -> DestinationTestStepResult:


class BigQueryDestinationTest(DestinationTest):
"""A concrete implementation of a `DestinationTest` for BigQuery.
Attributes:
project_id: ID of BigQuery project we are batch exporting to.
dataset_id: ID of BigQuery dataset we are batch exporting to.
table_id: ID of BigQuery table we are batch exporting to.
service_account_info: Service account credentials used to access BigQuery.
"""

def __init__(self):
self.project_id = None
self.dataset_id = None
self.table_id = None
self.service_account_info = None

def configure(self, **kwargs):
"""Configure this test with necessary attributes."""
self.project_id = kwargs.get("project_id", None)
self.dataset_id = kwargs.get("dataset_id", None)
self.table_id = kwargs.get("table_id", None)
Expand All @@ -312,6 +437,7 @@ def configure(self, **kwargs):

@property
def steps(self) -> collections.abc.Sequence[DestinationTestStep]:
"""Sequence of test steps that make up this destination test."""
return [
BigQueryCheckProjectExistsTestStep(
project_id=self.project_id, service_account_info=self.service_account_info
Expand All @@ -331,6 +457,7 @@ def steps(self) -> collections.abc.Sequence[DestinationTestStep]:
def get_destination_test(
destination: str,
) -> DestinationTest:
"""Resolve a destination to its corresponding `DestinationTest` implementation."""
if destination == "S3":
return S3DestinationTest()
elif destination == "BigQuery":
Expand Down

0 comments on commit 7554212

Please sign in to comment.