From 7554212a7714bafe03f85a590ee15b6482e1e4df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Wed, 19 Feb 2025 16:58:44 +0100 Subject: [PATCH] chore: Add a bunch of docstrings --- .../batch_exports/destination_tests.py | 131 +++++++++++++++++- 1 file changed, 129 insertions(+), 2 deletions(-) diff --git a/posthog/temporal/batch_exports/destination_tests.py b/posthog/temporal/batch_exports/destination_tests.py index 270d25c2b73f7..17e85634313e4 100644 --- a/posthog/temporal/batch_exports/destination_tests.py +++ b/posthog/temporal/batch_exports/destination_tests.py @@ -16,10 +16,19 @@ class Status(enum.StrEnum): @dataclasses.dataclass class DestinationTestStepResult: + """The result of a test step. + + Attributes: + status: Whether the test passed or failed. + message: An optional message, only included on failure, describing the + potential cause for the `status`. + """ + status: Status message: str | None = None def as_dict(self) -> DestinationTestStepResultDict: + """Serialize this as a dictionary.""" return { "status": str(self.status), "message": self.message, @@ -35,6 +44,7 @@ class DestinationTestStep: Attributes: name: A short (ideally) string used to identify this step. description: A longer string with more details about this step. + result: After running this test step, the result will be populated. """ def __init__(self, name: str, description: str) -> None: @@ -44,9 +54,15 @@ def __init__(self, name: str, description: str) -> None: @abc.abstractmethod async def run(self) -> DestinationTestStepResult: + """Method called to run this test step. + + Subclasses should override this method and implement their concrete running + operations. + """ raise NotImplementedError def as_dict(self) -> DestinationTestStepDict: + """Serialize this as a dictionary.""" base: dict[str, str | DestinationTestStepResultDict | None] = { "name": self.name, "description": self.description, @@ -59,16 +75,45 @@ def as_dict(self) -> DestinationTestStepDict: class DestinationTest: + """Interface representing a test executed for a particular destination. + + A test is composed of multiple test steps organized in a linear hierarchy. + This is used to represent that a parent test step should pass before allowing + the next test steps (its children) to run. As a concrete example, if we have + a test to check whether we can connect to a database, and a second test step + to check whether we can create a table, it makes no sense to run the second + test step if the first one fails. Future revisions of this interface could + expand the hierarchy to allow for multiple paths (a tree), but for now a simple + list is sufficient. + + Attributes: + steps: A property returning a sequence of steps to run. + """ + @abc.abstractmethod def configure(self, **kwargs): + """Method to configure a concrete test. + + By "configure" I mean setting any attributes required to initialize and/or + run test steps. + + Subclasses should override this to set any attributes. This decoupling of + configuration from initialization allows us to serialize a `DestinationTest` + without needing to configure it. + """ raise NotImplementedError @property @abc.abstractmethod def steps(self) -> collections.abc.Sequence[DestinationTestStep]: + """Property returning a sequence of steps to run. + + Subclasses should override this with their required test steps. + """ raise NotImplementedError def run_step(self, step: int) -> DestinationTestStep: + """Run the test step at index `step`.""" test_step = self.steps[step] step_result = async_to_sync(test_step.run)() @@ -76,10 +121,24 @@ def run_step(self, step: int) -> DestinationTestStep: return test_step def as_dict(self) -> dict[str, list[DestinationTestStepDict]]: + """Serialize this as a dictionary.""" return {"steps": [step.as_dict() for step in self.steps]} class S3CheckBucketExistsTestStep(DestinationTestStep): + """Test whether an S3 bucket exists and we can access it. + + This test could not be broken into two as the bucket not existing and not having + permissions to access it looks the same from our perspective. + + Attributes: + bucket_name: The bucket we are checking. + region: Region where the bucket is supposed to be. + endpoint_url: Set for S3-compatible destinations. + aws_access_key_id: Access key ID for the bucket. + aws_secret_access_key: Secret access key for the bucket. + """ + def __init__( self, bucket_name: str | None = None, @@ -98,6 +157,7 @@ def __init__( self.aws_secret_access_key = aws_secret_access_key async def run(self) -> DestinationTestStepResult: + """Run this test step.""" import aioboto3 from botocore.exceptions import ClientError @@ -141,22 +201,34 @@ async def run(self) -> DestinationTestStepResult: class S3DestinationTest(DestinationTest): + """A concrete implementation of a `DestinationTest` for S3. + + Attributes: + bucket_name: The bucket we are batch exporting to. + region: Region where the bucket is supposed to be. + endpoint_url: Set for S3-compatible destinations. + aws_access_key_id: Access key ID for the bucket. + aws_secret_access_key: Secret access key for the bucket. + """ + def __init__(self): self.bucket_name = None self.region = None + self.endpoint_url = None self.aws_access_key_id = None self.aws_secret_access_key = None - self.endpoint_url = None def configure(self, **kwargs): + """Configure this test with necessary attributes.""" self.bucket_name = kwargs.get("bucket_name", None) self.region = kwargs.get("region", None) + self.endpoint_url = kwargs.get("endpoint_url", None) self.aws_access_key_id = kwargs.get("aws_access_key_id", None) self.aws_secret_access_key = kwargs.get("aws_secret_access_key", None) - self.endpoint_url = kwargs.get("endpoint_url", None) @property def steps(self) -> collections.abc.Sequence[DestinationTestStep]: + """Sequence of test steps that make up this destination test.""" return [ S3CheckBucketExistsTestStep( bucket_name=self.bucket_name, @@ -169,12 +241,24 @@ def steps(self) -> collections.abc.Sequence[DestinationTestStep]: class BigQueryCheckProjectExistsTestStep(DestinationTestStep): + """Test whether a BigQuery project exists and we can access it. + + This test could not be broken into two as the project not existing and us not + having permissions to access it looks the same from our perspective. + + Attributes: + project_id: ID of the BigQuery project we are checking. + service_account_info: Service account credentials used to access the + project. + """ + def __init__(self, project_id: str | None = None, service_account_info: dict[str, str] | None = None) -> None: super().__init__(name="Check project exists", description="Verify the configured project exists") self.project_id = project_id self.service_account_info = service_account_info async def run(self) -> DestinationTestStepResult: + """Run this test step.""" from posthog.temporal.batch_exports.bigquery_batch_export import BigQueryClient if self.project_id is None or self.service_account_info is None: @@ -193,6 +277,18 @@ async def run(self) -> DestinationTestStepResult: class BigQueryCheckDatasetExistsTestStep(DestinationTestStep): + """Test whether a BigQuery dataset exists and we can access it. + + This test could not be broken into two as the dataset not existing and us not + having permissions to access it looks the same from our perspective. + + Attributes: + project_id: ID of the BigQuery project containing the dataset. + dataset_id: The ID of the dataset we are checking. + service_account_info: Service account credentials used to access the + project and dataset. + """ + def __init__( self, project_id: str | None = None, @@ -205,6 +301,7 @@ def __init__( self.service_account_info = service_account_info async def run(self) -> DestinationTestStepResult: + """Run this test step.""" from google.cloud.exceptions import NotFound from posthog.temporal.batch_exports.bigquery_batch_export import BigQueryClient @@ -226,6 +323,23 @@ async def run(self) -> DestinationTestStepResult: class BigQueryCheckTableTestStep(DestinationTestStep): + """Test whether a BigQuery table exists or we can create it. + + A batch export will export data to an existing table or attempt to create + a new one if a table doesn't exist. In the second case, we should have + permissions to create a table. + + We also check for permissions to delete a table, although more as a side-effect + of needing to clean-up after ourselves. + + Attributes: + project_id: ID of the BigQuery project containing the dataset. + dataset_id: The ID of the dataset containing the table. + table_id: The ID of the table we are checking. + service_account_info: Service account credentials used to access the + project and dataset. + """ + def __init__( self, project_id: str | None = None, @@ -242,6 +356,7 @@ def __init__( self.service_account_info = service_account_info async def run(self) -> DestinationTestStepResult: + """Run this test step.""" from google.api_core.exceptions import BadRequest from google.cloud import bigquery from google.cloud.exceptions import NotFound @@ -293,6 +408,15 @@ async def run(self) -> DestinationTestStepResult: class BigQueryDestinationTest(DestinationTest): + """A concrete implementation of a `DestinationTest` for BigQuery. + + Attributes: + project_id: ID of BigQuery project we are batch exporting to. + dataset_id: ID of BigQuery dataset we are batch exporting to. + table_id: ID of BigQuery table we are batch exporting to. + service_account_info: Service account credentials used to access BigQuery. + """ + def __init__(self): self.project_id = None self.dataset_id = None @@ -300,6 +424,7 @@ def __init__(self): self.service_account_info = None def configure(self, **kwargs): + """Configure this test with necessary attributes.""" self.project_id = kwargs.get("project_id", None) self.dataset_id = kwargs.get("dataset_id", None) self.table_id = kwargs.get("table_id", None) @@ -312,6 +437,7 @@ def configure(self, **kwargs): @property def steps(self) -> collections.abc.Sequence[DestinationTestStep]: + """Sequence of test steps that make up this destination test.""" return [ BigQueryCheckProjectExistsTestStep( project_id=self.project_id, service_account_info=self.service_account_info @@ -331,6 +457,7 @@ def steps(self) -> collections.abc.Sequence[DestinationTestStep]: def get_destination_test( destination: str, ) -> DestinationTest: + """Resolve a destination to its corresponding `DestinationTest` implementation.""" if destination == "S3": return S3DestinationTest() elif destination == "BigQuery":