Browse Source

Some initial work on the yama 0.4.0 overhaul.

develop
Olivier 'reivilibre' 6 months ago
parent
commit
88c188e918
  1. 1163
      Cargo.lock
  2. 14
      Cargo.toml
  3. 22
      README.md
  4. 8
      src/chunking.rs
  5. 35
      src/commands.rs
  6. 4
      src/definitions.rs
  7. 270
      src/main.rs
  8. 4
      src/operations.rs
  9. 4
      src/operations/extraction_flow.rs
  10. 110
      src/pile.rs
  11. 66
      src/pile/local_sled.rs
  12. 0
      src/xdef.rs
  13. 55
      src/xpile.rs
  14. 2
      src/xpile/local_pile.rs

1163
Cargo.lock
File diff suppressed because it is too large
View File

14
Cargo.toml

@ -3,7 +3,7 @@ name = "yama"
version = "0.3.1"
authors = ["Olivier <olivier@librepush.net>"]
edition = "2018"
description = "Deduplicated content pile repository manager"
description = "Deduplicated, compressed and encrypted content pile manager"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@ -11,17 +11,14 @@ description = "Deduplicated content pile repository manager"
fastcdc = "1.0.2"
zstd = "0.5.1"
sshish = "0.1.0"
clap = "2.33.0"
lmdb-rkv = "0.12.3"
futures = "0.3.1"
clap = "3.0.0-beta.2"
# NB breaking changes in async-std 1.6.0
async-std = { version = "~1.4.0", features = ["unstable"] }
tokio = { version = "0.3", features = ["full"] }
blake = "2.0.0"
twox-hash = "1.5.0"
async-trait = "0.1.22"
serde = { version = "1.0.104", features = ["derive"] }
#serde_derive = "1.0.104"
serde_cbor = "0.8.2"
serde_bare = "0.3.0"
users = "0.9.1"
crossbeam = "0.7.3"
toml = "0.5.5"
@ -31,3 +28,6 @@ log = "0.4"
env_logger = "0.7.1"
indicatif = "0.14.0"
num_cpus = "1"
anyhow = "1.0"
thiserror = "1.0"
sled = "0.34.6"

22
README.md

@ -2,8 +2,30 @@
note: this readme is not yet updated to reality…
```
yama
[-w|--with [user@host:]path] [--with-encrypted true|false]
```
## Backup Profiles
## Remotes
In `yama.toml`, you can configure remotes:
```toml
[remote.bob]
encrypted = true
host = "bobmachine.xyz"
user = "bob"
path = "/home/bob/yama"
```
## Subcommands
### `check`: Check repository for consistency
Verifies the full repository satisfies the following consistency constraints:

8
src/chunking.rs

