slixmpp/sleekxmpp/xmlstream/scheduler.py

236 lines
7.8 KiB
Python
Raw Normal View History

2011-12-05 00:43:05 +00:00
# -*- coding: utf-8 -*-
2010-10-06 19:03:21 +00:00
"""
    sleekxmpp.xmlstream.scheduler
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    This module provides a task scheduler that works better
    with SleekXMPP's threading usage than the stock version.

    Part of SleekXMPP: The Sleek XMPP Library

    :copyright: (c) 2011 Nathanael C. Fritz
    :license: MIT, see LICENSE for more details
"""
2010-05-27 01:32:28 +00:00
import time
import threading
import logging
2010-10-06 19:03:21 +00:00
try:
import queue
except ImportError:
import Queue as queue
2010-05-27 01:32:28 +00:00
log = logging.getLogger(__name__)
2010-05-27 01:32:28 +00:00
class Task(object):

    """
    A scheduled task that will be executed by the scheduler
    after a given time interval has passed.

    :param string name: The name of the task.
    :param int seconds: The number of seconds to wait before executing.
    :param callback: The function to execute.
    :param tuple args: The arguments to pass to the callback.
    :param dict kwargs: The keyword arguments to pass to the callback.
    :param bool repeat: Indicates if the task should repeat.
                        Defaults to ``False``.
    :param qpointer: A pointer to an event queue for queuing callback
                     execution instead of executing immediately.
    """

    def __init__(self, name, seconds, callback, args=None,
                 kwargs=None, repeat=False, qpointer=None):
        #: The name of the task.
        self.name = name

        #: The number of seconds to wait before executing.
        self.seconds = seconds

        #: The function to execute once enough time has passed.
        self.callback = callback

        #: The arguments to pass to :attr:`callback`.
        self.args = args or tuple()

        #: The keyword arguments to pass to :attr:`callback`.
        self.kwargs = kwargs or {}

        #: Indicates if the task should repeat after executing,
        #: using the same :attr:`seconds` delay.
        self.repeat = repeat

        #: The time when the task should execute next.
        self.next = time.time() + self.seconds

        #: The main event queue, which allows for callbacks to
        #: be queued for execution instead of executing immediately.
        self.qpointer = qpointer

    def run(self):
        """Execute the task's callback.

        If an event queue was supplied, place the callback in the queue
        as ``('schedule', callback, args, name)``; otherwise, execute the
        callback immediately with :attr:`args` and :attr:`kwargs`.

        The task's timer is reset afterwards in both cases.

        :returns: The value of :attr:`repeat`.
        """
        if self.qpointer is not None:
            callback = self.callback
            if self.kwargs:
                # The queued tuple has no slot for keyword arguments, so
                # bind them into the callable; otherwise they would be
                # lost in queue mode. Presumably the queue consumer calls
                # the queued callable with the args tuple -- confirm
                # against the consumer.
                import functools
                callback = functools.partial(self.callback, **self.kwargs)
            self.qpointer.put(('schedule', callback,
                               self.args, self.name))
        else:
            self.callback(*self.args, **self.kwargs)
        self.reset()
        return self.repeat

    def reset(self):
        """Reset the task's timer so that it will repeat."""
        self.next = time.time() + self.seconds
