mirror of
https://github.com/ZoiteChat/zoitechat.git
synced 2026-03-10 16:00:18 +00:00
Kept the existing safe behavior intact: paths are only added if they exist and are not already in sys.path, so this broadens compatibility without duplicating entries.
615 lines
16 KiB
Python
615 lines
16 KiB
Python
from __future__ import print_function
|
||
|
||
import importlib
|
||
import os
|
||
import pydoc
|
||
import signal
|
||
import sys
|
||
import traceback
|
||
import weakref
|
||
from contextlib import contextmanager
|
||
|
||
from _zoitechat_embedded import ffi, lib
|
||
|
||
# HelpEater is the in-memory buffer type handed to pydoc.Helper during plugin
# init; pick the flavour that matches the running interpreter's native str.
if sys.version_info >= (3, 0):
    from io import StringIO as HelpEater
else:
    from io import BytesIO as HelpEater

# Make sure sys.argv exists; it is created here when the interpreter
# was started without one.
if not hasattr(sys, 'argv'):
    sys.argv = ['<zoitechat>']

VERSION = b'2.18.0-pre1'  # Sync with zoitechat.__version__

# C-owned copies of the plugin metadata; kept at module level so the buffers
# stay alive after their addresses are handed out in _on_plugin_init.
PLUGIN_NAME = ffi.new('char[]', b'Python')
PLUGIN_DESC = ffi.new('char[]', b'Python %d.%d scripting interface' % sys.version_info[:2])
PLUGIN_VERSION = ffi.new('char[]', VERSION)

# TODO: Constants should be screaming snake case
zoitechat = None         # the imported `zoitechat` module, set in _on_plugin_init
local_interp = None      # lazily created Plugin backing the interactive console
zoitechat_stdout = None  # Stdout instance installed as sys.stdout/sys.stderr
plugins = set()          # every successfully loaded Plugin
|
||
|
||
|
||
@contextmanager
def redirected_stdout():
    """Temporarily send sys.stdout/sys.stderr to the interpreter's original streams.

    While the ``with`` body runs, output goes to ``sys.__stdout__`` and
    ``sys.__stderr__``. On exit both streams are pointed back at the
    module-level ``zoitechat_stdout``.

    The restore happens in a ``finally`` clause so that an exception raised
    inside the body cannot leave the process permanently writing to the
    original streams.
    """
    sys.stdout = sys.__stdout__
    sys.stderr = sys.__stderr__
    try:
        yield
    finally:
        sys.stdout = zoitechat_stdout
        sys.stderr = zoitechat_stdout
|
||
|
||
|
||
# Debug logging is opt-in through the ZOITECHAT_LOG_PYTHON environment
# variable, checked once at import time.
if not os.getenv('ZOITECHAT_LOG_PYTHON'):
    def log(*args):
        """Logging disabled: accept any arguments and do nothing."""

else:
    def log(*args):
        """Print *args* while stdout is redirected via redirected_stdout()."""
        with redirected_stdout():
            print(*args)
|
||
|
||
|
||
class Stdout:
    """File-like object that forwards written text to zoitechat_print.

    Text is accumulated in ``self.buffer`` and emitted once a newline arrives
    (or on an explicit flush).
    """

    def __init__(self):
        self.buffer = bytearray()

    def write(self, string):
        """Buffer *string*; emit everything up to its last newline, if any."""
        data = string.encode()
        head, newline, tail = data.rpartition(b'\n')
        if not newline:
            # No newline yet: keep collecting.
            self.buffer += data
            return

        # Everything before the last newline is printed in one call; the
        # remainder starts the next pending line.
        self.buffer += head
        lib.zoitechat_print(lib.ph, bytes(self.buffer))
        self.buffer = bytearray(tail)

    def flush(self):
        """Emit whatever is pending (even if empty) and reset the buffer."""
        pending = bytes(self.buffer)
        lib.zoitechat_print(lib.ph, pending)
        self.buffer = bytearray()

    def isatty(self):
        """This stream is never a terminal."""
        return False
