-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathplexil_plan_translator.py
347 lines (280 loc) · 13 KB
/
plexil_plan_translator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
# Translate a plan, which is synthesized using a PRISM model, to a PLEXIL plan.
import os
import sys
import json
from math import sqrt
class Scenario:
EXCAVATION = 1
UNKNOWN = 2
num_str_map = {
1: "Excavation",
2: "Unknown"}
@classmethod
def get_scenario_str(cls, scenario_num):
# ToDo
assert scenario_num <= cls.UNKNOWN
return cls.num_str_map[scenario_num]
class PlexilPlanTranslator():
def __init__(self):
self.scenarios_map = {
"Excavation": Scenario.EXCAVATION,
"Unknown": Scenario.UNKNOWN
}
# prism_syn_plan: [action1, action2, ...]
# plexil_plan_name is the name of the plexil node.
def translate(self, scenario_ID, runtime_info, plexil_plan_name, plexil_plan_dir, prism_syn_plan, faults):
scenario = self.scenarios_map[scenario_ID]
# Parse prism_syn_plan
print("Parsing the synthesized plan by PRISM...")
plan_info = self.parse_syn_plan(scenario, runtime_info, prism_syn_plan)
plan_info["node_name"] = plexil_plan_name
# Generate the PLEXIL code
print("Generating the PLEXIL code...")
code = self.generate_code(scenario, plan_info, faults)
plexil_plan_fp = os.path.join(plexil_plan_dir, plexil_plan_name+".plp")
with open(plexil_plan_fp, "w") as outfile:
outfile.write(code)
# [Section: Parse Synthesized Plan By PRISM Model]
def parse_syn_plan(self, scenario, runtime_info, prism_syn_plan):
plan_info = {}
if scenario == Scenario.EXCAVATION:
plan_info = self.parse_syn_plan_exca(runtime_info, prism_syn_plan)
else:
print("[Plan Translation] Unknown scenario: " + Scenario.get_scenario_str(scenario))
return plan_info
# Excavation
def parse_syn_plan_exca(self, runtime_info, prism_syn_plan):
plan_info = {'sel_xloc': {}, 'sel_dloc': {}}
syn_plan = prism_syn_plan[1:-1].split(', ')
# Currently only support the case of max_tried=1,
# which assumes that each action starts with "select_"
sel_xloc_ID = syn_plan[0][7:]
sel_dloc_ID = syn_plan[1][7:]
sel_xloc = runtime_info['xloc_list'][sel_xloc_ID]
sel_dloc = runtime_info['dloc_list'][sel_dloc_ID]
print("[Synthesized Excavation Plan]:")
print(">>> selected excavation location: " + sel_xloc["name"] + ":")
print(json.dumps(sel_xloc, indent=4, sort_keys=True))
print(">>> selected dump location: " + sel_dloc["name"] + ":")
print(json.dumps(sel_dloc, indent=4, sort_keys=True))
plan_info['sel_xloc'] = sel_xloc
plan_info['sel_dloc'] = sel_dloc
return plan_info
# [Section: PLEXIL Code Generation]
def generate_code(self, scenario, plan_info, faults):
code = ""
if scenario == Scenario.EXCAVATION:
code = self.gen_exca_scenario(plan_info, faults)
else:
print("[Plan Translation] Unknown scenario: " + scenario_ID)
return code
# Excavation
def gen_exca_scenario(self, plan_info, faults):
node_name = plan_info["node_name"]
sel_xloc = plan_info["sel_xloc"]
sel_dloc = plan_info["sel_dloc"]
xloc_x = sel_xloc['position']['x']
xloc_y = sel_xloc['position']['y']
xloc_sv = sel_xloc['sci_val']
xloc_ep = sel_xloc['ex_prob']
dloc_x = sel_dloc['position']['x']
dloc_y = sel_dloc['position']['y']
plan_status_varname = "planSuccess"
faulty_ex_prob = -1
if len(faults) != 0:
faulty_ex_prob = faults['excavation']['ex_prob']
print("Generating the PLEXIL code for excavation scenario...")
code = ""
code += self.gen_plan_desc()
code += self.gen_include_files(Scenario.EXCAVATION)
code += node_name + ":\n"
code += "{\n"
# ToDo: implement gen_variables() to handel variable declarations
code += "\tBoolean " + plan_status_varname + ";\n\n"
code += self.gen_plan_info(1, plan_info)
# Simulate the fault introduced into the excavatability
if (faulty_ex_prob >=0 ) and (faulty_ex_prob <= 1):
msg = "[Fault Injection] A fault has been introduced to the lander system that causes the excavatability, the probability of successful excavation, to drop."
msg += " The excavatability drops from " + str(xloc_ep) + " to " + str(xloc_ep*faulty_ex_prob) + "."
code += self.gen_info(1, msg=msg)
xloc_ep = xloc_ep * faulty_ex_prob
code += self.gen_unstow(1)
code += self.gen_guarded_move(1, xloc_x, xloc_y)
msg = "Start the excavation using the grind at " + str(sel_xloc) + "..."
body_code = self.gen_grind(2, xloc_x, xloc_y, msg=msg)
body_code_ds = ""
msg = "Removing tailing...\\n\\tCollecting the tailing using the scoop...\\n"
body_code_ds += self.gen_dig_circular(3, xloc_x, xloc_y, msg=msg)
msg = "\\tMoving the tailing to " + str(sel_dloc)
body_code_ds += self.gen_deliver(3, dloc_x, dloc_y, msg=msg)
msg = "Digging failed."
# Simulate the status of digging operation
dig_success_condition = "Lookup(DiggingSuccess(" + str(xloc_ep) + "))"
body_code += self.gen_dig_success_cond(2, dig_success_condition, plan_status_varname, body_code_ds, msg=msg)
msg = "Skipping grind and dig."
ground_found_cond = "Lookup(GroundFound)"
code += self.gen_ground_found_cond(1, body_code, ground_found_cond, plan_status_varname, msg=msg)
code += self.gen_stow(1)
update_node_name = "ExcavationStatus"
update_msg = {}
# ToDo: handel literals and variables appropriately
update_msg['plan_name'] = "\""+node_name+"\"" # a string literal in plexil plan
update_msg['exec_status'] = plan_status_varname # a variable in plexil plan
code += self.gen_update_node(1, update_node_name, update_msg)
code += "\tlog_info (\"Excavation is finished.\");\n"
code += "}\n"
return code
# [Section: Code Generation For Each Primitive Part]
def gen_plan_desc(self):
code = "// The following PLEXIL codes are automatically generated during planning.\n\n"
return code
def gen_include_files(self, scenario):
code = "#include \"plan-interface.h\"\n"
if scenario == Scenario.EXCAVATION:
pass
code += "\n\n"
return code
def gen_plan_info(self, tab_num, plan_info):
pre_dents = self.__gen_ident_str(tab_num)
code = pre_dents + "log_info (\"" + str(plan_info) + "\");\n\n"
return code
def gen_unstow(self, tab_num):
pre_dents = self.__gen_ident_str(tab_num)
code = ["log_info (\"Unstowing arm...\");\n"]
code.append("LibraryCall Unstow();\n\n")
return self.__ident_code(code, pre_dents)
def gen_stow(self, tab_num):
pre_dents = self.__gen_ident_str(tab_num)
code = ["log_info (\"Stowing arm...\");\n"]
code.append("LibraryCall Stow();\n\n")
return self.__ident_code(code, pre_dents)
def gen_guarded_move(self, tab_num, x, y, z=0.05, dir_x=0, dir_y=0, dir_z=1, search_dist=0.25):
pre_dents = self.__gen_ident_str(tab_num)
code = []
code.append("log_info (\"A guarded move to find out the ground position of the location (x="+str(x)+", y="+str(y)+")...\");\n")
code.append("LibraryCall GuardedMove (\n")
code.append("\tX = " + str(x) + ", Y = " + str(y) + ", Z = " + str(z) + ",\n")
code.append("\tDirX = " + str(dir_x) + ", DirY = " + str(dir_y) + ", DirZ = " + str(dir_z) + ",\n")
code.append("\tSearchDistance = " + str(search_dist) + ");\n\n")
return self.__ident_code(code, pre_dents)
def gen_ground_found_cond(self, tab_num, body_code, condition, plan_status_varname, msg=""):
pre_dents = self.__gen_ident_str(tab_num)
code = pre_dents + plan_status_varname + "=" + condition + ";\n"
code += pre_dents + "if (" + plan_status_varname + ") {\n"
#code = pre_dents + "if (Lookup(GroundFound)) {\n"
code += body_code
code += pre_dents + "}\n"
code += pre_dents + "else log_error (\"Failed to find ground"
if msg != "":
code += ": " + msg
code += "\");\n"
code += pre_dents + "endif\n\n"
return code
def gen_dig_success_cond(self, tab_num, condition, plan_status_varname, body_code, msg=""):
pre_dents = self.__gen_ident_str(tab_num)
code = pre_dents + plan_status_varname + "=" + condition + ";\n"
code += pre_dents + "if (" + plan_status_varname + ") {\n"
code += pre_dents + "\tlog_info (\"Digging successed!\");\n"
code += body_code
code += pre_dents + "}\n"
code += pre_dents + "else log_error (\"[Condition Failed] " + condition
if msg != "":
code += ": " + msg
code += "\");\n"
code += pre_dents + "endif\n\n"
return code
def gen_grind(self, tab_num, x, y, depth=0.05, length=0.2, parallel=True, msg=""):
pre_dents = self.__gen_ident_str(tab_num)
code = []
if msg != "":
code.append("log_info (\""+ msg + "\");\n")
else:
code.append("log_info (\"Start grinding...\");\n")
code.append("LibraryCall Grind (\n")
code.append("\tX = " + str(x) + ", Y = " + str(y) + ", Depth = " + str(depth) + ",\n")
code.append("\tLength = " + str(length) + ", Parallel = " + self.__get_bool_str(parallel) + ",\n")
code.append("\tGroundPos = Lookup(GroundPosition));\n\n")
return self.__ident_code(code, pre_dents)
def gen_dig_circular(self, tab_num, x, y, depth=0.05, parallel=True, msg=""):
pre_dents = self.__gen_ident_str(tab_num)
code = []
if msg != "":
code.append("log_info (\""+ msg + "\");\n")
else:
code.append("log_info (\"Start dig_circular...\");\n")
code.append("LibraryCall DigCircular (\n")
code.append("\tX = " + str(x) + ", Y = " + str(y) + ", Depth = " + str(depth) + ",\n")
code.append("\tGroundPos = Lookup(GroundPosition),\n")
code.append("\tParallel = " + self.__get_bool_str(parallel) + ");\n\n")
return self.__ident_code(code, pre_dents)
def gen_deliver(self, tab_num, x, y, z=0.5, msg=""):
pre_dents = self.__gen_ident_str(tab_num)
code = []
if msg != "":
code.append("log_info (\""+ msg + "\");\n")
else:
code.append("log_info (\"Delivering the sample...\");\n")
code.append("LibraryCall Deliver (X = " + str(x) + ", Y = " + str(y) + ", Z = " + str(z) + ");\n\n")
return self.__ident_code(code, pre_dents)
# Currently only support the Update statement in the Update node
# Input:
# update_msg: a dict of messages, each of which has the format 'variable_name=value' in the Update Statement
def gen_update_node(self, tab_num, update_node_name, update_msg):
pre_dents = self.__gen_ident_str(tab_num)
code = []
code.append(update_node_name+":\n")
code.append("{\n")
cnt = 0
update_content = "\tUpdate "
for variable_key in update_msg:
if cnt != 0:
update_content += ", "
update_content += variable_key + "=" + update_msg[variable_key]
cnt += 1
update_content += ";\n"
code.append(update_content)
code.append("}\n\n")
return self.__ident_code(code, pre_dents)
# [Section: utility functions]
#
# Python and Plexil have different boolean literals
# Language Boolean Literals
# Python True | False
# Plexil true | false
def __get_bool_str(self, bool_var):
if bool_var:
return "true"
else:
return "false"
def __ident_code(self, code, pre_dents):
code_str = ""
for line in code:
code_str += pre_dents+line
return code_str
def __gen_ident_str(self, tab_num):
return "\t"*tab_num
def gen_info(self, tab_num, msg=""):
code = ["log_info (\"" + msg + "\");\n"]
pre_dents = self.__gen_ident_str(tab_num)
return self.__ident_code(code, pre_dents)
##################
def main(argv):
print("args:"+str(argv))
runtime_info_fp = argv[0]
prism_model_fp = argv[1]
with open(runtime_info_fp) as infile:
runtime_info = json.load(infile)
plexil_plan_translator = PlexilPlanTranslator()
scenario_ID = "Excavation"
plexil_plan_name = "Exca"
plexil_plan_dir = "./"
# The testing value of prism_syn_plan below
# is for the execavation secnario.
prism_syn_plan = "[select_xloc1, select_dloc1]"
faults = {}
plexil_plan_translator.translate(
scenario_ID, runtime_info,
plexil_plan_name, plexil_plan_dir,
prism_syn_plan, faults)
if __name__ == "__main__":
main(sys.argv[1:])