WIP additional modes #91
dantheta committed Mar 20, 2021
1 parent 6607e55 commit 5bd72af
Showing 1 changed file with 53 additions and 27 deletions.
80 changes: 53 additions & 27 deletions importers/nominet/import-nominet.py
@@ -14,17 +14,29 @@
import psycopg2
import paramiko

cfg = None
args = None

def get_parser():
parser = argparse.ArgumentParser(description="Import and process nominet zone additions")
parser.add_argument('--config', '-c', default='import-nominet.cfg', help="Path to config file")
parser.add_argument('--verbose', '-v', action='store_true', default=False, help="Verbose mode")
parser.add_argument('--no-submit', '-N', action='store_true', default=False, help="Do not submit to API")
parser.add_argument('--debug', action='store_true', default=False, help="Debug mode")

parsers = parser.add_subparsers(dest='mode', help="Select mode")

subp = parsers.add_parser('fetch')
subp.add_argument('--no-submit', '-N', action='store_true', default=False, help="Do not submit to API")
subp.add_argument('--keep', '-K', action='store_true', default=False, help="Keep old downloaded files")

subp2 = parsers.add_parser('resubmit')
subp2.add_argument('--age', '-d', type=int, help="Resubmit at age")

return parser
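
For illustration only (not part of this commit): a minimal, self-contained sketch of how the new subcommand interface behaves, with the chosen subparser name landing in args.mode and each mode carrying its own options. Names such as fetch_p and resubmit_p are placeholders.

import argparse

# Hypothetical stand-alone demo mirroring get_parser() above.
parser = argparse.ArgumentParser(description="subcommand demo")
parser.add_argument('--verbose', '-v', action='store_true', default=False)
modes = parser.add_subparsers(dest='mode', help="Select mode")

fetch_p = modes.add_parser('fetch')
fetch_p.add_argument('--no-submit', '-N', action='store_true', default=False)

resubmit_p = modes.add_parser('resubmit')
resubmit_p.add_argument('--age', '-d', type=int)

args = parser.parse_args(['-v', 'fetch', '--no-submit'])
print(args.mode, args.no_submit)    # fetch True

args = parser.parse_args(['resubmit', '--age', '30'])
print(args.mode, args.age)          # resubmit 30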


def connect(cfg):
def connect_ssh():
transport = paramiko.Transport((cfg.get('sftp', 'host'), 22))
transport.connect(None,
cfg.get('sftp', 'user'),
@@ -36,16 +48,16 @@ def connect(cfg):
return sftp
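
As an aside, a hypothetical, self-contained sketch of the paramiko Transport-to-SFTPClient pattern that connect_ssh() follows; sftp_client and its arguments are placeholders, not values read from the config.

import paramiko

def sftp_client(host, user, password):
    # Open an SSH transport on port 22, authenticate with username/password,
    # then wrap it in an SFTP client, the same shape as connect_ssh() above.
    transport = paramiko.Transport((host, 22))
    transport.connect(None, user, password)
    return paramiko.SFTPClient.from_transport(transport)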


def connect_db(cfg):
def connect_db():
return psycopg2.connect(host=cfg.get('db', 'host'),
user=cfg.get('db', 'user'),
password=cfg.get('db', 'password'),
dbname=cfg.get('db', 'dbname'),
)


def unpack(cfg, filename):
workdir = get_workdir(cfg, filename)
def unpack(filename):
workdir = get_workdir(filename)
try:
os.mkdir(workdir)
except:
@@ -66,15 +78,15 @@ def sortfile(filename):
return filename + '.sorted'


def get_workdir(cfg, filename):
def get_workdir(filename):
tmpname, _ = os.path.splitext(filename)
return os.path.join(cfg.get('paths', 'download'), tmpname)


def compare(cfg, filename):
def compare(filename):

dbdumpname = 'db-dump-' + getdate().strftime('%Y%m%d') + '.csv'
dbdumppath = os.path.join(get_workdir(cfg, filename), dbdumpname)
dbdumppath = os.path.join(get_workdir(filename), dbdumpname)

prevdump = glob.glob(os.path.join(cfg.get('paths', 'download'), 'prev', 'db-dump*.csv'))[0]

@@ -104,6 +116,7 @@ def resolve(name):
logging.warn("Error resolving: %s: %s", prefix+name, str(err))
return name, None


def resolve_iter(it):
for name in it:
res = resolve(name)
@@ -124,7 +137,7 @@ def dbstore(conn, name, resolved):
conn.commit()


def submit_api(cfg, domain):
def submit_api(domain):
opts = dict(cfg.items('api'))

req = requests.post(opts['baseurl'] + "submit/url",
@@ -137,7 +150,7 @@ def submit_api(cfg, domain):
logging.debug("API post result: %s", req.status_code)


def relink_prev(cfg, filename):
def relink_prev(filename):
tmpname, _ = os.path.splitext(filename)

try:
@@ -148,52 +161,65 @@ def relink_prev(cfg, filename):
os.symlink(tmpname, os.path.join(cfg.get('paths', 'download'), 'prev'))


def main():
parser = get_parser()
args = parser.parse_args()

logging.basicConfig(level=logging.DEBUG if args.debug else
logging.INFO if args.verbose else logging.WARN,
format="%(asctime)s\t%(levelname)s\t%(message)s",
datefmt="[%Y-%m-%d %H:%M:%S")

cfg = configparser.RawConfigParser()
cfg.read([args.config])

def fetch():
try:
os.makedirs(cfg.get('paths', 'download'))
except:
pass

dt = getdate()
filename = dt.strftime('ukdata-%Y%m%d.zip')
logging.info("Bundle: %s", filename)

destpath = os.path.join(cfg.get('paths', 'download'), filename)
if not os.path.isfile(destpath):
sftp = connect(cfg)
sftp = connect_ssh()
logging.info("Retrieving: %s", filename)
sftp.get(filename, destpath)
sftp.close()
else:
logging.info("Already downloaded: %s", filename)


unpack(cfg, filename)
unpack(filename)

pool = multiprocessing.Pool(cfg.getint('worker', 'threads'))

conn = connect_db(cfg)
conn = connect_db()

for (name, resolvedname) in pool.imap_unordered(resolve, compare(cfg, filename), chunksize=16):
dbstore(conn, resolvedname or name, resolvedname is not None)
logging.info("Got: %s %s", name, resolvedname)
if resolvedname:
submit_api(cfg, resolvedname)
submit_api(resolvedname)
if args.debug:
break

relink_prev(cfg, filename)
relink_prev(filename)


def resubmit():
pass


def main():
global args
global cfg
parser = get_parser()
args = parser.parse_args()

logging.basicConfig(level=logging.DEBUG if args.debug else
logging.INFO if args.verbose else logging.WARN,
format="%(asctime)s\t%(levelname)s\t%(message)s",
datefmt="[%Y-%m-%d %H:%M:%S]")

cfg = configparser.RawConfigParser()
cfg.read([args.config])

if args.mode == 'fetch':
fetch()
elif args.mode == 'resubmit':
resubmit()


if __name__ == '__main__':
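
A short, hypothetical illustration (not from this commit) of the pattern the refactor relies on: helpers now read a module-level cfg that main() populates via global, instead of receiving it as a parameter. db_settings and the sample config values are placeholders.

import configparser

cfg = None  # populated by main(), read by the helper functions

def db_settings():
    # Reads the module-level cfg rather than taking it as an argument.
    return dict(host=cfg.get('db', 'host'), dbname=cfg.get('db', 'dbname'))

def main():
    global cfg
    cfg = configparser.RawConfigParser()
    cfg.read_string("[db]\nhost = localhost\ndbname = nominet\n")
    print(db_settings())   # {'host': 'localhost', 'dbname': 'nominet'}

if __name__ == '__main__':
    main()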
