event_trace
from datetime import datetime, timedelta
import simpy
import pm4py
import random
from process import SimulationProcess
from pm4py.objects.petri_net import semantics
from parameters import Parameters
from utility import Prefix
from simpy.events import AnyOf, AllOf, Event
import numpy as np
import copy
import csv
from utility import Buffer, ParallelObject
import custom_function as custom


class Token(object):
    """A case (trace) that walks through the Petri net of the simulated process.

    A token of type ``'sequential'`` drives a whole case.  When several
    transitions are enabled while more than one place is marked,
    ``next_transition`` spawns one ``'parallel'`` token per marked place and
    ``simulation`` waits for all of them before continuing.
    """

    def __init__(self, id: int, net: pm4py.objects.petri_net.obj.PetriNet, am: pm4py.objects.petri_net.obj.Marking, params: Parameters, process: SimulationProcess, prefix: Prefix, type: str, writer: csv.writer, parallel_object: ParallelObject, time: datetime, values=None):
        """Store the simulation context and create the per-event feature buffer.

        Args:
            id: identifier of the case; written as ``id_case`` for every event.
            net: Petri net that is simulated.
            am: marking this token starts from.
            params: simulation parameters; this class reads
                ``START_SIMULATION``, ``ROLE_ACTIVITY``, ``PROBABILITY``,
                ``PROCESSING_TIME`` and ``WAITING_TIME`` from it.
            process: provider of the trace, role and activity resources.
            prefix: activity prefix of the case (handed on to parallel tokens).
            type: ``'sequential'`` or ``'parallel'``.
            writer: forwarded to ``Buffer``.
            parallel_object: used to hand the marking reached by a parallel
                token back to the token that spawned it.
            time: forwarded to ``custom.case_function_attribute``.
            values: forwarded to ``Buffer``.
        """
        self._id = id
        self._process = process
        self._start_time = params.START_SIMULATION
        self._params = params
        self._net = net
        self._am = am
        self._prefix = prefix
        self._type = type
        # Only tokens that are not 'sequential' apply the waiting time in
        # simulation().
        self.see_activity = type != 'sequential'
        self._writer = writer
        self._parallel_object = parallel_object
        self._buffer = Buffer(writer, values)
        self._buffer.set_feature("attribute_case", custom.case_function_attribute(self._id, time))

    def _delete_places(self, places):
        """Return every place of the net whose name contains ``str(place)``
        for one of the given *places* (substring match, duplicates kept)."""
        return [p for place in places for p in self._net.places if str(place) in str(p.name)]

    def simulation(self, env: simpy.Environment):
        """
        The main function to handle the simulation of a single trace.

        simpy process generator: fires transitions until none is left,
        and for every labelled transition records one event in the buffer
        (request of the role and activity resources, waiting for the schedule,
        processing time, release).
        """
        trans = self.next_transition(env)
        ### register trace in process ###
        resource_trace = self._process._get_resource_trace()
        resource_trace_request = resource_trace.request() if self._type == 'sequential' else None

        while trans is not None:
            if not self.see_activity and self._type == 'sequential':
                yield resource_trace_request
            if isinstance(trans, list):
                # Parallel branches were spawned: wait for all of them, then
                # replace the current marking with the one they left behind.
                yield AllOf(env, trans)
                am_after = self._parallel_object._get_last_events()
                for d in self._delete_places(self._am):
                    del self._am[d]
                for t in am_after:
                    self._am[t] = 1
                trans = self.next_transition(env)

            # Silent transitions (no label) are only fired, never logged.
            if trans and trans.label:
                now = self._start_time + timedelta(seconds=env.now)
                self._buffer.reset()
                self._buffer.set_feature("id_case", self._id)
                self._buffer.set_feature("activity", trans.label)
                self._buffer.set_feature("prefix", self._prefix.get_prefix(now))
                self._buffer.set_feature("attribute_event", custom.event_function_attribute(self._id, now))

                ### call predictor for waiting time
                if trans.label in self._params.ROLE_ACTIVITY:
                    resource = self._process._get_resource(self._params.ROLE_ACTIVITY[trans.label])
                else:
                    raise ValueError('Not resource/role defined for this activity', trans.label)

                self._buffer.set_feature("wip_wait", resource_trace.count)
                self._buffer.set_feature("ro_single", self._process.get_occupations_single_role(resource._get_name()))
                self._buffer.set_feature("ro_total", self._process.get_occupations_all_role())
                self._buffer.set_feature("role", resource._get_name())

                ### register event in process ###
                resource_task = self._process._get_resource_event(trans.label)
                self._buffer.set_feature("wip_activity", resource_task.count)

                queue = 0 if len(resource._queue) == 0 else len(resource._queue[-1])
                self._buffer.set_feature("queue", queue)
                self._buffer.set_feature("enabled_time", self._start_time + timedelta(seconds=env.now))

                waiting = self.define_waiting_time(trans.label)
                if self.see_activity:
                    yield env.timeout(waiting)

                request_resource = resource.request()
                yield request_resource
                single_resource = self._process._set_single_resource(resource._get_name())
                self._buffer.set_feature("resource", single_resource)

                resource_task_request = resource_task.request()
                yield resource_task_request

                ### call predictor for processing time
                self._buffer.set_feature("wip_start", resource_trace.count)
                self._buffer.set_feature("ro_single", self._process.get_occupations_single_role(resource._get_name()))
                self._buffer.set_feature("ro_total", self._process.get_occupations_all_role())
                self._buffer.set_feature("wip_activity", resource_task.count)

                # Delay returned by the role for the current timestamp.
                stop = resource.to_time_schedule(self._start_time + timedelta(seconds=env.now))
                yield env.timeout(stop)
                self._buffer.set_feature("start_time", self._start_time + timedelta(seconds=env.now))
                duration = self.define_processing_time(trans.label)

                yield env.timeout(duration)

                self._buffer.set_feature("wip_end", resource_trace.count)
                self._buffer.set_feature("end_time", self._start_time + timedelta(seconds=env.now))
                self._buffer.print_values()
                self._prefix.add_activity(trans.label)
                resource.release(request_resource)
                self._process._release_single_resource(resource._get_name(), single_resource)
                resource_task.release(resource_task_request)

            self._update_marking(trans)
            trans = self.next_transition(env) if self._am else None

        if self._type == 'parallel':
            # Hand the reached marking back to the token that spawned us.
            self._parallel_object._set_last_events(self._am)
        if self._type == 'sequential':
            resource_trace.release(resource_trace_request)

    def _get_resource_role(self, activity):
        """Return the resources of every role listed in ``ROLE_ACTIVITY`` for
        the label of *activity*."""
        return [self._process._get_resource(e) for e in self._params.ROLE_ACTIVITY[activity.label]]

    def _update_marking(self, trans):
        """Fire *trans* and store the resulting marking."""
        self._am = semantics.execute(trans, self._net, self._am)

    def _delete_tokens(self, name):
        """Return the places of the current marking whose name is not *name*."""
        return [p for p in self._am if p.name != name]

    def _check_probability(self, prob):
        """Check if the sum of probabilities is 1 (within float tolerance).

        Prints a warning and returns False otherwise.
        """
        # Tolerance instead of ``!= 1``: e.g. 0.1 + 0.2 + 0.7 is not exactly 1.
        if abs(sum(prob) - 1) > 1e-9:
            print('WARNING: The sum of the probabilities associated with the paths is not 1, to run the simulation we define equal probability')
            return False
        return True

    def _check_type_paths(self, prob):
        """Raise ValueError unless all paths are configured the same way:
        all the same string (AUTO / CUSTOM) or all float numbers."""
        if type(prob[0]) is str:
            if not all(x == prob[0] for x in prob):
                raise ValueError('ERROR: Not all path are defined as same type ', prob)
        elif type(prob[0]) is float:
            if not all(isinstance(x, float) for x in prob):
                raise ValueError('ERROR: Not all path are defined as same type (float number) ', prob)
        else:
            raise ValueError("ERROR: Invalid input, specify the probability as AUTO, float number or CUSTOM ", prob)

    def _retrieve_check_paths(self, all_enabled_trans):
        """Collect the configured probability of every enabled transition.

        Labelled transitions are looked up by label, silent ones by name.
        A missing entry is reported on stdout and skipped, so the returned
        list may be shorter than *all_enabled_trans*.
        """
        prob = []
        for trans in all_enabled_trans:
            key = trans.label if trans.label else trans.name
            try:
                prob.append(self._params.PROBABILITY[key])
            except KeyError:
                print('ERROR: Not all path probabilities are defined. Define all paths: ', all_enabled_trans)

        return prob

    def define_xor_next_activity(self, all_enabled_trans):
        """ Three different methods to decide which path to follow from an XOR gateway:
        * Random choice: each path has equal probability to be chosen (AUTO)
        ```json
        "probability": {
            "A_ACCEPTED": "AUTO",
            "skip_2": "AUTO",
            "A_FINALIZED": "AUTO"
        }
        ```
        * Defined probability: in the json file it is possible to define for each path a specific probability (PROBABILITY as value)
        ```json
        "probability": {
            "A_PREACCEPTED": 0.20,
            "skip_1": 0.80
        }
        ```
        * Custom method: it is possible to define a dedicated method that, given the possible paths, returns the one to
        follow, using whatever techniques the user prefers. (CUSTOM)
        ```json
        "probability": {
            "A_CANCELLED": "CUSTOM",
            "A_DECLINED": "CUSTOM",
            "tauSplit_5": "CUSTOM"
        }
        ```

        Returns the chosen element of *all_enabled_trans*.  Float
        probabilities are only used as weights when one is defined for every
        path and they sum to 1; otherwise every path is equally likely.
        """
        prob = self._retrieve_check_paths(all_enabled_trans) if self._params.PROBABILITY else ['AUTO']
        if not prob:
            # No path is configured (already reported): behave like AUTO
            # instead of failing on prob[0].
            prob = ['AUTO']
        self._check_type_paths(prob)
        if prob[0] == 'CUSTOM':
            index = self.call_custom_xor_function(all_enabled_trans)
        elif (isinstance(prob[0], float)
              and len(prob) == len(all_enabled_trans)
              and self._check_probability(prob)):
            index = random.choices(range(len(prob)), weights=prob)[0]
        else:
            # AUTO, or float probabilities that are incomplete / do not sum to 1.
            index = random.choices(range(len(all_enabled_trans)))[0]

        return all_enabled_trans[index]

    def define_processing_time(self, activity):
        """ Three different methods are available to define the processing time for each activity:
        * Distribution function: specify in the json file the distribution with the right parameters for each
        activity, see the [numpy_distribution](https://numpy.org/doc/stable/reference/random/generator.html) distribution, (DISTRIBUTION).
        **Be careful**: A negative value generated by the distribution is not valid for the simulator.
        ```json
        "processing_time": {
            "A_FINALIZED": { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
        }
        ```
        * Custom method: it is possible to define a dedicated method that, given the activity and its
        characteristics, returns the duration of processing time required. (CUSTOM)
        ```json
        "processing_time": {
            "A_FINALIZED": { "name": "custom"}
        }
        ```
        * Mixed: It is possible to define a distribution function for some activities and a dedicated method for the others.
        ```json
        "processing_time": {
            "A_FINALIZED": { "name": "custom"},
            "A_REGISTERED": { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
        }
        ```

        Returns the duration; a negative value is reported and replaced by 0.
        Raises ValueError (chained to the original error) when the duration
        cannot be computed.
        """
        try:
            setting = self._params.PROCESSING_TIME[activity]
            if setting["name"] == 'custom':
                duration = self.call_custom_processing_time()
            else:
                duration = getattr(np.random, setting['name'])(**setting['parameters'], size=1)[0]
            if duration < 0:
                print("WARNING: Negative processing time", duration)
                duration = 0
        except Exception as err:
            raise ValueError("ERROR: The processing time of", activity, "is not defined in json file") from err
        return duration

    def define_waiting_time(self, next_act):
        """ Three different methods are available to define the waiting time before each activity:
        * Distribution function: specify in the json file the distribution with the right parameters for each
        activity, see the [numpy_distribution](https://numpy.org/doc/stable/reference/random/generator.html) distribution, (DISTRIBUTION).
        **Be careful**: A negative value generated by the distribution is not valid for the simulator.
        ```json
        "waiting_time": {
            "A_PARTLYSUBMITTED": { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
        }
        ```
        * Custom method: it is possible to define a dedicated method that, given the next activity with its
        features, returns the duration of waiting time. (CUSTOM)
        ```json
        "waiting_time": {
            "A_PARTLYSUBMITTED": { "name": "custom"}
        }
        ```
        * Mixed: As the processing time, it is possible to define a mix of methods for each activity.
        ```json
        "waiting_time": {
            "A_PARTLYSUBMITTED": { "name": "custom"},
            "A_APPROVED": { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
        }
        ```

        Returns the waiting time; 0 when it is negative or cannot be computed.
        """
        try:
            setting = self._params.WAITING_TIME[next_act]
            if setting["name"] == 'custom':
                duration = self.call_custom_waiting_time()
            else:
                duration = getattr(np.random, setting['name'])(**setting['parameters'], size=1)[0]
            if duration < 0:
                print("WARNING: Negative waiting time", duration)
                duration = 0
        except Exception:
            # Best effort: an activity without a usable waiting-time
            # definition simply does not wait.
            duration = 0

        return duration

    def call_custom_processing_time(self):
        """
        Call to the custom functions in the file *custom_function.py*.
        """
        return custom.custom_processing_time(self._buffer)

    def call_custom_waiting_time(self):
        """
        Call to the custom functions in the file *custom_function.py*.
        """
        return custom.custom_waiting_time(self._buffer)

    def call_custom_xor_function(self, all_enabled_trans):
        """
        Call to the custom functions in the file *custom_function.py*.

        Only the buffer is passed on; *all_enabled_trans* is not used here.
        """
        return custom.custom_decision_mining(self._buffer)

    def next_transition(self, env):
        """
        Method to define the next activity in the petrinet.

        Returns None when no transition is enabled, the transition to fire
        when one can be chosen, or a list of simpy processes (one parallel
        token per marked place) when the marking holds several places.
        """
        all_enabled_trans = sorted(semantics.enabled_transitions(self._net, self._am), key=lambda x: x.name)
        if not all_enabled_trans:
            return None
        if len(all_enabled_trans) == 1:
            return all_enabled_trans[0]
        if len(self._am) == 1:
            return self.define_xor_next_activity(all_enabled_trans)

        events = []
        for token in self._am:
            # Each parallel token starts from a marking reduced to one place.
            new_am = copy.copy(self._am)
            for p in self._delete_tokens(token.name):
                del new_am[p]
            # NOTE(review): the buffer dictionary is passed in the position of
            # the ``time`` parameter and ``values`` stays None - confirm that
            # this is intended.
            path = env.process(Token(self._id, self._net, new_am, self._params, self._process, self._prefix, "parallel", self._writer, self._parallel_object, self._buffer._get_dictionary()).simulation(env))
            events.append(path)
        return events
