Works for one-off and incremental store/extract

branch develop · commit 6948ba3821
15 changed files:

  .gitignore (9)
  Cargo.lock (1230)
  Cargo.toml (30)
  README.md (94)
  example_zstd.dict (BIN)
  src/chunking.rs (251)
  src/def.rs (313)
  src/main.rs (151)
  src/operations.rs (327)
  src/operations/chunking_flow.rs (339)
  src/operations/extraction_flow.rs (223)
  src/pile.rs (53)
  src/pile/local_pile.rs (405)
  src/tree.rs (257)
  src/util.rs (0)

.gitignore (9)

@@ -0,0 +1,9 @@
/target
**/*.rs.bk
/.idea
/yama.iml
/.project
/.gdb_history

Cargo.lock (1230)

(File diff suppressed because it is too large.)

Cargo.toml (30)

@@ -0,0 +1,30 @@
[package]
name = "yama"
version = "0.0.1"
authors = ["Ollie"]
edition = "2018"
description = "Deduplicated content pile repository manager"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
fastcdc = "1.0.2"
zstd = "0.5.1"
sshish = "0.1.0"
clap = "2.33.0"
lmdb-rkv = "0.12.3"
futures = "0.3.1"
async-std = { version = "1.4.0", features = ["unstable"] }
blake = "2.0.0"
twox-hash = "1.5.0"
async-trait = "0.1.22"
serde = { version = "1.0.104", features = ["derive"] }
#serde_derive = "1.0.104"
serde_cbor = "0.8.2"
users = "0.9.1"
crossbeam = "0.7.3"
toml = "0.5.5"
glob = "0.3.0"
nix = "0.17.0"
log = "0.4"
env_logger = "0.7.1"

README.md (94)

@@ -0,0 +1,94 @@
# 山 (yama): deduplicated heap repository

note: this readme is not yet updated to reality…

## Subcommands

### `check`: Check repository for consistency

Verifies that the full repository satisfies the following consistency constraints:

- all chunks have the correct hash
- all pointers have a valid structure, recursively

Usage: `yama check [--gc]`

The amount of space occupied, and the amount occupied by unused chunks, is reported.
If `--gc` is specified, unused chunks will be removed.

### `lsp`: List tree pointers

Usage: `yama lsp`

### `rmp`: Remove tree pointers

Usage: `yama rmp pointer/path [--force]`

If `--force` is not specified and the pointer is depended upon by another, then deletion is aborted with an error.

### `store`: Store tree into repository

Usage: `yama store [--dry-run] [ssh://user@host]/path/to/dir pointer/path [--exclusions path/to/exclusions.txt] [--differential pointer/parent]`

The pointer must not already exist; it will be created. If `--differential` is specified with an existing parent pointer, then the directory listing is stored as a differential list against the parent.
The intention of this is to reduce the size of the directory list.
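For example, a first full store followed by a differential store against it might look like this (hypothetical paths and pointer names):

```
yama store /home/user/docs docs/2020-01-31
yama store /home/user/docs docs/2020-02-01 --differential docs/2020-01-31
```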
#### Exclusion lists

Exclusion lists have much the same format as `.gitignore`: one glob per line, naming files not to include, relative to the tree root.
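A hypothetical exclusions file might look like this (a leading `!` negates a rule, as supported by the rule parser in `src/def.rs`):

```
target/**
**/*.tmp
.cache
!keep-this.tmp
```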
### `extract`: Extract file(s) from repository

Usage: `yama extract [--dry-run] pointer/path[:path] [ssh://user@host]/path/to/local/dir[/]`

If no path is specified, the root (/) is extracted. A trailing slash means that the file will be extracted as a child of the specified directory.
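For example, extracting a single file out of a tree, as a child of a local directory (hypothetical pointer and paths):

```
yama extract docs/2020-01-31:photos/cat.jpg /tmp/restored/
```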
### `remote`: Run operations on a remote repository

Usage: `yama remote ssh://user@host/path/to/repo <subcommand>`

#### remote `store`: Store local tree into remote repository

Usage is identical to `yama store`, except that the store path must be local.

#### remote `extract`: Extract remote repository into local tree

Usage is identical to `yama extract`, except that the target path must be local.

### `slave`: Remote-controlled yama

Communicates over stdin/stdout to perform specified operations. Used when a yama command involves SSH.

## Repository Storage Details

Pointers are stored in `pointers.lmdb` and chunks are stored in `chunks.lmdb`.

It is expected that exclusion files will be kept in the same directory as the repository, if they are to be used on a recurring basis.

Chunks are compressed with `zstd`. A dictionary must first be trained and placed at `zstd.dict` in the repository root.

**This dictionary file must not be lost or altered after chunks have been made using it. Doing so will void the integrity of the entire repository.**

Chunks are hashed with BLAKE-256, and chunks have their xxHash calculated before being deduplicated away. (A detected collision aborts the backup. This is expected never to happen, but nevertheless we cannot be sure.)

## Remote Protocol Details

* Compression is performed on the host where the data resides.
* Only required chunks are compressed and diffused across the SSH connection.
* There needs to be some mechanism to offer, decline and accept chunks, without buffers overflowing and bringing hosts down.

## Processor Details

## Other notes

`zstd --train FILEs -o zstd.dict`

* Candidate size: `find ~/Programming -size -4k -size +64c -type f -exec grep -Iq . {} \; -printf "%s\n" | jq -s 'add'`
* Want to sample:
  * `find ~/Programming -size -4k -size +64c -type f -exec grep -Iq . {} \; -exec cp {} -t /tmp/d/ \;`
  * `du -sh`
  * `find > file.list`
  * `wc -l < file.list` → gives the number of lines
  * `shuf -n 4242 file.list | xargs -x zstd --train -o zstd.dict` for 4242 files. This chokes if it receives a filename containing a space; just re-run until you get a working set.

example_zstd.dict (BIN)

src/chunking.rs (251)

@@ -0,0 +1,251 @@
use crate::def::{YamaResult, ChunkId, RecursiveChunkRef};
use fastcdc::FastCDC;
use crate::pile::Pile;
use std::fmt::Write;
use std::mem;
use zstd::block::Decompressor;
use futures::future::BoxFuture;
use futures::FutureExt;

// 1 MiB: once this much data is buffered, start recursing
pub const SENSIBLE_THRESHOLD: usize = 1024 * 1024;

// 256 kiB
pub const FASTCDC_MIN: usize = 256 * 1024;
// 1 MiB
pub const FASTCDC_AVG: usize = 1024 * 1024;
// 8 MiB
pub const FASTCDC_MAX: usize = 8 * 1024 * 1024;

/// Chunks a stream with FastCDC; if the resulting stream of chunk IDs itself grows
/// past `threshold`, those IDs are chunked again, one level deeper per pass.
pub struct RecursiveChunker {
    deeper_chunker: Option<Box<RecursiveChunker>>,
    unchunked: Vec<u8>,
    threshold: usize,
}

impl RecursiveChunker {
    pub fn new(recurse_threshold: usize) -> Self {
        RecursiveChunker {
            deeper_chunker: None,
            unchunked: Vec::new(),
            threshold: recurse_threshold,
        }
    }

    /// finalise: true iff this is the last chunk (we will not reject a chunk which may have been
    /// truncated)
    fn do_chunking<F>(&mut self, chunk_output: &mut F, finalise: bool) -> YamaResult<Vec<u8>>
        where F: FnMut(&[u8]) -> YamaResult<ChunkId> {
        let fastcdc = FastCDC::new(&self.unchunked, FASTCDC_MIN, FASTCDC_AVG, FASTCDC_MAX);
        let mut new_chunks: Vec<u8> = Vec::new();
        let mut consumed_until: Option<usize> = None;
        //let deeper = self.deeper_chunker.as_mut().expect("Deeper chunker must be present");
        for chunk in fastcdc {
            let is_final = chunk.offset + chunk.length == self.unchunked.len();
            if !is_final || finalise {
                consumed_until = Some(chunk.offset + chunk.length);
                let chunk_id = chunk_output(&self.unchunked[chunk.offset..chunk.offset + chunk.length])?;
                new_chunks.extend_from_slice(&chunk_id);
            }
        }
        if let Some(consumed_until) = consumed_until {
            if consumed_until > 0 {
                self.unchunked.drain(0..consumed_until);
            }
        }
        Ok(new_chunks)
    }

    pub fn write<F>(&mut self, data: &[u8], chunk_output: &mut F) -> YamaResult<()>
        where F: FnMut(&[u8]) -> YamaResult<ChunkId> {
        self.unchunked.extend_from_slice(data);
        if self.unchunked.len() > self.threshold {
            if self.deeper_chunker.is_none() {
                // start chunking
                self.deeper_chunker = Some(Box::new(RecursiveChunker::new(self.threshold)));
            }
            let new_chunks = self.do_chunking(chunk_output, false)?;
            self.deeper_chunker.as_mut().unwrap()
                .write(&new_chunks, chunk_output)?;
        }
        Ok(())
    }

    pub fn finish<F>(mut self, chunk_output: &mut F) -> YamaResult<RecursiveChunkRef>
        where F: FnMut(&[u8]) -> YamaResult<ChunkId> {
        if self.deeper_chunker.is_some() {
            // we are chunking so make this the last chunk
            let new_chunks = self.do_chunking(chunk_output, true)?;
            let mut deeper = self.deeper_chunker.unwrap();
            deeper.write(&new_chunks, chunk_output)?;
            let mut rcr = deeper.finish(chunk_output)?;
            // as there is a level of chunking, increase the depth
            rcr.depth += 1;
            Ok(rcr)
        } else {
            // no chunking, so depth=0 (raw) and just emit our unchunked data
            let chunk_id = chunk_output(&self.unchunked)?;
            Ok(RecursiveChunkRef {
                chunk_id,
                depth: 0,
            })
        }
    }
}

pub fn chunkid_to_hex(chunkid: &ChunkId) -> String {
    let mut s = String::new();
    for &byte in chunkid.iter() {
        write!(&mut s, "{:02x}", byte).expect("Unable to write");
    }
    s
}

pub struct RecursiveExtractor {
    deeper_extractor: Option<Box<RecursiveExtractor>>,
    chunk_id_queue: Vec<u8>,
    chunk_id_offset: usize,
}

impl RecursiveExtractor {
    pub fn new(mut chunkref: RecursiveChunkRef) -> Self {
        if chunkref.depth == 0 {
            RecursiveExtractor {
                deeper_extractor: None,
                chunk_id_queue: chunkref.chunk_id.to_vec(),
                chunk_id_offset: 0,
            }
        } else {
            chunkref.depth -= 1;
            RecursiveExtractor {
                deeper_extractor: Some(Box::new(RecursiveExtractor::new(chunkref))),
                chunk_id_queue: Vec::new(),
                chunk_id_offset: 0,
            }
        }
    }

    pub async fn read_next(&mut self, pile: &mut Box<dyn Pile>) -> YamaResult<Option<Vec<u8>>> {
        let mut zstd_decompressor = zstd::block::Decompressor::with_dict(pile.get_dictionary().await?);
        self.read_next_int(pile, &mut zstd_decompressor).await
    }

    // the 'a is crucially important
    fn read_next_int_bf<'a>(&'a mut self, pile: &'a mut Box<dyn Pile>, zstd_decompressor: &'a mut Decompressor) -> BoxFuture<'a, YamaResult<Option<Vec<u8>>>> {
        async move {
            self.read_next_int(pile, zstd_decompressor).await
        }.boxed()
    }

    async fn read_next_int(&mut self, pile: &mut Box<dyn Pile>, zstd_decompressor: &mut Decompressor) -> YamaResult<Option<Vec<u8>>> {
        let mut next_chunk_id: ChunkId = Default::default();
        let chunkid_len = next_chunk_id.len();
        if self.chunk_id_queue.len() - self.chunk_id_offset < chunkid_len {
            if let Some(extractor) = &mut self.deeper_extractor {
                self.chunk_id_queue.drain(0..self.chunk_id_offset);
                self.chunk_id_offset = 0;
                if let Some(more_chunk_ids) = extractor.read_next_int_bf(pile, zstd_decompressor).await? {
                    self.chunk_id_queue.extend(more_chunk_ids);
                } else if self.chunk_id_queue.is_empty() {
                    return Ok(None);
                } else {
                    return Err("Partial chunk ID left over, deeper dried out, this is dodgy :/".into());
                }
            } else if self.chunk_id_queue.len() - self.chunk_id_offset == 0 {
                return Ok(None);
            } else {
                return Err("Partial chunk ID left over, no deeper, this is dodgy :/".into());
            }
        }
        if self.chunk_id_queue.len() - self.chunk_id_offset < chunkid_len {
            return Err("Partial chunk ID left over, already tried refill, this is dodgy :/".into());
        }
        let cio = self.chunk_id_offset;
        next_chunk_id.copy_from_slice(&self.chunk_id_queue[cio..cio + chunkid_len]);
        self.chunk_id_offset += chunkid_len;
        let next_chunk = pile.get_chunk(&next_chunk_id).await?;
        if let Some(chunk) = next_chunk {
            let chunk = zstd_decompressor.decompress(&chunk, 4 * FASTCDC_MAX)
                .map_err(|e| e.to_string())?;
            Ok(Some(chunk))
        } else {
            Err(format!("Invalid reference: no chunk {}", chunkid_to_hex(&next_chunk_id)).into())
        }
    }
}

/* can't figure it out
pub async fn unchunk_to_end(recursive_chunk_ref: RecursiveChunkRef, pile: &mut (dyn Pile + Send)) -> YamaResult<Vec<u8>> {
    let mut buf = Vec::new();
    let mut pile_mutex = Arc::new(Mutex::new(pile));
    unchunk_to_end_internal(&recursive_chunk_ref.chunk_id, recursive_chunk_ref.depth, pile_mutex, &mut buf).await?;
    Ok(buf)
}

fn unchunk_to_end_internal(chunk_id: &ChunkId, depth: u32, pile_mutex: Arc<Mutex<&mut (dyn Pile + Send)>>, buf: &mut Vec<u8>) -> BoxFuture<'static, Result<(), String>> {
    async move {
        let chunk = {
            let pile = pile_mutex.lock().expect("Poisoned Pile");
            pile.get_chunk(chunk_id).await
                .map_err(|e| e.to_string())?
                .ok_or_else(|| format!("Invalid reference: no chunk {}", chunkid_to_hex(chunk_id)))?
        };
        if depth == 0 {
            buf.extend_from_slice(&chunk);
        } else {
            let mut sub_chunk_id: ChunkId = Default::default();
            for sub_chunk_id_slice in chunk.chunks(sub_chunk_id.len()) {
                sub_chunk_id.clone_from_slice(sub_chunk_id_slice);
                unchunk_to_end_internal(&sub_chunk_id, depth - 1, pile_mutex, buf).await?;
            }
        }
        Ok(())
    }.boxed()
}
*/

pub async fn unchunk_to_end(recursive_chunk_ref: RecursiveChunkRef, pile: &mut dyn Pile) -> YamaResult<Vec<u8>> {
    // can't do recursive async so we get creative:
    // iteratively expand one level of indirection per pass, swapping buffers
    let mut prev_buf = Vec::new();
    let mut next_buf = Vec::new();
    next_buf.extend_from_slice(&recursive_chunk_ref.chunk_id);
    let mut zstd_decompressor = zstd::block::Decompressor::with_dict(pile.get_dictionary().await?);
    for _ in 0..recursive_chunk_ref.depth + 1 {
        mem::swap(&mut prev_buf, &mut next_buf);
        next_buf.clear();
        let mut sub_chunk_id: ChunkId = Default::default();
        for sub_chunk_id_slice in prev_buf.chunks(sub_chunk_id.len()) {
            sub_chunk_id.clone_from_slice(sub_chunk_id_slice);
            let chunk = pile.get_chunk(&sub_chunk_id).await
                .map_err(|e| e.to_string())?
                .ok_or_else(|| format!("Invalid reference: no chunk {}", chunkid_to_hex(&sub_chunk_id)))?;
            let chunk = zstd_decompressor.decompress(&chunk, 4 * FASTCDC_MAX)?;
            next_buf.extend_from_slice(&chunk);
        }
    }
    Ok(next_buf)
}

#[inline]
pub fn calculate_chunkid(chunk: &[u8]) -> ChunkId {
    let mut chunk_id: ChunkId = Default::default();
    blake::hash(256, chunk, &mut chunk_id)
        .expect("BLAKE problem");
    chunk_id
}
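Since the chunk-output callback is just an `FnMut(&[u8]) -> YamaResult<ChunkId>`, `RecursiveChunker` can be exercised without a pile at all. A minimal sketch, assuming only this crate's types (the helper below is hypothetical and not part of the commit; the real callers live in `src/operations/chunking_flow.rs`):

```rust
use std::collections::HashMap;

use crate::chunking::{calculate_chunkid, RecursiveChunker, SENSIBLE_THRESHOLD};
use crate::def::{ChunkId, RecursiveChunkRef, YamaResult};

/// Hypothetical helper: chunk `data` into an in-memory map, deduplicating by chunk ID.
fn chunk_into_memory(data: &[u8]) -> YamaResult<(RecursiveChunkRef, HashMap<ChunkId, Vec<u8>>)> {
    let mut store: HashMap<ChunkId, Vec<u8>> = HashMap::new();
    let mut output = |chunk: &[u8]| -> YamaResult<ChunkId> {
        let chunk_id = calculate_chunkid(chunk);
        // identical chunks hash to the same ID, so each is stored only once
        store.entry(chunk_id).or_insert_with(|| chunk.to_vec());
        Ok(chunk_id)
    };
    let mut chunker = RecursiveChunker::new(SENSIBLE_THRESHOLD);
    chunker.write(data, &mut output)?;
    let chunk_ref = chunker.finish(&mut output)?;
    Ok((chunk_ref, store))
}
```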

src/def.rs (313)

@@ -0,0 +1,313 @@
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashSet};
use glob::{Pattern, MatchOptions};
use std::convert::TryFrom;
use std::path::Path;
use std::fs::File;
use std::io::{BufReader, BufRead};

pub type ChunkId = [u8; 32];
pub type XXHash = u64;
pub const XXH64_SEED: u64 = 424242;
pub type YamaResult<T> = Result<T, Box<dyn std::error::Error>>;
// yet unused: pub type YamaResultSend<T> = Result<T, Box<dyn std::error::Error + Send>>;
//pub type YamaResultA<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PointerData {
    pub chunk_ref: RecursiveChunkRef,
    pub parent_pointer: Option<String>,
    pub uid_lookup: BTreeMap<u16, String>,
    pub gid_lookup: BTreeMap<u16, String>,
}

/// A chunk ID plus the number of levels of chunk-list indirection beneath it
/// (depth 0 means the chunk is the raw data itself).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RecursiveChunkRef {
    pub chunk_id: ChunkId,
    pub depth: u32,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TreeNode {
    #[serde(rename = "n")]
    pub name: String,
    //#[serde(flatten)]
    #[serde(rename = "c")]
    pub content: TreeNodeContent,
}

impl TreeNode {
    /// whether the metadata invalidates these two nodes being equal, thus requiring a backup
    pub fn metadata_invalidates(&self, other: &TreeNode, check_name: bool) -> bool {
        if check_name {
            self.name != other.name || self.content.metadata_invalidates(&other.content)
        } else {
            self.content.metadata_invalidates(&other.content)
        }
    }

    /// Guarantees consistent visit order.
    pub fn visit_mut<F>(&mut self, visitor: &mut F, path_prefix: &str) -> YamaResult<()>
        where F: FnMut(&mut Self, &str) -> YamaResult<()> {
        let mut my_path_buf = String::new();
        my_path_buf.push_str(path_prefix);
        if !my_path_buf.is_empty() {
            my_path_buf.push('/');
        }
        my_path_buf.push_str(&self.name);
        visitor(self, &my_path_buf)?;
        if let TreeNodeContent::Directory { children, .. } = &mut self.content {
            for child in children.iter_mut() {
                child.visit_mut(visitor, &my_path_buf)?;
            }
        }
        Ok(())
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(untagged)]
pub enum TreeNodeContent {
    NormalFile {
        // modification time in ms
        #[serde(rename = "m")]
        mtime: u64,
        #[serde(flatten)]
        #[serde(rename = "o")]
        ownership: FilesystemOwnership,
        #[serde(flatten)]
        #[serde(rename = "p")]
        permissions: FilesystemPermissions,
        // TODO size: u64 or not
        // can perhaps cache chunk-wise (but not sure.)
        #[serde(rename = "c")]
        content: RecursiveChunkRef,
    },
    Directory {
        #[serde(flatten)]
        #[serde(rename = "o")]
        ownership: FilesystemOwnership,
        #[serde(flatten)]
        #[serde(rename = "p")]
        permissions: FilesystemPermissions,
        #[serde(rename = "C")]
        children: Vec<TreeNode>,
    },
    SymbolicLink {
        #[serde(flatten)]
        #[serde(rename = "o")]
        ownership: FilesystemOwnership,
        #[serde(rename = "t")]
        target: String,
    },
    // TODO is there any other kind of file we need to store?
    Deleted,
}

#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FilesystemOwnership {
    pub uid: u16,
    pub gid: u16,
}

#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FilesystemPermissions {
    pub mode: u32,
}

impl TreeNodeContent {
    pub fn metadata_invalidates(&self, other: &TreeNodeContent) -> bool {
        match self {
            TreeNodeContent::NormalFile { mtime, ownership, permissions, .. } => {
                if let TreeNodeContent::NormalFile {
                    mtime: other_mtime,
                    ownership: other_ownership,
                    permissions: other_permissions,
                    ..
                } = other {
                    mtime != other_mtime || ownership != other_ownership || permissions != other_permissions
                } else {
                    true
                }
            }
            TreeNodeContent::Directory { ownership, permissions, children } => {
                if let TreeNodeContent::Directory {
                    ownership: other_ownership,
                    permissions: other_permissions,
                    children: other_children
                } = other {
                    if ownership != other_ownership || permissions != other_permissions {
                        return true;
                    }
                    children.iter().zip(other_children.iter()).any(|(left, right)| {
                        left.metadata_invalidates(right, true)
                    })
                } else {
                    true
                }
            }
            TreeNodeContent::SymbolicLink { ownership, target } => {
                if let TreeNodeContent::SymbolicLink {
                    ownership: other_ownership,
                    target: other_target
                } = other {
                    ownership != other_ownership || target != other_target
                } else {
                    true
                }
            }
            TreeNodeContent::Deleted => {
                // unreachable
                false
            }
        }
    }
}

pub struct Exclusions {
    pub rules: Vec<ExclusionRule>,
}

impl Exclusions {
    pub fn load(path: &Path) -> YamaResult<Exclusions> {
        let file = File::open(path)?;
        let bufreader = BufReader::new(file);
        let mut rules = Vec::new();
        for line in bufreader.lines() {
            let line = line?;
            let trim_line = line.trim();
            if !trim_line.is_empty() {
                rules.push(ExclusionRule::try_from(trim_line)?);
            }
        }
        Ok(Exclusions {
            rules
        })
    }
}

pub struct ExclusionRule {
    pub glob: Pattern,
    pub effect: Option<String>,
    pub negated: bool,
}

impl TryFrom<&str> for ExclusionRule {
    type Error = String;

    fn try_from(value: &str) -> Result<Self, Self::Error> {
        let mut effect = None;
        let mut negated = false;
        let mut glob_str = value;
        let split: Vec<&str> = value.splitn(2, "?⇒").collect();
        if split.len() == 2 {
            // this is a conditional rule: the effect is the part after the `?⇒`
            glob_str = split[0].trim();
            effect = Some(split[1].trim().to_owned());
        }
        if glob_str.starts_with('!') {
            negated = true;
            glob_str = &glob_str[1..];
        }
        Ok(ExclusionRule {
            glob: Pattern::new(glob_str)
                .map_err(|e| e.to_string())?,
            effect,
            negated,
        })
    }
}

impl Exclusions {
    pub fn apply_to(&self, node: &mut TreeNode) -> YamaResult<()> {
        self.apply_to_rec(node, "", &mut HashSet::new())
    }

    fn apply_to_rec(&self, node: &mut TreeNode, path_rel: &str, exclusions: &mut HashSet<String>) -> YamaResult<()> {
        let match_options = MatchOptions {
            case_sensitive: true,
            require_literal_separator: true,
            require_literal_leading_dot: false,
        };
        if let TreeNodeContent::Directory { ref mut children, .. } = node.content {
            let mut child_pathrel = String::new();
            for rule in self.rules.iter() {
                for child in children.iter() {
                    child_pathrel.clear();
                    child_pathrel.push_str(path_rel);
                    child_pathrel.push('/');
                    child_pathrel.push_str(&child.name);
                    if rule.glob.matches_with(&child_pathrel, match_options) {
                        if let Some(relative_effect) = &rule.effect {
                            let mut path_pieces: Vec<&str> = child_pathrel.split('/')
                                .skip(1).collect();
                            let relative_pieces = relative_effect.split('/');
                            for (idx, relpiece) in relative_pieces.enumerate() {
                                match relpiece {
                                    "" => {
                                        if idx == 0 {
                                            // this is an absolute path
                                            // doubt we will use this feature much :/
                                            path_pieces.clear();
                                            path_pieces.push("");
                                        }
                                    }
                                    "." => { /* nop */ }
                                    ".." => {
                                        if path_pieces.len() > 1 {
                                            path_pieces.pop();
                                        }
                                    }
                                    other => {
                                        path_pieces.push(other);
                                    }
                                }
                            }
                            child_pathrel = path_pieces.join("/");
                        }
                        if rule.negated {
                            exclusions.remove(&child_pathrel);
                        } else {
                            exclusions.insert(child_pathrel.clone());
                        }
                    }
                }
            }
            // filter out excluded children
            children.retain(|child| {
                child_pathrel.clear();
                child_pathrel.push_str(path_rel);
                child_pathrel.push('/');
                child_pathrel.push_str(&child.name);
                !exclusions.contains(&child_pathrel)
            });
            for child in children.iter_mut() {
                child_pathrel.clear();
                child_pathrel.push_str(path_rel);
                child_pathrel.push('/');
                child_pathrel.push_str(&child.name);
                self.apply_to_rec(child, &child_pathrel, exclusions)?;
            }
        }
        Ok(())
    }
}
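Tree nodes travel as packed CBOR (see the serialise/deserialise check in `src/operations/chunking_flow.rs`, which uses the same calls). A minimal round-trip sketch, with invented leaf values:

```rust
use crate::def::{FilesystemOwnership, TreeNode, TreeNodeContent, YamaResult};

fn cbor_round_trip() -> YamaResult<()> {
    let node = TreeNode {
        name: "link".to_owned(),
        content: TreeNodeContent::SymbolicLink {
            ownership: FilesystemOwnership { uid: 1000, gid: 1000 },
            target: "/etc/hostname".to_owned(),
        },
    };
    // Packed CBOR keeps the short field names ("n", "c", "t", …) compact.
    let bytes = serde_cbor::ser::to_vec_packed(&node)?;
    let back: TreeNode = serde_cbor::de::from_slice(&bytes)?;
    assert_eq!(node, back);
    Ok(())
}
```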

src/main.rs (151)

@@ -0,0 +1,151 @@
use clap::{crate_authors, crate_description, crate_version, App, Arg, SubCommand};
use crate::operations::{store_local_local, list_local, parse_pointer_subpath_pair, extract_local_local, list_local_tree};
use std::path::Path;
use crate::pile::local_pile::LocalPile;
use crate::def::Exclusions;
use std::process::exit;
use log::error;

mod def;
mod pile;
mod tree;
mod chunking;
mod operations;
mod util;

fn main() {
    env_logger::init();
    let matches = App::new("山")
        .version(crate_version!())
        .author(crate_authors!())
        .about(crate_description!())
        /*.arg(
            Arg::with_name("v")
                .short("v")
                .multiple(true)
                .help("Sets the level of verbosity"),
        )*/
        .subcommand(
            SubCommand::with_name("init")
                .about("creates a new pile in the current working directory")
        )
        .subcommand(
            SubCommand::with_name("check")
                .about("checks the consistency of this repository")
                .arg(
                    Arg::with_name("debug")
                        .short("v")
                        .help("print debug information verbosely"),
                )
                .arg(
                    Arg::with_name("gc")
                        .long("gc")
                        .help("Remove unused chunks to free up space."),
                ),
        )
        .subcommand(SubCommand::with_name("lsp").about("List tree pointers"))
        .subcommand(
            SubCommand::with_name("lst")
                .arg(
                    Arg::with_name("POINTER")
                        .required(true)
                        .index(1)
                        .help("Pointer, with optional :path for subtree")
                )
                .about("List contents of (sub-)tree")
        )
        .subcommand(
            SubCommand::with_name("store")
                .arg(
                    Arg::with_name("TREE")
                        .required(true)
                        .index(1)
                        .help("Path to tree / file(s) on filesystem")
                )
                .arg(
                    Arg::with_name("POINTER")
                        .required(true)
                        .index(2)
                        .help("Pointerspec")
                )
                .arg(
                    Arg::with_name("exclusions")
                        .short("x")
                        .long("exclusions")
                        .takes_value(true)
                        .help("Path to exclusions file")
                )
                .arg(
                    Arg::with_name("differential")
                        .short("d")
                        .long("differential")
                        .takes_value(true)
                        .help("Pointerspec to make a differential backup against")
                ))
        .subcommand(
            SubCommand::with_name("extract")
                .arg(
                    Arg::with_name("POINTER")
                        .required(true)
                        .index(1)
                        .help("Pointer, with optional :path for subextraction")
                )
                .arg(
                    Arg::with_name("TARGET")
                        .required(true)
                        .index(2)
                        .help("Path to target on filesystem, or subtarget for subextraction")
                )
        )
        .get_matches();

    if let Some(_submatches) = matches.subcommand_matches("init") {
        if Path::new("yama.toml").exists() {
            error!("Refusing to overwrite existing pile");
            exit(2);
        } else {
            LocalPile::create(Path::new("."))
                .expect("Failed to initialise pile.");
        }
    } else if let Some(submatches) = matches.subcommand_matches("store") {
        let exclusions = submatches.value_of("exclusions")
            .map(|path| Exclusions::load(Path::new(path)))
            .transpose()
            .expect("Failed to load exclusions.");
        store_local_local(
            Path::new(submatches.value_of("TREE").unwrap()),
            Path::new("."), // TODO
            submatches.value_of("POINTER").unwrap(),
            submatches.value_of("differential"),
            exclusions,
        ).expect("Problem with store_local_local");
    } else if let Some(_submatches) = matches.subcommand_matches("lsp") {
        list_local(Path::new(".")) // TODO
            .expect("Problem with list_local");
    } else if let Some(submatches) = matches.subcommand_matches("extract") {
        let (pointer, subpath_opt) = parse_pointer_subpath_pair(
            submatches.value_of("POINTER").expect("Pointer not specified."));
        extract_local_local(
            Path::new("."), // TODO
            pointer,
            subpath_opt,
            Path::new(submatches.value_of("TARGET").expect("No target specified.")),
        ).expect("Extraction failed");
    } else if let Some(submatches) = matches.subcommand_matches("lst") {
        let (pointer, subpath_opt) = parse_pointer_subpath_pair(
            submatches.value_of("POINTER").expect("Pointer not specified."));
        list_local_tree(
            Path::new("."), // TODO do this
            pointer,
            subpath_opt,
        ).expect("List local tree failed");
    } else if let Some(_submatches) = matches.subcommand_matches("check") {
        unimplemented!()
    }
}

src/operations.rs (327)

@@ -0,0 +1,327 @@
use crate::def::{YamaResult, Exclusions, PointerData, TreeNode, TreeNodeContent, RecursiveChunkRef};
use std::path::Path;
use crate::pile::{local_pile::LocalPile, Pile};
use futures::executor::block_on;
use crate::chunking::{unchunk_to_end, chunkid_to_hex};
use crate::tree::{differentiate_node, create_uidgid_lookup_tables};
use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use async_std::task;
use crate::operations::chunking_flow::ChunkingFlowConfiguration;
use std::fs::File;
use std::io::Write;
use crate::operations::extraction_flow::ExtractionFlowConfiguration;
use log::info;

mod chunking_flow;
mod extraction_flow;

pub fn store_local_local(tree: &Path, pile: &Path, pointer: &str, parent_pointer: Option<&str>, exclusions: Option<Exclusions>) -> YamaResult<()> {
    info!("Going to perform dir scan, might take a while …");
    let mut dir_scan = crate::tree::scan(tree)?
        .ok_or("No scan performed; does the file exist?")?;
    info!("Dir scan completed.");
    if let Some(exclusions) = exclusions {
        exclusions.apply_to(&mut dir_scan)?;
    }
    let mut local_pile = LocalPile::open(pile)?;
    if let Some(parent_pointer) = parent_pointer {
        let (_parent_pointer_data, parent_node) = load_integrated_pointer(parent_pointer, &mut local_pile)?;
        dir_scan = differentiate_node(dir_scan, &parent_node)?;
    }
    // eprintln!("dir scan: {:#?}", dir_scan);
    let mut uids = BTreeMap::new();
    let mut gids = BTreeMap::new();
    create_uidgid_lookup_tables(&dir_scan, &mut uids, &mut gids)?;
    // eprintln!("uids: {:#?}", uids);
    // eprintln!("gids: {:#?}", gids);

    // now perform the actual chunking
    let pointer_ref: RecursiveChunkRef = {
        let tree = tree.to_owned();
        let local_pile = local_pile.clone_pile_handle();
        task::block_on(async {
            ChunkingFlowConfiguration::default()
                .run(tree, dir_scan, local_pile).await
                .map_err(|e| e.to_string())
        })?
    };
    info!("chunking finished");

    // drops entries whose value is None, keeping only resolved (key, value) pairs
    fn map_identity<K, V>((k, v): (K, Option<V>)) -> Option<(K, V)> {
        match v {
            None => None,
            Some(v) => Some((k, v)),
        }
    }

    task::block_on(local_pile.put_pointer(pointer, PointerData {
        chunk_ref: pointer_ref,
        parent_pointer: parent_pointer.map(|s| s.to_owned()),
        uid_lookup: uids.into_iter().filter_map(map_identity).collect(),
        gid_lookup: gids.into_iter().filter_map(map_identity).collect(),
    }))?;
    info!("Pointer updated. store operation COMPLETED.");
    Ok(())
}

pub fn extract_local_local(pile: &Path, pointer: &str, subpath_opt: Option<&str>, target: &Path)
    -> YamaResult<()> {
    let mut local_pile = LocalPile::open(pile)?;
    let (_pointer_data, mut node) = load_integrated_pointer(pointer, &mut local_pile)?;
    if let Some(subpath) = subpath_opt {
        node = get_subnode_of(node, subpath)?;
    }
    // TODO(0.3 onwards) what about uid and gid lookup maps?
    // probably want to offer the choice of id-based and name-based extraction, with fallbacks
    // in both ways.
    task::block_on(async {
        ExtractionFlowConfiguration::default()
            .run(node, target.to_path_buf(), local_pile.clone_pile_handle()).await
            .map_err(|e| e.to_string())
    })?;
    Ok(())
}

pub fn list_local_tree(pile: &Path, pointer: &str, subpath_opt: Option<&str>)
    -> YamaResult<()> {
    let mut local_pile = LocalPile::open(pile)?;
    let (_pointer_data, mut node) = load_integrated_pointer(pointer, &mut local_pile)?;
    if let Some(subpath) = subpath_opt {
        node = get_subnode_of(node, subpath)?;
    }
    print_node_recursive(&mut node)?;
    Ok(())
}

fn get_subnode_of(mut node: TreeNode, subpath: &str) -> YamaResult<TreeNode> {
    for component in subpath.split('/') {
        if component.is_empty() {
            continue;
        }
        if let TreeNodeContent::Directory { children, .. } = node.content {
            let subchild = children
                .into_iter()
                .find(|node| node.name == component);
            node = subchild.ok_or("Subextraction path does not exist; check your subextraction path for errors.")?;
        } else {
            return Err("Cannot subextract: not a directory; check your subextraction path for errors.".into());
        }
    }
    Ok(node)
}

pub fn list_local(pile: &Path) -> YamaResult<()> {
    let local_pile = LocalPile::open(pile)?;
    for pointer in task::block_on(local_pile.list_pointers())?.iter() {
        println!("{}", pointer);
    }
    Ok(())
}

fn print_node_recursive(tree_node: &mut TreeNode) -> YamaResult<()> {
    tree_node.visit_mut(&mut |node, path| {
        eprint!("{} ", path);
        match &node.content {
            TreeNodeContent::NormalFile { content, .. } => {
                eprintln!("file @ {} (depth={})",
                          chunkid_to_hex(&content.chunk_id), content.depth);
            }
            TreeNodeContent::Directory { .. } => {
                eprintln!("directory");
            }
            TreeNodeContent::SymbolicLink { target, .. } => {
                eprintln!("symlink → {}", target);
            }
            TreeNodeContent::Deleted => {
                eprintln!("deleted");
            }
        }
        Ok(())
    }, "")
}

fn load_integrated_pointer(pointer_path: &str, pile: &mut (dyn Pile + Send)) -> YamaResult<(PointerData, TreeNode)> {
    // TODO refactor this, as it needs recursive integration logic,
    // perhaps up to a specified integration depth (or maybe just make integration occur
    // when dependents are about to be removed from the database).
    // TODO Is this^ TODO still applicable?
    let result: Result<(PointerData, TreeNode), String> = block_on(async {
        let pointer_data = pile.get_pointer(pointer_path).await
            .map_err(|e| e.to_string() + " (lip)")?
            .ok_or_else(|| "Invalid pointer reference: ".to_owned() + pointer_path)?;
        let node_bytes = unchunk_to_end(
            pointer_data.chunk_ref.clone(),
            pile,
        ).await.map_err(|e| e.to_string() + " (lip:ute)")?;
        File::create("/tmp/lip_data").expect("create debug file")
            .write_all(&node_bytes).expect("write debug file"); // TODO DEBUG
        let node: TreeNode = serde_cbor::de::from_slice(&node_bytes)
            .map_err(|e| e.to_string() + " (lip:cbor)")?;
        Ok((pointer_data, node))
    });
    let (mut pointer_data, mut node) = result?;
    if let Some(parent_pointer) = pointer_data.parent_pointer.as_ref() {
        let (parent_pointer_data, parent_node) = load_integrated_pointer(parent_pointer, pile)?;
        integrate_node(&mut node, &parent_node);
        // integrate UID and GID maps
        let mut uid_lookup = parent_pointer_data.uid_lookup;
        let mut gid_lookup = parent_pointer_data.gid_lookup;
        uid_lookup.extend(pointer_data.uid_lookup);
        gid_lookup.extend(pointer_data.gid_lookup);
        pointer_data.uid_lookup = uid_lookup;
        pointer_data.gid_lookup = gid_lookup;
    }
    Ok((pointer_data, node))
}

// Merge a differential node (`new`) with its parent (`old`): children present only
// in the parent are copied in; children present in both are merged recursively.
fn integrate_node(new: &mut TreeNode, old: &TreeNode) {
    if let TreeNodeContent::Directory { children: old_children, .. } = &old.content {
        if let TreeNodeContent::Directory { children, .. } = &mut new.content {
            let mut map = BTreeMap::new();
            while !children.is_empty() {
                let treenode = children.remove(children.len() - 1);
                map.insert(treenode.name.clone(), treenode);
            }
            for old_child in old_children {
                match map.entry(old_child.name.clone()) {
                    Entry::Vacant(vac) => {
                        vac.insert(old_child.clone());
                    }
                    Entry::Occupied(mut occ) => {
                        integrate_node(occ.get_mut(), old_child);
                    }
                }
            }
            for (_, child) in map.into_iter() {
                children.push(child);
            }
        }
    }
}

pub fn parse_pointer_subpath_pair(pair: &str) -> (&str, Option<&str>) {
    let splitted: Vec<&str> = pair.splitn(2, ':').collect();
    if splitted.len() == 2 {
        (splitted[0], Some(splitted[1]))
    } else {
        assert_eq!(splitted.len(), 1);
        (splitted[0], None)
    }
}

/* TODO TODO TODO clean up / remove this
fn perform_storage_chunking(populate_node: &mut TreeNode, pile: &mut (dyn Pile + Send)) -> YamaResult<()> {
    let mut paths = Vec::new();
    populate_node.visit_mut(&mut |tn, path| {
        if let TreeNodeContent::NormalFile { .. } = tn.content {
            paths.push(path.to_owned());
        }
        Ok(())
    }, "")?;

    let (send, recv) = mpsc::sync_channel::<(ChunkId, Vec<u8>)>(256);
    let mut chunkrefs_for_paths = crossbeam::scope(|s| {
        s.spawn(move |_| {
            for (chunk_id, chunk_data) in recv.iter() {
                task::block_on(async {
                    pile.put_chunk(&chunk_id, &chunk_data)
                });
            }
        });
        let mut chunkrefs_for_paths: BTreeMap<usize, Option<RecursiveChunkRef>> = {
            let send_mutex = Mutex::new(send);
            paths.par_iter()
                .enumerate()
                .map(|(idx, path)| {
                    let sender = {
                        send_mutex
                            .lock()
                            .expect("poisoned")
                            .clone()
                    };
                    // hmm how to handle this
                    // we don't want to compress chunks if we already have them
                    // so we can't compress them in here; have to do in background task.
                    //chunk_file(path, sender)?;
                    unimplemented!();
                    (idx, Some(RecursiveChunkRef {
                        chunk_id: Default::default(),
                        depth: 0,
                    }))
                })
                .collect::<BTreeMap<usize, Option<RecursiveChunkRef>>>()
        };
        chunkrefs_for_paths
    }).expect("crossbeam fail :S");

    let mut index = 0;
    populate_node.visit_mut(&mut |tn, path| {
        if let TreeNodeContent::NormalFile { ref mut content, .. } = &mut tn.content {
            assert_eq!(&paths[index], path);
            match chunkrefs_for_paths.remove(&index).expect("Chunkref not there??") {
                Some(chunkref) => {
                    *content = chunkref;
                }
                None => {
                    tn.content = TreeNodeContent::Deleted;
                }
            }
            index += 1;
        }
        Ok(())
    }, "")?;
    Ok(())
}
*/
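A quick sketch of `parse_pointer_subpath_pair`'s contract, with invented pointer names (it splits on the first `:` only):

```rust
assert_eq!(
    parse_pointer_subpath_pair("docs/2020-01-31"),
    ("docs/2020-01-31", None)
);
assert_eq!(
    parse_pointer_subpath_pair("docs/2020-01-31:photos/cat.jpg"),
    ("docs/2020-01-31", Some("photos/cat.jpg"))
);
```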

src/operations/chunking_flow.rs (339)

@@ -0,0 +1,339 @@
use std::collections::BTreeMap;
use std::fs::File;
use std::hash::Hasher;
use std::io::{ErrorKind, Read};
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};

use async_std::sync::{Receiver, Sender};
use async_std::task;
use log::{trace, error};

use crate::chunking::{chunkid_to_hex, RecursiveChunker};
use crate::chunking;
use crate::def::{ChunkId, RecursiveChunkRef, TreeNode, TreeNodeContent, XXH64_SEED, XXHash, YamaResult};
use crate::pile::Pile;

pub struct ChunkingFlowConfiguration {
    pub max_chunkers: u32,
    pub offerer_queue_size: usize,
    pub max_offerers: u32,
    pub compressor_queue_size: usize,
    pub max_compressors: u32,
    pub uploader_queue_size: usize,
    pub max_uploaders: u32,
    pub zstd_level: i32,
}

impl Default for ChunkingFlowConfiguration {
    fn default() -> Self {
        ChunkingFlowConfiguration {
            max_chunkers: 4,
            offerer_queue_size: 256,
            max_offerers: 32,
            compressor_queue_size: 32,
            max_compressors: 4,
            uploader_queue_size: 64,
            max_uploaders: 4,
            zstd_level: 12, /* pretty darn slow but I'm willing to accept that */
        }
    }
}

impl ChunkingFlowConfiguration {
    // Pipeline: chunkers → offerers → compressors → uploaders, connected by bounded channels.
    pub async fn run(&self, tree_root: PathBuf, mut tree_node: TreeNode, pile: Box<dyn Pile>)
        -> YamaResult<RecursiveChunkRef> {
        unsafe fn extend_lifetime<'b, T>(r: &'b T) -> &'static T {
            std::mem::transmute::<&'b T, &'static T>(r)
        }
        let filepaths: Arc<Vec<String>> = Arc::new(unpack_dirscan_filepaths(&mut tree_node)?);
        let inputs = unsafe {
            let fp_static = extend_lifetime(&filepaths);
            Arc::new(Mutex::new(fp_static.iter().enumerate()))
        };
        let outputs = Arc::new(Mutex::new(BTreeMap::new()));
        let (mut chunker_send, offerer_recv) = async_std::sync::channel::<(ChunkId, XXHash, Vec<u8>)>(self.offerer_queue_size);
        let (offerer_send, compressor_recv) = async_std::sync::channel::<(ChunkId, Vec<u8>)>(self.compressor_queue_size);
        let (compressor_send, uploader_recv) = async_std::sync::channel::<(ChunkId, Vec<u8>)>(self.uploader_queue_size);
        let mut chunker_handlers = Vec::new();
        let mut other_handlers = Vec::new();
        for _ in 0..self.max_chunkers {
            let tree_root = tree_root.to_owned();
            let chunker_send = chunker_send.clone();
            let inputs = inputs.clone();
            let outputs = outputs.clone();
            chunker_handlers.push(task::spawn_blocking(move || chunker(tree_root, inputs, outputs, chunker_send)));
        }
        for _ in 0..self.max_offerers {
            let offerer_recv = offerer_recv.clone();
            let offerer_send = offerer_send.clone();
            let pile = pile.clone_pile_handle();
            other_handlers.push(task::spawn(offerer(offerer_recv, offerer_send, pile)));
        }
        let zstd_dictionary = Arc::new(pile.get_dictionary().await
            .expect("Can't carry on without a Zstd dictionary."));
        for _ in 0..self.max_compressors {
            let compressor_recv = compressor_recv.clone();
            let compressor_send = compressor_send.clone();
            let zstd_dictionary = zstd_dictionary.clone();
            other_handlers.push(task::spawn(compressor(compressor_recv, compressor_send, zstd_dictionary,
                                                       self.zstd_level)));
        }
        for _ in 0..self.max_uploaders {
            let uploader_recv = uploader_recv.clone();
            let pile = pile.clone_pile_handle();
            other_handlers.push(task::spawn(uploader(uploader_recv, pile)));
        }
        // we drop these so the receivers get properly hung up later
        drop(compressor_send);
        drop(offerer_send);

        let mut errors: Option<String> = None;
        trace!("waiting for chunkers to complete");
        for handler in chunker_handlers {
            if let Err(err_string) = handler.await {
                match errors.as_mut() {
                    None => {
                        errors = Some(err_string);
                    }
                    Some(errors) => {
                        errors.push('\n');
                        errors.push_str(&err_string);
                    }
                }
            }
        }
        trace!("chunkers complete; chunking tree");
        let mut outputs = Arc::try_unwrap(outputs)
            .map_err(|_| "(dangling Arcs)".to_owned())?
            .into_inner()
            .map_err(|e| e.to_string())?;
        repack_dirscan_filepaths(&mut tree_node, &filepaths, &mut outputs)?;
        let treenode_chunkref = {
            let send_chunk = &mut |data: &[u8]| { send_chunk_implementation(&mut chunker_send, data) };
            let send_data = serde_cbor::ser::to_vec_packed(&tree_node)?;
            /*
            File::create("/tmp/send_data").expect("create debug file")
                .write_all(&send_data).expect("write debug file"); // TODO DEBUG
            */
            // TODO debug only
            let check: TreeNode = serde_cbor::from_slice(&send_data).expect("Serialisation doesn't produce deserialisable result.");
            assert_eq!(tree_node, check, "SER CHECK failed");
            drop(tree_node);
            let mut chunker = RecursiveChunker::new(chunking::SENSIBLE_THRESHOLD);
            chunker.write(&send_data, send_chunk)?;
            drop(send_data);
            chunker.finish(send_chunk)?
        };
        // we are done chunking, hang up
        drop(chunker_send);
        trace!("chunked tree ({}); waiting for all workers to finish", chunkid_to_hex(&treenode_chunkref.chunk_id));
        for handler in other_handlers {
            if let Err(err_string) = handler.await {
                match errors.as_mut() {
                    None => {
                        errors = Some(err_string);
                    }
                    Some(errors) => {
                        errors.push('\n');
                        errors.push_str(&err_string);
                    }
                }
            }
        }
        trace!("workers complete");
        match errors {
            Some(errors) => {
                error!("chunking flow had errors:\n{}\n-----", errors);
                Err(errors.into())
            }
            None => {
                Ok(treenode_chunkref)
            }
        }
    }
}

fn send_chunk_implementation(chunk_send: &mut Sender<(ChunkId, XXHash, Vec<u8>)>, data: &[u8]) -> YamaResult<ChunkId> {
    let chunk_id = chunking::calculate_chunkid(data);
    let xxhash = {
        let mut hasher = twox_hash::XxHash64::with_seed(XXH64_SEED);
        hasher.write(data);
        hasher.finish()
    };
    task::block_on(chunk_send.send(
        (chunk_id, xxhash, data.to_vec())
    ));
    Ok(chunk_id)
}

fn chunker<'a, I>(tree_root: PathBuf, inputs: Arc<Mutex<I>>, outputs: Arc<Mutex<BTreeMap<usize, Option<RecursiveChunkRef>>>>,
                  mut chunk_sender: Sender<(ChunkId, XXHash, Vec<u8>)>) -> Result<(), String>
    where I: Iterator<Item=(usize, &'a String)> {
    // &_ works around https://github.com/rust-lang/rust/issues/58639
    let send_chunk = &mut |data: &_| send_chunk_implementation(&mut chunk_sender, data);
    loop {
        // get new task
        let (next_idx, next_path) = {
            let mut inputs_iterator = inputs.lock()
                .expect("Poisoned");
            if let Some((idx, next)) = inputs_iterator.next() {
                trace!("chunker < {}:{}", idx, next);
                let absolute_file_path = tree_root.parent()
                    .unwrap_or_else(|| &tree_root)
                    .to_owned().join(&Path::new(&next));
                (idx, absolute_file_path)
            } else {
                // Nothing left to do
                trace!("chunker //");
                break;
            }
        };
        // load the file and chunk it away!
        match File::open(next_path) {
            Ok(mut file) => {
                let mut buf = vec![1; 8 * 1024 * 1024];
                let mut chunker = RecursiveChunker::new(chunking::SENSIBLE_THRESHOLD);
                loop {
                    let read = file.read(&mut buf)
                        .map_err(|e| e.to_string())?;
                    if read == 0 {
                        break;
                    }
                    chunker.write(&buf[0..read], send_chunk)
                        .map_err(|e| e.to_string())?;
                }
                let chunkref = chunker.finish(send_chunk)
                    .map_err(|e| e.to_string())?;
                outputs.lock()
                    .expect("poison")
                    .insert(next_idx, Some(chunkref));
            }
            Err(e) => {
                let e_kind = e.kind();
                if e_kind == ErrorKind::PermissionDenied || e_kind == ErrorKind::NotFound {
                    // vanished / can't read.
                    error!("chunker E {:?}", e);
                    outputs.lock()
                        .expect("poison")
                        .insert(next_idx, None);
                }
            }
        }
    }
    Ok(())