Skip to content

Commit f8e705e

Browse files
committed
WIP - need to fix issue with sqlite runner txs
1 parent 9a68c8e commit f8e705e

File tree

5 files changed

+574
-41
lines changed

5 files changed

+574
-41
lines changed

src/flux/runner.py

+38-15
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from flux.backend.applied_migration import AppliedMigration
55
from flux.backend.base import MigrationBackend
66
from flux.config import FluxConfig
7-
from flux.exceptions import MigrationDirectoryCorruptedError
7+
from flux.exceptions import MigrationApplyError, MigrationDirectoryCorruptedError
88
from flux.migration.migration import Migration
99
from flux.migration.read_migration import (
1010
read_migrations,
@@ -37,12 +37,12 @@ class FluxRunner:
3737

3838
async def __aenter__(self):
3939
self._conn_ctx = self.backend.connection()
40-
self._tx_ctx = self.backend.transaction()
4140
self._lock_ctx = self.backend.migration_lock()
41+
self._tx_ctx = self.backend.transaction()
4242

4343
await self._conn_ctx.__aenter__()
44-
await self._tx_ctx.__aenter__()
4544
await self._lock_ctx.__aenter__()
45+
await self._tx_ctx.__aenter__()
4646

4747
if not await self.backend.is_initialized():
4848
await self.backend.initialize()
@@ -66,14 +66,17 @@ async def validate_applied_migrations(self):
6666
- There is no discontinuity in the applied migrations
6767
- The migration hashes of all applied migrations haven't changed
6868
"""
69+
applied_migrations = sorted(self.applied_migrations, key=lambda m: m.id)
70+
if not applied_migrations:
71+
return
72+
73+
last_applied_migration = applied_migrations[-1]
6974
applied_migration_files = [
70-
m
71-
for m in self.migrations
72-
if m.id in {_m.id for _m in self.applied_migrations}
75+
m for m in self.migrations if m.id <= last_applied_migration.id
7376
]
7477

75-
if not [m.id for m in applied_migration_files] == [
76-
m.id for m in self.migrations[: len(applied_migration_files)]
78+
if [m.id for m in applied_migration_files] != [
79+
m.id for m in applied_migrations
7780
]:
7881
raise MigrationDirectoryCorruptedError(
7982
"There is a discontinuity in the applied migrations"
@@ -100,17 +103,32 @@ async def apply_migrations(self, n: int | None = None):
100103
migrations_to_apply = unapplied_migrations[:n]
101104

102105
for migration in self.pre_apply_migrations:
103-
await self.backend.apply_migration(migration.up)
106+
try:
107+
await self.backend.apply_migration(migration.up)
108+
except Exception as e:
109+
raise MigrationApplyError(
110+
f"Failed to apply pre-apply migration {migration.id}"
111+
) from e
104112

105113
for migration in migrations_to_apply:
106114
if migration.id in {m.id for m in self.applied_migrations}:
107115
continue
108116

109-
await self.backend.apply_migration(migration.up)
110-
await self.backend.register_migration(migration)
117+
try:
118+
await self.backend.apply_migration(migration.up)
119+
await self.backend.register_migration(migration)
120+
except Exception as e:
121+
raise MigrationApplyError(
122+
f"Failed to apply migration {migration.id}"
123+
) from e
111124

112125
for migration in self.post_apply_migrations:
113-
await self.backend.apply_migration(migration.up)
126+
try:
127+
await self.backend.apply_migration(migration.up)
128+
except Exception as e:
129+
raise MigrationApplyError(
130+
f"Failed to apply post-apply migration {migration.id}"
131+
) from e
114132

115133
self.applied_migrations = await self.backend.get_applied_migrations()
116134

@@ -129,8 +147,13 @@ async def rollback_migrations(self, n: int | None = None):
129147
migrations_to_rollback = migrations_to_rollback[::-1]
130148

131149
for migration in migrations_to_rollback:
132-
if migration.down is not None:
133-
await self.backend.apply_migration(migration.down)
134-
await self.backend.unregister_migration(migration)
150+
try:
151+
if migration.down is not None:
152+
await self.backend.apply_migration(migration.down)
153+
await self.backend.unregister_migration(migration)
154+
except Exception as e:
155+
raise MigrationApplyError(
156+
f"Failed to rollback migration {migration.id}"
157+
) from e
135158

136159
self.applied_migrations = await self.backend.get_applied_migrations()

tests/integration/sqlite/backend.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
from flux.backend.applied_migration import AppliedMigration
99
from flux.backend.base import MigrationBackend
1010
from flux.config import FluxConfig
11-
from flux.exceptions import MigrationApplyError
1211
from flux.migration.migration import Migration
1312

1413
VALID_TABLE_NAME = r"^[A-Za-z0-9_]+$"
@@ -127,7 +126,7 @@ async def register_migration(self, migration: Migration) -> AppliedMigration:
127126
)
128127
row = await cursor.fetchone()
129128
if row is None:
130-
raise MigrationApplyError("Failed to register migration")
129+
raise RuntimeError("Failed to register migration")
131130
return AppliedMigration(id=row[0], hash=row[1], applied_at=row[2])
132131

133132
async def unregister_migration(self, migration: Migration):

0 commit comments

Comments
 (0)