class
Token:
class Token(object):
    """A case (trace) that walks through the Petri net of the simulated process.

    A token of type ``'sequential'`` drives a whole case.  When several
    transitions are enabled while more than one place is marked,
    ``next_transition`` spawns one ``'parallel'`` token per marked place and
    ``simulation`` waits for all of them before continuing.
    """

    def __init__(self, id: int, net: pm4py.objects.petri_net.obj.PetriNet, am: pm4py.objects.petri_net.obj.Marking, params: Parameters, process: SimulationProcess, prefix: Prefix, type: str, writer: csv.writer, parallel_object: ParallelObject, time: datetime, values=None):
        """Store the simulation context and create the per-event feature buffer.

        Args:
            id: identifier of the case; written as ``id_case`` for every event.
            net: Petri net that is simulated.
            am: marking this token starts from.
            params: simulation parameters; this class reads
                ``START_SIMULATION``, ``ROLE_ACTIVITY``, ``PROBABILITY``,
                ``PROCESSING_TIME`` and ``WAITING_TIME`` from it.
            process: provider of the trace, role and activity resources.
            prefix: activity prefix of the case (handed on to parallel tokens).
            type: ``'sequential'`` or ``'parallel'``.
            writer: forwarded to ``Buffer``.
            parallel_object: used to hand the marking reached by a parallel
                token back to the token that spawned it.
            time: forwarded to ``custom.case_function_attribute``.
            values: forwarded to ``Buffer``.
        """
        self._id = id
        self._process = process
        self._start_time = params.START_SIMULATION
        self._params = params
        self._net = net
        self._am = am
        self._prefix = prefix
        self._type = type
        # Only tokens that are not 'sequential' apply the waiting time in
        # simulation().
        self.see_activity = type != 'sequential'
        self._writer = writer
        self._parallel_object = parallel_object
        self._buffer = Buffer(writer, values)
        self._buffer.set_feature("attribute_case", custom.case_function_attribute(self._id, time))

    def _delete_places(self, places):
        """Return every place of the net whose name contains ``str(place)``
        for one of the given *places* (substring match, duplicates kept)."""
        return [p for place in places for p in self._net.places if str(place) in str(p.name)]

    def simulation(self, env: simpy.Environment):
        """
        The main function to handle the simulation of a single trace.

        simpy process generator: fires transitions until none is left,
        and for every labelled transition records one event in the buffer
        (request of the role and activity resources, waiting for the schedule,
        processing time, release).
        """
        trans = self.next_transition(env)
        ### register trace in process ###
        resource_trace = self._process._get_resource_trace()
        resource_trace_request = resource_trace.request() if self._type == 'sequential' else None

        while trans is not None:
            if not self.see_activity and self._type == 'sequential':
                yield resource_trace_request
            if isinstance(trans, list):
                # Parallel branches were spawned: wait for all of them, then
                # replace the current marking with the one they left behind.
                yield AllOf(env, trans)
                am_after = self._parallel_object._get_last_events()
                for d in self._delete_places(self._am):
                    del self._am[d]
                for t in am_after:
                    self._am[t] = 1
                trans = self.next_transition(env)

            # Silent transitions (no label) are only fired, never logged.
            if trans and trans.label:
                now = self._start_time + timedelta(seconds=env.now)
                self._buffer.reset()
                self._buffer.set_feature("id_case", self._id)
                self._buffer.set_feature("activity", trans.label)
                self._buffer.set_feature("prefix", self._prefix.get_prefix(now))
                self._buffer.set_feature("attribute_event", custom.event_function_attribute(self._id, now))

                ### call predictor for waiting time
                if trans.label in self._params.ROLE_ACTIVITY:
                    resource = self._process._get_resource(self._params.ROLE_ACTIVITY[trans.label])
                else:
                    raise ValueError('Not resource/role defined for this activity', trans.label)

                self._buffer.set_feature("wip_wait", resource_trace.count)
                self._buffer.set_feature("ro_single", self._process.get_occupations_single_role(resource._get_name()))
                self._buffer.set_feature("ro_total", self._process.get_occupations_all_role())
                self._buffer.set_feature("role", resource._get_name())

                ### register event in process ###
                resource_task = self._process._get_resource_event(trans.label)
                self._buffer.set_feature("wip_activity", resource_task.count)

                queue = 0 if len(resource._queue) == 0 else len(resource._queue[-1])
                self._buffer.set_feature("queue", queue)
                self._buffer.set_feature("enabled_time", self._start_time + timedelta(seconds=env.now))

                waiting = self.define_waiting_time(trans.label)
                if self.see_activity:
                    yield env.timeout(waiting)

                request_resource = resource.request()
                yield request_resource
                single_resource = self._process._set_single_resource(resource._get_name())
                self._buffer.set_feature("resource", single_resource)

                resource_task_request = resource_task.request()
                yield resource_task_request

                ### call predictor for processing time
                self._buffer.set_feature("wip_start", resource_trace.count)
                self._buffer.set_feature("ro_single", self._process.get_occupations_single_role(resource._get_name()))
                self._buffer.set_feature("ro_total", self._process.get_occupations_all_role())
                self._buffer.set_feature("wip_activity", resource_task.count)

                # Delay returned by the role for the current timestamp.
                stop = resource.to_time_schedule(self._start_time + timedelta(seconds=env.now))
                yield env.timeout(stop)
                self._buffer.set_feature("start_time", self._start_time + timedelta(seconds=env.now))
                duration = self.define_processing_time(trans.label)

                yield env.timeout(duration)

                self._buffer.set_feature("wip_end", resource_trace.count)
                self._buffer.set_feature("end_time", self._start_time + timedelta(seconds=env.now))
                self._buffer.print_values()
                self._prefix.add_activity(trans.label)
                resource.release(request_resource)
                self._process._release_single_resource(resource._get_name(), single_resource)
                resource_task.release(resource_task_request)

            self._update_marking(trans)
            trans = self.next_transition(env) if self._am else None

        if self._type == 'parallel':
            # Hand the reached marking back to the token that spawned us.
            self._parallel_object._set_last_events(self._am)
        if self._type == 'sequential':
            resource_trace.release(resource_trace_request)

    def _get_resource_role(self, activity):
        """Return the resources of every role listed in ``ROLE_ACTIVITY`` for
        the label of *activity*."""
        return [self._process._get_resource(e) for e in self._params.ROLE_ACTIVITY[activity.label]]

    def _update_marking(self, trans):
        """Fire *trans* and store the resulting marking."""
        self._am = semantics.execute(trans, self._net, self._am)

    def _delete_tokens(self, name):
        """Return the places of the current marking whose name is not *name*."""
        return [p for p in self._am if p.name != name]

    def _check_probability(self, prob):
        """Check if the sum of probabilities is 1 (within float tolerance).

        Prints a warning and returns False otherwise.
        """
        # Tolerance instead of ``!= 1``: e.g. 0.1 + 0.2 + 0.7 is not exactly 1.
        if abs(sum(prob) - 1) > 1e-9:
            print('WARNING: The sum of the probabilities associated with the paths is not 1, to run the simulation we define equal probability')
            return False
        return True

    def _check_type_paths(self, prob):
        """Raise ValueError unless all paths are configured the same way:
        all the same string (AUTO / CUSTOM) or all float numbers."""
        if type(prob[0]) is str:
            if not all(x == prob[0] for x in prob):
                raise ValueError('ERROR: Not all path are defined as same type ', prob)
        elif type(prob[0]) is float:
            if not all(isinstance(x, float) for x in prob):
                raise ValueError('ERROR: Not all path are defined as same type (float number) ', prob)
        else:
            raise ValueError("ERROR: Invalid input, specify the probability as AUTO, float number or CUSTOM ", prob)

    def _retrieve_check_paths(self, all_enabled_trans):
        """Collect the configured probability of every enabled transition.

        Labelled transitions are looked up by label, silent ones by name.
        A missing entry is reported on stdout and skipped, so the returned
        list may be shorter than *all_enabled_trans*.
        """
        prob = []
        for trans in all_enabled_trans:
            key = trans.label if trans.label else trans.name
            try:
                prob.append(self._params.PROBABILITY[key])
            except KeyError:
                print('ERROR: Not all path probabilities are defined. Define all paths: ', all_enabled_trans)

        return prob

    def define_xor_next_activity(self, all_enabled_trans):
        """ Three different methods to decide which path to follow from an XOR gateway:
        * Random choice: each path has equal probability to be chosen (AUTO)
        ```json
        "probability": {
            "A_ACCEPTED": "AUTO",
            "skip_2": "AUTO",
            "A_FINALIZED": "AUTO"
        }
        ```
        * Defined probability: in the json file it is possible to define for each path a specific probability (PROBABILITY as value)
        ```json
        "probability": {
            "A_PREACCEPTED": 0.20,
            "skip_1": 0.80
        }
        ```
        * Custom method: it is possible to define a dedicated method that, given the possible paths, returns the one to
        follow, using whatever techniques the user prefers. (CUSTOM)
        ```json
        "probability": {
            "A_CANCELLED": "CUSTOM",
            "A_DECLINED": "CUSTOM",
            "tauSplit_5": "CUSTOM"
        }
        ```

        Returns the chosen element of *all_enabled_trans*.  Float
        probabilities are only used as weights when one is defined for every
        path and they sum to 1; otherwise every path is equally likely.
        """
        prob = self._retrieve_check_paths(all_enabled_trans) if self._params.PROBABILITY else ['AUTO']
        if not prob:
            # No path is configured (already reported): behave like AUTO
            # instead of failing on prob[0].
            prob = ['AUTO']
        self._check_type_paths(prob)
        if prob[0] == 'CUSTOM':
            index = self.call_custom_xor_function(all_enabled_trans)
        elif (isinstance(prob[0], float)
              and len(prob) == len(all_enabled_trans)
              and self._check_probability(prob)):
            index = random.choices(range(len(prob)), weights=prob)[0]
        else:
            # AUTO, or float probabilities that are incomplete / do not sum to 1.
            index = random.choices(range(len(all_enabled_trans)))[0]

        return all_enabled_trans[index]

    def define_processing_time(self, activity):
        """ Three different methods are available to define the processing time for each activity:
        * Distribution function: specify in the json file the distribution with the right parameters for each
        activity, see the [numpy_distribution](https://numpy.org/doc/stable/reference/random/generator.html) distribution, (DISTRIBUTION).
        **Be careful**: A negative value generated by the distribution is not valid for the simulator.
        ```json
        "processing_time": {
            "A_FINALIZED": { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
        }
        ```
        * Custom method: it is possible to define a dedicated method that, given the activity and its
        characteristics, returns the duration of processing time required. (CUSTOM)
        ```json
        "processing_time": {
            "A_FINALIZED": { "name": "custom"}
        }
        ```
        * Mixed: It is possible to define a distribution function for some activities and a dedicated method for the others.
        ```json
        "processing_time": {
            "A_FINALIZED": { "name": "custom"},
            "A_REGISTERED": { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
        }
        ```

        Returns the duration; a negative value is reported and replaced by 0.
        Raises ValueError (chained to the original error) when the duration
        cannot be computed.
        """
        try:
            setting = self._params.PROCESSING_TIME[activity]
            if setting["name"] == 'custom':
                duration = self.call_custom_processing_time()
            else:
                duration = getattr(np.random, setting['name'])(**setting['parameters'], size=1)[0]
            if duration < 0:
                print("WARNING: Negative processing time", duration)
                duration = 0
        except Exception as err:
            raise ValueError("ERROR: The processing time of", activity, "is not defined in json file") from err
        return duration

    def define_waiting_time(self, next_act):
        """ Three different methods are available to define the waiting time before each activity:
        * Distribution function: specify in the json file the distribution with the right parameters for each
        activity, see the [numpy_distribution](https://numpy.org/doc/stable/reference/random/generator.html) distribution, (DISTRIBUTION).
        **Be careful**: A negative value generated by the distribution is not valid for the simulator.
        ```json
        "waiting_time": {
            "A_PARTLYSUBMITTED": { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
        }
        ```
        * Custom method: it is possible to define a dedicated method that, given the next activity with its
        features, returns the duration of waiting time. (CUSTOM)
        ```json
        "waiting_time": {
            "A_PARTLYSUBMITTED": { "name": "custom"}
        }
        ```
        * Mixed: As the processing time, it is possible to define a mix of methods for each activity.
        ```json
        "waiting_time": {
            "A_PARTLYSUBMITTED": { "name": "custom"},
            "A_APPROVED": { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
        }
        ```

        Returns the waiting time; 0 when it is negative or cannot be computed.
        """
        try:
            setting = self._params.WAITING_TIME[next_act]
            if setting["name"] == 'custom':
                duration = self.call_custom_waiting_time()
            else:
                duration = getattr(np.random, setting['name'])(**setting['parameters'], size=1)[0]
            if duration < 0:
                print("WARNING: Negative waiting time", duration)
                duration = 0
        except Exception:
            # Best effort: an activity without a usable waiting-time
            # definition simply does not wait.
            duration = 0

        return duration

    def call_custom_processing_time(self):
        """
        Call to the custom functions in the file *custom_function.py*.
        """
        return custom.custom_processing_time(self._buffer)

    def call_custom_waiting_time(self):
        """
        Call to the custom functions in the file *custom_function.py*.
        """
        return custom.custom_waiting_time(self._buffer)

    def call_custom_xor_function(self, all_enabled_trans):
        """
        Call to the custom functions in the file *custom_function.py*.

        Only the buffer is passed on; *all_enabled_trans* is not used here.
        """
        return custom.custom_decision_mining(self._buffer)

    def next_transition(self, env):
        """
        Method to define the next activity in the petrinet.

        Returns None when no transition is enabled, the transition to fire
        when one can be chosen, or a list of simpy processes (one parallel
        token per marked place) when the marking holds several places.
        """
        all_enabled_trans = sorted(semantics.enabled_transitions(self._net, self._am), key=lambda x: x.name)
        if not all_enabled_trans:
            return None
        if len(all_enabled_trans) == 1:
            return all_enabled_trans[0]
        if len(self._am) == 1:
            return self.define_xor_next_activity(all_enabled_trans)

        events = []
        for token in self._am:
            # Each parallel token starts from a marking reduced to one place.
            new_am = copy.copy(self._am)
            for p in self._delete_tokens(token.name):
                del new_am[p]
            # NOTE(review): the buffer dictionary is passed in the position of
            # the ``time`` parameter and ``values`` stays None - confirm that
            # this is intended.
            path = env.process(Token(self._id, self._net, new_am, self._params, self._process, self._prefix, "parallel", self._writer, self._parallel_object, self._buffer._get_dictionary()).simulation(env))
            events.append(path)
        return events