|
||
|
||
|
||
class Attribute:
    """Extra event data handed to the ``*_attrs`` hook callbacks."""

    def __init__(self):
        # Replaced with attrs.server_time_utc by the *_attrs hook trampolines.
        self.time = 0

    def __repr__(self):
        template = '<Attribute object at {}>'
        return template.format(id(self))
|
||
|
||
|
||
class Hook:
    """One registered callback belonging to a Plugin.

    ``handle`` is the CFFI handle passed to C as userdata; the trampolines
    turn it back into this object with ffi.from_handle().
    """

    def __init__(self, plugin, callback, userdata, is_unload):
        # Only a weak proxy to the owner is stored.
        self.plugin = weakref.proxy(plugin)
        self.callback = callback
        self.userdata = userdata
        self.is_unload = is_unload
        # Filled in by whoever registers the hook with the host.
        self.zoitechat_hook = None
        # The handle wraps a weak proxy as well, so the handle itself does
        # not hold a strong reference to this Hook.
        self.handle = ffi.new_handle(weakref.proxy(self))

    def __del__(self):
        log('Removing hook', id(self))
        if self.is_unload is not False:
            # Unload hooks (and timers flagged by _on_timer_hook) are not
            # unhooked here.
            return

        assert self.zoitechat_hook is not None
        lib.zoitechat_unhook(lib.ph, self.zoitechat_hook)
|
||
|
||
|
||
if sys.version_info[0] != 2:
    def compile_file(data, filename):
        """Compile the source text of a plugin file into an 'exec' code object."""
        return compile(data, filename, 'exec', optimize=2, dont_inherit=True)

    def compile_line(string):
        """Compile one line of interactive input in 'single' mode."""
        # newline appended to solve unexpected EOF issues
        source = string + '\n'
        return compile(source, '<string>', 'single', optimize=2, dont_inherit=True)

else:
    def compile_file(data, filename):
        """Compile the source text of a plugin file into an 'exec' code object."""
        return compile(data, filename, 'exec', dont_inherit=True)

    def compile_line(string):
        """Compile one line of interactive input, preferring 'eval' mode."""
        try:
            code = compile(string, '<string>', 'eval', dont_inherit=True)
        except SyntaxError:
            # For some reason `print` is invalid for eval
            # This will hide any return value though
            code = compile(string, '<string>', 'exec', dont_inherit=True)
        return code
|
||
|
||
|
||
class Plugin:
    """A loaded Python script: its metadata, namespace and registered hooks."""

    def __init__(self):
        self.ph = None  # handle returned by zoitechat_plugingui_add, once loaded
        self.name = self.filename = self.version = self.description = ''
        self.hooks = set()
        # Namespace the script is executed in. '__plugin' is a weak proxy, so
        # the namespace does not keep the Plugin alive.
        self.globals = {
            '__plugin': weakref.proxy(self),
            '__name__': '__main__',
        }

    def add_hook(self, callback, userdata, is_unload=False):
        """Create a Hook owned by this plugin, remember it and return it."""
        new_hook = Hook(self, callback, userdata, is_unload=is_unload)
        self.hooks.add(new_hook)
        return new_hook

    def remove_hook(self, hook):
        """Drop the hook whose ``id()`` equals *hook*.

        Returns the hook's userdata, or None when no such hook is registered.
        """
        match = next((h for h in self.hooks if id(h) == hook), None)
        if match is None:
            log('Hook not found')
            return None

        self.hooks.remove(match)
        return match.userdata

    def loadfile(self, filename):
        """Read, compile and execute *filename* inside this plugin's namespace.

        The script must define ``__module_name__``; ``__module_version__`` and
        ``__module_description__`` are optional. Returns True on success and
        False (after printing a message) on any failure.
        """
        try:
            self.filename = filename
            with open(filename, 'rb') as source_file:
                source = source_file.read().decode('utf-8')
            exec(compile_file(source, filename), self.globals)

            if '__module_name__' not in self.globals:
                lib.zoitechat_print(lib.ph, b'Failed to load module: __module_name__ must be set')
                return False

            self.name = self.globals['__module_name__']
            self.version = self.globals.get('__module_version__', '')
            self.description = self.globals.get('__module_description__', '')
            self.ph = lib.zoitechat_plugingui_add(
                lib.ph,
                filename.encode(),
                self.name.encode(),
                self.description.encode(),
                self.version.encode(),
                ffi.NULL,
            )

        except Exception as e:
            lib.zoitechat_print(lib.ph, 'Failed to load module: {}'.format(e).encode())
            traceback.print_exc()
            return False

        return True

    def __del__(self):
        log('unloading', self.filename)
        # Give the script's unload callbacks a chance to run first.
        for hook in self.hooks:
            if hook.is_unload is not True:
                continue
            try:
                hook.callback(hook.userdata)
            except Exception as e:
                log('Failed to run hook:', e)
                traceback.print_exc()

        # Drop the hook set before removing the plugin-list entry.
        del self.hooks
        if self.ph is not None:
            lib.zoitechat_plugingui_remove(lib.ph, self.ph)
