Event Simulation#

exception seqlogic.CancelledError#

Task has been cancelled.

exception seqlogic.FinishError#

Force the simulation to stop.

exception seqlogic.InvalidStateError#

Task has an invalid state.

class seqlogic.Region(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)#
class seqlogic.Singular(value)#

Model state organized as a single unit.

class seqlogic.Aggregate(value)#

Model state organized as multiple units.

class seqlogic.AggrValue(fget, fset)#

Wrap Aggregate value getter/setter.

class seqlogic.TaskState(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)#

Task State.

Transitions:

           +--------------------------+
           |                          |
           v                          |
CREATED -> PENDING -> RUNNING -> WAIT_*
                              -> CANCELLED
                              -> EXCEPTED
                              -> RETURNED
CREATED = 1#
WAIT_FIFO = 2#
WAIT_STATE = 3#
PENDING = 4#
RUNNING = 5#
RETURNED = 6#
EXCEPTED = 7#
CANCELLED = 8#
class seqlogic.Task(coro: Coroutine, region: Region = Region.ACTIVE)#

Coroutine wrapper.

property region#
set_state(state: TaskState, parent=None)#
run(value=None)#
done() bool#
cancelled() bool#
set_result(result)#
result()#
set_exception(exc)#
exception()#
get_coro() Coroutine#
cancel(msg: str | None = None)#
class seqlogic.TaskGroup#

Group of tasks.

create_task(coro: Coroutine, region: Region = Region.ACTIVE) Task#
class seqlogic.Event#

Notify multiple tasks that some event has happened.

async wait()#
set()#
clear()#
is_set() bool#
class seqlogic.Semaphore(value: int = 1)#

Semaphore to synchronize tasks.

async acquire()#
try_acquire() bool#
release()#
locked() bool#
class seqlogic.BoundedSemaphore(value: int = 1)#

Bases: Semaphore

Semaphore.release()#
class seqlogic.Lock#

Bases: BoundedSemaphore

Mutex lock to synchronize tasks.

class seqlogic.EventLoop#

Simulation event loop.

clear()#

Clear all task collections.

restart()#

Restart current simulation.

time() int#
task() Task | None#
run(ticks: int | None = None, until: int | None = None)#

Run the simulation.

Until: 1. We hit the runlimit, OR 2. There are no tasks left in the queue

irun(ticks: int | None = None, until: int | None = None) Generator[int, None, None]#

Iterate the simulation.

Until: 1. We hit the runlimit, OR 2. There are no tasks left in the queue

seqlogic.get_running_loop() EventLoop#
seqlogic.get_event_loop() EventLoop | None#

Get the current event loop.

seqlogic.set_event_loop(loop: EventLoop)#

Set the current event loop.

seqlogic.new_event_loop() EventLoop#

Create and return a new event loop.

seqlogic.del_event_loop()#

Delete the current event loop.

seqlogic.now() int#
seqlogic.run(coro: Coroutine | None = None, region: Region = Region.ACTIVE, loop: EventLoop | None = None, ticks: int | None = None, until: int | None = None)#

Run a simulation.

seqlogic.irun(coro: Coroutine | None = None, region: Region = Region.ACTIVE, loop: EventLoop | None = None, ticks: int | None = None, until: int | None = None) Generator[int, None, None]#

Iterate a simulation.

async seqlogic.sleep(delay: int)#

Suspend the task, and wake up after a delay.

async seqlogic.changed(*states: State) State#

Resume execution upon state change.

async seqlogic.resume(*triggers: tuple[State, Predicate]) State#

Resume execution upon event.

async seqlogic.wait(aws, return_when='ALL_COMPLETED') tuple[set[Task], set[Task]]#
seqlogic.finish()#