Token( id: int, net: pm4py.objects.petri_net.obj.PetriNet, am: pm4py.objects.petri_net.obj.Marking, params: parameters.Parameters, process: process.SimulationProcess, prefix: utility.Prefix, type: str, writer: <built-in function writer>, parallel_object: utility.ParallelObject, time: datetime.datetime, values=None)
def __init__(self, id: int, net: pm4py.objects.petri_net.obj.PetriNet, am: pm4py.objects.petri_net.obj.Marking, params: Parameters, process: SimulationProcess, prefix: Prefix, type: str, writer: csv.writer, parallel_object: ParallelObject, time: datetime, values=None):
    """Keep the simulation context of one case and set up its feature buffer.

    The token remembers the case id, the net and its starting marking, the
    parameters (whose ``START_SIMULATION`` becomes the start time), the
    process, the prefix, its type, the writer and the parallel object.
    ``writer`` and ``values`` are forwarded to ``Buffer``; ``time`` is only
    forwarded to ``custom.case_function_attribute``.
    """
    # Identity and kind of the token.
    self._id, self._type = id, type
    # Waiting times are applied by every token that is not 'sequential'.
    self.see_activity = type != 'sequential'

    # Petri net state.
    self._net, self._am = net, am

    # Shared simulation context.
    self._params = params
    self._start_time = params.START_SIMULATION
    self._process = process
    self._prefix = prefix
    self._parallel_object = parallel_object
    self._writer = writer

    # Feature buffer, seeded with the case-level attribute.
    case_attribute = custom.case_function_attribute(self._id, time)
    self._buffer = Buffer(writer, values)
    self._buffer.set_feature("attribute_case", case_attribute)