|
||
|
||
|
||
if sys.version_info[0] != 2:
    def __decode(string):
        """Turn bytes received from C into str using the default codec."""
        return string.decode()

else:
    def __decode(string):
        """Python 2: return the byte string as-is."""
        return string
|
||
|
||
|
||
# There can be empty entries between non-empty ones, so wordlist_len() scans backwards to find the actual last value
|
||
|
||
def _cstr(ptr):
    """Return the bytes behind a C ``char*``; b'' for NULL or on any error."""
    if ptr == ffi.NULL:
        return b''

    try:
        value = ffi.string(ptr)
    except Exception:
        value = b''
    return value
|
||
|
||
def wordlist_len(words):
    """Return the index of the last non-empty entry in *words* (0 if none).

    Indices 31 down to 1 are examined; index 0 is never looked at.
    """
    # ZoiteChat passes a fixed-size array (typically 32) where unused entries may be NULL.
    for index in reversed(range(1, 32)):
        if _cstr(words[index]):
            return index
    return 0
|
||
|
||
|
||
def create_wordlist(words):
    """Convert a C word array into a Python list of decoded strings.

    Entries 1 through the last non-empty index are included, so empty
    entries in between are preserved as empty strings.
    """
    last_index = wordlist_len(words)
    decoded = []
    for index in range(1, last_index + 1):
        decoded.append(__decode(_cstr(words[index])))
    return decoded
|
||
|
||
|
||
def create_wordeollist(words):
    """Build the "word to end of line" list for *words*.

    Element i of the result is words[i] joined by single spaces with every
    non-empty word after it. An empty word contributes nothing to the join,
    so its entry equals the entry that follows it.
    """
    tails = []
    tail = None
    # Walk from the end so each entry extends the one built just before it.
    for word in reversed(words):
        if tail is None:
            tail = word
        elif word:
            tail = ' '.join((word, tail))
        tails.append(tail)

    tails.reverse()
    return tails
|
||
|
||
|
||
def to_cb_ret(value):
    """Map a Python callback's return value to the int handed back to C.

    None becomes 0; anything else is passed through int().
    """
    return 0 if value is None else int(value)
|
||
|
||
|
||
@ffi.def_extern()
def _on_command_hook(word, word_eol, userdata):
    """C entry point for command hooks: call the Python callback behind *userdata*."""
    hook = ffi.from_handle(userdata)
    words = create_wordlist(word)
    words_eol = create_wordlist(word_eol)
    result = hook.callback(words, words_eol, hook.userdata)
    return to_cb_ret(result)
|
||
|
||
|
||
@ffi.def_extern()
def _on_print_hook(word, userdata):
    """C entry point for print hooks.

    Only a word array is supplied here, so the word_eol list is derived from
    the decoded words.
    """
    hook = ffi.from_handle(userdata)
    words = create_wordlist(word)
    words_eol = create_wordeollist(words)
    result = hook.callback(words, words_eol, hook.userdata)
    return to_cb_ret(result)
|
||
|
||
|
||
@ffi.def_extern()
def _on_print_attrs_hook(word, attrs, userdata):
    """C entry point for print hooks that also receive event attributes.

    The callback gets a fourth argument: an Attribute whose ``time`` is
    ``attrs.server_time_utc``.
    """
    hook = ffi.from_handle(userdata)
    words = create_wordlist(word)
    words_eol = create_wordeollist(words)

    event_attrs = Attribute()
    event_attrs.time = attrs.server_time_utc

    result = hook.callback(words, words_eol, hook.userdata, event_attrs)
    return to_cb_ret(result)
|
||
|
||
|
||
@ffi.def_extern()
def _on_server_hook(word, word_eol, userdata):
    """C entry point for server hooks: call the Python callback behind *userdata*."""
    hook = ffi.from_handle(userdata)
    words = create_wordlist(word)
    words_eol = create_wordlist(word_eol)
    result = hook.callback(words, words_eol, hook.userdata)
    return to_cb_ret(result)
|
||
|
||
|
||
@ffi.def_extern()
def _on_server_attrs_hook(word, word_eol, attrs, userdata):
    """C entry point for server hooks that also receive event attributes.

    The callback gets a fourth argument: an Attribute whose ``time`` is
    ``attrs.server_time_utc``.
    """
    hook = ffi.from_handle(userdata)
    words = create_wordlist(word)
    words_eol = create_wordlist(word_eol)

    event_attrs = Attribute()
    event_attrs.time = attrs.server_time_utc

    result = hook.callback(words, words_eol, hook.userdata, event_attrs)
    return to_cb_ret(result)
|
||
|
||
|
||
@ffi.def_extern()
def _on_timer_hook(userdata):
    """C entry point for timer hooks.

    Returns 1 when the callback's result compares equal to True, otherwise
    forgets the hook and returns 0.
    """
    hook = ffi.from_handle(userdata)

    # Deliberately an equality test: a callback returning 1 also counts.
    keep_running = hook.callback(hook.userdata) == True
    if keep_running:
        return 1

    try:
        # Avoid calling zoitechat_unhook twice if unnecessary
        hook.is_unload = True
    except ReferenceError:
        # hook is a weak reference, it might have been destroyed by the callback
        # in which case it has already been removed from hook.plugin.hooks and
        # we wouldn't be able to test it with h == hook anyway.
        return 0

    owner_hooks = hook.plugin.hooks
    for candidate in owner_hooks:
        if candidate == hook:
            owner_hooks.remove(candidate)
            break

    return 0
|
||
|
||
|
||
@ffi.def_extern()
def _on_say_command(word, word_eol, userdata):
    """Handle input typed into the special >>python<< tab.

    This callback is registered with hook_command(b''), so it may be invoked
    for far more than console input. It must never raise, and it returns 0
    (EAT_NONE) unless the current channel is the python console.

    Returns 1 once the input was recognised as console input, whether or not
    executing it succeeded.
    """
    try:
        channel = _cstr(lib.zoitechat_get_info(lib.ph, b'channel'))
    except Exception:
        return 0

    if channel != b'>>python<<':
        return 0

    try:
        python = _cstr(word_eol[1])
    except Exception:
        python = b''

    if not python:
        return 1

    # Don't let exceptions here swallow core commands or wedge the UI.
    try:
        # _cstr() yields bytes, but exec_in_interp() -> compile_line() appends
        # a str newline on Python 3, so the text has to be decoded first.
        # Decoding sits inside the try so malformed input is reported as well.
        exec_in_interp(__decode(python))
    except Exception:
        # Best effort: surface the traceback in the python tab.
        # 'replace' is passed positionally because Python 2's str.encode()
        # takes no keyword arguments.
        exc = traceback.format_exc().encode('utf-8', 'replace')
        lib.zoitechat_print(lib.ph, exc)

    return 1
|
||
|
||
|
||
def load_filename(filename):
    """Load the script at *filename* as a new plugin.

    ``~`` is expanded, and a relative name is resolved against
    ``<configdir>/addons``. Returns True when the plugin was loaded and
    registered; False when the name is empty, the file is already loaded, or
    loading failed.
    """
    filename = os.path.expanduser(filename)
    if not os.path.isabs(filename):
        configdir = __decode(_cstr(lib.zoitechat_get_info(lib.ph, b'configdir')))
        filename = os.path.join(configdir, 'addons', filename)

    if not filename:
        return False

    # Refuse to load the same file twice.
    for loaded in plugins:
        if loaded.filename == filename:
            return False

    plugin = Plugin()
    if not plugin.loadfile(filename):
        return False

    plugins.add(plugin)
    return True
|
||
|
||
|
||
def unload_name(name):
    """Unload the plugin matching *name*.

    *name* may be the plugin's module name, its full filename or the basename
    of that filename. Returns True if a plugin was removed, False otherwise
    (including when *name* is empty).
    """
    if not name:
        return False

    for plugin in plugins:
        aliases = (plugin.name, plugin.filename, os.path.basename(plugin.filename))
        if name in aliases:
            # Returning right away keeps the mutated set from being iterated further.
            plugins.remove(plugin)
            return True

    return False
|
||
|
||
|
||
def reload_name(name):
    """Unload the plugin matching *name*, then load its file again.

    *name* may be the plugin's module name, its full filename or the basename
    of that filename. Returns the result of load_filename() for the plugin's
    file, or False when *name* is empty or no plugin matches.
    """
    if not name:
        return False

    for plugin in plugins:
        if name in (plugin.name, plugin.filename, os.path.basename(plugin.filename)):
            filename = plugin.filename
            plugins.remove(plugin)
            # Drop this frame's reference before reloading. Otherwise the old
            # Plugin stays alive during load_filename() and its __del__
            # (unload callbacks, unhooking) only runs after the new copy has
            # already executed. This relies on CPython reference counting and
            # on nothing else holding a strong reference to the plugin.
            del plugin
            return load_filename(filename)

    return False
|
||
|
||
|
||
@contextmanager
def change_cwd(path):
    """Run the ``with`` body with *path* as the current working directory.

    The previous working directory is restored on exit, including when the
    body raises. If changing into *path* fails, the exception from os.chdir()
    propagates and the working directory is left untouched.
    """
    old_cwd = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(old_cwd)
|
||
|
||
|
||
def autoload():
    """Load every ``*.py`` file found in ``<configdir>/addons``.

    Failure to enter or list the addons directory is logged (when logging is
    enabled) and otherwise ignored, so it never aborts plugin start-up.
    """
    configdir = __decode(_cstr(lib.zoitechat_get_info(lib.ph, b'configdir')))
    addondir = os.path.join(configdir, 'addons')
    try:
        with change_cwd(addondir):  # Maintaining old behavior
            for f in os.listdir(addondir):
                if f.endswith('.py'):
                    log('Autoloading', f)
                    # TODO: Set cwd
                    load_filename(os.path.join(addondir, f))

    except OSError as e:
        # OSError rather than FileNotFoundError: the latter does not exist on
        # Python 2, and on Python 3 it is a subclass of OSError. This also
        # covers "not a directory" and permission errors.
        log('Autoload failed', e)
|
||
|
||
|
||
def list_plugins():
    """Print a column-aligned table of the loaded plugins, then a blank line."""
    if not plugins:
        lib.zoitechat_print(lib.ph, b'No python modules loaded')
        return

    headers = [b'Name', b'Version', b'Filename', b'Description']
    underline = [b'-' * len(title) for title in headers]
    rows = [headers, underline]

    for plugin in plugins:
        basename = os.path.basename(plugin.filename).encode()
        name = plugin.name.encode()
        version = plugin.version.encode() if plugin.version else b'<none>'
        description = plugin.description.encode() if plugin.description else b'<none>'
        rows.append((name, version, basename, description))

    # Each column is as wide as its longest cell, header and underline included.
    widths = [max(len(cell) for cell in column) for column in zip(*rows)]

    for row in rows:
        padded = [cell.ljust(width) for cell, width in zip(row, widths)]
        lib.zoitechat_print(lib.ph, b' '.join(padded))
    lib.zoitechat_print(lib.ph, b'')
