
feat: Add support for Microsoft Fabric Warehouse #4751


Open · wants to merge 14 commits into `main`
1 change: 1 addition & 0 deletions docs/guides/configuration.md
@@ -694,6 +694,7 @@ These pages describe the connection configuration options for each execution engine
* [BigQuery](../integrations/engines/bigquery.md)
* [Databricks](../integrations/engines/databricks.md)
* [DuckDB](../integrations/engines/duckdb.md)
* [Fabric](../integrations/engines/fabric.md)
* [MotherDuck](../integrations/engines/motherduck.md)
* [MySQL](../integrations/engines/mysql.md)
* [MSSQL](../integrations/engines/mssql.md)
32 changes: 32 additions & 0 deletions docs/integrations/engines/fabric.md
@@ -0,0 +1,32 @@
# Fabric

## Local/Built-in Scheduler
**Engine Adapter Type**: `fabric`

NOTE: Fabric Warehouse is not recommended for use as the SQLMesh [state connection](../../reference/configuration.md#connections); the configuration sketch at the bottom of this page points state at a recommended engine instead.

### Installation
#### Microsoft Entra ID / Azure Active Directory Authentication:
```
pip install "sqlmesh[mssql-odbc]"
```
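
The `mssql-odbc` extra pulls in the `pyodbc` Python package, but the Microsoft ODBC driver itself is installed at the operating-system level. A quick way to check which drivers `pyodbc` can see (a sketch, assuming `pyodbc` is already installed):

```python
# List the ODBC drivers visible to pyodbc and confirm that the driver you
# plan to pass as `driver_name` (e.g. "ODBC Driver 18 for SQL Server") is
# installed on the system.
import pyodbc

print(pyodbc.drivers())
```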

### Connection options

| Option | Description | Type | Required |
| ----------------- | ------------------------------------------------------------ | :----------: | :------: |
| `type` | Engine type name - must be `fabric` | string | Y |
| `host` | The hostname of the Fabric Warehouse server | string | Y |
| `user`            | The client ID to use for authentication with the Fabric Warehouse server | string | N |
| `password`        | The client secret to use for authentication with the Fabric Warehouse server | string | N |
| `port` | The port number of the Fabric Warehouse server | int | N |
| `database` | The target database | string | N |
| `charset` | The character set used for the connection | string | N |
| `timeout` | The query timeout in seconds. Default: no timeout | int | N |
| `login_timeout` | The timeout for connection and login in seconds. Default: 60 | int | N |
| `appname` | The application name to use for the connection | string | N |
| `conn_properties` | The list of connection properties | list[string] | N |
| `autocommit`      | Whether autocommit mode is enabled. Default: true                         | bool | N |
| `driver`          | The driver to use for the connection. Must be `pyodbc` (the default and only supported value) | string | N |
| `driver_name`     | The driver name to use for the connection. E.g., *ODBC Driver 18 for SQL Server* | string | N |
| `odbc_properties` | The dict of ODBC connection properties. E.g., `Authentication: ActiveDirectoryServicePrincipal`. See more [here](https://learn.microsoft.com/en-us/sql/connect/odbc/dsn-connection-string-attribute?view=sql-server-ver16). | dict | N |
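
As a worked example, here is a minimal sketch of a Python-based SQLMesh config wired for Fabric using the options from the table above. Every hostname and credential is a placeholder, and the separate Postgres state connection follows the note above (Fabric Warehouse is not recommended for state):

```python
# A minimal sketch, not a definitive setup; all values below are placeholders.
from sqlmesh.core.config import (
    Config,
    FabricConnectionConfig,
    GatewayConfig,
    ModelDefaultsConfig,
    PostgresConnectionConfig,
)

config = Config(
    model_defaults=ModelDefaultsConfig(dialect="fabric"),
    default_gateway="fabric",
    gateways={
        "fabric": GatewayConfig(
            # Service principal (Microsoft Entra ID) authentication via ODBC.
            connection=FabricConnectionConfig(
                host="<workspace>.datawarehouse.fabric.microsoft.com",  # placeholder
                database="my_warehouse",  # placeholder
                user="<entra_client_id>",  # placeholder
                password="<entra_client_secret>",  # placeholder
                driver_name="ODBC Driver 18 for SQL Server",
                odbc_properties={"Authentication": "ActiveDirectoryServicePrincipal"},
            ),
            # Keep SQLMesh state out of Fabric Warehouse, per the note above.
            state_connection=PostgresConnectionConfig(
                host="state-db.example.com",  # placeholder
                port=5432,
                user="sqlmesh",  # placeholder
                password="<password>",  # placeholder
                database="sqlmesh_state",  # placeholder
            ),
        )
    },
)
```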
1 change: 1 addition & 0 deletions docs/integrations/overview.md
@@ -17,6 +17,7 @@ SQLMesh supports the following execution engines for running SQLMesh projects (e
* [ClickHouse](./engines/clickhouse.md) (clickhouse)
* [Databricks](./engines/databricks.md) (databricks)
* [DuckDB](./engines/duckdb.md) (duckdb)
* [Fabric](./engines/fabric.md) (fabric)
* [MotherDuck](./engines/motherduck.md) (motherduck)
* [MSSQL](./engines/mssql.md) (mssql)
* [MySQL](./engines/mysql.md) (mysql)
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -84,6 +84,7 @@ nav:
- integrations/engines/clickhouse.md
- integrations/engines/databricks.md
- integrations/engines/duckdb.md
- integrations/engines/fabric.md
- integrations/engines/motherduck.md
- integrations/engines/mssql.md
- integrations/engines/mysql.md
1 change: 1 addition & 0 deletions pyproject.toml
@@ -249,6 +249,7 @@ markers = [
"clickhouse_cloud: test for Clickhouse (cloud mode)",
"databricks: test for Databricks",
"duckdb: test for DuckDB",
"fabric: test for Fabric",
"motherduck: test for MotherDuck",
"mssql: test for MSSQL",
"mysql: test for MySQL",
1 change: 1 addition & 0 deletions sqlmesh/core/config/__init__.py
@@ -10,6 +10,7 @@
ConnectionConfig as ConnectionConfig,
DatabricksConnectionConfig as DatabricksConnectionConfig,
DuckDBConnectionConfig as DuckDBConnectionConfig,
FabricConnectionConfig as FabricConnectionConfig,
GCPPostgresConnectionConfig as GCPPostgresConnectionConfig,
MotherDuckConnectionConfig as MotherDuckConnectionConfig,
MSSQLConnectionConfig as MSSQLConnectionConfig,
36 changes: 35 additions & 1 deletion sqlmesh/core/config/connection.py
@@ -43,7 +43,13 @@

logger = logging.getLogger(__name__)

RECOMMENDED_STATE_SYNC_ENGINES = {"postgres", "gcp_postgres", "mysql", "mssql", "azuresql"}
RECOMMENDED_STATE_SYNC_ENGINES = {
"postgres",
"gcp_postgres",
"mysql",
"mssql",
"azuresql",
}
FORBIDDEN_STATE_SYNC_ENGINES = {
# Do not support row-level operations
"spark",
@@ -1635,6 +1641,34 @@ def _extra_engine_config(self) -> t.Dict[str, t.Any]:
return {"catalog_support": CatalogSupport.SINGLE_CATALOG_ONLY}


class FabricConnectionConfig(MSSQLConnectionConfig):
"""
Fabric Connection Configuration.
Inherits most settings from MSSQLConnectionConfig and sets the type to 'fabric'.
The 'pyodbc' driver is required for Fabric.
"""

type_: t.Literal["fabric"] = Field(alias="type", default="fabric") # type: ignore
DIALECT: t.ClassVar[t.Literal["fabric"]] = "fabric" # type: ignore
DISPLAY_NAME: t.ClassVar[t.Literal["Fabric"]] = "Fabric" # type: ignore
DISPLAY_ORDER: t.ClassVar[t.Literal[17]] = 17 # type: ignore
driver: t.Literal["pyodbc"] = "pyodbc"
autocommit: t.Optional[bool] = True

@property
def _engine_adapter(self) -> t.Type[EngineAdapter]:
from sqlmesh.core.engine_adapter.fabric import FabricAdapter

return FabricAdapter

@property
def _extra_engine_config(self) -> t.Dict[str, t.Any]:
return {
"database": self.database,
"catalog_support": CatalogSupport.REQUIRES_SET_CATALOG,
}


class SparkConnectionConfig(ConnectionConfig):
"""
Vanilla Spark Connection Configuration. Use `DatabricksConnectionConfig` for Databricks.
2 changes: 2 additions & 0 deletions sqlmesh/core/engine_adapter/__init__.py
@@ -19,6 +19,7 @@
from sqlmesh.core.engine_adapter.trino import TrinoEngineAdapter
from sqlmesh.core.engine_adapter.athena import AthenaEngineAdapter
from sqlmesh.core.engine_adapter.risingwave import RisingwaveEngineAdapter
from sqlmesh.core.engine_adapter.fabric import FabricAdapter

DIALECT_TO_ENGINE_ADAPTER = {
"hive": SparkEngineAdapter,
@@ -35,6 +36,7 @@
"trino": TrinoEngineAdapter,
"athena": AthenaEngineAdapter,
"risingwave": RisingwaveEngineAdapter,
"fabric": FabricAdapter,
}

DIALECT_ALIASES = {
46 changes: 46 additions & 0 deletions sqlmesh/core/engine_adapter/fabric.py
@@ -0,0 +1,46 @@
from __future__ import annotations

import typing as t
from sqlglot import exp
from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter
from sqlmesh.core.engine_adapter.shared import InsertOverwriteStrategy, SourceQuery
from sqlmesh.core.engine_adapter.base import EngineAdapter

if t.TYPE_CHECKING:
from sqlmesh.core._typing import TableName


class FabricAdapter(MSSQLEngineAdapter):
"""
Adapter for Microsoft Fabric.
"""

DIALECT = "fabric"
SUPPORTS_INDEXES = False
SUPPORTS_TRANSACTIONS = False
INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT

def _insert_overwrite_by_condition(
self,
table_name: TableName,
source_queries: t.List[SourceQuery],
columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
where: t.Optional[exp.Condition] = None,
insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None,
**kwargs: t.Any,
) -> None:
"""
Implements the insert overwrite strategy for Fabric using DELETE and INSERT.

This method is overridden to avoid the MERGE statement from the parent
MSSQLEngineAdapter, which is not fully supported in Fabric.
"""
return EngineAdapter._insert_overwrite_by_condition(
self,
table_name=table_name,
source_queries=source_queries,
columns_to_types=columns_to_types,
where=where,
insert_overwrite_strategy_override=InsertOverwriteStrategy.DELETE_INSERT,
**kwargs,
)
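
For review context, this routing means a time-range overwrite on Fabric executes as two plain statements rather than MSSQL's MERGE. A sketch of the emitted SQL, using the literals asserted in the new unit tests below:

```python
# Illustration only (values from test_insert_overwrite_by_time_partition below):
# with InsertOverwriteStrategy.DELETE_INSERT, the base EngineAdapter emits
#
#   DELETE FROM [test_table] WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02';
#   INSERT INTO [test_table] ([a], [b])
#     SELECT [a], [b]
#     FROM (SELECT [a] AS [a], [b] AS [b] FROM [tbl]) AS [_subquery]
#     WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02';
#
# instead of a single MERGE statement.
```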
83 changes: 83 additions & 0 deletions tests/core/engine_adapter/test_fabric.py
@@ -0,0 +1,83 @@
# type: ignore

import typing as t

import pytest
from sqlglot import exp, parse_one

from sqlmesh.core.engine_adapter import FabricAdapter
from tests.core.engine_adapter import to_sql_calls

pytestmark = [pytest.mark.engine, pytest.mark.fabric]


@pytest.fixture
def adapter(make_mocked_engine_adapter: t.Callable) -> FabricAdapter:
return make_mocked_engine_adapter(FabricAdapter)


def test_columns(adapter: FabricAdapter):
adapter.cursor.fetchall.return_value = [
("decimal_ps", "decimal", None, 5, 4),
("decimal", "decimal", None, 18, 0),
("float", "float", None, 53, None),
("char_n", "char", 10, None, None),
("varchar_n", "varchar", 10, None, None),
("nvarchar_max", "nvarchar", -1, None, None),
]

assert adapter.columns("db.table") == {
"decimal_ps": exp.DataType.build("decimal(5, 4)", dialect=adapter.dialect),
"decimal": exp.DataType.build("decimal(18, 0)", dialect=adapter.dialect),
"float": exp.DataType.build("float(53)", dialect=adapter.dialect),
"char_n": exp.DataType.build("char(10)", dialect=adapter.dialect),
"varchar_n": exp.DataType.build("varchar(10)", dialect=adapter.dialect),
"nvarchar_max": exp.DataType.build("nvarchar(max)", dialect=adapter.dialect),
}

# Verify that the adapter queries the uppercase INFORMATION_SCHEMA
adapter.cursor.execute.assert_called_once_with(
"""SELECT [COLUMN_NAME], [DATA_TYPE], [CHARACTER_MAXIMUM_LENGTH], [NUMERIC_PRECISION], [NUMERIC_SCALE] FROM [INFORMATION_SCHEMA].[COLUMNS] WHERE [TABLE_NAME] = 'table' AND [TABLE_SCHEMA] = 'db';"""
)


def test_table_exists(adapter: FabricAdapter):
adapter.cursor.fetchone.return_value = (1,)
assert adapter.table_exists("db.table")
# Verify that the adapter queries the uppercase INFORMATION_SCHEMA
adapter.cursor.execute.assert_called_once_with(
"""SELECT 1 FROM [INFORMATION_SCHEMA].[TABLES] WHERE [TABLE_NAME] = 'table' AND [TABLE_SCHEMA] = 'db';"""
)

adapter.cursor.fetchone.return_value = None
assert not adapter.table_exists("db.table")


def test_insert_overwrite_by_time_partition(adapter: FabricAdapter):
adapter.insert_overwrite_by_time_partition(
"test_table",
parse_one("SELECT a, b FROM tbl"),
start="2022-01-01",
end="2022-01-02",
time_column="b",
time_formatter=lambda x, _: exp.Literal.string(x.strftime("%Y-%m-%d")),
columns_to_types={"a": exp.DataType.build("INT"), "b": exp.DataType.build("STRING")},
)

# Fabric adapter should use DELETE/INSERT strategy, not MERGE.
assert to_sql_calls(adapter) == [
"""DELETE FROM [test_table] WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02';""",
"""INSERT INTO [test_table] ([a], [b]) SELECT [a], [b] FROM (SELECT [a] AS [a], [b] AS [b] FROM [tbl]) AS [_subquery] WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02';""",
]


def test_replace_query(adapter: FabricAdapter):
adapter.cursor.fetchone.return_value = (1,)
adapter.replace_query("test_table", parse_one("SELECT a FROM tbl"), {"a": "int"})

# This behavior is inherited from MSSQLEngineAdapter and should be TRUNCATE + INSERT
assert to_sql_calls(adapter) == [
"""SELECT 1 FROM [INFORMATION_SCHEMA].[TABLES] WHERE [TABLE_NAME] = 'test_table';""",
"TRUNCATE TABLE [test_table];",
"INSERT INTO [test_table] ([a]) SELECT [a] FROM [tbl];",
]
83 changes: 83 additions & 0 deletions tests/core/test_connection_config.py
@@ -12,6 +12,7 @@
ConnectionConfig,
DatabricksConnectionConfig,
DuckDBAttachOptions,
DuckDBConnectionConfig,
FabricConnectionConfig,
GCPPostgresConnectionConfig,
MotherDuckConnectionConfig,
@@ -1417,3 +1418,85 @@ def test_mssql_pymssql_connection_factory():
# Clean up the mock module
if "pymssql" in sys.modules:
del sys.modules["pymssql"]


def test_fabric_connection_config_defaults(make_config):
"""Test Fabric connection config defaults to pyodbc and autocommit=True."""
config = make_config(type="fabric", host="localhost", check_import=False)
assert isinstance(config, FabricConnectionConfig)
assert config.driver == "pyodbc"
assert config.autocommit is True

# Ensure it creates the FabricAdapter
from sqlmesh.core.engine_adapter.fabric import FabricAdapter

assert isinstance(config.create_engine_adapter(), FabricAdapter)


def test_fabric_connection_config_parameter_validation(make_config):
"""Test Fabric connection config parameter validation."""
# Test that FabricConnectionConfig correctly handles pyodbc-specific parameters.
config = make_config(
type="fabric",
host="localhost",
driver_name="ODBC Driver 18 for SQL Server",
trust_server_certificate=True,
encrypt=False,
odbc_properties={"Authentication": "ActiveDirectoryServicePrincipal"},
check_import=False,
)
assert isinstance(config, FabricConnectionConfig)
assert config.driver == "pyodbc" # Driver is fixed to pyodbc
assert config.driver_name == "ODBC Driver 18 for SQL Server"
assert config.trust_server_certificate is True
assert config.encrypt is False
assert config.odbc_properties == {"Authentication": "ActiveDirectoryServicePrincipal"}

# Test that specifying a different driver for Fabric raises an error
with pytest.raises(ConfigError, match=r"Input should be 'pyodbc'"):
make_config(type="fabric", host="localhost", driver="pymssql", check_import=False)


def test_fabric_pyodbc_connection_string_generation():
"""Test that the Fabric pyodbc connection gets invoked with the correct ODBC connection string."""
with patch("pyodbc.connect") as mock_pyodbc_connect:
# Create a Fabric config
config = FabricConnectionConfig(
host="testserver.datawarehouse.fabric.microsoft.com",
port=1433,
database="testdb",
user="testuser",
password="testpass",
driver_name="ODBC Driver 18 for SQL Server",
trust_server_certificate=True,
encrypt=True,
login_timeout=30,
check_import=False,
)

# Get the connection factory with kwargs and call it
factory_with_kwargs = config._connection_factory_with_kwargs
connection = factory_with_kwargs()

# Verify pyodbc.connect was called with the correct connection string
mock_pyodbc_connect.assert_called_once()
call_args = mock_pyodbc_connect.call_args

# Check the connection string (first argument)
conn_str = call_args[0][0]
expected_parts = [
"DRIVER={ODBC Driver 18 for SQL Server}",
"SERVER=testserver.datawarehouse.fabric.microsoft.com,1433",
"DATABASE=testdb",
"Encrypt=YES",
"TrustServerCertificate=YES",
"Connection Timeout=30",
"UID=testuser",
"PWD=testpass",
]

for part in expected_parts:
assert part in conn_str

# Check autocommit parameter, should default to True for Fabric
assert call_args[1]["autocommit"] is True