Skip to content

Commit

Permalink
email alert changes 2 (#76)
Browse files Browse the repository at this point in the history
* Grant Pipeline-New GARD Mapping Process

* Change get_node_counts

* Removal of leftover merge text

* More merge text removal

* gfkg and pakg bug fixes

* New CTKG pipeline

* aact edit

* changes for e alert

* email alert changes 2

---------

Co-authored-by: Devon Joseph Leadman <[email protected]>
  • Loading branch information
devonleadman and Devon Joseph Leadman authored Oct 22, 2024
1 parent 130529c commit a34b110
Show file tree
Hide file tree
Showing 15 changed files with 202 additions and 53 deletions.
41 changes: 41 additions & 0 deletions RDAS_MEMGRAPH_APP/Dump.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import os
import sys
workspace = os.path.dirname(os.path.abspath(__file__))
sys.path.append(workspace)
sys.path.append('/home/leadmandj/RDAS/')
sys.path.append(os.getcwd())
import sysvars
from AlertCypher import AlertCypher
from subprocess import *
from time import sleep
from datetime import datetime

class Dump ():
    """Create Neo4j database dump files and copy them into the backup folder.

    The mode ('dev', 'test' or 'prod') is only used to prefix generated
    backup file names; all database administration goes through the Neo4j
    'system' database.
    """

    def __init__(self, mode='dev'):
        # Validate the deployment mode before opening the database connection.
        if mode in ['dev', 'test', 'prod']:
            self.mode = mode
        else:
            raise ValueError(f'invalid mode {mode!r}; expected dev, test or prod')

        # Connection to the Neo4j "system" database, used to stop/start
        # target databases around the dump operation.
        self.db = AlertCypher('system')

    def dump_file (self, path, db_name):
        """Dump database ``db_name`` to a .dump file inside directory ``path``.

        The database is stopped for the duration of the dump and restarted
        afterwards; an existing dump file at the destination is overwritten.
        """
        self.db.run(f'STOP DATABASE {db_name}')
        print(f'DATABASE {db_name} STOPPED')

        p = Popen(['sudo', '/opt/neo4j/bin/neo4j-admin', 'database', 'dump',
                   f'{db_name}', f'--to-path={path}', '--overwrite-destination'],
                  encoding='utf8')
        p.wait()
        print(f'DATABASE {db_name} DUMPED AT {path}')

        self.db.run(f'START DATABASE {db_name}')
        print(f'DATABASE {db_name} RESTARTED')

    def generate_backup_name (self, dump_name):
        """Return a date-stamped backup file stem: '<mode>-<dump_name>-<MM-DD-YY>'."""
        cur_date = datetime.now().strftime("%m-%d-%y")
        return f'{self.mode}-{dump_name}-{cur_date}'

    def copy_to_backup(self, dump_name):
        """Copy the transfer-folder dump into the backup folder under a
        date-stamped name so successive backups do not overwrite each other.
        """
        filename = self.generate_backup_name(dump_name)
        # BUG FIX: the destination path previously ignored `filename`,
        # defeating the date-stamped naming; use it for the copied file.
        p = Popen(['sudo', 'cp', f'{sysvars.transfer_path}{dump_name}.dump',
                   f'{sysvars.backup_path}{dump_name}/{filename}.dump'])
        p.wait()
        print(f'DATABASE DUMP PUT INTO BACKUP FOLDER AT {sysvars.backup_path}{dump_name}.dump')
91 changes: 91 additions & 0 deletions RDAS_MEMGRAPH_APP/Transfer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import os
import sys
workspace = os.path.dirname(os.path.abspath(__file__))
sys.path.append(workspace)
sys.path.append('/home/leadmandj/RDAS/')
sys.path.append(os.getcwd())
import sysvars
from AlertCypher import AlertCypher
from subprocess import *
from time import sleep

class Transfer:
    """Load transferred dump files into the Neo4j cluster and detect newly
    arrived dump files on the test/prod servers.
    """

    def __init__ (self, mode=None):
        # Only the test and prod servers receive transfers.
        if mode in ['test', 'prod']:
            self.mode = mode
        else:
            raise ValueError(f'invalid mode {mode!r}; expected test or prod')

        # Connection to the Neo4j "system" database used for administrative
        # queries (DROP/CREATE DATABASE, SHOW SERVERS, config access).
        self.db = AlertCypher('system')

    def seed(self, dump_name, dump_folder):
        """Drop ``dump_name``, load it from the dump file in ``dump_folder``,
        then re-create it seeded from this server so the cluster picks it up.

        dump_name   -- name of database (e.g. rdas.ctkg)
        dump_folder -- path to the directory holding the dump files (e.g. RDAS/transfer/)

        Failures are caught and printed (best-effort operation), not re-raised.
        """
        try:
            self.db.run(f'DROP DATABASE {dump_name}')
            print('Dropped database...')

            p = Popen(['sudo', '/opt/neo4j/bin/neo4j-admin', 'database', 'load',
                       f'{dump_name}', f'--from-path={dump_folder}', '--overwrite-destination'],
                      encoding='utf8')
            p.wait()
            print('Waiting 20 seconds for database load')
            sleep(20)

            p = Popen(['sudo', '/opt/neo4j/bin/neo4j', 'restart'], encoding='utf-8')
            p.wait()
            print('Waiting 20 seconds for database restart')
            sleep(20)

            # Locate this server's id so the re-created database is seeded
            # from the instance that just loaded the dump.
            server_id = self.db.run(f"SHOW servers YIELD * WHERE name = \'{self.mode}1\' RETURN serverId").data()[0]['serverId']
            print(f'SERVER ID LOCATED:: {server_id}')

            seed_query = f'CREATE DATABASE {dump_name} OPTIONS {{existingData: \'use\', existingDataSeedInstance: \'{server_id}\'}}'
            print(seed_query)
            # BUG FIX: the seed query was built and printed but never executed,
            # leaving the database dropped. Run it before waiting for seeding.
            self.db.run(seed_query)
            print('Waiting 20 seconds for database seeding')
            sleep(20)

        except Exception as e:
            print('Did not drop or load database...')
            print(e)

    def detect(self, path):
        """Check every known dump directory for newly modified dump files under ``path``.

        Compares each dump file's mtime on disk against the value stored in the
        config database and, when they differ, flags the database and stores the
        new mtime.

        Returns [transfer_detection, last_updates]:
        transfer_detection -- {db_name: bool} True when a new dump was detected
        last_updates       -- {db_name: str} previously stored mtime per database
        """
        transfer_detection = {k: False for k in sysvars.dump_dirs}
        last_updates = {k: str() for k in sysvars.dump_dirs}

        # Config section is keyed by server (mode) and by which folder we watch.
        if self.mode == 'test':
            server = 'TEST'
        elif self.mode == 'prod':
            server = 'PROD'
        else:
            raise ValueError(f'invalid mode {self.mode!r}')

        if path == sysvars.transfer_path:
            config_title = 'TRANSFER'
        elif path == sysvars.approved_path:
            config_title = 'APPROVED'
        else:
            raise ValueError(f'unrecognized path {path!r}')

        for db_name in sysvars.dump_dirs:
            try:
                # Last recorded modification timestamp from the config.
                last_mod_date = self.db.getConf(f'{server}_{config_title}_DETECTION', db_name)
                # Stores value to return later.
                last_updates[db_name] = last_mod_date
                print(f'{db_name} last mod:: {last_mod_date}')
                # Current modification timestamp of the dump file on disk.
                cur_mod_date = os.path.getmtime(f"{path}{db_name}.dump")
                print(f'{db_name} cur mod:: {cur_mod_date}')
                cur_mod_date = str(cur_mod_date)

                # A differing timestamp means a new transfer arrived; flag it
                # and persist the new timestamp.
                if not cur_mod_date == last_mod_date:
                    transfer_detection[db_name] = True
                    self.db.setConf(f'{server}_{config_title}_DETECTION', db_name, cur_mod_date)
            except Exception as e:
                # Missing dump file or config entry: report "no new transfer".
                print(e)
                transfer_detection[db_name] = False

        return [transfer_detection, last_updates]
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,7 @@
import datetime
from subprocess import *
from time import sleep
print(os.getcwd())
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("-dir", "--dump_dir", dest = "dump_dir", help="directory name within the directory that stores the dump files")
args = parser.parse_args()

print('DUMPING DATABASE ON REMOTE SERVER')
p = Popen(['/opt/neo4j/bin/neo4j-admin', 'database', 'dump', f'{args.dump_dir}', f'--to-path={sysvars.transfer_path}', '--overwrite-destination'], encoding='utf8')
Expand Down
File renamed without changes.
9 changes: 6 additions & 3 deletions detect_approval.py → start_approval.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
from AlertCypher import AlertCypher
from subprocess import *
from time import sleep
import argparse
from RDAS_MEMGRAPH_APP.Dump import Dump


dump_module = Dump('test')
while True:
for db_name in sysvars.dump_dirs:
db = AlertCypher(db_name)
Expand All @@ -23,12 +25,13 @@
print(f'Database dump approved for {db_name}')
db.run('MATCH (x:UserTesting) DETACH DELETE x')

p = Popen(['sudo', 'cp', f'{sysvars.transfer_path}{db_name}.dump', f'{sysvars.approved_path}{db_name}.dump'], encoding='utf8')
p.wait()
dump_module.dump_file(sysvars.approved_path, db_name)

p = Popen(['sudo', 'chmod', '777', f'{sysvars.approved_path}{db_name}.dump'], encoding='utf8')
p.wait()



except Exception:
print(f'{db_name} read error')

Expand Down
4 changes: 0 additions & 4 deletions start_dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@
from datetime import date,datetime
from AlertCypher import AlertCypher
from RDAS_GARD.methods import get_node_counts
import firebase_admin
from firebase_admin import auth
from firebase_admin import credentials
from firebase_admin import firestore

prefix=sysvars.db_prefix
config_selection = {'ct':[prefix+'rdas.ctkg_update', 'ct_interval'], 'pm':[prefix+'rdas.pakg_update', 'pm_interval'], 'gnt':[prefix+'rdas.gfkg_update', 'gnt_interval']}
Expand Down
46 changes: 34 additions & 12 deletions start_prod.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,44 @@
import detect_transfer
import seed_cluster
from RDAS_MEMGRAPH_APP.Alert import Alert
from RDAS_MEMGRAPH_APP.Transfer import Transfer
from datetime import datetime

#!!! Script has to be ran with SUDO !!!

email_client = Alert()
transfer_module = Transfer('prod')
db = AlertCypher('system')
init = True

email_client = Alert()
db = AlertCypher('system')
while True:
# Detect new dumps in the production server
transfer_detection,last_updates = detect_transfer.detect('prod',sysvars.transfer_path)
new_dumps = [k for (k,v) in transfer_detection.items() if v]

# Seed the seed cluster with the new dumps
for db_name in new_dumps:
last_update_obj = datetime.fromtimestamp(last_updates[db_name])
seed_cluster.seed(db_name,sysvars.transfer_path,'prod')
email_client.trigger_email([db_name], date_start=datetime.strftime(last_update_obj, "%m/%d/%y"))

# Sleep for a minute before checking for new dumps again
sleep(60)
try:
print('checking for update...')
# Detect new dumps in the production server
transfer_detection,last_updates = transfer_module.detect(sysvars.transfer_path)
approval_detection,last_updates_approval = transfer_module.detect(sysvars.approved_path)
new_dumps = [k for (k,v) in transfer_detection.items() if v]
new_dumps_approval = [k for (k,v) in approval_detection.items() if v]

# Sets the current db files last transfer date to today so it doesnt load and send emails upon script starting
if init:
init = False
continue

# Seed the seed cluster with the new dumps
for db_name in new_dumps:
print('update found::', db_name)
last_update_obj = datetime.fromtimestamp(float(last_updates[db_name]))
print('starting database loading')
transfer_module.seed(db_name, sysvars.transfer_path)
print('starting email service')
email_client.trigger_email([db_name], date_start=datetime.strftime(last_update_obj, "%m/%d/%y"))

# Sleep for a minute before checking for new dumps again
sleep(60)
except Exception as e:
print(e)


43 changes: 24 additions & 19 deletions start_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,45 +10,50 @@
from subprocess import *
from time import sleep
import argparse
import detect_transfer
from seed_cluster import seed
from emails.alert import send_email,setup_email_client
#import file_transfer
from RDAS_MEMGRAPH_APP.Alert import Alert
from RDAS_MEMGRAPH_APP.Transfer import Transfer
from RDAS_MEMGRAPH_APP.Dump import Dump

"""
This script constantly checks the test server for 2 things: whether a dump file was sent to the test server,
or if a dump file was sent to the approved folder.
The file sent TO the test server will be sent manually while the approval file will be sent automatically via a script (start_approval.py)
"""

email_client = Alert('test')
transfer_module = Transfer('test')
dump_module = Dump('test')
while True:
# Detects all new dump files in the transfer folder of the TEST server
transfer_detection = detect_transfer.detect('test', sysvars.transfer_path)
new_dumps = [k for (k,v) in transfer_detection.items() if v]
#new_dumps = ['gard']
# Seeds all 3 clusters in the TEST server so that the databases will be visible

transfer_detection = transfer_module.detect('test', sysvars.transfer_path)
new_dumps,lastupdates = [k for (k,v) in transfer_detection.items() if v]
for db_name in new_dumps:
seed(db_name,sysvars.transfer_path,'test')
transfer_module.seed(db_name,sysvars.transfer_path)
print('database seeded within cluster')

for recip in sysvars.contacts:
for recip in sysvars.contacts['test']:
sub = '[RDAS] ACTION REQUIRED - New Dump Uploaded to Test Server'
msg = f'New dump uploaded to test for database {db_name}'
html = f'''<p>A new dump file has been uploaded to the test databases</p>
<p>database effected: {db_name}</p>
<p>To approve the database to be transfered to production, log in to the databases browser and select the effected database</p>
<p>Run the following Cypher Query:</p>
<p>MATCH (x:UserTesting) SET x.Approved = \"True\"</p>'''
send_email(sub,msg,recip,html=html,client=setup_email_client())
email_client.send_email(sub,msg,recip,html=html)
print(f'Notification email sent to {recip}')

print('Waiting for 1 minute before checking for approval...')
sleep(60)

transfer_detection = detect_transfer.detect('test', sysvars.approved_path)
new_dumps = [k for (k,v) in transfer_detection.items() if v]
# Detects if a new dump file was loaded into the approved folder
transfer_detection = transfer_module.detect('test', sysvars.approved_path)
new_dumps,lastupdates = [k for (k,v) in transfer_detection.items() if v]
print(new_dumps)

for db_name in new_dumps:
print(f'Update approved for {db_name}')
db = AlertCypher(db_name)

print(f'Update approved for {db_name}')

send_url = sysvars.rdas_urls['prod']
p = Popen(['scp', f'{sysvars.approved_path}{db_name}.dump', f'{sysvars.current_user}@{send_url}:{sysvars.transfer_path}{db_name}.dump'], encoding='utf8')
p.wait()
Expand All @@ -59,9 +64,9 @@
html = f'''<p>A dump file on the test server has been approved to move to production</p>
<p>Database effected: {db_name}</p>
<p>There are no other actions required on your end</p>'''
send_email(sub,msg,recip,html=html,client=setup_email_client())
email_client.send_email(sub,msg,recip,html=html)
print(f'Notification email sent to {recip}')

# Waits one hour before retrying process
sleep(3600)
# Waits one minute before retrying process
sleep(60)

16 changes: 6 additions & 10 deletions sysvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,13 @@
# base_path = '/home/{current_user}/RDAS_master/{base_directory_name}/'.format(current_user=current_user, base_directory_name=base_directory_name)

# RDAS Team contacts for emails
contacts = [
"[email protected]"
]
contacts = {
"dev": ["[email protected]"],
"test": ["[email protected]","[email protected]"]
}

base_path = '/home/{current_user}/{base_directory_name}/'.format(current_user=current_user, base_directory_name=base_directory_name)

# RDAS Team contacts for emails
contacts = [
"[email protected]"
]

# Folder paths
backup_path = '{base_path}backup/'.format(base_path=base_path)
transfer_path = '{base_path}transfer/'.format(base_path=base_path)
Expand All @@ -41,8 +37,8 @@
gard_db = db_prefix+gard_db_name

# Conversions
dump_dirs = ['RDAS.CTKG','RDAS.PAKG','RDAS.GFKG','RDAS.GARD']
db_abbrevs = {'ct':'RDAS.CTKG', 'pm':'RDAS.PAKG', 'gnt':'RDAS.GFKG'}
dump_dirs = ['rdas.ctkg','rdas.pakg','rdas.gfkg','rdas.gard']
db_abbrevs = {'ct':'rdas.ctkg', 'pm':'rdas.pakg', 'gnt':'rdas.gfkg'}
db_abbrevs2 = {ct_db:'ct', pm_db:'pm', gnt_db:'gnt'}

# Paths to database creation and update source files
Expand Down

0 comments on commit a34b110

Please sign in to comment.