Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: update import script for trackdechets import + upsert domaine … #455

Merged
merged 2 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/end-to-end.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ jobs:
MCP_SCOPES: openid email profile phone organizations
STYLESHEET_URL: ""
redis:
image: redis
image: redis:7.0
ports:
- 6379:6379
postgres:
image: postgres:12.12
image: postgres:14.1
env:
POSTGRES_USER: ${{ env.PGUSER }}
POSTGRES_PASSWORD: ${{ env.PGPASSWORD }}
Expand Down
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: "3.8"

services:
db:
image: postgres:latest
image: postgres:14.1
ports:
- "5432:5432"
environment:
Expand All @@ -13,7 +13,7 @@ services:
- db-data:/var/lib/postgresql/data

redis:
image: redis:latest
image: redis:7.0
ports:
- "6379:6379"

Expand Down
142 changes: 116 additions & 26 deletions scripts/import-domains.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import { isEmpty, toInteger } from "lodash";
import fs from "fs";
import {
addAuthorizedDomain,
addTrackdechetsDomain,
upsert,
} from "../src/repositories/organization/setters";
import {
Expand All @@ -24,6 +24,11 @@ import {
} from "../src/services/script-helpers";
import { AxiosError } from "axios";
import { isAFreeEmailProvider } from "../src/services/uses-a-free-email-provider";
import {
isServicePublic,
isWasteManagementOrganization,
} from "../src/services/organization";
import { isDomainValid, isSiretValid } from "../src/services/security";

const { parse, transform, stringify } = require("csv");

Expand Down Expand Up @@ -51,13 +56,22 @@ const maxInseeCallRateInMs = rateInMsFromArgs !== 0 ? rateInMsFromArgs : 125;
header: true,
});

type CsvData = { siren: string; declarant: string; last_declared_at: string };
type InputCsvData = { domain: string; siret: string };
type OutputCsvData = { domain: string; siret: string; result: string };
const input_file_lines = await getNumberOfLineInFile(INPUT_FILE);
let i = 1;
let rejected_invalid_domain_count = 0;
let rejected_invalid_siret_count = 0;
let rejected_free_email_domain_count = 0;
let rejected_empty_org_count = 0;
let rejected_service_public_count = 0;
let rejected_waste_management_organizations = 0;
let unexpected_error_count = 0;
let success_count = 0;

// 50ms is an estimated additional delay from insee API
const estimatedExecutionTimeInMilliseconds =
Math.max(maxInseeCallRateInMs, 320) * input_file_lines;
(maxInseeCallRateInMs + 50) * input_file_lines;

