Skip to content

Commit

Permalink
Merge pull request #25 from keboola/adamvyborny-PST-1808
Browse files (browse the repository at this point in the history)
New manifest support
AdamVyborny authored Jul 31, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
2 parents 58f5aea + 6f09222 commit 0ca2e8b
Showing 210 changed files with 101,073 additions and 101,073 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@
"ext-mongodb": "*",
"keboola/csv": "^4.0",
"keboola/csvmap": "^2.2",
"keboola/php-component": "^9.0",
"keboola/php-component": "^10.1",
"keboola/retry": "^0.5.0",
"keboola/ssh-tunnel": "^2.0",
"league/uri": "^6.2",
1,394 changes: 649 additions & 745 deletions composer.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: '3'
services:
app: &main
build: .
@@ -18,6 +17,7 @@ services:
- dns.local
environment:
- GODEBUG="x509ignoreCN=0"
- KBC_DATA_TYPE_SUPPORT=none
entrypoint:
- sh
- -c
2 changes: 1 addition & 1 deletion src/Component.php
Original file line number Diff line number Diff line change
@@ -38,7 +38,7 @@ public function __construct(LoggerInterface $logger)
$exportCommandFactory,
$config,
$this->getLogger(),
$this->getInputState()
$this->getInputState(),
);
}

2 changes: 1 addition & 1 deletion src/Config/ConfigRowDefinition.php
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@ protected function getParametersDefinition(): ArrayNodeDefinition
$parametersNode->validate()->always(function (array $v) {
if (isset($v['query'], $v['incrementalFetchingColumn']) && $v['query'] !== '') {
throw new InvalidConfigurationException(
'Both incremental fetching and query cannot be set together.'
'Both incremental fetching and query cannot be set together.',
);
}
if (isset($v['sort'], $v['incrementalFetchingColumn']) && $v['sort'] !== '') {
12 changes: 6 additions & 6 deletions src/Config/DbNode.php
Original file line number Diff line number Diff line change
@@ -31,14 +31,14 @@ public function __construct()
// Validation for "custom_uri" protocol
if (!isset($v['uri'])) {
throw new InvalidConfigurationException(
'The child node "uri" at path "parameters.db" must be configured.'
'The child node "uri" at path "parameters.db" must be configured.',
);
}

// SSH tunnel cannot be used with custom URI
if ($sshTunnelEnabled) {
throw new InvalidConfigurationException(
'Custom URI is not compatible with SSH tunnel support.'
'Custom URI is not compatible with SSH tunnel support.',
);
}

@@ -47,29 +47,29 @@ public function __construct()
if (isset($v[$key])) {
throw new InvalidConfigurationException(sprintf(
'Configuration node "db.%s" is not compatible with custom URI.',
$key
$key,
));
}
}
} else {
// Validation for "mongodb" or "mongodb+srv" protocol
if (!isset($v['host'])) {
throw new InvalidConfigurationException(
'The child node "host" at path "parameters.db" must be configured.'
'The child node "host" at path "parameters.db" must be configured.',
);
}

if (!isset($v['database'])) {
throw new InvalidConfigurationException(
'The child node "database" at path "parameters.db" must be configured.'
'The child node "database" at path "parameters.db" must be configured.',
);
}

// Validate auth options: both or none
if (isset($v['user']) xor isset($v['password'])) {
throw new InvalidConfigurationException(
'When passing authentication details,' .
' both "user" and "password" params are required'
' both "user" and "password" params are required',
);
}
}
2 changes: 1 addition & 1 deletion src/Config/OldConfigDefinition.php
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@ protected function getParametersDefinition(): ArrayNodeDefinition
->always(function ($v) {
if (isset($v['query'], $v['incrementalFetchingColumn']) && $v['query'] !== '') {
throw new InvalidConfigurationException(
'Both incremental fetching and query cannot be set together.'
'Both incremental fetching and query cannot be set together.',
);
}
if (isset($v['sort'], $v['incrementalFetchingColumn']) && $v['sort'] !== '') {
20 changes: 10 additions & 10 deletions src/Export.php
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ public function __construct(
private ExportCommandFactory $exportCommandFactory,
private array $connectionOptions,
private ExportOptions $exportOptions,
private LoggerInterface $logger
private LoggerInterface $logger,
) {
$this->name = Strings::webalize($exportOptions->getName());
$this->retryProxy = new RetryProxy($this->getRetryPolicy(), new ExponentialBackOffPolicy());
@@ -59,7 +59,7 @@ public function export(): Generator

$this->logger->info(sprintf(
'Connected to %s',
$this->uriFactory->create($options)->getConnectionString()
$this->uriFactory->create($options)->getConnectionString(),
));
$this->logger->info(sprintf('Exporting "%s"', $this->name));

@@ -102,7 +102,7 @@ private function processJsonLine(string $line): Generator
} catch (NotEncodableValueException $e) {
$this->logger->warning(sprintf(
'Could not decode JSON: %s...',
substr($line, 0, 80)
substr($line, 0, 80),
));
}
}
@@ -118,7 +118,7 @@ protected function handleMongoExportFails(Throwable $e): void
'Export "%s" failed. Timeout occurred while waiting for data. ' .
'Please check your query. Problem can be a typo in the field name or missing index.' .
'In these cases, the full scan is made and it can take too long.',
$this->name
$this->name,
));
}

@@ -150,7 +150,7 @@ protected function handleMongoExportFails(Throwable $e): void
if (preg_match('/query \'\\[[^\\]]*\\]\' is not valid JSON/i', $e->getMessage())) {
throw new UserException(sprintf(
'Export "%s" failed. Query "' . $this->exportOptions->getQuery() . '" is not valid JSON',
$this->name
$this->name,
));
}

@@ -159,7 +159,7 @@ protected function handleMongoExportFails(Throwable $e): void

public static function buildIncrementalFetchingParams(
ExportOptions $exportOptions,
string|int|float|null $inputState
string|int|float|null $inputState,
): ExportOptions {
$query = (object) [];
if (!is_null($inputState)) {
@@ -223,15 +223,15 @@ public function getLastFetchedValue(): mixed
if (count($incrementalFetchingColumn) > 1) {
$fullPathColumnMessage = sprintf(
' ("%s")',
$this->exportOptions->getIncrementalFetchingColumn()
$this->exportOptions->getIncrementalFetchingColumn(),
);
}
throw new UserException(
sprintf(
'Column "%s"%s does not exists.',
$item,
$fullPathColumnMessage
)
$fullPathColumnMessage,
),
);
}
$data = $data[$item];
@@ -240,7 +240,7 @@ public function getLastFetchedValue(): mixed
if (is_array($data)) {
throw new UserException(sprintf(
'Unexpected value "%s" in output of incremental fetching.',
json_encode($data)
json_encode($data),
));
}

10 changes: 5 additions & 5 deletions src/ExportHelper.php
Original file line number Diff line number Diff line change
@@ -39,7 +39,7 @@ public static function convertDatesToString(string $input, bool $isoDate = false
function (array $m) use ($isoDate): string {
return $isoDate ? '"ISODate(' . addslashes($m[1]) .')"' : $m[1];
},
$input
$input,
);

if ($output === null) {
@@ -56,7 +56,7 @@ public static function convertObjectIdToString(string $input): string
function (array $m): string {
return '"ObjectId(' . addslashes($m[1]) .')"';
},
$input
$input,
);

if ($output === null) {
@@ -73,7 +73,7 @@ public static function convertStringIdToObjectId(string $input): string
static function (array $m): string {
return str_replace($m[1], '{"$oid": "' . $m[2] . '"}', $m[0]);
},
$input
$input,
);

if ($output === null) {
@@ -90,7 +90,7 @@ public static function fixIsoDateInGteQuery(string $input): string
function (array $m): string {
return '"$gte":{"$date": ' . stripslashes($m[1]) . '}';
},
$input
$input,
);

if ($output === null) {
@@ -107,7 +107,7 @@ public static function fixObjectIdInGteQuery(string $input): string
function (array $m): string {
return '"$gte":{"$oid": ' . stripslashes($m[1]) . '}';
},
$input
$input,
);

