## Copyright (c) 2020-2022 The PyUnity Team
## This file is licensed under the MIT License.
## See https://docs.pyunity.x10.bz/en/latest/license.html
__all__ = ["Event", "EventLoopManager", "EventLoop", "WaitForSeconds", "WaitForEventLoop",
"WaitForUpdate", "WaitForFixedUpdate", "WaitForRender", "StartCoroutine"]
from . import Logger, config
from .errors import PyUnityException
from .core import Component, GameObject
from .values import SavableStruct, StructEntry, Clock
import functools
import threading
import asyncio
import inspect
# import signal
import time
[docs]@SavableStruct(
component=StructEntry(Component, required=True),
name=StructEntry(str, required=True),
args=StructEntry(tuple, required=True))
class Event:
def __init__(self, func, args=(), kwargs={}):
if not hasattr(func, "__self__"):
raise PyUnityException(
"Cannot create event from callback that is not attached to a Component")
if not isinstance(func.__self__, Component):
raise PyUnityException(
"Cannot create event from callback that is not attached to a Component")
if not isinstance(func.__self__.gameObject, GameObject):
raise PyUnityException(
"Provided callback component does not belong to a GameObject")
functools.update_wrapper(self, func)
self.component = func.__self__
self.name = func.__name__
self.func = func
self.args = args
self.kwargs = kwargs
self.isAsync = inspect.iscoroutinefunction(func)
[docs] def trigger(self):
return self.func(*self.args, **self.kwargs)
[docs] def callSoon(self):
if self.isAsync:
loop = asyncio.get_running_loop()
loop.create_task(self.trigger())
else:
if EventLoopManager.current is None:
raise PyUnityException("No EventLoopManager running")
EventLoopManager.current.pending.append(self)
def _fromDict(self, factory, attrs, instanceCheck=None):
def wrapper(component, method, args=(), kwargs={}):
func = getattr(component, method)
return factory(func, args, kwargs)
return SavableStruct.fromDict(self, wrapper, attrs, instanceCheck)
[docs]def wrap(func):
@functools.wraps(func)
def inner(loop):
return func()
return inner
[docs]class EventLoopManager:
current = None
exceptions = []
exceptionLock = threading.RLock()
waitingLock = threading.RLock()
def __init__(self):
self.threads = []
self.loops = []
self.separateLoops = []
self.waiting = {}
self.pending = []
self.updates = []
self.mainLoop = None
self.mainWaitFor = None
self.running = False
[docs] def schedule(self, *funcs, main=False, ups=None, waitFor=None):
functions = list(funcs)
for i in range(len(functions)):
sig = inspect.signature(functions[i])
if "loop" not in sig.parameters:
functions[i] = wrap(functions[i])
if main:
self.updates.extend(functions)
self.mainWaitFor = waitFor
else:
if ups is None:
raise PyUnityException("ups argument is required if main is False")
self.waiting[waitFor] = []
loop = EventLoop()
self.loops.append(loop)
def inner():
clock = Clock()
clock.Start(ups)
while self.running:
with EventLoopManager.waitingLock:
for waiter in self.waiting[waitFor]:
waiter.loop.call_soon_threadsafe(waiter.event.set)
for func in functions:
try:
func(loop)
except Exception as e:
with EventLoopManager.exceptionLock:
EventLoopManager.exceptions.append(e)
break
loop.call_soon(loop.stop)
loop.run_forever()
clock.Maintain()
t = threading.Thread(target=inner, daemon=True)
self.threads.append(t)
[docs] def addLoop(self, loop):
def inner():
while self.running:
loop.call_soon(loop.stop)
loop.run_forever()
self.loops.append(loop)
self.separateLoops.append(loop)
t = threading.Thread(target=inner, daemon=True)
self.threads.append(t)
[docs] def start(self):
if EventLoopManager.current is not None:
raise PyUnityException("Only one EventLoopManager can be running")
EventLoopManager.current = self
self.waiting[self.mainWaitFor] = []
for loop in self.separateLoops:
loop.call_soon(loop.stop)
loop.run_forever() # Run until awaits are encountered
self.separateLoops.clear()
self.running = True
for thread in self.threads:
thread.start()
self.mainLoop = EventLoop()
asyncio.set_event_loop(self.mainLoop)
while self.running:
with EventLoopManager.exceptionLock:
if len(EventLoopManager.exceptions):
from . import SceneManager
from .scenes.runner import ChangeScene
if isinstance(EventLoopManager.exceptions[0], ChangeScene):
exc = EventLoopManager.exceptions.pop()
EventLoopManager.exceptions.clear()
raise exc
elif config.exitOnError:
Logger.LogLine(Logger.ERROR,
f"Exception in Scene: {SceneManager.CurrentScene().name!r}")
exc = EventLoopManager.exceptions.pop()
EventLoopManager.exceptions.clear()
raise exc
else:
for exception in EventLoopManager.exceptions:
Logger.LogLine(Logger.ERROR,
f"Exception ignored in Scene: {SceneManager.CurrentScene().name!r}")
Logger.LogException(exception)
EventLoopManager.exceptions.clear()
with EventLoopManager.waitingLock:
for waiter in self.waiting[self.mainWaitFor]:
waiter.loop.call_soon_threadsafe(waiter.event.set)
for func in self.updates:
func(loop)
for event in self.pending:
event.trigger()
self.pending.clear()
self.mainLoop.call_soon(self.mainLoop.stop)
self.mainLoop.run_forever()
[docs] def quit(self):
self.running = False
for thread in self.threads:
thread.join() # Will wait until this iteration has finished
self.threads = []
self.loops = []
self.separateLoops = []
self.waiting = {}
self.pending = []
self.updates = []
self.mainLoop = None
EventLoopManager.current = None
[docs]class EventLoop(asyncio.SelectorEventLoop):
def __init__(self, selector=None):
super(EventLoop, self).__init__(selector)
self.set_exception_handler(EventLoop.handleException)
# signals = (signal.SIGTERM, signal.SIGINT)
# for s in signals:
# self.add_signal_handler(
# s, lambda sig=s: asyncio.create_task(self.shutdown(sig)))
[docs] async def shutdown(self, signal=None):
if signal is not None:
Logger.LogLine(Logger.INFO, f"Received exit signal {signal.name}")
tasks = [t for t in asyncio.all_tasks(self) if t is not
asyncio.current_task()]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
self.stop()
[docs] def handleException(self, context):
if "exception" in context:
with EventLoopManager.exceptionLock:
EventLoopManager.exceptions.append(context["exception"])
[docs]def StartCoroutine(coro):
loop = asyncio.get_running_loop()
if not isinstance(loop, EventLoop):
Logger.LogLine(Logger.ERROR,
f"Expected loop of type EventLoop, got {type(loop).__name__}")
loop.create_task(coro)
[docs]class WaitForSeconds:
def __init__(self, length):
self.length = length
def __await__(self):
start = time.perf_counter()
sleep = asyncio.sleep(self.length)
yield from sleep.__await__()
return time.perf_counter() - start
[docs]class WaitForEventLoop:
def __init__(self):
self.event = asyncio.Event()
self.loop = asyncio.get_running_loop()
with EventLoopManager.waitingLock:
EventLoopManager.current.waiting[type(self)].append(self)
def __await__(self):
start = time.perf_counter()
yield from self.event.wait().__await__()
with EventLoopManager.waitingLock:
EventLoopManager.current.waiting[type(self)].remove(self)
return time.perf_counter() - start
[docs]class WaitForUpdate(WaitForEventLoop):
pass
[docs]class WaitForFixedUpdate(WaitForEventLoop):
pass
[docs]class WaitForRender(WaitForEventLoop):
pass