|
||
|
||
|
||
def exec_in_interp(python):
    """Run one line of Python in the shared interactive namespace.

    The namespace lives in a Plugin created on first use, with the
    ``zoitechat`` module pre-bound. A non-None result is printed; exceptions
    raised while running the code are written to zoitechat_stdout.
    Compilation happens outside the try block, so compile errors propagate
    to the caller.
    """
    global local_interp

    if not python:
        return

    if local_interp is None:
        local_interp = Plugin()
        local_interp.locals = {}
        local_interp.globals['zoitechat'] = zoitechat

    code = compile_line(python)
    try:
        result = eval(code, local_interp.globals, local_interp.locals)
        if result is not None:
            lib.zoitechat_print(lib.ph, '{}'.format(result).encode())

    except Exception:
        traceback.print_exc(file=zoitechat_stdout)
|
||
|
||
|
||
@ffi.def_extern()
def _on_load_command(word, word_eol, userdata):
    """LOAD handler: claim the command only for ``.py`` files.

    Returns 3 after attempting to load a ``.py`` file and 0 for anything else.
    """
    filename = _cstr(word[2])
    if not filename.endswith(b'.py'):
        return 0

    load_filename(__decode(filename))
    return 3
|
||
|
||
|
||
@ffi.def_extern()
def _on_unload_command(word, word_eol, userdata):
    """UNLOAD handler: claim the command only for ``.py`` files.

    Returns 3 after attempting to unload a ``.py`` file and 0 for anything else.
    """
    filename = _cstr(word[2])
    if not filename.endswith(b'.py'):
        return 0

    unload_name(__decode(filename))
    return 3
|
||
|
||
|
||
@ffi.def_extern()
def _on_reload_command(word, word_eol, userdata):
    """RELOAD handler: claim the command only for ``.py`` files.

    Returns 3 after attempting to reload a ``.py`` file and 0 for anything else.
    """
    filename = _cstr(word[2])
    if not filename.endswith(b'.py'):
        return 0

    reload_name(__decode(filename))
    return 3
|
||
|
||
|
||
@ffi.def_extern(error=3)
def _on_py_command(word, word_eol, userdata):
    """/PY handler: dispatch on the subcommand in word[2].

    Arguments are read through _cstr(), like the other command handlers, so
    a missing (NULL) argument becomes an empty string instead of raising. A
    bare ``/PY`` therefore falls through to the help text.

    Always returns 3; ``error=3`` makes CFFI return the same value if an
    exception escapes.
    """
    subcmd = __decode(_cstr(word[2])).lower()

    if subcmd == 'exec':
        python = __decode(_cstr(word_eol[3]))
        exec_in_interp(python)

    elif subcmd == 'load':
        filename = __decode(_cstr(word[3]))
        load_filename(filename)

    elif subcmd == 'unload':
        name = __decode(_cstr(word[3]))
        if not unload_name(name):
            lib.zoitechat_print(lib.ph, b'Can\'t find a python plugin with that name')

    elif subcmd == 'reload':
        name = __decode(_cstr(word[3]))
        if not reload_name(name):
            lib.zoitechat_print(lib.ph, b'Can\'t find a python plugin with that name')

    elif subcmd == 'console':
        lib.zoitechat_command(lib.ph, b'QUERY >>python<<')

    elif subcmd == 'list':
        list_plugins()

    elif subcmd == 'about':
        lib.zoitechat_print(lib.ph, b'ZoiteChat Python interface version ' + VERSION)

    else:
        lib.zoitechat_command(lib.ph, b'HELP PY')

    return 3