if ($output === null) {
15 changes: 13 additions & 2 deletions src/Extractor.php
Original file line number Diff line number Diff line change
@@ -140,12 +140,23 @@ private function createSshTunnel(array $sshOptions): void
}

/**
* @param array<string, array{path: string, primaryKey: array<int, string>|string|null}> $manifestsData
* @param array<string, array{
* path: string,
* primaryKey: array<int, string>,
* columns: array<int, string>
* }> $manifestsData
*/
protected function generateManifests(array $manifestsData, ExportOptions $exportOptions): void
{
foreach ($manifestsData as $manifestData) {
(new Manifest($exportOptions, $manifestData['path'], $manifestData['primaryKey']))->generate();
(new Manifest(
$this->config->getDataTypeSupport(),
$exportOptions->isIncrementalFetching(),
$manifestData['path'],
$manifestData['primaryKey'],
$manifestData['columns'],
)
)->generate();
}
}

53 changes: 32 additions & 21 deletions src/Manifest.php
Original file line number Diff line number Diff line change
@@ -4,39 +4,50 @@

namespace MongoExtractor;

use MongoExtractor\Config\ExportOptions;
use Symfony\Component\Filesystem\Filesystem;
use Symfony\Component\Serializer\Encoder\JsonEncode;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Keboola\Component\Config\DatatypeSupport;
use Keboola\Component\Manifest\ManifestManager;
use Keboola\Component\Manifest\ManifestManager\Options\OutTable\ManifestOptions;
use Keboola\Component\Manifest\ManifestManager\Options\OutTable\ManifestOptionsSchema;

class Manifest
{
protected Filesystem $fs;
private JsonEncode $jsonEncode;
private ManifestManager $manifestManager;

/**
* @param array<int, string>|string|null $primaryKey
* @param array<int, string> $primaryKey
* @param array<int, string> $columns
*/
public function __construct(
private ExportOptions $exportOptions,
private string $path,
private array|string|null $primaryKey
private readonly DatatypeSupport $datatypeSupport,
private readonly bool $isIncrementalFetching,
private readonly string $path,
private readonly array $primaryKey,
private readonly array $columns,
) {
$this->fs = new Filesystem();
$this->jsonEncode = new JsonEncode;
$directoryPath = str_replace('/out/tables', '', pathinfo($this->path, PATHINFO_DIRNAME));
$this->manifestManager = new ManifestManager($directoryPath);
}

/**
* @throws \Keboola\Component\Manifest\ManifestManager\Options\OptionsValidationException
*/
public function generate(): void
{
$manifest = [
'primary_key' => $this->primaryKey,
'incremental' => $this->exportOptions->isIncrementalFetching(),

];

$this->fs->dumpFile(
$this->path,
$this->jsonEncode->encode($manifest, JsonEncoder::FORMAT)
$manifest = new ManifestOptions();
$manifest->setIncremental($this->isIncrementalFetching);
foreach ($this->columns as $column) {
$manifest->addSchema(new ManifestOptionsSchema(
$column,
null,
true,
in_array($column, $this->primaryKey, true),
));
}

$this->manifestManager->writeTableManifest(
pathinfo($this->path, PATHINFO_FILENAME),
$manifest,
$this->datatypeSupport->usingLegacyManifest(),
);
}
}
4 changes: 2 additions & 2 deletions src/Parse.php
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ public function __construct(

/**
* Parses exported json and creates .csv and .manifest files
* @return array<string, array{path: string, primaryKey: array<int, string>|string}>
* @return array<string, array{path: string, primaryKey: array<int, string>, columns: array<int, string>}>
* @throws \Keboola\Csv\Exception
* @throws \Keboola\Csv\InvalidArgumentException
*/
@@ -64,7 +64,7 @@ public function parse(Generator $exportOutput): array
'Done "%s", parsed %d %s in total',
$this->name,
$parsedDocumentsCount,
$parsedDocumentsCount === 1 ? 'record' : 'records'
$parsedDocumentsCount === 1 ? 'record' : 'records',
));

return $parser->getManifestData();
Loading

0 comments on commit 0ca2e8b

Please sign in to comment.