Skip to content

Commit

Permalink
Merge pull request #311 from korpling/comparable-update-events
Browse files Browse the repository at this point in the history
Fix WAL persistence
  • Loading branch information
thomaskrause authored Dec 30, 2024
2 parents 18b57d2 + e39b1bd commit 82dd9c7
Show file tree
Hide file tree
Showing 33 changed files with 292 additions and 75 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release_capi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ jobs:
omitPrereleaseDuringUpdate: true
deploy_macos_binaries:
if: ${{ github.event.action == 'completed' || github.event.label.name == 'test-release-process' || (github.event_name == 'release' && github.event.action == 'published') }}
runs-on: macos-12
runs-on: macos-14
steps:
- id: latest-release
uses: pozetroninc/[email protected]
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
RUST_LOG: debug
test_mac:
name: Execute automated tests on OSX
runs-on: macos-12
runs-on: macos-14
steps:
- uses: actions/checkout@v2
- uses: actions-rust-lang/[email protected]
Expand Down
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- `UpdateEvent` now implements `PartialEq` to make it possible to compare changes.

### Fixed

- Deserializing a write-ahead log failed because it was located in the wrong
sub-directory and the deserialization routine for the map had a bug.

## [3.5.1] - 2024-09-25

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion capi/src/cerror.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ struct CauseIterator<'a> {
current: Option<&'a dyn StdError>,
}

