
  1from datetime import datetime, timedelta
  2import simpy
  3import pm4py
  4import random
  5from process import SimulationProcess
  6from pm4py.objects.petri_net import semantics
  7from parameters import Parameters
  8from utility import Prefix
  9from simpy.events import AnyOf, AllOf, Event
 10import numpy as np
 11import copy
 12import csv
 13from utility import Buffer, ParallelObject
 14import custom_function as custom
 17class Token(object):
 19    def __init__(self, id: int, net: pm4py.objects.petri_net.obj.PetriNet, am: pm4py.objects.petri_net.obj.Marking, params: Parameters, process: SimulationProcess, prefix: Prefix, type: str, writer: csv.writer, parallel_object: ParallelObject, time: datetime, values=None):
 20        self._id = id
 21        self._process = process
 22        self._start_time = params.START_SIMULATION
 23        self._params = params
 24        self._net = net
 25        self._am = am
 26        self._prefix = prefix
 27        self._type = type
 28        if type == 'sequential':
 29            self.see_activity = False
 30        else:
 31            self.see_activity = True
 32        self._writer = writer
 33        self._parallel_object = parallel_object
 34        self._buffer = Buffer(writer, values)
 35        self._buffer.set_feature("attribute_case", custom.case_function_attribute(self._id, time))
 37    def _delete_places(self, places):
 38        delete = []
 39        for place in places:
 40            for p in self._net.places:
 41                if str(place) in str(p.name):
 42                    delete.append(p)
 43        return delete
 45    def simulation(self, env: simpy.Environment):
 46        """
 47            The main function to handle the simulation of a single trace
 48        """
 49        trans = self.next_transition(env)
 50        ### register trace in process ###
 51        request_resource = None
 52        resource_trace = self._process._get_resource_trace()
 53        resource_trace_request = resource_trace.request() if self._type == 'sequential' else None
 55        while trans is not None:
 56            if not self.see_activity and self._type == 'sequential':
 57                yield resource_trace_request
 58            if type(trans) == list:
 59                yield AllOf(env, trans)
 60                am_after = self._parallel_object._get_last_events()
 61                for d in self._delete_places(self._am):
 62                    del self._am[d]
 63                for t in am_after:
 64                    self._am[t] = 1
 65                trans = self.next_transition(env)
 67            if trans and trans.label:
 68                self._buffer.reset()
 69                self._buffer.set_feature("id_case", self._id)
 70                self._buffer.set_feature("activity", trans.label)
 71                self._buffer.set_feature("prefix", self._prefix.get_prefix(self._start_time + timedelta(seconds=env.now)))
 72                self._buffer.set_feature("attribute_event", custom.event_function_attribute(self._id, self._start_time + timedelta(seconds=env.now)))
 74                ### call predictor for waiting time
 75                if trans.label in self._params.ROLE_ACTIVITY:
 76                    resource = self._process._get_resource(self._params.ROLE_ACTIVITY[trans.label])
 77                else:
 78                    raise ValueError('Not resource/role defined for this activity', trans.label)
 80                #self._buffer.set_feature("wip_wait", 0 if type != 'sequential' else resource_trace.count-1)
 81                self._buffer.set_feature("wip_wait", resource_trace.count)
 82                self._buffer.set_feature("ro_single", self._process.get_occupations_single_role(resource._get_name()))
 83                self._buffer.set_feature("ro_total", self._process.get_occupations_all_role())
 84                self._buffer.set_feature("role", resource._get_name())
 86                ### register event in process ###
 87                resource_task = self._process._get_resource_event(trans.label)
 88                self._buffer.set_feature("wip_activity", resource_task.count)
 90                queue = 0 if len(resource._queue) == 0 else len(resource._queue[-1])
 91                self._buffer.set_feature("queue", queue)
 92                self._buffer.set_feature("enabled_time", self._start_time + timedelta(seconds=env.now))
 94                waiting = self.define_waiting_time(trans.label)
 95                if self.see_activity:
 96                    yield env.timeout(waiting)
 98                request_resource = resource.request()
 99                yield request_resource