@ -210,9 +210,9 @@ impl RecursiveExtractor {
}
/* can't figure it out
pub async fn unchunk_to_end(recursive_chunk_ref: RecursiveChunkRef, pile: &mut (dyn Pile + Send)) -> YamaResult<Vec<u8>> {
pub async fn unchunk_to_end(recursive_chunk_ref: RecursiveChunkRef, xpile: &mut (dyn Pile + Send)) -> YamaResult<Vec<u8>> {
let mut buf = Vec::new();
let mut pile_mutex = Arc::new(Mutex::new(pile));
let mut pile_mutex = Arc::new(Mutex::new(xpile));
unchunk_to_end_internal(&recursive_chunk_ref.chunk_id, recursive_chunk_ref.depth, pile_mutex, &mut buf).await?;
Ok(buf)
}
@ -220,8 +220,8 @@ pub async fn unchunk_to_end(recursive_chunk_ref: RecursiveChunkRef, pile: &mut (
fn unchunk_to_end_internal(chunk_id: &ChunkId, depth: u32, pile_mutex: Arc<Mutex<&mut (dyn Pile + Send)>>, buf: &mut Vec<u8>) -> BoxFuture<'static, Result<(), String>> {
async move {
let chunk = {
let pile = pile_mutex.lock().expect("Poisoned Pile");
pile.get_chunk(chunk_id).await
let xpile = pile_mutex.lock().expect("Poisoned Pile");
xpile.get_chunk(chunk_id).await
.map_err(|e| e.to_string())?
.ok_or_else(|| format!("Invalid reference: no chunk {}", chunkid_to_hex(chunk_id)))?
};

35
src/commands.rs

@ -0,0 +1,35 @@
use std::path::Path;
use tokio::fs::File;
use anyhow::bail;
use tokio::io::AsyncWriteExt;
use crate::pile::PileDescriptor;
use clap::crate_version;
/// Initialises a fresh yama pile in `dir`: creates an empty `pile.sled`
/// database and writes a default `yama.toml` descriptor next to it.
///
/// Fails if `dir` already contains a `yama.toml`.
pub async fn init(dir: &Path) -> anyhow::Result<()> {
    let descriptor_path = dir.join("yama.toml");

    // Refuse to clobber an existing pile descriptor.
    if descriptor_path.exists() {
        bail!("yama.toml already exists. Cannot create yama xpile here.");
    }

    // Create the sled database first and flush so its files hit the disk.
    let database = sled::open(dir.join("pile.sled"))?;
    database.flush()?;

    // Default descriptor: current yama version, no profiles or remotes yet.
    let descriptor = PileDescriptor {
        yama_version: crate_version!().to_owned(),
        profiles: Default::default(),
        remotes: Default::default(),
    };

    let mut descriptor_file = File::create(descriptor_path).await?;
    descriptor_file.write_all(&toml::to_vec(&descriptor)?).await?;

    Ok(())
}

4
src/definitions.rs

@ -0,0 +1,4 @@
/// Identifier of a chunk: a 32-byte (256-bit) content hash.
pub type ChunkId = [u8; 32];
/// 64-bit xxHash digest, used as a fast chunk integrity check.
pub type XXHash = u64;
/// Fixed seed fed to XXH64 so chunk hashes are stable across runs.
pub const XXH64_SEED: u64 = 424242;

270
src/main.rs

@ -1,208 +1,82 @@
use crate::def::Exclusions;
use crate::operations::{
check_local_pile, extract_local_local, list_local, list_local_tree, parse_pointer_subpath_pair,
store_local_local, verify_local_local,
};
use crate::pile::local_pile::LocalPile;
use clap::{crate_authors, crate_description, crate_version, App, Arg, SubCommand};
use std::path::Path;
use clap::{crate_authors, crate_description, crate_version, App, Arg, Clap};
use std::path::{Path, PathBuf};
use std::process::exit;
use log::error;
mod chunking;
mod def;
mod operations;
mod commands;
mod pile;
mod tree;
mod util;
mod definitions;
fn main() {
// Top-level command-line options for yama, parsed with clap's derive API.
// NOTE(review): under clap derive, `///` doc comments on fields become the
// CLI help text, so reviewer annotations below use plain `//` comments.
#[derive(Clap)]
#[clap(version = crate_version!(), author = crate_authors!())]
struct Opts {
    /// Chooses a different xpile to be the working xpile.
    /// If specified, must be the name of a remote in yama.toml.
    #[clap(short, long)]
    with: Option<String>,
    // The subcommand to run (see `PileCommand`).
    #[clap(subcommand)]
    command: PileCommand
}
// The yama subcommands. Each variant's fields are its CLI arguments/flags.
// (Plain `//` comments used where text should NOT appear in clap help.)
#[derive(Clap)]
enum PileCommand {
    // Creates a new pile in the working directory (handled by commands::init).
    Init {
    },
    // Stores a tree into the pile under a named pointer.
    Store {
        /// Name of the pointer to store.
        pointer_name: String,
        /// Name of a parent pointer to do a differential pointer.
        /// Helps with space saving and performance.
        #[clap(short)]
        parent_name: Option<String>,
        /// Specify this if you want to overwrite an existing pointer.
        #[clap(short, long)]
        force: bool,
        // NOTE(review): despite the name, this looks like the path of the
        // tree to store — confirm intent once the handler is written.
        destination: PathBuf
    },
    // Extracts a stored pointer back out onto the filesystem.
    Retrieve {
        /// Name of the pointer to retrieve.
        pointer_name: String,
        /// Limited expression(s) of files to retrieve.
        #[clap(short, long)]
        subset: Vec<PathBuf>,
        destination: PathBuf
    },
    // Checks pile consistency; --gc additionally removes unused chunks.
    Check {
        #[clap(long)]
        gc: bool
    },
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();
let matches = App::new("山")
.version(crate_version!())
.author(crate_authors!())
.about(crate_description!())
/*.arg(
Arg::with_name("v")
.short("v")
.multiple(true)
.help("Sets the level of verbosity"),
)*/
.subcommand(
SubCommand::with_name("init")
.about("creates a new pile in the current working directory"),
)
.subcommand(
SubCommand::with_name("check")
.about("checks the consistency of this repository")
.arg(
Arg::with_name("debug")
.short("v")
.help("print debug information verbosely"),
)
.arg(
Arg::with_name("gc")
.long("gc")
.help("Remove unused chunks to free up space."),
),
)
.subcommand(SubCommand::with_name("lsp").about("List tree pointers"))
.subcommand(
SubCommand::with_name("lst")
.arg(
Arg::with_name("POINTER")
.required(true)
.index(1)
.help("Pointer, with optional :path for subtree"),
)
.about("List contents of (sub-)tree"),
)
.subcommand(
SubCommand::with_name("store")
.arg(
Arg::with_name("TREE")
.required(true)
.index(1)
.help("Path to tree / file(s) on filesystem"),
)
.arg(
Arg::with_name("POINTER")
.required(true)
.index(2)
.help("Pointerspec"),
)
.arg(
Arg::with_name("exclusions")
.short("x")
.long("exclusions")
.takes_value(true)
.help("Path to exclusions file"),
)
.arg(
Arg::with_name("differential")
.short("d")
.long("differential")
.takes_value(true)
.help("Pointerspec to make a differential backup against"),
),
)
.subcommand(
SubCommand::with_name("extract")
.arg(
Arg::with_name("POINTER")
.required(true)
.index(1)
.help("Pointer, with optional :path for subextraction"),
)
.arg(
Arg::with_name("TARGET")
.required(true)
.index(2)
.help("Path to target on filesystem, or subtarget for subextraction"),
),
)
.subcommand(
SubCommand::with_name("verify")
.arg(
Arg::with_name("TREE")
.required(true)
.index(1)
.help("Path to tree / file(s) on filesystem"),
)
.arg(
Arg::with_name("POINTER")
.required(true)
.index(2)
.help("Pointerspec"),
)
.arg(
Arg::with_name("exclusions")
.short("x")
.long("exclusions")
.takes_value(true)
.help("Path to exclusions file"),
)
.arg(Arg::with_name("painfully").long("painfully").help(
"Exercise the extraction routines and compare byte-for-byte equality. (slow)",
)),
)
.get_matches();
if let Some(_submatches) = matches.subcommand_matches("init") {
if Path::new("yama.toml").exists() {
error!("Refusing to overwrite existing pile");
exit(2);
} else {
LocalPile::create(Path::new(".")).expect("Failed to initialise pile.");
}
} else if let Some(submatches) = matches.subcommand_matches("store") {
let exclusions = submatches
.value_of("exclusions")
.map(|path| Exclusions::load(Path::new(path)))
.transpose()
.expect("Failed to load exclusions.");
store_local_local(
Path::new(submatches.value_of("TREE").unwrap()),
Path::new("."), // TODO
submatches.value_of("POINTER").unwrap(),
submatches.value_of("differential"),
exclusions,
)
.expect("Problem with store_local_local");
} else if let Some(_submatches) = matches.subcommand_matches("lsp") {
list_local(Path::new(".")) // TODO
.expect("Problem with list_local");
} else if let Some(submatches) = matches.subcommand_matches("extract") {
let (pointer, subpath_opt) = parse_pointer_subpath_pair(
submatches
.value_of("POINTER")
.expect("Pointer not specified."),
);
extract_local_local(
Path::new("."), // TODO
pointer,
subpath_opt,
Path::new(submatches.value_of("TARGET").expect("No target specified.")),
)
.expect("Extraction failed");
} else if let Some(submatches) = matches.subcommand_matches("lst") {
let (pointer, subpath_opt) = parse_pointer_subpath_pair(
submatches
.value_of("POINTER")
.expect("Pointer not specified."),
);
list_local_tree(
Path::new("."), // TODO do this
pointer,
subpath_opt,
)
.expect("List local tree failed");
} else if let Some(_submatches) = matches.subcommand_matches("check") {
let result = check_local_pile(Path::new(".")) // TODO do this
.expect("Failed to check pile");
if result == 0 {
eprintln!("check OK");
} else {
eprintln!(
"check FAILED with {} faults. This is SEVERE. Your pile is corrupted. !!!",
result
);
eprintln!("!!! PILE CORRUPTED !!!");
let opts: Opts = Opts::parse();
match opts.command {
PileCommand::Store {
pointer_name, parent_name, force, destination
} => {}
PileCommand::Retrieve {
pointer_name, subset, destination
} => {}
PileCommand::Check {
gc
} => {}
PileCommand::Init {} => {
commands::init(".".as_ref()).await?;
}
} else if let Some(submatches) = matches.subcommand_matches("verify") {
let exclusions = submatches
.value_of("exclusions")
.map(|path| Exclusions::load(Path::new(path)))
.transpose()
.expect("Failed to load exclusions.");
verify_local_local(
Path::new(submatches.value_of("TREE").unwrap()),
Path::new("."), // TODO
submatches.value_of("POINTER").unwrap(),
exclusions,
submatches.is_present("painfully"),
)
.expect("Problem with verify_local_local");
}
Ok(())
}

4
src/operations.rs

@ -360,7 +360,7 @@ pub fn parse_pointer_subpath_pair(pair: &str) -> (&str, Option<&str>) {
}
/* TODO TODO TODO clean up / remove this
fn perform_storage_chunking(populate_node: &mut TreeNode, pile: &mut (dyn Pile + Send)) -> YamaResult<()> {
fn perform_storage_chunking(populate_node: &mut TreeNode, xpile: &mut (dyn Pile + Send)) -> YamaResult<()> {
let mut paths = Vec::new();
populate_node.visit_mut(&mut |tn, path| {
@ -376,7 +376,7 @@ fn perform_storage_chunking(populate_node: &mut TreeNode, pile: &mut (dyn Pile +
s.spawn(move |_| {
for (chunk_id, chunk_data) in recv.iter() {
task::block_on(async {
pile.put_chunk(&chunk_id, &chunk_data)
xpile.put_chunk(&chunk_id, &chunk_data)
});
}
});

4
src/operations/extraction_flow.rs

@ -294,7 +294,7 @@ async fn verifying_extractor(
}
/*
async fn extractor(inputs: Receiver<ExtractorInput>, pile: Box<dyn Pile>) -> Result<(), String> {
async fn extractor(inputs: Receiver<ExtractorInput>, xpile: Box<dyn Pile>) -> Result<(), String> {
let mut chunkref_stack: Vec<RecursiveChunkRef> = Vec::new();
while let Some(msg) = inputs.recv().await {
chunkref_stack.clear();
@ -304,7 +304,7 @@ async fn extractor(inputs: Receiver<ExtractorInput>, pile: Box<dyn Pile>) -> Res
.map_err(|e| e.to_string())?;
while let Some(popped_ref) = chunkref_stack.pop() {
let chunk = pile.get_chunk(&popped_ref.chunk_id).await
let chunk = xpile.get_chunk(&popped_ref.chunk_id).await
.map_err(|e| e.to_string())?
.ok_or_else(|| format!("CORRUPTION: Missing chunk {} when extracting", chunkid_to_hex(&popped_ref.chunk_id)))?;
if popped_ref.depth == 0 {

110
src/pile.rs

@ -1,51 +1,73 @@
use std::collections::HashMap;
use std::path::PathBuf;
use serde::{Serialize, Deserialize};
use crate::definitions::{ChunkId, XXHash};
use tokio::stream::Stream;
use async_trait::async_trait;
use crate::def::{ChunkId, PointerData, XXHash, YamaResult};
mod interface {
use std::borrow::Cow;
use crate::def::{ChunkId, PointerData};
pub enum PileProcessorRequest<'a> {
GetDictionary,
GetChunk {
chunk_id: ChunkId,
},
PutChunk {
chunk_id: ChunkId,
chunk_data: Cow<'a, [u8]>,
},
/* DelChunk {
chunk_id:
}, */
XXHashChunk {
chunk_id: ChunkId,
},
GetPointer {
pointer_id: Cow<'a, str>,
},
PutPointer {
pointer_id: Cow<'a, str>,
pointer_data: PointerData,
},
// DelPointer,
ListPointers,
}
pub mod local_sled;
/// Contents of `yama.toml`: describes a pile on disk.
/// Serialised as TOML by `commands::init`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PileDescriptor {
    // Version of yama that created this pile (from `crate_version!()`).
    pub yama_version: String,
    // Named profiles mapping to paths.
    // NOTE(review): semantics of the path value are not established in this
    // commit — confirm against later usage.
    pub profiles: HashMap<String, PathBuf>,
    // Named remotes this pile can talk to (see `RemoteDescriptor`).
    pub remotes: HashMap<String, RemoteDescriptor>
}
/// A remote pile, as configured under `[remote.<name>]` in `yama.toml`
/// (see the README example with `encrypted`, `host`, `user`, `path`).
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RemoteDescriptor {
    // Whether data held at this remote is encrypted.
    pub encrypted: bool,
    // Hostname to connect to; presumably None means local — confirm.
    pub host: Option<String>,
    // User to connect as; presumably None uses a default — confirm.
    pub user: Option<String>,
    // Path of the pile on the remote machine.
    pub path: PathBuf
}
/// The key/value namespaces a raw pile stores. Keeping them separate
/// (e.g. one sled tree each) means keys cannot collide across kinds.
pub enum Keyspace {
    // chunk id -> chunk bytes
    Chunk,
    // chunk id -> stored chunk hash (integrity check)
    ChunkHash,
    // pointer name -> pointer data
    Pointer
}
/// should be Cloneable to get another reference which uses the same pile.
#[async_trait]
pub trait Pile: Send + Sync {
/// Returns another handle to the Pile.
fn clone_pile_handle(&self) -> Box<dyn Pile>;
async fn get_dictionary(&self) -> YamaResult<Vec<u8>>;
async fn get_chunk(&self, chunk_id: &ChunkId) -> YamaResult<Option<Vec<u8>>>;
async fn put_chunk(&self, chunk_id: &ChunkId, data: Vec<u8>) -> YamaResult<()>;
async fn xxhash_chunk(&self, chunk_id: &ChunkId) -> YamaResult<Option<XXHash>>;
async fn get_pointer(&self, pointer_id: &str) -> YamaResult<Option<PointerData>>;
async fn put_pointer(&self, pointer_id: &str, pointer_data: PointerData) -> YamaResult<()>;
async fn list_pointers(&self) -> YamaResult<Vec<String>>;
/// Low-level storage backend for a pile: an async key/value store split
/// into [`Keyspace`]s. Implementations: `local_sled::LocalSledRawPile`.
pub trait RawPile: Send + Sync {
    // Reads a value; Ok(None) when the key is absent.
    async fn read(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<Option<Vec<u8>>>;
    // Inserts or replaces a key/value pair.
    async fn write(&self, kind: Keyspace, key: &[u8], value: &[u8]) -> anyhow::Result<()>;
    // Streams every key in a keyspace.
    async fn list_keys(&self, kind: Keyspace) -> anyhow::Result<Box<dyn Stream<Item=Vec<u8>>>>;
    // Streams every (key, value) pair in a keyspace.
    async fn list_keyvalue_pairs(&self, kind: Keyspace) -> anyhow::Result<Box<dyn Stream<Item=(Vec<u8>, Vec<u8>)>>>;
    // Flushes buffered writes to durable storage.
    async fn flush(&self) -> anyhow::Result<()>;
}
pub mod local_pile;
/// High-level view over a [`RawPile`]: adds chunk/pointer semantics
/// (hashing, verification) on top of raw keyspace storage.
///
/// The `R: RawPile` bound lives on the impl block rather than on the
/// struct definition, per convention, so the type itself is unconstrained.
pub struct Pile<R> {
    pub raw_pile: R
}

impl<R: RawPile> Pile<R> {
    /// Wraps a raw pile.
    pub fn new(raw_pile: R) -> Self {
        Pile {
            raw_pile
        }
    }

