Refactoring asynchronous code

This commit is contained in:
vandomej 2021-11-06 23:27:21 -07:00
parent 3b89c8299b
commit bf1fd5a7d1
4 changed files with 156 additions and 146 deletions

View file

@ -25,13 +25,16 @@ where
} }
impl<T> Drop for FileLinked<T> impl<T> Drop for FileLinked<T>
where T: Serialize where
T: Serialize,
{ {
fn drop(&mut self) { fn drop(&mut self) {
if self.file_thread.is_some() if self.file_thread.is_some() {
{
let file_thread = self.file_thread.take(); let file_thread = self.file_thread.take();
file_thread.unwrap().join().expect("Error cleaning up file thread for file_linked object"); file_thread
.unwrap()
.join()
.expect("Error cleaning up file thread for file_linked object");
} }
} }
} }
@ -71,6 +74,8 @@ where
/// assert_eq!(linked_test.readonly().b, String::from("two")); /// assert_eq!(linked_test.readonly().b, String::from("two"));
/// assert_eq!(linked_test.readonly().c, 3.0); /// assert_eq!(linked_test.readonly().c, 3.0);
/// # /// #
/// # drop(linked_test);
/// #
/// # std::fs::remove_file("./temp").expect("Unable to remove file"); /// # std::fs::remove_file("./temp").expect("Unable to remove file");
/// # } /// # }
/// ``` /// ```
@ -109,6 +114,8 @@ where
/// assert_eq!(linked_test.readonly().b, String::from("two")); /// assert_eq!(linked_test.readonly().b, String::from("two"));
/// assert_eq!(linked_test.readonly().c, 3.0); /// assert_eq!(linked_test.readonly().c, 3.0);
/// # /// #
/// # drop(linked_test);
/// #
/// # std::fs::remove_file("./temp").expect("Unable to remove file"); /// # std::fs::remove_file("./temp").expect("Unable to remove file");
/// # } /// # }
/// ``` /// ```
@ -145,9 +152,8 @@ where
match File::open(&self.path) { match File::open(&self.path) {
Ok(_) => { Ok(_) => {
let handle = std::thread::spawn(move || { let handle = std::thread::spawn(move || {
copy(&thread_path, &thread_temp_path).expect("Error copying temp file"); copy(&thread_path, &thread_temp_path).expect("Unable to copy temp file");
let mut file = File::create(&thread_path).expect("Error creating file handle"); let mut file = File::create(&thread_path).expect("Error creating file handle");
@ -210,6 +216,8 @@ where
/// ///
/// assert_eq!(linked_test.readonly().a, 2); /// assert_eq!(linked_test.readonly().a, 2);
/// # /// #
/// # drop(linked_test);
/// #
/// # std::fs::remove_file("./temp").expect("Unable to remove file"); /// # std::fs::remove_file("./temp").expect("Unable to remove file");
/// # /// #
/// # Ok(()) /// # Ok(())
@ -261,6 +269,8 @@ where
/// ///
/// assert_eq!(linked_test.readonly().a, 2); /// assert_eq!(linked_test.readonly().a, 2);
/// # /// #
/// # drop(linked_test);
/// #
/// # std::fs::remove_file("./temp").expect("Unable to remove file"); /// # std::fs::remove_file("./temp").expect("Unable to remove file");
/// # /// #
/// # Ok(()) /// # Ok(())
@ -322,6 +332,8 @@ where
/// assert_eq!(linked_test.readonly().b, test.b); /// assert_eq!(linked_test.readonly().b, test.b);
/// assert_eq!(linked_test.readonly().c, test.c); /// assert_eq!(linked_test.readonly().c, test.c);
/// # /// #
/// # drop(linked_test);
/// #
/// # std::fs::remove_file("./temp").expect("Unable to remove file"); /// # std::fs::remove_file("./temp").expect("Unable to remove file");
/// # /// #
/// # Ok(()) /// # Ok(())
@ -417,6 +429,8 @@ mod tests {
"[1, 1, 3, 4, 5]" "[1, 1, 3, 4, 5]"
); );
drop(file_linked_list);
fs::remove_file("test.txt").expect("Unable to remove file"); fs::remove_file("test.txt").expect("Unable to remove file");
Ok(()) Ok(())

View file

@ -52,11 +52,11 @@ fn main() -> anyhow::Result<()> {
&PathBuf::from(file_path), &PathBuf::from(file_path),
GemlaConfig { GemlaConfig {
generations_per_node: 1, generations_per_node: 1,
overwrite: false, overwrite: true,
}, },
))?; ))?;
log_error(gemla.simulate(100).await)?; log_error(gemla.simulate(3).await)?;
Ok(()) Ok(())
}) })

