diff --git a/src/archive/parser.rs b/src/archive/parser.rs index 088073d..cad15ad 100644 --- a/src/archive/parser.rs +++ b/src/archive/parser.rs @@ -4,6 +4,7 @@ use std::io::BufRead; use std::sync::mpsc::{channel, Receiver}; use std::thread::spawn; +use rayon::prelude::*; use serde::de::Error; use serde_json::error::Result; use serde_json::from_str; @@ -13,6 +14,8 @@ use super::story::Story; const TRIM: &[char] = &['"', ',', ' ', '\t', '\n', '\r']; pub fn parse(reader: impl BufRead) -> Result> { + let mut wrappers = String::with_capacity(2); + let (tx, rx) = channel(); let rx = spawn_parser(rx); @@ -21,6 +24,11 @@ pub fn parse(reader: impl BufRead) -> Result> { _ => Error::custom("Could not read line"), })?; + if line.len() == 1 { + wrappers.push_str(&line); + continue; + } + if tx.send(line).is_ok() { continue; } @@ -34,6 +42,10 @@ pub fn parse(reader: impl BufRead) -> Result> { drop(tx); + if wrappers != "{}" { + return Err(Error::custom("Invalid file structure")); + } + rx.recv().map_err(|e| match e { _ => Error::custom("Missing parser result"), })? @@ -43,20 +55,13 @@ fn spawn_parser(stream: Receiver) -> Receiver>> { let (tx, rx) = channel(); spawn(move || { - let mut stories = Vec::with_capacity(250_000); - let mut wrappers = String::with_capacity(2); + let bridge = stream.into_iter().par_bridge(); + let result = bridge.map(deserialize).collect(); - while let Ok(line) = stream.recv() { - if line.len() == 1 { - wrappers.push_str(&line); - continue; - } - - match deserialize(line) { - Ok(story) => stories.push(story), - Err(e) => return tx.send(Err(e)), - }; - } + let mut stories: Vec = match result { + Err(e) => return tx.send(Err(e)), + Ok(stories) => stories, + }; let count = stories.len(); @@ -64,10 +69,6 @@ fn spawn_parser(stream: Receiver) -> Receiver>> { stories.dedup_by_key(|story| story.id); stories.shrink_to_fit(); - if wrappers != "{}" { - return tx.send(Err(Error::custom("Invalid file structure"))); - } - if count != stories.len() { return tx.send(Err(Error::custom("Found duplicate story"))); }