    /// Reads a chunk by id; `verify` requests an integrity check of the
    /// contents against the stored hash. Returns `Ok(None)` if absent.
    pub async fn read_chunk(&self, key: ChunkId, verify: bool) -> anyhow::Result<Option<Vec<u8>>> {
        unimplemented!()
    }

    /// hash will be computed if not specified.
    /// NOTE(review): there is no data parameter yet — presumably the chunk
    /// bytes land here in a later commit; confirm the intended signature.
    pub async fn write_chunk(&self, key: ChunkId, hash: Option<XXHash>) -> anyhow::Result<()> {
        unimplemented!()
    }

    /// Reads the stored hash for a chunk, if any.
    pub async fn read_chunk_hash(&self, key: ChunkId) -> anyhow::Result<Option<XXHash>> {
        unimplemented!()
    }

    /// Reads pointer data by name.
    /// NOTE(review): `Option<()>` is a placeholder payload — a real pointer
    /// data type presumably replaces it later.
    pub async fn read_pointer(&self, key: &str) -> anyhow::Result<Option<()>> {
        unimplemented!()
    }

    /// Lists the names of all pointers in the pile.
    pub async fn list_pointers(&self) -> anyhow::Result<Vec<String>> {
        unimplemented!()
    }
}

66
src/pile/local_sled.rs

@ -0,0 +1,66 @@
use crate::pile::{RawPile, Keyspace};
use sled::{Db, Tree};
use async_trait::async_trait;
use tokio::stream::Stream;
use std::path::Path;
/// A local, sled-backed raw pile: one sled `Tree` per [`Keyspace`],
/// all inside a single `pile.sled` database directory.
pub struct LocalSledRawPile {
    // Underlying sled database handle (used for flushing).
    pub db: Db,
    // Keyspace::Chunk — chunk id -> chunk bytes.
    pub chunks_tree: Tree,
    // Keyspace::ChunkHash — chunk id -> stored chunk hash.
    pub hashes_tree: Tree,
    // Keyspace::Pointer — pointer name -> pointer data.
    pub pointers_tree: Tree
}
impl LocalSledRawPile {
    /// Opens (creating if necessary) the `pile.sled` database inside
    /// `directory`, together with one tree per keyspace.
    pub fn open(directory: &Path) -> anyhow::Result<Self> {
        let database = sled::open(directory.join("pile.sled"))?;

        // Trees are opened in the same order as before: chunks, hashes,
        // pointers; the db handle is moved into the struct last.
        Ok(LocalSledRawPile {
            chunks_tree: database.open_tree("chunks")?,
            hashes_tree: database.open_tree("chunkhashes")?,
            pointers_tree: database.open_tree("pointers")?,
            db: database,
        })
    }
}
#[async_trait]
impl RawPile for LocalSledRawPile {
    /// Reads a value from the tree backing `kind`; `Ok(None)` if absent.
    async fn read(&self, kind: Keyspace, key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
        let retrieval = match kind {
            Keyspace::Chunk => {
                self.chunks_tree.get(key)?
            }
            Keyspace::ChunkHash => {
                self.hashes_tree.get(key)?
            }
            Keyspace::Pointer => {
                self.pointers_tree.get(key)?
            }
        };
        // TODO(perf): wouldn't it be nice to not need to copy these?
        Ok(retrieval.map(|x| x.to_vec()))
    }

