Add Fimfarchive writer

This commit is contained in:
Joakim Soderlund 2019-06-23 14:58:01 +02:00
parent 9ca1872343
commit 0d1ae173a0
2 changed files with 213 additions and 4 deletions

View file

@ -23,10 +23,15 @@ Writers for Fimfarchive.
import json
from copy import deepcopy
from pathlib import Path
from typing import Callable, Union
from typing import Callable, Iterable, Tuple, Union
from zipfile import ZipFile, ZIP_DEFLATED, ZIP_STORED
from fimfarchive.mappers import StaticMapper, StoryPathMapper
from fimfarchive.mappers import (
DataFormatMapper, StaticMapper, StoryPathMapper, StorySlugMapper,
)
from fimfarchive.stampers import FlavorStamper, PathStamper
from fimfarchive.stories import Story
@ -57,6 +62,17 @@ class Writer():
"""
raise NotImplementedError()
def close(self) -> None:
"""
Finalizes writes and closes files.
"""
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
class DirectoryWriter(Writer):
"""
@ -190,3 +206,89 @@ class DirectoryWriter(Writer):
if data_target is not None:
data_path = Path(data_target).resolve()
self.write_data(story, data_path)
class FimfarchiveWriter(Writer):
"""
Writes stories to a ZIP-file.
"""
def __init__(
self,
path: Union[Path, str],
extras: Iterable[Tuple[str, bytes]] = (),
) -> None:
"""
Constructor.
Args:
path: Output path for the archive.
extras: Extra names and data to add.
"""
archive_path = Path(path).resolve(False)
index_path = archive_path.with_suffix('.json')
if archive_path.suffix != '.zip':
raise ValueError(f"Path '{archive_path}' needs zip suffix.")
if archive_path.exists():
raise ValueError(f"Path '{archive_path}' already exists.")
if index_path.exists():
raise ValueError(f"Path '{index_path}' already exists.")
self.index_path = index_path
self.archive_path = archive_path
self.extras = extras
self.stamp_format = FlavorStamper(DataFormatMapper())
self.stamp_path = PathStamper(StorySlugMapper())
index_path.parent.mkdir(parents=True, exist_ok=True)
archive_path.parent.mkdir(parents=True, exist_ok=True)
self.index = index_path.open('wt', encoding='utf8')
self.archive = ZipFile(archive_path, 'w', ZIP_STORED)
self.index.write('{\n')
self.open = True
def write(self, story: Story) -> None:
if not self.open:
raise ValueError("Writer is closed.")
if story.key != story.meta['id']:
raise ValueError("Invalid story key.")
story = story.merge(meta=deepcopy(story.meta))
self.stamp_format(story)
self.stamp_path(story)
path = story.meta['archive']['path']
meta = json.dumps(story.meta, ensure_ascii=False, sort_keys=True)
line = f'"{story.key}": {meta},\n'
self.index.write(line)
self.archive.writestr(path, story.data, ZIP_STORED)
def close(self) -> None:
if not self.open:
return
self.open = False
if 2 < self.index.tell():
self.index.seek(self.index.tell() - 2)
self.index.write('\n}\n')
self.index.close()
for name, data in self.extras:
self.archive.writestr(name, data, ZIP_DEFLATED)
self.archive.write(self.index_path, 'index.json', ZIP_DEFLATED)
self.archive.close()
del self.index
del self.archive

View file

@ -24,12 +24,18 @@ Writer tests.
import json
import os
from functools import partial
from io import BytesIO
from pathlib import Path
from zipfile import ZipFile
import pytest
from fimfarchive.mappers import StoryPathMapper
from fimfarchive.writers import DirectoryWriter
from fimfarchive.flavors import DataFormat
from fimfarchive.mappers import StoryPathMapper, StorySlugMapper
from fimfarchive.stampers import PathStamper
from fimfarchive.stories import Story
from fimfarchive.writers import DirectoryWriter, FimfarchiveWriter
class TestDirectoryWriter:
@ -159,3 +165,104 @@ class TestDirectoryWriter:
"""
writer = DirectoryWriter()
writer.check_directory(Path('key'))
class TestFimfarchiveWriter:
"""
FimfarchiveWriter tests.
"""
def story(self, key, title, author, name) -> Story:
"""
Returns a dummy story for writing.
"""
stream = BytesIO()
with ZipFile(stream, 'w') as zobj:
zobj.writestr('text', "Story {key}")
meta = {
'id': key,
'title': title,
'author': {
'id': author,
'name': name,
},
}
return Story(
key=key,
fetcher=None,
meta=meta,
data=stream.getvalue(),
flavors=[DataFormat.EPUB],
)
@pytest.fixture
def stories(self):
"""
Returns a collection of stories to write.
"""
return (
self.story(32, "Floof", 48, "Floofer"),
self.story(64, "Poof", 80, "Poofer"),
)
@pytest.fixture
def extras(self):
"""
Returns extra data to write.
"""
return (
('about.json', b'about'),
('readme.pdf', b'readme'),
)
@pytest.fixture
def archive(self, tmpdir, stories, extras):
"""
Returns an archive as a ZipFile instance.
"""
archive = Path(tmpdir) / 'archive.zip'
with FimfarchiveWriter(archive, extras) as writer:
for story in stories:
writer.write(story)
return ZipFile(BytesIO(archive.read_bytes()))
def test_meta(self, stories, archive):
"""
Tests index looks as expected.
"""
stamp = PathStamper(StorySlugMapper())
for story in stories:
stamp(story)
dumps = partial(json.dumps, ensure_ascii=False, sort_keys=True)
first, second = tuple(dumps(story.meta) for story in stories)
raw = f'{{\n"32": {first},\n"64": {second}\n}}\n'
assert json.loads(archive.read('index.json').decode())
assert raw.encode() == archive.read('index.json')
def test_data(self, stories, archive):
"""
Tests archive includes story data.
"""
index = json.loads(archive.read('index.json').decode())
for story in stories:
data = story.data
meta = index[str(story.key)]
path = meta['archive']['path']
assert data == archive.read(path)
def test_extras(self, extras, archive):
"""
Tests archive includes extras.
"""
for name, data in extras:
assert data == archive.read(name)