100                single_resource = self._process._set_single_resource(resource._get_name())
101                self._buffer.set_feature("resource", single_resource)
103                resource_task_request = resource_task.request()
104                yield resource_task_request
106                ### call predictor for processing time
107                self._buffer.set_feature("wip_start", resource_trace.count)
108                self._buffer.set_feature("ro_single", self._process.get_occupations_single_role(resource._get_name()))
109                self._buffer.set_feature("ro_total", self._process.get_occupations_all_role())
110                self._buffer.set_feature("wip_activity", resource_task.count)
112                stop = resource.to_time_schedule(self._start_time + timedelta(seconds=env.now))
113                yield env.timeout(stop)
114                self._buffer.set_feature("start_time", self._start_time + timedelta(seconds=env.now))
115                duration = self.define_processing_time(trans.label)
117                yield env.timeout(duration)
119                self._buffer.set_feature("wip_end", resource_trace.count)
120                self._buffer.set_feature("end_time", self._start_time + timedelta(seconds=env.now))
121                self._buffer.print_values()
122                self._prefix.add_activity(trans.label)
123                resource.release(request_resource)
124                self._process._release_single_resource(resource._get_name(), single_resource)
125                resource_task.release(resource_task_request)
127            self._update_marking(trans)
128            trans = self.next_transition(env) if self._am else None
130        if self._type == 'parallel':
131            self._parallel_object._set_last_events(self._am)
132        if self._type == 'sequential':
133            resource_trace.release(resource_trace_request)
135    def _get_resource_role(self, activity):
136        elements = self._params.ROLE_ACTIVITY[activity.label]
137        resource_object = []
138        for e in elements:
139            resource_object.append(self._process._get_resource(e))
140        return resource_object
142    def _update_marking(self, trans):
143        self._am = semantics.execute(trans, self._net, self._am)
145    def _delete_tokens(self, name):
146        to_delete = []
147        for p in self._am:
148            if p.name != name:
149                to_delete.append(p)
150        return to_delete
152    def _check_probability(self, prob):
153        """Check if the sum of probabilities is 1
154        """
155        if sum(prob) != 1:
156            print('WARNING: The sum of the probabilities associated with the paths is not 1, to run the simulation we define equal probability')
157            return False
158        else:
159            return True
161    def _check_type_paths(self, prob):
162        if type(prob[0]) is str:
163            if sum([x == prob[0] for x in prob]) != len(prob):
164                raise ValueError('ERROR: Not all path are defined as same type ', prob)
165        elif type(prob[0]) is float:
166            if sum([isinstance(x, float) for x in prob]) != len(prob):
167                raise ValueError('ERROR: Not all path are defined as same type (float number) ', prob)
168        else:
169            raise ValueError("ERROR: Invalid input, specify the probability as AUTO, float number or CUSTOM ", prob)
171    def _retrieve_check_paths(self, all_enabled_trans):
172        prob = []
173        for trans in all_enabled_trans:
174            try:
175                if trans.label:
176                    prob.append(self._params.PROBABILITY[trans.label])
177                else:
178                    prob.append(self._params.PROBABILITY[trans.name])
179            except:
180                print('ERROR: Not all path probabilities are defined. Define all paths: ', all_enabled_trans)
182        return prob
184    def define_xor_next_activity(self, all_enabled_trans):
185        """ Three different methods to decide which path following from XOR gateway:
186        * Random choice: each path has equal probability to be chosen (AUTO)
187        ```json
188        "probability": {
189            "A_ACCEPTED": "AUTO",
190            "skip_2": "AUTO",
191            "A_FINALIZED": "AUTO",
192        }
193        ```
194        * Defined probability: in the file json it is possible to define for each path a specific probability (PROBABILITY as value)
195        ```json
196        "probability": {
197            "A_PREACCEPTED": 0.20,
198            "skip_1": 0.80
199        }
200        ```
201        * Custom method: it is possible to define a dedicate method that given the possible paths it returns the one to
202        follow, using whatever techniques the user prefers. (CUSTOM)
203        ```json
204        "probability": {
205            "A_CANCELLED": "CUSTOM",
206            "A_DECLINED": "CUSTOM",
207            "tauSplit_5": "CUSTOM"
208        }
209        ```
210        """
211        prob = ['AUTO'] if not self._params.PROBABILITY else self._retrieve_check_paths(all_enabled_trans)
212        self._check_type_paths(prob)
213        if prob[0] == 'AUTO':
214                next = random.choices(list(range(0, len(all_enabled_trans), 1)))[0]
215        elif prob[0] == 'CUSTOM':
216            next = self.call_custom_xor_function(all_enabled_trans)
217        elif type(prob[0] == float()):
218            if not self._check_probability(prob):
219                value = [*range(0, len(prob), 1)]
220                next = int(random.choices(value, prob)[0])
221            else:
222                next = random.choices(list(range(0, len(all_enabled_trans), 1)))[0]
224        return all_enabled_trans[next]
226    def define_processing_time(self, activity):
227        """ Three different methods are available to define the processing time for each activity:
228            * Distribution function: specify in the json file the distribution with the right parameters for each
229            activity, see the [numpy_distribution](https://numpy.org/doc/stable/reference/random/generator.html) distribution, (DISTRIBUTION).
230            **Be careful**: A negative value generated by the distribution is not valid for the simulator.
231            ```json
232             "processing_time": {
233                 "A_FINALIZED": { "name": "uniform", "parameters": { "low": 3600, "high": 7200}},
234             }
235            ```
236            * Custom method: it is possible to define a dedicated method that, given the activity and its
237            characteristics, returns the duration of processing time required. (CUSTOM)
238            ```json
239            "processing_time": {
240                 "A_FINALIZED":  { "name": "custom"}
241            }
242            ```
243            * Mixed: It is possible to define a distribution function for some activities and a dedicated method for the others.
244            ```json
245            "processing_time": {
246                 "A_FINALIZED":  { "name": "custom"},
247                 "A_REGISTERED":  { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
248            }
249            ```
250        """
251        try:
252            if self._params.PROCESSING_TIME[activity]["name"] == 'custom':
253                duration = self.call_custom_processing_time()
254            else:
255                distribution = self._params.PROCESSING_TIME[activity]['name']
256                parameters = self._params.PROCESSING_TIME[activity]['parameters']
257                duration = getattr(np.random, distribution)(**parameters, size=1)[0]
258                if duration < 0:
259                    print("WARNING: Negative processing time",  duration)
260                    duration = 0
261        except:
262            raise ValueError("ERROR: The processing time of", activity, "is not defined in json file")
263        return duration
265    def define_waiting_time(self, next_act):
266        """ Three different methods are available to define the waiting time before each activity:
267            * Distribution function: specify in the json file the distribution with the right parameters for each
268            activity, see the [numpy_distribution](https://numpy.org/doc/stable/reference/random/generator.html) distribution, (DISTRIBUTION).
269            **Be careful**: A negative value generated by the distribution is not valid for the simulator.
270            ```json
271             "waiting_time": {
272                 "A_PARTLYSUBMITTED":  { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
273             }
274            ```
275            * Custom method: it is possible to define a dedicated method that, given the next activity with its
276            features, returns the duration of waiting time. (CUSTOM)
277            ```json
278            "waiting_time": {
279                 "A_PARTLYSUBMITTED": { "name": "custom"}
280            }
281            ```
282            * Mixed: As the processing time, it is possible to define a mix of methods for each activity.
283            ```json
284            "waiting_time": {
285                 "A_PARTLYSUBMITTED":  { "name": "custom"},
286                 "A_APPROVED":  { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
287            }
288            ```
289        """
290        try:
291            if self._params.WAITING_TIME[next_act]["name"] == 'custom':
292                duration = self.call_custom_waiting_time()
293            else:
294                distribution = self._params.WAITING_TIME[next_act]['name']
295                parameters = self._params.WAITING_TIME[next_act]['parameters']
296                duration = getattr(np.random, distribution)(**parameters, size=1)[0]
297                if duration < 0:
298                    print("WARNING: Negative waiting time",  duration)
299                    duration = 0
300        except:
301            duration = 0
303        return duration
305    def call_custom_processing_time(self):
306        """
307        Call to the custom functions in the file *custom_function.py*.
308        """
309        return custom.custom_processing_time(self._buffer)
311    def call_custom_waiting_time(self):
312        """
313            Call to the custom functions in the file *custom_function.py*.
314        """
315        return custom.custom_waiting_time(self._buffer)
317    def call_custom_xor_function(self, all_enabled_trans):
318        """
319            Call to the custom functions in the file *custom_function.py*.
320        """
321        return custom.custom_decision_mining(self._buffer)
323    def next_transition(self, env):
324        """
325        Method to define the next activity in the petrinet.
326        """
327        all_enabled_trans = semantics.enabled_transitions(self._net, self._am)
328        all_enabled_trans = list(all_enabled_trans)
329        all_enabled_trans.sort(key=lambda x: x.name)
330        if len(all_enabled_trans) == 0:
331            return None
332        elif len(all_enabled_trans) == 1:
333            return all_enabled_trans[0]
334        else:
335            if len(self._am) == 1:
336                return self.define_xor_next_activity(all_enabled_trans)
337            else:
338                events = []
339                for token in self._am:
340                    name = token.name
341                    new_am = copy.copy(self._am)
342                    tokens_to_delete = self._delete_tokens(name)
343                    for p in tokens_to_delete:
344                        del new_am[p]
345                    path = env.process(Token(self._id, self._net, new_am, self._params, self._process, self._prefix, "parallel", self._writer, self._parallel_object, self._buffer._get_dictionary()).simulation(env))
346                    events.append(path)
347                return events
class Token:
 18class Token(object):
 20    def __init__(self, id: int, net: pm4py.objects.petri_net.obj.PetriNet, am: pm4py.objects.petri_net.obj.Marking, params: Parameters, process: SimulationProcess, prefix: Prefix, type: str, writer: csv.writer, parallel_object: ParallelObject, time: datetime, values=None):
 21        self._id = id
 22        self._process = process
 23        self._start_time = params.START_SIMULATION
 24        self._params = params
 25        self._net = net
 26        self._am = am
 27        self._prefix = prefix
 28        self._type = type
 29        if type == 'sequential':
 30            self.see_activity = False
 31        else:
 32            self.see_activity = True
 33        self._writer = writer
 34        self._parallel_object = parallel_object
 35        self._buffer = Buffer(writer, values)
 36        self._buffer.set_feature("attribute_case", custom.case_function_attribute(self._id, time))
 38    def _delete_places(self, places):
 39        delete = []
 40        for place in places:
 41            for p in self._net.places:
 42                if str(place) in str(p.name):
 43                    delete.append(p)
 44        return delete
 46    def simulation(self, env: simpy.Environment):
 47        """
 48            The main function to handle the simulation of a single trace
 49        """
 50        trans = self.next_transition(env)
 51        ### register trace in process ###
 52        request_resource = None
 53        resource_trace = self._process._get_resource_trace()
 54        resource_trace_request = resource_trace.request() if self._type == 'sequential' else None
 56        while trans is not None:
 57            if not self.see_activity and self._type == 'sequential':
 58                yield resource_trace_request
 59            if type(trans) == list:
 60                yield AllOf(env, trans)
 61                am_after = self._parallel_object._get_last_events()
 62                for d in self._delete_places(self._am):
 63                    del self._am[d]
 64                for t in am_after:
 65                    self._am[t] = 1
 66                trans = self.next_transition(env)
 68            if trans and trans.label:
 69                self._buffer.reset()
 70                self._buffer.set_feature("id_case", self._id)
 71                self._buffer.set_feature("activity", trans.label)
 72                self._buffer.set_feature("prefix", self._prefix.get_prefix(self._start_time + timedelta(seconds=env.now)))
 73                self._buffer.set_feature("attribute_event", custom.event_function_attribute(self._id, self._start_time + timedelta(seconds=env.now)))
 75                ### call predictor for waiting time
 76                if trans.label in self._params.ROLE_ACTIVITY:
 77                    resource = self._process._get_resource(self._params.ROLE_ACTIVITY[trans.label])
 78                else:
 79                    raise ValueError('Not resource/role defined for this activity', trans.label)
 81                #self._buffer.set_feature("wip_wait", 0 if type != 'sequential' else resource_trace.count-1)
 82                self._buffer.set_feature("wip_wait", resource_trace.count)
 83                self._buffer.set_feature("ro_single", self._process.get_occupations_single_role(resource._get_name()))
 84                self._buffer.set_feature("ro_total", self._process.get_occupations_all_role())
 85                self._buffer.set_feature("role", resource._get_name())
 87                ### register event in process ###
 88                resource_task = self._process._get_resource_event(trans.label)
 89                self._buffer.set_feature("wip_activity", resource_task.count)
 91                queue = 0 if len(resource._queue) == 0 else len(resource._queue[-1])
 92                self._buffer.set_feature("queue", queue)
 93                self._buffer.set_feature("enabled_time", self._start_time + timedelta(seconds=env.now))
 95                waiting = self.define_waiting_time(trans.label)
 96                if self.see_activity:
 97                    yield env.timeout(waiting)
 99                request_resource = resource.request()
