From e0b92206f7398740af2f5be765b41d0394330c23 Mon Sep 17 00:00:00 2001 From: Eric Sivonxay Date: Thu, 18 May 2023 13:49:11 -0700 Subject: [PATCH 1/7] Use a DataServer and MultiStore to pool connections --- src/jobflow/managers/fireworks.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/jobflow/managers/fireworks.py b/src/jobflow/managers/fireworks.py index 5cc32129..aac34953 100644 --- a/src/jobflow/managers/fireworks.py +++ b/src/jobflow/managers/fireworks.py @@ -5,6 +5,10 @@ import typing from fireworks import FiretaskBase, Firework, FWAction, Workflow, explicit_serialize +from fireworks.utilities.fw_utilities import DataServer +from fireworks.fw_config import DS_PASSWORD +from maggma.stores.shared_stores import MultiStoreFacade +import os if typing.TYPE_CHECKING: from typing import Sequence @@ -151,7 +155,14 @@ def run_task(self, fw_spec): if store is None: store = SETTINGS.JOB_STORE - store.connect() + if os.environ["FW_DATASERVER_PORT"]: + ds = DataServer(address=("127.0.0.1", int(os.environ["FW_DATASERVER_PORT"])), + authkey=DS_PASSWORD) + ds.connect() + multistore = ds.MultiStore() + store = MultiStoreFacade(store, multistore) + else: + store.connect() if hasattr(self, "fw_id"): job.metadata.update({"fw_id": self.fw_id}) From 5d0697dcf9bb322f4393e064718e80b8f7357a98 Mon Sep 17 00:00:00 2001 From: Eric Sivonxay Date: Thu, 18 May 2023 17:13:47 -0700 Subject: [PATCH 2/7] Update src/jobflow/managers/fireworks.py Co-authored-by: Janosh Riebesell --- src/jobflow/managers/fireworks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/jobflow/managers/fireworks.py b/src/jobflow/managers/fireworks.py index aac34953..274660dd 100644 --- a/src/jobflow/managers/fireworks.py +++ b/src/jobflow/managers/fireworks.py @@ -155,7 +155,7 @@ def run_task(self, fw_spec): if store is None: store = SETTINGS.JOB_STORE - if os.environ["FW_DATASERVER_PORT"]: + if "FW_DATASERVER_PORT" in os.environ: ds = DataServer(address=("127.0.0.1", int(os.environ["FW_DATASERVER_PORT"])), authkey=DS_PASSWORD) ds.connect() From 21ebef7c52a23b77536a293877725b1807468efe Mon Sep 17 00:00:00 2001 From: Eric Sivonxay Date: Tue, 23 May 2023 16:56:39 -0700 Subject: [PATCH 3/7] Add __eq__ function to the jobstore --- src/jobflow/core/store.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/jobflow/core/store.py b/src/jobflow/core/store.py index fc68e223..e673b631 100644 --- a/src/jobflow/core/store.py +++ b/src/jobflow/core/store.py @@ -444,6 +444,30 @@ def remove_docs(self, criteria: dict): store.remove_docs({"job_uuid": doc["uuid"], "job_index": doc["index"]}) self.docs_store.remove_docs(criteria) + + def __eq__(self, other: object) -> bool: + """ + Check equality for JobStore + Args: + other: other JobStore to compare with + """ + if not isinstance(other, JobStore): + return False + + fields = [ + "docs_store", + "save", + "load" + ] + + # Check equality of all additional_stores + if len(self.additional_stores) == len(other.additional_stores): + if self.additional_stores == other.additional_stores: + return all(getattr(self, f) == getattr(other, f) for f in fields) + + return False + + def get_output( self, uuid: str, From 8718eef26ee3ea445a9d63d313dc82eba296a116 Mon Sep 17 00:00:00 2001 From: Eric Sivonxay Date: Tue, 23 May 2023 16:57:29 -0700 Subject: [PATCH 4/7] Update class names to reflect what was merged into maggma --- src/jobflow/managers/fireworks.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/jobflow/managers/fireworks.py b/src/jobflow/managers/fireworks.py index 274660dd..16b0725d 100644 --- a/src/jobflow/managers/fireworks.py +++ b/src/jobflow/managers/fireworks.py @@ -7,7 +7,7 @@ from fireworks import FiretaskBase, Firework, FWAction, Workflow, explicit_serialize from fireworks.utilities.fw_utilities import DataServer from fireworks.fw_config import DS_PASSWORD -from maggma.stores.shared_stores import MultiStoreFacade +from maggma.stores.shared_stores import StoreFacade import os if typing.TYPE_CHECKING: @@ -156,11 +156,13 @@ def run_task(self, fw_spec): if store is None: store = SETTINGS.JOB_STORE if "FW_DATASERVER_PORT" in os.environ: - ds = DataServer(address=("127.0.0.1", int(os.environ["FW_DATASERVER_PORT"])), - authkey=DS_PASSWORD) + ds = DataServer( + address=("127.0.0.1", int(os.environ["FW_DATASERVER_PORT"])), + authkey=DS_PASSWORD + ) ds.connect() multistore = ds.MultiStore() - store = MultiStoreFacade(store, multistore) + store = StoreFacade(store, multistore) else: store.connect() From c5e0ab42f912c87b5f3a9682b5599cbb53494cc3 Mon Sep 17 00:00:00 2001 From: Eric Sivonxay Date: Tue, 23 May 2023 17:02:35 -0700 Subject: [PATCH 5/7] Fix linting errors --- src/jobflow/core/store.py | 14 +++++--------- src/jobflow/managers/fireworks.py | 8 ++++---- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/jobflow/core/store.py b/src/jobflow/core/store.py index e673b631..d49a5483 100644 --- a/src/jobflow/core/store.py +++ b/src/jobflow/core/store.py @@ -444,27 +444,23 @@ def remove_docs(self, criteria: dict): store.remove_docs({"job_uuid": doc["uuid"], "job_index": doc["index"]}) self.docs_store.remove_docs(criteria) - + def __eq__(self, other: object) -> bool: """ Check equality for JobStore Args: - other: other JobStore to compare with + other: other JobStore to compare with. """ if not isinstance(other, JobStore): return False - fields = [ - "docs_store", - "save", - "load" - ] + fields = ["docs_store", "save", "load"] # Check equality of all additional_stores if len(self.additional_stores) == len(other.additional_stores): if self.additional_stores == other.additional_stores: - return all(getattr(self, f) == getattr(other, f) for f in fields) - + return all(getattr(self, f) == getattr(other, f) for f in fields) + return False diff --git a/src/jobflow/managers/fireworks.py b/src/jobflow/managers/fireworks.py index 16b0725d..9407c02e 100644 --- a/src/jobflow/managers/fireworks.py +++ b/src/jobflow/managers/fireworks.py @@ -2,13 +2,13 @@ from __future__ import annotations +import os import typing from fireworks import FiretaskBase, Firework, FWAction, Workflow, explicit_serialize -from fireworks.utilities.fw_utilities import DataServer from fireworks.fw_config import DS_PASSWORD +from fireworks.utilities.fw_utilities import DataServer from maggma.stores.shared_stores import StoreFacade -import os if typing.TYPE_CHECKING: from typing import Sequence @@ -158,12 +158,12 @@ def run_task(self, fw_spec): if "FW_DATASERVER_PORT" in os.environ: ds = DataServer( address=("127.0.0.1", int(os.environ["FW_DATASERVER_PORT"])), - authkey=DS_PASSWORD + authkey=DS_PASSWORD, ) ds.connect() multistore = ds.MultiStore() store = StoreFacade(store, multistore) - else: + else: store.connect() if hasattr(self, "fw_id"): From 361c129446fdb10ec96f6616e7de07ad9d7ef6ac Mon Sep 17 00:00:00 2001 From: Eric Sivonxay Date: Tue, 23 May 2023 22:05:18 -0700 Subject: [PATCH 6/7] Remove an if statement and add a blank line --- src/jobflow/core/store.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/jobflow/core/store.py b/src/jobflow/core/store.py index d49a5483..647fb220 100644 --- a/src/jobflow/core/store.py +++ b/src/jobflow/core/store.py @@ -448,6 +448,7 @@ def remove_docs(self, criteria: dict): def __eq__(self, other: object) -> bool: """ Check equality for JobStore + Args: other: other JobStore to compare with. """ @@ -457,9 +458,8 @@ def __eq__(self, other: object) -> bool: fields = ["docs_store", "save", "load"] # Check equality of all additional_stores - if len(self.additional_stores) == len(other.additional_stores): - if self.additional_stores == other.additional_stores: - return all(getattr(self, f) == getattr(other, f) for f in fields) + if self.additional_stores == other.additional_stores: + return all(getattr(self, f) == getattr(other, f) for f in fields) return False From 21df1a6ad5539abc94389e612b07fc74ef117ee3 Mon Sep 17 00:00:00 2001 From: Eric Sivonxay Date: Wed, 24 May 2023 11:17:20 -0700 Subject: [PATCH 7/7] Update store.py --- src/jobflow/core/store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/jobflow/core/store.py b/src/jobflow/core/store.py index 647fb220..3b3bb770 100644 --- a/src/jobflow/core/store.py +++ b/src/jobflow/core/store.py @@ -447,7 +447,7 @@ def remove_docs(self, criteria: dict): def __eq__(self, other: object) -> bool: """ - Check equality for JobStore + Check equality for JobStore. Args: other: other JobStore to compare with.