diff --git a/src/parser.rs b/src/parser.rs index d403d2a..99a6867 100644 --- a/src/parser.rs +++ b/src/parser.rs @@ -22,33 +22,60 @@ pub fn parse(reader: impl BufRead) -> Result> { _ => SourceError("Could not read index line."), })?; - if 3 < line.len() { - tx.send(line).unwrap(); + if tx.send(line).is_ok() { + continue; } + + return Err(match rx.recv() { + Err(_) => SourceError("Parser disappeared unexpectedly."), + Ok(Ok(_)) => SourceError("Parser returned unexpectedly."), + Ok(Err(error)) => error, + }); } drop(tx); - rx.recv().unwrap() + rx.recv().map_err(|e| match e { + _ => SourceError("Missing parser result."), + })? } fn spawn_parser(stream: Receiver) -> Receiver>> { + use Error::*; + let (tx, rx) = channel(); spawn(move || { let mut stories = Vec::with_capacity(250_000); + let mut wrappers = String::with_capacity(2); while let Ok(line) = stream.recv() { + if line.len() == 1 { + wrappers.push_str(&line); + continue; + } + match deserialize(line) { Ok(story) => stories.push(story), - Err(e) => return tx.send(Err(e)).unwrap(), + Err(e) => return tx.send(Err(e)), }; } - stories.shrink_to_fit(); - stories.sort_by_key(|story| story.id); + let count = stories.len(); - tx.send(Ok(stories)).unwrap(); + stories.sort_by_key(|story| story.id); + stories.dedup_by_key(|story| story.id); + stories.shrink_to_fit(); + + if wrappers != "{}" { + return tx.send(Err(SourceError("Invalid index structure."))); + } + + if count != stories.len() { + return tx.send(Err(SourceError("Index contains duplicates."))); + } + + tx.send(Ok(stories)) }); rx