-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.py
61 lines (50 loc) · 2.21 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import os
import traceback
import robonomicsinterface as RI
import requests
import docker
# Docker SDK client, configured from the process environment.
client = docker.from_env()

# Account handed to the Robonomics subscriber; overridable via the
# environment variable of the same name.
ROBONOMICS_LISTEN_ROBOT_ACCOUNT = os.getenv(
    "ROBONOMICS_LISTEN_ROBOT_ACCOUNT",
    "4FNQo2tK6PLeEhNEUuPePs8B8xKNwx15fX7tC2XnYpkC8W1j",
)

# Base URL of the IPFS HTTP gateway from which task descriptions are fetched.
IPFS_COMMAND_GATEWAY = os.getenv(
    'IPFS_COMMAND_GATEWAY',
    'https://merklebot.mypinata.cloud/ipfs',
)
def robonomics_transaction_callback(data, launch_event_id):
    """Handle one launch event delivered by the Robonomics subscriber.

    The launch parameter is converted to an IPFS hash, the task description
    is downloaded as JSON from ``IPFS_COMMAND_GATEWAY``, and tasks whose
    ``task_type`` is ``'code_execution'`` are passed to ``execute_code``.

    Failures while fetching or decoding the task, and malformed task
    documents, are logged and skipped so that one bad launch does not
    raise out of the subscriber callback.

    Args:
        data: Three-item sequence ``(sender, recipient, launch parameter)``.
        launch_event_id: Identifier of the launch event, forwarded to
            ``execute_code``.
    """
    sender, recipient, command_params_32_bytes = data
    command_params_ipfs_hash = RI.ipfs_32_bytes_to_qm_hash(command_params_32_bytes)
    task_url = f'{IPFS_COMMAND_GATEWAY}/{command_params_ipfs_hash}'

    try:
        # A timeout is required: without it a stalled gateway would block
        # the callback (and therefore the subscriber) indefinitely.
        response = requests.get(task_url, timeout=30)
        response.raise_for_status()
        task = response.json()
    except (requests.RequestException, ValueError):
        # ValueError covers a body that is not valid JSON.
        print('Failed to fetch task from', task_url)
        traceback.print_exc()
        return

    print('Got task from', sender)

    if not isinstance(task, dict):
        print('Ignoring task: expected a JSON object, got', type(task).__name__)
        return

    if task.get('task_type') == 'code_execution':
        code = task.get('code')
        if code is None:
            print('Ignoring code_execution task without "code" field')
            return
        execute_code(code, launch_event_id)
def execute_code(code, launch_event_id):
    """Run *code* in the executor container and return its output.

    Args:
        code: Python source text to execute.
        launch_event_id: Identifier of the launch that requested the run;
            used only for logging.

    Returns:
        Whatever ``start_container`` returns for this code. The value was
        previously computed and discarded; it is now handed back so callers
        can use it.
    """
    print('Starting container with code for launch', launch_event_id)
    return start_container(code)
def launch_robonomics_subsciber():
    """Subscribe to NewLaunch events on the Robonomics Kusama endpoint.

    Creates an ``RI.Account`` bound to the public websocket endpoint and a
    ``RI.Subscriber`` that is given ``robonomics_transaction_callback`` and
    ``ROBONOMICS_LISTEN_ROBOT_ACCOUNT``.
    """
    account = RI.Account(remote_ws="wss://kusama.rpc.robonomics.network")
    print("Robonomics subscriber starting...")
    # NOTE(review): presumably constructing the Subscriber starts (and blocks
    # on) the subscription -- confirm against the robonomicsinterface docs.
    _subscriber = RI.Subscriber(
        account,
        RI.SubEvent.NewLaunch,
        robonomics_transaction_callback,
        ROBONOMICS_LISTEN_ROBOT_ACCOUNT,
    )
def build_image():
    """Make sure the ``code_executor`` image exists, building it if absent.

    Looks up ``code_executor:latest`` through the Docker client; only when
    the image is reported as missing is it built from the ``container``
    directory. Any other Docker error (for example an unreachable daemon)
    propagates instead of being mistaken for a missing image.
    """
    try:
        client.images.get('code_executor:latest')
    except docker.errors.ImageNotFound:
        print('Image not found. Building...')
        client.images.build(path='container', tag='code_executor')
def start_container(code):
    """Write *code* to disk and run it inside the ``code_executor`` container.

    The source is saved as ``code/python_code_for_execution.py``; the
    container is started with the host code directory mounted at ``/code``
    and with the environment entries read from ``envs.txt``.

    Args:
        code: Python source text to execute.

    Returns:
        The value returned by ``client.containers.run`` on success, or
        ``None`` if reading ``envs.txt`` or running the container failed
        (the traceback is printed in that case).
    """
    with open("code/python_code_for_execution.py", "w") as f:
        f.write(code)

    try:
        # Close the env file deterministically and drop blank lines (such as
        # the one produced by a trailing newline), so no empty entries are
        # passed to Docker as environment variables.
        with open('envs.txt', 'r') as env_file:
            environment = [
                line for line in env_file.read().splitlines() if line.strip()
            ]

        # NOTE(review): the mounted host path is absolute while the file above
        # is written relative to the working directory -- the two only match
        # when the script runs from /home/spot/robonomics-remote-launch.
        output = client.containers.run(
            "code_executor:latest",
            "python3.8 /code/python_code_for_execution.py",
            detach=False,
            environment=environment,
            volumes={"/home/spot/robonomics-remote-launch/code": {'bind': '/code', 'mode': 'rw'}},
        )
    except Exception:
        # Best-effort: report the failure but keep the caller alive.
        traceback.print_exc()
        return None

    print('Code executed')
    print('Output:\n___________')
    print(output)
    print('___________')
    return output
def main():
    """Entry point: ensure the executor image exists, then start listening."""
    print('Getting image...')
    build_image()
    launch_robonomics_subsciber()


# Run only when executed as a script, not when imported.
if __name__ == '__main__':
    main()