Add optional SQLite cache to Fimfarchive fetcher

This commit is contained in:
Joakim Soderlund 2019-09-01 15:10:14 +02:00
parent b2bd5e30bd
commit 0f9492bc0f

View file

@ -24,8 +24,10 @@ Fimfarchive fetcher.
import json import json
import marshal import marshal
import sqlite3
from io import BufferedReader from io import BufferedReader
from multiprocessing import Pool from multiprocessing import Pool
from pathlib import Path
from typing import ( from typing import (
cast, Any, Callable, Dict, IO, Iterable, Iterator, cast, Any, Callable, Dict, IO, Iterable, Iterator,
Mapping, Optional, Sized, Tuple, Union, Mapping, Optional, Sized, Tuple, Union,
@ -66,6 +68,12 @@ class Index(Mapping[int, Dict[str, Any]]):
Closes the index, if necessary. Closes the index, if necessary.
""" """
def iteritems(self) -> Iterator[Tuple[int, Dict[str, Any]]]:
    """
    Yields every item of the index, as returned by `items()`.

    Exists as a separate hook so that subclasses can supply a
    faster way of walking all items than one lookup per key.
    """
    for item in self.items():
        yield item
def load(self, source: IO[bytes]) -> Iterator[Tuple[int, bytes]]: def load(self, source: IO[bytes]) -> Iterator[Tuple[int, bytes]]:
""" """
Yields index items from a byte stream. Yields index items from a byte stream.
@ -140,10 +148,58 @@ class MemoryIndex(Index):
def __len__(self) -> int: def __len__(self) -> int:
return len(self.data) return len(self.data)
def iteritems(self) -> Iterator[Tuple[int, Dict[str, Any]]]:
    """
    Yields each stored key with its decompressed, deserialized value.
    """
    entries = self.data.items()

    yield from (
        (key, deserialize(decompress(blob)))
        for key, blob in entries
    )
def close(self): def close(self):
self.data.clear() self.data.clear()
class SqliteIndex(Index):
    """
    Cached mapping from key to story meta.

    Items live in the "cache" table of an SQLite database, while the
    set of keys is also held in memory for cheap membership tests.
    """

    CREATE = 'CREATE TABLE "cache" (key INT PRIMARY KEY, value BLOB)'
    INSERT = 'INSERT INTO cache VALUES (?, ?)'
    SELECT = 'SELECT value FROM cache WHERE key = ?'
    LIST_KEYS = 'SELECT key FROM cache ORDER BY key'
    LIST_ITEMS = 'SELECT key, value FROM cache ORDER BY key'

    def __init__(self, name: str, stream: IO[bytes]) -> None:
        """
        Constructor.

        Opens the database at `name`. If no file exists there yet, the
        table is created and filled with the items loaded from `stream`.

        Args:
            name: Path to the SQLite database file.
            stream: Byte stream to build the cache from, if missing.
        """
        path = Path(name)

        # Must be checked before connecting, as that creates the file.
        cached = path.exists()
        self.db = sqlite3.connect(name)

        if not cached:
            try:
                self.db.execute(self.CREATE)
                self.db.executemany(self.INSERT, self.load(stream))
                self.db.commit()
            except BaseException:
                # A half-built database would be mistaken for a complete
                # cache on the next run, so it must not be left behind.
                # Interrupts are included on purpose; always re-raised.
                self.db.close()

                try:
                    path.unlink()
                except OSError:
                    # Best effort; there may be no file to remove.
                    pass

                raise

        keys = self.db.execute(self.LIST_KEYS)
        self._keys = {row[0] for row in keys}

    def __getitem__(self, key: int) -> Dict[str, Any]:
        """
        Fetches the story meta stored for a key.

        Args:
            key: The key to look up.

        Returns:
            The deserialized value stored for the key.

        Raises:
            KeyError: If the key is not in the cache.
        """
        row = self.db.execute(self.SELECT, (key,)).fetchone()

        # No matching row yields None, which the mapping protocol
        # expects to be reported as a KeyError.
        if row is None:
            raise KeyError(key)

        # Same decoder as in iteritems, so both paths always agree.
        return deserialize(row[0])

    def __contains__(self, item) -> bool:
        """
        Checks for a key in memory, without querying the database.
        """
        return item in self._keys

    def __iter__(self) -> Iterator[int]:
        """
        Iterates over all keys, in ascending order.
        """
        return iter(sorted(self._keys))

    def __len__(self) -> int:
        """
        Returns the number of cached keys.
        """
        return len(self._keys)

    def iteritems(self) -> Iterator[Tuple[int, Dict[str, Any]]]:
        """
        Iterates over all items using one query, ordered by key.
        """
        items = self.db.execute(self.LIST_ITEMS)
        return ((k, deserialize(v)) for k, v in items)

    def close(self) -> None:
        """
        Closes the database connection.
        """
        self.db.close()
class FimfarchiveFetcher(Iterable[Story], Sized, Fetcher): class FimfarchiveFetcher(Iterable[Story], Sized, Fetcher):
""" """
Fetcher for Fimfarchive. Fetcher for Fimfarchive.
@ -216,10 +272,12 @@ class FimfarchiveFetcher(Iterable[Story], Sized, Fetcher):
""" """
Yields all stories in the archive, ordered by ID. Yields all stories in the archive, ordered by ID.
""" """
for key in sorted(self.index.keys()): for key, meta in self.index.iteritems():
yield self.fetch(key) key = self.validate_key(key)
meta = self.validate_meta(key, meta)
yield Story(key, self, meta, None, self.flavors)
def validate(self, key: int) -> int: def validate_key(self, key: int) -> int:
""" """
Ensures that the key matches a valid story Ensures that the key matches a valid story
@ -243,6 +301,20 @@ class FimfarchiveFetcher(Iterable[Story], Sized, Fetcher):
return key return key
def validate_meta(self, key: int, meta: Dict[str, Any]) -> Dict[str, Any]:
    """
    Ensures that the meta belongs to the key, and records its path.

    The path is read from the `archive` entry of the meta, or from
    the meta itself when it has no such entry. A missing path is
    silently ignored.

    Args:
        key: The key that the meta was stored under.
        meta: The story meta to validate.

    Returns:
        The same meta, unchanged.

    Raises:
        StorySourceError: If the ID in the meta is not the key.
    """
    found = meta.get('id')

    if found != key:
        raise StorySourceError(f"Invalid ID for {key}: {found}")

    try:
        self.paths[key] = meta.get('archive', meta)['path']
    except KeyError:
        pass

    return meta
def fetch_path(self, key: int) -> Optional[str]: def fetch_path(self, key: int) -> Optional[str]:
""" """
Fetches the archive path of a story. Fetches the archive path of a story.
@ -257,7 +329,7 @@ class FimfarchiveFetcher(Iterable[Story], Sized, Fetcher):
InvalidStoryError: If a valid story is not found. InvalidStoryError: If a valid story is not found.
StorySourceError: If the fetcher is closed. StorySourceError: If the fetcher is closed.
""" """
key = self.validate(key) key = self.validate_key(key)
path = self.paths.get(key) path = self.paths.get(key)
if path is not None: if path is not None:
@ -284,23 +356,13 @@ class FimfarchiveFetcher(Iterable[Story], Sized, Fetcher):
self.paths.clear() self.paths.clear()
def fetch_meta(self, key: int) -> Dict[str, Any]: def fetch_meta(self, key: int) -> Dict[str, Any]:
key = self.validate(key) key = self.validate_key(key)
meta = self.index[key] meta = self.validate_meta(key, self.index[key])
actual = meta.get('id')
if key != actual:
raise StorySourceError(f"Invalid ID for {key}: {actual}")
try:
archive = meta.get('archive', meta)
self.paths[key] = archive['path']
except KeyError:
pass
return meta return meta
def fetch_data(self, key: int) -> bytes: def fetch_data(self, key: int) -> bytes:
key = self.validate(key) key = self.validate_key(key)
path = self.fetch_path(key) path = self.fetch_path(key)
if not path: if not path: