Skip to content

Commit

Permalink
Refactor create_db
Browse files Browse the repository at this point in the history
  • Loading branch information
m-appel committed Aug 21, 2024
1 parent 2f2c363 commit b38822a
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 50 deletions.
6 changes: 6 additions & 0 deletions config.json.example
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
{
"archive" {
"host": "",
"user": "",
"base_path": ""
},

"cache": {
"directory": "tmp/",
"duration_in_days": 6
Expand Down
97 changes: 48 additions & 49 deletions create_db.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
import argparse
import importlib
import json
import logging
import os
# import shutil
import sys
from datetime import datetime, timezone
from time import sleep
import datetime
import paramiko
from scp import SCPClient

import arrow
import docker
import paramiko
from scp import SCPClient

from send_email import send_email

NEO4J_VERSION = '5.16.0'

def create():

today = arrow.utcnow()
date = f'{today.year}-{today.month:02d}-{today.day:02d}'
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-a', '--archive', action='store_true', help='push dump to archive server')
args = parser.parse_args()

today = datetime.now(tz=timezone.utc)
date = today.strftime('%Y-%m-%d')

# Use the current directory as root.
root = os.path.dirname(os.path.realpath(__file__))
Expand All @@ -28,20 +31,19 @@ def create():
if not root:
sys.exit('Please configure a root path.')

dump_dir = os.path.join(root, 'dumps', f'{today.year}/{today.month:02d}/{today.day:02d}', '')
dump_dir = os.path.join(root, 'dumps', today.strftime('%Y/%m/%d'))

os.makedirs(dump_dir, exist_ok=True)

# Initialize logging
scriptname = sys.argv[0].replace('/', '_')[0:-3]
FORMAT = '%(asctime)s %(processName)s %(message)s'
logging.basicConfig(
format=FORMAT,
filename=f'{dump_dir}iyp-{date}.log',
level=logging.WARNING,
filename=os.path.join(dump_dir, f'iyp-{date}.log'),
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S'
)
logging.warning('Started: %s' % sys.argv)
logging.info(f'Started: {sys.argv}')

# Load configuration file
with open('config.json', 'r') as fp:
Expand Down Expand Up @@ -91,7 +93,7 @@ def create():
# overkill.
last_msg = container.logs(stderr=False, tail=1)
if last_msg.endswith(b'Started.\n'):
logging.warning('Container ready.')
logging.info('Container ready.')
container_ready = True
break

Expand All @@ -107,75 +109,70 @@ def create():
container.stop()
sys.exit('Problem while starting the container.')

# ######### Fetch data and feed to neo4j ##########

# ########## Fetch data and feed to neo4j ##########

class RelationCountError(Exception):
def __init__(self, message):
self.message = message
super().__init__(self.message)

logging.warning('Fetching data...')
logging.info('Fetching data...')
status = {}
no_error = True
for module_name in conf['iyp']['crawlers']:
try:
module = importlib.import_module(module_name)
logging.warning(f'start {module}')
logging.info(f'start {module}')
name = module_name.replace('iyp.crawlers.', '')
crawler = module.Crawler(module.ORG, module.URL, name)
crawler.run()
passed = crawler.unit_test()
crawler.close()
if not passed:
error_message = (
f'Did not receive data from the crawler "{name}": '
)
error_message = f'Did not receive data from crawler {name}'
raise RelationCountError(error_message)
status[module_name] = 'OK'
logging.warning(f'end {module}')
logging.info(f'end {module}')
except RelationCountError as relation_count_error:
no_error = False
logging.error(relation_count_error)
status[module_name] = relation_count_error
send_email(relation_count_error)
except Exception as e:
no_error = False
logging.exception('crawler crashed!!')
logging.error('crawler crashed!')
status[module_name] = e
send_email(e)


# ######### Post processing scripts ##########

logging.warning('Post-processing...')
logging.info('Post-processing...')
for module_name in conf['iyp']['post']:
module = importlib.import_module(module_name)

try:
logging.warning(f'start {module}')
logging.info(f'start {module}')
post = module.PostProcess()
post.run()
post.close()
status[module_name] = 'OK'
logging.warning(f'end {module}')
logging.info(f'end {module}')

except Exception as e:
no_error = False
logging.error('crawler crashed!!\n')
logging.error('crawler crashed!')
logging.error(e)
logging.error('\n')
status[module_name] = e


# ######### Stop container and dump DB ##########

logging.warning('Stopping container...')
logging.info('Stopping container...')
container.stop(timeout=180)

logging.warning('Dumping database...')
if os.path.exists(f'{dump_dir}/neo4j.dump'):
os.remove(f'{dump_dir}/neo4j.dump')
logging.info('Dumping database...')
dump_file = os.path.join(dump_dir, 'neo4j.dump')
if os.path.exists(dump_file):
os.remove(dump_file)

# make sure the directory is writable for any user
os.chmod(dump_dir, 0o777)
Expand All @@ -191,37 +188,39 @@ def __init__(self, message):
dump_dir: {'bind': '/dumps', 'mode': 'rw'},
}
)

# Delete the data volume once the dump been created
client.volumes.get(neo4j_volume).remove()

# rename dump
os.rename(f'{dump_dir}/neo4j.dump', f'{dump_dir}/iyp-{date}.dump')

final_words = ''
os.rename(dump_file, os.path.join(dump_dir, f'iyp-{date}.dump'))

if not no_error:
# TODO send an email

# Add the log line to indicate to autodeploy that there were errors
final_words += f'\nErrors: {" ".join((k for k in status))}'
logging.error(f'there was errors!\n')
final_words = f'\nErrors: {" ".join((k for k in status))}'
logging.error('There were errors!')
else:
final_words = 'No error :)'
# Delete tmp file in cron job
# shutil.rmtree(tmp_dir)

logging.warning(f'Finished: {sys.argv} {final_words}')
logging.info(f'Finished: {sys.argv} {final_words}')

if args.archive:
# Push the dump and log to ihr archive
ssh = paramiko.SSHClient()
ssh.load_system_host_keys()
ssh.connect(conf['archive']['host'], username=conf['archive']['user'])

# Push the dump and log to ihr archive
ssh = paramiko.SSHClient()
ssh.load_system_host_keys()
ssh.connect(conf['archive_server'], username=conf['archive_user'])
dest = os.path.join(conf['archive']['base_path'], today.strftime('%Y/%m/%d'))
ssh.exec_command(f'mkdir -p {dest}')

dest = os.path.join(conf['archive_base_path'], f'{today.year}/{today.month:02d}/{today.day:02d}', '')
ssh.exec_command(f'mkdir -p {dest}')
with SCPClient(ssh.get_transport()) as scp:
scp.put(dump_dir, recursive=True, remote_path=dest)

with SCPClient(ssh.get_transport()) as scp:
scp.put(dump_dir, recursive=True, remote_path=dest)

if __name__ == '__main__':
create()
main()
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,6 @@ flake8
pre-commit
PyGithub
clickhouse_driver
pyspark
pyspark
paramiko
scp

0 comments on commit b38822a

Please sign in to comment.