View file

@ -47,47 +47,61 @@ pub trait GeneticNode {
/// Used externally to wrap a node implementing the [`GeneticNode`] trait. Processes state transitions for the given node as /// Used externally to wrap a node implementing the [`GeneticNode`] trait. Processes state transitions for the given node as
/// well as signal recovery. Transition states are given by [`GeneticState`] /// well as signal recovery. Transition states are given by [`GeneticState`]
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct GeneticNodeWrapper<T> pub struct GeneticNodeWrapper<T> {
where node: Option<T>,
T: Clone,
{
pub node: Option<T>,
state: GeneticState, state: GeneticState,
generation: u64, generation: u64,
pub total_generations: u64, max_generations: u64,
id: uuid::Uuid, id: uuid::Uuid,
} }
impl<T> GeneticNodeWrapper<T> impl<T> Default for GeneticNodeWrapper<T> {
where fn default() -> Self {
T: GeneticNode + Debug + Clone,
{
pub fn get_id(&self) -> uuid::Uuid {
self.id
}
pub fn new(total_generations: u64) -> Self {
GeneticNodeWrapper { GeneticNodeWrapper {
node: None, node: None,
state: GeneticState::Initialize, state: GeneticState::Initialize,
generation: 0, generation: 0,
total_generations, max_generations: 1,
id: uuid::Uuid::new_v4(), id: uuid::Uuid::new_v4(),
} }
} }
}
pub fn from(data: T, total_generations: u64, id: uuid::Uuid) -> Self { impl<T> GeneticNodeWrapper<T>
where
T: GeneticNode + Debug,
{
pub fn new(max_generations: u64) -> Self {
GeneticNodeWrapper::<T> {
max_generations,
..Default::default()
}
}
pub fn from(data: T, max_generations: u64, id: uuid::Uuid) -> Self {
GeneticNodeWrapper { GeneticNodeWrapper {
node: Some(data), node: Some(data),
state: GeneticState::Simulate, state: GeneticState::Simulate,
generation: 0, generation: 0,
total_generations, max_generations,
id, id,
} }
} }
pub fn state(&self) -> &GeneticState { pub fn as_ref(&self) -> Option<&T> {
&self.state self.node.as_ref()
}
pub fn id(&self) -> uuid::Uuid {
self.id
}
pub fn max_generations(&self) -> u64 {
self.max_generations
}
pub fn state(&self) -> GeneticState {
self.state
} }
pub fn process_node(&mut self) -> Result<GeneticState, Error> { pub fn process_node(&mut self) -> Result<GeneticState, Error> {
@ -103,7 +117,7 @@ where
.simulate() .simulate()
.with_context(|| format!("Error simulating node: {:?}", self))?; .with_context(|| format!("Error simulating node: {:?}", self))?;
self.state = if self.generation >= self.total_generations { self.state = if self.generation >= self.max_generations {
GeneticState::Finish GeneticState::Finish
} else { } else {
GeneticState::Mutate GeneticState::Mutate

View file

@ -5,7 +5,6 @@ pub mod genetic_node;
use crate::error::Error; use crate::error::Error;
use crate::tree::Tree; use crate::tree::Tree;
use anyhow::anyhow;
use file_linked::FileLinked; use file_linked::FileLinked;
use genetic_node::{GeneticNode, GeneticNodeWrapper, GeneticState}; use genetic_node::{GeneticNode, GeneticNodeWrapper, GeneticState};
use log::{info, trace, warn}; use log::{info, trace, warn};
@ -18,7 +17,7 @@ use std::mem::swap;
use std::path::Path; use std::path::Path;
use std::time::Instant; use std::time::Instant;
type SimulationTree<T> = Tree<GeneticNodeWrapper<T>>; type SimulationTree<T> = Box<Tree<GeneticNodeWrapper<T>>>;
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub struct GemlaConfig { pub struct GemlaConfig {
@ -49,18 +48,14 @@ where
{ {
pub fn new(path: &Path, config: GemlaConfig) -> Result<Self, Error> { pub fn new(path: &Path, config: GemlaConfig) -> Result<Self, Error> {
match File::open(path) { match File::open(path) {
Ok(file) => { Ok(_) => Ok(Gemla {
drop(file); data: if config.overwrite {
FileLinked::new((None, config), path)?
Ok(Gemla { } else {
data: if config.overwrite { FileLinked::from_file(path)?
FileLinked::new((None, config), path)? },
} else { threads: std::collections::HashMap::new(),
FileLinked::from_file(path)? }),
},
threads: std::collections::HashMap::new(),
})
}
Err(error) if error.kind() == ErrorKind::NotFound => Ok(Gemla { Err(error) if error.kind() == ErrorKind::NotFound => Ok(Gemla {
data: FileLinked::new((None, config), path)?, data: FileLinked::new((None, config), path)?,
threads: std::collections::HashMap::new(), threads: std::collections::HashMap::new(),
@ -70,8 +65,14 @@ where
} }
pub async fn simulate(&mut self, steps: u64) -> Result<(), Error> { pub async fn simulate(&mut self, steps: u64) -> Result<(), Error> {
self.data // Before we can process nodes we must create blank nodes in their place to keep track of which nodes have been processed
.mutate(|(d, c)| Gemla::increase_height(d, c, steps))??; // in the tree and which nodes have not.
self.data.mutate(|(d, c)| {
let mut tree: Option<SimulationTree<T>> = Gemla::increase_height(d.take(), c, steps);
swap(d, &mut tree);
})?;
// println!("{}", serde_json::to_string(&self.data.readonly().0).expect(""));
info!( info!(
"Height of simulation tree increased to {}", "Height of simulation tree increased to {}",
@ -79,20 +80,21 @@ where
); );
loop { loop {
if Gemla::tree_processed(self.data.readonly().0.as_ref().unwrap())? { if Gemla::is_completed(self.data.readonly().0.as_ref().unwrap()) {
self.join_threads().await?; self.join_threads().await?;
info!("Processed tree"); info!("Processed tree");
break; break;
} }
let node_to_process = self.find_process_node(); let node_to_process =
self.get_unprocessed_node(self.data.readonly().0.as_ref().unwrap());
if let Some(node) = node_to_process { if let Some(node) = node_to_process {
trace!("Adding node to process list {}", node.get_id()); trace!("Adding node to process list {}", node.id());
self.threads self.threads
.insert(node.get_id(), Box::pin(Gemla::process_node(node))); .insert(node.id(), Box::pin(Gemla::process_node(node)));
} else { } else {
trace!("No node found to process, joining threads"); trace!("No node found to process, joining threads");
@ -104,7 +106,7 @@ where
} }
async fn join_threads(&mut self) -> Result<(), Error> { async fn join_threads(&mut self) -> Result<(), Error> {
if self.threads.len() > 0 { if !self.threads.is_empty() {
trace!("Joining threads for nodes {:?}", self.threads.keys()); trace!("Joining threads for nodes {:?}", self.threads.keys());
let results = futures::future::join_all(self.threads.values_mut()).await; let results = futures::future::join_all(self.threads.values_mut()).await;
@ -114,11 +116,15 @@ where
self.threads.clear(); self.threads.clear();
reduced_results.and_then(|r| { reduced_results.and_then(|r| {
if !self let failed_nodes = self
.data .data
.mutate(|(d, _)| Gemla::replace_nodes(d.as_mut().unwrap(), r))? .mutate(|(d, _)| Gemla::replace_nodes(d.as_mut().unwrap(), r))?;
{
warn!("Unable to find nodes to replace in tree") if !failed_nodes.is_empty() {
warn!(
"Unable to find {:?} to replace in tree",
failed_nodes.iter().map(|n| n.id())
)
} }
self.data self.data
@ -132,46 +138,44 @@ where
} }
fn merge_completed_nodes(tree: &mut SimulationTree<T>) -> Result<(), Error> { fn merge_completed_nodes(tree: &mut SimulationTree<T>) -> Result<(), Error> {
if tree.val.state() == &GeneticState::Initialize { if tree.val.state() == GeneticState::Initialize {
match (&mut tree.left, &mut tree.right) { match (&mut tree.left, &mut tree.right) {
(Some(l), Some(r)) (Some(l), Some(r))
if l.val.state() == &GeneticState::Finish if l.val.state() == GeneticState::Finish
&& r.val.state() == &GeneticState::Finish => && r.val.state() == GeneticState::Finish =>
{ {
info!("Merging nodes {} and {}", l.val.get_id(), r.val.get_id()); info!("Merging nodes {} and {}", l.val.id(), r.val.id());
let left_node = l.val.node.as_ref().unwrap(); let left_node = l.val.as_ref().unwrap();
let right_node = r.val.node.as_ref().unwrap(); let right_node = r.val.as_ref().unwrap();
let merged_node = GeneticNode::merge(left_node, right_node)?; let merged_node = GeneticNode::merge(left_node, right_node)?;
tree.val = GeneticNodeWrapper::from( tree.val = GeneticNodeWrapper::from(
*merged_node, *merged_node,
tree.val.total_generations, tree.val.max_generations(),
tree.val.get_id(), tree.val.id(),
); );
} }
(Some(l), Some(r)) => { (Some(l), Some(r)) => {
Gemla::merge_completed_nodes(l)?; Gemla::merge_completed_nodes(l)?;
Gemla::merge_completed_nodes(r)?; Gemla::merge_completed_nodes(r)?;
} }
(Some(l), None) if l.val.state() == &GeneticState::Finish => { (Some(l), None) if l.val.state() == GeneticState::Finish => {
trace!("Copying node {}", l.val.get_id()); trace!("Copying node {}", l.val.id());
let left_node = l.val.clone();
tree.val = GeneticNodeWrapper::from( tree.val = GeneticNodeWrapper::from(
left_node.node.unwrap(), l.val.as_ref().unwrap().clone(),
tree.val.total_generations, tree.val.max_generations(),
tree.val.get_id(), tree.val.id(),
); );
} }
(Some(l), None) => Gemla::merge_completed_nodes(l)?, (Some(l), None) => Gemla::merge_completed_nodes(l)?,
(None, Some(r)) if r.val.state() == &GeneticState::Finish => { (None, Some(r)) if r.val.state() == GeneticState::Finish => {
trace!("Copying node {}", r.val.get_id()); trace!("Copying node {}", r.val.id());
let right_node = r.val.clone();
tree.val = GeneticNodeWrapper::from( tree.val = GeneticNodeWrapper::from(
right_node.node.unwrap(), r.val.as_ref().unwrap().clone(),
tree.val.total_generations, tree.val.max_generations(),
tree.val.get_id(), tree.val.id(),
); );
} }
(None, Some(r)) => Gemla::merge_completed_nodes(r)?, (None, Some(r)) => Gemla::merge_completed_nodes(r)?,
@ -182,22 +186,20 @@ where
Ok(()) Ok(())
} }
fn find_process_node_helper(&self, tree: &SimulationTree<T>) -> Option<GeneticNodeWrapper<T>> { fn get_unprocessed_node(&self, tree: &SimulationTree<T>) -> Option<GeneticNodeWrapper<T>> {
if tree.val.state() != &GeneticState::Finish if tree.val.state() != GeneticState::Finish && !self.threads.contains_key(&tree.val.id()) {
&& !self.threads.contains_key(&tree.val.get_id())
{
match (&tree.left, &tree.right) { match (&tree.left, &tree.right) {
(Some(l), Some(r)) (Some(l), Some(r))
if l.val.state() == &GeneticState::Finish if l.val.state() == GeneticState::Finish
&& r.val.state() == &GeneticState::Finish => && r.val.state() == GeneticState::Finish =>
{ {
Some(tree.val.clone()) Some(tree.val.clone())
} }
(Some(l), Some(r)) => self (Some(l), Some(r)) => self
.find_process_node_helper(&*l) .get_unprocessed_node(l)
.or_else(|| self.find_process_node_helper(&*r)), .or_else(|| self.get_unprocessed_node(r)),
(Some(l), None) => self.find_process_node_helper(&*l), (Some(l), None) => self.get_unprocessed_node(l),
(None, Some(r)) => self.find_process_node_helper(&*r), (None, Some(r)) => self.get_unprocessed_node(r),
(None, None) => Some(tree.val.clone()), (None, None) => Some(tree.val.clone()),
} }
} else { } else {
@ -205,83 +207,63 @@ where
} }
} }
fn find_process_node(&self) -> Option<GeneticNodeWrapper<T>> { fn replace_nodes(
let tree = self.data.readonly().0.as_ref();
tree.and_then(|t| self.find_process_node_helper(&t))
}
fn replace_node(
tree: &mut SimulationTree<T>, tree: &mut SimulationTree<T>,
node: GeneticNodeWrapper<T>, mut nodes: Vec<GeneticNodeWrapper<T>>,
) -> Option<GeneticNodeWrapper<T>> { ) -> Vec<GeneticNodeWrapper<T>> {
if tree.val.get_id() == node.get_id() { if let Some(i) = nodes.iter().position(|n| n.id() == tree.val.id()) {
tree.val = node; tree.val = nodes.remove(i);
None
} else {
match (&mut tree.left, &mut tree.right) {
(Some(l), Some(r)) => {
Gemla::replace_node(l, node).and_then(|n| Gemla::replace_node(r, n))
}
(Some(l), None) => Gemla::replace_node(l, node),
(None, Some(r)) => Gemla::replace_node(r, node),
_ => Some(node),
}
} }
}
fn replace_nodes(tree: &mut SimulationTree<T>, nodes: Vec<GeneticNodeWrapper<T>>) -> bool { match (&mut tree.left, &mut tree.right) {
nodes (Some(l), Some(r)) => Gemla::replace_nodes(r, Gemla::replace_nodes(l, nodes)),
.into_iter() (Some(l), None) => Gemla::replace_nodes(l, nodes),
.map(|n| Gemla::replace_node(tree, n).is_none()) (None, Some(r)) => Gemla::replace_nodes(r, nodes),
.reduce(|a, b| a && b) _ => nodes,
.unwrap_or(false) }
} }
fn increase_height( fn increase_height(
tree: &mut Option<SimulationTree<T>>, tree: Option<SimulationTree<T>>,
config: &GemlaConfig, config: &GemlaConfig,
amount: u64, amount: u64,
) -> Result<(), Error> { ) -> Option<SimulationTree<T>> {
for _ in 0..amount { if amount == 0 {
if tree.is_none() { tree
swap( } else {
tree, let right_branch_height =
&mut Some(btree!(GeneticNodeWrapper::new(config.generations_per_node))), tree.as_ref().map(|t| t.height() as u64).unwrap_or(0) + amount - 1;
);
} else {
let height = tree.as_mut().unwrap().height() as u64;
let temp = tree.take();
swap(
tree,
&mut Some(btree!(
GeneticNodeWrapper::new(config.generations_per_node),
temp.unwrap(),
btree!(GeneticNodeWrapper::new(
height * config.generations_per_node
))
)),
);
}
}
Ok(()) Some(Box::new(Tree::new(
GeneticNodeWrapper::new(config.generations_per_node),
Gemla::increase_height(tree, config, amount - 1),
if right_branch_height > 0 {
Some(Box::new(btree!(GeneticNodeWrapper::new(
right_branch_height * config.generations_per_node
))))
} else {
None
},
)))
}
} }
fn tree_processed(tree: &SimulationTree<T>) -> Result<bool, Error> { fn is_completed(tree: &SimulationTree<T>) -> bool {
if tree.val.state() == &GeneticState::Finish { if tree.val.state() == GeneticState::Finish {
match (&tree.left, &tree.right) { match (&tree.left, &tree.right) {
(Some(l), Some(r)) => Ok(Gemla::tree_processed(l)? && Gemla::tree_processed(r)?), (Some(l), Some(r)) => Gemla::is_completed(l) && Gemla::is_completed(r),
(None, None) => Ok(true), (Some(l), None) => Gemla::is_completed(l),
_ => Err(Error::Other(anyhow!("unable to process tree {:?}", tree))), (None, Some(r)) => Gemla::is_completed(r),
(None, None) => true,
} }
} else { } else {
Ok(false) false
} }
} }
async fn process_node(mut node: GeneticNodeWrapper<T>) -> Result<GeneticNodeWrapper<T>, Error> { async fn process_node(mut node: GeneticNodeWrapper<T>) -> Result<GeneticNodeWrapper<T>, Error> {
let node_state_time = Instant::now(); let node_state_time = Instant::now();
let node_state = *node.state(); let node_state = node.state();
node.process_node()?; node.process_node()?;
@ -289,11 +271,11 @@ where
"{:?} completed in {:?} for {}", "{:?} completed in {:?} for {}",
node_state, node_state,
node_state_time.elapsed(), node_state_time.elapsed(),
node.get_id() node.id()
); );
if node.state() == &GeneticState::Finish { if node.state() == GeneticState::Finish {
info!("Processed node {}", node.get_id()); info!("Processed node {}", node.id());
} }
Ok(node) Ok(node)