100                yield request_resource
101                single_resource = self._process._set_single_resource(resource._get_name())
102                self._buffer.set_feature("resource", single_resource)
104                resource_task_request = resource_task.request()
105                yield resource_task_request
107                ### call predictor for processing time
108                self._buffer.set_feature("wip_start", resource_trace.count)
109                self._buffer.set_feature("ro_single", self._process.get_occupations_single_role(resource._get_name()))
110                self._buffer.set_feature("ro_total", self._process.get_occupations_all_role())
111                self._buffer.set_feature("wip_activity", resource_task.count)
113                stop = resource.to_time_schedule(self._start_time + timedelta(seconds=env.now))
114                yield env.timeout(stop)
115                self._buffer.set_feature("start_time", self._start_time + timedelta(seconds=env.now))
116                duration = self.define_processing_time(trans.label)
118                yield env.timeout(duration)
120                self._buffer.set_feature("wip_end", resource_trace.count)
121                self._buffer.set_feature("end_time", self._start_time + timedelta(seconds=env.now))
122                self._buffer.print_values()
123                self._prefix.add_activity(trans.label)
124                resource.release(request_resource)
125                self._process._release_single_resource(resource._get_name(), single_resource)
126                resource_task.release(resource_task_request)
128            self._update_marking(trans)
129            trans = self.next_transition(env) if self._am else None
131        if self._type == 'parallel':
132            self._parallel_object._set_last_events(self._am)
133        if self._type == 'sequential':
134            resource_trace.release(resource_trace_request)
136    def _get_resource_role(self, activity):
137        elements = self._params.ROLE_ACTIVITY[activity.label]
138        resource_object = []
139        for e in elements:
140            resource_object.append(self._process._get_resource(e))
141        return resource_object
143    def _update_marking(self, trans):
144        self._am = semantics.execute(trans, self._net, self._am)
146    def _delete_tokens(self, name):
147        to_delete = []
148        for p in self._am:
149            if p.name != name:
150                to_delete.append(p)
151        return to_delete
153    def _check_probability(self, prob):
154        """Check if the sum of probabilities is 1
155        """
156        if sum(prob) != 1:
157            print('WARNING: The sum of the probabilities associated with the paths is not 1, to run the simulation we define equal probability')
158            return False
159        else:
160            return True
162    def _check_type_paths(self, prob):
163        if type(prob[0]) is str:
164            if sum([x == prob[0] for x in prob]) != len(prob):
165                raise ValueError('ERROR: Not all path are defined as same type ', prob)
166        elif type(prob[0]) is float:
167            if sum([isinstance(x, float) for x in prob]) != len(prob):
168                raise ValueError('ERROR: Not all path are defined as same type (float number) ', prob)
169        else:
170            raise ValueError("ERROR: Invalid input, specify the probability as AUTO, float number or CUSTOM ", prob)
172    def _retrieve_check_paths(self, all_enabled_trans):
173        prob = []
174        for trans in all_enabled_trans:
175            try:
176                if trans.label:
177                    prob.append(self._params.PROBABILITY[trans.label])
178                else:
179                    prob.append(self._params.PROBABILITY[trans.name])
180            except:
181                print('ERROR: Not all path probabilities are defined. Define all paths: ', all_enabled_trans)
183        return prob
185    def define_xor_next_activity(self, all_enabled_trans):
186        """ Three different methods to decide which path following from XOR gateway:
187        * Random choice: each path has equal probability to be chosen (AUTO)
188        ```json
189        "probability": {
190            "A_ACCEPTED": "AUTO",
191            "skip_2": "AUTO",
192            "A_FINALIZED": "AUTO",
193        }
194        ```
195        * Defined probability: in the file json it is possible to define for each path a specific probability (PROBABILITY as value)
196        ```json
197        "probability": {
198            "A_PREACCEPTED": 0.20,
199            "skip_1": 0.80
200        }
201        ```
202        * Custom method: it is possible to define a dedicate method that given the possible paths it returns the one to
203        follow, using whatever techniques the user prefers. (CUSTOM)
204        ```json
205        "probability": {
206            "A_CANCELLED": "CUSTOM",
207            "A_DECLINED": "CUSTOM",
208            "tauSplit_5": "CUSTOM"
209        }
210        ```
211        """
212        prob = ['AUTO'] if not self._params.PROBABILITY else self._retrieve_check_paths(all_enabled_trans)
213        self._check_type_paths(prob)
214        if prob[0] == 'AUTO':
215                next = random.choices(list(range(0, len(all_enabled_trans), 1)))[0]
216        elif prob[0] == 'CUSTOM':
217            next = self.call_custom_xor_function(all_enabled_trans)
218        elif type(prob[0] == float()):
219            if not self._check_probability(prob):
220                value = [*range(0, len(prob), 1)]
221                next = int(random.choices(value, prob)[0])
222            else:
223                next = random.choices(list(range(0, len(all_enabled_trans), 1)))[0]
225        return all_enabled_trans[next]
227    def define_processing_time(self, activity):
228        """ Three different methods are available to define the processing time for each activity:
229            * Distribution function: specify in the json file the distribution with the right parameters for each
230            activity, see the [numpy_distribution](https://numpy.org/doc/stable/reference/random/generator.html) distribution, (DISTRIBUTION).
231            **Be careful**: A negative value generated by the distribution is not valid for the simulator.
232            ```json
233             "processing_time": {
234                 "A_FINALIZED": { "name": "uniform", "parameters": { "low": 3600, "high": 7200}},
235             }
236            ```
237            * Custom method: it is possible to define a dedicated method that, given the activity and its
238            characteristics, returns the duration of processing time required. (CUSTOM)
239            ```json
240            "processing_time": {
241                 "A_FINALIZED":  { "name": "custom"}
242            }
243            ```
244            * Mixed: It is possible to define a distribution function for some activities and a dedicated method for the others.
245            ```json
246            "processing_time": {
247                 "A_FINALIZED":  { "name": "custom"},
248                 "A_REGISTERED":  { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
249            }
250            ```
251        """
252        try:
253            if self._params.PROCESSING_TIME[activity]["name"] == 'custom':
254                duration = self.call_custom_processing_time()
255            else:
256                distribution = self._params.PROCESSING_TIME[activity]['name']
257                parameters = self._params.PROCESSING_TIME[activity]['parameters']
258                duration = getattr(np.random, distribution)(**parameters, size=1)[0]
259                if duration < 0:
260                    print("WARNING: Negative processing time",  duration)
261                    duration = 0
262        except:
263            raise ValueError("ERROR: The processing time of", activity, "is not defined in json file")
264        return duration
266    def define_waiting_time(self, next_act):
267        """ Three different methods are available to define the waiting time before each activity:
268            * Distribution function: specify in the json file the distribution with the right parameters for each
269            activity, see the [numpy_distribution](https://numpy.org/doc/stable/reference/random/generator.html) distribution, (DISTRIBUTION).
270            **Be careful**: A negative value generated by the distribution is not valid for the simulator.
271            ```json
272             "waiting_time": {
273                 "A_PARTLYSUBMITTED":  { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
274             }
275            ```
276            * Custom method: it is possible to define a dedicated method that, given the next activity with its
277            features, returns the duration of waiting time. (CUSTOM)
278            ```json
279            "waiting_time": {
280                 "A_PARTLYSUBMITTED": { "name": "custom"}
281            }
282            ```
283            * Mixed: As the processing time, it is possible to define a mix of methods for each activity.
284            ```json
285            "waiting_time": {
286                 "A_PARTLYSUBMITTED":  { "name": "custom"},
287                 "A_APPROVED":  { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
288            }
289            ```
290        """
291        try:
292            if self._params.WAITING_TIME[next_act]["name"] == 'custom':
293                duration = self.call_custom_waiting_time()
294            else:
295                distribution = self._params.WAITING_TIME[next_act]['name']
296                parameters = self._params.WAITING_TIME[next_act]['parameters']
297                duration = getattr(np.random, distribution)(**parameters, size=1)[0]
298                if duration < 0:
299                    print("WARNING: Negative waiting time",  duration)
300                    duration = 0
301        except:
302            duration = 0
304        return duration
306    def call_custom_processing_time(self):
307        """
308        Call to the custom functions in the file *custom_function.py*.
309        """
310        return custom.custom_processing_time(self._buffer)
312    def call_custom_waiting_time(self):
313        """
314            Call to the custom functions in the file *custom_function.py*.
315        """
316        return custom.custom_waiting_time(self._buffer)
318    def call_custom_xor_function(self, all_enabled_trans):
319        """
320            Call to the custom functions in the file *custom_function.py*.
321        """
322        return custom.custom_decision_mining(self._buffer)
324    def next_transition(self, env):
325        """
326        Method to define the next activity in the petrinet.
327        """
328        all_enabled_trans = semantics.enabled_transitions(self._net, self._am)
329        all_enabled_trans = list(all_enabled_trans)
330        all_enabled_trans.sort(key=lambda x: x.name)
331        if len(all_enabled_trans) == 0:
332            return None
333        elif len(all_enabled_trans) == 1:
334            return all_enabled_trans[0]
335        else:
336            if len(self._am) == 1:
337                return self.define_xor_next_activity(all_enabled_trans)
338            else:
339                events = []
340                for token in self._am:
341                    name = token.name
342                    new_am = copy.copy(self._am)
343                    tokens_to_delete = self._delete_tokens(name)
344                    for p in tokens_to_delete:
345                        del new_am[p]
346                    path = env.process(Token(self._id, self._net, new_am, self._params, self._process, self._prefix, "parallel", self._writer, self._parallel_object, self._buffer._get_dictionary()).simulation(env))
347                    events.append(path)
348                return events
Token( id: int, net: pm4py.objects.petri_net.obj.PetriNet, am: pm4py.objects.petri_net.obj.Marking, params: parameters.Parameters, process: process.SimulationProcess, prefix: utility.Prefix, type: str, writer: <built-in function writer>, parallel_object: utility.ParallelObject, time: datetime.datetime, values=None)
20    def __init__(self, id: int, net: pm4py.objects.petri_net.obj.PetriNet, am: pm4py.objects.petri_net.obj.Marking, params: Parameters, process: SimulationProcess, prefix: Prefix, type: str, writer: csv.writer, parallel_object: ParallelObject, time: datetime, values=None):
21        self._id = id
22        self._process = process
23        self._start_time = params.START_SIMULATION
24        self._params = params
25        self._net = net
26        self._am = am
27        self._prefix = prefix
28        self._type = type
29        if type == 'sequential':
30            self.see_activity = False
31        else:
32            self.see_activity = True
33        self._writer = writer
34        self._parallel_object = parallel_object
35        self._buffer = Buffer(writer, values)
36        self._buffer.set_feature("attribute_case", custom.case_function_attribute(self._id, time))
def simulation(self, env: simpy.core.Environment):
 46    def simulation(self, env: simpy.Environment):
 47        """
 48            The main function to handle the simulation of a single trace
 49        """
 50        trans = self.next_transition(env)
 51        ### register trace in process ###
 52        request_resource = None
 53        resource_trace = self._process._get_resource_trace()
 54        resource_trace_request = resource_trace.request() if self._type == 'sequential' else None
 56        while trans is not None:
 57            if not self.see_activity and self._type == 'sequential':
 58                yield resource_trace_request
 59            if type(trans) == list:
 60                yield AllOf(env, trans)
 61                am_after = self._parallel_object._get_last_events()
 62                for d in self._delete_places(self._am):
 63                    del self._am[d]
 64                for t in am_after:
 65                    self._am[t] = 1
 66                trans = self.next_transition(env)
 68            if trans and trans.label:
 69                self._buffer.reset()
 70                self._buffer.set_feature("id_case", self._id)
 71                self._buffer.set_feature("activity", trans.label)
 72                self._buffer.set_feature("prefix", self._prefix.get_prefix(self._start_time + timedelta(seconds=env.now)))
 73                self._buffer.set_feature("attribute_event", custom.event_function_attribute(self._id, self._start_time + timedelta(seconds=env.now)))
 75                ### call predictor for waiting time
 76                if trans.label in self._params.ROLE_ACTIVITY:
 77                    resource = self._process._get_resource(self._params.ROLE_ACTIVITY[trans.label])
 78                else:
 79                    raise ValueError('Not resource/role defined for this activity', trans.label)
 81                #self._buffer.set_feature("wip_wait", 0 if type != 'sequential' else resource_trace.count-1)
 82                self._buffer.set_feature("wip_wait", resource_trace.count)
 83                self._buffer.set_feature("ro_single", self._process.get_occupations_single_role(resource._get_name()))
 84                self._buffer.set_feature("ro_total", self._process.get_occupations_all_role())
 85                self._buffer.set_feature("role", resource._get_name())
 87                ### register event in process ###
 88                resource_task = self._process._get_resource_event(trans.label)
 89                self._buffer.set_feature("wip_activity", resource_task.count)
 91                queue = 0 if len(resource._queue) == 0 else len(resource._queue[-1])
 92                self._buffer.set_feature("queue", queue)
 93                self._buffer.set_feature("enabled_time", self._start_time + timedelta(seconds=env.now))
 95                waiting = self.define_waiting_time(trans.label)
 96                if self.see_activity:
 97                    yield env.timeout(waiting)
 99                request_resource = resource.request()
