Add concurrent index parser to Fimfarchive fetcher

This commit is contained in:
Joakim Soderlund 2019-03-13 18:37:32 +01:00
parent e5b321136b
commit 9dd10b943c

View file

@ -23,8 +23,13 @@ Fimfarchive fetcher.
import json
import marshal
from io import BufferedReader
from typing import Any, Dict, IO, Iterator, Optional, Tuple, Union
from multiprocessing import Pool
from typing import (
cast, Any, Callable, Dict, IO, Iterator,
Mapping, Optional, Tuple, Union,
)
from zipfile import ZipFile, BadZipFile
from jmespath import compile as jmes
@ -47,6 +52,96 @@ PATH = jmes('archive.path || path')
compress, decompress = find_compressor()
serialize = cast(Callable[[Dict[str, Any]], bytes], marshal.dumps)
deserialize = cast(Callable[[bytes], Dict[str, Any]], marshal.loads)
class Index(Mapping[int, Dict[str, Any]]):
"""
Mapping from story key to meta.
"""
def close(self) -> None:
"""
Closes the index, if necessary.
"""
def load(self, source: IO[bytes]) -> Iterator[Tuple[int, bytes]]:
"""
Yields index items from a byte stream.
Args:
source: The stream to read from.
Returns:
An iterable over index items.
Raises:
StorySourceError: If the stream is malformed.
"""
reader = BufferedReader(source, BUFFER_SIZE) # type: ignore
with Pool() as pool:
parts = (part for part in reader if 2 < len(part))
mapper = pool.imap(self.parse, parts, 1024)
for key, value in mapper:
if key < 0:
raise StorySourceError(value.decode())
else:
yield key, value
@staticmethod
def parse(pair: bytes) -> Tuple[int, bytes]:
"""
Converts a JSON key-value pair to Marshal.
Args:
pair: JSON key-value pair.
Returns:
Marshal key-value pair.
"""
try:
key, meta = pair.split(b':', 1)
key = key.strip(b'\n\r\t "')
meta = meta.strip(b'\n\r\t ,')
except Exception as e:
return -1, f"Unknown parser error: {e}".encode()
if meta[0] != 123 or meta[-1] != 125:
return -1, f"Malformed JSON object for {key}.".encode()
try:
return int(key), serialize(json.loads(meta.decode()))
except UnicodeDecodeError as e:
return -1, f"Incorrectly encoded index: {e}".encode()
except ValueError as e:
return -1, f"Malformed meta for {key}: {e}".encode()
class MemoryIndex(Index):
"""
In-memory mapping from story key to meta.
"""
def __init__(self, stream: IO[bytes]) -> None:
self.data = {k: compress(v) for k, v in self.load(stream)}
def __getitem__(self, key: int) -> Dict[str, Any]:
return deserialize(decompress(self.data[key]))
def __contains__(self, item) -> bool:
return item in self.data
def __iter__(self) -> Iterator[int]:
return iter(self.data)
def __len__(self) -> int:
return len(self.data)
def close(self):
self.data.close()
class FimfarchiveFetcher(Fetcher):
@ -73,7 +168,7 @@ class FimfarchiveFetcher(Fetcher):
StorySourceError: If no valid Fimfarchive release can be loaded.
"""
self.archive: ZipFile
self.index: Dict[int, bytes]
self.index: Index
self.paths: Dict[int, str]
self.is_open: bool = False
@ -102,8 +197,7 @@ class FimfarchiveFetcher(Fetcher):
try:
with self.archive.open('index.json') as fobj:
reader = BufferedReader(fobj, BUFFER_SIZE) # type: ignore
self.index = dict(self.load_index(reader))
self.index = MemoryIndex(fobj)
except KeyError as e:
raise StorySourceError("Archive is missing the index.") from e
except BadZipFile as e:
@ -112,39 +206,6 @@ class FimfarchiveFetcher(Fetcher):
self.paths = dict()
self.is_open = True
def load_index(
self,
source: Iterator[bytes],
) -> Iterator[Tuple[int, bytes]]:
"""
Yields unparsed index items from a byte stream.
Args:
source: The stream to read from.
Returns:
An iterable over index items.
Raises:
StorySourceError: If an item is malformed.
"""
for part in source:
if len(part) < 3:
continue
line = part.strip()
key, meta = line.split(b':', 1)
key = key.strip(b' "')
meta = meta.strip(b' ,')
if meta[0] != 123 or meta[-1] != 125:
raise StorySourceError(f"Malformed index meta: {key}")
try:
yield int(key), compress(meta)
except ValueError as e:
raise StorySourceError(f"Malformed index key: {key}") from e
def __len__(self) -> int:
"""
Returns the total number of stories in the archive.
@ -217,22 +278,14 @@ class FimfarchiveFetcher(Fetcher):
self.archive.close()
if hasattr(self, 'index'):
self.index.clear()
self.index.close()
if hasattr(self, 'paths'):
self.paths.clear()
def fetch_meta(self, key: int) -> Dict[str, Any]:
key = self.validate(key)
raw = self.index[key]
try:
meta = json.loads(decompress(raw).decode())
except UnicodeDecodeError as e:
raise StorySourceError("Incorrectly encoded index.") from e
except ValueError as e:
raise StorySourceError(f"Malformed meta for {key}: {raw}") from e
meta = self.index[key]
actual = meta.get('id')
if key != actual: