Concurrency working

This commit is contained in:
vandomej 2021-10-21 11:32:12 -07:00
parent 1becc7b0e3
commit 433fd04836
6 changed files with 209 additions and 57 deletions

View file

@ -15,7 +15,7 @@ categories = ["simulation"]
[dependencies] [dependencies]
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
uuid = { version = "0.7", features = ["serde", "v4"] } uuid = { version = "0.8", features = ["serde", "v4"] }
clap = { version = "~2.27.0", features = ["yaml"] } clap = { version = "~2.27.0", features = ["yaml"] }
toml = "0.5.8" toml = "0.5.8"
regex = "1" regex = "1"
@ -24,4 +24,6 @@ thiserror = "1.0"
anyhow = "1.0" anyhow = "1.0"
rand = "0.8.4" rand = "0.8.4"
log = "0.4.14" log = "0.4.14"
env_logger = "0.9.0" env_logger = "0.9.0"
tokio = { version = "1.12.0", features = ["full"] }
futures = "0.3.17"

15
gemla/nodes.toml Normal file
View file

@ -0,0 +1,15 @@
[[nodes]]
fabric_addr = "10.0.0.1:9999"
bridge_bind = "10.0.0.1:8888"
mem = "100 GiB"
cpu = 8
# [[nodes]]
# fabric_addr = "10.0.0.2:9999"
# mem = "100 GiB"
# cpu = 16
# [[nodes]]
# fabric_addr = "10.0.0.3:9999"
# mem = "100 GiB"
# cpu = 16

View file