def
simulation(self, env: simpy.core.Environment):
def simulation(self, env: simpy.Environment):
    """
    The main function to handle the simulation of a single trace.

    This is a generator meant to be run as a simpy process: it repeatedly
    asks ``next_transition`` for the next step, fires it and, for every
    transition that has a label, fills the buffer with the event features
    and prints them.

    Args:
        env: simpy environment; ``env.now`` is added as seconds to the
            simulation start time to build every timestamp.

    Raises:
        ValueError: when the label of a transition has no entry in
            ``ROLE_ACTIVITY``.
    """
    trans = self.next_transition(env)
    ### register trace in process ###
    request_resource = None
    resource_trace = self._process._get_resource_trace()
    # Only a sequential token requests the trace resource.
    resource_trace_request = resource_trace.request() if self._type == 'sequential' else None

    while trans is not None:
        # NOTE(review): see_activity is never changed inside this method, so
        # for a sequential token this request is yielded on every iteration
        # and the waiting time below is never applied - confirm intent.
        if not self.see_activity and self._type == 'sequential':
            yield resource_trace_request
        if type(trans) == list:
            # next_transition returned the processes of the parallel tokens:
            # wait for all of them to finish.
            yield AllOf(env, trans)
            # Replace the current marking with the one stored by the
            # parallel tokens in the shared parallel object.
            am_after = self._parallel_object._get_last_events()
            for d in self._delete_places(self._am):
                del self._am[d]
            for t in am_after:
                self._am[t] = 1
            trans = self.next_transition(env)

        # Transitions without a label are fired below but produce no event.
        if trans and trans.label:
            self._buffer.reset()
            self._buffer.set_feature("id_case", self._id)
            self._buffer.set_feature("activity", trans.label)
            self._buffer.set_feature("prefix", self._prefix.get_prefix(self._start_time + timedelta(seconds=env.now)))
            self._buffer.set_feature("attribute_event", custom.event_function_attribute(self._id, self._start_time + timedelta(seconds=env.now)))

            ### call predictor for waiting time
            if trans.label in self._params.ROLE_ACTIVITY:
                resource = self._process._get_resource(self._params.ROLE_ACTIVITY[trans.label])
            else:
                raise ValueError('Not resource/role defined for this activity', trans.label)

            #self._buffer.set_feature("wip_wait", 0 if type != 'sequential' else resource_trace.count-1)
            self._buffer.set_feature("wip_wait", resource_trace.count)
            self._buffer.set_feature("ro_single", self._process.get_occupations_single_role(resource._get_name()))
            self._buffer.set_feature("ro_total", self._process.get_occupations_all_role())
            self._buffer.set_feature("role", resource._get_name())

            ### register event in process ###
            resource_task = self._process._get_resource_event(trans.label)
            self._buffer.set_feature("wip_activity", resource_task.count)

            # Length of the last entry of the role's queue, 0 when it is empty.
            queue = 0 if len(resource._queue) == 0 else len(resource._queue[-1])
            self._buffer.set_feature("queue", queue)
            self._buffer.set_feature("enabled_time", self._start_time + timedelta(seconds=env.now))

            waiting = self.define_waiting_time(trans.label)
            if self.see_activity:
                yield env.timeout(waiting)

            # Acquire the role first, then the per-activity resource.
            request_resource = resource.request()
            yield request_resource
            single_resource = self._process._set_single_resource(resource._get_name())
            self._buffer.set_feature("resource", single_resource)

            resource_task_request = resource_task.request()
            yield resource_task_request

            ### call predictor for processing time
            # ro_single, ro_total and wip_activity are written again here,
            # after both resources have been acquired.
            self._buffer.set_feature("wip_start", resource_trace.count)
            self._buffer.set_feature("ro_single", self._process.get_occupations_single_role(resource._get_name()))
            self._buffer.set_feature("ro_total", self._process.get_occupations_all_role())
            self._buffer.set_feature("wip_activity", resource_task.count)

            # Delay returned by the role for the current timestamp
            # (presumably the time until its schedule allows work - confirm).
            stop = resource.to_time_schedule(self._start_time + timedelta(seconds=env.now))
            yield env.timeout(stop)
            self._buffer.set_feature("start_time", self._start_time + timedelta(seconds=env.now))
            duration = self.define_processing_time(trans.label)

            yield env.timeout(duration)

            self._buffer.set_feature("wip_end", resource_trace.count)
            self._buffer.set_feature("end_time", self._start_time + timedelta(seconds=env.now))
            self._buffer.print_values()
            self._prefix.add_activity(trans.label)
            resource.release(request_resource)
            self._process._release_single_resource(resource._get_name(), single_resource)
            resource_task.release(resource_task_request)

        self._update_marking(trans)
        # Stop as soon as the marking is empty.
        trans = self.next_transition(env) if self._am else None

    if self._type == 'parallel':
        # Store the reached marking for the token that waits on AllOf above.
        self._parallel_object._set_last_events(self._am)
    if self._type == 'sequential':
        resource_trace.release(resource_trace_request)
