From 9dd10b943cb4b71d4123eb739993f8667374277a Mon Sep 17 00:00:00 2001
From: Joakim Soderlund
Date: Wed, 13 Mar 2019 18:37:32 +0100
Subject: [PATCH] Add concurrent index parser to Fimfarchive fetcher

---
Review notes (this section is ignored when the patch is applied):
* MemoryIndex.close() called self.data.close(), but self.data is a dict,
  which has no close() method. It now calls clear(), matching the
  self.index.clear() call that this patch replaces in the fetcher.
* Index.parse() indexed meta[0] and meta[-1] outside of the try block, so
  a pair with an empty value raised IndexError in the worker instead of
  producing the "Malformed JSON object" error. It now compares one-byte
  slices, which are safe on empty input.
* The post-image blob id on the "index" line predates these two edits.

 fimfarchive/fetchers/fimfarchive.py | 147 +++++++++++++++++++---------
 1 file changed, 100 insertions(+), 47 deletions(-)

diff --git a/fimfarchive/fetchers/fimfarchive.py b/fimfarchive/fetchers/fimfarchive.py
index b704696..bd7464e 100644
--- a/fimfarchive/fetchers/fimfarchive.py
+++ b/fimfarchive/fetchers/fimfarchive.py
@@ -23,8 +23,13 @@ Fimfarchive fetcher.
 
 
 import json
+import marshal
 from io import BufferedReader
-from typing import Any, Dict, IO, Iterator, Optional, Tuple, Union
+from multiprocessing import Pool
+from typing import (
+    cast, Any, Callable, Dict, IO, Iterator,
+    Mapping, Optional, Tuple, Union,
+)
 from zipfile import ZipFile, BadZipFile
 
 from jmespath import compile as jmes
@@ -47,6 +52,96 @@ PATH = jmes('archive.path || path')
 
 
 compress, decompress = find_compressor()
+serialize = cast(Callable[[Dict[str, Any]], bytes], marshal.dumps)
+deserialize = cast(Callable[[bytes], Dict[str, Any]], marshal.loads)
+
+
+class Index(Mapping[int, Dict[str, Any]]):
+    """
+    Mapping from story key to meta.
+    """
+
+    def close(self) -> None:
+        """
+        Closes the index, if necessary.
+        """
+
+    def load(self, source: IO[bytes]) -> Iterator[Tuple[int, bytes]]:
+        """
+        Yields index items from a byte stream.
+
+        Args:
+            source: The stream to read from.
+
+        Returns:
+            An iterable over index items.
+
+        Raises:
+            StorySourceError: If the stream is malformed.
+        """
+        reader = BufferedReader(source, BUFFER_SIZE)  # type: ignore
+
+        with Pool() as pool:
+            parts = (part for part in reader if 2 < len(part))
+            mapper = pool.imap(self.parse, parts, 1024)
+
+            for key, value in mapper:
+                if key < 0:
+                    raise StorySourceError(value.decode())
+                else:
+                    yield key, value
+
+    @staticmethod
+    def parse(pair: bytes) -> Tuple[int, bytes]:
+        """
+        Converts a JSON key-value pair to Marshal.
+
+        Args:
+            pair: JSON key-value pair.
+
+        Returns:
+            Marshal key-value pair.
+        """
+        try:
+            key, meta = pair.split(b':', 1)
+            key = key.strip(b'\n\r\t "')
+            meta = meta.strip(b'\n\r\t ,')
+        except Exception as e:
+            return -1, f"Unknown parser error: {e}".encode()
+
+        if meta[:1] != b'{' or meta[-1:] != b'}':
+            return -1, f"Malformed JSON object for {key}.".encode()
+
+        try:
+            return int(key), serialize(json.loads(meta.decode()))
+        except UnicodeDecodeError as e:
+            return -1, f"Incorrectly encoded index: {e}".encode()
+        except ValueError as e:
+            return -1, f"Malformed meta for {key}: {e}".encode()
+
+
+class MemoryIndex(Index):
+    """
+    In-memory mapping from story key to meta.
+    """
+
+    def __init__(self, stream: IO[bytes]) -> None:
+        self.data = {k: compress(v) for k, v in self.load(stream)}
+
+    def __getitem__(self, key: int) -> Dict[str, Any]:
+        return deserialize(decompress(self.data[key]))
+
+    def __contains__(self, item) -> bool:
+        return item in self.data
+
+    def __iter__(self) -> Iterator[int]:
+        return iter(self.data)
+
+    def __len__(self) -> int:
+        return len(self.data)
+
+    def close(self):
+        self.data.clear()
 
 
 class FimfarchiveFetcher(Fetcher):
@@ -73,7 +168,7 @@ class FimfarchiveFetcher(Fetcher):
             StorySourceError: If no valid Fimfarchive release can be loaded.
         """
         self.archive: ZipFile
-        self.index: Dict[int, bytes]
+        self.index: Index
         self.paths: Dict[int, str]
         self.is_open: bool = False
 
@@ -102,8 +197,7 @@ class FimfarchiveFetcher(Fetcher):
 
         try:
             with self.archive.open('index.json') as fobj:
-                reader = BufferedReader(fobj, BUFFER_SIZE)  # type: ignore
-                self.index = dict(self.load_index(reader))
+                self.index = MemoryIndex(fobj)
         except KeyError as e:
             raise StorySourceError("Archive is missing the index.") from e
         except BadZipFile as e:
@@ -112,39 +206,6 @@ class FimfarchiveFetcher(Fetcher):
         self.paths = dict()
         self.is_open = True
 
-    def load_index(
-            self,
-            source: Iterator[bytes],
-    ) -> Iterator[Tuple[int, bytes]]:
-        """
-        Yields unparsed index items from a byte stream.
-
-        Args:
-            source: The stream to read from.
-
-        Returns:
-            An iterable over index items.
-
-        Raises:
-            StorySourceError: If an item is malformed.
-        """
-        for part in source:
-            if len(part) < 3:
-                continue
-
-            line = part.strip()
-            key, meta = line.split(b':', 1)
-            key = key.strip(b' "')
-            meta = meta.strip(b' ,')
-
-            if meta[0] != 123 or meta[-1] != 125:
-                raise StorySourceError(f"Malformed index meta: {key}")
-
-            try:
-                yield int(key), compress(meta)
-            except ValueError as e:
-                raise StorySourceError(f"Malformed index key: {key}") from e
-
     def __len__(self) -> int:
         """
         Returns the total number of stories in the archive.
@@ -217,22 +278,14 @@ class FimfarchiveFetcher(Fetcher):
             self.archive.close()
 
         if hasattr(self, 'index'):
-            self.index.clear()
+            self.index.close()
 
         if hasattr(self, 'paths'):
             self.paths.clear()
 
     def fetch_meta(self, key: int) -> Dict[str, Any]:
         key = self.validate(key)
-        raw = self.index[key]
-
-        try:
-            meta = json.loads(decompress(raw).decode())
-        except UnicodeDecodeError as e:
-            raise StorySourceError("Incorrectly encoded index.") from e
-        except ValueError as e:
-            raise StorySourceError(f"Malformed meta for {key}: {raw}") from e
-
+        meta = self.index[key]
         actual = meta.get('id')
 
         if key != actual: