Skip to content

Commit

Permalink
working on jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
frapercan committed Jul 1, 2024
1 parent eab7536 commit a6e3ceb
Show file tree
Hide file tree
Showing 11 changed files with 500 additions and 337 deletions.
2 changes: 1 addition & 1 deletion data/uniprot_ids.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
UniProt IDs
ID
A0ZZM2
A5GD93
Q864Q6
Expand Down
109 changes: 109 additions & 0 deletions protein_metamorphisms_is/base/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import multiprocessing
import queue
import time
from abc import ABC, abstractmethod

from redis import Redis
from rq import Queue

from protein_metamorphisms_is.helpers.logger.logger import setup_logger
from protein_metamorphisms_is.sql.base.database_manager import DatabaseManager


class PipelineBase(ABC):
    """
    Abstract base class for pipelines that operate on bioinformatics data.

    On construction the class stores the configuration, creates a logger named
    after the concrete subclass, opens a Redis connection with an RQ queue on
    top of it, and creates a multiprocessing queue used to hand items to the
    database-writer loop (:meth:`add_to_db`). Subclasses must implement
    :meth:`set_tasks`.

    Attributes:
        conf (dict): Configuration parameters for the pipeline.
        logger (Logger): Logger object for logging information.
        redis_conn (Redis): Connection to the Redis server.
        process_queue (Queue): RQ queue bound to ``redis_conn``.
        data_queue (multiprocessing.Queue): Items waiting to be stored.
        queues (dict): Starts empty; not used by this base class itself.
        engine: Database engine; only set once :meth:`session_init` has run.
        session (Session, optional): A SQLAlchemy session for database
            interactions; only set once :meth:`session_init` has run.

    Args:
        conf (dict): Configuration dictionary containing necessary parameters.
        session_required (bool): Flag to indicate if a database session is required.
    """

    def __init__(self, conf, session_required=False):
        """
        Initialize the PipelineBase class.

        Sets up the configuration, logger, Redis/RQ queue and the
        multiprocessing data queue. Initializes the database session if
        ``session_required`` is true.
        """
        self.conf = conf
        self.logger = setup_logger(self.__class__.__name__)
        self.logger.info(f"Initializing {self.__class__.__name__}")

        # NOTE(review): the Redis endpoint is hard-coded to a local default
        # instance; adjust it to the deployment environment if needed.
        self.redis_conn = Redis(host='localhost', port=6379, db=0)
        self.process_queue = Queue(connection=self.redis_conn)
        self.data_queue = multiprocessing.Queue()

        self.queues = {}

        if session_required:
            self.session_init()

    def session_init(self):
        """
        Initialize the database session using DatabaseManager.

        Builds a DatabaseManager from ``self.conf`` and stores the engine and
        session it provides on ``self.engine`` and ``self.session``.
        """
        self.logger.info("Initializing database session using DatabaseManager")
        db_manager = DatabaseManager(self.conf)
        self.engine = db_manager.get_engine()
        self.session = db_manager.get_session()

    @abstractmethod
    def set_tasks(self):
        """Define the pipeline's tasks. Must be implemented by subclasses."""
        pass

    def start(self):
        """
        Start the pipeline.

        Currently this only delegates to :meth:`set_tasks`. The remaining
        steps of the earlier extractor-style flow are kept below, disabled,
        while the job-based pipeline is being developed.
        """
        self.set_tasks()
        # self.set_targets()
        # self.start_db_process()
        # self.fetch()
        # self.data_queue.put(None)  # Termination signal for the database process
        # self.db_process.join()  # Wait for the database process to finish

    def start_db_process(self):
        """Start the process that handles insertion into the database."""
        self.db_process = multiprocessing.Process(target=self.add_to_db)
        self.db_process.start()

    def add_to_db(self):
        """
        Consume items from ``data_queue`` and store them in the database.

        Blocks on the queue until a ``None`` item (the termination signal) is
        received. Each item is passed to ``self.store_entry``, which this base
        class does not define (its abstract declaration is disabled below), so
        it is expected to come from the subclass. A failure while storing one
        item is logged and does not stop the loop.

        The database session is closed on exit if one was created. A pipeline
        built with ``session_required=False`` has no ``session`` attribute, and
        that case is handled instead of raising ``AttributeError``.
        """
        try:
            while True:
                data = self.data_queue.get()
                if data is None:  # Termination signal
                    break
                try:
                    self.store_entry(data)
                except Exception as e:
                    self.logger.error(f"Error processing data: {str(e)}")
        finally:
            # ``session`` only exists after session_init() has run.
            session = getattr(self, "session", None)
            if session is not None:
                session.close()
                self.logger.info("Database session closed and process ending.")
            else:
                self.logger.info("No database session to close; process ending.")

    # @abstractmethod
    # def set_targets(self):
    #     pass
    #
    # @abstractmethod
    # def fetch(self):
    #     pass
    #
    # @abstractmethod
    # def store_entry(self, data):
    #     pass
File renamed without changes.
8 changes: 4 additions & 4 deletions protein_metamorphisms_is/config/config.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#System
max_workers: 6
max_workers: 25
binaries_path: '../binaries'

# DB CONFIGURATION
Expand All @@ -12,8 +12,8 @@ DB_NAME: BioData
## Information System/
# Uniprot Extraction

load_accesion_csv: false
load_accesion_column: false
load_accesion_csv: ../data/uniprot_ids.csv
load_accesion_column: ID
tag: 'GOA'

search_criteria: '(taxonomy_id:212035) AND (structure_3d:true)'
Expand Down Expand Up @@ -54,7 +54,7 @@ structural_alignment:
# Embedding
embedding:
types:
- 1 # ESM
# - 1 # ESM
- 2 # Prost

# GO Metrics
Expand Down
5 changes: 5 additions & 0 deletions protein_metamorphisms_is/information_system/base/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import time
from abc import ABC, abstractmethod

from redis import Redis
from rq import Queue

from protein_metamorphisms_is.helpers.logger.logger import setup_logger
from protein_metamorphisms_is.sql.base.database_manager import DatabaseManager

Expand Down Expand Up @@ -35,6 +38,8 @@ def __init__(self, conf, session_required=False):
self.logger = setup_logger(self.__class__.__name__)
self.logger.info(f"Initializing {self.__class__.__name__}")

self.redis_conn = Redis(host='localhost', port=6379, db=0) # Asegúrate de configurar según tu entorno de Redis
self.process_queue = Queue(connection=self.redis_conn)
self.data_queue = multiprocessing.Queue()

if session_required:
Expand Down
Loading

0 comments on commit a6e3ceb

Please sign in to comment.