From e55ecb12b717966d0306ab74d5aa9828bb472f96 Mon Sep 17 00:00:00 2001 From: Robert Phamle Date: Thu, 21 Nov 2024 13:50:38 -0800 Subject: [PATCH] handle get_storage_block --- block_cascade/prefect/v2/__init__.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/block_cascade/prefect/v2/__init__.py b/block_cascade/prefect/v2/__init__.py index 4c03261..53cd7f4 100644 --- a/block_cascade/prefect/v2/__init__.py +++ b/block_cascade/prefect/v2/__init__.py @@ -38,6 +38,12 @@ async def _fetch_block(block_id: str) -> Optional[BlockDocument]: async with get_client() as client: return await client.read_block_document(block_id) +async def _fetch_block_by_name(block_name: str, block_type_slug: str = "gcs-bucket") -> Optional[BlockDocument]: + async with get_client() as client: + return await client.read_block_document_by_name( + name=block_name, + block_type_slug=block_type_slug, + ) def get_from_prefect_context(attr: str, default: str = "") -> str: flow_context = FlowRunContext.get() @@ -80,8 +86,13 @@ def get_storage_block() -> Optional[BlockDocument]: global _CACHED_STORAGE # noqa: PLW0603 if not _CACHED_STORAGE: - _CACHED_STORAGE = run_async( - _fetch_block(current_deployment.storage_document_id) + if current_deployment.pull_steps: + _CACHED_STORAGE = run_async( + _fetch_block_by_name(block_name=current_deployment.pull_steps[0]["prefect.deployments.steps.pull_with_block"]["block_document_name"]) + ) + else: + _CACHED_STORAGE = run_async( + _fetch_block(block_id=current_deployment.storage_document_id) ) return _CACHED_STORAGE