-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathheat_transfer_simulation_utilities.py
183 lines (156 loc) · 7.34 KB
/
heat_transfer_simulation_utilities.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
"""
This module defines general infrastructure for the simulations.
It can be easily imported and used for any simulation (Sim) that
supports the public API defined here:
- Sim.current_t ... storing the current simulation time
- Sim.simulation_has_finished ... boolean if simulation is over or not
- Sim.error_norm ... determining the simulation error
- Sim.plot() ... updating the results on the plots
- Sim.evaluate_one_step() ... moving one step further in the simulation
- Sim.after_simulation_action() ... what should happen after the simulation
- Sim.save_results() ... custom method for saving results
"""
import time
class Callback:
    """
    Class defining the actions that should happen when the simulation
        should share the results with the outside world.
    It is also a communication channel between outside world and simulation,
        as it is listening for commands on a shared queue object.

    The simulation state is one of "running", "paused" or "stopped". It starts
        as "running" and is changed only by the messages "continue", "pause"
        and "stop" read from the queue.
    """

    def __init__(self,
                 call_at: float = 500.0,
                 heat_flux_plot=None,
                 temperature_plot=None,
                 queue=None,
                 progress_callback=None) -> None:
        """
        Args:
            call_at ... the frequency in simulation second when to be called
            heat_flux_plot ... reference of heat flux plot
            temperature_plot ... reference of temperature plot
            queue ... reference of the shared queue; must offer empty() and
                      get_nowait() when supplied
            progress_callback ... reference of progress callback; must offer
                                  emit(value) when supplied
        """
        self.call_at = call_at  # how often to be called
        # Start of the current period of the periodic schedule (simulation time)
        self.last_call = 0.0

        # Takes reference of some plot, which it should update, and some queue,
        #   through which the GUI will send information.
        # Also save the progress communication channel to update time in GUI
        self.temperature_plot = temperature_plot
        self.heat_flux_plot = heat_flux_plot
        self.queue = queue
        self.progress_callback = progress_callback

        # Setting the starting simulation state as "running" - this can be
        #   changed only through the queue by input from GUI
        self.simulation_state = "running"

    def __repr__(self) -> str:
        """
        Defining what should be displayed when we print the
            object of this class.
        Very useful for debugging purposes.
        """
        return (
            "\n"
            f"self.call_at: {self.call_at},\n"
            f"self.temperature_plot: {self.temperature_plot},\n"
            f"self.heat_flux_plot: {self.heat_flux_plot},\n"
            f"self.queue: {self.queue},\n"
            f"self.progress_callback: {self.progress_callback},\n"
        )

    def __call__(self,
                 Sim,
                 force_update: bool = False) -> str:
        """
        Defining what should happen when the callback is called

        Args:
            Sim ... the whole simulation object (its current_t is read and
                    its plot() is called)
            force_update ... whether to perform actions regardless of current time

        Returns:
            The current simulation state: "running", "paused" or "stopped"
        """
        # NOTE(review): the nesting below was reconstructed from a source whose
        #   indentation had been lost - the queue polling and the progress
        #   emit are assumed to run on every call, not only when the plots
        #   are refreshed. Please confirm.

        # Doing something when it is time to communicate with GUI
        period_elapsed = Sim.current_t > self.last_call + self.call_at
        if period_elapsed or force_update:
            # When both the plots are defined, update them
            if self.temperature_plot is not None and self.heat_flux_plot is not None:
                Sim.plot(temperature_plot=self.temperature_plot,
                         heat_flux_plot=self.heat_flux_plot)

            # Moving the schedule one period forward only when the period has
            #   really elapsed - a forced update must not postpone the next
            #   regular one
            if period_elapsed:
                self.last_call += self.call_at

        # Getting the connection with GUI and listening for commands
        if self.queue is not None and not self.queue.empty():
            # Getting and parsing the message from shared queue
            # Changing simulation state according to that message
            # (at most one message is consumed per call, unknown ones are ignored)
            msg = self.queue.get_nowait()
            if msg == "stop":
                self.simulation_state = "stopped"
            elif msg == "pause":
                self.simulation_state = "paused"
            elif msg == "continue":
                self.simulation_state = "running"

        # According to the current state, emit positive or zero
        #   value to give information to the GUI if we are running or not
        if self.progress_callback is not None:
            value_to_emit = 1 if self.simulation_state == "running" else 0
            self.progress_callback.emit(value_to_emit)

        # Returning the current simulation state to be handled by higher function
        return self.simulation_state
class SimulationController:
    """
    Holding variables and defining methods for the simulation

    It drives a simulation object step by step, consulting a Callback
        before every step to learn whether to run, wait or stop.
    """

    # How long (in real seconds) to sleep between two checks of the
    #   simulation state while the simulation is paused
    PAUSED_POLL_SECONDS = 0.1

    def __init__(self,
                 Sim,
                 parameters: dict,
                 progress_callback=None,
                 queue=None,
                 temperature_plot=None,
                 heat_flux_plot=None,
                 save_results: bool = False):
        """
        Args:
            Sim ... whole simulation object
            parameters ... all defined parameters of simulation; the key
                           "callback_period" is required here
            heat_flux_plot ... reference of heat flux plot
            temperature_plot ... reference of temperature plot
            queue ... reference of the shared queue
            progress_callback ... reference of progress callback
            save_results ... whether to save results at the end or not
        """
        self.Sim = Sim
        self.MyCallBack = Callback(progress_callback=progress_callback,
                                   call_at=parameters["callback_period"],
                                   temperature_plot=temperature_plot,
                                   heat_flux_plot=heat_flux_plot,
                                   queue=queue)
        self.save_results = save_results

        # Keeping the references also here, so they are reachable by whoever
        #   receives this controller object
        self.progress_callback = progress_callback
        self.temperature_plot = temperature_plot
        self.heat_flux_plot = heat_flux_plot
        self.queue = queue

    def complete_simulation(self) -> dict:
        """
        Running all the simulation steps, if not stopped

        Returns:
            Dictionary with the key "error_value" holding Sim.error_norm
        """
        # Calling the evaluating function as long as the simulation has not finished
        while not self.Sim.simulation_has_finished:
            # Processing the callback and getting the simulation state at the same time
            # Then acting accordingly to the current state
            simulation_state = self.MyCallBack(self.Sim)
            if simulation_state == "running":
                # Calling the function that is determining next step
                self.Sim.evaluate_one_step()
            elif simulation_state == "paused":
                # Sleeping for some little time before checking again, to save CPU
                time.sleep(self.PAUSED_POLL_SECONDS)
            elif simulation_state == "stopped":
                # Breaking out of the loop - finishing simulation
                print("stopping")
                break

        # Calling callback at the very end, to update the plot with complete results
        self.MyCallBack(self.Sim, force_update=True)

        # Invoking whatever action should happen after simulation finishes
        # Passing whole self object, so Simulation has access to all
        #   connections here and can communicate with GUI
        self.Sim.after_simulation_action(self)

        # If wanted to, save the results by a custom function
        if self.save_results:
            self.Sim.save_results()

        return {"error_value": self.Sim.error_norm}