forked from dptech-corp/dflow
-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathtest_dispatcher.py
109 lines (87 loc) · 2.83 KB
/
test_dispatcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import time
from pathlib import Path
from typing import List
from dflow import Step, Workflow, download_artifact, upload_artifact
from dflow.plugins.dispatcher import DispatcherExecutor
from dflow.python import OP, OPIO, Artifact, OPIOSign, PythonOPTemplate
class Duplicate(OP):
def __init__(self):
pass
@classmethod
def get_input_sign(cls):
return OPIOSign({
'msg': str,
'num': int,
'foo': Artifact(Path),
'idir': Artifact(Path),
})
@classmethod
def get_output_sign(cls):
return OPIOSign({
'msg': List[str],
'bar': Artifact(Path),
'odir': Artifact(Path),
})
@OP.exec_sign_check
def execute(
self,
op_in: OPIO,
) -> OPIO:
op_out = OPIO({
"msg": [op_in["msg"] * op_in["num"]],
"bar": Path("output.txt"),
"odir": Path("todir"),
})
content = open(op_in['foo'], "r").read()
open("output.txt", "w").write(content * op_in["num"])
Path(op_out['odir']).mkdir()
for ii in ['f1', 'f2']:
(op_out['odir']/ii).write_text(op_in['num']
* (op_in['idir']/ii).read_text())
return op_out
def make_idir():
idir = Path("tidir")
idir.mkdir(exist_ok=True)
(idir / "f1").write_text("foo")
(idir / "f2").write_text("bar")
if __name__ == "__main__":
wf = Workflow(name="dispatcher")
with open("foo.txt", "w") as f:
f.write("Hi")
make_idir()
artifact0 = upload_artifact("foo.txt")
artifact1 = upload_artifact("tidir")
print(artifact0)
print(artifact1)
# run ../scripts/start-slurm.sh first to start up a slurm cluster
import socket
def get_my_ip_address():
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(("8.8.8.8", 80))
return s.getsockname()[0]
dispatcher_executor = DispatcherExecutor(
host=get_my_ip_address(),
username="root",
port=31129,
queue_name="normal",
remote_root="/data",
password="password",
)
step = Step(
name="step",
template=PythonOPTemplate(Duplicate),
parameters={"msg": "Hello", "num": 3},
artifacts={"foo": artifact0, "idir": artifact1},
executor=dispatcher_executor
)
wf.add(step)
wf.submit()
while wf.query_status() in ["Pending", "Running"]:
time.sleep(1)
assert(wf.query_status() == "Succeeded")
step = wf.query_step(name="step")[0]
assert(step.phase == "Succeeded")
print(download_artifact(step.outputs.artifacts["bar"]))
print(download_artifact(step.outputs.artifacts["odir"]))
print(step.outputs.parameters["msg"].value,
type(step.outputs.parameters["msg"].value))