From fd36539dd28961911e4fb3d488bd0ef1d50cee5d Mon Sep 17 00:00:00 2001 From: Ruxue Zeng Date: Mon, 8 Jan 2024 16:56:33 +0100 Subject: [PATCH 1/9] Add column inference when creating dataframe from table --- greenplumpython/dataframe.py | 43 ++++++++++++++++++++++++++++++++++-- 1 file changed, 41 insertions(+), 2 deletions(-) diff --git a/greenplumpython/dataframe.py b/greenplumpython/dataframe.py index e8be2dd1..5c67ea2c 100644 --- a/greenplumpython/dataframe.py +++ b/greenplumpython/dataframe.py @@ -973,6 +973,21 @@ def save_as( if drop_if_exists else "" ) + # if temp: + # temp_schema_name = next( + # iter( + # ( + # self._db._execute( + # f""" + # SELECT DISTINCT 'pg_temp_'||sess_id temp_schema + # FROM pg_stat_activity + # WHERE pid = pg_backend_pid(); + # """, + # has_results=True, + # ) + # ) + # ) + # )["temp_schema"] self._db._execute( f""" DO $$ @@ -988,7 +1003,7 @@ def save_as( """, has_results=False, ) - return DataFrame.from_table(table_name, self._db, schema=schema) + return DataFrame.from_table(table_name, self._db, schema=schema if not temp else "pg_temp") def create_index( self, @@ -1126,7 +1141,31 @@ def from_table(cls, table_name: str, db: Database, schema: Optional[str] = None) """ qualified_name = f'"{schema}"."{table_name}"' if schema is not None else f'"{table_name}"' - return cls(f"TABLE {qualified_name}", db=db, qualified_table_name=qualified_name) + table_schema_clause = ( + ( + " AND table_schema " + + (f"""like '{schema}_%'""" if schema == "pg_temp" else f"""= '{schema}'""") + ) + if schema + else "" + ) + + # table_schema_clause = f""" AND table_schema like '{schema}_%'""" if schema = "pg_temp" else if schema else "" + columns_query = f""" + SELECT column_name, data_type + FROM information_schema.columns + WHERE table_name = '{table_name}' {table_schema_clause} + ORDER BY ordinal_position + """ + columns_inf_result = list(db._execute(columns_query, has_results=True)) + assert columns_inf_result, f"Table {qualified_name} does not exists" + 
columns_list = {d["column_name"]: d["data_type"] for d in columns_inf_result} + return cls( + f"TABLE {qualified_name}", + db=db, + qualified_table_name=qualified_name, + columns=columns_list, + ) @classmethod def from_rows( From 9ef85713f16b0a72b2f805525c350864f1e8c45d Mon Sep 17 00:00:00 2001 From: Ruxue Zeng Date: Mon, 8 Jan 2024 17:07:31 +0100 Subject: [PATCH 2/9] Fix lint --- greenplumpython/dataframe.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/greenplumpython/dataframe.py b/greenplumpython/dataframe.py index 5c67ea2c..ee1b3df0 100644 --- a/greenplumpython/dataframe.py +++ b/greenplumpython/dataframe.py @@ -1157,15 +1157,15 @@ def from_table(cls, table_name: str, db: Database, schema: Optional[str] = None) WHERE table_name = '{table_name}' {table_schema_clause} ORDER BY ordinal_position """ - columns_inf_result = list(db._execute(columns_query, has_results=True)) + columns_inf_result = list(db._execute(columns_query, has_results=True)) # type: ignore reportUnknownVariableType assert columns_inf_result, f"Table {qualified_name} does not exists" - columns_list = {d["column_name"]: d["data_type"] for d in columns_inf_result} + columns_list: dict[str, str] = {d["column_name"]: d["data_type"] for d in columns_inf_result} # type: ignore reportUnknownVariableType return cls( f"TABLE {qualified_name}", db=db, qualified_table_name=qualified_name, columns=columns_list, - ) + ) # type: ignore reportUnknownVariableType @classmethod def from_rows( From cc56e9052b7e6f4b95e42767d83496e43cf10099 Mon Sep 17 00:00:00 2001 From: Ruxue Zeng Date: Tue, 9 Jan 2024 10:16:45 +0100 Subject: [PATCH 3/9] Change how column information is looked up --- greenplumpython/dataframe.py | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/greenplumpython/dataframe.py b/greenplumpython/dataframe.py index ee1b3df0..0a5d948e 100644 --- a/greenplumpython/dataframe.py +++ b/greenplumpython/dataframe.py @@ -1141,21 +1141,10 @@ def from_table(cls, 
table_name: str, db: Database, schema: Optional[str] = None) """ qualified_name = f'"{schema}"."{table_name}"' if schema is not None else f'"{table_name}"' - table_schema_clause = ( - ( - " AND table_schema " - + (f"""like '{schema}_%'""" if schema == "pg_temp" else f"""= '{schema}'""") - ) - if schema - else "" - ) - - # table_schema_clause = f""" AND table_schema like '{schema}_%'""" if schema = "pg_temp" else if schema else "" columns_query = f""" - SELECT column_name, data_type - FROM information_schema.columns - WHERE table_name = '{table_name}' {table_schema_clause} - ORDER BY ordinal_position + SELECT attname AS column_name, atttypid::regtype AS data_type + FROM pg_attribute + WHERE attrelid = '{qualified_name}'::regclass and attnum > 0; """ columns_inf_result = list(db._execute(columns_query, has_results=True)) # type: ignore reportUnknownVariableType assert columns_inf_result, f"Table {qualified_name} does not exists" From c48eb9f509d26518520f6e25f7dff7f1cdc11382 Mon Sep 17 00:00:00 2001 From: Ruxue Zeng Date: Tue, 9 Jan 2024 10:53:55 +0100 Subject: [PATCH 4/9] Remove useless clause --- greenplumpython/dataframe.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/greenplumpython/dataframe.py b/greenplumpython/dataframe.py index 0a5d948e..6dd57ad8 100644 --- a/greenplumpython/dataframe.py +++ b/greenplumpython/dataframe.py @@ -973,21 +973,6 @@ def save_as( if drop_if_exists else "" ) - # if temp: - # temp_schema_name = next( - # iter( - # ( - # self._db._execute( - # f""" - # SELECT DISTINCT 'pg_temp_'||sess_id temp_schema - # FROM pg_stat_activity - # WHERE pid = pg_backend_pid(); - # """, - # has_results=True, - # ) - # ) - # ) - # )["temp_schema"] self._db._execute( f""" DO $$ From 5e9d988a464a16de61fab99069fdb837f54fbbe0 Mon Sep 17 00:00:00 2001 From: Ruxue Zeng Date: Wed, 10 Jan 2024 16:37:56 +0100 Subject: [PATCH 5/9] Make column info extraction lazily done in df.describe() --- greenplumpython/dataframe.py | 38 
+++++++++++++++++++++++------------- tests/test_dataframe.py | 17 ++++++++++++++++ 2 files changed, 41 insertions(+), 14 deletions(-) diff --git a/greenplumpython/dataframe.py b/greenplumpython/dataframe.py index 6dd57ad8..402f5678 100644 --- a/greenplumpython/dataframe.py +++ b/greenplumpython/dataframe.py @@ -1126,20 +1126,7 @@ def from_table(cls, table_name: str, db: Database, schema: Optional[str] = None) """ qualified_name = f'"{schema}"."{table_name}"' if schema is not None else f'"{table_name}"' - columns_query = f""" - SELECT attname AS column_name, atttypid::regtype AS data_type - FROM pg_attribute - WHERE attrelid = '{qualified_name}'::regclass and attnum > 0; - """ - columns_inf_result = list(db._execute(columns_query, has_results=True)) # type: ignore reportUnknownVariableType - assert columns_inf_result, f"Table {qualified_name} does not exists" - columns_list: dict[str, str] = {d["column_name"]: d["data_type"] for d in columns_inf_result} # type: ignore reportUnknownVariableType - return cls( - f"TABLE {qualified_name}", - db=db, - qualified_table_name=qualified_name, - columns=columns_list, - ) # type: ignore reportUnknownVariableType + return cls(f"TABLE {qualified_name}", db=db, qualified_table_name=qualified_name) @classmethod def from_rows( @@ -1277,3 +1264,26 @@ def from_files(cls, files: list[str], parser: "NormalFunction", db: Database) -> raise NotImplementedError( "Please import greenplumpython.experimental.file to load the implementation." ) + + def describe(self) -> dict[str, str]: + """ + Returns a dictionary summarising the column information of the dataframe, + conditional on the table existing in the database. + + Returns: + Dictionary containing the column names and types. + + """ + assert self._qualified_table_name is not None, f"Dataframe is not saved in database." 
+ columns_query = f""" + SELECT attname AS column_name, atttypid::regtype AS data_type + FROM pg_attribute + WHERE attrelid = '{self._qualified_table_name}'::regclass and attnum > 0; + """ + assert self._db is not None + columns_inf_result = list(self._db._execute(columns_query, has_results=True)) # type: ignore reportUnknownVariableType + assert columns_inf_result, f"Table {self._qualified_table_name} does not exists." + columns_list: dict[str, str] = { + d["column_name"]: d["data_type"] for d in columns_inf_result # type: ignore reportUnknownVariableType + } # type: ignore reportUnknownVariableType + return columns_list diff --git a/tests/test_dataframe.py b/tests/test_dataframe.py index c7d8e8e3..d427f638 100644 --- a/tests/test_dataframe.py +++ b/tests/test_dataframe.py @@ -506,3 +506,20 @@ def test_const_non_ascii(db: gp.Database): df = db.create_dataframe(columns={"Ø": ["Ø"]}) for row in df[["Ø"]]: assert row["Ø"] == "Ø" + + +def test_table_describe(db: gp.Database): + df = db.create_dataframe(table_name="pg_class") + result = df.describe() + assert len(result) == 33 + df_not_exist = db.create_dataframe(table_name="not_exist_table") + with pytest.raises(Exception) as exc_info: + df_not_exist.describe() + assert 'relation "not_exist_table" does not exist' in str(exc_info.value) + + +def test_dataframe_describe(db: gp.Database): + df = db.create_dataframe(table_name="pg_class")[["relname", "relnamespace"]] + with pytest.raises(Exception) as exc_info: + df.describe() + assert "Dataframe is not saved in database" in str(exc_info.value) From 9a85c9ea19491e620acdec8aa1c98cd87711aafb Mon Sep 17 00:00:00 2001 From: Ruxue Zeng Date: Wed, 10 Jan 2024 16:50:12 +0100 Subject: [PATCH 6/9] Fix docstyle --- greenplumpython/dataframe.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/greenplumpython/dataframe.py b/greenplumpython/dataframe.py index 402f5678..8de8a202 100644 --- a/greenplumpython/dataframe.py +++ b/greenplumpython/dataframe.py @@ -1267,8 
+1267,7 @@ def from_files(cls, files: list[str], parser: "NormalFunction", db: Database) -> def describe(self) -> dict[str, str]: """ - Returns a dictionary summarising the column information of the dataframe, - conditional on the table existing in the database. + Returns a dictionary summarising the column information of the dataframe, conditional on the table existing in the database. Returns: Dictionary containing the column names and types. From ff64e256bfcffbe53a9d0cf56ba2e89e37daff6e Mon Sep 17 00:00:00 2001 From: Ruxue Zeng Date: Wed, 10 Jan 2024 16:51:58 +0100 Subject: [PATCH 7/9] Fix docstyle --- greenplumpython/dataframe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/greenplumpython/dataframe.py b/greenplumpython/dataframe.py index 8de8a202..2e8812dc 100644 --- a/greenplumpython/dataframe.py +++ b/greenplumpython/dataframe.py @@ -1267,7 +1267,7 @@ def from_files(cls, files: list[str], parser: "NormalFunction", db: Database) -> def describe(self) -> dict[str, str]: """ - Returns a dictionary summarising the column information of the dataframe, conditional on the table existing in the database. + Return a dictionary summarising the column information of the dataframe, conditional on the table existing in the database. Returns: Dictionary containing the column names and types. 
From 2e051c61d692e02a0819ec7ce0d0245220dde1a9 Mon Sep 17 00:00:00 2001 From: Ruxue Zeng Date: Thu, 11 Jan 2024 13:47:31 +0100 Subject: [PATCH 8/9] Modify testcase --- tests/test_dataframe.py | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/tests/test_dataframe.py b/tests/test_dataframe.py index d427f638..5fb2b473 100644 --- a/tests/test_dataframe.py +++ b/tests/test_dataframe.py @@ -475,6 +475,22 @@ def test_table_distributed_hash(db: gp.Database): assert row["distributedby"] == "DISTRIBUTED BY (id)" +def test_table_describe(db: gp.Database): + columns = {"a": [1, 2, 3], "b": [1, 2, 3]} + t = db.create_dataframe(columns=columns) + df = t.save_as("const_dataframe", column_names=["a", "b"], schema="test") + result = df.describe() + assert len(result) == 2 + df_s = df[["a", "b"]] + with pytest.raises(Exception) as exc_info: + df_s.describe() + assert "Dataframe is not saved in database" in str(exc_info.value) + df_not_exist = db.create_dataframe(table_name="not_exist_table") + with pytest.raises(Exception) as exc_info: + df_not_exist.describe() + assert 'relation "not_exist_table" does not exist' in str(exc_info.value) + + import pandas as pd @@ -506,20 +522,3 @@ def test_const_non_ascii(db: gp.Database): df = db.create_dataframe(columns={"Ø": ["Ø"]}) for row in df[["Ø"]]: assert row["Ø"] == "Ø" - - -def test_table_describe(db: gp.Database): - df = db.create_dataframe(table_name="pg_class") - result = df.describe() - assert len(result) == 33 - df_not_exist = db.create_dataframe(table_name="not_exist_table") - with pytest.raises(Exception) as exc_info: - df_not_exist.describe() - assert 'relation "not_exist_table" does not exist' in str(exc_info.value) - - -def test_dataframe_describe(db: gp.Database): - df = db.create_dataframe(table_name="pg_class")[["relname", "relnamespace"]] - with pytest.raises(Exception) as exc_info: - df.describe() - assert "Dataframe is not saved in database" in str(exc_info.value) From 
6dfa4a53cece9b6287250d41d6507279e29a0a45 Mon Sep 17 00:00:00 2001 From: Ruxue Zeng Date: Thu, 11 Jan 2024 14:16:38 +0100 Subject: [PATCH 9/9] Fix testcase --- tests/test_dataframe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_dataframe.py b/tests/test_dataframe.py index 5fb2b473..72fc839b 100644 --- a/tests/test_dataframe.py +++ b/tests/test_dataframe.py @@ -478,7 +478,7 @@ def test_table_distributed_hash(db: gp.Database): def test_table_describe(db: gp.Database): columns = {"a": [1, 2, 3], "b": [1, 2, 3]} t = db.create_dataframe(columns=columns) - df = t.save_as("const_dataframe", column_names=["a", "b"], schema="test") + df = t.save_as("const_table_describe", column_names=["a", "b"], schema="test") result = df.describe() assert len(result) == 2 df_s = df[["a", "b"]]