From 0d1ae173a043813b9b93dcab423e4544cf0ead88 Mon Sep 17 00:00:00 2001 From: Joakim Soderlund Date: Sun, 23 Jun 2019 14:58:01 +0200 Subject: [PATCH] Add Fimfarchive writer --- fimfarchive/writers.py | 106 ++++++++++++++++++++++++++++++++++++++- tests/test_writers.py | 111 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 213 insertions(+), 4 deletions(-) diff --git a/fimfarchive/writers.py b/fimfarchive/writers.py index ee06789..9ae6c43 100644 --- a/fimfarchive/writers.py +++ b/fimfarchive/writers.py @@ -23,10 +23,15 @@ Writers for Fimfarchive. import json +from copy import deepcopy from pathlib import Path -from typing import Callable, Union +from typing import Callable, Iterable, Tuple, Union +from zipfile import ZipFile, ZIP_DEFLATED, ZIP_STORED -from fimfarchive.mappers import StaticMapper, StoryPathMapper +from fimfarchive.mappers import ( + DataFormatMapper, StaticMapper, StoryPathMapper, StorySlugMapper, +) +from fimfarchive.stampers import FlavorStamper, PathStamper from fimfarchive.stories import Story @@ -57,6 +62,17 @@ class Writer(): """ raise NotImplementedError() + def close(self) -> None: + """ + Finalizes writes and closes files. + """ + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + class DirectoryWriter(Writer): """ @@ -190,3 +206,89 @@ class DirectoryWriter(Writer): if data_target is not None: data_path = Path(data_target).resolve() self.write_data(story, data_path) + + +class FimfarchiveWriter(Writer): + """ + Writes stories to a ZIP-file. + """ + + def __init__( + self, + path: Union[Path, str], + extras: Iterable[Tuple[str, bytes]] = (), + ) -> None: + """ + Constructor. + + Args: + path: Output path for the archive. + extras: Extra names and data to add. + """ + archive_path = Path(path).resolve(False) + index_path = archive_path.with_suffix('.json') + + if archive_path.suffix != '.zip': + raise ValueError(f"Path '{archive_path}' needs zip suffix.") + + if archive_path.exists(): + raise ValueError(f"Path '{archive_path}' already exists.") + + if index_path.exists(): + raise ValueError(f"Path '{index_path}' already exists.") + + self.index_path = index_path + self.archive_path = archive_path + self.extras = extras + + self.stamp_format = FlavorStamper(DataFormatMapper()) + self.stamp_path = PathStamper(StorySlugMapper()) + + index_path.parent.mkdir(parents=True, exist_ok=True) + archive_path.parent.mkdir(parents=True, exist_ok=True) + + self.index = index_path.open('wt', encoding='utf8') + self.archive = ZipFile(archive_path, 'w', ZIP_STORED) + + self.index.write('{\n') + self.open = True + + def write(self, story: Story) -> None: + if not self.open: + raise ValueError("Writer is closed.") + + if story.key != story.meta['id']: + raise ValueError("Invalid story key.") + + story = story.merge(meta=deepcopy(story.meta)) + + self.stamp_format(story) + self.stamp_path(story) + + path = story.meta['archive']['path'] + meta = json.dumps(story.meta, ensure_ascii=False, sort_keys=True) + line = f'"{story.key}": {meta},\n' + + self.index.write(line) + self.archive.writestr(path, story.data, ZIP_STORED) + + def close(self) -> None: + if not self.open: + return + + self.open = False + + if 2 < self.index.tell(): + self.index.seek(self.index.tell() - 2) + + self.index.write('\n}\n') + self.index.close() + + for name, data in self.extras: + self.archive.writestr(name, data, ZIP_DEFLATED) + + self.archive.write(self.index_path, 'index.json', ZIP_DEFLATED) + self.archive.close() + + del self.index + del self.archive diff --git a/tests/test_writers.py b/tests/test_writers.py index 2b38e69..8f19fbe 100644 --- a/tests/test_writers.py +++ b/tests/test_writers.py @@ -24,12 +24,18 @@ Writer tests. import json import os +from functools import partial +from io import BytesIO from pathlib import Path +from zipfile import ZipFile import pytest -from fimfarchive.mappers import StoryPathMapper -from fimfarchive.writers import DirectoryWriter +from fimfarchive.flavors import DataFormat +from fimfarchive.mappers import StoryPathMapper, StorySlugMapper +from fimfarchive.stampers import PathStamper +from fimfarchive.stories import Story +from fimfarchive.writers import DirectoryWriter, FimfarchiveWriter class TestDirectoryWriter: @@ -159,3 +165,104 @@ class TestDirectoryWriter: """ writer = DirectoryWriter() writer.check_directory(Path('key')) + + +class TestFimfarchiveWriter: + """ + FimfarchiveWriter tests. + """ + + def story(self, key, title, author, name) -> Story: + """ + Returns a dummy story for writing. + """ + stream = BytesIO() + + with ZipFile(stream, 'w') as zobj: + zobj.writestr('text', "Story {key}") + + meta = { + 'id': key, + 'title': title, + 'author': { + 'id': author, + 'name': name, + }, + } + + return Story( + key=key, + fetcher=None, + meta=meta, + data=stream.getvalue(), + flavors=[DataFormat.EPUB], + ) + + @pytest.fixture + def stories(self): + """ + Returns a collection of stories to write. + """ + return ( + self.story(32, "Floof", 48, "Floofer"), + self.story(64, "Poof", 80, "Poofer"), + ) + + @pytest.fixture + def extras(self): + """ + Returns extra data to write. + """ + return ( + ('about.json', b'about'), + ('readme.pdf', b'readme'), + ) + + @pytest.fixture + def archive(self, tmpdir, stories, extras): + """ + Returns an archive as a ZipFile instance. + """ + archive = Path(tmpdir) / 'archive.zip' + + with FimfarchiveWriter(archive, extras) as writer: + for story in stories: + writer.write(story) + + return ZipFile(BytesIO(archive.read_bytes())) + + def test_meta(self, stories, archive): + """ + Tests index looks as expected. + """ + stamp = PathStamper(StorySlugMapper()) + + for story in stories: + stamp(story) + + dumps = partial(json.dumps, ensure_ascii=False, sort_keys=True) + first, second = tuple(dumps(story.meta) for story in stories) + raw = f'{{\n"32": {first},\n"64": {second}\n}}\n' + + assert json.loads(archive.read('index.json').decode()) + assert raw.encode() == archive.read('index.json') + + def test_data(self, stories, archive): + """ + Tests archive includes story data. + """ + index = json.loads(archive.read('index.json').decode()) + + for story in stories: + data = story.data + meta = index[str(story.key)] + path = meta['archive']['path'] + + assert data == archive.read(path) + + def test_extras(self, extras, archive): + """ + Tests archive includes extras. + """ + for name, data in extras: + assert data == archive.read(name)