from __future__ import annotations import asyncio import uuid from collections import OrderedDict from contextlib import asynccontextmanager from datetime import datetime from secrets import token_bytes from urllib.parse import urljoin import aiohttp from argparse import ArgumentParser, Namespace from typing import Any, Tuple, Sequence, Awaitable, Dict, Mapping, Optional, AsyncIterator, FrozenSet, Iterable, \ AsyncIterable, Final from pymap.backend.mailbox import MailboxSetInterface, MailboxDataInterface, MailboxDataT_co from pymap.backend.session import BaseSession from pymap.concurrent import ReadWriteLock, Event from pymap.context import subsystem from pymap.exceptions import AuthorizationFailure, UserNotFound, NotAllowedError, NotSupportedError, MailboxNotFound, \ MailboxError from pymap.flags import FlagOp from pymap.interfaces.message import CachedMessage, MessageT_co, LoadedMessageInterface from pymap.listtree import ListTree from pymap.mailbox import MailboxSnapshot from pymap.message import BaseMessage from pymap.mime import MessageContent from pymap.parsing.message import AppendMessage from pymap.parsing.specials import ObjectId, Flag, SequenceSet, FetchRequirement from pymap.selected import SelectedSet, SelectedMailbox from pymap.user import UserMetadata from pysasl.hashing import Cleartext from pymap.backend import backends from pymap.interfaces.backend import BackendInterface, ServiceInterface from pymap.interfaces.login import LoginInterface, IdentityInterface from pymap.config import IMAPConfig, BackendCapability from pymap.health import HealthStatus from pymap.interfaces.token import TokensInterface from pymap.token import AllTokens __all__ = ['MinifluxBackend', 'Config'] from pysasl import AuthenticationCredentials class MinifluxBackend(BackendInterface): def __init__(self, login: Login, config: Config) -> None: super().__init__() self._login = login self._config = config self._status = HealthStatus(True) @property def login(self) -> Login: return self._login @property def config(self) -> Config: return self._config @property def status(self) -> HealthStatus: return self._status @classmethod def add_subparser(cls, name: str, subparsers: Any) -> ArgumentParser: parser: ArgumentParser = subparsers.add_parser(name, help='Miniflux') parser.add_argument("server", help="Miniflux API endpoint") return parser @classmethod async def init(cls, args: Namespace, **overrides: Any) \ -> Tuple[MinifluxBackend, Config]: config = Config.from_args(args, **overrides) login = Login(config) return cls(login, config), config async def start(self, services: Sequence[ServiceInterface]) -> Awaitable: tasks = [await service.start() for service in services] return asyncio.gather(*tasks) class Config(IMAPConfig): def __init__(self, args: Namespace, *, server: str, **extra: Any) -> None: super().__init__(args, hash_context=Cleartext(), admin_key=token_bytes(), **extra) self.server = server @property def backend_capability(self) -> BackendCapability: return BackendCapability(idle=False, multi_append=False, object_id=False) @classmethod def parse_args(cls, args: Namespace) -> Mapping[str, Any]: return {**super().parse_args(args), 'server': args.server} class Login(LoginInterface): def __init__(self, config: Config) -> None: super().__init__() self.config = config # self.users_dict = {config.demo_user: UserMetadata( # config, password=config.demo_password)} self.tokens_dict: Dict[str, Tuple[str, bytes]] = {} self._tokens = AllTokens() @property def tokens(self) -> TokensInterface: return self._tokens async def authenticate(self, credentials: AuthenticationCredentials) -> IdentityInterface: session = aiohttp.ClientSession(auth=aiohttp.BasicAuth(credentials.identity, credentials.secret)) async with session.get(urljoin(self.config.server, '/v1/me')) as resp: print(resp.status) profile = await resp.json() print(profile) if resp.status == 200: return Identity(credentials.identity, session, profile, self) raise AuthorizationFailure() class Identity(IdentityInterface): def __init__(self, username, session, profile, login): self.username = username self.session = session self.profile = profile self.login = login self.config = login.config @property def name(self): return self.username async def new_token(self, *, expiration: datetime = None) -> Optional[str]: token_id = uuid.uuid4().hex token_key = token_bytes() self.login.tokens_dict[token_id] = (self.name, token_key) return self.login.tokens.get_login_token( token_id, token_key, expiration=expiration) @asynccontextmanager async def new_session(self) -> AsyncIterator[Session]: identity = self.name config = self.config session = self.session _ = await self.get() yield Session(identity, session, config) async def get(self) -> UserMetadata: return UserMetadata(self.config) async def set(self, data: UserMetadata) -> None: return None async def delete(self) -> None: print("Deleting session", self.session) self.session.close() class Session(BaseSession): """The session implementation for the dict backend.""" def __init__(self, owner: str, session: aiohttp.ClientSession, config: Config) -> None: super().__init__(owner) self._config = config self._session = session self._mailbox_set = MailboxSet(session, config) @property def config(self) -> Config: return self._config @property def mailbox_set(self) -> MailboxSet: return self._mailbox_set class Message(BaseMessage): def __init__(self, uid: int, internal_date: datetime, permanent_flags: Iterable[Flag], *, expunged: bool = False, email_id: ObjectId = None, thread_id: ObjectId = None, recent: bool = False, content: MessageContent = None) -> None: super().__init__(uid, internal_date, permanent_flags, expunged=expunged, email_id=email_id, thread_id=thread_id) self._uid = uid self._expunged = expunged self._internal_date = internal_date self._recent = recent self._content = content @property def uid(self) -> int: return self._uid @uid.setter def uid(self, value): self._uid = value @property def expunged(self) -> bool: return self._expunged @expunged.setter def expunged(self, value): self._expunged = value @property def internal_date(self) -> datetime: return self._internal_date @internal_date.setter def internal_date(self, value): self._internal_date = value async def load_content(self, requirement: FetchRequirement) -> LoadedMessageInterface: pass class MailboxData(MailboxDataInterface[Message]): def __init__(self, name, feed, session, config): self._session: aiohttp.ClientSession = session self._config = config self._name = name self._feed = feed self._mailbox_id = ObjectId.random_mailbox_id() self._readonly = False self._updated = subsystem.get().new_event() self._messages_lock = subsystem.get().new_rwlock() self._selected_set = SelectedSet() self._uid_validity = MailboxSnapshot.new_uid_validity() #self._max_uid = 100 #self._mod_sequences = _ModSequenceMapping() self._messages: Dict[int, Message] = OrderedDict() @property def mailbox_id(self) -> ObjectId: return self._mailbox_id @property def readonly(self) -> bool: return self._readonly @property def uid_validity(self) -> int: return self._uid_validity @property def messages_lock(self) -> ReadWriteLock: return self._messages_lock @property def selected_set(self) -> SelectedSet: return self._selected_set async def update_selected(self, selected: SelectedMailbox, *, wait_on: Event = None) -> SelectedMailbox: print("update selected") messages = [] for id, entry in self._messages.items(): messages.append(Message( uid=id, internal_date=datetime.fromisoformat(entry['published_at']), permanent_flags=[], )) selected.add_updates(messages, []) return selected async def get(self, uid: int, cached_msg: CachedMessage) -> Message: print("get message", uid, cached_msg) return cached_msg #raise MailboxError(mailbox=self._name, message="Not implemented") async def append(self, append_msg: AppendMessage, *, recent: bool = False) -> Message: raise NotSupportedError() async def copy(self, uid: int, destination: MailboxData, *, recent: bool = False) -> Optional[int]: raise NotSupportedError() async def move(self: MailboxData, uid: int, destination: MailboxData, *, recent: bool = False) -> Optional[int]: raise NotSupportedError() async def update(self, uid: int, cached_msg: CachedMessage, flag_set: FrozenSet[Flag], mode: FlagOp) -> Message: raise NotSupportedError() async def delete(self, uids: Iterable[int]) -> None: raise NotSupportedError() async def claim_recent(self, selected: SelectedMailbox) -> None: print("claim recent") async def cleanup(self) -> None: pass async def messages(self): url_fragmet = f'/v1/feeds/{self._feed["id"]}/entries' async with self._session.get(urljoin(self._config.server, url_fragmet)) as resp: if resp.status == 200: data = await resp.json() for entry in data["entries"]: self._messages[entry["id"]] = entry yield entry async def snapshot(self) -> MailboxSnapshot: print("snapshot") exists = 0 recent = 0 unseen = 0 first_unseen: Optional[int] = None async for msg in self.messages(): exists += 1 next_uid = exists + 1 return MailboxSnapshot(self.mailbox_id, self.readonly, self.uid_validity, self.permanent_flags, self.session_flags, exists, recent, unseen, first_unseen, next_uid) class MailboxSet(MailboxSetInterface[MailboxData]): def __init__(self, session: aiohttp.ClientSession, config: Config) -> None: super().__init__() self.session = session self.config = config self._feeds = {} @property def delimiter(self): return '/' async def set_subscribed(self, name: str, subscribed: bool) -> None: print("set_subscribed", name, subscribed) raise NotSupportedError() async def list_subscribed(self) -> ListTree: """Tell client that we are subscribed to all mailboxes""" return await self.list_mailboxes() async def list_mailboxes(self) -> ListTree: list_tree = ListTree(delimiter=self.delimiter) async with self.session.get(urljoin(self.config.server, '/v1/feeds')) as resp: if resp.status == 200: feeds = {} data = await resp.json() for feed in data: name = feed['category']['title']+'/'+feed['title'] feeds[name] = feed list_tree.update(name) self._feeds = feeds #print(list(list_tree.list())) #list_tree.update('INBOX') return list_tree async def get_mailbox(self, name: str) -> MailboxData: try: return MailboxData(name, self._feeds[name], self.session, self.config) except KeyError: raise MailboxNotFound(name) async def add_mailbox(self, name: str) -> ObjectId: raise NotSupportedError() async def delete_mailbox(self, name: str) -> None: raise NotSupportedError() async def rename_mailbox(self, before: str, after: str) -> None: raise NotSupportedError() backends.add("miniflux", MinifluxBackend)