Refactoring

This commit is contained in:
2025-11-07 20:38:09 +02:00
parent 5566f6f6ba
commit 5bbb722b20
26 changed files with 1097 additions and 770 deletions

View File

@@ -1,135 +1,195 @@
import Foundation
import DataLiteCore
/// Thread-safe service for executing ordered database schema migrations.
#if os(Windows)
import WinSDK
#endif
/// A service that executes ordered database schema migrations.
///
/// `MigrationService` stores registered migrations and applies them sequentially
/// to update the database schema. Each migration runs only once, in version order,
/// based on the current schema version stored in the database.
/// ## Overview
///
/// The service is generic over:
/// - `Service`: a database service conforming to ``DatabaseServiceProtocol``
/// - `Storage`: a version storage conforming to ``VersionStorage``
/// This class manages migration registration and applies them sequentially to update the database
/// schema. Each migration corresponds to a specific version and runs only once, ensuring that
/// schema upgrades are applied in a consistent, deterministic way.
///
/// Migrations are identified by version and script URL. Both must be unique
/// across all registered migrations.
/// Migrations are executed within an exclusive transaction if any step fails, the entire process
/// is rolled back, leaving the database unchanged.
///
/// Execution is performed inside a single `.exclusive` transaction, ensuring
/// that either all pending migrations are applied successfully or none are.
/// On error, the database state is rolled back to the original version.
/// `MigrationService` coordinates the migration process by:
/// - Managing a registry of unique migrations.
/// - Reading and writing the current schema version through a ``VersionStorage`` implementation.
/// - Executing SQL scripts in ascending version order.
///
/// This type is safe to use from multiple threads.
/// It is safe for concurrent use. Internally, it uses a POSIX mutex to ensure thread-safe
/// registration and execution.
///
/// ## Usage
///
/// ```swift
/// let connection = try Connection(location: .inMemory, options: .readwrite)
/// let storage = UserVersionStorage<BitPackVersion>()
/// let service = MigrationService(service: connectionService, storage: storage)
/// let storage = UserVersionStorage<SemanticVersion>()
/// let service = MigrationService(provider: { connection }, storage: storage)
///
/// try service.add(Migration(version: "1.0.0", byResource: "v_1_0_0.sql")!)
/// try service.add(Migration(version: "1.0.1", byResource: "v_1_0_1.sql")!)
/// try service.add(Migration(version: "1.0.0", byResource: "v1_0_0.sql")!)
/// try service.add(Migration(version: "1.0.1", byResource: "v1_0_1.sql")!)
/// try service.migrate()
/// ```
///
/// ### Custom Versions and Storage
/// ## Topics
///
/// You can supply a custom `Version` type conforming to ``VersionRepresentable``
/// and a `VersionStorage` implementation that determines how and where the
/// version is persisted (e.g., `PRAGMA user_version`, metadata table, etc.).
/// ### Initializers
///
/// - ``init(provider:config:queue:storage:)``
/// - ``init(provider:config:queue:)``
///
/// ### Migration Management
///
/// - ``add(_:)``
/// - ``migrate()``
public final class MigrationService<
Service: DatabaseServiceProtocol,
Storage: VersionStorage
>:
ConnectionService,
MigrationServiceProtocol,
@unchecked Sendable
{
/// Schema version type used for migration ordering.
// MARK: - Typealiases
/// The type representing schema version ordering.
public typealias Version = Storage.Version
private let service: Service
// MARK: - Properties
private let storage: Storage
private var mutex = pthread_mutex_t()
private var migrations = Set<Migration<Version>>()
/// Encryption key provider delegated to the underlying database service.
public weak var keyProvider: DatabaseServiceKeyProvider? {
get { service.keyProvider }
set { service.keyProvider = newValue }
}
#if os(Windows)
private var mutex = SRWLOCK()
#else
private var mutex = pthread_mutex_t()
#endif
/// Creates a migration service with the given database service and storage.
// MARK: - Inits
/// Creates a migration service with a specified connection configuration and version storage.
///
/// - Parameters:
/// - service: Database service used to execute migrations.
/// - storage: Version storage for reading and writing schema version.
/// - provider: A closure that returns a new database connection.
/// - config: An optional configuration closure called after the connection is established.
/// - queue: An optional target queue for internal database operations.
/// - storage: The version storage responsible for reading and writing schema version data.
public init(
service: Service,
provider: @escaping ConnectionProvider,
config: ConnectionConfig? = nil,
queue: DispatchQueue? = nil,
storage: Storage
) {
self.service = service
self.storage = storage
pthread_mutex_init(&mutex, nil)
super.init(provider: provider, config: config, queue: queue)
#if os(Windows)
InitializeSRWLock(&mutex)
#else
pthread_mutex_init(&mutex, nil)
#endif
}
/// Creates a migration service using the default version storage.
///
/// - Parameters:
/// - provider: A closure that returns a new database connection.
/// - config: An optional configuration closure called after the connection is established.
/// - queue: An optional target queue for internal database operations.
public required init(
provider: @escaping ConnectionProvider,
config: ConnectionConfig? = nil,
queue: DispatchQueue? = nil
) {
self.storage = .init()
super.init(provider: provider, config: config, queue: queue)
#if os(Windows)
InitializeSRWLock(&mutex)
#else
pthread_mutex_init(&mutex, nil)
#endif
}
deinit {
pthread_mutex_destroy(&mutex)
#if !os(Windows)
pthread_mutex_destroy(&mutex)
#endif
}
// MARK: - Unsupported
@available(*, unavailable)
public override func setNeedsReconnect() -> Bool {
fatalError("Reconnection is not supported for MigrationService.")
}
@available(*, unavailable)
public override func perform<T>(_ closure: Perform<T>) throws -> T {
fatalError("Direct perform is not supported for MigrationService.")
}
// MARK: - Migration Management
/// Registers a new migration, ensuring version and script URL uniqueness.
///
/// - Parameter migration: The migration to register.
/// - Throws: ``MigrationError/duplicateMigration(_:)`` if the migration's
/// version or script URL is already registered.
/// - Throws: ``MigrationError/duplicateMigration(_:)`` if a migration with the same version or
/// script URL is already registered.
public func add(_ migration: Migration<Version>) throws(MigrationError<Version>) {
pthread_mutex_lock(&mutex)
defer { pthread_mutex_unlock(&mutex) }
guard !migrations.contains(where: {
$0.version == migration.version
|| $0.scriptURL == migration.scriptURL
}) else {
#if os(Windows)
AcquireSRWLockExclusive(&mutex)
defer { ReleaseSRWLockExclusive(&mutex) }
#else
pthread_mutex_lock(&mutex)
defer { pthread_mutex_unlock(&mutex) }
#endif
guard
!migrations.contains(where: {
$0.version == migration.version
|| $0.scriptURL == migration.scriptURL
})
else {
throw .duplicateMigration(migration)
}
migrations.insert(migration)
}
/// Executes all pending migrations inside a single exclusive transaction.
/// Executes all pending migrations in ascending version order.
///
/// This method retrieves the current schema version from storage, then determines
/// which migrations have a higher version. The selected migrations are sorted in
/// ascending order and each one's SQL script is executed in sequence. When all
/// scripts complete successfully, the stored version is updated to the highest
/// applied migration.
/// The service retrieves the current version from ``VersionStorage``, selects migrations with
/// higher versions, sorts them, and executes their scripts inside an exclusive transaction.
///
/// If a script is empty or execution fails, the process aborts and the transaction
/// is rolled back, leaving the database unchanged.
///
/// - Throws: ``MigrationError/emptyMigrationScript(_:)`` if a script is empty.
/// - Throws: ``MigrationError/migrationFailed(_:_:)`` if execution or version
/// update fails.
/// - Throws: ``MigrationError/emptyMigrationScript(_:)`` if a migration script is empty.
/// - Throws: ``MigrationError/migrationFailed(_:_:)`` if a migration fails to execute or the
/// version update cannot be persisted.
public func migrate() throws(MigrationError<Version>) {
pthread_mutex_lock(&mutex)
defer { pthread_mutex_unlock(&mutex) }
#if os(Windows)
AcquireSRWLockExclusive(&mutex)
defer { ReleaseSRWLockExclusive(&mutex) }
#else
pthread_mutex_lock(&mutex)
defer { pthread_mutex_unlock(&mutex) }
#endif
do {
try service.perform(in: .exclusive) { connection in
try storage.prepare(connection)
let version = try storage.getVersion(connection)
let migrations = migrations
.filter { $0.version > version }
.sorted { $0.version < $1.version }
for migration in migrations {
let script = try migration.script
guard !script.isEmpty else {
throw MigrationError.emptyMigrationScript(migration)
try super.perform { connection in
do {
try connection.beginTransaction(.exclusive)
try migrate(with: connection)
try connection.commitTransaction()
} catch {
if !connection.isAutocommit {
try connection.rollbackTransaction()
}
do {
try connection.execute(sql: script)
} catch {
throw MigrationError.migrationFailed(migration, error)
}
}
if let version = migrations.last?.version {
try storage.setVersion(connection, version)
throw error
}
}
} catch let error as MigrationError<Version> {
@@ -139,3 +199,31 @@ public final class MigrationService<
}
}
}
// MARK: - Private
private extension MigrationService {
func migrate(with connection: ConnectionProtocol) throws {
try storage.prepare(connection)
let version = try storage.getVersion(connection)
let migrations = migrations
.filter { $0.version > version }
.sorted { $0.version < $1.version }
for migration in migrations {
let script = try migration.script
guard !script.isEmpty else {
throw MigrationError.emptyMigrationScript(migration)
}
do {
try connection.execute(sql: script)
} catch {
throw MigrationError.migrationFailed(migration, error)
}
}
if let version = migrations.last?.version {
try storage.setVersion(connection, version)
}
}
}