Skip to content

Commit

Permalink
Merge pull request #76 from keboola/adamvyborny-DBT-24-shared-buckets
Browse files Browse the repository at this point in the history
Support for shared buckets and removed testing of compounded PKs
  • Loading branch information
AdamVyborny authored Mar 30, 2023
2 parents 8710a3d + 806700d commit b219969
Show file tree
Hide file tree
Showing 13 changed files with 105 additions and 60 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ env:
SNOWFLAKE_PORT: 443
SNOWFLAKE_WAREHOUSE: "KEBOOLA_PROD_SMALL"
SNOWFLAKE_DATABASE: "SAPI_9317"
SNOWFLAKE_SCHEMA: "WORKSPACE_875822722"
SNOWFLAKE_USER: "SAPI_WORKSPACE_875822722"
SNOWFLAKE_SCHEMA: "WORKSPACE_964066099"
SNOWFLAKE_USER: "SAPI_WORKSPACE_964066099"
SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}

# BigQuery
Expand Down
67 changes: 43 additions & 24 deletions src/DwhProvider/LocalSnowflakeProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use DbtTransformation\Service\DbtYamlCreateService\DbtProfilesYamlCreateService;
use DbtTransformation\Service\DbtYamlCreateService\DbtSourceYamlCreateService;
use Keboola\StorageApi\Client;
use Keboola\StorageApi\ClientException;
use Psr\Log\LoggerInterface;
use RuntimeException;

Expand All @@ -21,6 +22,8 @@ class LocalSnowflakeProvider implements DwhProviderInterface
protected Config $config;
protected string $projectPath;
protected LoggerInterface $logger;
/** @var array<int, int> */
protected array $projectIds = [];