impl<'a> std::iter::Iterator for CauseIterator<'a> {
impl std::iter::Iterator for CauseIterator<'_> {
type Item = Error;

fn next(&mut self) -> std::option::Option<Error> {
Expand Down
14 changes: 8 additions & 6 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,36 @@ bincode = "1.2"
clru = "0.6.1"
itertools = "0.10"
lazy_static = "1.4"
toml = "0.8"
log = "0.4"
memmap2 = "0.9"
normpath = "1.1.1"
num-traits = "0.2"
percent-encoding = "2.1"
quick-xml = "0.28"
rand = { version = "0.8", features = ["small_rng"] }
rayon = { version = "1.3", default-features = false }
rand = {version = "0.8", features = ["small_rng"]}
rayon = {version = "1.3", default-features = false}
regex = "1"
regex-syntax = "0.8"
rustc-hash = "1.0"
serde = { version = "1.0", features = ["rc"] }
serde = {version = "1.0", features = ["rc"]}
serde_bytes = "0.11"
serde_derive = "1.0"
smallvec = "1.6"
smartstring = { version = "1", features = ["serde"] }
smartstring = {version = "1", features = ["serde"]}
sstable = "0.11"
strum = "0.21"
strum_macros = "0.21"
tempfile = "3.1"
thiserror = "1"
toml = "0.8"
transient-btree-index = "0.5"

[target.'cfg(windows)'.dependencies]
winapi = { version = "0.3", features = ["heapapi"] }
winapi = {version = "0.3", features = ["heapapi"]}

[dev-dependencies]
env_logger = "0.9"
fake = "2.2"
insta = {version = "1.38.0", features = ["json"]}
pretty_assertions = "1.3"
serde_json = "1.0"
2 changes: 1 addition & 1 deletion core/src/dfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl<'a> CycleSafeDFS<'a> {
}
}

impl<'a> Iterator for CycleSafeDFS<'a> {
impl Iterator for CycleSafeDFS<'_> {
type Item = Result<DFSStep>;

fn next(&mut self) -> Option<Self::Item> {
Expand Down
60 changes: 59 additions & 1 deletion core/src/graph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ impl<CT: ComponentType> Graph<CT> {
std::fs::create_dir_all(&current_path)?;

// If successful, write the log
let log_path = location.join("update_log.bin");
let log_path = current_path.join("update_log.bin");

// Create a temporary directory in the same file system as the output
let temporary_dir = tempfile::tempdir_in(&current_path)?;
Expand Down Expand Up @@ -1155,4 +1155,62 @@ mod tests {
db.ensure_loaded_parallel(&[component]).unwrap();
assert_eq!(0, db.components.len());
}

/// Checks that an update applied after `persist_to` is still present when the
/// graph is loaded again from the same location.
#[test]
fn load_with_wal_file() {
    let mut db = Graph::<DefaultComponentType>::new(false).unwrap();
    let example_node = 0;

    // Minimal content: one node of type "corpus" that is named "root".
    let node_type = Annotation {
        key: NODE_TYPE_KEY.as_ref().clone(),
        val: "corpus".into(),
    };
    db.node_annos.insert(example_node, node_type).unwrap();
    let node_name = Annotation {
        key: NODE_NAME_KEY.as_ref().clone(),
        val: "root".into(),
    };
    db.node_annos.insert(example_node, node_name).unwrap();

    // Save and remember the location, so that updates are recorded in a WAL
    // file
    let storage_dir = tempfile::tempdir().unwrap();
    db.persist_to(storage_dir.path()).unwrap();

    // Add a node annotation with apply_update
    let mut update = GraphUpdate::new();
    let add_label = UpdateEvent::AddNodeLabel {
        node_name: "root".into(),
        anno_ns: "example".into(),
        anno_name: "anno-name".into(),
        anno_value: "anno-value".into(),
    };
    update.add_event(add_label).unwrap();
    db.apply_update(&mut update, |_| {}).unwrap();

    // Drop the first instance before loading from the same location again.
    drop(db);

    // Check that loading the database again contains the changes
    let mut reloaded = Graph::<DefaultComponentType>::new(false).unwrap();
    reloaded.load_from(storage_dir.path(), true).unwrap();

    let label_key = AnnoKey {
        name: "anno-name".into(),
        ns: "example".into(),
    };
    let anno_value = reloaded
        .node_annos
        .get_value_for_item(&example_node, &label_key)
        .unwrap()
        .unwrap();
    assert_eq!("anno-value", anno_value);
}
}
2 changes: 1 addition & 1 deletion core/src/graph/storage/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ impl<'a> UnionEdgeContainer<'a> {
}
}

impl<'a> EdgeContainer for UnionEdgeContainer<'a> {
impl EdgeContainer for UnionEdgeContainer<'_> {
fn get_outgoing_edges<'b>(
&'b self,
node: NodeID,
Expand Down
10 changes: 5 additions & 5 deletions core/src/graph/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use sstable::{SSIterator, Table, TableBuilder, TableIterator};
use tempfile::NamedTempFile;

/// Describes a single update on the graph.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum UpdateEvent {
/// Add a node with a name and type.
AddNode {
Expand Down Expand Up @@ -304,10 +304,7 @@ impl<'de> Visitor<'de> for GraphUpdateVisitor {

let mut event_counter = 0;

while let Some((id, event)) = access
.next_entry::<u64, GraphUpdate>()
.map_err(M::Error::custom)?
{
while let Some((id, event)) = access.next_entry::<u64, UpdateEvent>()? {
event_counter = id;
let key = id.create_key();
let value = serialization.serialize(&event).map_err(M::Error::custom)?;
Expand Down Expand Up @@ -338,3 +335,6 @@ impl<'de> Deserialize<'de> for GraphUpdate {
deserializer.deserialize_map(GraphUpdateVisitor {})
}
}

#[cfg(test)]
mod tests;
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
---
source: core/src/graph/update/tests.rs
expression: seralized_string
---
{
"1": {
"AddNode": {
"node_name": "parent",
"node_type": "corpus"
}
},
"2": {
"AddNode": {
"node_name": "child",
"node_type": "corpus"
}
},
"3": {
"AddEdge": {
"source_node": "child",
"target_node": "parent",
"layer": "annis",
"component_type": "PartOf",
"component_name": ""
}
}
}
122 changes: 122 additions & 0 deletions core/src/graph/update/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use insta::assert_snapshot;

use super::*;

/// Round-trips a `GraphUpdate` through `bincode` and checks that the
/// deserialized update yields the same events in the same order.
#[test]
fn serialize_deserialize_bincode() {
    let add_parent = UpdateEvent::AddNode {
        node_name: "parent".into(),
        node_type: "corpus".into(),
    };
    let add_child = UpdateEvent::AddNode {
        node_name: "child".into(),
        node_type: "corpus".into(),
    };
    let add_part_of = UpdateEvent::AddEdge {
        source_node: "child".into(),
        target_node: "parent".into(),
        layer: "annis".into(),
        component_type: "PartOf".into(),
        component_name: "".into(),
    };
    let expected_events = vec![add_parent, add_child, add_part_of];

    // Fill the update with clones, so the originals stay available for the
    // final comparison.
    let mut original = GraphUpdate::new();
    for event in &expected_events {
        original.add_event(event.clone()).unwrap();
    }

    let encoded: Vec<u8> = bincode::serialize(&original).unwrap();
    let decoded: GraphUpdate = bincode::deserialize(&encoded).unwrap();

    assert_eq!(3, decoded.len().unwrap());

    // Each iterator item is a result wrapping a tuple; only the event at
    // position 1 is compared.
    let decoded_events: Vec<UpdateEvent> = decoded
        .iter()
        .unwrap()
        .map(|entry| entry.unwrap().1)
        .collect();
    assert_eq!(expected_events, decoded_events);
}

/// Round-trips a `GraphUpdate` without any events through `bincode` and
/// checks that the deserialized update is still empty.
#[test]
fn serialize_deserialize_bincode_empty() {
    // No events are added: the update is serialized exactly as created.
    let updates = GraphUpdate::new();

    let serialized_bytes: Vec<u8> = bincode::serialize(&updates).unwrap();
    let deserialized_update: GraphUpdate = bincode::deserialize(&serialized_bytes).unwrap();

    assert_eq!(0, deserialized_update.len().unwrap());
    // Assert the boolean directly instead of comparing it against `true`.
    assert!(deserialized_update.is_empty().unwrap());
}

/// Serializes a small `GraphUpdate` as pretty-printed JSON and compares the
/// output against the stored snapshot.
#[test]
fn serialize_json() {
    let add_parent = UpdateEvent::AddNode {
        node_name: "parent".into(),
        node_type: "corpus".into(),
    };
    let add_child = UpdateEvent::AddNode {
        node_name: "child".into(),
        node_type: "corpus".into(),
    };
    let add_part_of = UpdateEvent::AddEdge {
        source_node: "child".into(),
        target_node: "parent".into(),
        layer: "annis".into(),
        component_type: "PartOf".into(),
        component_name: "".into(),
    };

    // The events are not needed afterwards, so they are moved into the update.
    let mut updates = GraphUpdate::new();
    for event in vec![add_parent, add_child, add_part_of] {
        updates.add_event(event).unwrap();
    }

    // NOTE: the variable name is kept as-is (including its spelling) because
    // the stored snapshot file records the asserted expression text.
    let seralized_string = serde_json::to_string_pretty(&updates).unwrap();
    assert_snapshot!(seralized_string);
}

/// Round-trips a `GraphUpdate` through pretty-printed JSON and checks that
/// the deserialized update yields the same events in the same order.
#[test]
fn serialize_deserialize_json() {
    let add_parent = UpdateEvent::AddNode {
        node_name: "parent".into(),
        node_type: "corpus".into(),
    };
    let add_child = UpdateEvent::AddNode {
        node_name: "child".into(),
        node_type: "corpus".into(),
    };
    let add_part_of = UpdateEvent::AddEdge {
        source_node: "child".into(),
        target_node: "parent".into(),
        layer: "annis".into(),
        component_type: "PartOf".into(),
        component_name: "".into(),
    };
    let expected_events = vec![add_parent, add_child, add_part_of];

    // Fill the update with clones, so the originals stay available for the
    // final comparison.
    let mut original = GraphUpdate::new();
    for event in &expected_events {
        original.add_event(event.clone()).unwrap();
    }

    let json = serde_json::to_string_pretty(&original).unwrap();
    let decoded: GraphUpdate = serde_json::from_str(&json).unwrap();

    assert_eq!(3, decoded.len().unwrap());

    // Each iterator item is a result wrapping a tuple; only the event at
    // position 1 is compared.
    let decoded_events: Vec<UpdateEvent> = decoded
        .iter()
        .unwrap()
        .map(|entry| entry.unwrap().1)
        .collect();
    assert_eq!(expected_events, decoded_events);
}
4 changes: 2 additions & 2 deletions core/src/util/disk_collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ where
}
}

impl<'a, K, V> Iterator for CombinedRange<'a, K, V>
impl<K, V> Iterator for CombinedRange<'_, K, V>
where
K: Ord,
for<'de> K: 'static + Clone + KeySerializer + Send,
Expand Down Expand Up @@ -556,7 +556,7 @@ where
}
}

impl<'a, K, V> FusedIterator for CombinedRange<'a, K, V>
impl<K, V> FusedIterator for CombinedRange<'_, K, V>
where
K: 'static + Ord + Clone + KeySerializer + Serialize + DeserializeOwned + Send,
for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
Expand Down
Loading

0 comments on commit 82dd9c7

Please sign in to comment.