2010-05-27 01:32:28 +00:00
class Scheduler(object):

    """
    A threaded scheduler that allows for updates mid-execution unlike the
    scheduler in the standard library.

    Based on: http://docs.python.org/library/sched.html#module-sched

    :param parentstop: An :class:`~threading.Event` to signal stopping
                       the scheduler. If omitted, the scheduler creates
                       its own event.
    """

    def __init__(self, parentstop=None):
        #: A queue for storing tasks
        self.addq = queue.Queue()

        #: A list of tasks in order of execution time.
        self.schedule = []

        #: If running in threaded mode, this will be the thread processing
        #: the schedule.
        self.thread = None

        #: A flag indicating that the scheduler is running.
        self.run = False

        #: An :class:`~threading.Event` instance for signalling to stop
        #: the scheduler. Never ``None``: the processing loop calls
        #: ``is_set()`` on it unconditionally.
        if parentstop is None:
            parentstop = threading.Event()
        self.stop = parentstop

        #: Lock for accessing the task queue.
        self.schedule_lock = threading.RLock()

    def process(self, threaded=True, daemon=False):
        """Begin accepting and processing scheduled tasks.

        :param bool threaded: Indicates if the scheduler should execute
                              in its own thread. Defaults to ``True``.
        :param bool daemon: Value assigned to the ``daemon`` attribute of
                            the scheduler thread. Only used when
                            ``threaded`` is true. Defaults to ``False``.
        """
        if threaded:
            self.thread = threading.Thread(name='scheduler_process',
                                           target=self._process)
            self.thread.daemon = daemon
            self.thread.start()
        else:
            self._process()

    def _process(self):
        """Process scheduled tasks.

        Loops until :attr:`run` is cleared or :attr:`stop` is set. Each
        pass either moves one newly added task from :attr:`addq` into
        :attr:`schedule`, or, when no new task is available, runs the
        tasks that are due.
        """
        self.run = True
        try:
            while self.run and not self.stop.is_set():
                # Read the head of the schedule under the lock so a
                # concurrent remove() cannot empty the list between the
                # emptiness check and the index.
                with self.schedule_lock:
                    if self.schedule:
                        wait = self.schedule[0].next - time.time()
                    else:
                        wait = 0.1
                try:
                    newtask = self._wait_for_new_task(wait)
                except queue.Empty:
                    self._run_due_tasks()
                else:
                    # None means the stop event fired while waiting;
                    # there is nothing to add in that case.
                    if newtask is not None:
                        with self.schedule_lock:
                            self.schedule.append(newtask)
                            self._sort_schedule()
        except KeyboardInterrupt:
            self.run = False
        except SystemExit:
            self.run = False
        log.debug("Quitting Scheduler thread")

    def _wait_for_new_task(self, wait):
        """Fetch the next pending task from :attr:`addq`.

        :param wait: Seconds until the earliest scheduled task is due.
        :returns: The new task, or ``None`` if the stop event is already
                  set and no task is overdue.
        :raises queue.Empty: If no task was added in time.
        """
        if wait <= 0.0:
            # A task is already due: only pick up a new task if one is
            # immediately available.
            return self.addq.get(False)
        if self.stop.is_set():
            return None
        # Block in short slices so due tasks and the stop event are
        # re-checked roughly every tenth of a second.
        return self.addq.get(True, 0.1)

    def _run_due_tasks(self):
        """Run every task whose time has come and drop non-repeating ones.

        :returns: ``True`` if at least one task was run.
        """
        with self.schedule_lock:
            ran_any = False
            try:
                # Iterate over a snapshot: a callback may call remove()
                # from this thread (the lock is re-entrant) and change
                # the schedule while it is being walked.
                for task in list(self.schedule):
                    if task not in self.schedule:
                        # Removed by a callback that ran earlier.
                        continue
                    if time.time() < task.next:
                        # The schedule is ordered, nothing later is due.
                        break
                    ran_any = True
                    if not task.run() and task in self.schedule:
                        self.schedule.remove(task)
            finally:
                if ran_any:
                    # Repeating tasks got a new execution time.
                    self._sort_schedule()
            return ran_any

    def _sort_schedule(self):
        """Re-order :attr:`schedule` by next execution time."""
        self.schedule = sorted(self.schedule,
                               key=lambda task: task.next)

    def add(self, name, seconds, callback, args=None,
            kwargs=None, repeat=False, qpointer=None):
        """Schedule a new task.

        :param string name: The name of the task.
        :param int seconds: The number of seconds to wait before executing.
        :param callback: The function to execute.
        :param tuple args: The arguments to pass to the callback.
        :param dict kwargs: The keyword arguments to pass to the callback.
        :param bool repeat: Indicates if the task should repeat.
                            Defaults to ``False``.
        :param qpointer: A pointer to an event queue for queuing callback
                         execution instead of executing immediately.
        :raises ValueError: If a task with the same name is already in
                            the schedule.
        """
        with self.schedule_lock:
            # NOTE: only tasks already moved into the schedule are
            # checked; tasks still waiting in addq are not.
            if any(task.name == name for task in self.schedule):
                raise ValueError("Key %s already exists" % name)
            self.addq.put(Task(name, seconds, callback, args,
                               kwargs, repeat, qpointer))

    def remove(self, name):
        """Remove a scheduled task ahead of schedule, and without
        executing it.

        Does nothing if no task with that name is in the schedule.

        :param string name: The name of the task to remove.
        """
        with self.schedule_lock:
            # Walk backwards so that, should several tasks share a name,
            # the last one is the one removed.
            for task in reversed(self.schedule):
                if task.name == name:
                    self.schedule.remove(task)
                    break

    def quit(self):
        """Shutdown the scheduler."""
        self.run = False