100                yield request_resource
101                single_resource = self._process._set_single_resource(resource._get_name())
102                self._buffer.set_feature("resource", single_resource)
104                resource_task_request = resource_task.request()
105                yield resource_task_request
107                ### call predictor for processing time
108                self._buffer.set_feature("wip_start", resource_trace.count)
109                self._buffer.set_feature("ro_single", self._process.get_occupations_single_role(resource._get_name()))
110                self._buffer.set_feature("ro_total", self._process.get_occupations_all_role())
111                self._buffer.set_feature("wip_activity", resource_task.count)
113                stop = resource.to_time_schedule(self._start_time + timedelta(seconds=env.now))
114                yield env.timeout(stop)
115                self._buffer.set_feature("start_time", self._start_time + timedelta(seconds=env.now))
116                duration = self.define_processing_time(trans.label)
118                yield env.timeout(duration)
120                self._buffer.set_feature("wip_end", resource_trace.count)
121                self._buffer.set_feature("end_time", self._start_time + timedelta(seconds=env.now))
122                self._buffer.print_values()
123                self._prefix.add_activity(trans.label)
124                resource.release(request_resource)
125                self._process._release_single_resource(resource._get_name(), single_resource)
126                resource_task.release(resource_task_request)
128            self._update_marking(trans)
129            trans = self.next_transition(env) if self._am else None
131        if self._type == 'parallel':
132            self._parallel_object._set_last_events(self._am)
133        if self._type == 'sequential':
134            resource_trace.release(resource_trace_request)

