Python: Initial support for asyncio

The API is minimal at this point: `slint.run_event_loop()` takes an
optional coroutine parameter, that's run. As the loop is run, an
asyncio event loop (as per asyncio.get_event_loop()) is active, which
maps socket operations to smol's Async adapter and polls them within the
Slint event loop.

cc #4137
This commit is contained in:
Simon Hausmann 2025-08-06 13:45:49 +02:00 committed by Simon Hausmann
parent 855cd0df14
commit ea0e3c1372
13 changed files with 699 additions and 12 deletions

View file

@ -52,6 +52,7 @@ chrono = "0.4"
spin_on = { workspace = true }
css-color-parser2 = { workspace = true }
pyo3-stub-gen = { version = "0.9.0", default-features = false }
smol = { version = "2.0.0" }
[package.metadata.maturin]
python-source = "slint"

View file

@ -22,7 +22,7 @@ in detail.
## Installation
Slint can be installed with `uv` or `pip` from the [Python Package Index](https://pypi.org):
Install Slint with `uv` or `pip` from the [Python Package Index](https://pypi.org):
```bash
uv add slint
@ -326,6 +326,22 @@ value = slint.loader.app.MyOption.Variant2
main_window.data = value
```
## Asynchronous I/O
Use Python's [asyncio](https://docs.python.org/3/library/asyncio.html) library to write concurrent Python code with the `async`/`await` syntax.
Slint's event loop is a full-featured [asyncio event loop](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio-event-loop). While
the event loop is running, [`asyncio.get_event_loop()`](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.get_running_loop) returns
a valid loop. To run an async function when starting the loop, pass a coroutine to `slint.run_event_loop()`.
For the common use case of interacting with REST APIs, we recommend the [`aiohttp` library](https://docs.aiohttp.org/en/stable/).
### Known Limitations
- Pipes and sub-processes are only supported on Unix-like platforms.
- Exceptions thrown in the coroutine passed to `slint.run_event_loop()` don't cause the loop to terminate. This behaviour may
change in a future release.
## Third-Party Licenses
For a list of the third-party licenses of all dependencies, see the separate [Third-Party Licenses page](thirdparty.html).

View file

@ -0,0 +1,148 @@
// Copyright © SixtyFPS GmbH <info@slint.dev>
// SPDX-License-Identifier: GPL-3.0-only OR LicenseRef-Slint-Royalty-free-2.0 OR LicenseRef-Slint-Software-3.0
use std::rc::Rc;
use pyo3::prelude::*;
use pyo3_stub_gen::{derive::gen_stub_pyclass, derive::gen_stub_pymethods};
#[cfg(unix)]
struct PyFdWrapper(std::os::fd::RawFd);
#[cfg(unix)]
impl std::os::fd::AsFd for PyFdWrapper {
fn as_fd(&self) -> std::os::fd::BorrowedFd<'_> {
unsafe { std::os::fd::BorrowedFd::borrow_raw(self.0) }
}
}
#[cfg(windows)]
struct PyFdWrapper(#[cfg(windows)] std::os::windows::io::RawSocket);
#[cfg(windows)]
impl std::os::windows::io::AsSocket for PyFdWrapper {
fn as_socket(&self) -> std::os::windows::io::BorrowedSocket<'_> {
unsafe { std::os::windows::io::BorrowedSocket::borrow_raw(self.0) }
}
}
struct AdapterInner {
adapter: smol::Async<PyFdWrapper>,
readable_callback: Option<Py<PyAny>>,
writable_callback: Option<Py<PyAny>>,
}
#[gen_stub_pyclass]
#[pyclass(unsendable)]
pub struct AsyncAdapter {
inner: Option<Rc<AdapterInner>>,
task: Option<slint_interpreter::JoinHandle<()>>,
}
#[gen_stub_pymethods]
#[pymethods]
impl AsyncAdapter {
#[new]
fn py_new(fd: i32) -> Self {
#[cfg(windows)]
let fd = u64::try_from(fd).unwrap();
AsyncAdapter {
inner: Some(Rc::new(AdapterInner {
adapter: smol::Async::new(PyFdWrapper(fd)).unwrap(),
readable_callback: Default::default(),
writable_callback: Default::default(),
})),
task: None,
}
}
fn wait_for_readable(&mut self, callback: Py<PyAny>) {
self.restart_after_mut_inner_access(|inner| {
inner.readable_callback.replace(callback);
});
}
fn wait_for_writable(&mut self, callback: Py<PyAny>) {
self.restart_after_mut_inner_access(|inner| {
inner.writable_callback.replace(callback);
});
}
}
impl AsyncAdapter {
fn restart_after_mut_inner_access(&mut self, callback: impl FnOnce(&mut AdapterInner)) {
if let Some(task) = self.task.take() {
task.abort();
}
// This detaches and basically makes any existing future that might get woke up fail when
// trying to upgrade the weak.
let mut inner = Rc::into_inner(self.inner.take().unwrap()).unwrap();
callback(&mut inner);
let inner = Rc::new(inner);
let inner_weak = Rc::downgrade(&inner);
self.inner = Some(inner);
self.task = Some(
slint_interpreter::spawn_local(std::future::poll_fn(move |cx| loop {
let Some(inner) = inner_weak.upgrade() else {
return std::task::Poll::Ready(());
};
let readable_poll_status: Option<std::task::Poll<Py<PyAny>>> =
inner.readable_callback.as_ref().map(|callback| {
if inner.adapter.poll_readable(cx).is_ready() {
std::task::Poll::Ready(Python::attach(|py| callback.clone_ref(py)))
} else {
std::task::Poll::Pending
}
});
let writable_poll_status: Option<std::task::Poll<Py<PyAny>>> =
inner.writable_callback.as_ref().map(|callback| {
if inner.adapter.poll_writable(cx).is_ready() {
std::task::Poll::Ready(Python::attach(|py| callback.clone_ref(py)))
} else {
std::task::Poll::Pending
}
});
let fd = inner.adapter.get_ref().0;
drop(inner);
if let Some(std::task::Poll::Ready(callback)) = &readable_poll_status {
Python::attach(|py| {
callback.call1(py, (fd,)).expect(
"unexpected failure running python async readable adapter callback",
);
});
}
if let Some(std::task::Poll::Ready(callback)) = &writable_poll_status {
Python::attach(|py| {
callback.call1(py, (fd,)).expect(
"unexpected failure running python async writable adapter callback",
);
});
}
match &readable_poll_status {
Some(std::task::Poll::Ready(..)) => continue, // poll again and then probably return in the next iteration
Some(std::task::Poll::Pending) => return std::task::Poll::Pending, // waker registered, come back later
None => {} // Nothing to poll
}
match &writable_poll_status {
Some(std::task::Poll::Ready(..)) => continue, // poll again and then probably return in the next iteration
Some(std::task::Poll::Pending) => return std::task::Poll::Pending, // waker registered, come back later
None => {} // Nothing to poll
}
return std::task::Poll::Ready(());
}))
.unwrap(),
);
}
}

View file

@ -439,10 +439,6 @@ impl ComponentInstance {
Ok(self.instance.hide()?)
}
fn run(&self) -> Result<(), PyPlatformError> {
Ok(self.instance.run()?)
}
fn __traverse__(&self, visit: PyVisit<'_>) -> Result<(), PyTraverseError> {
self.callbacks.__traverse__(&visit)?;
for global_callbacks in self.global_callbacks.values() {

View file

@ -11,6 +11,7 @@ use interpreter::{
CompilationResult, Compiler, ComponentDefinition, ComponentInstance, PyDiagnostic,
PyDiagnosticLevel, PyValueType,
};
mod async_adapter;
mod brush;
mod errors;
mod models;
@ -42,10 +43,11 @@ thread_local! {
#[gen_stub_pyfunction]
#[pyfunction]
fn run_event_loop() -> Result<(), PyErr> {
fn run_event_loop(py: Python<'_>) -> Result<(), PyErr> {
EVENT_LOOP_EXCEPTION.replace(None);
EVENT_LOOP_RUNNING.set(true);
let result = slint_interpreter::run_event_loop();
// Release the GIL while running the event loop, so that other Python threads can run.
let result = py.detach(|| slint_interpreter::run_event_loop());
EVENT_LOOP_RUNNING.set(false);
result.map_err(|e| errors::PyPlatformError::from(e))?;
EVENT_LOOP_EXCEPTION.take().map_or(Ok(()), |err| Err(err))
@ -100,6 +102,7 @@ fn slint(_py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<brush::PyBrush>()?;
m.add_class::<models::PyModelBase>()?;
m.add_class::<value::PyStruct>()?;
m.add_class::<async_adapter::AsyncAdapter>()?;
m.add_function(wrap_pyfunction!(run_event_loop, m)?)?;
m.add_function(wrap_pyfunction!(quit_event_loop, m)?)?;
m.add_function(wrap_pyfunction!(set_xdg_app_id, m)?)?;

View file

@ -8,4 +8,4 @@ import nox
def python(session: nox.Session):
session.env["MATURIN_PEP517_ARGS"] = "--profile=dev"
session.install(".[dev]")
session.run("pytest", "-s")
session.run("pytest", "-s", "-v")

View file

@ -37,7 +37,7 @@ Changelog = "https://github.com/slint-ui/slint/blob/master/CHANGELOG.md"
Tracker = "https://github.com/slint-ui/slint/issues"
[project.optional-dependencies]
dev = ["pytest", "numpy>=2.3.2", "pillow>=11.3.0"]
dev = ["pytest", "numpy>=2.3.2", "pillow>=11.3.0", "aiohttp>=3.12.15"]
[dependency-groups]
dev = [
@ -48,6 +48,7 @@ dev = [
"ruff>=0.9.6",
"pillow>=11.3.0",
"numpy>=2.3.2",
"aiohttp>=3.12.15",
]
[tool.uv]

View file

@ -16,7 +16,10 @@ from typing import Any
import pathlib
from .models import ListModel, Model
from .slint import Image, Color, Brush, Timer, TimerMode
from .loop import SlintEventLoop
from pathlib import Path
from collections.abc import Coroutine
import asyncio
Struct = native.PyStruct
@ -52,7 +55,9 @@ class Component:
def run(self) -> None:
"""Shows the window, runs the event loop, hides it when the loop is quit, and returns."""
self.__instance__.run()
self.show()
run_event_loop()
self.hide()
def _normalize_prop(name: str) -> str:
@ -426,6 +431,56 @@ def set_xdg_app_id(app_id: str) -> None:
native.set_xdg_app_id(app_id)
quit_event = asyncio.Event()
def run_event_loop(
main_coro: typing.Optional[Coroutine[None, None, None]] = None,
) -> None:
"""Runs the main Slint event loop. If specified, the coroutine `main_coro` is run in parallel. The event loop doesn't
terminate when the coroutine finishes, it terminates when calling `quit_event_loop()`.
Example:
```python
import slint
...
image_model: slint.ListModel[slint.Image] = slint.ListModel()
...
async def main_receiver(image_model: slint.ListModel) -> None:
async with aiohttp.ClientSession() as session:
async with session.get("http://some.server/svg-image") as response:
svg = await response.read()
image = slint.Image.from_svg_data(svg)
image_model.append(image)
...
slint.run_event_loop(main_receiver(image_model))
```
"""
async def run_inner() -> None:
global quit_event
loop = typing.cast(SlintEventLoop, asyncio.get_event_loop())
if main_coro:
loop.create_task(main_coro)
await quit_event.wait()
global quit_event
quit_event = asyncio.Event()
asyncio.run(run_inner(), debug=False, loop_factory=SlintEventLoop)
def quit_event_loop() -> None:
"""Quits the running event loop in the next event processing cycle. This will make an earlier call to `run_event_loop()`
return."""
global quit_event
quit_event.set()
__all__ = [
"CompileError",
"Component",
@ -440,4 +495,6 @@ __all__ = [
"TimerMode",
"set_xdg_app_id",
"callback",
"run_event_loop",
"quit_event_loop",
]

View file

@ -0,0 +1,244 @@
# Copyright © SixtyFPS GmbH <info@slint.dev>
# SPDX-License-Identifier: GPL-3.0-only OR LicenseRef-Slint-Royalty-free-2.0 OR LicenseRef-Slint-Software-3.0
from . import slint as native
import asyncio.selector_events
import asyncio
import asyncio.events
import selectors
import typing
from collections.abc import Mapping
import datetime
class HasFileno(typing.Protocol):
def fileno(self) -> int: ...
def fd_for_fileobj(fileobj: int | HasFileno) -> int:
if isinstance(fileobj, int):
return fileobj
return int(fileobj.fileno())
class _SlintSelectorMapping(Mapping[typing.Any, selectors.SelectorKey]):
def __init__(self, slint_selector: "_SlintSelector") -> None:
self.slint_selector = slint_selector
def __len__(self) -> int:
return len(self.slint_selector.fd_to_selector_key)
def get(self, fileobj, default=None): # type: ignore
fd = fd_for_fileobj(fileobj)
return self.slint_selector.fd_to_selector_key.get(fd, default)
def __getitem__(self, fileobj: typing.Any) -> selectors.SelectorKey:
fd = fd_for_fileobj(fileobj)
return self.slint_selector.fd_to_selector_key[fd]
def __iter__(self): # type: ignore
return iter(self.slint_selector.fd_to_selector_key)
class _SlintSelector(selectors.BaseSelector):
def __init__(self) -> None:
self.fd_to_selector_key: typing.Dict[typing.Any, selectors.SelectorKey] = {}
self.mapping = _SlintSelectorMapping(self)
self.adapters: typing.Dict[int, native.AsyncAdapter] = {}
def register(
self, fileobj: typing.Any, events: typing.Any, data: typing.Any = None
) -> selectors.SelectorKey:
fd = fd_for_fileobj(fileobj)
key = selectors.SelectorKey(fileobj, fd, events, data)
self.fd_to_selector_key[fd] = key
adapter = native.AsyncAdapter(fd)
self.adapters[fd] = adapter
if events & selectors.EVENT_READ:
adapter.wait_for_readable(self.read_notify)
if events & selectors.EVENT_WRITE:
adapter.wait_for_writable(self.write_notify)
return key
def unregister(self, fileobj: typing.Any) -> selectors.SelectorKey:
fd = fd_for_fileobj(fileobj)
key = self.fd_to_selector_key.pop(fd)
try:
del self.adapters[fd]
except KeyError:
pass
return key
def modify(
self, fileobj: typing.Any, events: int, data: typing.Any = None
) -> selectors.SelectorKey:
fd = fd_for_fileobj(fileobj)
key = self.fd_to_selector_key[fd]
if key.events != events:
self.unregister(fileobj)
key = self.register(fileobj, events, data)
elif key.data != data:
key._replace(data=data)
self.fd_to_selector_key[fd] = key
return key
def select(
self, timeout: float | None = None
) -> typing.List[typing.Tuple[selectors.SelectorKey, int]]:
raise NotImplementedError
def close(self) -> None:
pass
def get_map(self) -> Mapping[int | HasFileno, selectors.SelectorKey]:
return self.mapping
def read_notify(self, fd: int) -> None:
key = self.fd_to_selector_key[fd]
(reader, writer) = key.data
reader._run()
def write_notify(self, fd: int) -> None:
key = self.fd_to_selector_key[fd]
(reader, writer) = key.data
writer._run()
class SlintEventLoop(asyncio.SelectorEventLoop):
def __init__(self) -> None:
self._is_running = False
self._timers: typing.Set[native.Timer] = set()
self.stop_run_forever_event = asyncio.Event()
self._soon_tasks: typing.List[asyncio.TimerHandle] = []
super().__init__(_SlintSelector())
def run_forever(self) -> None:
async def loop_stopper(event: asyncio.Event) -> None:
await event.wait()
native.quit_event_loop()
asyncio.events._set_running_loop(self)
self._is_running = True
try:
self.stop_run_forever_event = asyncio.Event()
self.create_task(loop_stopper(self.stop_run_forever_event))
native.run_event_loop()
finally:
self._is_running = False
asyncio.events._set_running_loop(None)
def run_until_complete[T](self, future: typing.Awaitable[T]) -> T | None: # type: ignore[override]
def stop_loop(future: typing.Any) -> None:
self.stop()
future = asyncio.ensure_future(future, loop=self)
future.add_done_callback(stop_loop)
try:
self.run_forever()
finally:
future.remove_done_callback(stop_loop)
if future.done():
return future.result()
else:
if self.stop_run_forever_event.is_set():
raise RuntimeError("run_until_complete's future isn't done", future)
else:
# If the loop was quit for example because the user closed the last window, then
# don't thrown an error but return a None sentinel. The return value of asyncio.run()
# isn't used by slint.run_event_loop() anyway
# TODO: see if we can properly cancel the future by calling cancel() and throwing
# the task cancellation exception.
return None
def _run_forever_setup(self) -> None:
pass
def _run_forever_cleanup(self) -> None:
pass
def stop(self) -> None:
self.stop_run_forever_event.set()
def is_running(self) -> bool:
return self._is_running
def close(self) -> None:
super().close()
def is_closed(self) -> bool:
return False
def call_later(self, delay, callback, *args, context=None) -> asyncio.TimerHandle: # type: ignore
timer = native.Timer()
handle = asyncio.TimerHandle(
when=self.time() + delay,
callback=callback,
args=args,
loop=self,
context=context,
)
timers = self._timers
def timer_done_cb() -> None:
timers.remove(timer)
if not handle._cancelled:
handle._run()
timer.start(
native.TimerMode.SingleShot,
interval=datetime.timedelta(seconds=delay),
callback=timer_done_cb,
)
timers.add(timer)
return handle
def call_at(self, when, callback, *args, context=None) -> asyncio.TimerHandle: # type: ignore
return self.call_later(when - self.time(), callback, *args, context=context)
def call_soon(self, callback, *args, context=None) -> asyncio.TimerHandle: # type: ignore
# Collect call-soon tasks in a separate list to ensure FIFO order, as there's no guarantee
# that multiple single-shot timers in Slint are run in order.
handle = asyncio.TimerHandle(
when=self.time(), callback=callback, args=args, loop=self, context=context
)
self._soon_tasks.append(handle)
self.call_later(0, self._flush_soon_tasks)
return handle
def _flush_soon_tasks(self) -> None:
tasks_now = self._soon_tasks
self._soon_tasks = []
for handle in tasks_now:
if not handle._cancelled:
handle._run()
def call_soon_threadsafe(self, callback, *args, context=None) -> asyncio.Handle: # type: ignore
handle = asyncio.Handle(
callback=callback,
args=args,
loop=self,
context=context,
)
def run_handle_cb() -> None:
if not handle._cancelled:
handle._run()
native.invoke_from_event_loop(run_handle_cb)
return handle
def _write_to_self(self) -> None:
raise NotImplementedError

View file

@ -10,7 +10,7 @@ import os
import pathlib
import typing
from typing import Any, List
from collections.abc import Callable, Buffer
from collections.abc import Callable, Buffer, Coroutine
from enum import Enum, auto
class RgbColor:
@ -182,7 +182,6 @@ class PyDiagnostic:
class ComponentInstance:
def show(self) -> None: ...
def hide(self) -> None: ...
def run(self) -> None: ...
def invoke(self, callback_name: str, *args: Any) -> Any: ...
def invoke_global(
self, global_name: str, callback_name: str, *args: Any
@ -229,3 +228,11 @@ class Compiler:
def build_from_source(
self, source: str, path: os.PathLike[Any] | pathlib.Path
) -> CompilationResult: ...
class AsyncAdapter:
def __new__(
cls,
fd: int,
) -> "AsyncAdapter": ...
def wait_for_readable(self, callback: typing.Callable[[int], None]) -> None: ...
def wait_for_writable(self, callback: typing.Callable[[int], None]) -> None: ...

View file

@ -0,0 +1,209 @@
# Copyright © SixtyFPS GmbH <info@slint.dev>
# SPDX-License-Identifier: GPL-3.0-only OR LicenseRef-Slint-Royalty-free-2.0 OR LicenseRef-Slint-Software-3.0
import slint
from slint import slint as native
import asyncio
import typing
import aiohttp
from aiohttp import web
import socket
import threading
import pytest
import sys
import platform
def test_async_basic() -> None:
async def quit_soon(call_check: typing.List[bool]) -> None:
await asyncio.sleep(1)
call_check[0] = True
slint.quit_event_loop()
call_check = [False]
slint.run_event_loop(quit_soon(call_check))
assert call_check[0]
def test_async_aiohttp() -> None:
def probe_port() -> int:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("127.0.0.1", 0))
port = typing.cast(int, s.getsockname()[1])
# This is a race condition, but should be good enough for test environments
s.close()
return port
async def hello(request: web.Request) -> web.Response:
return web.Response(text="Hello, world")
async def run_network_requests(
port: int, exceptions: typing.List[Exception]
) -> None:
try:
app = web.Application()
app.add_routes([web.get("/", hello)])
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "127.0.0.1", port)
await site.start()
async with aiohttp.ClientSession() as session:
async with session.get(f"http://127.0.0.1:{port}") as response:
#
print("Status:", response.status)
print("Content-type:", response.headers["content-type"])
#
html = await response.text()
print("Body:", html[:15], "...")
assert html == "Hello, world"
await runner.cleanup()
except Exception as e:
exceptions.append(e)
finally:
slint.quit_event_loop()
exceptions: typing.List[Exception] = []
slint.run_event_loop(run_network_requests(probe_port(), exceptions))
assert len(exceptions) == 0
def test_basic_socket() -> None:
def server_thread(server_socket: socket.socket) -> None:
server_socket.listen(1)
conn, _ = server_socket.accept()
try:
data = conn.recv(1024)
if data == b"ping":
conn.sendall(b"pong")
else:
conn.sendall(b"error")
finally:
conn.close()
server_socket.close()
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(("127.0.0.1", 0))
port = server_socket.getsockname()[1]
thread = threading.Thread(target=server_thread, args=(server_socket,))
thread.start()
async def run_network_request(port: int) -> None:
reader, writer = await asyncio.open_connection("127.0.0.1", port)
writer.write(b"ping")
await writer.drain()
response = []
while chunk := await reader.read(1024):
response.append(chunk)
writer.close()
await writer.wait_closed()
assert response[0] == b"pong"
slint.quit_event_loop()
slint.run_event_loop(run_network_request(port))
thread.join()
def test_server_socket() -> None:
async def handle_client(
reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
data = await reader.read(1024)
if data == b"ping":
writer.write(b"pong")
else:
writer.write(b"error")
await writer.drain()
writer.close()
await writer.wait_closed()
async def run_network_request(port: int) -> None:
try:
reader, writer = await asyncio.open_connection("127.0.0.1", port)
writer.write(b"ping")
await writer.drain()
response = []
while chunk := await reader.read(1024):
response.append(chunk)
writer.close()
await writer.wait_closed()
assert response[0] == b"pong"
finally:
slint.quit_event_loop()
async def run_server_and_client(exception_check: typing.List[Exception]) -> None:
try:
server = await asyncio.start_server(handle_client, "127.0.0.1", 0)
port = server.sockets[0].getsockname()[1]
async with server:
await asyncio.gather(
server.serve_forever(),
run_network_request(port),
)
except Exception as e:
exception_check.append(e)
raise
exception_check: typing.List[Exception] = []
slint.run_event_loop(run_server_and_client(exception_check))
if len(exception_check) > 0:
raise exception_check[0]
def test_loop_close_while_main_future_runs() -> None:
def q() -> None:
native.quit_event_loop()
async def never_quit() -> None:
loop = asyncio.get_running_loop()
# Call native.quit_event_loop() directly as if the user closed the last window. We should gracefully
# handle that the future that this function represents isn't terminated.
loop.call_later(0.1, q)
while True:
await asyncio.sleep(1)
try:
slint.run_event_loop(never_quit())
except Exception:
pytest.fail("Should not throw a run-time error")
@pytest.mark.skipif(platform.system() == "Windows", reason="pipes aren't supported yet")
def test_subprocess() -> None:
async def launch_process(exception_check: typing.List[Exception]) -> None:
try:
proc = await asyncio.create_subprocess_exec(
sys.executable,
"--version",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
stdout, _ = await proc.communicate()
output = stdout.decode().strip()
print(f"Process output: {output}")
assert proc.returncode == 0
assert output != ""
slint.quit_event_loop()
except Exception as e:
exception_check[0] = e
raise
exception_check: typing.List[Exception] = []
slint.run_event_loop(launch_process(exception_check))
if len(exception_check) > 0:
raise exception_check[0]