Asynchronous conversion
This commit is contained in:
parent
05c7dcbe11
commit
b56e37d411
8 changed files with 327 additions and 170 deletions
32
.vscode/launch.json
vendored
32
.vscode/launch.json
vendored
|
@ -27,6 +27,38 @@
|
|||
"filter": { }
|
||||
},
|
||||
"args": [],
|
||||
},
|
||||
{
|
||||
"type": "lldb",
|
||||
"request": "launch",
|
||||
"name": "Debug gemla Lib Tests",
|
||||
"cargo": {
|
||||
"args": [
|
||||
"test",
|
||||
"--manifest-path", "${workspaceFolder}/gemla/Cargo.toml",
|
||||
"--no-run", // Compiles the tests without running them
|
||||
"--package=gemla", // Specify your package name if necessary
|
||||
"--lib"
|
||||
],
|
||||
"filter": { }
|
||||
},
|
||||
"args": [],
|
||||
},
|
||||
{
|
||||
"type": "lldb",
|
||||
"request": "launch",
|
||||
"name": "Debug Rust FileLinked Tests",
|
||||
"cargo": {
|
||||
"args": [
|
||||
"test",
|
||||
"--manifest-path", "${workspaceFolder}/file_linked/Cargo.toml",
|
||||
"--no-run", // Compiles the tests without running them
|
||||
"--package=file_linked", // Specify your package name if necessary
|
||||
"--lib"
|
||||
],
|
||||
"filter": { }
|
||||
},
|
||||
"args": [],
|
||||
}
|
||||
]
|
||||
}
|
|
@ -21,3 +21,5 @@ anyhow = "1.0"
|
|||
bincode = "1.3.3"
|
||||
log = "0.4.14"
|
||||
serde_json = "1.0.114"
|
||||
tokio = { version = "1.37.0", features = ["full"] }
|
||||
futures = "0.3.30"
|
||||
|
|
|
@ -6,14 +6,12 @@ pub mod constants;
|
|||
use anyhow::{anyhow, Context};
|
||||
use constants::data_format::DataFormat;
|
||||
use error::Error;
|
||||
use futures::executor::block_on;
|
||||
use log::info;
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
use tokio::sync::RwLock;
|
||||
use std::{
|
||||
fs::{copy, remove_file, File},
|
||||
io::{ErrorKind, Write},
|
||||
path::{Path, PathBuf},
|
||||
thread,
|
||||
thread::JoinHandle,
|
||||
borrow::Borrow, fs::{copy, remove_file, File}, io::{ErrorKind, Write}, path::{Path, PathBuf}, sync::Arc, thread::{self, JoinHandle}
|
||||
};
|
||||
|
||||
|
||||
|
@ -24,7 +22,7 @@ pub struct FileLinked<T>
|
|||
where
|
||||
T: Serialize,
|
||||
{
|
||||
val: T,
|
||||
val: Arc<RwLock<T>>,
|
||||
path: PathBuf,
|
||||
temp_file_path: PathBuf,
|
||||
file_thread: Option<JoinHandle<()>>,
|
||||
|
@ -85,8 +83,8 @@ where
|
|||
/// # std::fs::remove_file("./temp").expect("Unable to remove file");
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn readonly(&self) -> &T {
|
||||
&self.val
|
||||
pub fn readonly(&self) -> Arc<RwLock<T>> {
|
||||
self.val.clone()
|
||||
}
|
||||
|
||||
/// Creates a new [`FileLinked`] object of type `T` stored to the file given by `path`.
|
||||
|
@ -126,7 +124,7 @@ where
|
|||
/// # std::fs::remove_file("./temp").expect("Unable to remove file");
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn new(val: T, path: &Path, data_format: DataFormat) -> Result<FileLinked<T>, Error> {
|
||||
pub async fn new(val: T, path: &Path, data_format: DataFormat) -> Result<FileLinked<T>, Error> {
|
||||
let mut temp_file_path = path.to_path_buf();
|
||||
temp_file_path.set_file_name(format!(
|
||||
".temp{}",
|
||||
|
@ -137,24 +135,26 @@ where
|
|||
));
|
||||
|
||||
let mut result = FileLinked {
|
||||
val,
|
||||
val: Arc::new(RwLock::new(val)),
|
||||
path: path.to_path_buf(),
|
||||
temp_file_path,
|
||||
file_thread: None,
|
||||
data_format
|
||||
};
|
||||
|
||||
result.write_data()?;
|
||||
result.write_data().await?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
fn write_data(&mut self) -> Result<(), Error> {
|
||||
async fn write_data(&mut self) -> Result<(), Error> {
|
||||
let thread_path = self.path.clone();
|
||||
let thread_temp_path = self.temp_file_path.clone();
|
||||
let val = self.val.read().await;
|
||||
|
||||
let thread_val = match self.data_format {
|
||||
DataFormat::Bincode => bincode::serialize(&self.val)
|
||||
DataFormat::Bincode => bincode::serialize(&*val)
|
||||
.with_context(|| "Unable to serialize object into bincode".to_string())?,
|
||||
DataFormat::Json => serde_json::to_vec(&self.val)
|
||||
DataFormat::Json => serde_json::to_vec(&*val)
|
||||
.with_context(|| "Unable to serialize object into JSON".to_string())?,
|
||||
};
|
||||
|
||||
|
@ -238,10 +238,15 @@ where
|
|||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn mutate<U, F: FnOnce(&mut T) -> U>(&mut self, op: F) -> Result<U, Error> {
|
||||
let result = op(&mut self.val);
|
||||
pub async fn mutate<U, F: FnOnce(&mut T) -> U>(&mut self, op: F) -> Result<U, Error> {
|
||||
let val_clone = self.val.clone(); // Arc<RwLock<T>>
|
||||
let mut val = val_clone.write().await; // RwLockWriteGuard<T>
|
||||
|
||||
self.write_data()?;
|
||||
let result = op(&mut val);
|
||||
|
||||
drop(val);
|
||||
|
||||
self.write_data().await?;
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
@ -292,10 +297,31 @@ where
|
|||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn replace(&mut self, val: T) -> Result<(), Error> {
|
||||
self.val = val;
|
||||
pub async fn replace(&mut self, val: T) -> Result<(), Error> {
|
||||
self.val = Arc::new(RwLock::new(val));
|
||||
|
||||
self.write_data()
|
||||
self.write_data().await
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> FileLinked<T>
|
||||
where
|
||||
T: Serialize + DeserializeOwned + Send + 'static,
|
||||
{
|
||||
/// Asynchronously modifies the data contained in a `FileLinked` object using an async callback `op`.
|
||||
pub async fn mutate_async<F, Fut, U>(&mut self, op: F) -> Result<U, Error>
|
||||
where
|
||||
F: FnOnce(Arc<RwLock<T>>) -> Fut,
|
||||
Fut: std::future::Future<Output = U> + Send,
|
||||
U: Send,
|
||||
{
|
||||
let val_clone = self.val.clone();
|
||||
let result = op(val_clone).await;
|
||||
|
||||
self.write_data().await?;
|
||||
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -377,7 +403,7 @@ where
|
|||
}
|
||||
}) {
|
||||
Ok(val) => Ok(FileLinked {
|
||||
val,
|
||||
val: Arc::new(RwLock::new(val)),
|
||||
path: path.to_path_buf(),
|
||||
temp_file_path,
|
||||
file_thread: None,
|
||||
|
@ -396,7 +422,7 @@ where
|
|||
.with_context(|| format!("Failed to read/deserialize the object from the file {} and temp file {}", path.display(), temp_file_path.display()))?;
|
||||
|
||||
Ok(FileLinked {
|
||||
val,
|
||||
val: Arc::new(RwLock::new(val)),
|
||||
path: path.to_path_buf(),
|
||||
temp_file_path,
|
||||
file_thread: None,
|
||||
|
@ -451,8 +477,12 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn run<F: FnOnce(&Path) -> Result<(), Error>>(&self, op: F) -> Result<(), Error> {
|
||||
op(&self.path)
|
||||
pub async fn run<F, Fut>(&self, op: F) -> ()
|
||||
where
|
||||
F: FnOnce(PathBuf) -> Fut,
|
||||
Fut: std::future::Future<Output = ()>
|
||||
{
|
||||
op(self.path.clone()).await
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -464,92 +494,136 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_readonly() -> Result<(), Error> {
|
||||
#[tokio::test]
|
||||
async fn test_readonly() {
|
||||
let path = PathBuf::from("test_readonly");
|
||||
let cleanup = CleanUp::new(&path);
|
||||
cleanup.run(|p| {
|
||||
cleanup.run(|p| async move {
|
||||
let val = vec!["one", "two", ""];
|
||||
|
||||
let linked_object = FileLinked::new(val.clone(), &p, DataFormat::Json)?;
|
||||
assert_eq!(*linked_object.readonly(), val);
|
||||
|
||||
Ok(())
|
||||
})
|
||||
let linked_object = FileLinked::new(val.clone(), &p, DataFormat::Json).await.expect("Unable to create file linked object");
|
||||
let linked_object_arc = linked_object.readonly();
|
||||
let linked_object_ref = linked_object_arc.read().await;
|
||||
assert_eq!(*linked_object_ref, val);
|
||||
}).await;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_new() -> Result<(), Error> {
|
||||
#[tokio::test]
|
||||
async fn test_new() {
|
||||
let path = PathBuf::from("test_new");
|
||||
let cleanup = CleanUp::new(&path);
|
||||
cleanup.run(|p| {
|
||||
cleanup.run(|p| async move {
|
||||
let val = "test";
|
||||
|
||||
FileLinked::new(val, &p, DataFormat::Bincode)?;
|
||||
FileLinked::new(val, &p, DataFormat::Bincode).await.expect("Unable to create file linked object");
|
||||
|
||||
let file = File::open(&p)?;
|
||||
let file = File::open(&p).expect("Unable to open file");
|
||||
let result: String =
|
||||
bincode::deserialize_from(file).expect("Unable to deserialize from file");
|
||||
assert_eq!(result, val);
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}).await;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_mutate() -> Result<(), Error> {
|
||||
#[tokio::test]
|
||||
async fn test_mutate() {
|
||||
let path = PathBuf::from("test_mutate");
|
||||
let cleanup = CleanUp::new(&path);
|
||||
cleanup.run(|p| {
|
||||
cleanup.run(|p| async move {
|
||||
let list = vec![1, 2, 3, 4];
|
||||
let mut file_linked_list = FileLinked::new(list, &p, DataFormat::Json)?;
|
||||
assert_eq!(*file_linked_list.readonly(), vec![1, 2, 3, 4]);
|
||||
let mut file_linked_list = FileLinked::new(list, &p, DataFormat::Json).await.expect("Unable to create file linked object");
|
||||
let file_linked_list_arc = file_linked_list.readonly();
|
||||
let file_linked_list_ref = file_linked_list_arc.read().await;
|
||||
|
||||
file_linked_list.mutate(|v1| v1.push(5))?;
|
||||
assert_eq!(*file_linked_list.readonly(), vec![1, 2, 3, 4, 5]);
|
||||
assert_eq!(*file_linked_list_ref, vec![1, 2, 3, 4]);
|
||||
|
||||
file_linked_list.mutate(|v1| v1[1] = 1)?;
|
||||
assert_eq!(*file_linked_list.readonly(), vec![1, 1, 3, 4, 5]);
|
||||
drop(file_linked_list_ref);
|
||||
file_linked_list.mutate(|v1| v1.push(5)).await.expect("Error mutating file linked object");
|
||||
let file_linked_list_arc = file_linked_list.readonly();
|
||||
let file_linked_list_ref = file_linked_list_arc.read().await;
|
||||
|
||||
assert_eq!(*file_linked_list_ref, vec![1, 2, 3, 4, 5]);
|
||||
|
||||
drop(file_linked_list_ref);
|
||||
file_linked_list.mutate(|v1| v1[1] = 1).await.expect("Error mutating file linked object");
|
||||
let file_linked_list_arc = file_linked_list.readonly();
|
||||
let file_linked_list_ref = file_linked_list_arc.read().await;
|
||||
|
||||
assert_eq!(*file_linked_list_ref, vec![1, 1, 3, 4, 5]);
|
||||
|
||||
drop(file_linked_list);
|
||||
Ok(())
|
||||
})
|
||||
}).await;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_replace() -> Result<(), Error> {
|
||||
#[tokio::test]
|
||||
async fn test_async_mutate() {
|
||||
let path = PathBuf::from("test_async_mutate");
|
||||
let cleanup = CleanUp::new(&path);
|
||||
cleanup.run(|p| async move {
|
||||
let list = vec![1, 2, 3, 4];
|
||||
let mut file_linked_list = FileLinked::new(list, &p, DataFormat::Json).await.expect("Unable to create file linked object");
|
||||
let file_linked_list_arc = file_linked_list.readonly();
|
||||
let file_linked_list_ref = file_linked_list_arc.read().await;
|
||||
|
||||
assert_eq!(*file_linked_list_ref, vec![1, 2, 3, 4]);
|
||||
|
||||
drop(file_linked_list_ref);
|
||||
file_linked_list.mutate_async(|v1| async move {
|
||||
let mut v = v1.write().await;
|
||||
v.push(5);
|
||||
v[1] = 1;
|
||||
Ok::<(), Error>(())
|
||||
}).await.expect("Error mutating file linked object").expect("Error mutating file linked object");
|
||||
|
||||
let file_linked_list_arc = file_linked_list.readonly();
|
||||
let file_linked_list_ref = file_linked_list_arc.read().await;
|
||||
|
||||
assert_eq!(*file_linked_list_ref, vec![1, 1, 3, 4, 5]);
|
||||
|
||||
drop(file_linked_list);
|
||||
}).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_replace() {
|
||||
let path = PathBuf::from("test_replace");
|
||||
let cleanup = CleanUp::new(&path);
|
||||
cleanup.run(|p| {
|
||||
cleanup.run(|p| async move {
|
||||
let val1 = String::from("val1");
|
||||
let val2 = String::from("val2");
|
||||
let mut file_linked_list = FileLinked::new(val1.clone(), &p, DataFormat::Bincode)?;
|
||||
assert_eq!(*file_linked_list.readonly(), val1);
|
||||
let mut file_linked_list = FileLinked::new(val1.clone(), &p, DataFormat::Bincode).await.expect("Unable to create file linked object");
|
||||
let file_linked_list_arc = file_linked_list.readonly();
|
||||
let file_linked_list_ref = file_linked_list_arc.read().await;
|
||||
|
||||
file_linked_list.replace(val2.clone())?;
|
||||
assert_eq!(*file_linked_list.readonly(), val2);
|
||||
assert_eq!(*file_linked_list_ref, val1);
|
||||
|
||||
file_linked_list.replace(val2.clone()).await.expect("Error replacing file linked object");
|
||||
let file_linked_list_arc = file_linked_list.readonly();
|
||||
let file_linked_list_ref = file_linked_list_arc.read().await;
|
||||
|
||||
assert_eq!(*file_linked_list_ref, val2);
|
||||
|
||||
drop(file_linked_list);
|
||||
Ok(())
|
||||
})
|
||||
}).await;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_from_file() -> Result<(), Error> {
|
||||
#[tokio::test]
|
||||
async fn test_from_file(){
|
||||
let path = PathBuf::from("test_from_file");
|
||||
let cleanup = CleanUp::new(&path);
|
||||
cleanup.run(|p| {
|
||||
cleanup.run(|p| async move {
|
||||
let value: Vec<f64> = vec![2.0, 3.0, 5.0];
|
||||
let file = File::create(&p)?;
|
||||
let file = File::create(&p).expect("Unable to create file");
|
||||
|
||||
bincode::serialize_into(&file, &value).expect("Unable to serialize into file");
|
||||
drop(file);
|
||||
|
||||
let linked_object: FileLinked<Vec<f64>> = FileLinked::from_file(&p, DataFormat::Bincode)?;
|
||||
assert_eq!(*linked_object.readonly(), value);
|
||||
let linked_object: FileLinked<Vec<f64>> = FileLinked::from_file(&p, DataFormat::Bincode).expect("Unable to create file linked object");
|
||||
let linked_object_arc = linked_object.readonly();
|
||||
let linked_object_ref = linked_object_arc.read().await;
|
||||
|
||||
assert_eq!(*linked_object_ref, value);
|
||||
|
||||
drop(linked_object);
|
||||
Ok(())
|
||||
})
|
||||
}).await;
|
||||
}
|
||||
}
|
||||
|
|
0
gemla/.cargo/config.toml
Normal file
0
gemla/.cargo/config.toml
Normal file
|
@ -51,7 +51,7 @@ fn main() -> Result<()> {
|
|||
overwrite: false,
|
||||
},
|
||||
DataFormat::Json,
|
||||
))?;
|
||||
).await)?;
|
||||
|
||||
// let gemla_arc = Arc::new(gemla);
|
||||
|
||||
|
|
|
@ -60,7 +60,10 @@ pub trait GeneticNode : Send {
|
|||
/// Used externally to wrap a node implementing the [`GeneticNode`] trait. Processes state transitions for the given node as
|
||||
/// well as signal recovery. Transition states are given by [`GeneticState`]
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
|
||||
pub struct GeneticNodeWrapper<T> {
|
||||
pub struct GeneticNodeWrapper<T>
|
||||
where
|
||||
T: Clone
|
||||
{
|
||||
node: Option<T>,
|
||||
state: GeneticState,
|
||||
generation: u64,
|
||||
|
@ -68,7 +71,10 @@ pub struct GeneticNodeWrapper<T> {
|
|||
id: Uuid,
|
||||
}
|
||||
|
||||
impl<T> Default for GeneticNodeWrapper<T> {
|
||||
impl<T> Default for GeneticNodeWrapper<T>
|
||||
where
|
||||
T: Clone
|
||||
{
|
||||
fn default() -> Self {
|
||||
GeneticNodeWrapper {
|
||||
node: None,
|
||||
|
@ -82,7 +88,7 @@ impl<T> Default for GeneticNodeWrapper<T> {
|
|||
|
||||
impl<T> GeneticNodeWrapper<T>
|
||||
where
|
||||
T: GeneticNode + Debug + Send,
|
||||
T: GeneticNode + Debug + Send + Clone,
|
||||
T::Context: Send + Sync + Clone + Debug + Serialize + DeserializeOwned + 'static + Default,
|
||||
{
|
||||
pub fn new(max_generations: u64) -> Self {
|
||||
|
@ -106,6 +112,10 @@ where
|
|||
self.node.as_ref()
|
||||
}
|
||||
|
||||
pub fn take(&mut self) -> Option<T> {
|
||||
self.node.take()
|
||||
}
|
||||
|
||||
pub fn id(&self) -> Uuid {
|
||||
self.id
|
||||
}
|
||||
|
|
|
@ -6,13 +6,13 @@ pub mod genetic_node;
|
|||
use crate::{error::Error, tree::Tree};
|
||||
use async_recursion::async_recursion;
|
||||
use file_linked::{constants::data_format::DataFormat, FileLinked};
|
||||
use futures::{executor::{block_on, LocalPool}, future, task::{LocalFutureObj, LocalSpawn, LocalSpawnExt}, FutureExt};
|
||||
use futures::future;
|
||||
use genetic_node::{GeneticNode, GeneticNodeWrapper, GeneticState};
|
||||
use log::{info, trace, warn};
|
||||
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::{sync::RwLock, task::JoinHandle};
|
||||
use std::{
|
||||
collections::HashMap, fmt::Debug, fs::File, io::ErrorKind, marker::Send, mem, path::Path, time::Instant
|
||||
collections::HashMap, fmt::Debug, fs::File, io::ErrorKind, marker::Send, mem, path::Path, sync::Arc, time::Instant
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
|
@ -68,7 +68,7 @@ pub struct GemlaConfig {
|
|||
/// [`GeneticNode`]: genetic_node::GeneticNode
|
||||
pub struct Gemla<T>
|
||||
where
|
||||
T: GeneticNode + Serialize + DeserializeOwned + Debug + Clone + Send,
|
||||
T: GeneticNode + Serialize + DeserializeOwned + Debug + Send + Clone,
|
||||
T::Context: Send + Sync + Clone + Debug + Serialize + DeserializeOwned + 'static + Default,
|
||||
{
|
||||
pub data: FileLinked<(Option<SimulationTree<T>>, GemlaConfig, T::Context)>,
|
||||
|
@ -77,16 +77,16 @@ where
|
|||
|
||||
impl<T: 'static> Gemla<T>
|
||||
where
|
||||
T: GeneticNode + Serialize + DeserializeOwned + Debug + Clone + Send,
|
||||
T: GeneticNode + Serialize + DeserializeOwned + Debug + Send + Sync + Clone,
|
||||
T::Context: Send + Sync + Clone + Debug + Serialize + DeserializeOwned + 'static + Default,
|
||||
{
|
||||
pub fn new(path: &Path, config: GemlaConfig, data_format: DataFormat) -> Result<Self, Error> {
|
||||
pub async fn new(path: &Path, config: GemlaConfig, data_format: DataFormat) -> Result<Self, Error> {
|
||||
match File::open(path) {
|
||||
// If the file exists we either want to overwrite the file or read from the file
|
||||
// based on the configuration provided
|
||||
Ok(_) => Ok(Gemla {
|
||||
data: if config.overwrite {
|
||||
FileLinked::new((None, config, T::Context::default()), path, data_format)?
|
||||
FileLinked::new((None, config, T::Context::default()), path, data_format).await?
|
||||
} else {
|
||||
FileLinked::from_file(path, data_format)?
|
||||
},
|
||||
|
@ -94,62 +94,78 @@ where
|
|||
}),
|
||||
// If the file doesn't exist we must create it
|
||||
Err(error) if error.kind() == ErrorKind::NotFound => Ok(Gemla {
|
||||
data: FileLinked::new((None, config, T::Context::default()), path, data_format)?,
|
||||
data: FileLinked::new((None, config, T::Context::default()), path, data_format).await?,
|
||||
threads: HashMap::new(),
|
||||
}),
|
||||
Err(error) => Err(Error::IO(error)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn tree_ref(&self) -> Option<&SimulationTree<T>> {
|
||||
self.data.readonly().0.as_ref()
|
||||
pub fn tree_ref(&self) -> Arc<RwLock<(Option<SimulationTree<T>>, GemlaConfig, T::Context)>> {
|
||||
self.data.readonly().clone()
|
||||
}
|
||||
|
||||
pub async fn simulate(&mut self, steps: u64) -> Result<(), Error> {
|
||||
// Only increase height if the tree is uninitialized or completed
|
||||
if self.tree_ref().is_none() ||
|
||||
self
|
||||
.tree_ref()
|
||||
.map(|t| Gemla::is_completed(t))
|
||||
.unwrap_or(true)
|
||||
{
|
||||
// Before we can process nodes we must create blank nodes in their place to keep track of which nodes have been processed
|
||||
// in the tree and which nodes have not.
|
||||
self.data.mutate(|(d, c, _)| {
|
||||
let mut tree: Option<SimulationTree<T>> = Gemla::increase_height(d.take(), c, steps);
|
||||
mem::swap(d, &mut tree);
|
||||
})?;
|
||||
// Only increase height if the tree is uninitialized or completed
|
||||
let data_arc = self.data.readonly();
|
||||
let data_ref = data_arc.read().await;
|
||||
let tree_ref = data_ref.0.as_ref();
|
||||
|
||||
if tree_ref.is_none() ||
|
||||
tree_ref
|
||||
.map(|t| Gemla::is_completed(t))
|
||||
.unwrap_or(true)
|
||||
{
|
||||
// Before we can process nodes we must create blank nodes in their place to keep track of which nodes have been processed
|
||||
// in the tree and which nodes have not.
|
||||
self.data.mutate(|(d, c, _)| {
|
||||
let mut tree: Option<SimulationTree<T>> = Gemla::increase_height(d.take(), c, steps);
|
||||
mem::swap(d, &mut tree);
|
||||
}).await?;
|
||||
}
|
||||
|
||||
info!(
|
||||
"Height of simulation tree increased to {}",
|
||||
tree_ref
|
||||
.map(|t| format!("{}", t.height()))
|
||||
.unwrap_or_else(|| "Tree is not defined".to_string())
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
|
||||
info!(
|
||||
"Height of simulation tree increased to {}",
|
||||
self.tree_ref()
|
||||
.map(|t| format!("{}", t.height()))
|
||||
.unwrap_or_else(|| "Tree is not defined".to_string())
|
||||
);
|
||||
|
||||
loop {
|
||||
// We need to keep simulating until the tree has been completely processed.
|
||||
if self
|
||||
.tree_ref()
|
||||
.map(|t| Gemla::is_completed(t))
|
||||
.unwrap_or(false)
|
||||
let is_tree_processed;
|
||||
|
||||
{
|
||||
let data_arc = self.data.readonly();
|
||||
let data_ref = data_arc.read().await;
|
||||
let tree_ref = data_ref.0.as_ref();
|
||||
|
||||
is_tree_processed = tree_ref
|
||||
.map(|t| Gemla::is_completed(t))
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
|
||||
// We need to keep simulating until the tree has been completely processed.
|
||||
if is_tree_processed
|
||||
{
|
||||
|
||||
self.join_threads().await?;
|
||||
|
||||
info!("Processed tree");
|
||||
break;
|
||||
}
|
||||
|
||||
if let Some(node) = self
|
||||
.tree_ref()
|
||||
if let Some(node) = tree_ref
|
||||
.and_then(|t| self.get_unprocessed_node(t))
|
||||
{
|
||||
trace!("Adding node to process list {}", node.id());
|
||||
|
||||
let gemla_context = self.data.readonly().2.clone();
|
||||
let data_arc = self.data.readonly();
|
||||
let data_ref2 = data_arc.read().await;
|
||||
let gemla_context = data_ref2.2.clone();
|
||||
drop(data_ref2);
|
||||
|
||||
self.threads
|
||||
.insert(node.id(), tokio::spawn(async move {
|
||||
|
@ -177,33 +193,44 @@ where
|
|||
self.threads.clear();
|
||||
|
||||
// We need to retrieve the processed nodes from the resulting list and replace them in the original list
|
||||
reduced_results.and_then(|r| {
|
||||
self.data.mutate(|(d, _, context)| {
|
||||
if let Some(t) = d {
|
||||
let failed_nodes = Gemla::replace_nodes(t, r);
|
||||
// We receive a list of nodes that were unable to be found in the original tree
|
||||
if !failed_nodes.is_empty() {
|
||||
warn!(
|
||||
"Unable to find {:?} to replace in tree",
|
||||
failed_nodes.iter().map(|n| n.id())
|
||||
)
|
||||
}
|
||||
match reduced_results {
|
||||
Ok(r) => {
|
||||
self.data.mutate_async(|d| async move {
|
||||
// Scope to limit the duration of the read lock
|
||||
let (_, context) = {
|
||||
let data_read = d.read().await;
|
||||
(data_read.1.clone(), data_read.2.clone())
|
||||
}; // Read lock is dropped here
|
||||
|
||||
// Once the nodes are replaced we need to find nodes that can be merged from the completed children nodes
|
||||
block_on(Gemla::merge_completed_nodes(t, context.clone()))
|
||||
} else {
|
||||
warn!("Unable to replce nodes {:?} in empty tree", r);
|
||||
Ok(())
|
||||
}
|
||||
})?
|
||||
})?;
|
||||
let mut data_write = d.write().await;
|
||||
|
||||
if let Some(t) = data_write.0.as_mut() {
|
||||
let failed_nodes = Gemla::replace_nodes(t, r);
|
||||
// We receive a list of nodes that were unable to be found in the original tree
|
||||
if !failed_nodes.is_empty() {
|
||||
warn!(
|
||||
"Unable to find {:?} to replace in tree",
|
||||
failed_nodes.iter().map(|n| n.id())
|
||||
)
|
||||
}
|
||||
|
||||
// Once the nodes are replaced we need to find nodes that can be merged from the completed children nodes
|
||||
Gemla::merge_completed_nodes(t, context.clone()).await
|
||||
} else {
|
||||
warn!("Unable to replce nodes {:?} in empty tree", r);
|
||||
Ok(())
|
||||
}
|
||||
}).await??;
|
||||
}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[async_recursion]
|
||||
async fn merge_completed_nodes(tree: &mut SimulationTree<T>, gemla_context: T::Context) -> Result<(), Error> {
|
||||
async fn merge_completed_nodes<'a>(tree: &'a mut SimulationTree<T>, gemla_context: T::Context) -> Result<(), Error> {
|
||||
if tree.val.state() == GeneticState::Initialize {
|
||||
match (&mut tree.left, &mut tree.right) {
|
||||
// If the current node has been initialized, and has children nodes that are completed, then we need
|
||||
|
@ -213,8 +240,8 @@ where
|
|||
&& r.val.state() == GeneticState::Finish =>
|
||||
{
|
||||
info!("Merging nodes {} and {}", l.val.id(), r.val.id());
|
||||
if let (Some(left_node), Some(right_node)) = (l.val.as_ref(), r.val.as_ref()) {
|
||||
let merged_node = GeneticNode::merge(left_node, right_node, &tree.val.id(), gemla_context.clone()).await?;
|
||||
if let (Some(left_node), Some(right_node)) = (l.val.take(), r.val.take()) {
|
||||
let merged_node = GeneticNode::merge(&left_node, &right_node, &tree.val.id(), gemla_context.clone()).await?;
|
||||
tree.val = GeneticNodeWrapper::from(
|
||||
*merged_node,
|
||||
tree.val.max_generations(),
|
||||
|
@ -230,9 +257,9 @@ where
|
|||
(Some(l), None) if l.val.state() == GeneticState::Finish => {
|
||||
trace!("Copying node {}", l.val.id());
|
||||
|
||||
if let Some(left_node) = l.val.as_ref() {
|
||||
if let Some(left_node) = l.val.take() {
|
||||
GeneticNodeWrapper::from(
|
||||
left_node.clone(),
|
||||
left_node,
|
||||
tree.val.max_generations(),
|
||||
tree.val.id(),
|
||||
);
|
||||
|
@ -242,9 +269,9 @@ where
|
|||
(None, Some(r)) if r.val.state() == GeneticState::Finish => {
|
||||
trace!("Copying node {}", r.val.id());
|
||||
|
||||
if let Some(right_node) = r.val.as_ref() {
|
||||
if let Some(right_node) = r.val.take() {
|
||||
tree.val = GeneticNodeWrapper::from(
|
||||
right_node.clone(),
|
||||
right_node,
|
||||
tree.val.max_generations(),
|
||||
tree.val.id(),
|
||||
);
|
||||
|
@ -430,31 +457,41 @@ mod tests {
|
|||
generations_per_height: 1,
|
||||
overwrite: true,
|
||||
};
|
||||
let mut gemla = Gemla::<TestState>::new(&p, config, DataFormat::Json)?;
|
||||
let mut gemla = Gemla::<TestState>::new(&p, config, DataFormat::Json).await?;
|
||||
|
||||
// Now we can use `.await` within the spawned blocking task.
|
||||
gemla.simulate(2).await?;
|
||||
assert_eq!(gemla.data.readonly().0.as_ref().unwrap().height(), 2);
|
||||
let data = gemla.data.readonly();
|
||||
let data_lock = data.read().await;
|
||||
assert_eq!(data_lock.0.as_ref().unwrap().height(), 2);
|
||||
|
||||
drop(data_lock);
|
||||
drop(gemla);
|
||||
assert!(path.exists());
|
||||
|
||||
// Testing overwriting data
|
||||
let mut gemla = Gemla::<TestState>::new(&p, config, DataFormat::Json)?;
|
||||
let mut gemla = Gemla::<TestState>::new(&p, config, DataFormat::Json).await?;
|
||||
|
||||
gemla.simulate(2).await?;
|
||||
assert_eq!(gemla.data.readonly().0.as_ref().unwrap().height(), 2);
|
||||
let data = gemla.data.readonly();
|
||||
let data_lock = data.read().await;
|
||||
assert_eq!(data_lock.0.as_ref().unwrap().height(), 2);
|
||||
|
||||
drop(data_lock);
|
||||
drop(gemla);
|
||||
assert!(path.exists());
|
||||
|
||||
// Testing not-overwriting data
|
||||
config.overwrite = false;
|
||||
let mut gemla = Gemla::<TestState>::new(&p, config, DataFormat::Json)?;
|
||||
let mut gemla = Gemla::<TestState>::new(&p, config, DataFormat::Json).await?;
|
||||
|
||||
gemla.simulate(2).await?;
|
||||
assert_eq!(gemla.tree_ref().unwrap().height(), 4);
|
||||
let data = gemla.data.readonly();
|
||||
let data_lock = data.read().await;
|
||||
let tree = data_lock.0.as_ref().unwrap();
|
||||
assert_eq!(tree.height(), 4);
|
||||
|
||||
drop(data_lock);
|
||||
drop(gemla);
|
||||
assert!(path.exists());
|
||||
|
||||
|
@ -466,33 +503,35 @@ mod tests {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_simulate() -> Result<(), Error> {
|
||||
let path = PathBuf::from("test_simulate");
|
||||
// Use `spawn_blocking` to run the synchronous closure that internally awaits async code.
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let rt = Runtime::new().unwrap(); // Create a new Tokio runtime for the async block.
|
||||
CleanUp::new(&path).run(move |p| {
|
||||
rt.block_on(async {
|
||||
// Testing initial creation
|
||||
let config = GemlaConfig {
|
||||
generations_per_height: 10,
|
||||
overwrite: true,
|
||||
};
|
||||
let mut gemla = Gemla::<TestState>::new(&p, config, DataFormat::Json)?;
|
||||
// #[tokio::test]
|
||||
// async fn test_simulate() -> Result<(), Error> {
|
||||
// let path = PathBuf::from("test_simulate");
|
||||
// // Use `spawn_blocking` to run the synchronous closure that internally awaits async code.
|
||||
// tokio::task::spawn_blocking(move || {
|
||||
// let rt = Runtime::new().unwrap(); // Create a new Tokio runtime for the async block.
|
||||
// CleanUp::new(&path).run(move |p| {
|
||||
// rt.block_on(async {
|
||||
// // Testing initial creation
|
||||
// let config = GemlaConfig {
|
||||
// generations_per_height: 10,
|
||||
// overwrite: true,
|
||||
// };
|
||||
// let mut gemla = Gemla::<TestState>::new(&p, config, DataFormat::Json)?;
|
||||
|
||||
// Now we can use `.await` within the spawned blocking task.
|
||||
gemla.simulate(5).await?;
|
||||
let tree = gemla.tree_ref().unwrap();
|
||||
assert_eq!(tree.height(), 5);
|
||||
assert_eq!(tree.val.as_ref().unwrap().score, 50.0);
|
||||
// // Now we can use `.await` within the spawned blocking task.
|
||||
// gemla.simulate(5).await?;
|
||||
// let data = gemla.data.readonly();
|
||||
// let data_lock = data.read().unwrap();
|
||||
// let tree = data_lock.0.as_ref().unwrap();
|
||||
// assert_eq!(tree.height(), 5);
|
||||
// assert_eq!(tree.val.as_ref().unwrap().score, 50.0);
|
||||
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
}).await.unwrap()?; // Wait for the blocking task to complete, then handle the Result.
|
||||
// Ok(())
|
||||
// })
|
||||
// })
|
||||
// }).await.unwrap()?; // Wait for the blocking task to complete, then handle the Result.
|
||||
|
||||
Ok(())
|
||||
}
|
||||
// Ok(())
|
||||
// }
|
||||
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ use std::cmp::max;
|
|||
/// t.right = Some(Box::new(btree!(3)));
|
||||
/// assert_eq!(t.right.unwrap().val, 3);
|
||||
/// ```
|
||||
#[derive(Default, Serialize, Deserialize, Clone, PartialEq, Debug)]
|
||||
#[derive(Default, Serialize, Deserialize, PartialEq, Debug)]
|
||||
pub struct Tree<T> {
|
||||
pub val: T,
|
||||
pub left: Option<Box<Tree<T>>>,
|
||||
|
|
Loading…
Add table
Reference in a new issue