Feature/add update loader (#104)
* Add an abstract DB loader and refactor common code between the Insert and InsertUpdate loaders

* Add an Update loader
ecourtial authored Feb 6, 2022
1 parent 3a25000 commit d71a743
Showing 9 changed files with 487 additions and 192 deletions.
5 changes: 4 additions & 1 deletion changelog.MD
@@ -1,6 +1,9 @@
# Changelog

## New version
## 2.1

**New features**
* Added a new _Update_ loader in order to perform update operations while ignoring new entries.

**Miscellaneous**
* Added a strict mode to the step _options_ method. Default value is _false_.
4 changes: 2 additions & 2 deletions docs/Loaders/InsertUpdate.md
@@ -1,9 +1,9 @@
# Insert/Update Loader

Inserts data into a database table.
Inserts and/or updates data into a database table.

```php
/** @var \Wizaplace\Etl\Loaders\Insert $insertUpdate */
/** @var \Wizaplace\Etl\Loaders\InsertUpdate $insertUpdate */
$etl->load($insertUpdate, 'table_name', $options);
```

1 change: 1 addition & 0 deletions docs/Loaders/README.md
@@ -11,4 +11,5 @@ $etl->load($type, $destination, $options);

- [Insert](Insert.md)
- [Insert/Update](InsertUpdate.md)
- [Update](Update.md)
- [MemoryLoader](MemoryLoader.md)
109 changes: 109 additions & 0 deletions docs/Loaders/Update.md
@@ -0,0 +1,109 @@
# Update Loader

Updates existing rows in a database table. Entries that do not already exist in the table are ignored.

```php
/** @var \Wizaplace\Etl\Loaders\Update $update */
$etl->load($update, 'table_name', $options);
```

## Options

### Columns

Columns that will be loaded. If `null`, all columns in the process will be updated.

| Type | Default value |
| ----- | ------------- |
| array | `null` |

To select which columns will be loaded, use an array with the columns list:

```php
$options = [Update::COLUMNS => ['id', 'name', 'email']];
```

To map columns from the etl process to the database table, use an associative array where the `key` is the name of the process column and the `value` is the table column:

```php
$options = [Update::COLUMNS => [
    'id' => 'user_id',
    'name' => 'full_name',
]];
```
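
Both forms can be reduced to the same internal shape. The sketch below mirrors the normalization done in `AbstractDbLoader::initialize()` from this commit: a plain list is turned into a `process column => table column` map, while an associative array is left untouched. The function name `normalizeColumns` is hypothetical and only used for illustration.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the library) that mirrors the column
 * normalization in AbstractDbLoader::initialize().
 *
 * @param array<int|string, string> $columns
 *
 * @return array<int|string, string>
 */
function normalizeColumns(array $columns): array
{
    // A plain list has the sequential integer keys 0..n-1.
    if ([] !== $columns && array_keys($columns) === range(0, count($columns) - 1)) {
        // Map every column name to itself: ['id'] becomes ['id' => 'id'].
        return array_combine($columns, $columns);
    }

    // Associative arrays (and the empty array) are returned unchanged.
    return $columns;
}

$list = normalizeColumns(['id', 'name', 'email']);
$map = normalizeColumns(['id' => 'user_id', 'name' => 'full_name']);
```

After normalization, the loader can treat every configuration as a mapping, whichever form was used in the options.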

### Connection

Name of the database connection to use.

| Type | Default value |
| ------ | ------------- |
| string | `default` |

```php
$options = [Update::CONNECTION => 'app'];
```

### Key

List of primary keys or identifiers of the table.

| Type | Default value |
| ----- | ------------- |
| array | `['id']` |

```php
$options = [Update::KEY => ['id', Update::TYPE]];
```
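
To illustrate what the key is for, here is a minimal sketch of how an `UPDATE` statement could be derived from the column list and the key. It is an assumption made for illustration, not the statement builder the library actually uses: `buildUpdateSql` is a hypothetical helper, and the table and column names are made up.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper, not part of the library: builds an UPDATE statement
 * with named placeholders. Key columns go into the WHERE clause, every
 * other column goes into the SET clause.
 *
 * @param string[] $columns table columns present in the row
 * @param string[] $key     columns that identify the row
 */
function buildUpdateSql(string $table, array $columns, array $key): string
{
    $placeholder = static function (string $column): string {
        return "$column = :$column";
    };

    // Columns that are not part of the key are the ones being updated.
    $set = array_map($placeholder, array_values(array_diff($columns, $key)));
    $where = array_map($placeholder, $key);

    return sprintf(
        'UPDATE %s SET %s WHERE %s',
        $table,
        implode(', ', $set),
        implode(' AND ', $where)
    );
}

$sql = buildUpdateSql('users', ['id', 'type', 'name', 'email'], ['id', 'type']);
```

Because the key only appears in the `WHERE` clause, a row whose key matches nothing in the table changes no rows, which fits the loader's goal of ignoring new entries.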

### Timestamps

Populates the `created_at` and/or `updated_at` columns with the current timestamp when inserting or updating a row.

| Type | Default value |
| ------- | ------------- |
| boolean | `false` |

```php
$options = [Update::TIMESTAMPS => true];
```
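
The abstract loader added in this commit builds the timestamp with `date('Y-m-d G:i:s')`. In PHP's date format, `G` is the 24-hour hour without a leading zero, whereas `H` pads it to two digits. The sketch below compares both formats for a fixed instant; the instant and the UTC timezone are arbitrary choices made for the example.

```php
<?php

declare(strict_types=1);

// Pin the timezone so the example does not depend on the local configuration.
date_default_timezone_set('UTC');

// Arbitrary fixed instant: 2022-02-06 09:05:03.
$instant = mktime(9, 5, 3, 2, 6, 2022);

// 'G' renders the hour without a leading zero.
$withG = date('Y-m-d G:i:s', $instant);

// 'H' renders the hour with a leading zero.
$withH = date('Y-m-d H:i:s', $instant);
```

The two results differ only for hours before 10:00, which can matter if the stored values are later compared as strings.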

### Transaction

Indicates if the loader will perform database transactions.

When the loader runs in a single transaction, the whole ETL process is treated as one atomic transaction and is
rolled back on errors. When it runs in multiple transactions, the best we can do is provide durability by trying
to commit any inserts that are accepted by the destination database.

| Type | Default value |
| ------- | ------------- |
| boolean | `true` |

```php
$options = [Update::TRANSACTION => false];
```

### Commit Size

Transaction commit size. The transaction option must be enabled.

The work is done in a single transaction if the commit size is zero, and that transaction is rolled back if an
insert fails. The ETL process is then atomic: either all the inserts are committed or none are. If the ETL
process fails, we can replay the entire source after fixing the error.

If the work is done in multiple transactions, however, some transactions may already have been committed when
the pipeline fails, so the process as a whole is no longer atomic. Rows accepted in the pending transaction
would still need to be committed; discarding them would leave the database in a state where it is difficult to
determine which inserts have been accepted and which have not. Therefore, the loader tries to commit the pending
transaction so that any rows reported as inserted are durable in the database. Since committing multiple
transactions implies the ETL process is not atomic, it can at least be durable.

| Type | Default value |
| ---- | ------------- |
| int | 100 |

```php
$options = [Update::COMMIT_SIZE => 500];
```
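
The batching behaviour described above can be sketched without a database. `BatchingTransaction` below is a hypothetical stand-in, not the library's `Transaction` class: it only records how many rows each commit would contain, assuming a commit is issued every `commitSize` rows and once more when the loader is closed.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical stand-in for a transaction manager. It records the size of
 * each committed batch instead of talking to a database.
 */
final class BatchingTransaction
{
    /** @var int[] number of rows in each committed batch */
    public array $commits = [];

    private int $commitSize;

    private int $pending = 0;

    public function __construct(int $commitSize)
    {
        $this->commitSize = $commitSize;
    }

    public function run(callable $work): void
    {
        $work();
        ++$this->pending;

        // A commit size of zero means: never commit before close().
        if ($this->commitSize > 0 && $this->pending >= $this->commitSize) {
            $this->commit();
        }
    }

    public function close(): void
    {
        // Commit whatever is still pending so accepted rows become durable.
        if ($this->pending > 0) {
            $this->commit();
        }
    }

    private function commit(): void
    {
        $this->commits[] = $this->pending;
        $this->pending = 0;
    }
}

$batched = new BatchingTransaction(100);
$single = new BatchingTransaction(0);

for ($i = 0; $i < 250; ++$i) {
    $batched->run(static function (): void {});
    $single->run(static function (): void {});
}

$batched->close();
$single->close();
```

With a commit size of 100, the 250 rows end up in three commits; with a commit size of zero, everything is committed once at the end, which is the all-or-nothing case.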
93 changes: 93 additions & 0 deletions src/Loaders/AbstractDbLoader.php
@@ -0,0 +1,93 @@
<?php

/**
 * @author Wizacha DevTeam <[email protected]>
 * @copyright Copyright (c) Wizacha
 * @copyright Copyright (c) Leonardo Marquine
 * @license MIT
 */

declare(strict_types=1);

namespace Wizaplace\Etl\Loaders;

use Wizaplace\Etl\Database\Manager;
use Wizaplace\Etl\Database\Transaction;

abstract class AbstractDbLoader extends Loader
{
    /**
     * The database manager.
     */
    protected Manager $db;

    /**
     * Time for timestamps columns.
     */
    protected string $time;

    /**
     * Indicates if the table has timestamps columns.
     */
    protected bool $timestamps = false;

    /**
     * Indicates if the loader will perform transactions.
     */
    protected bool $transaction = true;

    /**
     * The database transaction manager.
     */
    protected Transaction $transactionManager;

    /**
     * The connection name.
     */
    protected string $connection = 'default';

    /**
     * Transaction commit size.
     */
    protected int $commitSize = 0;

    /**
     * The columns to insert.
     *
     * @var string[]
     */
    protected array $columns = [];

    /**
     * Create a new Insert Loader instance.
     */
    public function __construct(Manager $manager)
    {
        $this->db = $manager;
    }

    public function initialize(): void
    {
        if ($this->timestamps) {
            $this->time = date('Y-m-d G:i:s');
        }

        if ($this->transaction) {
            $this->transactionManager = $this->db->transaction($this->connection)->size($this->commitSize);
        }

        if ([] !== $this->columns && array_keys($this->columns) === range(0, count($this->columns) - 1)) {
            $this->columns = array_combine($this->columns, $this->columns);
        }
    }

    /**
     * Finalize the step.
     */
    public function finalize(): void
    {
        if ($this->transaction) {
            $this->transactionManager->close();
        }
    }
}
79 changes: 1 addition & 78 deletions src/Loaders/Insert.php
@@ -11,64 +11,20 @@

namespace Wizaplace\Etl\Loaders;

use Wizaplace\Etl\Database\Manager;
use Wizaplace\Etl\Database\Transaction;
use Wizaplace\Etl\Row;

class Insert extends Loader
class Insert extends AbstractDbLoader
{
public const CONNECTION = 'connection';
public const TIMESTAMPS = 'timestamps';
public const TRANSACTION = 'transaction';
public const COMMIT_SIZE = 'commitSize';

/**
* The connection name.
*/
protected string $connection = 'default';

/**
* The columns to insert.
*
* @var string[]
*/
protected array $columns = [];

/**
* Indicates if the table has timestamps columns.
*/
protected bool $timestamps = false;

/**
* Indicates if the loader will perform transactions.
*/
protected bool $transaction = true;

/**
* Transaction commit size.
*/
protected int $commitSize = 0;

/**
* Time for timestamps columns.
*/
protected string $time;

/**
* The insert statement.
*/
protected \PDOStatement $insert;

/**
* The database transaction manager.
*/
protected Transaction $transactionManager;

/**
* The database manager.
*/
protected Manager $db;

/**
* Properties that can be set via the options method.
*
@@ -82,29 +38,6 @@ class Insert extends Loader
self::COMMIT_SIZE,
];

/**
* Create a new Insert Loader instance.
*/
public function __construct(Manager $manager)
{
$this->db = $manager;
}

public function initialize(): void
{
if ($this->timestamps) {
$this->time = date('Y-m-d G:i:s');
}

if ($this->transaction) {
$this->transactionManager = $this->db->transaction($this->connection)->size($this->commitSize);
}

if ([] !== $this->columns && array_keys($this->columns) === range(0, count($this->columns) - 1)) {
$this->columns = array_combine($this->columns, $this->columns);
}
}

/**
* Load the given row.
*/
@@ -121,16 +54,6 @@ public function load(Row $row): void
}
}

/**
* Finalize the step.
*/
public function finalize(): void
{
if ($this->transaction) {
$this->transactionManager->close();
}
}

/**
* Prepare the insert statement.
*/