poezio/poezio/plugin_manager.py
Maxime “pep” Buquet 833cccb188 Rework priority handling for events
Introduce the concept of priority for event handlers, instead of the
position parameter.

The new `priority` parameter replacing `position` should be an integer
between 0 and 100. It defaults to 50.

The previous `position` parameter was only used to insert at a certain
position in the list of handlers (for this particular event). No
reference of it was kept, which means that it was not possible to ensure
an event was called in the position is was supposed to be.

I am now using per-event dicts, containing priority buckets (lists) of
handlers. I am using OrderedDicts to make it easier to loop through all
of the handlers, as insertion happens less often than reading.

I was also suggested using bisect with a simple list of tuples
(priority, handler) per event, but bisect tries to compare bound
methods, which is obviously not possible. Maybe it would be interesting
to find a way use this method instead of OrderedDict as that might be
less resource consuming.

This said, I don't think this part of poezio is a bottleneck at all, so
maybe this is just fine as is.

Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2019-06-22 03:01:02 +02:00

397 lines
14 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Plugin manager module.
Define the PluginManager class, the one that glues all the plugins and
the API together. Defines also a bunch of variables related to the
plugin env.
"""
import os
from os import path
from pathlib import Path
import logging
from poezio import tabs, xdg
from poezio.core.structs import Command, Completion
from poezio.plugin import PluginAPI
from poezio.config import config
log = logging.getLogger(__name__)
class PluginManager:
"""
Plugin Manager
Contains all the references to the plugins
And keeps track of everything the plugin has done through the API.
"""
def __init__(self, core):
self.core = core
# module name -> module object
self.modules = {}
# module name -> plugin object
self.plugins = {}
# module name -> dict of commands loaded for the module
self.commands = {}
# module name -> list of event_name/handler pairs loaded for the module
self.event_handlers = {}
# module name -> dict of tab types; tab type -> commands
# loaded by the module
self.tab_commands = {}
# module name → dict of keys/handlers loaded for the module
self.keys = {}
# module name → dict of tab types; tab type → list of keybinds (tuples)
self.tab_keys = {}
self.roster_elements = {}
from importlib import machinery
self.finder = machinery.PathFinder()
self.initial_set_plugins_dir()
self.initial_set_plugins_conf_dir()
self.fill_load_path()
self.plugin_api = PluginAPI(core, self)
def disable_plugins(self):
for plugin in set(self.plugins.keys()):
self.unload(plugin, notify=False)
def load(self, name, notify=True):
"""
Load a plugin.
"""
if name in self.plugins:
self.unload(name)
try:
module = None
loader = self.finder.find_module(name, self.load_path)
if not loader:
self.core.information('Could not find plugin: %s' % name,
'Error')
return
module = loader.load_module()
except Exception as e:
log.debug("Could not load plugin %s", name, exc_info=True)
self.core.information("Could not load plugin %s: %s" % (name, e),
'Error')
finally:
if not module:
return
self.modules[name] = module
self.commands[name] = {}
self.keys[name] = {}
self.tab_keys[name] = {}
self.tab_commands[name] = {}
self.event_handlers[name] = []
try:
self.plugins[name] = None
self.plugins[name] = module.Plugin(self.plugin_api, self.core,
self.plugins_conf_dir)
except Exception as e:
log.error('Error while loading the plugin %s', name, exc_info=True)
if notify:
self.core.information(
'Unable to load the plugin %s: %s' % (name, e), 'Error')
self.unload(name, notify=False)
else:
if notify:
self.core.information('Plugin %s loaded' % name, 'Info')
def unload(self, name, notify=True):
if name in self.plugins:
try:
for command in self.commands[name].keys():
del self.core.commands[command]
for key in self.keys[name].keys():
del self.core.key_func[key]
for tab in list(self.tab_commands[name].keys()):
for command in self.tab_commands[name][tab][:]:
self.del_tab_command(name, getattr(tabs, tab),
command[0])
del self.tab_commands[name][tab]
for tab in list(self.tab_keys[name].keys()):
for key in self.tab_keys[name][tab][:]:
self.del_tab_key(name, getattr(tabs, tab), key[0])
del self.tab_keys[name][tab]
for event_name, handler in self.event_handlers[name][:]:
self.del_event_handler(name, event_name, handler)
if self.plugins[name] is not None:
self.plugins[name].unload()
del self.plugins[name]
del self.commands[name]
del self.keys[name]
del self.tab_commands[name]
del self.event_handlers[name]
if notify:
self.core.information('Plugin %s unloaded' % name, 'Info')
except Exception as e:
log.debug("Could not unload plugin %s", name, exc_info=True)
self.core.information(
"Could not unload plugin %s: %s" % (name, e), 'Error')
def add_command(self,
module_name,
name,
handler,
help,
completion=None,
short='',
usage=''):
"""
Add a global command.
"""
if name in self.core.commands:
raise Exception("Command '%s' already exists" % (name, ))
commands = self.commands[module_name]
commands[name] = Command(handler, help, completion, short, usage)
self.core.commands[name] = commands[name]
def del_command(self, module_name, name):
"""
Remove a global command added through add_command.
"""
if name in self.commands[module_name]:
del self.commands[module_name][name]
if name in self.core.commands:
del self.core.commands[name]
def add_tab_command(self,
module_name,
tab_type,
name,
handler,
help,
completion=None,
short='',
usage=''):
"""
Add a command only for a type of Tab.
"""
commands = self.tab_commands[module_name]
t = tab_type.__name__
if name in tab_type.plugin_commands:
return
if t not in commands:
commands[t] = []
commands[t].append((name, handler, help, completion))
tab_type.plugin_commands[name] = Command(handler, help, completion,
short, usage)
for tab in self.core.tabs:
if isinstance(tab, tab_type):
tab.update_commands()
def del_tab_command(self, module_name, tab_type, name):
"""
Remove a command added through add_tab_command.
"""
commands = self.tab_commands[module_name]
t = tab_type.__name__
if t not in commands:
return
for command in commands[t]:
if command[0] == name:
commands[t].remove(command)
del tab_type.plugin_commands[name]
for tab in self.core.tabs:
if isinstance(tab, tab_type) and name in tab.commands:
del tab.commands[name]
def add_tab_key(self, module_name, tab_type, key, handler):
"""
Associate a key binding to a handler only for a type of Tab.
"""
keys = self.tab_keys[module_name]
t = tab_type.__name__
if key in tab_type.plugin_keys:
return
if t not in keys:
keys[t] = []
keys[t].append((key, handler))
tab_type.plugin_keys[key] = handler
for tab in self.core.tabs:
if isinstance(tab, tab_type):
tab.update_keys()
def del_tab_key(self, module_name, tab_type, key):
"""
Remove a key binding added through add_tab_key.
"""
keys = self.tab_keys[module_name]
t = tab_type.__name__
if t not in keys:
return
for _key in keys[t]:
if _key[0] == key:
keys[t].remove(_key)
del tab_type.plugin_keys[key]
for tab in self.core.tabs:
if isinstance(tab, tab_type) and key in tab.key_func:
del tab.key_func[key]
def add_key(self, module_name, key, handler):
"""
Associate a global key binding to a handler, except if it
already exists.
"""
if key in self.core.key_func:
raise Exception("Key '%s' already exists" % (key, ))
keys = self.keys[module_name]
keys[key] = handler
self.core.key_func[key] = handler
def del_key(self, module_name, key):
"""
Remove a global key binding added by a plugin.
"""
if key in self.keys[module_name]:
del self.keys[module_name][key]
if key in self.core.key_func:
del self.core.commands[key]
def add_event_handler(self, module_name, event_name, handler, *args, **kwargs):
"""
Add an event handler. If event_name isnt in the event list, assume
it is a slixmpp event.
"""
eh = self.event_handlers[module_name]
eh.append((event_name, handler))
if event_name in self.core.events.events:
self.core.events.add_event_handler(event_name, handler, *args, **kwargs)
else:
self.core.xmpp.add_event_handler(event_name, handler)
def del_event_handler(self, module_name, event_name, handler):
"""
Remove an event handler if it exists.
"""
if event_name in self.core.events.events:
self.core.events.del_event_handler(None, handler)
else:
self.core.xmpp.del_event_handler(event_name, handler)
eh = self.event_handlers[module_name]
eh = [e for e in eh if e != (event_name, handler)]
def completion_load(self, the_input):
"""
completion function that completes the name of the plugins, from
all .py files in plugins_dir
"""
names = set()
for path_ in self.load_path:
try:
add = set(os.listdir(path_))
names |= add
except OSError:
pass
plugins_files = [
name[:-3] for name in names if name.endswith('.py')
and name != '__init__.py' and not name.startswith('.')
]
plugins_files.sort()
position = the_input.get_argument_position(quoted=False)
return Completion(
the_input.new_completion,
plugins_files,
position,
'',
quotify=False)
def completion_unload(self, the_input):
"""
completion function that completes the name of loaded plugins
"""
position = the_input.get_argument_position(quoted=False)
return Completion(
the_input.new_completion,
sorted(self.plugins.keys()),
position,
'',
quotify=False)
def on_plugins_dir_change(self, _, new_value):
self.plugins_dir = Path(new_value).expanduser()
self.check_create_plugins_dir()
self.fill_load_path()
def on_plugins_conf_dir_change(self, _, new_value):
self.plugins_conf_dir = Path(new_value).expanduser()
self.check_create_plugins_conf_dir()
def initial_set_plugins_conf_dir(self):
"""
Create the plugins_conf_dir
"""
plugins_conf_dir = config.get('plugins_conf_dir')
self.plugins_conf_dir = Path(plugins_conf_dir).expanduser(
) if plugins_conf_dir else xdg.CONFIG_HOME / 'plugins'
self.check_create_plugins_conf_dir()
def check_create_plugins_conf_dir(self):
"""
Create the plugins config directory if it does not exist.
Returns True on success, False on failure.
"""
if not os.access(str(self.plugins_conf_dir), os.R_OK | os.X_OK):
try:
self.plugins_conf_dir.mkdir(parents=True, exist_ok=True)
except OSError:
log.error(
'Unable to create the plugin conf dir: %s',
self.plugins_conf_dir,
exc_info=True)
return False
return True
def initial_set_plugins_dir(self):
"""
Set the plugins_dir on start
"""
plugins_dir = config.get('plugins_dir')
self.plugins_dir = Path(plugins_dir).expanduser(
) if plugins_dir else xdg.DATA_HOME / 'plugins'
self.check_create_plugins_dir()
def check_create_plugins_dir(self):
"""
Create the plugins directory if it does not exist.
Returns True on success, False on failure.
"""
if not os.access(str(self.plugins_dir), os.R_OK | os.X_OK):
try:
self.plugins_dir.mkdir(parents=True, exist_ok=True)
except OSError:
log.error(
'Unable to create the plugins dir: %s',
self.plugins_dir,
exc_info=True)
return False
return True
def fill_load_path(self):
"""
Append the global packages and the source directory if available
"""
self.load_path = []
default_plugin_path = path.join(
path.dirname(path.dirname(__file__)), 'plugins')
if os.access(default_plugin_path, os.R_OK | os.X_OK):
self.load_path.insert(0, default_plugin_path)
if os.access(str(self.plugins_dir), os.R_OK | os.X_OK):
self.load_path.append(str(self.plugins_dir))
try:
import poezio_plugins
except:
pass
else:
if poezio_plugins.__path__:
self.load_path.append(list(poezio_plugins.__path__)[0])