@ -18,7 +18,8 @@ use test_state::TestState;
/// ///
/// Use the -h, --h, or --help flag to see usage syntax. /// Use the -h, --h, or --help flag to see usage syntax.
/// TODO /// TODO
fn main() -> anyhow::Result<()> { #[tokio::main]
async fn main() -> anyhow::Result<()> {
env_logger::init(); env_logger::init();
info!("Starting"); info!("Starting");
@ -34,12 +35,12 @@ fn main() -> anyhow::Result<()> {
let mut gemla = log_error(Gemla::<TestState>::new( let mut gemla = log_error(Gemla::<TestState>::new(
&PathBuf::from(file_path), &PathBuf::from(file_path),
GemlaConfig { GemlaConfig {
generations_per_node: 10, generations_per_node: 1,
overwrite: false, overwrite: false,
}, },
))?; ))?;
log_error(gemla.simulate(10))?; log_error(gemla.simulate(100).await)?;
// let mut f = std::fs::File::create("./test")?; // let mut f = std::fs::File::create("./test")?;
// write!(f, "{}", serde_json::to_string(&gemla.data.readonly().0)?)?; // write!(f, "{}", serde_json::to_string(&gemla.data.readonly().0)?)?;

View file

@ -8,7 +8,7 @@ use std::convert::TryInto;
const POPULATION_SIZE: u64 = 5; const POPULATION_SIZE: u64 = 5;
const POPULATION_REDUCTION_SIZE: u64 = 3; const POPULATION_REDUCTION_SIZE: u64 = 3;
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TestState { pub struct TestState {
pub population: Vec<i64>, pub population: Vec<i64>,
} }

View file

@ -46,33 +46,43 @@ pub trait GeneticNode {
/// Used externally to wrap a node implementing the [`GeneticNode`] trait. Processes state transitions for the given node as /// Used externally to wrap a node implementing the [`GeneticNode`] trait. Processes state transitions for the given node as
/// well as signal recovery. Transition states are given by [`GeneticState`] /// well as signal recovery. Transition states are given by [`GeneticState`]
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct GeneticNodeWrapper<T> { pub struct GeneticNodeWrapper<T>
where
T: Clone,
{
pub node: Option<T>, pub node: Option<T>,
state: GeneticState, state: GeneticState,
generation: u64, generation: u64,
pub total_generations: u64, pub total_generations: u64,
id: uuid::Uuid,
} }
impl<T> GeneticNodeWrapper<T> impl<T> GeneticNodeWrapper<T>
where where
T: GeneticNode + Debug, T: GeneticNode + Debug + Clone,
{ {
pub fn get_id(&self) -> uuid::Uuid {
self.id
}
pub fn new(total_generations: u64) -> Self { pub fn new(total_generations: u64) -> Self {
GeneticNodeWrapper { GeneticNodeWrapper {
node: None, node: None,
state: GeneticState::Initialize, state: GeneticState::Initialize,
generation: 0, generation: 0,
total_generations, total_generations,
id: uuid::Uuid::new_v4(),
} }
} }
pub fn from(data: T, total_generations: u64) -> Self { pub fn from(data: T, total_generations: u64, id: uuid::Uuid) -> Self {
GeneticNodeWrapper { GeneticNodeWrapper {
node: Some(data), node: Some(data),
state: GeneticState::Simulate, state: GeneticState::Simulate,
generation: 0, generation: 0,
total_generations, total_generations,
id,
} }
} }

View file

@ -8,7 +8,7 @@ use crate::tree::Tree;
use anyhow::anyhow; use anyhow::anyhow;
use file_linked::FileLinked; use file_linked::FileLinked;
use genetic_node::{GeneticNode, GeneticNodeWrapper, GeneticState}; use genetic_node::{GeneticNode, GeneticNodeWrapper, GeneticState};
use log::{info, trace}; use log::{info, trace, warn};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fmt::Debug; use std::fmt::Debug;
@ -32,16 +32,20 @@ pub struct GemlaConfig {
/// individuals. /// individuals.
/// ///
/// [`GeneticNode`]: genetic_node::GeneticNode /// [`GeneticNode`]: genetic_node::GeneticNode
pub struct Gemla<T> pub struct Gemla<'a, T>
where where
T: Serialize, T: Serialize + Clone,
{ {
pub data: FileLinked<(Option<SimulationTree<T>>, GemlaConfig)>, pub data: FileLinked<(Option<SimulationTree<T>>, GemlaConfig)>,
threads: std::collections::HashMap<
uuid::Uuid,
futures::prelude::future::BoxFuture<'a, Result<GeneticNodeWrapper<T>, Error>>,
>,
} }
impl<T> Gemla<T> impl<'a, T: 'a> Gemla<'a, T>
where where
T: GeneticNode + Serialize + DeserializeOwned + Debug, T: GeneticNode + Serialize + DeserializeOwned + Debug + Clone + std::marker::Send,
{ {
pub fn new(path: &Path, config: GemlaConfig) -> Result<Self, Error> { pub fn new(path: &Path, config: GemlaConfig) -> Result<Self, Error> {
match File::open(path) { match File::open(path) {
@ -54,16 +58,18 @@ where
} else { } else {
FileLinked::from_file(path)? FileLinked::from_file(path)?
}, },
threads: std::collections::HashMap::new(),
}) })
} }
Err(error) if error.kind() == ErrorKind::NotFound => Ok(Gemla { Err(error) if error.kind() == ErrorKind::NotFound => Ok(Gemla {
data: FileLinked::new((None, config), path)?, data: FileLinked::new((None, config), path)?,
threads: std::collections::HashMap::new(),
}), }),
Err(error) => Err(Error::IO(error)), Err(error) => Err(Error::IO(error)),
} }
} }
pub fn simulate(&mut self, steps: u64) -> Result<(), Error> { pub async fn simulate(&mut self, steps: u64) -> Result<(), Error> {
self.data self.data
.mutate(|(d, c)| Gemla::increase_height(d, c, steps))??; .mutate(|(d, c)| Gemla::increase_height(d, c, steps))??;
@ -74,17 +80,168 @@ where
loop { loop {
if Gemla::tree_processed(self.data.readonly().0.as_ref().unwrap())? { if Gemla::tree_processed(self.data.readonly().0.as_ref().unwrap())? {
self.join_threads().await?;
info!("Processed tree"); info!("Processed tree");
break; break;
} }
self.data let node_to_process = self.find_process_node();
.mutate(|(d, _)| Gemla::process_tree(d.as_mut().unwrap()))??;
if let Some(node) = node_to_process {
trace!("Adding node to process list {}", node.get_id());
// if self.threads.len() > 5 {
// self.join_threads().await?;
// } else {
self.threads
.insert(node.get_id(), Box::pin(Gemla::process_node(node)));
// }
} else {
trace!("No node found to process, joining threads");
self.join_threads().await?;
}
} }
Ok(()) Ok(())
} }
async fn join_threads(&mut self) -> Result<(), Error> {
if self.threads.len() > 0 {
trace!("Joining threads for nodes {:?}", self.threads.keys());
let results = futures::future::join_all(self.threads.values_mut()).await;
let reduced_results: Result<Vec<GeneticNodeWrapper<T>>, Error> =
results.into_iter().collect();
self.threads.clear();
reduced_results.and_then(|r| {
if !self
.data
.mutate(|(d, _)| Gemla::replace_nodes(d.as_mut().unwrap(), r))?
{
warn!("Unable to find nodes to replace in tree")
}
self.data
.mutate(|(d, _)| Gemla::merge_completed_nodes(d.as_mut().unwrap()))??;
Ok(())
})?;
}
Ok(())
}
fn merge_completed_nodes(tree: &mut SimulationTree<T>) -> Result<(), Error> {
if tree.val.state() == &GeneticState::Initialize {
match (&mut tree.left, &mut tree.right) {
(Some(l), Some(r))
if l.val.state() == &GeneticState::Finish
&& r.val.state() == &GeneticState::Finish =>
{
info!("Merging nodes {} and {}", l.val.get_id(), r.val.get_id());
let left_node = l.val.node.as_ref().unwrap();
let right_node = r.val.node.as_ref().unwrap();
let merged_node = GeneticNode::merge(left_node, right_node)?;
tree.val = GeneticNodeWrapper::from(
*merged_node,
tree.val.total_generations,
tree.val.get_id(),
);
}
(Some(l), Some(r)) => {
Gemla::merge_completed_nodes(l)?;
Gemla::merge_completed_nodes(r)?;
}
(Some(l), None) if l.val.state() == &GeneticState::Finish => {
trace!("Copying node {}", l.val.get_id());
let left_node = l.val.clone();
tree.val = GeneticNodeWrapper::from(
left_node.node.unwrap(),
tree.val.total_generations,
tree.val.get_id(),
);
}
(Some(l), None) => Gemla::merge_completed_nodes(l)?,
(None, Some(r)) if r.val.state() == &GeneticState::Finish => {
trace!("Copying node {}", r.val.get_id());
let right_node = r.val.clone();
tree.val = GeneticNodeWrapper::from(
right_node.node.unwrap(),
tree.val.total_generations,
tree.val.get_id(),
);
}
(None, Some(r)) => Gemla::merge_completed_nodes(r)?,
(_, _) => (),
}
}
Ok(())
}
fn find_process_node_helper(&self, tree: &SimulationTree<T>) -> Option<GeneticNodeWrapper<T>> {
if tree.val.state() != &GeneticState::Finish
&& !self.threads.contains_key(&tree.val.get_id())
{
match (&tree.left, &tree.right) {
(Some(l), Some(r))
if l.val.state() == &GeneticState::Finish
&& r.val.state() == &GeneticState::Finish =>
{
Some(tree.val.clone())
}
(Some(l), Some(r)) => self
.find_process_node_helper(&*l)
.or_else(|| self.find_process_node_helper(&*r)),
(Some(l), None) => self.find_process_node_helper(&*l),
(None, Some(r)) => self.find_process_node_helper(&*r),
(None, None) => Some(tree.val.clone()),
}
} else {
None
}
}
fn find_process_node(&self) -> Option<GeneticNodeWrapper<T>> {
let tree = self.data.readonly().0.as_ref();
tree.and_then(|t| self.find_process_node_helper(&t))
}
fn replace_node(
tree: &mut SimulationTree<T>,
node: GeneticNodeWrapper<T>,
) -> Option<GeneticNodeWrapper<T>> {
if tree.val.get_id() == node.get_id() {
tree.val = node;
None
} else {
match (&mut tree.left, &mut tree.right) {
(Some(l), Some(r)) => {
Gemla::replace_node(l, node).and_then(|n| Gemla::replace_node(r, n))
}
(Some(l), None) => Gemla::replace_node(l, node),
(None, Some(r)) => Gemla::replace_node(r, node),
_ => Some(node),
}
}
}
fn replace_nodes(tree: &mut SimulationTree<T>, nodes: Vec<GeneticNodeWrapper<T>>) -> bool {
nodes
.into_iter()
.map(|n| Gemla::replace_node(tree, n).is_none())
.reduce(|a, b| a && b)
.unwrap_or(false)
}
fn increase_height( fn increase_height(
tree: &mut Option<SimulationTree<T>>, tree: &mut Option<SimulationTree<T>>,
config: &GemlaConfig, config: &GemlaConfig,
@ -127,57 +284,24 @@ where
} }
} }
fn process_tree(tree: &mut SimulationTree<T>) -> Result<(), Error> { async fn process_node(mut node: GeneticNodeWrapper<T>) -> Result<GeneticNodeWrapper<T>, Error> {
if tree.val.state() == &GeneticState::Initialize {
match (&mut tree.left, &mut tree.right) {
(Some(l), _) if l.val.state() != &GeneticState::Finish => {
Gemla::process_tree(&mut (*l))?;
}
(_, Some(r)) if r.val.state() != &GeneticState::Finish => {
Gemla::process_tree(&mut (*r))?;
}
(Some(l), Some(r))
if r.val.state() == &GeneticState::Finish
&& l.val.state() == &GeneticState::Finish =>
{
let left_node = (*l).val.node.as_ref().unwrap();
let right_node = (*r).val.node.as_ref().unwrap();
let merged_node = GeneticNode::merge(left_node, right_node)?;
tree.val = GeneticNodeWrapper::from(*merged_node, tree.val.total_generations);
Gemla::process_node(&mut tree.val)?;
}
(None, None) => {
Gemla::process_node(&mut tree.val)?;
}
_ => {
return Err(Error::Other(anyhow!("unable to process tree {:?}", tree)));
}
}
} else if tree.val.state() != &GeneticState::Finish {
Gemla::process_node(&mut tree.val)?;
}
Ok(())
}
fn process_node(node: &mut GeneticNodeWrapper<T>) -> Result<(), Error> {
let node_state_time = Instant::now(); let node_state_time = Instant::now();
let node_state = *node.state(); let node_state = *node.state();
node.process_node()?; node.process_node()?;
trace!( trace!(
"{:?} completed in {:?} for", "{:?} completed in {:?} for {}",
node_state, node_state,
node_state_time.elapsed() node_state_time.elapsed(),
node.get_id()
); );
if node.state() == &GeneticState::Finish { if node.state() == &GeneticState::Finish {
info!("Processed node"); info!("Processed node {}", node.get_id());
} }
Ok(()) Ok(node)
} }
} }