poezio/poezio/plugin_manager.py
Maxime “pep” Buquet a72152c462
plugins: Add refs to dependencies on plugins
Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2020-04-12 16:43:53 +02:00

456 lines
16 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Plugin manager module.
Defines the PluginManager class, which glues all the plugins and
the API together. Also defines a few variables related to the
plugin environment.
"""
import logging
import os
from typing import Dict, Set
from importlib import import_module, machinery
from pathlib import Path
from os import path
import pkg_resources
from poezio import tabs, xdg
from poezio.core.structs import Command, Completion
from poezio.plugin import PluginAPI
from poezio.config import config
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class PluginManager:
    """
    Plugin Manager
    Contains all the references to the plugins
    And keeps track of everything the plugin has done through the API.

    Every registry below is keyed by plugin (module) name, so that
    everything a plugin registered can be removed again when it is
    unloaded.
    """

    # plugin name -> names of the plugins that declared it as a dependency.
    # The class-level default is kept for backward compatibility; __init__
    # rebinds it so that instances do not share one mutable table.
    rdeps: Dict[str, Set[str]] = {}

    def __init__(self, core):
        """
        Set up empty registries, resolve the plugin directories from the
        configuration and build the plugin search path.
        """
        self.core = core
        # per-instance reverse dependency table (see the class attribute)
        self.rdeps = {}
        # module name -> module object
        self.modules = {}
        # module name -> plugin object
        self.plugins = {}
        # module name -> dict of commands loaded for the module
        self.commands = {}
        # module name -> list of event_name/handler pairs loaded for the module
        self.event_handlers = {}
        # module name -> dict of tab types; tab type -> commands
        # loaded by the module
        self.tab_commands = {}
        # module name -> dict of keys/handlers loaded for the module
        self.keys = {}
        # module name -> dict of tab types; tab type -> list of keybinds (tuples)
        self.tab_keys = {}
        self.roster_elements = {}
        self.finder = machinery.PathFinder()

        self.initial_set_plugins_dir()
        self.initial_set_plugins_conf_dir()
        self.fill_load_path()

        self.plugin_api = PluginAPI(core, self)

    def disable_plugins(self):
        """
        Unload every loaded plugin without notifying the user.
        """
        # Iterate over a snapshot: unload() mutates self.plugins.
        for plugin in list(self.plugins):
            self.unload(plugin, notify=False)

    def set_rdeps(self, name):
        """
        Runs through plugin dependencies to build the reverse dependencies table.
        """
        self.rdeps.setdefault(name, set())
        for dep in self.plugins[name].dependencies:
            self.rdeps.setdefault(dep, set()).add(name)

    @staticmethod
    def _load_module_from_spec(name, spec):
        """
        Create, register in sys.modules and execute the module described
        by `spec`. The previous sys.modules entry is restored if executing
        the module fails.
        """
        import sys
        from importlib.util import module_from_spec

        module = module_from_spec(spec)
        previous = sys.modules.get(name)
        # Registered before execution, as the import system itself does.
        sys.modules[name] = module
        try:
            spec.loader.exec_module(module)
        except BaseException:
            if previous is None:
                sys.modules.pop(name, None)
            else:
                sys.modules[name] = previous
            raise
        return module

    def _find_plugin_module(self, name: str):
        """
        Locate and import the module of plugin `name`.

        The load path is searched first; when nothing is found there, the
        `poezio_plugins` package and then the `poezio_plugins` entry points
        are tried. Returns the module, or None after reporting the failure
        through core.information().
        """
        module = None
        try:
            spec = self.finder.find_spec(name, self.load_path)
            if spec is not None and spec.loader is not None:
                log.debug('Found candidate loader for plugin %s: %r', name,
                          spec.loader)
                module = self._load_module_from_spec(name, spec)
            else:
                try:
                    module = import_module('poezio_plugins.%s' % name)
                except ModuleNotFoundError:
                    pass
                for entry in pkg_resources.iter_entry_points('poezio_plugins'):
                    if entry.name != name:
                        continue
                    log.debug('Found candidate entry for plugin %s: %r',
                              name, entry)
                    try:
                        module = entry.load()
                    except Exception as exn:
                        log.debug('Failed to import plugin: %s\n%r', name,
                                  exn, exc_info=True)
                    # Only the first matching entry point is considered.
                    break
            if not module:
                self.core.information('Could not find plugin: %s' % name,
                                      'Error')
                return None
            log.debug('Plugin %s loaded from "%s"', name,
                      getattr(module, '__file__', None))
        except Exception as e:
            log.debug("Could not load plugin %s", name, exc_info=True)
            self.core.information("Could not load plugin %s: %s" % (name, e),
                                  'Error')
            return None
        return module

    def load(self, name: str, notify=True, unload_first=True):
        """
        Load a plugin.

        name: name of the plugin module to load.
        notify: whether to report success, or a failure to instantiate
            the plugin, through core.information().
        unload_first: when the plugin is already loaded, unload it first
            if True, otherwise do nothing.

        The dependencies declared on the plugin class are loaded first
        (without reloading those already present). Always returns None.
        """
        if name in self.plugins:
            if not unload_first:
                return None
            self.unload(name)

        module = self._find_plugin_module(name)
        if not module:
            return None

        self.modules[name] = module
        self.commands[name] = {}
        self.keys[name] = {}
        self.tab_keys[name] = {}
        self.tab_commands[name] = {}
        self.event_handlers[name] = []
        try:
            # Placeholder marking the plugin as known while its dependencies
            # are loaded, so that load(name, unload_first=False) returns early.
            self.plugins[name] = None

            for dep in module.Plugin.dependencies:
                self.load(dep, unload_first=False)
                if dep not in self.plugins:
                    log.debug(
                        'Plugin %s couldn\'t load because of dependency %s',
                        name, dep
                    )
                    # Drop the placeholder and the empty registries so the
                    # plugin is not left half-registered.
                    self.unload(name, notify=False)
                    return None
                # Add reference of the dep to the plugin's usage
                module.Plugin.refs[dep] = self.plugins[dep]

            self.plugins[name] = module.Plugin(name, self.plugin_api, self.core,
                                               self.plugins_conf_dir)
            self.set_rdeps(name)
        except Exception as e:
            log.error('Error while loading the plugin %s', name, exc_info=True)
            if notify:
                self.core.information(
                    'Unable to load the plugin %s: %s' % (name, e), 'Error')
            self.unload(name, notify=False)
        else:
            if notify:
                self.core.information('Plugin %s loaded' % name, 'Info')
        return None

    def unload(self, name: str, notify=True):
        """
        Unloads plugin as well as plugins depending on it.

        Everything registered on behalf of the plugin (commands, keys, tab
        commands, tab keys, event handlers) is removed, then the plugin's
        own unload() is called. Errors are logged and reported through
        core.information() instead of being raised.
        """
        if name not in self.plugins:
            return None

        try:
            plugin = self.plugins[name]
            if plugin is not None:
                plugin._unloading = True  # Prevents loops
                for rdep in set(self.rdeps.get(name, ())):
                    if rdep not in self.plugins:
                        continue
                    # A placeholder (None) entry has no _unloading flag.
                    if getattr(self.plugins[rdep], '_unloading', False):
                        continue
                    self.unload(rdep)
                    if rdep in self.plugins:
                        log.debug('Failed to unload reverse dependency %s first.', rdep)
                        return None

            for command in self.commands[name]:
                self.core.commands.pop(command, None)
            for key in self.keys[name]:
                self.core.key_func.pop(key, None)
            for tab in list(self.tab_commands[name].keys()):
                for command in self.tab_commands[name][tab][:]:
                    self.del_tab_command(name, getattr(tabs, tab),
                                         command[0])
                del self.tab_commands[name][tab]
            for tab in list(self.tab_keys[name].keys()):
                for key in self.tab_keys[name][tab][:]:
                    self.del_tab_key(name, getattr(tabs, tab), key[0])
                del self.tab_keys[name][tab]
            for event_name, handler in self.event_handlers[name][:]:
                self.del_event_handler(name, event_name, handler)

            if plugin is not None:
                plugin.unload()
            del self.plugins[name]
            # A plugin that never finished loading has no rdeps entry.
            self.rdeps.pop(name, None)
            del self.commands[name]
            del self.keys[name]
            del self.tab_commands[name]
            self.tab_keys.pop(name, None)
            del self.event_handlers[name]
            if notify:
                self.core.information('Plugin %s unloaded' % name, 'Info')
        except Exception as e:
            log.debug("Could not unload plugin %s", name, exc_info=True)
            self.core.information(
                "Could not unload plugin %s: %s" % (name, e), 'Error')
        return None

    def add_command(self,
                    module_name,
                    name,
                    handler,
                    help,
                    completion=None,
                    short='',
                    usage=''):
        """
        Add a global command.

        Raises Exception if a command with that name already exists.
        """
        if name in self.core.commands:
            raise Exception("Command '%s' already exists" % (name, ))

        command = Command(handler, help, completion, short, usage)
        self.commands[module_name][name] = command
        self.core.commands[name] = command

    def del_command(self, module_name, name):
        """
        Remove a global command added through add_command.
        """
        if name in self.commands[module_name]:
            del self.commands[module_name][name]
            if name in self.core.commands:
                del self.core.commands[name]

    def add_tab_command(self,
                        module_name,
                        tab_type,
                        name,
                        handler,
                        help,
                        completion=None,
                        short='',
                        usage=''):
        """
        Add a command only for a type of Tab.

        Does nothing if the tab type already has a plugin command with
        that name.
        """
        commands = self.tab_commands[module_name]
        t = tab_type.__name__
        if name in tab_type.plugin_commands:
            return
        commands.setdefault(t, []).append((name, handler, help, completion))
        tab_type.plugin_commands[name] = Command(handler, help, completion,
                                                 short, usage)
        for tab in self.core.tabs:
            if isinstance(tab, tab_type):
                tab.update_commands()

    def del_tab_command(self, module_name, tab_type, name):
        """
        Remove a command added through add_tab_command.
        """
        commands = self.tab_commands[module_name]
        t = tab_type.__name__
        if t not in commands:
            return
        for command in commands[t]:
            if command[0] != name:
                continue
            commands[t].remove(command)
            del tab_type.plugin_commands[name]
            for tab in self.core.tabs:
                if isinstance(tab, tab_type) and name in tab.commands:
                    del tab.commands[name]
            # add_tab_command registers a name at most once per tab type;
            # stop here rather than keep iterating the list just modified.
            break

    def add_tab_key(self, module_name, tab_type, key, handler):
        """
        Associate a key binding to a handler only for a type of Tab.

        Does nothing if the tab type already has a plugin binding for
        that key.
        """
        keys = self.tab_keys[module_name]
        t = tab_type.__name__
        if key in tab_type.plugin_keys:
            return
        keys.setdefault(t, []).append((key, handler))
        tab_type.plugin_keys[key] = handler
        for tab in self.core.tabs:
            if isinstance(tab, tab_type):
                tab.update_keys()

    def del_tab_key(self, module_name, tab_type, key):
        """
        Remove a key binding added through add_tab_key.
        """
        keys = self.tab_keys[module_name]
        t = tab_type.__name__
        if t not in keys:
            return
        for _key in keys[t]:
            if _key[0] != key:
                continue
            keys[t].remove(_key)
            del tab_type.plugin_keys[key]
            for tab in self.core.tabs:
                if isinstance(tab, tab_type) and key in tab.key_func:
                    del tab.key_func[key]
            # add_tab_key registers a key at most once per tab type;
            # stop here rather than keep iterating the list just modified.
            break

    def add_key(self, module_name, key, handler):
        """
        Associate a global key binding to a handler, except if it
        already exists.

        Raises Exception if the key is already bound.
        """
        if key in self.core.key_func:
            raise Exception("Key '%s' already exists" % (key, ))
        self.keys[module_name][key] = handler
        self.core.key_func[key] = handler

    def del_key(self, module_name, key):
        """
        Remove a global key binding added by a plugin.
        """
        if key in self.keys[module_name]:
            del self.keys[module_name][key]
            # The binding lives in core.key_func, not in core.commands.
            if key in self.core.key_func:
                del self.core.key_func[key]

    def add_event_handler(self, module_name, event_name, handler, *args, **kwargs):
        """
        Add an event handler. If event_name isn't in the event list, assume
        it is a slixmpp event.

        Extra positional and keyword arguments are only forwarded for
        events found in core.events.
        """
        self.event_handlers[module_name].append((event_name, handler))
        if event_name in self.core.events.events:
            self.core.events.add_event_handler(event_name, handler, *args, **kwargs)
        else:
            self.core.xmpp.add_event_handler(event_name, handler)

    def del_event_handler(self, module_name, event_name, handler):
        """
        Remove an event handler if it exists.
        """
        if event_name in self.core.events.events:
            self.core.events.del_event_handler(None, handler)
        else:
            self.core.xmpp.del_event_handler(event_name, handler)
        # Update the list in place so the registry really forgets the pair.
        eh = self.event_handlers[module_name]
        eh[:] = [e for e in eh if e != (event_name, handler)]

    def completion_load(self, the_input):
        """
        completion function that completes the name of the plugins, from
        all .py files found in the directories of the load path
        """
        names = set()
        for path_ in self.load_path:
            try:
                names.update(os.listdir(path_))
            except OSError:
                # Best effort: an unreadable directory offers no candidates.
                pass
        plugins_files = sorted(
            name[:-3] for name in names
            if name.endswith('.py') and name != '__init__.py'
            and not name.startswith('.'))
        position = the_input.get_argument_position(quoted=False)
        return Completion(
            the_input.new_completion,
            plugins_files,
            position,
            '',
            quotify=False)

    def completion_unload(self, the_input):
        """
        completion function that completes the name of loaded plugins
        """
        position = the_input.get_argument_position(quoted=False)
        return Completion(
            the_input.new_completion,
            sorted(self.plugins.keys()),
            position,
            '',
            quotify=False)

    def on_plugins_dir_change(self, _, new_value):
        """
        Switch to a new plugins directory, create it if needed and rebuild
        the load path.
        """
        self.plugins_dir = Path(new_value).expanduser()
        self.check_create_plugins_dir()
        self.fill_load_path()

    def on_plugins_conf_dir_change(self, _, new_value):
        """
        Switch to a new plugins config directory and create it if needed.
        """
        self.plugins_conf_dir = Path(new_value).expanduser()
        self.check_create_plugins_conf_dir()

    def initial_set_plugins_conf_dir(self):
        """
        Set the plugins_conf_dir on start, from the 'plugins_conf_dir'
        option or, when it is unset, from xdg.CONFIG_HOME / 'plugins',
        and create it if needed.
        """
        plugins_conf_dir = config.get('plugins_conf_dir')
        if plugins_conf_dir:
            self.plugins_conf_dir = Path(plugins_conf_dir).expanduser()
        else:
            self.plugins_conf_dir = xdg.CONFIG_HOME / 'plugins'
        self.check_create_plugins_conf_dir()

    def check_create_plugins_conf_dir(self):
        """
        Create the plugins config directory if it does not exist.
        Returns True on success, False on failure.
        """
        if os.access(str(self.plugins_conf_dir), os.R_OK | os.X_OK):
            return True
        try:
            self.plugins_conf_dir.mkdir(parents=True, exist_ok=True)
        except OSError:
            log.error(
                'Unable to create the plugin conf dir: %s',
                self.plugins_conf_dir,
                exc_info=True)
            return False
        return True

    def initial_set_plugins_dir(self):
        """
        Set the plugins_dir on start, from the 'plugins_dir' option or,
        when it is unset, from xdg.DATA_HOME / 'plugins', and create it
        if needed.
        """
        plugins_dir = config.get('plugins_dir')
        if plugins_dir:
            self.plugins_dir = Path(plugins_dir).expanduser()
        else:
            self.plugins_dir = xdg.DATA_HOME / 'plugins'
        self.check_create_plugins_dir()

    def check_create_plugins_dir(self):
        """
        Create the plugins directory if it does not exist.
        Returns True on success, False on failure.
        """
        if os.access(str(self.plugins_dir), os.R_OK | os.X_OK):
            return True
        try:
            self.plugins_dir.mkdir(parents=True, exist_ok=True)
        except OSError:
            log.error(
                'Unable to create the plugins dir: %s',
                self.plugins_dir,
                exc_info=True)
            return False
        return True

    def fill_load_path(self):
        """
        Rebuild the plugin search path: the 'plugins' directory next to
        this package first, then the user plugins directory. Directories
        that are not readable and traversable are left out.
        """
        self.load_path = []

        default_plugin_path = path.join(
            path.dirname(path.dirname(__file__)), 'plugins')

        if os.access(default_plugin_path, os.R_OK | os.X_OK):
            self.load_path.insert(0, default_plugin_path)

        if os.access(str(self.plugins_dir), os.R_OK | os.X_OK):
            self.load_path.append(str(self.plugins_dir))