Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(anonymizer): base on pg_anonymizer, use detect to check columns. #18

Merged
merged 5 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions dblinter/default_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,14 @@ table:
message: "Uppercase used on table {0}.{1}.{2}."
fixes:
- Do not use uppercase for any database objects
# Rule T012: reports table columns that the anon extension's detection
# considers sensitive. Message placeholders: {0} = table identifier,
# {1} = column name, {2} = detected category.
- name: TableWithSensibleColumn
ruleid: T012
enabled: True
context:
desc: Base on the extension anon (https://postgresql-anonymizer.readthedocs.io/en/stable/detection), show sensitive column.
message: "{0} have column {1} (category {2}) that can be consider has sensitive. It should be masked for non data-operator users."
fixes:
- Install extension anon, and create some masking rules on.
schema:
- name: SchemaWithDefaultRoleNotGranted
ruleid: S001
Expand Down
43 changes: 43 additions & 0 deletions dblinter/rules/T012/TableWithSensibleColumn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import logging

from dblinter.database_connection import DatabaseConnection

LOGGER = logging.getLogger("dblinter")


def table_with_sensible_column(
    self, db: DatabaseConnection, _, context, table, sarif_document
):
    """Report columns of one table that the ``anon`` extension flags as sensitive.

    The rule runs ``anon.detect()`` for the ``en_US`` and ``fr_FR``
    dictionaries and adds one SARIF check per (column, category) pair found
    on the given table. If the ``anon`` extension is not installed in the
    database, the rule only logs an informational message and returns.

    Args:
        self: Object providing ``get_ruleid_from_function_name()``.
        db: Connection used to run the queries; ``db.database`` is used to
            build the reported identifier.
        _: Unused positional argument.
        context: Rule context forwarded unchanged to ``sarif_document.add_check``.
        table: Sequence whose first item is the schema name and second item
            is the table name.
        sarif_document: Collector receiving one ``add_check`` call per
            sensitive column.
    """
    schema_name, table_name = table[0], table[1]
    LOGGER.debug(
        "table_with_sensible_column for %s.%s in db %s",
        schema_name,
        table_name,
        db.database,
    )

    check_extension = "select count(*) as nb from pg_extension where extname='anon'"
    if db.query(check_extension)[0][0] == 0:
        LOGGER.info(
            "TableWithSensibleColumn is enabled, but anon extension not found. in db %s. see https://postgresql-anonymizer.readthedocs.io to install",
            db.database,
        )
        return

    # The names are embedded as SQL string literals, so single quotes must be
    # doubled; otherwise a table name containing a quote would break (or
    # alter) the query.
    schema_literal = str(schema_name).replace("'", "''")
    table_literal = str(table_name).replace("'", "''")

    # Filter on the schema as well as the table name: matching on relname
    # alone would also pick up same-named tables living in other schemas and
    # report their columns under this table's identifier.
    per_locale_selects = [
        f"""select d.column_name, d.identifiers_category
              from anon.detect('{locale}') d
              join pg_class c on c.oid = d.table_name
              join pg_namespace n on n.oid = c.relnamespace
             where n.nspname = '{schema_literal}'
               and c.relname = '{table_literal}'"""
        for locale in ("en_US", "fr_FR")
    ]
    # UNION already removes duplicate rows; the ORDER BY makes the order of
    # the reported findings deterministic.
    sensitive_cols_query = (
        "with coltable as (\n"
        + "\nunion\n".join(per_locale_selects)
        + "\n)\n"
        "select column_name, identifiers_category from coltable\n"
        "order by column_name, identifiers_category"
    )

    uri = f"{db.database}.{schema_name}.{table_name}"
    sensitive_cols = db.query(sensitive_cols_query)
    # ``or ()`` keeps the original tolerance for a falsy (empty/None) result.
    for row in sensitive_cols or ():
        message_args = (uri, row[0], row[1])
        sarif_document.add_check(
            self.get_ruleid_from_function_name(), message_args, uri, context
        )
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from testcontainers.core.waiting_utils import wait_container_is_ready
from testcontainers.postgres import PostgresContainer

# NOTE(review): the T012 tests return early when the anon extension is not
# installed; presumably the image below ships it while the stock postgres
# image does not, so those tests only do real work with it — confirm.
# PG_IMAGE = "registry.gitlab.com/dalibo/postgresql_anonymizer:latest"
PG_IMAGE = "postgres:14"
PG_PORT = 5432
PG_USER = "postgres"
Expand Down
61 changes: 61 additions & 0 deletions tests/rules/T012/test_TableWithSensibleColumn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from dblinter.configuration_model import Context
from dblinter.database_connection import DatabaseConnection
from dblinter.function_library import FunctionLibrary
from dblinter.sarif_document import SarifDocument


def test_table_with_sensitive_column(postgres_instance_args) -> None:
    """Check that sensitive columns of a table are reported by rule T012.

    The test is a no-op when the ``anon`` extension is not installed in the
    test database.
    """
    args = postgres_instance_args
    db = DatabaseConnection(args)
    CHECK_EXTENSION = "select count(*) as nb from pg_extension where extname='anon'"
    anon = db.query(CHECK_EXTENSION)[0][0]
    if anon == 0:
        # Without the anon extension the rule cannot run; nothing to verify.
        return
    context = Context(
        desc="Base on the extension anon (https://postgresql-anonymizer.readthedocs.io/en/stable/detection), show sensitive column.",
        fixes=[
            "Install extension anon, and create some masking rules on.",
        ],
        message="{0} have column {1} (category {2}) that can be consider has sensitive. It should be masked for non data-operator users.",
    )
    function_library = FunctionLibrary()
    db.query("select anon.init()")
    # A table named "test" may be left over from another test using the same
    # database; drop it so CREATE TABLE cannot fail on an existing relation.
    db.query("DROP TABLE IF EXISTS test")
    db.query("CREATE TABLE test (id integer, creditcard text)")
    sarif_document = SarifDocument()
    function_library.get_function_by_function_name("table_with_sensible_column")(
        function_library, db, [], context, ("public", "test"), sarif_document
    )
    results = sarif_document.sarif_doc.runs[0].results
    assert (
        results[0].message.text
        == "postgres.public.test have column creditcard (category creditcard) that can be consider has sensitive. It should be masked for non data-operator users."
    )
    assert (
        results[1].message.text
        == "postgres.public.test have column id (category account_id) that can be consider has sensitive. It should be masked for non data-operator users."
    )


def test_table_without_sensitive_column(postgres_instance_args) -> None:
    """Check that rule T012 reports nothing for a table without sensitive columns.

    The test is a no-op when the ``anon`` extension is not installed in the
    test database.
    """
    args = postgres_instance_args
    db = DatabaseConnection(args)
    CHECK_EXTENSION = "select count(*) as nb from pg_extension where extname='anon'"
    anon = db.query(CHECK_EXTENSION)[0][0]
    if anon == 0:
        # Without the anon extension the rule cannot run; nothing to verify.
        return
    context = Context(
        desc="Base on the extension anon (https://postgresql-anonymizer.readthedocs.io/en/stable/detection), show sensitive column.",
        fixes=[
            "Install extension anon, and create some masking rules on.",
        ],
        message="{0} have column {1} (category {2}) that can be consider has sensitive. It should be masked for non data-operator users.",
    )
    function_library = FunctionLibrary()
    db.query("select anon.init()")
    # A table named "test" may be left over from another test using the same
    # database (possibly with sensitive columns); drop it so this test starts
    # from a known schema and CREATE TABLE cannot fail.
    db.query("DROP TABLE IF EXISTS test")
    db.query("CREATE TABLE test (test_id integer, description text)")
    sarif_document = SarifDocument()
    function_library.get_function_by_function_name("table_with_sensible_column")(
        function_library, db, [], context, ("public", "test"), sarif_document
    )
    assert sarif_document.sarif_doc.runs[0].results == []