logger.info("");
logger.info(
Expand All @@ -71,42 +85,72 @@ const maxInseeCallRateInMs = rateInMsFromArgs !== 0 ? rateInMsFromArgs : 125;

const transformStream = transform(
async function (
data: CsvData,
done: (err: null | Error, data: null | CsvData) => void,
data: InputCsvData,
done: (err: null | Error, data: OutputCsvData) => void,
) {
const start = startDurationMesure();
try {
const siren = data.siren;
const domain = data.declarant.split("@").pop()!;
logger.info(`${i}: processing ${siren} <> ${domain}...`);
const siret = data.siret;
const domain = data.domain;
logger.info(`${i}: processing ${siret} <> ${domain}...`);
// 0. checks params
if (!isDomainValid(domain)) {
i++;
rejected_invalid_domain_count++;
return done(null, { ...data, result: "rejected_invalid_domain" });
}
if (!isSiretValid(siret)) {
i++;
rejected_invalid_siret_count++;
return done(null, { ...data, result: "rejected_invalid_siret" });
}

// 1. get organizationInfo
const organizationInfo = await getOrganizationInfo(siren, access_token);
// 1. checks for free email providers
if (isAFreeEmailProvider(domain)) {
i++;
rejected_free_email_domain_count++;
return done(null, { ...data, result: "rejected_free_email_domain" });
}

// 2. get organizationInfo
const organizationInfo = await getOrganizationInfo(siret, access_token);
if (!isOrganizationInfo(organizationInfo)) {
throw new Error("empty organizationInfo");
await throttleApiCall(start, maxInseeCallRateInMs);
i++;
rejected_empty_org_count++;
return done(null, { ...data, result: "rejected_empty_org" });
}

// 2. update organizationInfo
// 3. update organizationInfo
const organization: Organization = await upsert({
siret: organizationInfo.siret,
organizationInfo,
});

// 3. check for free email providers
if (isAFreeEmailProvider(domain)) {
throw new Error("free email provider");
// 4. discard public services
if (isServicePublic(organization)) {
await throttleApiCall(start, maxInseeCallRateInMs);
i++;
rejected_service_public_count++;
return done(null, { ...data, result: "rejected_service_public" });
}

// 4. check if domain exists
if (!organization.authorized_email_domains.includes(domain)) {
await addAuthorizedDomain({ siret: organization.siret, domain });
} else {
logger.info("\x1b[31m", `domain already in database`, "\x1b[0m");
// 5. discard waste management organizations
if (isWasteManagementOrganization(organization)) {
await throttleApiCall(start, maxInseeCallRateInMs);
i++;
rejected_waste_management_organizations++;
return done(null, {
...data,
result: "rejected_waste_management_organizations",
});
}

await addTrackdechetsDomain({ siret, domain });
await throttleApiCall(start, maxInseeCallRateInMs);
i++;
return done(null, null);
success_count++;
return done(null, { ...data, result: "success" });
} catch (error) {
logger.info(
"\x1b[31m",
Expand All @@ -119,14 +163,60 @@ const maxInseeCallRateInMs = rateInMsFromArgs !== 0 ? rateInMsFromArgs : 125;

await throttleApiCall(start, maxInseeCallRateInMs);
i++;
// we put this in an output for future attempts
return done(null, data);
unexpected_error_count++;
return done(null, { ...data, result: "unexpected_error" });
}
},
{ parallel: 1 }, // avoid messing with line orders
).on("end", () =>
logger.info(`Import done! failed inputs are recorded in ${OUTPUT_FILE}.`),
);
).on("end", () => {
logger.info(`Import done! Import logs are recorded in ${OUTPUT_FILE}.`);
logger.info("");
logger.info(
"success_count: \x1b[32m",
success_count,
"\x1b[0m",
);
logger.info(
"rejected_invalid_domain_count: \x1b[33m",
rejected_invalid_domain_count,
"\x1b[0m",
);
logger.info(
"rejected_invalid_siret_count: \x1b[33m",
rejected_invalid_siret_count,
"\x1b[0m",
);
logger.info(
"rejected_free_email_domain_count: \x1b[33m",
rejected_free_email_domain_count,
"\x1b[0m",
);
logger.info(
"rejected_empty_org_count: \x1b[33m",
rejected_empty_org_count,
"\x1b[0m",
);
logger.info(
"rejected_service_public_count: \x1b[33m",
rejected_service_public_count,
"\x1b[0m",
);
logger.info(
"rejected_waste_management_organizations:\x1b[33m",
rejected_waste_management_organizations,
"\x1b[0m",
);
logger.info(
"unexpected_error_count: \x1b[31m",
unexpected_error_count,
"\x1b[0m",
);
logger.info(
"total: \x1b[1m",
i - 1,
"\x1b[21m",
);
});

logger.info(`Importing data from ${INPUT_FILE}`);

Expand Down
31 changes: 26 additions & 5 deletions src/repositories/organization/setters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,21 +131,24 @@ RETURNING *

return rows.shift()!;
};
const addDomain = async ({
const upsertDomain = async ({
siret,
domain,
listName,
}: {
siret: string;
domain: string;
listName: "verified_email_domains" | "authorized_email_domains";
listName:
| "verified_email_domains"
| "authorized_email_domains"
| "trackdechets_email_domains";
}) => {
const connection = getDatabaseConnection();

const { rows }: QueryResult<Organization> = await connection.query(
`
UPDATE organizations
SET ${listName} = array_append(${listName}, $2)
SET ${listName} = ARRAY(SELECT DISTINCT UNNEST(${listName} || ARRAY[$2]))
, updated_at = $3
WHERE siret = $1
RETURNING *
Expand All @@ -163,7 +166,7 @@ export const addAuthorizedDomain = async ({
siret: string;
domain: string;
}) => {
return await addDomain({
return await upsertDomain({
siret,
domain,
listName: "authorized_email_domains",
Expand All @@ -177,7 +180,25 @@ export const addVerifiedDomain = async ({
siret: string;
domain: string;
}) => {
return await addDomain({ siret, domain, listName: "verified_email_domains" });
return await upsertDomain({
siret,
domain,
listName: "verified_email_domains",
});
};

export const addTrackdechetsDomain = async ({
siret,
domain,
}: {
siret: string;
domain: string;
}) => {
return await upsertDomain({
siret,
domain,
listName: "trackdechets_email_domains",
});
};

export const linkUserToOrganization = async ({
Expand Down
18 changes: 18 additions & 0 deletions src/services/organization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,24 @@ export const hasLessThanFiftyEmployees = ({
);
};

export const isWasteManagementOrganization = ({
cached_libelle_activite_principale,
}: Organization): boolean => {
if (!cached_libelle_activite_principale) {
return false;
}

return [
"38.11Z - Collecte des déchets non dangereux",
"38.12Z - Collecte des déchets dangereux",
"38.21Z - Traitement et élimination des déchets non dangereux",
"38.22Z - Traitement et élimination des déchets dangereux",
"38.31Z - Démantèlement d’épaves",
"38.32Z - Récupération de déchets triés",
"39.00Z - Dépollution et autres services de gestion des déchets",
].includes(cached_libelle_activite_principale);
};

export const isEtablissementScolaireDuPremierEtSecondDegre = ({
cached_libelle_activite_principale,
cached_libelle_categorie_juridique,
Expand Down
19 changes: 16 additions & 3 deletions test/organization.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import {
isEntrepriseUnipersonnelle,
isEtablissementScolaireDuPremierEtSecondDegre,
isServicePublic,
isWasteManagementOrganization,
} from "../src/services/organization";
import { describe } from "node:test";

const association_org_info: Organization = {
siret: "83511518900010",
Expand Down Expand Up @@ -111,7 +111,7 @@ describe("isCommune", () => {
});
});

const track_dechet_public_org_info: Organization = {
const trackdechets_public_org_info: Organization = {
siret: "25680169700010",
cached_tranche_effectifs: "NN",
cached_tranche_effectifs_unite_legale: "NN",
Expand Down Expand Up @@ -154,7 +154,20 @@ describe("isServicePublic", () => {
});

it("should return true for public etablissement", () => {
assert.equal(isServicePublic(track_dechet_public_org_info), true);
assert.equal(isServicePublic(trackdechets_public_org_info), true);
});
});

describe("isWasteManagementOrganization", () => {
it("should return false for collectivité territoriale", () => {
assert.equal(isWasteManagementOrganization(lamalou_org_info), false);
});

it("should return true for waste management organization", () => {
assert.equal(
isWasteManagementOrganization(trackdechets_public_org_info),
true,
);
});
});

Expand Down
Loading