inter_trigger_timer
Class to manage the arrival times of tokens in the process.
"""
Manage the arrival times of tokens in the process.
"""

import numpy as np
from datetime import datetime, timedelta
from parameters import Parameters
from process import SimulationProcess
import custom_function as custom


class InterTriggerTimer(object):
    """Compute the time until the next token arrival.

    The generator type is read from ``params.INTER_TRIGGER['type']``:

    * ``'distribution'`` -- sample from the ``numpy.random`` function named by
      ``params.INTER_TRIGGER['name']`` with the keyword arguments given in
      ``params.INTER_TRIGGER['parameters']``.
    * ``'custom'`` -- delegate to ``custom_function.custom_arrivals_time``.
    """

    def __init__(self, params: Parameters, process: SimulationProcess, start: datetime):
        """Store the process and start time and read the arrival settings.

        :param params: object exposing the ``INTER_TRIGGER`` mapping.
        :param process: object later queried for the ``'TRIGGER_TIMER'`` resource.
        :param start: datetime that ``env.now`` offsets are added to.
        """
        self._process = process
        self._start_time = start
        trigger_cfg = params.INTER_TRIGGER
        self._type = trigger_cfg['type']
        if self._type != 'distribution':
            return
        # Distribution of token arrivals, as specified in the JSON file.
        self.name_distribution = trigger_cfg['name']
        self.params = trigger_cfg['parameters']

    def get_next_arrival(self, env, case):
        """Return the next arrival value for the configured generator type.

        For ``'distribution'`` a single value is sampled; when the
        ``'TRIGGER_TIMER'`` resource reports a calendar, the result of its
        ``to_time_schedule`` call is added to the sampled value.

        :param env: object whose ``now`` attribute is added (as seconds) to the
            start time.
        :param case: forwarded unchanged to :meth:`custom_arrival`.
        :raises ValueError: if the type is neither ``'distribution'`` nor
            ``'custom'``.
        """
        if self._type == 'custom':
            return self.custom_arrival(case)
        if self._type != 'distribution':
            raise ValueError('ERROR: Invalid arrival times generator')

        trigger_resource = self._process._get_resource('TRIGGER_TIMER')
        sampler = getattr(np.random, self.name_distribution)
        delay = sampler(**self.params, size=1)[0]
        if not trigger_resource._get_calendar():
            return delay
        candidate = self._start_time + timedelta(seconds=env.now + delay)
        # presumably to_time_schedule returns the extra wait needed to land
        # inside the calendar -- TODO confirm against the resource class.
        return trigger_resource.to_time_schedule(candidate) + delay

    def custom_arrival(self, case):
        """Delegate to ``custom_arrivals_time`` in ``custom_function.py``."""
        return custom.custom_arrivals_time(case)
class
InterTriggerTimer:
class InterTriggerTimer(object):
    """Compute the time until the next token arrival.

    The generator type is read from ``params.INTER_TRIGGER['type']``:

    * ``'distribution'`` -- sample from the ``numpy.random`` function named by
      ``params.INTER_TRIGGER['name']`` with the keyword arguments given in
      ``params.INTER_TRIGGER['parameters']``.
    * ``'custom'`` -- delegate to ``custom_function.custom_arrivals_time``.
    """

    def __init__(self, params: Parameters, process: SimulationProcess, start: datetime):
        """Store the process and start time and read the arrival settings.

        :param params: object exposing the ``INTER_TRIGGER`` mapping.
        :param process: object later queried for the ``'TRIGGER_TIMER'`` resource.
        :param start: datetime that ``env.now`` offsets are added to.
        """
        self._process = process
        self._start_time = start
        trigger_cfg = params.INTER_TRIGGER
        self._type = trigger_cfg['type']
        if self._type != 'distribution':
            return
        # Distribution of token arrivals, as specified in the JSON file.
        self.name_distribution = trigger_cfg['name']
        self.params = trigger_cfg['parameters']

    def get_next_arrival(self, env, case):
        """Return the next arrival value for the configured generator type.

        For ``'distribution'`` a single value is sampled; when the
        ``'TRIGGER_TIMER'`` resource reports a calendar, the result of its
        ``to_time_schedule`` call is added to the sampled value.

        :param env: object whose ``now`` attribute is added (as seconds) to the
            start time.
        :param case: forwarded unchanged to :meth:`custom_arrival`.
        :raises ValueError: if the type is neither ``'distribution'`` nor
            ``'custom'``.
        """
        if self._type == 'custom':
            return self.custom_arrival(case)
        if self._type != 'distribution':
            raise ValueError('ERROR: Invalid arrival times generator')

        trigger_resource = self._process._get_resource('TRIGGER_TIMER')
        sampler = getattr(np.random, self.name_distribution)
        delay = sampler(**self.params, size=1)[0]
        if not trigger_resource._get_calendar():
            return delay
        candidate = self._start_time + timedelta(seconds=env.now + delay)
        # presumably to_time_schedule returns the extra wait needed to land
        # inside the calendar -- TODO confirm against the resource class.
        return trigger_resource.to_time_schedule(candidate) + delay

    def custom_arrival(self, case):
        """Delegate to ``custom_arrivals_time`` in ``custom_function.py``."""
        return custom.custom_arrivals_time(case)
