Use Rayon when parsing index

This commit is contained in:
Joakim Soderlund 2019-09-07 17:36:00 +02:00
parent a2d140f815
commit 8e59aff73d

View file

@ -4,6 +4,7 @@ use std::io::BufRead;
use std::sync::mpsc::{channel, Receiver}; use std::sync::mpsc::{channel, Receiver};
use std::thread::spawn; use std::thread::spawn;
use rayon::prelude::*;
use serde::de::Error; use serde::de::Error;
use serde_json::error::Result; use serde_json::error::Result;
use serde_json::from_str; use serde_json::from_str;
@ -13,6 +14,8 @@ use super::story::Story;
const TRIM: &[char] = &['"', ',', ' ', '\t', '\n', '\r']; const TRIM: &[char] = &['"', ',', ' ', '\t', '\n', '\r'];
pub fn parse(reader: impl BufRead) -> Result<Vec<Story>> { pub fn parse(reader: impl BufRead) -> Result<Vec<Story>> {
let mut wrappers = String::with_capacity(2);
let (tx, rx) = channel(); let (tx, rx) = channel();
let rx = spawn_parser(rx); let rx = spawn_parser(rx);
@ -21,6 +24,11 @@ pub fn parse(reader: impl BufRead) -> Result<Vec<Story>> {
_ => Error::custom("Could not read line"), _ => Error::custom("Could not read line"),
})?; })?;
if line.len() == 1 {
wrappers.push_str(&line);
continue;
}
if tx.send(line).is_ok() { if tx.send(line).is_ok() {
continue; continue;
} }
@ -34,6 +42,10 @@ pub fn parse(reader: impl BufRead) -> Result<Vec<Story>> {
drop(tx); drop(tx);
if wrappers != "{}" {
return Err(Error::custom("Invalid file structure"));
}
rx.recv().map_err(|e| match e { rx.recv().map_err(|e| match e {
_ => Error::custom("Missing parser result"), _ => Error::custom("Missing parser result"),
})? })?
@ -43,20 +55,13 @@ fn spawn_parser(stream: Receiver<String>) -> Receiver<Result<Vec<Story>>> {
let (tx, rx) = channel(); let (tx, rx) = channel();
spawn(move || { spawn(move || {
let mut stories = Vec::with_capacity(250_000); let bridge = stream.into_iter().par_bridge();
let mut wrappers = String::with_capacity(2); let result = bridge.map(deserialize).collect();
while let Ok(line) = stream.recv() { let mut stories: Vec<Story> = match result {
if line.len() == 1 {
wrappers.push_str(&line);
continue;
}
match deserialize(line) {
Ok(story) => stories.push(story),
Err(e) => return tx.send(Err(e)), Err(e) => return tx.send(Err(e)),
Ok(stories) => stories,
}; };
}
let count = stories.len(); let count = stories.len();
@ -64,10 +69,6 @@ fn spawn_parser(stream: Receiver<String>) -> Receiver<Result<Vec<Story>>> {
stories.dedup_by_key(|story| story.id); stories.dedup_by_key(|story| story.id);
stories.shrink_to_fit(); stories.shrink_to_fit();
if wrappers != "{}" {
return tx.send(Err(Error::custom("Invalid file structure")));
}
if count != stories.len() { if count != stories.len() {
return tx.send(Err(Error::custom("Found duplicate story"))); return tx.send(Err(Error::custom("Found duplicate story")));
} }