The main function to handle the simulation of a single trace
def
define_xor_next_activity(self, all_enabled_trans):
def define_xor_next_activity(self, all_enabled_trans):
    """ Three different methods to decide which path to follow from an XOR gateway:
    * Random choice: each path has equal probability to be chosen (AUTO)
    ```json
    "probability": {
        "A_ACCEPTED": "AUTO",
        "skip_2": "AUTO",
        "A_FINALIZED": "AUTO"
    }
    ```
    * Defined probability: in the json file it is possible to define for each path a specific probability (PROBABILITY as value)
    ```json
    "probability": {
        "A_PREACCEPTED": 0.20,
        "skip_1": 0.80
    }
    ```
    * Custom method: it is possible to define a dedicated method that, given the possible paths, returns the one to
    follow, using whatever techniques the user prefers. (CUSTOM)
    ```json
    "probability": {
        "A_CANCELLED": "CUSTOM",
        "A_DECLINED": "CUSTOM",
        "tauSplit_5": "CUSTOM"
    }
    ```

    Args:
        all_enabled_trans: list of the transitions that are enabled.

    Returns:
        The chosen element of *all_enabled_trans*.  Float probabilities are
        used as weights only when one is available for every path and
        ``_check_probability`` accepts them; otherwise every path is
        equally likely.

    Raises:
        ValueError: propagated from ``_check_type_paths`` when the paths
            are not all configured in the same way.
    """
    if self._params.PROBABILITY:
        prob = self._retrieve_check_paths(all_enabled_trans)
    else:
        prob = ['AUTO']
    if not prob:
        # None of the paths has a configured value: behave like AUTO instead
        # of failing on prob[0].
        prob = ['AUTO']
    self._check_type_paths(prob)

    if prob[0] == 'CUSTOM':
        index = self.call_custom_xor_function(all_enabled_trans)
    elif (isinstance(prob[0], float)
          and len(prob) == len(all_enabled_trans)
          and self._check_probability(prob)):
        # Valid probabilities, one per path: weighted choice.
        index = random.choices(range(len(prob)), weights=prob)[0]
    else:
        # AUTO, or probabilities that are incomplete / rejected by the check.
        index = random.choices(range(len(all_enabled_trans)))[0]

    return all_enabled_trans[index]
