Event Simulation#
- exception seqlogic.CancelledError#
Task has been cancelled.
- exception seqlogic.FinishError#
Force the simulation to stop.
- exception seqlogic.InvalidStateError#
Task has an invalid state.
- class seqlogic.Region(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)#
- class seqlogic.Singular(value)#
Model state organized as a single unit.
- class seqlogic.Aggregate(value)#
Model state organized as multiple units.
- class seqlogic.AggrValue(fget, fset)#
Wrap Aggregate value getter/setter.
- class seqlogic.TaskState(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)#
Task State.
Transitions:
               +--------------------------+
               |                          |
               v                          |
    CREATED -> PENDING -> RUNNING -> WAIT_*
                                  -> CANCELLED
                                  -> EXCEPTED
                                  -> RETURNED
- CREATED = 1#
- WAIT_FIFO = 2#
- WAIT_STATE = 3#
- PENDING = 4#
- RUNNING = 5#
- RETURNED = 6#
- EXCEPTED = 7#
- CANCELLED = 8#
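The transition diagram can also be written as a lookup table. The sketch below is a stand-in built with the standard `enum` module, not seqlogic's own code: it reuses the documented member names and values, while `ALLOWED` and `can_transition` are hypothetical helpers. It assumes that `WAIT_*` covers `WAIT_FIFO` and `WAIT_STATE`, that a waiting task goes back to `PENDING`, and that the three final states have no outgoing transitions.

```python
from enum import Enum


class TaskState(Enum):
    """Stand-in for seqlogic.TaskState, using the documented member values."""

    CREATED = 1
    WAIT_FIFO = 2
    WAIT_STATE = 3
    PENDING = 4
    RUNNING = 5
    RETURNED = 6
    EXCEPTED = 7
    CANCELLED = 8


S = TaskState

# Assumed reading of the diagram: WAIT_* loops back to PENDING, and
# RUNNING may end in any of the three final states.
ALLOWED = {
    S.CREATED: {S.PENDING},
    S.PENDING: {S.RUNNING},
    S.RUNNING: {S.WAIT_FIFO, S.WAIT_STATE, S.CANCELLED, S.EXCEPTED, S.RETURNED},
    S.WAIT_FIFO: {S.PENDING},
    S.WAIT_STATE: {S.PENDING},
    S.RETURNED: set(),
    S.EXCEPTED: set(),
    S.CANCELLED: set(),
}


def can_transition(src: TaskState, dst: TaskState) -> bool:
    """Return True if the table above permits moving from src to dst."""
    return dst in ALLOWED[src]
```

A table like this makes it easy to check a transition before applying it. The library may permit transitions that the diagram does not show.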
- class seqlogic.Task(coro: Coroutine, region: Region = Region.ACTIVE)#
Coroutine wrapper.
- property region#
- run(value=None)#
- done() bool#
- cancelled() bool#
- set_result(result)#
- result()#
- set_exception(exc)#
- exception()#
- get_coro() Coroutine#
- cancel(msg: str | None = None)#
- class seqlogic.TaskGroup#
Group of tasks.
- class seqlogic.Event#
Notify multiple tasks that some event has happened.
- async wait()#
- set()#
- clear()#
- is_set() bool#
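The method names `wait`, `set`, `clear`, and `is_set` match those of `asyncio.Event` in the Python standard library. Assuming `seqlogic.Event` follows the same semantics (this page does not confirm it), the pattern can be sketched with `asyncio`. The sketch does not use seqlogic, and the `waiter` and `main` coroutines are hypothetical.

```python
import asyncio


async def waiter(name: str, event: asyncio.Event, log: list) -> None:
    await event.wait()          # suspend until the event is set
    log.append(name)


async def main() -> list:
    event = asyncio.Event()
    log: list = []
    tasks = [asyncio.create_task(waiter(n, event, log)) for n in ("a", "b")]
    await asyncio.sleep(0)      # let both waiters reach event.wait()
    assert not event.is_set()
    event.set()                 # wake every task waiting on the event
    await asyncio.gather(*tasks)
    event.clear()               # reset so later wait() calls block again
    assert not event.is_set()
    return log


log = asyncio.run(main())
```

Both waiters resume after the single `set()` call. This one-to-many notification is what distinguishes an event from a semaphore.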
- class seqlogic.Semaphore(value: int = 1)#
Semaphore to synchronize tasks.
- async acquire()#
- try_acquire() bool#
- release()#
- locked() bool#
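The listing gives no detail on how `try_acquire` and `locked` behave. A minimal sketch of a counting semaphore with the same method names might look like the following. `ToySemaphore` is a hypothetical class, not seqlogic's implementation, and it leaves out the suspending `acquire()` because that depends on the event loop.

```python
class ToySemaphore:
    """Hypothetical counting semaphore; not seqlogic's implementation."""

    def __init__(self, value: int = 1) -> None:
        if value < 0:
            raise ValueError("initial value must be >= 0")
        self._value = value

    def try_acquire(self) -> bool:
        """Take a permit if one is free; never suspend the caller."""
        if self._value > 0:
            self._value -= 1
            return True
        return False

    def release(self) -> None:
        """Return a permit."""
        self._value += 1

    def locked(self) -> bool:
        """Return True when no permit is available."""
        return self._value == 0


sem = ToySemaphore(2)
results = [sem.try_acquire() for _ in range(3)]  # the third attempt finds no permit
```

The non-suspending form suits code that should take another path when the resource is busy instead of waiting for it.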
- class seqlogic.Lock#
Bases: BoundedSemaphore
Mutex lock to synchronize tasks.
- class seqlogic.EventLoop#
Simulation event loop.
- clear()#
Clear all task collections.
- restart()#
Restart current simulation.
- time() int#
- run(ticks: int | None = None, until: int | None = None)#
Run the simulation.
Runs until either (1) the run limit is reached, or (2) no tasks are left in the queue.
- irun(ticks: int | None = None, until: int | None = None) Generator[int, None, None]#
Iterate the simulation.
Iterates until either (1) the run limit is reached, or (2) no tasks are left in the queue.
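The two stopping conditions can be illustrated with a toy discrete-event loop built on `heapq`. This is a sketch under stated assumptions, not seqlogic's `EventLoop`: `ToyLoop`, `add`, and `ticker` are hypothetical, tasks are plain generators that yield a positive delay in ticks, and `until` is treated as an exclusive absolute time, which this page does not specify.

```python
import heapq
from typing import Generator, Iterator, Optional


class ToyLoop:
    """Hypothetical discrete-event loop; not seqlogic.EventLoop itself."""

    def __init__(self) -> None:
        self._time = 0
        self._seq = 0       # tie-breaker so the heap never compares generators
        self._queue: list = []

    def time(self) -> int:
        return self._time

    def add(self, task: Generator[int, None, None]) -> None:
        """Schedule a task to start at the current time."""
        self._push(self._time, task)

    def _push(self, when: int, task: Generator[int, None, None]) -> None:
        heapq.heappush(self._queue, (when, self._seq, task))
        self._seq += 1

    def irun(self, until: Optional[int] = None) -> Iterator[int]:
        """Yield each time step at which at least one task was resumed."""
        while self._queue:                      # stop 2: no tasks left
            when = self._queue[0][0]
            if until is not None and when >= until:
                return                          # stop 1: run limit reached
            self._time = when
            # Resume every task scheduled for this time step.
            while self._queue and self._queue[0][0] == when:
                _, _, task = heapq.heappop(self._queue)
                try:
                    delay = next(task)          # task yields a delay in ticks
                except StopIteration:
                    continue                    # task finished; drop it
                self._push(when + delay, task)
            yield when

    def run(self, until: Optional[int] = None) -> None:
        for _ in self.irun(until):
            pass


def ticker(loop: ToyLoop, log: list, name: str, period: int, count: int):
    """Hypothetical task: record the current time, then sleep for `period`."""
    for _ in range(count):
        log.append((loop.time(), name))
        yield period


loop = ToyLoop()
log: list = []
loop.add(ticker(loop, log, "fast", 2, 3))
loop.add(ticker(loop, log, "slow", 5, 2))
loop.run(until=6)   # stops at the limit; tasks are still queued
```

Calling `loop.run()` again without a limit drains the remaining tasks and returns once the queue is empty. Writing `run` as a loop over `irun` keeps the stopping logic in one place.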
- seqlogic.del_event_loop()#
Delete the current event loop.
- seqlogic.now() int#
- seqlogic.run(coro: Coroutine | None = None, region: Region = Region.ACTIVE, loop: EventLoop | None = None, ticks: int | None = None, until: int | None = None)#
Run a simulation.
- seqlogic.irun(coro: Coroutine | None = None, region: Region = Region.ACTIVE, loop: EventLoop | None = None, ticks: int | None = None, until: int | None = None) Generator[int, None, None]#
Iterate a simulation.
- async seqlogic.sleep(delay: int)#
Suspend the task, and wake up after a delay.
- async seqlogic.changed(*states: State) State#
Resume execution upon state change.
- async seqlogic.resume(*triggers: tuple[State, Predicate]) State#
Resume execution upon event.
- seqlogic.finish()#