Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add record/replay support #123

Merged
merged 8 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20240716-172442.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Add support for experimental record/replay testing.
time: 2024-07-16T17:24:42.271859-04:00
custom:
Author: peterallenwebb
Issue: "123"
34 changes: 25 additions & 9 deletions dbt/adapters/postgres/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
from dbt.adapters.contracts.connection import AdapterResponse, Credentials
from dbt.adapters.events.logging import AdapterLogger
from dbt.adapters.events.types import TypeCodeNotFound
from dbt.adapters.postgres.record import PostgresRecordReplayHandle
from dbt.adapters.sql import SQLConnectionManager
from dbt_common.exceptions import DbtDatabaseError, DbtRuntimeError
from dbt_common.events.functions import warn_or_error
from dbt_common.helper_types import Port
from dbt_common.record import get_record_mode_from_env, RecorderMode
from mashumaro.jsonschema.annotations import Maximum, Minimum
import psycopg2
from typing_extensions import Annotated
Expand Down Expand Up @@ -132,17 +134,31 @@ def open(cls, connection):
kwargs["application_name"] = credentials.application_name

def connect():
handle = psycopg2.connect(
dbname=credentials.database,
user=credentials.user,
host=credentials.host,
password=credentials.password,
port=credentials.port,
connect_timeout=credentials.connect_timeout,
**kwargs,
)
handle = None

# In replay mode, we won't connect to a real database at all, while
# in record and diff modes we do, but insert an intermediate handle
# object which monitors native connection activity.
rec_mode = get_record_mode_from_env()
if rec_mode != RecorderMode.REPLAY:
handle = psycopg2.connect(
dbname=credentials.database,
user=credentials.user,
host=credentials.host,
password=credentials.password,
port=credentials.port,
connect_timeout=credentials.connect_timeout,
**kwargs,
)

if rec_mode is not None:
# If using the record/replay mechanism, regardless of mode, we
# use a wrapper.
handle = PostgresRecordReplayHandle(handle, connection)

if credentials.role:
handle.cursor().execute("set role {}".format(credentials.role))

return handle

retryable_exceptions = [
Expand Down
2 changes: 2 additions & 0 deletions dbt/adapters/postgres/record/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from dbt.adapters.postgres.record.cursor.cursor import PostgresRecordReplayCursor
from dbt.adapters.postgres.record.handle import PostgresRecordReplayHandle
15 changes: 15 additions & 0 deletions dbt/adapters/postgres/record/cursor/cursor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from dbt_common.record import record_function

from dbt.adapters.record import RecordReplayCursor

from dbt.adapters.postgres.record.cursor.status import CursorGetStatusMessageRecord


class PostgresRecordReplayCursor(RecordReplayCursor):
"""A custom extension of RecordReplayCursor that adds the statusmessage
property which is specific to psycopg."""

@property
@record_function(CursorGetStatusMessageRecord, method=True, id_field_name="connection_name")
def statusmessage(self):
return self.native_cursor.statusmessage
21 changes: 21 additions & 0 deletions dbt/adapters/postgres/record/cursor/status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import dataclasses
from typing import Optional

from dbt_common.record import Record, Recorder


@dataclasses.dataclass
class CursorGetStatusMessageParams:
connection_name: str


@dataclasses.dataclass
class CursorGetStatusMessageResult:
msg: Optional[str]


@Recorder.register_record_type
class CursorGetStatusMessageRecord(Record):
params_cls = CursorGetStatusMessageParams
result_cls = CursorGetStatusMessageResult
group = "Database"
12 changes: 12 additions & 0 deletions dbt/adapters/postgres/record/handle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from dbt.adapters.record import RecordReplayHandle

from dbt.adapters.postgres.record.cursor.cursor import PostgresRecordReplayCursor


class PostgresRecordReplayHandle(RecordReplayHandle):
"""A custom extension of RecordReplayHandle that returns
a psycopg-specific PostgresRecordReplayCursor object."""

def cursor(self):
cursor = None if self.native_handle is None else self.native_handle.cursor()
return PostgresRecordReplayCursor(cursor, self.connection)
Loading