|
||
|
||
|
||
@ffi.def_extern()
def _on_plugin_init(plugin_name, plugin_desc, plugin_version, arg, libdir):
    """Plugin entry point: publish metadata, import ``zoitechat``, install hooks.

    Returns 1 on success and 0 when the ``zoitechat`` module cannot be
    imported (or *libdir* cannot be decoded).
    """
    global zoitechat
    global zoitechat_stdout

    signal.signal(signal.SIGINT, signal.SIG_DFL)

    plugin_name[0] = PLUGIN_NAME
    plugin_desc[0] = PLUGIN_DESC
    plugin_version[0] = PLUGIN_VERSION

    try:
        libdir = __decode(_cstr(libdir))

        # Candidate directories that may contain the pure-Python modules.
        candidates = [
            os.path.abspath(os.path.join(libdir, '..', 'python')),
            os.path.abspath(os.path.join(libdir, 'python')),
        ]

        appdir = os.getenv('APPDIR')
        if appdir:
            candidates += [
                os.path.join(appdir, 'usr', 'lib', 'zoitechat', 'python'),
                os.path.join(appdir, 'usr', 'lib', 'x86_64-linux-gnu', 'zoitechat', 'python'),
            ]

        if os.getenv('FLATPAK_ID'):
            candidates += [
                '/app/lib/zoitechat/python',
                '/app/lib/x86_64-linux-gnu/zoitechat/python',
            ]

        # Only existing directories are added, and never twice.
        for candidate in candidates:
            if candidate in sys.path or not os.path.isdir(candidate):
                continue
            sys.path.append(candidate)

        zoitechat = importlib.import_module('zoitechat')

    except (UnicodeDecodeError, ImportError) as e:
        lib.zoitechat_print(lib.ph, b'Failed to import module: ' + repr(e).encode())
        return 0

    # From here on everything printed by Python code ends up in the client.
    zoitechat_stdout = Stdout()
    sys.stdout = zoitechat_stdout
    sys.stderr = zoitechat_stdout
    pydoc.help = pydoc.Helper(HelpEater(), HelpEater())

    py_help = (
        b'Usage: /PY LOAD <filename>\n'
        b'           UNLOAD <filename|name>\n'
        b'           RELOAD <filename|name>\n'
        b'           LIST\n'
        b'           EXEC <command>\n'
        b'           CONSOLE\n'
        b'           ABOUT'
    )

    lib.zoitechat_hook_command(lib.ph, b'', 0, lib._on_say_command, ffi.NULL, ffi.NULL)
    lib.zoitechat_hook_command(lib.ph, b'LOAD', 0, lib._on_load_command, ffi.NULL, ffi.NULL)
    lib.zoitechat_hook_command(lib.ph, b'UNLOAD', 0, lib._on_unload_command, ffi.NULL, ffi.NULL)
    lib.zoitechat_hook_command(lib.ph, b'RELOAD', 0, lib._on_reload_command, ffi.NULL, ffi.NULL)
    lib.zoitechat_hook_command(lib.ph, b'PY', 0, lib._on_py_command, py_help, ffi.NULL)

    lib.zoitechat_print(lib.ph, b'Python interface loaded')
    autoload()
    return 1
|
||
|
||
|
||
@ffi.def_extern()
def _on_plugin_deinit():
    """Plugin exit point: drop all plugins and undo what init installed.

    Always returns 1.
    """
    global local_interp
    global zoitechat
    global zoitechat_stdout
    global plugins

    # Order matters: the plugin set is released first, while the module and
    # the redirected streams are still in place.
    plugins = set()
    local_interp = None
    zoitechat = None
    zoitechat_stdout = None
    sys.stdout = sys.__stdout__
    sys.stderr = sys.__stderr__
    pydoc.help = pydoc.Helper()

    # Forget the interface modules; ones that were never imported are skipped.
    for mod in ('_zoitechat', 'zoitechat', 'xchat', '_zoitechat_embedded'):
        sys.modules.pop(mod, None)

    return 1
|