The main function to handle the simulation of a single trace

def define_xor_next_activity(self, all_enabled_trans):
185    def define_xor_next_activity(self, all_enabled_trans):
186        """ Three different methods to decide which path following from XOR gateway:
187        * Random choice: each path has equal probability to be chosen (AUTO)
188        ```json
189        "probability": {
190            "A_ACCEPTED": "AUTO",
191            "skip_2": "AUTO",
192            "A_FINALIZED": "AUTO",
193        }
194        ```
195        * Defined probability: in the file json it is possible to define for each path a specific probability (PROBABILITY as value)
196        ```json
197        "probability": {
198            "A_PREACCEPTED": 0.20,
199            "skip_1": 0.80
200        }
201        ```
202        * Custom method: it is possible to define a dedicate method that given the possible paths it returns the one to
203        follow, using whatever techniques the user prefers. (CUSTOM)
204        ```json
205        "probability": {
206            "A_CANCELLED": "CUSTOM",
207            "A_DECLINED": "CUSTOM",
208            "tauSplit_5": "CUSTOM"
209        }
210        ```
211        """
212        prob = ['AUTO'] if not self._params.PROBABILITY else self._retrieve_check_paths(all_enabled_trans)
213        self._check_type_paths(prob)
214        if prob[0] == 'AUTO':
215                next = random.choices(list(range(0, len(all_enabled_trans), 1)))[0]
216        elif prob[0] == 'CUSTOM':
217            next = self.call_custom_xor_function(all_enabled_trans)
218        elif type(prob[0] == float()):
219            if not self._check_probability(prob):
220                value = [*range(0, len(prob), 1)]
221                next = int(random.choices(value, prob)[0])
222            else:
223                next = random.choices(list(range(0, len(all_enabled_trans), 1)))[0]
225        return all_enabled_trans[next]

