internal: better typing & docstrings for logger/log loader
This commit is contained in:
parent
a827743b86
commit
6d7921da55
2 changed files with 118 additions and 47 deletions
|
@ -1,15 +1,20 @@
|
|||
"""
|
||||
This modules contains a class that loads messages into a ChatTab, either from
|
||||
MAM or the local logs, and a class that loads MUC history into the local
|
||||
logs.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import asyncio
|
||||
import logging
|
||||
import json
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import List, Dict, Any
|
||||
from poezio.config import config
|
||||
from typing import List
|
||||
from poezio import tabs
|
||||
from poezio.logger import (
|
||||
build_log_message,
|
||||
iterate_messages_reverse,
|
||||
last_message_in_archive,
|
||||
Logger,
|
||||
LogDict,
|
||||
)
|
||||
from poezio.mam import (
|
||||
fetch_history,
|
||||
|
@ -27,17 +32,20 @@ from slixmpp import JID
|
|||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def make_line_local(tab: tabs.ChatTab, msg: Dict[str, Any]) -> Message:
|
||||
def make_line_local(tab: tabs.ChatTab, msg: LogDict) -> Message:
|
||||
"""Create a UI message from a local log read.
|
||||
|
||||
:param tab: Tab in which that message will be displayed
|
||||
:param msg: Log data
|
||||
:returns: The UI message
|
||||
"""
|
||||
if isinstance(tab, tabs.MucTab):
|
||||
jid = JID(tab.jid)
|
||||
jid.resource = msg['nickname']
|
||||
jid.resource = msg.get('nickname') or ''
|
||||
else:
|
||||
jid = JID(tab.jid)
|
||||
msg['time'] = msg['time'].astimezone(tz=timezone.utc)
|
||||
return make_line(tab, msg['txt'], msg['time'], jid, '')
|
||||
|
||||
|
||||
STATUS = {'mam_only', 'local_only'}
|
||||
return make_line(tab, msg['txt'], msg['time'], jid, '', msg['nickname'])
|
||||
|
||||
|
||||
class LogLoader:
|
||||
|
@ -52,15 +60,16 @@ class LogLoader:
|
|||
mam_only: bool
|
||||
|
||||
def __init__(self, logger: Logger, tab: tabs.ChatTab,
|
||||
mam_only: bool = True):
|
||||
self.mam_only = mam_only
|
||||
local_logs: bool = True):
|
||||
self.mam_only = not local_logs
|
||||
self.logger = logger
|
||||
self.tab = tab
|
||||
|
||||
async def tab_open(self):
|
||||
async def tab_open(self) -> None:
|
||||
"""Called on a tab opening or a MUC join"""
|
||||
amount = 2 * self.tab.text_win.height
|
||||
gap = self.tab._text_buffer.find_last_gap_muc()
|
||||
messages = []
|
||||
if gap is not None:
|
||||
if self.mam_only:
|
||||
messages = await self.mam_fill_gap(gap)
|
||||
|
@ -72,11 +81,20 @@ class LogLoader:
|
|||
else:
|
||||
messages = await self.local_tab_open(amount)
|
||||
|
||||
log.debug(
|
||||
'Fetched %s messages for %s',
|
||||
len(messages), self.tab.jid
|
||||
)
|
||||
if messages:
|
||||
self.tab._text_buffer.add_history_messages(messages)
|
||||
self.tab.core.refresh_window()
|
||||
|
||||
async def mam_tab_open(self, nb: int) -> List[BaseMessage]:
|
||||
"""Fetch messages in MAM when opening a new tab.
|
||||
|
||||
:param nb: number of max messages to fetch.
|
||||
:returns: list of ui messages to add
|
||||
"""
|
||||
tab = self.tab
|
||||
end = datetime.now()
|
||||
for message in tab._text_buffer.messages:
|
||||
|
@ -93,6 +111,11 @@ class LogLoader:
|
|||
tab.query_status = False
|
||||
|
||||
async def local_tab_open(self, nb: int) -> List[BaseMessage]:
|
||||
"""Fetch messages locally when opening a new tab.
|
||||
|
||||
:param nb: number of max messages to fetch.
|
||||
:returns: list of ui messages to add
|
||||
"""
|
||||
await self.wait_mam()
|
||||
results: List[BaseMessage] = []
|
||||
filepath = self.logger.get_file_path(self.tab.jid)
|
||||
|
@ -105,6 +128,11 @@ class LogLoader:
|
|||
return results[::-1]
|
||||
|
||||
async def mam_fill_gap(self, gap: HistoryGap) -> List[BaseMessage]:
|
||||
"""Fill a message gap in an existing tab using MAM.
|
||||
|
||||
:param gap: Object describing the history gap
|
||||
:returns: list of ui messages to add
|
||||
"""
|
||||
tab = self.tab
|
||||
|
||||
start = gap.last_timestamp_before_leave
|
||||
|
@ -121,6 +149,12 @@ class LogLoader:
|
|||
tab.query_status = False
|
||||
|
||||
async def local_fill_gap(self, gap: HistoryGap) -> List[BaseMessage]:
|
||||
"""Fill a message gap in an existing tab using the local logs.
|
||||
Mostly useless when not used with the MAMFiller.
|
||||
|
||||
:param gap: Object describing the history gap
|
||||
:returns: list of ui messages to add
|
||||
"""
|
||||
await self.wait_mam()
|
||||
start = gap.last_timestamp_before_leave
|
||||
end = gap.first_timestamp_after_join
|
||||
|
@ -161,6 +195,11 @@ class LogLoader:
|
|||
tab.core.refresh_window()
|
||||
|
||||
async def local_scroll_requested(self, nb: int) -> List[BaseMessage]:
|
||||
"""Fetch messages locally on scroll up.
|
||||
|
||||
:param nb: Number of messages to fetch
|
||||
:returns: list of ui messages to add
|
||||
"""
|
||||
await self.wait_mam()
|
||||
tab = self.tab
|
||||
last_message_time = None
|
||||
|
@ -180,6 +219,11 @@ class LogLoader:
|
|||
return results[::-1]
|
||||
|
||||
async def mam_scroll_requested(self, nb: int) -> List[BaseMessage]:
|
||||
"""Fetch messages from MAM on scroll up.
|
||||
|
||||
:param nb: Number of messages to fetch
|
||||
:returns: list of ui messages to add
|
||||
"""
|
||||
tab = self.tab
|
||||
try:
|
||||
messages = await fetch_history(tab, amount=nb)
|
||||
|
@ -205,8 +249,10 @@ class LogLoader:
|
|||
tab.query_status = False
|
||||
|
||||
async def wait_mam(self) -> None:
|
||||
if not isinstance(self.tab, tabs.MucTab):
|
||||
return
|
||||
"""Waitt for the MAM history sync before reading the local logs.
|
||||
|
||||
Does nothing apart from blocking.
|
||||
"""
|
||||
if self.tab.mam_filler is None:
|
||||
return
|
||||
await self.tab.mam_filler.done.wait()
|
||||
|
@ -219,20 +265,24 @@ class MAMFiller:
|
|||
logger: Logger
|
||||
future: asyncio.Future
|
||||
done: asyncio.Event
|
||||
max_msgs: int = 2000
|
||||
|
||||
def __init__(self, tab: tabs.ChatTab, logger: Logger):
|
||||
def __init__(self, logger: Logger, tab: tabs.ChatTab):
|
||||
self.tab = tab
|
||||
self.logger = logger
|
||||
logger.fd_busy(str(tab.jid))
|
||||
logger.fd_busy(tab.jid)
|
||||
self.future = asyncio.ensure_future(self.fetch_routine())
|
||||
self.done = asyncio.Event()
|
||||
|
||||
def cancel(self):
|
||||
def cancel(self) -> None:
|
||||
"""Cancel the routine and signal the end."""
|
||||
self.future.cancel()
|
||||
self.end()
|
||||
|
||||
async def fetch_routine(self) -> None:
|
||||
"""Load logs into the local archive, if possible."""
|
||||
filepath = self.logger.get_file_path(self.tab.jid)
|
||||
log.debug('Fetching logs for %s', self.tab.jid)
|
||||
try:
|
||||
last_msg = last_message_in_archive(filepath)
|
||||
last_msg_time = None
|
||||
|
@ -242,14 +292,20 @@ class MAMFiller:
|
|||
messages = await fetch_history(
|
||||
self.tab,
|
||||
start=last_msg_time,
|
||||
amount=2000,
|
||||
amount=self.max_msgs,
|
||||
)
|
||||
log.debug(
|
||||
'Fetched %s messages to fill local logs for %s',
|
||||
len(messages), self.tab.jid,
|
||||
)
|
||||
except (DiscoInfoException, NoMAMSupportException, MAMQueryException):
|
||||
log.debug('Failed for %s', self.tab.jid, exc_info=True)
|
||||
log.debug(
|
||||
'Failed fetching logs for %s',
|
||||
self.tab.jid, exc_info=True
|
||||
)
|
||||
return
|
||||
log.debug('Fetched %s:\n%s', len(messages), messages)
|
||||
|
||||
def build_message(msg):
|
||||
def build_message(msg) -> str:
|
||||
return build_log_message(
|
||||
msg.nickname,
|
||||
msg.txt,
|
||||
|
@ -258,16 +314,17 @@ class MAMFiller:
|
|||
)
|
||||
|
||||
logs = ''.join(map(build_message, messages))
|
||||
log.debug(logs)
|
||||
|
||||
self.logger.log_raw(self.tab.jid, logs, force=True)
|
||||
except Exception as exc:
|
||||
log.debug('exception: %s', exc, exc_info=True)
|
||||
finally:
|
||||
log.debug('finishing fill for %s', self.tab.jid)
|
||||
self.end()
|
||||
|
||||
def end(self):
|
||||
self.logger.fd_available(str(self.tab.jid))
|
||||
def end(self) -> None:
|
||||
"""End a MAM fill (error or sucess). Remove references and signal on
|
||||
the Event().
|
||||
"""
|
||||
try:
|
||||
self.logger.fd_available(self.tab.jid)
|
||||
except Exception:
|
||||
log.error('Error when restoring log fd:', exc_info=True)
|
||||
self.tab.mam_filler = None
|
||||
self.done.set()
|
||||
|
|
|
@ -11,7 +11,7 @@ conversations and roster changes
|
|||
|
||||
import mmap
|
||||
import re
|
||||
from typing import List, Dict, Optional, IO, Any, Union
|
||||
from typing import List, Dict, Optional, IO, Any, Union, Generator
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
|
@ -19,6 +19,8 @@ from poezio import common
|
|||
from poezio.config import config
|
||||
from poezio.xhtml import clean_text
|
||||
from poezio.ui.types import Message, BaseMessage, LoggableTrait
|
||||
from slixmpp import JID
|
||||
from slixmpp.types import TypedDict, Literal
|
||||
|
||||
import logging
|
||||
|
||||
|
@ -63,6 +65,16 @@ class LogMessage(LogItem):
|
|||
self.nick = nick
|
||||
|
||||
|
||||
LogDict = TypedDict(
|
||||
'LogDict',
|
||||
{
|
||||
'type': Literal['message', 'info'], 'txt': str, 'time': datetime,
|
||||
'history': bool, 'nickname': str
|
||||
},
|
||||
total=False,
|
||||
)
|
||||
|
||||
|
||||
def parse_log_line(msg: str, jid: str = '') -> Optional[LogItem]:
|
||||
"""Parse a log line.
|
||||
|
||||
|
@ -111,33 +123,35 @@ class Logger:
|
|||
except Exception:
|
||||
pass
|
||||
|
||||
def get_file_path(self, jid: str) -> Path:
|
||||
def get_file_path(self, jid: Union[str, JID]) -> Path:
|
||||
"""Return the log path for a specific jid"""
|
||||
jidstr = str(jid).replace('/', '\\')
|
||||
return self.log_dir / jidstr
|
||||
|
||||
def fd_busy(self, jid: str) -> None:
|
||||
def fd_busy(self, jid: Union[str, JID]) -> None:
|
||||
"""Signal to the logger that this logfile is busy elsewhere.
|
||||
And that the messages should be queued to be logged later.
|
||||
|
||||
:param jid: file name
|
||||
"""
|
||||
self._busy_fds[jid] = True
|
||||
self._buffered_fds[jid] = []
|
||||
jidstr = str(jid).replace('/', '\\')
|
||||
self._busy_fds[jidstr] = True
|
||||
self._buffered_fds[jidstr] = []
|
||||
|
||||
def fd_available(self, jid: str) -> None:
|
||||
def fd_available(self, jid: Union[str, JID]) -> None:
|
||||
"""Signal to the logger that this logfile is no longer busy.
|
||||
And write messages to the end.
|
||||
|
||||
:param jid: file name
|
||||
"""
|
||||
if jid in self._busy_fds:
|
||||
del self._busy_fds[jid]
|
||||
if jid in self._buffered_fds:
|
||||
msgs = ''.join(self._buffered_fds.pop(jid))
|
||||
if jid in self._fds:
|
||||
self._fds[jid].close()
|
||||
del self._fds[jid]
|
||||
jidstr = str(jid).replace('/', '\\')
|
||||
if jidstr in self._busy_fds:
|
||||
del self._busy_fds[jidstr]
|
||||
if jidstr in self._buffered_fds:
|
||||
msgs = ''.join(self._buffered_fds.pop(jidstr))
|
||||
if jidstr in self._fds:
|
||||
self._fds[jidstr].close()
|
||||
del self._fds[jidstr]
|
||||
self.log_raw(jid, msgs)
|
||||
|
||||
def close(self, jid: str) -> None:
|
||||
|
@ -226,7 +240,7 @@ class Logger:
|
|||
return True
|
||||
return self.log_raw(jid, logged_msg)
|
||||
|
||||
def log_raw(self, jid: str, logged_msg: str, force: bool = False) -> bool:
|
||||
def log_raw(self, jid: Union[str, JID], logged_msg: str, force: bool = False) -> bool:
|
||||
"""Log a raw string.
|
||||
|
||||
:param jid: filename
|
||||
|
@ -334,7 +348,7 @@ def build_log_message(nick: str,
|
|||
return logged_msg + ''.join(' %s\n' % line for line in lines)
|
||||
|
||||
|
||||
def last_message_in_archive(filepath: Path) -> Optional[Dict]:
|
||||
def last_message_in_archive(filepath: Path) -> Optional[LogDict]:
|
||||
"""Get the last message from the local archive.
|
||||
|
||||
:param filepath: the log file path
|
||||
|
@ -347,7 +361,7 @@ def last_message_in_archive(filepath: Path) -> Optional[Dict]:
|
|||
return last_msg
|
||||
|
||||
|
||||
def iterate_messages_reverse(filepath: Path):
|
||||
def iterate_messages_reverse(filepath: Path) -> Generator[LogDict, None, None]:
|
||||
"""Get the latest messages from the log file, one at a time.
|
||||
|
||||
:param fd: the file descriptor
|
||||
|
@ -397,7 +411,7 @@ def _get_lines_from_fd(fd: IO[Any], nb: int = 10) -> List[str]:
|
|||
return lines
|
||||
|
||||
|
||||
def parse_log_lines(lines: List[str], jid: str = '') -> List[Dict[str, Any]]:
|
||||
def parse_log_lines(lines: List[str], jid: str = '') -> List[LogDict]:
|
||||
"""
|
||||
Parse raw log lines into poezio log objects
|
||||
|
||||
|
@ -420,11 +434,11 @@ def parse_log_lines(lines: List[str], jid: str = '') -> List[Dict[str, Any]]:
|
|||
log.debug('wrong log format? %s', log_item)
|
||||
continue
|
||||
message_lines = []
|
||||
message = {
|
||||
message = LogDict({
|
||||
'history': True,
|
||||
'time': common.get_local_time(log_item.time),
|
||||
'type': 'message',
|
||||
}
|
||||
})
|
||||
size = log_item.nb_lines
|
||||
if isinstance(log_item, LogInfo):
|
||||
message_lines.append(log_item.text)
|
||||
|
|
Loading…
Reference in a new issue