public function __construct(
DbtSourceYamlCreateService $createSourceFileService,
Expand All @@ -42,35 +45,42 @@ public function __construct(
*/
public function createDbtYamlFiles(array $configurationNames = []): void
{
$this->createProfilesFileService->dumpYaml(
$this->projectPath,
$this->getOutputs($configurationNames, self::getDbtParams())
);
$this->setEnvVars();

if (!$this->config->generateSources()) {
return;
}

$client = new Client([
'url' => $this->config->getStorageApiUrl(),
'token' => $this->config->getStorageApiToken(),
]);

$inputTables = $this->config->getStorageInputTables();
$tables = $client->listTables();
$tablesData = [];
foreach ($tables as $table) {
if (empty($inputTables) || in_array($table['id'], $inputTables)) {
$tablesData[(string) $table['bucket']['id']][] = $table;
if ($this->config->generateSources()) {
$client = new Client([
'url' => $this->config->getStorageApiUrl(),
'token' => $this->config->getStorageApiToken(),
]);

$inputTables = $this->config->getStorageInputTables();
foreach ($client->listBuckets() as $bucket) {
$tables = $client->listTables($bucket['id']);
foreach ($tables as $table) {
if (empty($inputTables) || in_array($table['id'], $inputTables)) {
$tablesData[(string) $bucket['id']]['tables'][] = $table;
if (isset($bucket['sourceBucket']['project']['id'])) {
$sourceProjectId = $bucket['sourceBucket']['project']['id'];
$this->projectIds[$sourceProjectId] = $sourceProjectId;
$tablesData[(string) $bucket['id']]['projectId'] = $sourceProjectId;
}
}
}
}
}

$this->createSourceFileService->dumpYaml(
$this->createProfilesFileService->dumpYaml(
$this->projectPath,
$tablesData,
$this->config->getFreshness()
$this->getOutputs($configurationNames, self::getDbtParams(), $this->projectIds),
);
$this->setEnvVars();

if ($this->config->generateSources()) {
$this->createSourceFileService->dumpYaml(
$this->projectPath,
$tablesData,
$this->config->getFreshness()
);
}
}

protected function setEnvVars(): void
Expand All @@ -81,6 +91,10 @@ protected function setEnvVars(): void
putenv(sprintf('DBT_KBC_PROD_TYPE=%s', $workspace['type']));
putenv(sprintf('DBT_KBC_PROD_SCHEMA=%s', $workspace['schema']));
putenv(sprintf('DBT_KBC_PROD_DATABASE=%s', $workspace['database']));
foreach ($this->projectIds as $projectId) {
$stackPrefix = strtok($workspace['database'], '_');
putenv(sprintf('DBT_KBC_PROD_%d_DATABASE=%s_%d', $projectId, $stackPrefix, $projectId));
}
putenv(sprintf('DBT_KBC_PROD_WAREHOUSE=%s', $workspace['warehouse']));
$account = str_replace(self::STRING_TO_REMOVE_FROM_HOST, '', $workspace['host']);
putenv(sprintf('DBT_KBC_PROD_ACCOUNT=%s', $account));
Expand Down Expand Up @@ -125,13 +139,18 @@ public static function getDbtParams(): array
/**
* @param array<int, string> $configurationNames
* @param array<int, string> $dbtParams
* @param array<int, int> $projectIds
* @return array<string, array<string, string>>
*/
public static function getOutputs(array $configurationNames, array $dbtParams): array
public static function getOutputs(array $configurationNames, array $dbtParams, array $projectIds = []): array
{
$outputs = [];
if (empty($configurationNames)) {
$outputs['kbc_prod'] = self::getOutputDefinition('KBC_PROD', $dbtParams);
foreach ($projectIds as $projectId) {
$configurationName = sprintf('kbc_prod_%d', $projectId);
$outputs[$configurationName] = self::getOutputDefinition(strtoupper($configurationName), $dbtParams);
}
}

foreach ($configurationNames as $configurationName) {
Expand Down
14 changes: 8 additions & 6 deletions src/Service/DbtYamlCreateService/DbtSourceYamlCreateService.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@
class DbtSourceYamlCreateService extends DbtYamlCreateService
{
/**
* @param array<string, array<int, array<string, mixed>>> $tablesData
* @param array<string, array{projectId?: mixed, tables: array<int, mixed>}> $tablesData
* @param array<string, array{period: string, count: int}> $freshness
*/
public function dumpYaml(
string $projectPath,
array $tablesData,
array $freshness,
string $dbEnvVarName = 'DBT_KBC_PROD_DATABASE'
array $freshness
): void {
$modelFolderPath = sprintf('%s/models/_sources/', $projectPath);
$this->createFolderIfNotExist($modelFolderPath);
Expand All @@ -31,10 +30,13 @@ public function dumpYaml(
[
'name' => $bucket,
'freshness' => $freshness,
'database' => sprintf('{{ env_var("%s") }}', $dbEnvVarName),
'database' => sprintf(
'{{ env_var("DBT_KBC_PROD%s_DATABASE") }}',
isset($tables['projectId']) ? ('_' . $tables['projectId']) : ''
),
'schema' => $bucket,
'loaded_at_field' => '"_timestamp"',
'tables' => array_map($this->formatTableSources(), $tables),
'tables' => array_map($this->formatTableSources(), $tables['tables']),
],
],
], 8)
Expand All @@ -54,7 +56,7 @@ protected function formatTableSources(): Closure
],
];

if (!empty($table['primaryKey'])) {
if (!empty($table['primaryKey']) && count($table['primaryKey']) === 1) {
$tables['columns'] = array_map(
static function ($primaryColumn) {
return [
Expand Down
4 changes: 2 additions & 2 deletions tests/functional/run-action-with-debug-step/expected-stdout
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ Required dependencies:

Connection:
account: keboola
user: SAPI_WORKSPACE_875822722
user: SAPI_WORKSPACE_%s
database: SAPI_9317
schema: WORKSPACE_875822722
schema: WORKSPACE_%s
warehouse: KEBOOLA_PROD_SMALL
role: None
client_session_keep_alive: False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ Concurrency: 1 threads (target='kbc_prod')
1 of 4 PASS source_not_null_in.c-test-bucket_test__id_ ......................... [PASS in %ss]
2 of 4 START test source_unique_in.c-test-bucket_test__id_ ..................... [RUN]
2 of 4 PASS source_unique_in.c-test-bucket_test__id_ ........................... [PASS in %ss]
3 of 4 START view model WORKSPACE_875822722.stg_model .......................... [RUN]
3 of 4 OK created view model WORKSPACE_875822722.stg_model ..................... [SUCCESS 1 in %ss]
4 of 4 START view model WORKSPACE_875822722.fct_model .......................... [RUN]
4 of 4 OK created view model WORKSPACE_875822722.fct_model ..................... [SUCCESS 1 in %ss]
3 of 4 START view model WORKSPACE_%s.stg_model .......................... [RUN]
3 of 4 OK created view model WORKSPACE_%s.stg_model ..................... [SUCCESS 1 in %ss]
4 of 4 START view model WORKSPACE_%s.fct_model .......................... [RUN]
4 of 4 OK created view model WORKSPACE_%s.fct_model ..................... [SUCCESS 1 in %ss]
Finished running 2 tests, 2 view models in %s hours %s minutes and %s seconds (%ss).
Completed successfully
Done. PASS=4 WARN=0 ERROR=0 SKIP=0 TOTAL=4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ Running with dbt=%s
Partial parse save file not found. Starting full parse.
Found 2 models, 0 tests, 0 snapshots, 0 analyses, %s macros, 0 operations, 0 seed files, 1 source, 0 exposures, 0 metrics
Concurrency: 4 threads (target='kbc_prod')
1 of 2 START view model WORKSPACE_875822722.stg_model %s [RUN]
1 of 2 OK created view model WORKSPACE_875822722.stg_model %s [SUCCESS 1 in %ss]
2 of 2 START view model WORKSPACE_875822722.fct_model %s [RUN]
2 of 2 OK created view model WORKSPACE_875822722.fct_model %s [SUCCESS 1 in %ss]
1 of 2 START view model WORKSPACE_%s.stg_model %s [RUN]
1 of 2 OK created view model WORKSPACE_%s.stg_model %s [SUCCESS 1 in %ss]
2 of 2 START view model WORKSPACE_%s.fct_model %s [RUN]
2 of 2 OK created view model WORKSPACE_%s.fct_model %s [SUCCESS 1 in %ss]
Finished running 2 view models in %s hours %s minutes and %s seconds (%ss).
Completed successfully
Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"compiled":{"stg_model.sql":"with source as (\n \n select * from \"KEBOOLA_9457\".\"in.c-test-bucket\".\"test\"\n \n ),\n \n renamed as (\n \n select\n \"id\",\n \"col2\",\n \"col3\",\n \"col4\"\n from source\n \n )\n \n select * from renamed","fct_model.sql":"-- Use the `ref` function to select from other models\n\nselect *\nfrom \"KEBOOLA_9457\".\"WORKSPACE_26035590\".\"stg_model\"\nwhere \"id\" = 1"}}
{"compiled":{"fct_model.sql":"-- Use the `ref` function to select from other models\n\nselect *\nfrom \"KEBOOLA_%d\".\"WORKSPACE_%d\".\"stg_model\"\nwhere \"id\" = 1","stg_model.sql":"with source as (\n \n select * from \"KEBOOLA_%d\".\"in.c-test-bucket\".\"test\"\n \n ),\n \n renamed as (\n \n select\n \"id\",\n \"col2\",\n \"col3\",\n \"col4\"\n from source\n \n )\n \n select * from renamed"}}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"repository":{"url":"https:\/\/github.com\/keboola\/dbt-test-project-public.git","branches":[{"branch":"HEAD","comment":"Removed seed","sha":"695d129","author":{"name":"AdamVyborny","email":"<[email protected]>"},"date":"Tue Sep 27 20:48:36 2022 +0200\""},{"branch":"branch-with-bigquery-sources","comment":"bigquery 12","sha":"4f91c23","author":{"name":"MiroCillik","email":"<[email protected]>"},"date":"Mon Sep 12 12:42:27 2022 +0200\""},{"branch":"branch-with-deps","comment":"Update packages.yml","sha":"4c502f1","author":{"name":"Miroslav \u010cill\u00edk","email":"<[email protected]>"},"date":"Fri Sep 9 14:18:18 2022 +0200\""},{"branch":"branch-with-mssql-sources","comment":"Update src_my_source.yml","sha":"e35b650","author":{"name":"AdamVyborny","email":"<[email protected]>"},"date":"Thu Sep 15 13:46:25 2022 +0200\""},{"branch":"branch-with-postgres-sources","comment":"Update src_my_source.yml","sha":"810f243","author":{"name":"AdamVyborny","email":"<[email protected]>"},"date":"Mon Sep 12 09:13:20 2022 +0200\""},{"branch":"branch-with-redshift-sources","comment":"Update stg_model.sql","sha":"62ebbf6","author":{"name":"AdamVyborny","email":"<[email protected]>"},"date":"Mon Sep 19 08:18:46 2022 +0200\""},{"branch":"branch-with-seed","comment":"Add seed","sha":"4d21bb0","author":{"name":"AdamVyborny","email":"<[email protected]>"},"date":"Tue Sep 27 20:49:07 2022 +0200\""},{"branch":"branch-with-sources","comment":"Fix bucket name","sha":"fe39bd0","author":{"name":"AdamVyborny","email":"<[email protected]>"},"date":"Tue Jun 28 08:50:26 2022 +0200\""},{"branch":"main","comment":"Removed seed","sha":"695d129","author":{"name":"AdamVyborny","email":"<[email protected]>"},"date":"Tue Sep 27 20:48:36 2022 +0200\""},{"branch":"missing-dbt-project-yml","comment":"Deleted dbt_project.yml for test","sha":"95b91fd","author":{"name":"AdamVyborny","email":"<[email protected]>"},"date":"Tue Feb 15 20:53:43 2022 +0100\""},{"branch":"ujovlado-license-and-readme","comment":"Add license","sha":"eea2e13","author":{"name":"Vladim\u00edr Kri\u0161ka","email":"<[email protected]>"},"date":"Tue May 31 00:06:01 2022 +0200\""}]}}
{"repository":{"url":"https:\/\/github.com\/keboola\/dbt-test-project-public.git","branches":[{"branch":"HEAD","comment":"Removed seed","sha":"695d129","author":{"name":"AdamVyborny","email":"<[email protected]>"},"date":"Tue Sep 27 20:48:36 2022 +0200\""},{"branch":"branch-with-bigquery-sources","comment":"bigquery 12","sha":"4f91c23","author":{"name":"MiroCillik","email":"<[email protected]>"},"date":"Mon Sep 12 12:42:27 2022 +0200\""},{"branch":"branch-with-deps","comment":"Update packages.yml","sha":"4c502f1","author":{"name":"Miroslav \u010cill\u00edk","email":"<[email protected]>"},"date":"Fri Sep 9 14:18:18 2022 +0200\""},{"branch":"branch-with-mssql-sources","comment":"Update src_my_source.yml","sha":"e35b650","author":{"name":"AdamVyborny","email":"<[email protected]>"},"date":"Thu Sep 15 13:46:25 2022 +0200\""},{"branch":"branch-with-postgres-sources","comment":"Update src_my_source.yml","sha":"810f243","author":{"name":"AdamVyborny","email":"<[email protected]>"},"date":"Mon Sep 12 09:13:20 2022 +0200\""},{"branch":"branch-with-redshift-sources","comment":"Update stg_model.sql","sha":"62ebbf6","author":{"name":"AdamVyborny","email":"<[email protected]>"},"date":"Mon Sep 19 08:18:46 2022 +0200\""},{"branch":"branch-with-seed","comment":"Update countries.csv","sha":"ef36ebb","author":{"name":"Adam V\u00fdborn\u00fd","email":"<[email protected]>"},"date":"Thu Mar 30 10:59:11 2023 +0200\""},{"branch":"branch-with-sources","comment":"Fix bucket name","sha":"fe39bd0","author":{"name":"AdamVyborny","email":"<[email protected]>"},"date":"Tue Jun 28 08:50:26 2022 +0200\""},{"branch":"main","comment":"Removed seed","sha":"695d129","author":{"name":"AdamVyborny","email":"<[email protected]>"},"date":"Tue Sep 27 20:48:36 2022 +0200\""},{"branch":"missing-dbt-project-yml","comment":"Deleted dbt_project.yml for test","sha":"95b91fd","author":{"name":"AdamVyborny","email":"<[email protected]>"},"date":"Tue Feb 15 20:53:43 2022 +0100\""},{"branch":"ujovlado-license-and-readme","comment":"Add license","sha":"eea2e13","author":{"name":"Vladim\u00edr Kri\u0161ka","email":"<[email protected]>"},"date":"Tue May 31 00:06:01 2022 +0200\""}]}}
20 changes: 10 additions & 10 deletions tests/phpunit/Helper/DbtCompileHelperTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,27 @@ public function testGetCompiledSqlFiles(): void
self::assertArrayHasKey('fct_model.sql', $compiled);
self::assertArrayHasKey('stg_model.sql', $compiled);

self::assertStringContainsString(
'from "SAPI_9317"."in.c-test-bucket"."test"',
self::assertStringMatchesFormat(
'%Afrom "SAPI_%d"."in.c-test-bucket"."test"%A',
(string) $compiled['source_not_null_in.c-test-bucket_test__id_.sql']
);

self::assertStringContainsString(
'"id" as unique_field,',
self::assertStringMatchesFormat(
'%A"id" as unique_field,%A',
(string) $compiled['source_unique_in.c-test-bucket_test__id_.sql']
);
self::assertStringContainsString(
'from "SAPI_9317"."in.c-test-bucket"."test"',
self::assertStringMatchesFormat(
'%Afrom "SAPI_%d"."in.c-test-bucket"."test"%A',
(string) $compiled['source_unique_in.c-test-bucket_test__id_.sql']
);

self::assertStringContainsString(
'from "SAPI_9317"."WORKSPACE_875822722"."stg_model"',
self::assertStringMatchesFormat(
'%Afrom "SAPI_%d"."WORKSPACE_%d"."stg_model"%A',
(string) $compiled['fct_model.sql']
);

self::assertStringContainsString(
'select * from "SAPI_9317"."in.c-test-bucket"."test"',
self::assertStringMatchesFormat(
'%Aselect * from "SAPI_%d"."in.c-test-bucket"."test"%A',
(string) $compiled['stg_model.sql']
);
}
Expand Down
11 changes: 7 additions & 4 deletions tests/phpunit/Service/DbtYamlCreateService/DbtYamlCreateTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,14 @@ public function testCreateSourceYaml(): void
$service = new DbtSourceYamlCreateService();

$tablesData = [
'bucket-1' => [['name' => 'table1', 'primaryKey' => ['id']]],
'bucket-2' => [
'bucket-1' => ['tables' => [['name' => 'table1', 'primaryKey' => ['id']]]],
'bucket-2' => ['tables' => [
['name' => 'table2', 'primaryKey' => ['vatId']],
['name' => 'table3', 'primaryKey' => []],
],
['name' => 'tableWithCompoundPrimaryKey', 'primaryKey' => ['id', 'vatId']],
]],
'linked-bucket' => ['tables' => [
['name' => 'linkedTable', 'primaryKey' => []],
], 'projectId' => '9090'],
];

$freshness = [
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- Use the `ref` function to select from other models

select *
from "SAPI_9317"."WORKSPACE_875822722"."stg_model"
from "SAPI_9317"."WORKSPACE_964066099"."stg_model"
where "id" = 1
2 changes: 1 addition & 1 deletion tests/phpunit/data/models/_sources/bucket-2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ sources:
- unique
- not_null
-
name: table3
name: tableWithCompoundPrimaryKey
quoting:
database: true
schema: true
Expand Down
21 changes: 21 additions & 0 deletions tests/phpunit/data/models/_sources/linked-bucket.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
version: 2
sources:
-
name: linked-bucket
freshness:
warn_after:
count: 1
period: hour
error_after:
count: 1
period: day
database: '{{ env_var("DBT_KBC_PROD_9090_DATABASE") }}'
schema: linked-bucket
loaded_at_field: '"_timestamp"'
tables:
-
name: linkedTable
quoting:
database: true
schema: true
identifier: true

0 comments on commit b219969

Please sign in to comment.