Three different methods to decide which path following from XOR gateway:

  • Random choice: each path has equal probability to be chosen (AUTO)
"probability": {
    "skip_2": "AUTO",
  • Defined probability: in the file json it is possible to define for each path a specific probability (PROBABILITY as value)
"probability": {
    "A_PREACCEPTED": 0.20,
    "skip_1": 0.80
  • Custom method: it is possible to define a dedicate method that given the possible paths it returns the one to follow, using whatever techniques the user prefers. (CUSTOM)
"probability": {
    "tauSplit_5": "CUSTOM"
def define_processing_time(self, activity):
227    def define_processing_time(self, activity):
228        """ Three different methods are available to define the processing time for each activity:
229            * Distribution function: specify in the json file the distribution with the right parameters for each
230            activity, see the [numpy_distribution](https://numpy.org/doc/stable/reference/random/generator.html) distribution, (DISTRIBUTION).
231            **Be careful**: A negative value generated by the distribution is not valid for the simulator.
232            ```json
233             "processing_time": {
234                 "A_FINALIZED": { "name": "uniform", "parameters": { "low": 3600, "high": 7200}},
235             }
236            ```
237            * Custom method: it is possible to define a dedicated method that, given the activity and its
238            characteristics, returns the duration of processing time required. (CUSTOM)
239            ```json
240            "processing_time": {
241                 "A_FINALIZED":  { "name": "custom"}
242            }
243            ```
244            * Mixed: It is possible to define a distribution function for some activities and a dedicated method for the others.
245            ```json
246            "processing_time": {
247                 "A_FINALIZED":  { "name": "custom"},
248                 "A_REGISTERED":  { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
249            }
250            ```
251        """
252        try:
253            if self._params.PROCESSING_TIME[activity]["name"] == 'custom':
254                duration = self.call_custom_processing_time()
255            else:
256                distribution = self._params.PROCESSING_TIME[activity]['name']
257                parameters = self._params.PROCESSING_TIME[activity]['parameters']
258                duration = getattr(np.random, distribution)(**parameters, size=1)[0]
259                if duration < 0:
260                    print("WARNING: Negative processing time",  duration)
261                    duration = 0
262        except:
263            raise ValueError("ERROR: The processing time of", activity, "is not defined in json file")
264        return duration

Three different methods are available to define the processing time for each activity:

  • Distribution function: specify in the json file the distribution with the right parameters for each activity, see the numpy_distribution distribution, (DISTRIBUTION). Be careful: A negative value generated by the distribution is not valid for the simulator.
 "processing_time": {
     "A_FINALIZED": { "name": "uniform", "parameters": { "low": 3600, "high": 7200}},
  • Custom method: it is possible to define a dedicated method that, given the activity and its characteristics, returns the duration of processing time required. (CUSTOM)
"processing_time": {
     "A_FINALIZED":  { "name": "custom"}
  • Mixed: It is possible to define a distribution function for some activities and a dedicated method for the others.
"processing_time": {
     "A_FINALIZED":  { "name": "custom"},
     "A_REGISTERED":  { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
def define_waiting_time(self, next_act):
266    def define_waiting_time(self, next_act):
267        """ Three different methods are available to define the waiting time before each activity:
268            * Distribution function: specify in the json file the distribution with the right parameters for each
269            activity, see the [numpy_distribution](https://numpy.org/doc/stable/reference/random/generator.html) distribution, (DISTRIBUTION).
270            **Be careful**: A negative value generated by the distribution is not valid for the simulator.
271            ```json
272             "waiting_time": {
273                 "A_PARTLYSUBMITTED":  { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
274             }
275            ```
276            * Custom method: it is possible to define a dedicated method that, given the next activity with its
277            features, returns the duration of waiting time. (CUSTOM)
278            ```json
279            "waiting_time": {
280                 "A_PARTLYSUBMITTED": { "name": "custom"}
281            }
282            ```
283            * Mixed: As the processing time, it is possible to define a mix of methods for each activity.
284            ```json
285            "waiting_time": {
286                 "A_PARTLYSUBMITTED":  { "name": "custom"},
287                 "A_APPROVED":  { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
288            }
289            ```
290        """
291        try:
292            if self._params.WAITING_TIME[next_act]["name"] == 'custom':
293                duration = self.call_custom_waiting_time()
294            else:
295                distribution = self._params.WAITING_TIME[next_act]['name']
296                parameters = self._params.WAITING_TIME[next_act]['parameters']
297                duration = getattr(np.random, distribution)(**parameters, size=1)[0]
298                if duration < 0:
299                    print("WARNING: Negative waiting time",  duration)
300                    duration = 0
301        except:
302            duration = 0
304        return duration

Three different methods are available to define the waiting time before each activity:

  • Distribution function: specify in the json file the distribution with the right parameters for each activity, see the numpy_distribution distribution, (DISTRIBUTION). Be careful: A negative value generated by the distribution is not valid for the simulator.
 "waiting_time": {
     "A_PARTLYSUBMITTED":  { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
  • Custom method: it is possible to define a dedicated method that, given the next activity with its features, returns the duration of waiting time. (CUSTOM)
"waiting_time": {
     "A_PARTLYSUBMITTED": { "name": "custom"}
  • Mixed: As the processing time, it is possible to define a mix of methods for each activity.
"waiting_time": {
     "A_PARTLYSUBMITTED":  { "name": "custom"},
     "A_APPROVED":  { "name": "uniform", "parameters": { "low": 3600, "high": 7200}}
def call_custom_processing_time(self):
306    def call_custom_processing_time(self):
307        """
308        Call to the custom functions in the file *custom_function.py*.
309        """
310        return custom.custom_processing_time(self._buffer)

Call to the custom functions in the file custom_function.py.

def call_custom_waiting_time(self):
312    def call_custom_waiting_time(self):
313        """
314            Call to the custom functions in the file *custom_function.py*.
315        """
316        return custom.custom_waiting_time(self._buffer)

Call to the custom functions in the file custom_function.py.

def call_custom_xor_function(self, all_enabled_trans):
318    def call_custom_xor_function(self, all_enabled_trans):
319        """
320            Call to the custom functions in the file *custom_function.py*.
321        """
322        return custom.custom_decision_mining(self._buffer)

Call to the custom functions in the file custom_function.py.

def next_transition(self, env):
324    def next_transition(self, env):
325        """
326        Method to define the next activity in the petrinet.
327        """
328        all_enabled_trans = semantics.enabled_transitions(self._net, self._am)
329        all_enabled_trans = list(all_enabled_trans)
330        all_enabled_trans.sort(key=lambda x: x.name)
331        if len(all_enabled_trans) == 0:
332            return None
333        elif len(all_enabled_trans) == 1:
334            return all_enabled_trans[0]
335        else:
336            if len(self._am) == 1:
337                return self.define_xor_next_activity(all_enabled_trans)
338            else:
339                events = []
340                for token in self._am:
341                    name = token.name
342                    new_am = copy.copy(self._am)
343                    tokens_to_delete = self._delete_tokens(name)
344                    for p in tokens_to_delete:
345                        del new_am[p]
346                    path = env.process(Token(self._id, self._net, new_am, self._params, self._process, self._prefix, "parallel", self._writer, self._parallel_object, self._buffer._get_dictionary()).simulation(env))
347                    events.append(path)
348                return events

Method to define the next activity in the petrinet.