Allow tasks to remove themselves during execution
The scheduler class is now capable with dealing with tasks which remove themselves from the scheduler during execution. Additionally, some optimizations were applied by use of iterators and some functions better suited for the purpose. Please peer-review, all tests pass.
This commit is contained in:
parent
5867f08bf1
commit
e3fab66dfb
1 changed files with 17 additions and 11 deletions
|
@ -15,6 +15,7 @@
|
||||||
import time
|
import time
|
||||||
import threading
|
import threading
|
||||||
import logging
|
import logging
|
||||||
|
import itertools
|
||||||
|
|
||||||
from sleekxmpp.util import Queue, QueueEmpty
|
from sleekxmpp.util import Queue, QueueEmpty
|
||||||
|
|
||||||
|
@ -156,17 +157,23 @@ class Scheduler(object):
|
||||||
newtask = self.addq.get(True, 0.1)
|
newtask = self.addq.get(True, 0.1)
|
||||||
elapsed += 0.1
|
elapsed += 0.1
|
||||||
except QueueEmpty:
|
except QueueEmpty:
|
||||||
cleanup = []
|
|
||||||
self.schedule_lock.acquire()
|
self.schedule_lock.acquire()
|
||||||
for task in self.schedule:
|
# select only those tasks which are to be executed now
|
||||||
if time.time() >= task.next:
|
relevant = itertools.takewhile(
|
||||||
updated = True
|
lambda task: time.time() >= task.next, self.schedule)
|
||||||
if not task.run():
|
# run the tasks and keep the return value in a tuple
|
||||||
cleanup.append(task)
|
status = map(lambda task: (task, task.run()), relevant)
|
||||||
|
# remove non-repeating tasks
|
||||||
|
for task, doRepeat in status:
|
||||||
|
if not doRepeat:
|
||||||
|
try:
|
||||||
|
self.schedule.remove(task)
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
else:
|
else:
|
||||||
break
|
# only need to resort tasks if a repeated task has
|
||||||
for task in cleanup:
|
# been kept in the list.
|
||||||
self.schedule.pop(self.schedule.index(task))
|
updated = True
|
||||||
else:
|
else:
|
||||||
updated = True
|
updated = True
|
||||||
self.schedule_lock.acquire()
|
self.schedule_lock.acquire()
|
||||||
|
@ -174,8 +181,7 @@ class Scheduler(object):
|
||||||
self.schedule.append(newtask)
|
self.schedule.append(newtask)
|
||||||
finally:
|
finally:
|
||||||
if updated:
|
if updated:
|
||||||
self.schedule = sorted(self.schedule,
|
self.schedule.sort(key=lambda task: task.next)
|
||||||
key=lambda task: task.next)
|
|
||||||
self.schedule_lock.release()
|
self.schedule_lock.release()
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
self.run = False
|
self.run = False
|
||||||
|
|
Loading…
Reference in a new issue