-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathdaemon.py
69 lines (58 loc) · 2.05 KB
/
daemon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import os
import traceback
import redis
import json
import requests
from dotenv import load_dotenv
from engine import Matrix
from concurrent.futures import ThreadPoolExecutor
load_dotenv()
REDIS_URL = os.getenv('REDIS_QUEUE_URL', 'redis://localhost:6379')
QUEUE_NAME = os.getenv('QUEUE_NAME', 'simulation_jobs')
DISCORD_URL = os.getenv('DISCORD_URL')
# Connect to Redis
redis_conn = redis.from_url(REDIS_URL)
def notify_discord(msg):
if DISCORD_URL:
requests.post(DISCORD_URL, json={'content': msg})
# get this out of here or refactor with engine
def update_from_env(config):
for key, value in config.items():
env_var = os.getenv(key)
if env_var is not None:
# Update the value from the environment variable
config[key] = type(value)(env_var) if value is not None else env_var
return config
def load_config():
filename = "configs/defaults.json"
with open(filename, 'r') as file:
config = json.load(file)
config = update_from_env(config)
return config
def process_simulation(data):
try:
#config = load_config()
config = data
config['scenario'] = data.copy()
config['environment'] = "configs/largev2.tmj"
notify_discord(f"starting simulation: #{config}")
matrix = Matrix(config)
matrix.boot()
matrix.run_singlethread()
notify_discord(f"finished simulation: #{config}")
except Exception as e:
print(f'Error processing simulation: {e}')
traceback.print_exc()
def main():
print('Starting simulation job daemon...')
max_concurrent_jobs = 2
with ThreadPoolExecutor(max_workers=max_concurrent_jobs) as executor:
while True:
# Fetch a job from the Redis queue
_, job = redis_conn.blpop(QUEUE_NAME)
job_data = json.loads(job)
# Submit the job to the ThreadPoolExecutor
future = executor.submit(process_simulation, job_data)
future.add_done_callback(lambda _: print('Job completed.'))
if __name__ == '__main__':
main()