InterTriggerTimer( params: parameters.Parameters, process: process.SimulationProcess, start: datetime.datetime)
def __init__(self, params: Parameters, process: SimulationProcess, start: datetime):
    """Store the process and start time and read the arrival settings.

    :param params: object exposing the ``INTER_TRIGGER`` mapping; its
        ``'type'`` entry is always read, ``'name'`` and ``'parameters'`` only
        when the type is ``'distribution'``.
    :param process: stored as ``self._process``.
    :param start: stored as ``self._start_time``.
    """
    self._process = process
    self._start_time = start
    trigger_cfg = params.INTER_TRIGGER
    self._type = trigger_cfg['type']
    if self._type != 'distribution':
        return
    # Distribution of token arrivals, as specified in the JSON file.
    self.name_distribution = trigger_cfg['name']
    self.params = trigger_cfg['parameters']
def
get_next_arrival(self, env, case):
def get_next_arrival(self, env, case):
    """Generate the next arrival and record when it is due.

    For the ``'distribution'`` type a single value is sampled from the
    ``numpy.random`` function named by ``self.name_distribution``; when the
    ``'TRIGGER_TIMER'`` resource reports a calendar, the result of its
    ``to_time_schedule`` call is added so the arrival waits for a suitable
    time. For the ``'custom'`` type the value comes from
    ``self.custom_arrival(case, previous)``.

    After a successful call ``self._previous`` holds
    ``self._start_time + timedelta(seconds=env.now + result)``.

    :param env: object whose ``now`` attribute is added (as seconds) to the
        start time.
    :param case: forwarded unchanged to ``self.custom_arrival``.
    :return: the value computed for the next arrival.
    :raises ValueError: if ``self._type`` is neither ``'distribution'`` nor
        ``'custom'``.
    """
    # Named ``next_arrival`` rather than ``next`` to avoid shadowing the builtin.
    if self._type == 'distribution':
        resource = self._process._get_resource('TRIGGER_TIMER')
        arrival = getattr(np.random, self.name_distribution)(**self.params, size=1)[0]
        if resource._get_calendar():
            stop = resource.to_time_schedule(
                self._start_time + timedelta(seconds=env.now + arrival))
            next_arrival = stop + arrival
        else:
            next_arrival = arrival
    elif self._type == 'custom':
        # ``_previous`` is only assigned at the end of this method, so on the
        # first call it may not exist yet and reading it directly would raise
        # AttributeError.
        # NOTE(review): falling back to the start time is an assumption --
        # confirm it is the right "previous arrival" for the first token.
        previous = getattr(self, '_previous', self._start_time)
        next_arrival = self.custom_arrival(case, previous)
    else:
        raise ValueError('ERROR: Invalid arrival times generator')
    self._previous = self._start_time + timedelta(seconds=env.now + next_arrival)
    return next_arrival
Generate a new arrival from the distribution and check whether the new token arrival is inside the calendar; otherwise, wait for a suitable time.