diff --git a/gemla/Cargo.toml b/gemla/Cargo.toml index be3a442..4f2df1e 100644 --- a/gemla/Cargo.toml +++ b/gemla/Cargo.toml @@ -15,7 +15,7 @@ categories = ["simulation"] [dependencies] serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -uuid = { version = "0.7", features = ["serde", "v4"] } +uuid = { version = "0.8", features = ["serde", "v4"] } clap = { version = "~2.27.0", features = ["yaml"] } toml = "0.5.8" regex = "1" @@ -24,4 +24,6 @@ thiserror = "1.0" anyhow = "1.0" rand = "0.8.4" log = "0.4.14" -env_logger = "0.9.0" \ No newline at end of file +env_logger = "0.9.0" +tokio = { version = "1.12.0", features = ["full"] } +futures = "0.3.17" \ No newline at end of file diff --git a/gemla/nodes.toml b/gemla/nodes.toml new file mode 100644 index 0000000..976061d --- /dev/null +++ b/gemla/nodes.toml @@ -0,0 +1,15 @@ +[[nodes]] +fabric_addr = "10.0.0.1:9999" +bridge_bind = "10.0.0.1:8888" +mem = "100 GiB" +cpu = 8 + +# [[nodes]] +# fabric_addr = "10.0.0.2:9999" +# mem = "100 GiB" +# cpu = 16 + +# [[nodes]] +# fabric_addr = "10.0.0.3:9999" +# mem = "100 GiB" +# cpu = 16 \ No newline at end of file diff --git a/gemla/src/bin/bin.rs b/gemla/src/bin/bin.rs index d670a6c..585ba13 100644 --- a/gemla/src/bin/bin.rs +++ b/gemla/src/bin/bin.rs @@ -18,7 +18,8 @@ use test_state::TestState; /// /// Use the -h, --h, or --help flag to see usage syntax. /// TODO -fn main() -> anyhow::Result<()> { +#[tokio::main] +async fn main() -> anyhow::Result<()> { env_logger::init(); info!("Starting"); @@ -34,12 +35,12 @@ fn main() -> anyhow::Result<()> { let mut gemla = log_error(Gemla::::new( &PathBuf::from(file_path), GemlaConfig { - generations_per_node: 10, + generations_per_node: 1, overwrite: false, }, ))?; - log_error(gemla.simulate(10))?; + log_error(gemla.simulate(100).await)?; // let mut f = std::fs::File::create("./test")?; // write!(f, "{}", serde_json::to_string(&gemla.data.readonly().0)?)?; diff --git a/gemla/src/bin/test_state/mod.rs b/gemla/src/bin/test_state/mod.rs index 9d2c599..d91c21d 100644 --- a/gemla/src/bin/test_state/mod.rs +++ b/gemla/src/bin/test_state/mod.rs @@ -8,7 +8,7 @@ use std::convert::TryInto; const POPULATION_SIZE: u64 = 5; const POPULATION_REDUCTION_SIZE: u64 = 3; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct TestState { pub population: Vec, } diff --git a/gemla/src/core/genetic_node.rs b/gemla/src/core/genetic_node.rs index 84dd618..ab0d229 100644 --- a/gemla/src/core/genetic_node.rs +++ b/gemla/src/core/genetic_node.rs @@ -46,33 +46,43 @@ pub trait GeneticNode { /// Used externally to wrap a node implementing the [`GeneticNode`] trait. Processes state transitions for the given node as /// well as signal recovery. Transition states are given by [`GeneticState`] -#[derive(Debug, Serialize, Deserialize)] -pub struct GeneticNodeWrapper { +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct GeneticNodeWrapper +where + T: Clone, +{ pub node: Option, state: GeneticState, generation: u64, pub total_generations: u64, + id: uuid::Uuid, } impl GeneticNodeWrapper where - T: GeneticNode + Debug, + T: GeneticNode + Debug + Clone, { + pub fn get_id(&self) -> uuid::Uuid { + self.id + } + pub fn new(total_generations: u64) -> Self { GeneticNodeWrapper { node: None, state: GeneticState::Initialize, generation: 0, total_generations, + id: uuid::Uuid::new_v4(), } } - pub fn from(data: T, total_generations: u64) -> Self { + pub fn from(data: T, total_generations: u64, id: uuid::Uuid) -> Self { GeneticNodeWrapper { node: Some(data), state: GeneticState::Simulate, generation: 0, total_generations, + id, } } diff --git a/gemla/src/core/mod.rs b/gemla/src/core/mod.rs index aea4c7b..7c16d80 100644 --- a/gemla/src/core/mod.rs +++ b/gemla/src/core/mod.rs @@ -8,7 +8,7 @@ use crate::tree::Tree; use anyhow::anyhow; use file_linked::FileLinked; use genetic_node::{GeneticNode, GeneticNodeWrapper, GeneticState}; -use log::{info, trace}; +use log::{info, trace, warn}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use std::fmt::Debug; @@ -32,16 +32,20 @@ pub struct GemlaConfig { /// individuals. /// /// [`GeneticNode`]: genetic_node::GeneticNode -pub struct Gemla +pub struct Gemla<'a, T> where - T: Serialize, + T: Serialize + Clone, { pub data: FileLinked<(Option>, GemlaConfig)>, + threads: std::collections::HashMap< + uuid::Uuid, + futures::prelude::future::BoxFuture<'a, Result, Error>>, + >, } -impl Gemla +impl<'a, T: 'a> Gemla<'a, T> where - T: GeneticNode + Serialize + DeserializeOwned + Debug, + T: GeneticNode + Serialize + DeserializeOwned + Debug + Clone + std::marker::Send, { pub fn new(path: &Path, config: GemlaConfig) -> Result { match File::open(path) { @@ -54,16 +58,18 @@ where } else { FileLinked::from_file(path)? }, + threads: std::collections::HashMap::new(), }) } Err(error) if error.kind() == ErrorKind::NotFound => Ok(Gemla { data: FileLinked::new((None, config), path)?, + threads: std::collections::HashMap::new(), }), Err(error) => Err(Error::IO(error)), } } - pub fn simulate(&mut self, steps: u64) -> Result<(), Error> { + pub async fn simulate(&mut self, steps: u64) -> Result<(), Error> { self.data .mutate(|(d, c)| Gemla::increase_height(d, c, steps))??; @@ -74,17 +80,168 @@ where loop { if Gemla::tree_processed(self.data.readonly().0.as_ref().unwrap())? { + self.join_threads().await?; + info!("Processed tree"); break; } - self.data - .mutate(|(d, _)| Gemla::process_tree(d.as_mut().unwrap()))??; + let node_to_process = self.find_process_node(); + + if let Some(node) = node_to_process { + trace!("Adding node to process list {}", node.get_id()); + + // if self.threads.len() > 5 { + // self.join_threads().await?; + // } else { + self.threads + .insert(node.get_id(), Box::pin(Gemla::process_node(node))); + // } + } else { + trace!("No node found to process, joining threads"); + + self.join_threads().await?; + } } Ok(()) } + async fn join_threads(&mut self) -> Result<(), Error> { + if self.threads.len() > 0 { + trace!("Joining threads for nodes {:?}", self.threads.keys()); + + let results = futures::future::join_all(self.threads.values_mut()).await; + let reduced_results: Result>, Error> = + results.into_iter().collect(); + + self.threads.clear(); + + reduced_results.and_then(|r| { + if !self + .data + .mutate(|(d, _)| Gemla::replace_nodes(d.as_mut().unwrap(), r))? + { + warn!("Unable to find nodes to replace in tree") + } + + self.data + .mutate(|(d, _)| Gemla::merge_completed_nodes(d.as_mut().unwrap()))??; + + Ok(()) + })?; + + } + + Ok(()) + } + + fn merge_completed_nodes(tree: &mut SimulationTree) -> Result<(), Error> { + if tree.val.state() == &GeneticState::Initialize { + match (&mut tree.left, &mut tree.right) { + (Some(l), Some(r)) + if l.val.state() == &GeneticState::Finish + && r.val.state() == &GeneticState::Finish => + { + info!("Merging nodes {} and {}", l.val.get_id(), r.val.get_id()); + + let left_node = l.val.node.as_ref().unwrap(); + let right_node = r.val.node.as_ref().unwrap(); + let merged_node = GeneticNode::merge(left_node, right_node)?; + tree.val = GeneticNodeWrapper::from( + *merged_node, + tree.val.total_generations, + tree.val.get_id(), + ); + } + (Some(l), Some(r)) => { + Gemla::merge_completed_nodes(l)?; + Gemla::merge_completed_nodes(r)?; + } + (Some(l), None) if l.val.state() == &GeneticState::Finish => { + trace!("Copying node {}", l.val.get_id()); + + let left_node = l.val.clone(); + tree.val = GeneticNodeWrapper::from( + left_node.node.unwrap(), + tree.val.total_generations, + tree.val.get_id(), + ); + } + (Some(l), None) => Gemla::merge_completed_nodes(l)?, + (None, Some(r)) if r.val.state() == &GeneticState::Finish => { + trace!("Copying node {}", r.val.get_id()); + + let right_node = r.val.clone(); + tree.val = GeneticNodeWrapper::from( + right_node.node.unwrap(), + tree.val.total_generations, + tree.val.get_id(), + ); + } + (None, Some(r)) => Gemla::merge_completed_nodes(r)?, + (_, _) => (), + } + } + + Ok(()) + } + + fn find_process_node_helper(&self, tree: &SimulationTree) -> Option> { + if tree.val.state() != &GeneticState::Finish + && !self.threads.contains_key(&tree.val.get_id()) + { + match (&tree.left, &tree.right) { + (Some(l), Some(r)) + if l.val.state() == &GeneticState::Finish + && r.val.state() == &GeneticState::Finish => + { + Some(tree.val.clone()) + } + (Some(l), Some(r)) => self + .find_process_node_helper(&*l) + .or_else(|| self.find_process_node_helper(&*r)), + (Some(l), None) => self.find_process_node_helper(&*l), + (None, Some(r)) => self.find_process_node_helper(&*r), + (None, None) => Some(tree.val.clone()), + } + } else { + None + } + } + + fn find_process_node(&self) -> Option> { + let tree = self.data.readonly().0.as_ref(); + tree.and_then(|t| self.find_process_node_helper(&t)) + } + + fn replace_node( + tree: &mut SimulationTree, + node: GeneticNodeWrapper, + ) -> Option> { + if tree.val.get_id() == node.get_id() { + tree.val = node; + None + } else { + match (&mut tree.left, &mut tree.right) { + (Some(l), Some(r)) => { + Gemla::replace_node(l, node).and_then(|n| Gemla::replace_node(r, n)) + } + (Some(l), None) => Gemla::replace_node(l, node), + (None, Some(r)) => Gemla::replace_node(r, node), + _ => Some(node), + } + } + } + + fn replace_nodes(tree: &mut SimulationTree, nodes: Vec>) -> bool { + nodes + .into_iter() + .map(|n| Gemla::replace_node(tree, n).is_none()) + .reduce(|a, b| a && b) + .unwrap_or(false) + } + fn increase_height( tree: &mut Option>, config: &GemlaConfig, @@ -127,57 +284,24 @@ where } } - fn process_tree(tree: &mut SimulationTree) -> Result<(), Error> { - if tree.val.state() == &GeneticState::Initialize { - match (&mut tree.left, &mut tree.right) { - (Some(l), _) if l.val.state() != &GeneticState::Finish => { - Gemla::process_tree(&mut (*l))?; - } - (_, Some(r)) if r.val.state() != &GeneticState::Finish => { - Gemla::process_tree(&mut (*r))?; - } - (Some(l), Some(r)) - if r.val.state() == &GeneticState::Finish - && l.val.state() == &GeneticState::Finish => - { - let left_node = (*l).val.node.as_ref().unwrap(); - let right_node = (*r).val.node.as_ref().unwrap(); - let merged_node = GeneticNode::merge(left_node, right_node)?; - - tree.val = GeneticNodeWrapper::from(*merged_node, tree.val.total_generations); - Gemla::process_node(&mut tree.val)?; - } - (None, None) => { - Gemla::process_node(&mut tree.val)?; - } - _ => { - return Err(Error::Other(anyhow!("unable to process tree {:?}", tree))); - } - } - } else if tree.val.state() != &GeneticState::Finish { - Gemla::process_node(&mut tree.val)?; - } - - Ok(()) - } - - fn process_node(node: &mut GeneticNodeWrapper) -> Result<(), Error> { + async fn process_node(mut node: GeneticNodeWrapper) -> Result, Error> { let node_state_time = Instant::now(); let node_state = *node.state(); node.process_node()?; trace!( - "{:?} completed in {:?} for", + "{:?} completed in {:?} for {}", node_state, - node_state_time.elapsed() + node_state_time.elapsed(), + node.get_id() ); if node.state() == &GeneticState::Finish { - info!("Processed node"); + info!("Processed node {}", node.get_id()); } - Ok(()) + Ok(node) } }