Adding threading to file_linked operations

This commit is contained in:
vandomej 2021-10-13 10:59:29 -07:00
parent 7b11578f7a
commit 1becc7b0e3
3 changed files with 85 additions and 42 deletions

View file

@ -9,6 +9,7 @@ use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
use std::fs::{copy, remove_file, File}; use std::fs::{copy, remove_file, File};
use std::io::ErrorKind; use std::io::ErrorKind;
use std::io::Write;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
/// A wrapper around an object `T` that ties the object to a physical file /// A wrapper around an object `T` that ties the object to a physical file
@ -20,6 +21,19 @@ where
val: T, val: T,
path: PathBuf, path: PathBuf,
temp_file_path: PathBuf, temp_file_path: PathBuf,
file_thread: Option<std::thread::JoinHandle<()>>,
}
impl<T> Drop for FileLinked<T>
where T: Serialize
{
fn drop(&mut self) {
if self.file_thread.is_some()
{
let file_thread = self.file_thread.take();
file_thread.unwrap().join().expect("Error cleaning up file thread for file_linked object");
}
}
} }
impl<T> FileLinked<T> impl<T> FileLinked<T>
@ -108,44 +122,52 @@ where
.ok_or_else(|| anyhow!("Unable to get filename for tempfile {}", path.display()))? .ok_or_else(|| anyhow!("Unable to get filename for tempfile {}", path.display()))?
)); ));
let result = FileLinked { let mut result = FileLinked {
val, val,
path: path.to_path_buf(), path: path.to_path_buf(),
temp_file_path, temp_file_path,
file_thread: None,
}; };
result.write_data()?; result.write_data()?;
Ok(result) Ok(result)
} }
fn write_data(&self) -> Result<(), Error> { fn write_data(&mut self) -> Result<(), Error> {
let thread_path = self.path.clone();
let thread_temp_path = self.temp_file_path.clone();
let thread_val = bincode::serialize(&self.val)
.with_context(|| "Unable to serialize object into bincode".to_string())?;
if self.file_thread.is_some() {
let file_thread = self.file_thread.take();
file_thread.unwrap().join().expect("Unable to join thread");
}
match File::open(&self.path) { match File::open(&self.path) {
Ok(_) => { Ok(_) => {
copy(&self.path, &self.temp_file_path).with_context(|| {
format!(
"Unable to copy temp file from {} to {}",
self.path.display(),
self.temp_file_path.display()
)
})?;
let file = File::create(&self.path)?; let handle = std::thread::spawn(move || {
copy(&thread_path, &thread_temp_path).expect("Error copying temp file");
bincode::serialize_into(file, &self.val) let mut file = File::create(&thread_path).expect("Error creating file handle");
.with_context(|| format!("Unable to write to file {}", self.path.display()))?;
remove_file(&self.temp_file_path).with_context(|| { file.write_all(thread_val.as_slice())
format!( .expect("Failed to write data to file");
"Unable to remove temp file {}",
self.temp_file_path.display() remove_file(&thread_temp_path).expect("Error removing temp file");
) });
})?;
self.file_thread = Some(handle);
} }
Err(error) if error.kind() == ErrorKind::NotFound => { Err(error) if error.kind() == ErrorKind::NotFound => {
let file = File::create(&self.path)?; let handle = std::thread::spawn(move || {
let mut file = File::create(&thread_path).expect("Error creating file handle");
bincode::serialize_into(file, &self.val) file.write_all(thread_val.as_slice())
.with_context(|| format!("Unable to write to file {}", self.path.display()))?; .expect("Failed to write data to file");
});
self.file_thread = Some(handle);
} }
Err(error) => return Err(Error::IO(error)), Err(error) => return Err(Error::IO(error)),
} }
@ -324,6 +346,7 @@ where
val, val,
path: path.to_path_buf(), path: path.to_path_buf(),
temp_file_path, temp_file_path,
file_thread: None,
}), }),
Err(err) => { Err(err) => {
info!( info!(
@ -341,6 +364,7 @@ where
val, val,
path: path.to_path_buf(), path: path.to_path_buf(),
temp_file_path, temp_file_path,
file_thread: None,
}) })
} }
} }

View file

@ -35,7 +35,7 @@ fn main() -> anyhow::Result<()> {
&PathBuf::from(file_path), &PathBuf::from(file_path),
GemlaConfig { GemlaConfig {
generations_per_node: 10, generations_per_node: 10,
overwrite: true, overwrite: false,
}, },
))?; ))?;

View file

@ -72,10 +72,15 @@ where
self.data.readonly().0.as_ref().unwrap().height() self.data.readonly().0.as_ref().unwrap().height()
); );
self.data loop {
.mutate(|(d, _c)| Gemla::process_tree(d.as_mut().unwrap()))??; if Gemla::tree_processed(self.data.readonly().0.as_ref().unwrap())? {
info!("Processed tree"); info!("Processed tree");
break;
}
self.data
.mutate(|(d, _)| Gemla::process_tree(d.as_mut().unwrap()))??;
}
Ok(()) Ok(())
} }
@ -110,13 +115,31 @@ where
Ok(()) Ok(())
} }
fn tree_processed(tree: &SimulationTree<T>) -> Result<bool, Error> {
if tree.val.state() == &GeneticState::Finish {
match (&tree.left, &tree.right) {
(Some(l), Some(r)) => Ok(Gemla::tree_processed(l)? && Gemla::tree_processed(r)?),
(None, None) => Ok(true),
_ => Err(Error::Other(anyhow!("unable to process tree {:?}", tree))),
}
} else {
Ok(false)
}
}
fn process_tree(tree: &mut SimulationTree<T>) -> Result<(), Error> { fn process_tree(tree: &mut SimulationTree<T>) -> Result<(), Error> {
if tree.val.state() == &GeneticState::Initialize { if tree.val.state() == &GeneticState::Initialize {
match (&mut tree.left, &mut tree.right) { match (&mut tree.left, &mut tree.right) {
(Some(l), Some(r)) => { (Some(l), _) if l.val.state() != &GeneticState::Finish => {
Gemla::process_tree(&mut (*l))?; Gemla::process_tree(&mut (*l))?;
}
(_, Some(r)) if r.val.state() != &GeneticState::Finish => {
Gemla::process_tree(&mut (*r))?; Gemla::process_tree(&mut (*r))?;
}
(Some(l), Some(r))
if r.val.state() == &GeneticState::Finish
&& l.val.state() == &GeneticState::Finish =>
{
let left_node = (*l).val.node.as_ref().unwrap(); let left_node = (*l).val.node.as_ref().unwrap();
let right_node = (*r).val.node.as_ref().unwrap(); let right_node = (*r).val.node.as_ref().unwrap();
let merged_node = GeneticNode::merge(left_node, right_node)?; let merged_node = GeneticNode::merge(left_node, right_node)?;
@ -131,7 +154,7 @@ where
return Err(Error::Other(anyhow!("unable to process tree {:?}", tree))); return Err(Error::Other(anyhow!("unable to process tree {:?}", tree)));
} }
} }
} else { } else if tree.val.state() != &GeneticState::Finish {
Gemla::process_node(&mut tree.val)?; Gemla::process_node(&mut tree.val)?;
} }
@ -139,24 +162,20 @@ where
} }
fn process_node(node: &mut GeneticNodeWrapper<T>) -> Result<(), Error> { fn process_node(node: &mut GeneticNodeWrapper<T>) -> Result<(), Error> {
let node_time = Instant::now();
loop {
let node_state_time = Instant::now(); let node_state_time = Instant::now();
let node_state = node.state().clone(); let node_state = *node.state();
if node.process_node()? == GeneticState::Finish { node.process_node()?;
break;
}
trace!( trace!(
"{:?} completed in {:?}", "{:?} completed in {:?} for",
node_state, node_state,
node_state_time.elapsed() node_state_time.elapsed()
); );
}
info!("Processed node in {:?}", node_time.elapsed()); if node.state() == &GeneticState::Finish {
info!("Processed node");
}
Ok(()) Ok(())
} }