Three different methods to decide which path to follow from an XOR gateway:
- Random choice: each path has equal probability to be chosen (AUTO)
"probability": {
"A_ACCEPTED": "AUTO",
"skip_2": "AUTO",
"A_FINALIZED": "AUTO",
}
- Defined probability: in the JSON file it is possible to define a specific probability for each path (PROBABILITY as value)
"probability": {
"A_PREACCEPTED": 0.20,
"skip_1": 0.80
}
- Custom method: it is possible to define a dedicated method that, given the possible paths, returns the one to follow, using whatever technique the user prefers. (CUSTOM)
"probability": {
"A_CANCELLED": "CUSTOM",
"A_DECLINED": "CUSTOM",
"tauSplit_5": "CUSTOM"
}
def
define_processing_time(self, activity):
def define_processing_time(self, activity):
    """Return the processing time (duration) of ``activity``.

    Three different methods are available to define the processing time for
    each activity:

    * Distribution function: specify in the JSON file the distribution with
      the right parameters for each activity, see the
      [numpy_distribution](https://numpy.org/doc/stable/reference/random/generator.html)
      distributions, (DISTRIBUTION).
      **Be careful**: A negative value generated by the distribution is not
      valid for the simulator.
    ```json
    "processing_time": {
        "A_FINALIZED": { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
    }
    ```
    * Custom method: it is possible to define a dedicated method that, given
      the activity and its characteristics, returns the duration of
      processing time required. (CUSTOM)
    ```json
    "processing_time": {
        "A_FINALIZED": { "name": "custom"}
    }
    ```
    * Mixed: It is possible to define a distribution function for some
      activities and a dedicated method for the others.
    ```json
    "processing_time": {
        "A_FINALIZED": { "name": "custom"},
        "A_REGISTERED": { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
    }
    ```

    :param activity: key looked up in ``self._params.PROCESSING_TIME``.
    :return: the duration; negative values are replaced by 0 after a warning.
    :raises ValueError: if ``activity`` has no usable entry in the
        configuration, or its distribution name/parameters cannot be used.
    """
    # Only the configuration lookup is reported as "not defined"; the old
    # bare `except:` also relabelled failures of the custom function and
    # interpreter interrupts with that message.
    try:
        settings = self._params.PROCESSING_TIME[activity]
        name = settings["name"]
    except (AttributeError, KeyError, TypeError) as err:
        raise ValueError(
            f"ERROR: The processing time of {activity} is not defined in json file"
        ) from err

    if name == 'custom':
        duration = self.call_custom_processing_time()
    else:
        try:
            sampler = getattr(np.random, name)
            duration = sampler(**settings['parameters'], size=1)[0]
        except (AttributeError, KeyError, TypeError) as err:
            raise ValueError(
                f"ERROR: The processing time distribution of {activity} is not valid: {err}"
            ) from err

    # A negative duration is not valid for the simulator, so clamp it.
    if duration < 0:
        print("WARNING: Negative processing time", duration)
        duration = 0
    return duration
