Skip to content

Commit

Permalink
feat: session state transfer (#517)
Browse files Browse the repository at this point in the history
  • Loading branch information
karenc-bq authored May 9, 2024
1 parent e758863 commit 164bef4
Show file tree
Hide file tree
Showing 37 changed files with 988 additions and 260 deletions.
17 changes: 0 additions & 17 deletions .github/styles/config/vocabularies/Aurora/accept.txt

This file was deleted.

12 changes: 5 additions & 7 deletions .github/workflows/autoscaling_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,18 @@ jobs:

steps:
- name: 'Clone repository'
uses: actions/checkout@v3
with:
fetch-depth: 50
uses: actions/checkout@v4

- name: 'Set up JDK 8'
uses: actions/setup-java@v3
uses: actions/setup-java@v4
with:
distribution: 'corretto'
java-version: 8

- name: Install poetry
uses: abatilo/actions-poetry@v2
uses: abatilo/actions-poetry@v3
with:
poetry-version: '1.4.2'
poetry-version: '1.8.2'

- name: Install dependencies
run: poetry install
Expand Down Expand Up @@ -61,7 +59,7 @@ jobs:

- name: 'Archive results'
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: pytest-autoscaling-report
path: ./tests/integration/container/reports
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/draft_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
- name: 'Install poetry'
uses: abatilo/actions-poetry@v2
with:
poetry-version: '1.4.2'
poetry-version: '1.8.2'
- name: 'Install dependencies'
run: poetry install
- name: 'Run mypy - static type checking'
Expand Down
14 changes: 6 additions & 8 deletions .github/workflows/integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,24 @@ jobs:

steps:
- name: 'Clone repository'
uses: actions/checkout@v3
with:
fetch-depth: 50
uses: actions/checkout@v4

- name: 'Set up JDK 8'
uses: actions/setup-java@v3
uses: actions/setup-java@v4
with:
distribution: 'corretto'
java-version: 8

- name: Install poetry
uses: abatilo/actions-poetry@v2
uses: abatilo/actions-poetry@v3
with:
poetry-version: '1.4.2'
poetry-version: '1.8.2'

- name: Install dependencies
run: poetry install

- name: 'Configure AWS Credentials'
uses: aws-actions/configure-aws-credentials@v1
uses: aws-actions/configure-aws-credentials@v4
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
Expand Down Expand Up @@ -70,7 +68,7 @@ jobs:

- name: 'Archive results'
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: pytest-integration-report
path: ./tests/integration/container/reports
Expand Down
12 changes: 5 additions & 7 deletions .github/workflows/integration_tests_codebuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@ jobs:

steps:
- name: 'Clone repository'
uses: actions/checkout@v3
with:
fetch-depth: 50
uses: actions/checkout@v4

- name: 'Set up JDK 8'
uses: actions/setup-java@v3
uses: actions/setup-java@v4
with:
distribution: 'corretto'
java-version: 8

- name: Install poetry
uses: abatilo/actions-poetry@v2
uses: abatilo/actions-poetry@v3
with:
poetry-version: '1.4.2'
poetry-version: '1.8.2'

- name: Install dependencies
run: poetry install
Expand Down Expand Up @@ -71,7 +69,7 @@ jobs:

- name: 'Archive results'
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: pytest-integration-report
path: ./tests/integration/container/reports
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
poetry-version: ["1.8.2"]

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
Expand Down
10 changes: 5 additions & 5 deletions .github/workflows/mysql_performance_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,26 @@ jobs:

steps:
- name: 'Clone repository'
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
fetch-depth: 50

- name: 'Set up JDK 8'
uses: actions/setup-java@v3
uses: actions/setup-java@v4
with:
distribution: 'corretto'
java-version: 8

- name: Install poetry
uses: abatilo/actions-poetry@v2
with:
poetry-version: '1.4.2'
poetry-version: '1.8.2'

- name: Install dependencies
run: poetry install

- name: 'Configure AWS Credentials'
uses: aws-actions/configure-aws-credentials@v1
uses: aws-actions/configure-aws-credentials@v4
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
Expand Down Expand Up @@ -63,7 +63,7 @@ jobs:

- name: 'Archive results'
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: pytest-performance-report
path: ./tests/integration/container/reports
Expand Down
14 changes: 6 additions & 8 deletions .github/workflows/pg_performance_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,24 @@ jobs:

steps:
- name: 'Clone repository'
uses: actions/checkout@v3
with:
fetch-depth: 50
uses: actions/checkout@v4

- name: 'Set up JDK 8'
uses: actions/setup-java@v3
uses: actions/setup-java@v4
with:
distribution: 'corretto'
java-version: 8

- name: Install poetry
uses: abatilo/actions-poetry@v2
uses: abatilo/actions-poetry@v3
with:
poetry-version: '1.4.2'
poetry-version: '1.8.2'

- name: Install dependencies
run: poetry install

- name: 'Configure AWS Credentials'
uses: aws-actions/configure-aws-credentials@v1
uses: aws-actions/configure-aws-credentials@v4
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
Expand Down Expand Up @@ -63,7 +61,7 @@ jobs:

- name: 'Archive results'
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: pytest-performance-report
path: ./tests/integration/container/reports
Expand Down
19 changes: 0 additions & 19 deletions .github/workflows/vale.yml

This file was deleted.

1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ The following table lists the connection properties used with the AWS Advanced P
| `failover_reader_connect_timeout_sec` | [Failover Plugin](./docs/using-the-python-driver/using-plugins/UsingTheFailoverPlugin.md) |
| `failover_timeout_sec` | [Failover Plugin](./docs/using-the-python-driver/using-plugins/UsingTheFailoverPlugin.md) |
| `failover_writer_reconnect_interval_sec` | [Failover Plugin](./docs/using-the-python-driver/using-plugins/UsingTheFailoverPlugin.md) |
| `keep_session_state_on_failover` | [Failover Plugin](./docs/using-the-python-driver/using-plugins/UsingTheFailoverPlugin.md) |
| `failure_detection_count` | [Host Monitoring Plugin](./docs/using-the-python-driver/using-plugins/UsingTheHostMonitoringPlugin.md) |
| `failure_detection_enabled` | [Host Monitoring Plugin](./docs/using-the-python-driver/using-plugins/UsingTheHostMonitoringPlugin.md) |
| `failure_detection_interval_ms` | [Host Monitoring Plugin](./docs/using-the-python-driver/using-plugins/UsingTheHostMonitoringPlugin.md) |
Expand Down
52 changes: 4 additions & 48 deletions aws_advanced_python_wrapper/failover_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ def __init__(self, plugin_service: PluginService, props: Properties):
self._properties)
self._failover_reader_connect_timeout_sec = WrapperProperties.FAILOVER_READER_CONNECT_TIMEOUT_SEC.get_float(
self._properties)
self._keep_session_state_on_failover = WrapperProperties.KEEP_SESSION_STATE_ON_FAILOVER.get_bool(
self._properties)
self._telemetry_failover_additional_top_trace_setting = (
WrapperProperties.TELEMETRY_FAILOVER_ADDITIONAL_TOP_TRACE.get_bool(self._properties))
self._failover_mode: FailoverMode
Expand Down Expand Up @@ -162,12 +160,6 @@ def execute(self, target: type, method_name: str, execute_func: Callable, *args:
if self._is_closed and not self._allowed_on_closed_connection(method_name):
self._invalid_invocation_on_closed_connection()

if method_name == "Connection.set_read_only" and args is not None and len(args) > 0:
self._saved_read_only_status = bool(args[0])

if method_name == "Connection.autocommit_setter" and args is not None and len(args) > 0:
self._saved_auto_commit_status = bool(args[0])

try:
if self._requires_update_topology(method_name):
self._update_topology(False)
Expand Down Expand Up @@ -234,13 +226,6 @@ def _connect(
self._host_list_provider_service, host,
properties,
connect_func)
if self._keep_session_state_on_failover:
self._saved_read_only_status = False if self._saved_read_only_status == self._plugin_service.driver_dialect.is_read_only(
conn) \
else self._saved_read_only_status
self._saved_auto_commit_status = False \
if self._saved_read_only_status == self._plugin_service.driver_dialect.get_autocommit(conn) \
else self._saved_auto_commit_status

if is_initial_connection:
self._plugin_service.refresh_host_list(conn)
Expand All @@ -261,18 +246,15 @@ def _update_topology(self, force_update: bool):
else:
self._plugin_service.refresh_host_list()

def _transfer_session_state(self, from_conn: Connection, to_conn: Connection):
if from_conn is None or self._plugin_service.driver_dialect.is_closed(from_conn) or to_conn is None:
return

self._plugin_service.driver_dialect.transfer_session_state(from_conn, to_conn)

def _failover(self, failed_host: Optional[HostInfo]):
"""
Initiates the failover procedure. This process tries to establish a new connection to an instance in the topology.
:param failed_host: The host with network errors.
"""
if failed_host is not None:
self._plugin_service.set_availability(failed_host.as_aliases(), HostAvailability.UNAVAILABLE)

if self._failover_mode == FailoverMode.STRICT_WRITER:
self._failover_writer()
else:
Expand Down Expand Up @@ -312,8 +294,6 @@ def _failover_reader(self, failed_host: Optional[HostInfo]):
else:
if result.exception is not None:
raise result.exception
if self._keep_session_state_on_failover:
self.restore_session_state(result.connection)
if result.connection is not None and result.new_host is not None:
self._plugin_service.set_current_connection(result.connection, result.new_host)

Expand Down Expand Up @@ -342,7 +322,7 @@ def _failover_reader(self, failed_host: Optional[HostInfo]):

def _failover_writer(self):
telemetry_factory = self._plugin_service.get_telemetry_factory()
context = telemetry_factory.open_telemetry_context("failover to writer node", TelemetryTraceLevel.NESTED)
context = telemetry_factory.open_telemetry_context("failover to writer host", TelemetryTraceLevel.NESTED)
self._failover_writer_triggered_counter.inc()

try:
Expand All @@ -356,8 +336,6 @@ def _failover_writer(self):
raise FailoverFailedError(Messages.get("FailoverPlugin.UnableToConnectToWriter"))

writer_host = self._get_writer(result.topology)
if self._keep_session_state_on_failover:
self.restore_session_state(result.new_connection)

self._plugin_service.set_current_connection(result.new_connection, writer_host)

Expand All @@ -381,21 +359,6 @@ def _failover_writer(self):
if self._telemetry_failover_additional_top_trace_setting:
telemetry_factory.post_copy(context, TelemetryTraceLevel.FORCE_TOP_LEVEL)

def restore_session_state(self, conn: Optional[Connection]):
"""
Restores partial session state from saved values to a connection.
:param conn: The connection to transfer state to.
"""
if conn is None:
return

if self._saved_read_only_status is not None:
self._plugin_service.driver_dialect.set_read_only(conn, self._saved_read_only_status)

if self._saved_auto_commit_status is not None:
self._plugin_service.driver_dialect.set_autocommit(conn, self._saved_auto_commit_status)

def _invalidate_current_connection(self):
"""
Invalidate the current connection before switching to a new connection.
Expand Down Expand Up @@ -451,13 +414,6 @@ def _connect_to(self, host: HostInfo):
"""
try:
connection_for_host = self._plugin_service.connect(host, self._properties)
current_connection = self._plugin_service.current_connection

if connection_for_host is not None and current_connection is not None and \
current_connection != connection_for_host:
self._transfer_session_state(current_connection, connection_for_host)
self._invalidate_current_connection()

self._plugin_service.set_current_connection(connection_for_host, host)
self._plugin_service.update_in_transaction(False)

Expand Down
Loading

0 comments on commit 164bef4

Please sign in to comment.