433 lines
16 KiB
Python
433 lines
16 KiB
Python
from builtins import range
|
|
import traceback
|
|
from io import open
|
|
from typing import Union, Any, Dict, List, Set, Tuple, Optional, Callable, TYPE_CHECKING
|
|
|
|
if TYPE_CHECKING:
|
|
from eel.types import OptionsDictT, WebSocketT
|
|
else:
|
|
WebSocketT = Any
|
|
OptionsDictT = Any
|
|
|
|
from gevent.threading import Timer
|
|
import gevent as gvt
|
|
import json as jsn
|
|
import bottle as btl
|
|
try:
|
|
import bottle_websocket as wbs
|
|
except ImportError:
|
|
import bottle.ext.websocket as wbs
|
|
import re as rgx
|
|
import os
|
|
import eel.browsers as brw
|
|
import pyparsing as pp
|
|
import random as rnd
|
|
import sys
|
|
import pkg_resources as pkg
|
|
import socket
|
|
import mimetypes
|
|
|
|
|
|
mimetypes.add_type('application/javascript', '.js')
|
|
_eel_js_file: str = pkg.resource_filename('eel', 'eel.js')
|
|
_eel_js: str = open(_eel_js_file, encoding='utf-8').read()
|
|
_websockets: List[Tuple[Any, WebSocketT]] = []
|
|
_call_return_values: Dict[Any, Any] = {}
|
|
_call_return_callbacks: Dict[float, Tuple[Callable[..., Any], Optional[Callable[..., Any]]]] = {}
|
|
_call_number: int = 0
|
|
_exposed_functions: Dict[Any, Any] = {}
|
|
_js_functions: List[Any] = []
|
|
_mock_queue: List[Any] = []
|
|
_mock_queue_done: Set[Any] = set()
|
|
_shutdown: Optional[gvt.Greenlet] = None # Later assigned as global by _websocket_close()
|
|
root_path: str # Later assigned as global by init()
|
|
|
|
# The maximum time (in milliseconds) that Python will try to retrieve a return value for functions executing in JS
|
|
# Can be overridden through `eel.init` with the kwarg `js_result_timeout` (default: 10000)
|
|
_js_result_timeout: int = 10000
|
|
|
|
# All start() options must provide a default value and explanation here
|
|
_start_args: OptionsDictT = {
|
|
'mode': 'chrome', # What browser is used
|
|
'host': 'localhost', # Hostname use for Bottle server
|
|
'port': 8000, # Port used for Bottle server (use 0 for auto)
|
|
'block': True, # Whether start() blocks calling thread
|
|
'jinja_templates': None, # Folder for jinja2 templates
|
|
'cmdline_args': ['--disable-http-cache'], # Extra cmdline flags to pass to browser start
|
|
'size': None, # (width, height) of main window
|
|
'position': None, # (left, top) of main window
|
|
'geometry': {}, # Dictionary of size/position for all windows
|
|
'close_callback': None, # Callback for when all windows have closed
|
|
'app_mode': True, # (Chrome specific option)
|
|
'all_interfaces': False, # Allow bottle server to listen for connections on all interfaces
|
|
'disable_cache': True, # Sets the no-store response header when serving assets
|
|
'default_path': 'index.html', # The default file to retrieve for the root URL
|
|
'app': btl.default_app(), # Allows passing in a custom Bottle instance, e.g. with middleware
|
|
'shutdown_delay': 1.0 # how long to wait after a websocket closes before detecting complete shutdown
|
|
}
|
|
|
|
# == Temporary (suppressible) error message to inform users of breaking API change for v1.0.0 ===
|
|
_start_args['suppress_error'] = False
|
|
api_error_message: str = '''
|
|
----------------------------------------------------------------------------------
|
|
'options' argument deprecated in v1.0.0, see https://github.com/ChrisKnott/Eel
|
|
To suppress this error, add 'suppress_error=True' to start() call.
|
|
This option will be removed in future versions
|
|
----------------------------------------------------------------------------------
|
|
'''
|
|
# ===============================================================================================
|
|
|
|
# Public functions
|
|
|
|
def expose(name_or_function: Optional[Callable[..., Any]] = None) -> Callable[..., Any]:
|
|
# Deal with '@eel.expose()' - treat as '@eel.expose'
|
|
if name_or_function is None:
|
|
return expose
|
|
|
|
if isinstance(name_or_function, str): # Called as '@eel.expose("my_name")'
|
|
name = name_or_function
|
|
|
|
def decorator(function: Callable[..., Any]) -> Any:
|
|
_expose(name, function)
|
|
return function
|
|
return decorator
|
|
else:
|
|
function = name_or_function
|
|
_expose(function.__name__, function)
|
|
return function
|
|
|
|
|
|
# PyParsing grammar for parsing exposed functions in JavaScript code
|
|
# Examples: `eel.expose(w, "func_name")`, `eel.expose(func_name)`, `eel.expose((function (e){}), "func_name")`
|
|
EXPOSED_JS_FUNCTIONS: pp.ZeroOrMore = pp.ZeroOrMore(
|
|
pp.Suppress(
|
|
pp.SkipTo(pp.Literal('eel.expose('))
|
|
+ pp.Literal('eel.expose(')
|
|
+ pp.Optional(
|
|
pp.Or([pp.nestedExpr(), pp.Word(pp.printables, excludeChars=',')]) + pp.Literal(',')
|
|
)
|
|
)
|
|
+ pp.Suppress(pp.Regex(r'["\']?'))
|
|
+ pp.Word(pp.printables, excludeChars='"\')')
|
|
+ pp.Suppress(pp.Regex(r'["\']?\s*\)')),
|
|
)
|
|
|
|
|
|
def init(path: str, allowed_extensions: List[str] = ['.js', '.html', '.txt', '.htm',
|
|
'.xhtml', '.vue'], js_result_timeout: int = 10000) -> None:
|
|
global root_path, _js_functions, _js_result_timeout
|
|
root_path = _get_real_path(path)
|
|
|
|
js_functions = set()
|
|
for root, _, files in os.walk(root_path):
|
|
for name in files:
|
|
if not any(name.endswith(ext) for ext in allowed_extensions):
|
|
continue
|
|
|
|
try:
|
|
with open(os.path.join(root, name), encoding='utf-8') as file:
|
|
contents = file.read()
|
|
expose_calls = set()
|
|
matches = EXPOSED_JS_FUNCTIONS.parseString(contents).asList()
|
|
for expose_call in matches:
|
|
# Verify that function name is valid
|
|
msg = "eel.expose() call contains '(' or '='"
|
|
assert rgx.findall(r'[\(=]', expose_call) == [], msg
|
|
expose_calls.add(expose_call)
|
|
js_functions.update(expose_calls)
|
|
except UnicodeDecodeError:
|
|
pass # Malformed file probably
|
|
|
|
_js_functions = list(js_functions)
|
|
for js_function in _js_functions:
|
|
_mock_js_function(js_function)
|
|
|
|
_js_result_timeout = js_result_timeout
|
|
|
|
|
|
def start(*start_urls: str, **kwargs: Any) -> None:
|
|
_start_args.update(kwargs)
|
|
|
|
if 'options' in kwargs:
|
|
if _start_args['suppress_error']:
|
|
_start_args.update(kwargs['options'])
|
|
else:
|
|
raise RuntimeError(api_error_message)
|
|
|
|
if _start_args['port'] == 0:
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
sock.bind(('localhost', 0))
|
|
_start_args['port'] = sock.getsockname()[1]
|
|
sock.close()
|
|
|
|
if _start_args['jinja_templates'] != None:
|
|
from jinja2 import Environment, FileSystemLoader, select_autoescape
|
|
if not isinstance(_start_args['jinja_templates'], str):
|
|
raise TypeError("'jinja_templates start_arg/option must be of type str'")
|
|
templates_path = os.path.join(root_path, _start_args['jinja_templates'])
|
|
_start_args['jinja_env'] = Environment(loader=FileSystemLoader(templates_path),
|
|
autoescape=select_autoescape(['html', 'xml']))
|
|
|
|
# verify shutdown_delay is correct value
|
|
if not isinstance(_start_args['shutdown_delay'], (int, float)):
|
|
raise ValueError("`shutdown_delay` must be a number, "\
|
|
"got a {}".format(type(_start_args['shutdown_delay'])))
|
|
|
|
# Launch the browser to the starting URLs
|
|
show(*start_urls)
|
|
|
|
def run_lambda() -> None:
|
|
if _start_args['all_interfaces'] == True:
|
|
HOST = '0.0.0.0'
|
|
else:
|
|
if not isinstance(_start_args['host'], str):
|
|
raise TypeError("'host' start_arg/option must be of type str")
|
|
HOST = _start_args['host']
|
|
|
|
app = _start_args['app']
|
|
|
|
if isinstance(app, btl.Bottle):
|
|
register_eel_routes(app)
|
|
else:
|
|
register_eel_routes(btl.default_app())
|
|
|
|
btl.run(
|
|
host=HOST,
|
|
port=_start_args['port'],
|
|
server=wbs.GeventWebSocketServer,
|
|
quiet=True,
|
|
app=app) # Always returns None
|
|
|
|
# Start the webserver
|
|
if _start_args['block']:
|
|
run_lambda()
|
|
else:
|
|
spawn(run_lambda)
|
|
|
|
|
|
def show(*start_urls: str) -> None:
|
|
brw.open(list(start_urls), _start_args)
|
|
|
|
|
|
def sleep(seconds: Union[int, float]) -> None:
|
|
gvt.sleep(seconds)
|
|
|
|
|
|
def spawn(function: Callable[..., Any], *args: Any, **kwargs: Any) -> gvt.Greenlet:
|
|
return gvt.spawn(function, *args, **kwargs)
|
|
|
|
# Bottle Routes
|
|
|
|
def _eel() -> str:
|
|
start_geometry = {'default': {'size': _start_args['size'],
|
|
'position': _start_args['position']},
|
|
'pages': _start_args['geometry']}
|
|
|
|
page = _eel_js.replace('/** _py_functions **/',
|
|
'_py_functions: %s,' % list(_exposed_functions.keys()))
|
|
page = page.replace('/** _start_geometry **/',
|
|
'_start_geometry: %s,' % _safe_json(start_geometry))
|
|
btl.response.content_type = 'application/javascript'
|
|
_set_response_headers(btl.response)
|
|
return page
|
|
|
|
def _root() -> Optional[btl.Response]:
|
|
if not isinstance(_start_args['default_path'], str):
|
|
raise TypeError("'default_path' start_arg/option must be of type str")
|
|
return _static(_start_args['default_path'])
|
|
|
|
def _static(path: str) -> Optional[btl.Response]:
|
|
response = None
|
|
if 'jinja_env' in _start_args and 'jinja_templates' in _start_args:
|
|
if not isinstance(_start_args['jinja_templates'], str):
|
|
raise TypeError("'jinja_templates' start_arg/option must be of type str")
|
|
template_prefix = _start_args['jinja_templates'] + '/'
|
|
if path.startswith(template_prefix):
|
|
n = len(template_prefix)
|
|
template = _start_args['jinja_env'].get_template(path[n:]) # type: ignore # depends on conditional import in start()
|
|
response = btl.HTTPResponse(template.render())
|
|
|
|
if response is None:
|
|
response = btl.static_file(path, root=root_path)
|
|
|
|
_set_response_headers(response)
|
|
return response
|
|
|
|
def _websocket(ws: WebSocketT) -> None:
|
|
global _websockets
|
|
|
|
for js_function in _js_functions:
|
|
_import_js_function(js_function)
|
|
|
|
page = btl.request.query.page
|
|
if page not in _mock_queue_done:
|
|
for call in _mock_queue:
|
|
_repeated_send(ws, _safe_json(call))
|
|
_mock_queue_done.add(page)
|
|
|
|
_websockets += [(page, ws)]
|
|
|
|
while True:
|
|
msg = ws.receive()
|
|
if msg is not None:
|
|
message = jsn.loads(msg)
|
|
spawn(_process_message, message, ws)
|
|
else:
|
|
_websockets.remove((page, ws))
|
|
break
|
|
|
|
_websocket_close(page)
|
|
|
|
|
|
BOTTLE_ROUTES: Dict[str, Tuple[Callable[..., Any], Dict[Any, Any]]] = {
|
|
"/eel.js": (_eel, dict()),
|
|
"/": (_root, dict()),
|
|
"/<path:path>": (_static, dict()),
|
|
"/eel": (_websocket, dict(apply=[wbs.websocket]))
|
|
}
|
|
|
|
def register_eel_routes(app: btl.Bottle) -> None:
|
|
'''
|
|
Adds eel routes to `app`. Only needed if you are passing something besides `bottle.Bottle` to `eel.start()`.
|
|
Ex:
|
|
app = bottle.Bottle()
|
|
eel.register_eel_routes(app)
|
|
middleware = beaker.middleware.SessionMiddleware(app)
|
|
eel.start(app=middleware)
|
|
'''
|
|
for route_path, route_params in BOTTLE_ROUTES.items():
|
|
route_func, route_kwargs = route_params
|
|
app.route(path=route_path, callback=route_func, **route_kwargs)
|
|
|
|
# Private functions
|
|
|
|
def _safe_json(obj: Any) -> str:
|
|
return jsn.dumps(obj, default=lambda o: None)
|
|
|
|
|
|
def _repeated_send(ws: WebSocketT, msg: str) -> None:
|
|
for attempt in range(100):
|
|
try:
|
|
ws.send(msg)
|
|
break
|
|
except Exception:
|
|
sleep(0.001)
|
|
|
|
|
|
def _process_message(message: Dict[str, Any], ws: WebSocketT) -> None:
|
|
if 'call' in message:
|
|
error_info = {}
|
|
try:
|
|
return_val = _exposed_functions[message['name']](*message['args'])
|
|
status = 'ok'
|
|
except Exception as e:
|
|
err_traceback = traceback.format_exc()
|
|
traceback.print_exc()
|
|
return_val = None
|
|
status = 'error'
|
|
error_info['errorText'] = repr(e)
|
|
error_info['errorTraceback'] = err_traceback
|
|
_repeated_send(ws, _safe_json({ 'return': message['call'],
|
|
'status': status,
|
|
'value': return_val,
|
|
'error': error_info,}))
|
|
elif 'return' in message:
|
|
call_id = message['return']
|
|
if call_id in _call_return_callbacks:
|
|
callback, error_callback = _call_return_callbacks.pop(call_id)
|
|
if message['status'] == 'ok':
|
|
callback(message['value'])
|
|
elif message['status'] == 'error' and error_callback is not None:
|
|
error_callback(message['error'], message['stack'])
|
|
else:
|
|
_call_return_values[call_id] = message['value']
|
|
|
|
else:
|
|
print('Invalid message received: ', message)
|
|
|
|
|
|
def _get_real_path(path: str) -> str:
|
|
if getattr(sys, 'frozen', False):
|
|
return os.path.join(sys._MEIPASS, path) # type: ignore # sys._MEIPASS is dynamically added by PyInstaller
|
|
else:
|
|
return os.path.abspath(path)
|
|
|
|
|
|
def _mock_js_function(f: str) -> None:
|
|
exec('%s = lambda *args: _mock_call("%s", args)' % (f, f), globals())
|
|
|
|
|
|
def _import_js_function(f: str) -> None:
|
|
exec('%s = lambda *args: _js_call("%s", args)' % (f, f), globals())
|
|
|
|
|
|
def _call_object(name: str, args: Any) -> Dict[str, Any]:
|
|
global _call_number
|
|
_call_number += 1
|
|
call_id = _call_number + rnd.random()
|
|
return {'call': call_id, 'name': name, 'args': args}
|
|
|
|
|
|
def _mock_call(name: str, args: Any) -> Callable[[Optional[Callable[..., Any]], Optional[Callable[..., Any]]], Any]:
|
|
call_object = _call_object(name, args)
|
|
global _mock_queue
|
|
_mock_queue += [call_object]
|
|
return _call_return(call_object)
|
|
|
|
|
|
def _js_call(name: str, args: Any) -> Callable[[Optional[Callable[..., Any]], Optional[Callable[..., Any]]], Any]:
|
|
call_object = _call_object(name, args)
|
|
for _, ws in _websockets:
|
|
_repeated_send(ws, _safe_json(call_object))
|
|
return _call_return(call_object)
|
|
|
|
|
|
def _call_return(call: Dict[str, Any]) -> Callable[[Optional[Callable[..., Any]], Optional[Callable[..., Any]]], Any]:
|
|
global _js_result_timeout
|
|
call_id = call['call']
|
|
|
|
def return_func(callback: Optional[Callable[..., Any]] = None,
|
|
error_callback: Optional[Callable[..., Any]] = None) -> Any:
|
|
if callback is not None:
|
|
_call_return_callbacks[call_id] = (callback, error_callback)
|
|
else:
|
|
for w in range(_js_result_timeout):
|
|
if call_id in _call_return_values:
|
|
return _call_return_values.pop(call_id)
|
|
sleep(0.001)
|
|
return return_func
|
|
|
|
|
|
def _expose(name: str, function: Callable[..., Any]) -> None:
|
|
msg = 'Already exposed function with name "%s"' % name
|
|
assert name not in _exposed_functions, msg
|
|
_exposed_functions[name] = function
|
|
|
|
|
|
def _detect_shutdown() -> None:
|
|
if len(_websockets) == 0:
|
|
sys.exit()
|
|
|
|
|
|
def _websocket_close(page: str) -> None:
|
|
global _shutdown
|
|
|
|
close_callback = _start_args.get('close_callback')
|
|
|
|
if close_callback is not None:
|
|
if not callable(close_callback):
|
|
raise TypeError("'close_callback' start_arg/option must be callable or None")
|
|
sockets = [p for _, p in _websockets]
|
|
close_callback(page, sockets)
|
|
else:
|
|
if isinstance(_shutdown, gvt.Greenlet):
|
|
_shutdown.kill()
|
|
|
|
_shutdown = gvt.spawn_later(_start_args['shutdown_delay'], _detect_shutdown)
|
|
|
|
|
|
def _set_response_headers(response: btl.Response) -> None:
|
|
if _start_args['disable_cache']:
|
|
# https://stackoverflow.com/a/24748094/280852
|
|
response.set_header('Cache-Control', 'no-store')
|