Three different methods are available to define the processing time for each activity:
- Distribution function: specify in the JSON file the distribution with the right parameters for each activity; see the NumPy random distributions (numpy_distribution). (DISTRIBUTION) Be careful: a negative value generated by the distribution is not valid for the simulator.
"processing_time": {
"A_FINALIZED": { "name": "uniform", "parameters": { "low": 3600, "high": 7200}},
}
- Custom method: it is possible to define a dedicated method that, given the activity and its characteristics, returns the duration of processing time required. (CUSTOM)
"processing_time": {
"A_FINALIZED": { "name": "custom"}
}
- Mixed: It is possible to define a distribution function for some activities and a dedicated method for the others.
"processing_time": {
"A_FINALIZED": { "name": "custom"},
"A_REGISTERED": { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
}
def
define_waiting_time(self, next_act):
def define_waiting_time(self, next_act):
    """Return the waiting time that precedes activity ``next_act``.

    Three different methods are available to define the waiting time before
    each activity:

    * Distribution function: specify in the JSON file the distribution with
      the right parameters for each activity, see the
      [numpy_distribution](https://numpy.org/doc/stable/reference/random/generator.html)
      distributions, (DISTRIBUTION).
      **Be careful**: A negative value generated by the distribution is not
      valid for the simulator.
    ```json
    "waiting_time": {
        "A_PARTLYSUBMITTED": { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
    }
    ```
    * Custom method: it is possible to define a dedicated method that, given
      the next activity with its features, returns the duration of waiting
      time. (CUSTOM)
    ```json
    "waiting_time": {
        "A_PARTLYSUBMITTED": { "name": "custom"}
    }
    ```
    * Mixed: As with the processing time, it is possible to mix the methods
      across activities.
    ```json
    "waiting_time": {
        "A_PARTLYSUBMITTED": { "name": "custom"},
        "A_APPROVED": { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
    }
    ```

    :param next_act: key looked up in ``self._params.WAITING_TIME``.
    :return: the waiting time; 0 when nothing is configured for
        ``next_act``, when the value cannot be computed, or when it is
        negative.
    """
    # An activity without a waiting-time entry simply does not wait.
    try:
        settings = self._params.WAITING_TIME[next_act]
        name = settings["name"]
    except (AttributeError, KeyError, TypeError):
        return 0

    # Keep the best-effort fallback to 0, but report the failure and let
    # BaseException (e.g. KeyboardInterrupt) propagate; the old bare
    # `except:` swallowed those silently.
    try:
        if name == 'custom':
            duration = self.call_custom_waiting_time()
        else:
            sampler = getattr(np.random, name)
            duration = sampler(**settings['parameters'], size=1)[0]
        if duration < 0:
            print("WARNING: Negative waiting time", duration)
            duration = 0
    except Exception as err:
        print("WARNING: Waiting time of", next_act, "could not be computed:", err)
        duration = 0

    return duration
Three different methods are available to define the waiting time before each activity:
- Distribution function: specify in the JSON file the distribution with the right parameters for each activity; see the NumPy random distributions (numpy_distribution). (DISTRIBUTION) Be careful: a negative value generated by the distribution is not valid for the simulator.
"waiting_time": {
"A_PARTLYSUBMITTED": { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
}
- Custom method: it is possible to define a dedicated method that, given the next activity with its features, returns the duration of waiting time. (CUSTOM)
"waiting_time": {
"A_PARTLYSUBMITTED": { "name": "custom"}
}
- Mixed: As with the processing time, it is possible to mix the methods across activities.
"waiting_time": {
"A_PARTLYSUBMITTED": { "name": "custom"},
"A_APPROVED": { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
}
def
call_custom_processing_time(self):
def call_custom_processing_time(self):
    """
    Delegate to the custom functions in the file *custom_function.py*.

    Hands this token's buffer to ``custom.custom_processing_time`` and
    returns whatever that function returns.
    """
    buffer = self._buffer
    return custom.custom_processing_time(buffer)
Call to the custom functions in the file custom_function.py.
def
call_custom_waiting_time(self):
def call_custom_waiting_time(self):
    """
    Delegate to the custom functions in the file *custom_function.py*.

    Hands this token's buffer to ``custom.custom_waiting_time`` and returns
    whatever that function returns.
    """
    buffer = self._buffer
    return custom.custom_waiting_time(buffer)
Call to the custom functions in the file custom_function.py.
def
call_custom_xor_function(self, all_enabled_trans):
def call_custom_xor_function(self, all_enabled_trans):
    """
    Delegate to the custom functions in the file *custom_function.py*.

    Hands this token's buffer to ``custom.custom_decision_mining`` and
    returns whatever that function returns. ``all_enabled_trans`` is
    accepted for the caller's convenience but is not forwarded.
    """
    buffer = self._buffer
    return custom.custom_decision_mining(buffer)
Call to the custom functions in the file custom_function.py.
def
next_transition(self, env):
def next_transition(self, env):
    """
    Method to define the next activity in the Petri net.

    :param env: the simpy environment, used to start the processes of the
        parallel branches and to read the current simulation time.
    :return: ``None`` when no transition is enabled; the single enabled
        transition when there is exactly one; the transition picked by
        ``define_xor_next_activity`` when several are enabled and the marking
        holds one place; otherwise a list with one simpy process per place
        of the marking, each simulating a "parallel" child token.
    """
    # Sort by name so that the order of the enabled transitions is stable.
    all_enabled_trans = sorted(
        semantics.enabled_transitions(self._net, self._am),
        key=lambda transition: transition.name,
    )
    if not all_enabled_trans:
        return None
    if len(all_enabled_trans) == 1:
        return all_enabled_trans[0]
    if len(self._am) == 1:
        return self.define_xor_next_activity(all_enabled_trans)

    # Several places are marked: start one child token per place.
    # The original call passed the buffer dictionary positionally, which
    # landed in the `time` parameter of Token.__init__ and left `values`
    # as None. Pass a datetime for `time` and the dictionary as `values`.
    # NOTE(review): the fork time is used for `time` because the token does
    # not keep the value it was created with -- confirm this is the time
    # `custom.case_function_attribute` should receive.
    fork_time = self._start_time + timedelta(seconds=env.now)
    events = []
    for place in self._am:
        new_am = copy.copy(self._am)
        # `_delete_tokens` is defined outside this view; whatever it returns
        # for this place name is removed from the child's marking.
        for p in self._delete_tokens(place.name):
            del new_am[p]
        child = Token(
            self._id, self._net, new_am, self._params, self._process,
            self._prefix, "parallel", self._writer, self._parallel_object,
            fork_time, values=self._buffer._get_dictionary(),
        )
        events.append(env.process(child.simulation(env)))
    return events
Method to define the next activity in the Petri net.