Source code for pyunity.events

## Copyright (c) 2020-2022 The PyUnity Team
## This file is licensed under the MIT License.
## See https://docs.pyunity.x10.bz/en/latest/license.html

__all__ = ["Event", "EventLoopManager", "EventLoop", "WaitForSeconds", "WaitForEventLoop",
           "WaitForUpdate", "WaitForFixedUpdate", "WaitForRender", "StartCoroutine"]

from . import Logger, config
from .errors import PyUnityException
from .core import Component, GameObject
from .values import SavableStruct, StructEntry, Clock
import functools
import threading
import asyncio
import inspect
# import signal
import time

[docs]@SavableStruct( component=StructEntry(Component, required=True), name=StructEntry(str, required=True), args=StructEntry(tuple, required=True)) class Event: def __init__(self, func, args=(), kwargs={}): if not hasattr(func, "__self__"): raise PyUnityException( "Cannot create event from callback that is not attached to a Component") if not isinstance(func.__self__, Component): raise PyUnityException( "Cannot create event from callback that is not attached to a Component") if not isinstance(func.__self__.gameObject, GameObject): raise PyUnityException( "Provided callback component does not belong to a GameObject") functools.update_wrapper(self, func) self.component = func.__self__ self.name = func.__name__ self.func = func self.args = args self.kwargs = kwargs self.isAsync = inspect.iscoroutinefunction(func)
[docs] def trigger(self): return self.func(*self.args, **self.kwargs)
[docs] def callSoon(self): if self.isAsync: loop = asyncio.get_running_loop() loop.create_task(self.trigger()) else: if EventLoopManager.current is None: raise PyUnityException("No EventLoopManager running") EventLoopManager.current.pending.append(self)
def _fromDict(self, factory, attrs, instanceCheck=None): def wrapper(component, method, args=(), kwargs={}): func = getattr(component, method) return factory(func, args, kwargs) return SavableStruct.fromDict(self, wrapper, attrs, instanceCheck)
[docs]def wrap(func): @functools.wraps(func) def inner(loop): return func() return inner
[docs]class EventLoopManager: current = None exceptions = [] exceptionLock = threading.RLock() waitingLock = threading.RLock() def __init__(self): self.threads = [] self.loops = [] self.separateLoops = [] self.waiting = {} self.pending = [] self.updates = [] self.mainLoop = None self.mainWaitFor = None self.running = False
[docs] def schedule(self, *funcs, main=False, ups=None, waitFor=None): functions = list(funcs) for i in range(len(functions)): sig = inspect.signature(functions[i]) if "loop" not in sig.parameters: functions[i] = wrap(functions[i]) if main: self.updates.extend(functions) self.mainWaitFor = waitFor else: if ups is None: raise PyUnityException("ups argument is required if main is False") self.waiting[waitFor] = [] loop = EventLoop() self.loops.append(loop) def inner(): clock = Clock() clock.Start(ups) while self.running: with EventLoopManager.waitingLock: for waiter in self.waiting[waitFor]: waiter.loop.call_soon_threadsafe(waiter.event.set) for func in functions: try: func(loop) except Exception as e: with EventLoopManager.exceptionLock: EventLoopManager.exceptions.append(e) break loop.call_soon(loop.stop) loop.run_forever() clock.Maintain() t = threading.Thread(target=inner, daemon=True) self.threads.append(t)
[docs] def addLoop(self, loop): def inner(): while self.running: loop.call_soon(loop.stop) loop.run_forever() self.loops.append(loop) self.separateLoops.append(loop) t = threading.Thread(target=inner, daemon=True) self.threads.append(t)
[docs] def start(self): if EventLoopManager.current is not None: raise PyUnityException("Only one EventLoopManager can be running") EventLoopManager.current = self self.waiting[self.mainWaitFor] = [] for loop in self.separateLoops: loop.call_soon(loop.stop) loop.run_forever() # Run until awaits are encountered self.separateLoops.clear() self.running = True for thread in self.threads: thread.start() self.mainLoop = EventLoop() asyncio.set_event_loop(self.mainLoop) while self.running: with EventLoopManager.exceptionLock: if len(EventLoopManager.exceptions): from . import SceneManager from .scenes.runner import ChangeScene if isinstance(EventLoopManager.exceptions[0], ChangeScene): exc = EventLoopManager.exceptions.pop() EventLoopManager.exceptions.clear() raise exc elif config.exitOnError: Logger.LogLine(Logger.ERROR, f"Exception in Scene: {SceneManager.CurrentScene().name!r}") exc = EventLoopManager.exceptions.pop() EventLoopManager.exceptions.clear() raise exc else: for exception in EventLoopManager.exceptions: Logger.LogLine(Logger.ERROR, f"Exception ignored in Scene: {SceneManager.CurrentScene().name!r}") Logger.LogException(exception) EventLoopManager.exceptions.clear() with EventLoopManager.waitingLock: for waiter in self.waiting[self.mainWaitFor]: waiter.loop.call_soon_threadsafe(waiter.event.set) for func in self.updates: func(loop) for event in self.pending: event.trigger() self.pending.clear() self.mainLoop.call_soon(self.mainLoop.stop) self.mainLoop.run_forever()
[docs] def quit(self): self.running = False for thread in self.threads: thread.join() # Will wait until this iteration has finished self.threads = [] self.loops = [] self.separateLoops = [] self.waiting = {} self.pending = [] self.updates = [] self.mainLoop = None EventLoopManager.current = None
[docs]class EventLoop(asyncio.SelectorEventLoop): def __init__(self, selector=None): super(EventLoop, self).__init__(selector) self.set_exception_handler(EventLoop.handleException) # signals = (signal.SIGTERM, signal.SIGINT) # for s in signals: # self.add_signal_handler( # s, lambda sig=s: asyncio.create_task(self.shutdown(sig)))
[docs] async def shutdown(self, signal=None): if signal is not None: Logger.LogLine(Logger.INFO, f"Received exit signal {signal.name}") tasks = [t for t in asyncio.all_tasks(self) if t is not asyncio.current_task()] for task in tasks: task.cancel() await asyncio.gather(*tasks, return_exceptions=True) self.stop()
[docs] def handleException(self, context): if "exception" in context: with EventLoopManager.exceptionLock: EventLoopManager.exceptions.append(context["exception"])
[docs]def StartCoroutine(coro): loop = asyncio.get_running_loop() if not isinstance(loop, EventLoop): Logger.LogLine(Logger.ERROR, f"Expected loop of type EventLoop, got {type(loop).__name__}") loop.create_task(coro)
[docs]class WaitForSeconds: def __init__(self, length): self.length = length def __await__(self): start = time.perf_counter() sleep = asyncio.sleep(self.length) yield from sleep.__await__() return time.perf_counter() - start
[docs]class WaitForEventLoop: def __init__(self): self.event = asyncio.Event() self.loop = asyncio.get_running_loop() with EventLoopManager.waitingLock: EventLoopManager.current.waiting[type(self)].append(self) def __await__(self): start = time.perf_counter() yield from self.event.wait().__await__() with EventLoopManager.waitingLock: EventLoopManager.current.waiting[type(self)].remove(self) return time.perf_counter() - start
[docs]class WaitForUpdate(WaitForEventLoop): pass
[docs]class WaitForFixedUpdate(WaitForEventLoop): pass
[docs]class WaitForRender(WaitForEventLoop): pass