diff --git a/tm_admin/campaigns/campaigns.py b/tm_admin/campaigns/campaigns.py index 16fd9906..69ac1882 100755 --- a/tm_admin/campaigns/campaigns.py +++ b/tm_admin/campaigns/campaigns.py @@ -32,11 +32,12 @@ from cpuinfo import get_cpu_info from tm_admin.dbsupport import DBSupport from tm_admin.users.users_class import UsersTable -from osm_rawdata.postgres import uriParser, PostgresClient +from osm_rawdata.pgasync import PostgresClient from tm_admin.types_tm import Userrole from tqdm import tqdm +import tqdm.asyncio from codetiming import Timer -import threading +import asyncio # Instantiate logger log = logging.getLogger(__name__) @@ -61,75 +62,64 @@ def __init__(self, self.pg = None self.profile = UsersTable() self.types = dir(tm_admin.types_tm) - super().__init__('campaigns', dburi) + super().__init__('campaigns') - def mergeOrganizations(self): + async def mergeOrganizations(self, + inpg: PostgresClient, + ): """ A method to merge the contents of the TM campaign_organizations into the campaigns table as an array. + + Args: + inpg (PostgresClient): The input database """ + # FIXME: this is a weird table, and only has 4 entries, none of which appear + # to be in the other tables, so nothing updates. table = 'campaign_organisations' - pg = PostgresClient("localhost/tm4") - sql = f"SELECT row_to_json({table}) as row FROM {table} ORDER BY campaign_id" + sql = f"SELECT * FROM {table} ORDER BY campaign_id" # print(sql) - try: - result = pg.dbcursor.execute(sql) - except: - log.error(f"Couldn't execute query! {sql}") - return False + result = await inpg.execute(sql) - result = pg.dbcursor.fetchall() - index = 0 data = dict() - # pbar = tqdm(result) + pbar = tqdm.tqdm(result) for record in result: - entry = record[0] # there's only one item in the input data - if entry['campaign_id'] not in data: - data[entry['campaign_id']] = list() - data[entry['campaign_id']].append(entry['organisation_id']) - - # pbar = tqdm(result) - for cid, value in data.items(): - sql = f" UPDATE campaigns SET organizations = ARRAY{str(value)} WHERE id={cid};" + sql = f" UPDATE campaigns SET organizations = organizations||{record['organisation_id']} WHERE id={record['campaign_id']};" # print(sql) - self.pg.dbcursor.execute(sql) + await self.pg.execute(sql) - def mergeProjects(self): + async def mergeProjects(self, + inpg: PostgresClient, + ): """ A method to merge the contents of the TM campaign_projects into the campaigns table as an array. + + Args: + inpg (PostgresClient): The input database """ table = 'campaign_projects' - pg = PostgresClient("localhost/tm4") - sql = f"SELECT row_to_json({table}) as row FROM {table} ORDER BY campaign_id" + sql = f"SELECT * FROM {table} ORDER BY campaign_id" # print(sql) - try: - result = pg.dbcursor.execute(sql) - except: - log.error(f"Couldn't execute query! {sql}") - return False + result = await inpg.execute(sql) - result = pg.dbcursor.fetchall() index = 0 data = dict() - pbar = tqdm(result) + pbar = tqdm.tqdm(result) + for record in pbar: - entry = record[0] # there's only one item in the input data - if entry['campaign_id'] not in data: - data[entry['campaign_id']] = list() - data[entry['campaign_id']].append(entry['project_id']) - - # pbar = tqdm(result) - for cid, value in data.items(): - sql = f" UPDATE campaigns SET projects = ARRAY{str(value)} WHERE id={cid};" + sql = f" UPDATE campaigns SET projects = projects||{record['project_id']} WHERE id={record['campaign_id']};" # print(sql) - self.pg.dbcursor.execute(sql) + await self.pg.execute(sql) -def main(): +async def main(): """This main function lets this class be run standalone by a bash script.""" parser = argparse.ArgumentParser() parser.add_argument("-v", "--verbose", nargs="?", const="0", help="verbose output") - parser.add_argument("-u", "--uri", default='localhost/tm_admin', help="Database URI") + parser.add_argument("-i", "--inuri", default='localhost/tm4', + help="Input database URI") + parser.add_argument("-o", "--outuri", default='localhost/tm_admin', + help="Output database URI") # parser.add_argument("-r", "--reset", help="Reset Sequences") args = parser.parse_args() @@ -149,10 +139,17 @@ def main(): stream=sys.stdout, ) - camp = CampaignsDB(args.uri) - camp.mergeProjects() - camp.mergeOrganizations() + inpg = PostgresClient() + await inpg.connect(args.inuri) + + camp = camp = CampaignsDB(args.inuri) + await camp.connect(args.outuri) + + await camp.mergeProjects(inpg) + await camp.mergeOrganizations(inpg) if __name__ == "__main__": """This is just a hook so this file can be run standalone during development.""" - main() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(main())