From 1becc7b0e311f804cb03090e9430f570c40e8beb Mon Sep 17 00:00:00 2001 From: vandomej Date: Wed, 13 Oct 2021 10:59:29 -0700 Subject: [PATCH] Adding threading to file_linked operations --- file_linked/src/lib.rs | 66 ++++++++++++++++++++++++++++-------------- gemla/src/bin/bin.rs | 2 +- gemla/src/core/mod.rs | 59 ++++++++++++++++++++++++------------- 3 files changed, 85 insertions(+), 42 deletions(-) diff --git a/file_linked/src/lib.rs b/file_linked/src/lib.rs index debe7e3..afdec40 100644 --- a/file_linked/src/lib.rs +++ b/file_linked/src/lib.rs @@ -9,6 +9,7 @@ use serde::de::DeserializeOwned; use serde::Serialize; use std::fs::{copy, remove_file, File}; use std::io::ErrorKind; +use std::io::Write; use std::path::{Path, PathBuf}; /// A wrapper around an object `T` that ties the object to a physical file @@ -20,6 +21,19 @@ where val: T, path: PathBuf, temp_file_path: PathBuf, + file_thread: Option>, +} + +impl Drop for FileLinked +where T: Serialize +{ + fn drop(&mut self) { + if self.file_thread.is_some() + { + let file_thread = self.file_thread.take(); + file_thread.unwrap().join().expect("Error cleaning up file thread for file_linked object"); + } + } } impl FileLinked @@ -108,44 +122,52 @@ where .ok_or_else(|| anyhow!("Unable to get filename for tempfile {}", path.display()))? )); - let result = FileLinked { + let mut result = FileLinked { val, path: path.to_path_buf(), temp_file_path, + file_thread: None, }; result.write_data()?; Ok(result) } - fn write_data(&self) -> Result<(), Error> { + fn write_data(&mut self) -> Result<(), Error> { + let thread_path = self.path.clone(); + let thread_temp_path = self.temp_file_path.clone(); + let thread_val = bincode::serialize(&self.val) + .with_context(|| "Unable to serialize object into bincode".to_string())?; + if self.file_thread.is_some() { + let file_thread = self.file_thread.take(); + file_thread.unwrap().join().expect("Unable to join thread"); + } + match File::open(&self.path) { Ok(_) => { - copy(&self.path, &self.temp_file_path).with_context(|| { - format!( - "Unable to copy temp file from {} to {}", - self.path.display(), - self.temp_file_path.display() - ) - })?; - let file = File::create(&self.path)?; + let handle = std::thread::spawn(move || { + copy(&thread_path, &thread_temp_path).expect("Error copying temp file"); - bincode::serialize_into(file, &self.val) - .with_context(|| format!("Unable to write to file {}", self.path.display()))?; + let mut file = File::create(&thread_path).expect("Error creating file handle"); - remove_file(&self.temp_file_path).with_context(|| { - format!( - "Unable to remove temp file {}", - self.temp_file_path.display() - ) - })?; + file.write_all(thread_val.as_slice()) + .expect("Failed to write data to file"); + + remove_file(&thread_temp_path).expect("Error removing temp file"); + }); + + self.file_thread = Some(handle); } Err(error) if error.kind() == ErrorKind::NotFound => { - let file = File::create(&self.path)?; + let handle = std::thread::spawn(move || { + let mut file = File::create(&thread_path).expect("Error creating file handle"); - bincode::serialize_into(file, &self.val) - .with_context(|| format!("Unable to write to file {}", self.path.display()))?; + file.write_all(thread_val.as_slice()) + .expect("Failed to write data to file"); + }); + + self.file_thread = Some(handle); } Err(error) => return Err(Error::IO(error)), } @@ -324,6 +346,7 @@ where val, path: path.to_path_buf(), temp_file_path, + file_thread: None, }), Err(err) => { info!( @@ -341,6 +364,7 @@ where val, path: path.to_path_buf(), temp_file_path, + file_thread: None, }) } } diff --git a/gemla/src/bin/bin.rs b/gemla/src/bin/bin.rs index 90df1bd..d670a6c 100644 --- a/gemla/src/bin/bin.rs +++ b/gemla/src/bin/bin.rs @@ -35,7 +35,7 @@ fn main() -> anyhow::Result<()> { &PathBuf::from(file_path), GemlaConfig { generations_per_node: 10, - overwrite: true, + overwrite: false, }, ))?; diff --git a/gemla/src/core/mod.rs b/gemla/src/core/mod.rs index c8947f7..aea4c7b 100644 --- a/gemla/src/core/mod.rs +++ b/gemla/src/core/mod.rs @@ -72,10 +72,15 @@ where self.data.readonly().0.as_ref().unwrap().height() ); - self.data - .mutate(|(d, _c)| Gemla::process_tree(d.as_mut().unwrap()))??; + loop { + if Gemla::tree_processed(self.data.readonly().0.as_ref().unwrap())? { + info!("Processed tree"); + break; + } - info!("Processed tree"); + self.data + .mutate(|(d, _)| Gemla::process_tree(d.as_mut().unwrap()))??; + } Ok(()) } @@ -110,13 +115,31 @@ where Ok(()) } + fn tree_processed(tree: &SimulationTree) -> Result { + if tree.val.state() == &GeneticState::Finish { + match (&tree.left, &tree.right) { + (Some(l), Some(r)) => Ok(Gemla::tree_processed(l)? && Gemla::tree_processed(r)?), + (None, None) => Ok(true), + _ => Err(Error::Other(anyhow!("unable to process tree {:?}", tree))), + } + } else { + Ok(false) + } + } + fn process_tree(tree: &mut SimulationTree) -> Result<(), Error> { if tree.val.state() == &GeneticState::Initialize { match (&mut tree.left, &mut tree.right) { - (Some(l), Some(r)) => { + (Some(l), _) if l.val.state() != &GeneticState::Finish => { Gemla::process_tree(&mut (*l))?; + } + (_, Some(r)) if r.val.state() != &GeneticState::Finish => { Gemla::process_tree(&mut (*r))?; - + } + (Some(l), Some(r)) + if r.val.state() == &GeneticState::Finish + && l.val.state() == &GeneticState::Finish => + { let left_node = (*l).val.node.as_ref().unwrap(); let right_node = (*r).val.node.as_ref().unwrap(); let merged_node = GeneticNode::merge(left_node, right_node)?; @@ -131,7 +154,7 @@ where return Err(Error::Other(anyhow!("unable to process tree {:?}", tree))); } } - } else { + } else if tree.val.state() != &GeneticState::Finish { Gemla::process_node(&mut tree.val)?; } @@ -139,25 +162,21 @@ where } fn process_node(node: &mut GeneticNodeWrapper) -> Result<(), Error> { - let node_time = Instant::now(); + let node_state_time = Instant::now(); + let node_state = *node.state(); - loop { - let node_state_time = Instant::now(); - let node_state = node.state().clone(); + node.process_node()?; - if node.process_node()? == GeneticState::Finish { - break; - } + trace!( + "{:?} completed in {:?} for", + node_state, + node_state_time.elapsed() + ); - trace!( - "{:?} completed in {:?}", - node_state, - node_state_time.elapsed() - ); + if node.state() == &GeneticState::Finish { + info!("Processed node"); } - info!("Processed node in {:?}", node_time.elapsed()); - Ok(()) } }