diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py index a4567a5b3e..31017ff048 100644 --- a/dlt/common/destination/reference.py +++ b/dlt/common/destination/reference.py @@ -290,6 +290,8 @@ def _verify_schema(self) -> None: logger.warning(f"A column {column_name} in table {table_name} in schema {self.schema.name} is incomplete. It was not bound to the data during normalizations stage and its data type is unknown. Did you add this column manually in code ie. as a merge key?") def get_load_table(self, table_name: str, prepare_for_staging: bool = False) -> TTableSchema: + if not table_name in self.schema.tables: + return None try: # make a copy of the schema so modifications do not affect the original document table = deepcopy(self.schema.tables[table_name]) diff --git a/dlt/destinations/job_client_impl.py b/dlt/destinations/job_client_impl.py index 0124a277d3..b54748eca2 100644 --- a/dlt/destinations/job_client_impl.py +++ b/dlt/destinations/job_client_impl.py @@ -326,16 +326,17 @@ def _get_table_update_sql(self, table_name: str, new_columns: Sequence[TColumnSc # build sql canonical_name = self.sql_client.make_qualified_table_name(table_name) table = self.get_load_table(table_name) + table_format = table.get("table_format") if table else None sql_result: List[str] = [] if not generate_alter: # build CREATE sql = f"CREATE TABLE {canonical_name} (\n" - sql += ",\n".join([self._get_column_def_sql(c, table.get("table_format")) for c in new_columns]) + sql += ",\n".join([self._get_column_def_sql(c, table_format) for c in new_columns]) sql += ")" sql_result.append(sql) else: sql_base = f"ALTER TABLE {canonical_name}\n" - add_column_statements = self._make_add_column_sql(new_columns, table.get("table_format")) + add_column_statements = self._make_add_column_sql(new_columns, table_format) if self.capabilities.alter_add_multi_column: column_sql = ",\n" sql_result.append(sql_base + column_sql.join(add_column_statements))