inter_trigger_timer

Class to manage the arrival times of tokens in the process.

 1"""
 2Class to manage the arrival times of tokens in the process.
 3"""
 4
 5import numpy as np
 6from datetime import datetime, timedelta
 7from parameters import Parameters
 8from process import SimulationProcess
 9import custom_function as custom
10
11
class InterTriggerTimer(object):
    """Compute the delay until the next token arrival in the process.

    The generator type is read from ``params.INTER_TRIGGER['type']``:

    * ``'distribution'`` samples the delay from the ``numpy.random``
      function named in the configuration.
    * ``'custom'`` delegates to ``custom.custom_arrivals_time``.
    """

    def __init__(self, params: Parameters, process: SimulationProcess, start: datetime):
        """Keep the simulation context and read the arrival configuration.

        Args:
            params: Object exposing the ``INTER_TRIGGER`` mapping.
            process: Process used to look up the ``'TRIGGER_TIMER'`` resource.
            start: Datetime that simulation time zero corresponds to.
        """
        self._process = process
        self._start_time = start
        trigger = params.INTER_TRIGGER
        self._type = trigger['type']
        if self._type == 'distribution':
            # Distribution of token arrivals, as specified in the JSON file.
            self.name_distribution = trigger['name']
            self.params = trigger['parameters']

    def get_next_arrival(self, env, case):
        """Return the delay before the next token arrival.

        For the ``'distribution'`` type a value is sampled; when the
        ``'TRIGGER_TIMER'`` resource has a calendar, the value returned by its
        ``to_time_schedule`` for the candidate arrival datetime is added to
        the sample.

        Args:
            env: Simulation environment; ``env.now`` is read as seconds.
            case: Forwarded to ``custom_arrival`` for the ``'custom'`` type.

        Raises:
            ValueError: If the configured type is neither ``'distribution'``
                nor ``'custom'``.
        """
        if self._type == 'custom':
            return self.custom_arrival(case)
        if self._type != 'distribution':
            raise ValueError('ERROR: Invalid arrival times generator')

        resource = self._process._get_resource('TRIGGER_TIMER')
        sampler = getattr(np.random, self.name_distribution)
        arrival = sampler(**self.params, size=1)[0]
        if not resource._get_calendar():
            return arrival
        # Absolute datetime at which the sampled arrival would occur.
        candidate = self._start_time + timedelta(seconds=env.now + arrival)
        return resource.to_time_schedule(candidate) + arrival

    def custom_arrival(self, case):
        """Return the arrival computed by ``custom_function.py`` for *case*."""
        return custom.custom_arrivals_time(case)
class InterTriggerTimer:
class InterTriggerTimer(object):
    """Compute the delay until the next token arrival in the process.

    The generator type is read from ``params.INTER_TRIGGER['type']``:

    * ``'distribution'`` samples the delay from the ``numpy.random``
      function named in the configuration.
    * ``'custom'`` delegates to ``custom.custom_arrivals_time``.
    """

    def __init__(self, params: Parameters, process: SimulationProcess, start: datetime):
        """Keep the simulation context and read the arrival configuration.

        Args:
            params: Object exposing the ``INTER_TRIGGER`` mapping.
            process: Process used to look up the ``'TRIGGER_TIMER'`` resource.
            start: Datetime that simulation time zero corresponds to.
        """
        self._process = process
        self._start_time = start
        trigger = params.INTER_TRIGGER
        self._type = trigger['type']
        if self._type == 'distribution':
            # Distribution of token arrivals, as specified in the JSON file.
            self.name_distribution = trigger['name']
            self.params = trigger['parameters']

    def get_next_arrival(self, env, case):
        """Return the delay before the next token arrival.

        For the ``'distribution'`` type a value is sampled; when the
        ``'TRIGGER_TIMER'`` resource has a calendar, the value returned by its
        ``to_time_schedule`` for the candidate arrival datetime is added to
        the sample.

        Args:
            env: Simulation environment; ``env.now`` is read as seconds.
            case: Forwarded to ``custom_arrival`` for the ``'custom'`` type.

        Raises:
            ValueError: If the configured type is neither ``'distribution'``
                nor ``'custom'``.
        """
        if self._type == 'custom':
            return self.custom_arrival(case)
        if self._type != 'distribution':
            raise ValueError('ERROR: Invalid arrival times generator')

        resource = self._process._get_resource('TRIGGER_TIMER')
        sampler = getattr(np.random, self.name_distribution)
        arrival = sampler(**self.params, size=1)[0]
        if not resource._get_calendar():
            return arrival
        # Absolute datetime at which the sampled arrival would occur.
        candidate = self._start_time + timedelta(seconds=env.now + arrival)
        return resource.to_time_schedule(candidate) + arrival

    def custom_arrival(self, case):
        """Return the arrival computed by ``custom_function.py`` for *case*."""
        return custom.custom_arrivals_time(case)
InterTriggerTimer( params: parameters.Parameters, process: process.SimulationProcess, start: datetime.datetime)
15    def __init__(self, params: Parameters, process: SimulationProcess, start: datetime):
16        self._process = process
17        self._start_time = start
18        self._type = params.INTER_TRIGGER['type']
19        if self._type == 'distribution':
20            """Define the distribution of token arrivals from specified in the file json"""
21            self.name_distribution = params.INTER_TRIGGER['name']
22            self.params = params.INTER_TRIGGER['parameters']
def get_next_arrival(self, env, case):
25    def get_next_arrival(self, env, case):
26        """Generate a new arrival from the distribution and check if the new token arrival is inside calendar,
27        otherwise wait for a suitable time."""
28        next = 0
29        if self._type == 'distribution':
30            resource = self._process._get_resource('TRIGGER_TIMER')
31            arrival = getattr(np.random, self.name_distribution)(**self.params, size=1)[0]
32            if resource._get_calendar():
33                stop = resource.to_time_schedule(self._start_time + timedelta(seconds=env.now + arrival))
34                next = stop + arrival
35            else:
36                next = arrival
37        elif self._type == 'custom':
38            next = self.custom_arrival(case, self._previous)
39        else:
40            raise ValueError('ERROR: Invalid arrival times generator')
41        self._previous = self._start_time + timedelta(seconds=env.now + next)
42        return next

Generate a new arrival from the distribution and check whether the new token arrival falls inside the calendar; otherwise, wait for a suitable time.

def custom_arrival(self, case):
40    def custom_arrival(self, case):
41        """
42        Call to the custom functions in the file custom_function.py.
43        """
44        return custom.custom_arrivals_time(case)

Call to the custom functions in the file custom_function.py.