feature: Add a MAM history filler
This commit is contained in:
parent
48abe2ad7e
commit
bf8965fb4b
3 changed files with 105 additions and 23 deletions
|
@ -1,8 +1,16 @@
|
|||
import asyncio
|
||||
import logging
|
||||
from datetime import datetime, timedelta
|
||||
import json
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import List, Dict, Any
|
||||
from poezio.config import config
|
||||
from poezio import tabs
|
||||
from poezio.logger import iterate_messages_reverse, Logger
|
||||
from poezio.logger import (
|
||||
build_log_message,
|
||||
iterate_messages_reverse,
|
||||
last_message_in_archive,
|
||||
Logger,
|
||||
)
|
||||
from poezio.mam import (
|
||||
fetch_history,
|
||||
NoMAMSupportException,
|
||||
|
@ -25,26 +33,27 @@ def make_line_local(tab: tabs.ChatTab, msg: Dict[str, Any]) -> Message:
|
|||
jid.resource = msg['nickname']
|
||||
else:
|
||||
jid = JID(tab.jid)
|
||||
msg['time'] = msg['time'].astimezone(tz=timezone.utc)
|
||||
return make_line(tab, msg['txt'], msg['time'], jid, '')
|
||||
|
||||
|
||||
STATUS = {'mam_only', 'local_only', 'local_mam_completed'}
|
||||
STATUS = {'mam_only', 'local_only'}
|
||||
|
||||
|
||||
class LogLoader:
|
||||
"""
|
||||
An ephemeral class that loads history in a tab
|
||||
An ephemeral class that loads history in a tab.
|
||||
|
||||
Loading from local logs is blocked until history has been fetched from
|
||||
MAM to fill the local archive.
|
||||
"""
|
||||
load_status: str = 'mam_only'
|
||||
logger: Logger
|
||||
tab: tabs.ChatTab
|
||||
mam_only: bool
|
||||
|
||||
def __init__(self, logger: Logger, tab: tabs.ChatTab,
|
||||
load_status: str = 'local_only'):
|
||||
if load_status not in STATUS:
|
||||
self.load_status = 'mam_only'
|
||||
else:
|
||||
self.load_status = load_status
|
||||
mam_only: bool = True):
|
||||
self.mam_only = mam_only
|
||||
self.logger = logger
|
||||
self.tab = tab
|
||||
|
||||
|
@ -53,12 +62,12 @@ class LogLoader:
|
|||
amount = 2 * self.tab.text_win.height
|
||||
gap = self.tab._text_buffer.find_last_gap_muc()
|
||||
if gap is not None:
|
||||
if self.load_status == 'local_only':
|
||||
messages = await self.local_fill_gap(gap)
|
||||
else:
|
||||
if self.mam_only:
|
||||
messages = await self.mam_fill_gap(gap)
|
||||
else:
|
||||
messages = await self.local_fill_gap(gap)
|
||||
else:
|
||||
if self.load_status == 'mam_only':
|
||||
if self.mam_only:
|
||||
messages = await self.mam_tab_open(amount)
|
||||
else:
|
||||
messages = await self.local_tab_open(amount)
|
||||
|
@ -84,6 +93,7 @@ class LogLoader:
|
|||
tab.query_status = False
|
||||
|
||||
async def local_tab_open(self, nb: int) -> List[BaseMessage]:
|
||||
await self.wait_mam()
|
||||
results: List[BaseMessage] = []
|
||||
filepath = self.logger.get_file_path(self.tab.jid)
|
||||
for msg in iterate_messages_reverse(filepath):
|
||||
|
@ -111,6 +121,7 @@ class LogLoader:
|
|||
tab.query_status = False
|
||||
|
||||
async def local_fill_gap(self, gap: HistoryGap) -> List[BaseMessage]:
|
||||
await self.wait_mam()
|
||||
start = gap.last_timestamp_before_leave
|
||||
end = gap.first_timestamp_after_join
|
||||
|
||||
|
@ -141,16 +152,16 @@ class LogLoader:
|
|||
if rest > 1:
|
||||
return None
|
||||
|
||||
if self.load_status == 'mam_only':
|
||||
if self.mam_only:
|
||||
messages = await self.mam_scroll_requested(height)
|
||||
else:
|
||||
messages = await self.local_scroll_requested(height)
|
||||
log.debug('%s %s', messages[0].txt, messages[0].time)
|
||||
tab._text_buffer.add_history_messages(messages)
|
||||
if messages:
|
||||
tab._text_buffer.add_history_messages(messages)
|
||||
tab.core.refresh_window()
|
||||
|
||||
async def local_scroll_requested(self, nb: int) -> List[BaseMessage]:
|
||||
await self.wait_mam()
|
||||
tab = self.tab
|
||||
last_message_time = None
|
||||
if tab._text_buffer.messages:
|
||||
|
@ -183,12 +194,80 @@ class LogLoader:
|
|||
messages = [EndOfArchive('End of archive reached', time=time)]
|
||||
return messages
|
||||
except NoMAMSupportException:
|
||||
return await self.local_scroll_requested(nb)
|
||||
return []
|
||||
except (MAMQueryException, DiscoInfoException):
|
||||
tab.core.information(
|
||||
f'An error occured when fetching MAM for {tab.jid}',
|
||||
'Error'
|
||||
)
|
||||
return await self.local_scroll_requested(nb)
|
||||
return []
|
||||
finally:
|
||||
tab.query_status = False
|
||||
|
||||
async def wait_mam(self) -> None:
|
||||
if not isinstance(self.tab, tabs.MucTab):
|
||||
return
|
||||
if self.tab.mam_filler is None:
|
||||
return
|
||||
await self.tab.mam_filler.done.wait()
|
||||
|
||||
|
||||
class MAMFiller:
|
||||
"""Class that loads messages from MAM history into the local logs.
|
||||
"""
|
||||
tab: tabs.ChatTab
|
||||
logger: Logger
|
||||
future: asyncio.Future
|
||||
done: asyncio.Event
|
||||
|
||||
def __init__(self, tab: tabs.ChatTab, logger: Logger):
|
||||
self.tab = tab
|
||||
self.logger = logger
|
||||
logger.fd_busy(str(tab.jid))
|
||||
self.future = asyncio.ensure_future(self.fetch_routine())
|
||||
self.done = asyncio.Event()
|
||||
|
||||
def cancel(self):
|
||||
self.future.cancel()
|
||||
self.end()
|
||||
|
||||
async def fetch_routine(self) -> None:
|
||||
filepath = self.logger.get_file_path(self.tab.jid)
|
||||
try:
|
||||
last_msg = last_message_in_archive(filepath)
|
||||
last_msg_time = None
|
||||
if last_msg:
|
||||
last_msg_time = last_msg['time'] + timedelta(seconds=1)
|
||||
try:
|
||||
messages = await fetch_history(
|
||||
self.tab,
|
||||
start=last_msg_time,
|
||||
amount=2000,
|
||||
)
|
||||
except (DiscoInfoException, NoMAMSupportException, MAMQueryException):
|
||||
log.debug('Failed for %s', self.tab.jid, exc_info=True)
|
||||
return
|
||||
log.debug('Fetched %s:\n%s', len(messages), messages)
|
||||
|
||||
def build_message(msg):
|
||||
return build_log_message(
|
||||
msg.nickname,
|
||||
msg.txt,
|
||||
msg.time,
|
||||
prefix='MR',
|
||||
)
|
||||
|
||||
logs = ''.join(map(build_message, messages))
|
||||
log.debug(logs)
|
||||
|
||||
self.logger.log_raw(self.tab.jid, logs, force=True)
|
||||
except Exception as exc:
|
||||
log.debug('exception: %s', exc, exc_info=True)
|
||||
finally:
|
||||
log.debug('finishing fill for %s', self.tab.jid)
|
||||
self.end()
|
||||
|
||||
def end(self):
|
||||
self.logger.fd_available(str(self.tab.jid))
|
||||
self.tab.mam_filler = None
|
||||
self.done.set()
|
||||
|
|
|
@ -967,7 +967,7 @@ class ChatTab(Tab):
|
|||
if not self.query_status:
|
||||
from poezio.log_loader import LogLoader
|
||||
asyncio.ensure_future(
|
||||
LogLoader(logger, self).scroll_requested()
|
||||
LogLoader(logger, self, config.get('use_log')).scroll_requested()
|
||||
)
|
||||
return self.text_win.scroll_up(self.text_win.height - 1)
|
||||
|
||||
|
|
|
@ -48,7 +48,7 @@ from poezio.config import config
|
|||
from poezio.core.structs import Command
|
||||
from poezio.decorators import refresh_wrapper, command_args_parser
|
||||
from poezio.logger import logger
|
||||
from poezio.log_loader import LogLoader
|
||||
from poezio.log_loader import LogLoader, MAMFiller
|
||||
from poezio.roster import roster
|
||||
from poezio.theming import get_theme, dump_tuple
|
||||
from poezio.user import User
|
||||
|
@ -83,7 +83,8 @@ class MucTab(ChatTab):
|
|||
plugin_commands: Dict[str, Command] = {}
|
||||
plugin_keys: Dict[str, Callable[..., Any]] = {}
|
||||
additional_information: Dict[str, Callable[[str], str]] = {}
|
||||
lagged = False
|
||||
lagged: bool = False
|
||||
mam_filler: Optional[MAMFiller]
|
||||
|
||||
def __init__(self, core: Core, jid: JID, nick: str, password: Optional[str] = None) -> None:
|
||||
ChatTab.__init__(self, core, jid)
|
||||
|
@ -104,6 +105,7 @@ class MucTab(ChatTab):
|
|||
self.topic_from = ''
|
||||
# Self ping event, so we can cancel it when we leave the room
|
||||
self.self_ping_event: Optional[timed_events.DelayedEvent] = None
|
||||
self.mam_filler = None
|
||||
# UI stuff
|
||||
self.topic_win = windows.Topic()
|
||||
self.v_separator = windows.VerticalSeparator()
|
||||
|
@ -179,6 +181,7 @@ class MucTab(ChatTab):
|
|||
seconds = None
|
||||
if last_message is not None:
|
||||
seconds = (datetime.now() - last_message.time).seconds
|
||||
self.mam_filler = MAMFiller(self, logger)
|
||||
muc.join_groupchat(
|
||||
self.core,
|
||||
self.jid.bare,
|
||||
|
@ -605,7 +608,7 @@ class MucTab(ChatTab):
|
|||
),
|
||||
)
|
||||
asyncio.ensure_future(
|
||||
LogLoader(logger, self).tab_open(),
|
||||
LogLoader(logger, self, config.get('use_log')).tab_open(),
|
||||
)
|
||||
|
||||
def handle_presence_joined(self, presence: Presence, status_codes: Set[int]) -> None:
|
||||
|
|
Loading…
Reference in a new issue