    /// Inserts (or replaces) `key` -> `value` in the tree backing `kind`.
    /// Mirrors `read`; durability is only guaranteed after `flush`.
    async fn write(&self, kind: Keyspace, key: &[u8], value: &[u8]) -> anyhow::Result<()> {
        let tree = match kind {
            Keyspace::Chunk => &self.chunks_tree,
            Keyspace::ChunkHash => &self.hashes_tree,
            Keyspace::Pointer => &self.pointers_tree,
        };
        // sled's insert returns the previous value; we have no use for it.
        tree.insert(key, value)?;
        Ok(())
    }

    /// Streams all keys in a keyspace. TODO: not yet implemented.
    async fn list_keys(&self, kind: Keyspace) -> anyhow::Result<Box<dyn Stream<Item=Vec<u8>>>> {
        unimplemented!()
    }

    /// Streams all (key, value) pairs in a keyspace. TODO: not yet implemented.
    async fn list_keyvalue_pairs(&self, kind: Keyspace) -> anyhow::Result<Box<dyn Stream<Item=(Vec<u8>, Vec<u8>)>>> {
        unimplemented!()
    }

    /// Asynchronously flushes the whole database to disk.
    async fn flush(&self) -> anyhow::Result<()> {
        self.db.flush_async().await?;
        Ok(())
    }
}

0
src/def.rs → src/xdef.rs

55
src/xpile.rs

@ -0,0 +1,55 @@
use async_trait::async_trait;
use crate::def::{ChunkId, PointerData, XXHash, YamaResult};
mod interface {
    use std::borrow::Cow;
    use crate::def::{ChunkId, PointerData};
    /// Request messages for a pile processor — presumably the protocol for
    /// driving a (possibly remote) pile; confirm once a processor exists.
    pub enum PileProcessorRequest<'a> {
        // Fetch the dictionary blob.
        GetDictionary,
        // Fetch a chunk's bytes by id.
        GetChunk {
            chunk_id: ChunkId,
        },
        // Store a chunk; Cow lets the sender either borrow or own the bytes.
        PutChunk {
            chunk_id: ChunkId,
            chunk_data: Cow<'a, [u8]>,
        },
        /* DelChunk {
            chunk_id:
        }, */
        // Fetch the xxhash of a chunk (integrity check).
        XXHashChunk {
            chunk_id: ChunkId,
        },
        // Fetch pointer data by name.
        GetPointer {
            pointer_id: Cow<'a, str>,
        },
        // Store pointer data under a name.
        PutPointer {
            pointer_id: Cow<'a, str>,
            pointer_data: PointerData,
        },
        // DelPointer,
        // List all pointer names.
        ListPointers,
    }
}
/// should be Cloneable to get another handle to the xpile.
#[async_trait]
pub trait Pile: Send + Sync + Clone {
    // NOTE(review): the doc line "Returns another handle to the Pile." that
    // previously sat here belonged to a removed clone method, not to
    // get_dictionary; replaced with an accurate description.
    /// Fetches the pile's dictionary blob (presumably a zstd compression
    /// dictionary — confirm).
    async fn get_dictionary(&self) -> YamaResult<Vec<u8>>;
    /// Fetches a chunk's bytes by id; Ok(None) if absent.
    async fn get_chunk(&self, chunk_id: &ChunkId) -> YamaResult<Option<Vec<u8>>>;
    /// Stores a chunk together with its serialised xxhash bytes.
    async fn put_chunk(&self, chunk_id: &ChunkId, data: &[u8], xxhash: &[u8]) -> YamaResult<()>;
    /// Fetches the stored xxhash bytes for a chunk, if any.
    async fn get_xxhash_chunk(&self, chunk_id: &ChunkId) -> YamaResult<Option<Vec<u8>>>;
    /// Fetches pointer data by name; Ok(None) if absent.
    async fn get_pointer(&self, pointer_id: &str) -> YamaResult<Option<PointerData>>;
    /// Stores pointer data under a name.
    async fn put_pointer(&self, pointer_id: &str, pointer_data: PointerData) -> YamaResult<()>;
    /// Lists the names of all pointers.
    async fn list_pointers(&self) -> YamaResult<Vec<String>>;
}
#[async_trait]
pub trait RawPile: Send + Sync + Clone {
async fn
}
pub mod local_pile;

2
src/pile/local_pile.rs → src/xpile/local_pile.rs

@ -15,7 +15,7 @@ use lmdb::{Cursor, Database, DatabaseFlags, Environment, EnvironmentFlags, Trans
use async_trait::async_trait;
use crate::def::{ChunkId, PointerData, XXHash, YamaResult, XXH64_SEED};
use crate::pile::Pile;
use crate::xpile::Pile;
use serde::{Deserialize, Serialize};
use std::thread::JoinHandle;
Loading…
Cancel
Save