diff --git a/Cargo.lock b/Cargo.lock index 667cab3..7c2d509 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6,6 +6,7 @@ dependencies = [ "serde 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.33 (registry+https://github.com/rust-lang/crates.io-index)", + "uuid 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -13,6 +14,28 @@ name = "bitflags" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "cloudabi" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "fuchsia-zircon" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", + "fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "fuchsia-zircon-sys" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "itoa" version = "0.4.3" @@ -66,6 +89,31 @@ dependencies = [ "proc-macro2 0.4.24 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "rand" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)", + "fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.44 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_core 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rand_core" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand_core 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rand_core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "redox_syscall" version = "0.1.40" @@ -137,6 +185,14 @@ name = "unicode-xid" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "uuid" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "vcpkg" version = "0.2.6" @@ -163,6 +219,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [metadata] "checksum bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "228047a76f468627ca71776ecdebd732a3423081fcf5125585bcd7c49886ce12" +"checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" +"checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" +"checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" "checksum itoa 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "1306f3464951f30e30d12373d31c79fbd52d236e5e896fd92f96ec7babbbe60b" "checksum libc 0.2.44 (registry+https://github.com/rust-lang/crates.io-index)" = "10923947f84a519a45c8fefb7dd1b3e8c08747993381adee176d7a82b4195311" "checksum libsqlite3-sys 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "742b695cbfb89e549dca6960a55e6802f67d352e33e97859ee46dee835211b0f" @@ -171,6 +230,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum pkg-config 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "676e8eb2b1b4c9043511a9b7bea0915320d7e502b0a079fb03f9635a5252b18c" "checksum proc-macro2 0.4.24 (registry+https://github.com/rust-lang/crates.io-index)" = "77619697826f31a02ae974457af0b29b723e5619e113e9397b8b82c6bd253f09" "checksum quote 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)" = "53fa22a1994bd0f9372d7a816207d8a2677ad0325b073f5c5332760f0fb62b5c" +"checksum rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e464cd887e869cddcae8792a4ee31d23c7edd516700695608f5b98c67ee0131c" +"checksum rand_core 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1961a422c4d189dfb50ffa9320bf1f2a9bd54ecb92792fb9477f99a1045f3372" +"checksum rand_core 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0905b6b7079ec73b314d4c748701f6931eb79fd97c668caa3f1899b22b32c6db" "checksum redox_syscall 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "c214e91d3ecf43e9a4e41e578973adeb14b474f2bee858742d127af75a0112b1" "checksum rusqlite 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)" = "39bae767eb27866f5c0be918635ae54af705bc09db11be2c43a3c6b361cf3462" "checksum ryu 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "eb9e9b8cde282a9fe6a42dd4681319bfb63f121b8a8ee9439c6f4107e58a46f7" @@ -180,6 +242,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum syn 0.15.22 (registry+https://github.com/rust-lang/crates.io-index)" = "ae8b29eb5210bc5cf63ed6149cbf9adfc82ac0be023d8735c176ee74a2db4da7" "checksum time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "d825be0eb33fda1a7e68012d51e9c7f451dc1a69391e7fdc197060bb8c56667b" "checksum unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc" +"checksum uuid 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "dab5c5526c5caa3d106653401a267fed923e7046f35895ffcb5ca42db64942e6" "checksum vcpkg 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "def296d3eb3b12371b2c7d0e83bfe1403e4db2d7a0bba324a12b21c4ee13143d" "checksum winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "92c1eb33641e276cfa214a0522acad57be5c56b10cb348b3c5117db75f3ac4b0" "checksum winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" diff --git a/Cargo.toml b/Cargo.toml index 6a7de88..ff74e9d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,3 +8,4 @@ rusqlite = "0.15.0" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" +uuid = { version = "0.7", features = ["v4"] } diff --git a/clean b/clean new file mode 100755 index 0000000..48d06e7 --- /dev/null +++ b/clean @@ -0,0 +1 @@ +rm dfs.db && python dfs_skel/createdb.py diff --git a/src/bin/copy.rs b/src/bin/copy.rs index d353c2e..7f9672a 100644 --- a/src/bin/copy.rs +++ b/src/bin/copy.rs @@ -13,9 +13,28 @@ use std::fs::File; use std::fs; fn main() { - let mut file = fs::read("/home/joe/Downloads/ideaIU-2018.3.tar.gz").unwrap(); - let mut stream = TcpStream::connect("localhost:6771").unwrap(); - stream.write(&file).unwrap(); + let mut stream = TcpStream::connect("localhost:6770").unwrap(); + serde_json::to_writer( + &mut stream, + &Packet { + p_type: PacketType::PutFile, + json: Some(serde_json::to_string( + &PutFile { name: String::from("Somefile"), size: 32 }).unwrap()), + }) + .unwrap(); + println!("Sent file"); stream.flush().unwrap(); stream.shutdown(Shutdown::Write).unwrap(); + + let files: Vec = serde_json::from_reader(&mut stream).unwrap(); + for f in files { + println!("Chunk ID: {}", f.chunk_id); + } + +// std::process::exit(0); +// let mut file = fs::read("/home/joe/Downloads/ideaIU-2018.3.tar.gz").unwrap(); +// let mut stream = TcpStream::connect("localhost:6771").unwrap(); +// stream.write(&file).unwrap(); +// stream.flush().unwrap(); +// stream.shutdown(Shutdown::Write).unwrap(); } diff --git a/src/bin/data_node.rs b/src/bin/data_node.rs index 3b2300d..d958930 100644 --- a/src/bin/data_node.rs +++ b/src/bin/data_node.rs @@ -14,29 +14,29 @@ use std::fs::File; fn main() { -// register_with_meta_server(); + register_with_meta_server(); let listener = TcpListener::bind("localhost:6771").unwrap(); for stream in listener.incoming() { let mut stream = stream.unwrap(); - let mut buf = Vec::new(); - match stream.read_to_end(&mut buf) { - Ok(size) => { - println!("Total bytes: {}", size); - let mut copy = File::create("new_version").unwrap(); - copy.write_all(&buf[..]).unwrap(); - }, - Err(e) => println!("{}", e), - } -// match serde_json::from_reader(&mut stream) { -// Ok(packet @ Packet { .. }) => match packet.p_type { -// PacketType::GetFiles => shutdown(&mut stream), -// PacketType::PutFiles => put(&mut stream, &packet.json.unwrap(), &mut Vec::new()), -// PacketType::ShutdownDataNode => shutdown(&mut stream), -// _ => (), +// let mut buf = Vec::new(); +// match stream.read_to_end(&mut buf) { +// Ok(size) => { +// println!("Total bytes: {}", size); +// let mut copy = File::create("new_version").unwrap(); +// copy.write_all(&buf[..]).unwrap(); // }, -// Err(e) => println!("Error parsing json {}", e.to_string()), -// }; +// Err(e) => println!("{}", e), +// } + match serde_json::from_reader(&mut stream) { + Ok(packet @ Packet { .. }) => match packet.p_type { +// PacketType::GetFiles => shutdown(&mut stream), +// PacketType::PutFile => put(&mut stream, &packet.json.unwrap(), &mut Vec::new()), + PacketType::ShutdownDataNode => shutdown(&mut stream), + _ => (), + }, + Err(e) => println!("Error parsing json: {}", e.to_string()), + }; } } @@ -47,7 +47,7 @@ fn register_with_meta_server() { &Packet { p_type: PacketType::NodeRegistration, json: Some(serde_json::to_string( - &NodeRegistration { register: true, ip: String::from("localhost"), port: 6770 }).unwrap()), + &NodeRegistration { register: true, ip: String::from("localhost"), port: 6771 }).unwrap()), }) .unwrap(); println!("Registered myself"); @@ -57,9 +57,9 @@ fn register_with_meta_server() { println!("{:?}", result); } -fn put(stream: &mut TcpStream, json: &String, files: &mut Vec) { - let files: PutFiles = serde_json::from_str(json).unwrap(); -} +//fn put(stream: &mut TcpStream, json: &String, files: &mut Vec) { +// let files: PutFiles = serde_json::from_str(json).unwrap(); +//} fn shutdown(stream: &mut TcpStream) { let mut stream = TcpStream::connect("localhost:6770").unwrap(); @@ -68,7 +68,7 @@ fn shutdown(stream: &mut TcpStream) { &Packet { p_type: PacketType::NodeRegistration, json: Some(serde_json::to_string( - &NodeRegistration { register: false, ip: String::from("localhost"), port: 6770 }).unwrap()), + &NodeRegistration { register: false, ip: String::from("localhost"), port: 6771 }).unwrap()), }) .unwrap(); println!("Unregistered myself"); diff --git a/src/bin/meta_data.rs b/src/bin/meta_data.rs index 41783b3..c51cc90 100644 --- a/src/bin/meta_data.rs +++ b/src/bin/meta_data.rs @@ -3,6 +3,7 @@ extern crate rusqlite; extern crate serde; extern crate serde_json; extern crate serde_derive; +extern crate uuid; use a03::*; use rusqlite::types::ToSql; @@ -22,8 +23,9 @@ fn main() { Ok(packet @ Packet { .. }) => match packet.p_type { PacketType::ListFiles => list(&mut stream, &conn), PacketType::NodeRegistration => - node_registration(&mut stream, &packet.json.unwrap(), &conn), -// PacketType::AddFile => + node_registration(&mut stream, &conn, &packet.json.unwrap()), + PacketType::PutFile => + put_file(&mut stream, &conn, &packet.json.unwrap()), _ => (), }, Err(e) => println!("Error parsing json {}", e.to_string()), @@ -32,6 +34,26 @@ fn main() { } } +fn put_file(stream: &mut TcpStream, conn: &Connection, message: &str) { + let file : PutFile = serde_json::from_str(message).unwrap(); + add_file(&conn, &file.name, file.size as i32); + let mut nodes: Vec = Vec::new(); + for dn in get_data_nodes(&conn) { + nodes.push(AvailableNodes { + ip: dn.ip, + port: dn.port, + chunk_id: uuid::Uuid::new_v4().to_string(), + }); + } + match serde_json::to_writer( + stream, + &nodes + ) { + Ok(_) => println!("{}", "Sent nodes with chunks"), + Err(e) => println!("{}", e), + }; +} + fn list(stream: &mut TcpStream, conn: &Connection) { match serde_json::to_writer( stream, @@ -44,7 +66,7 @@ fn list(stream: &mut TcpStream, conn: &Connection) { }; } -fn node_registration(stream: &mut TcpStream, json: &String, conn: &Connection) { +fn node_registration(stream: &mut TcpStream, conn: &Connection, json: &String) { let endpoint : NodeRegistration = serde_json::from_str(json).unwrap(); let message = if endpoint.register { // TODO: We probably should check if the endpoint already exists! @@ -74,7 +96,8 @@ fn report_success(stream: &mut TcpStream, message: &str) { fn add_data_node(conn: &Connection, address: &str, port: i32) { match conn.execute( - "INSERT INTO dnode (address, port) VALUES (?1, ?2)", + "INSERT OR REPLACE INTO dnode (nid, address, port) +VALUES ((SELECT nid FROM dnode WHERE address = ?1 AND port = ?2), ?1, ?2)", &[&address as &ToSql, &port], ) { Ok(n) => println!("{} rows updated", n), @@ -82,14 +105,14 @@ fn add_data_node(conn: &Connection, address: &str, port: i32) { }; } -fn remove_data_node(conn: &Connection, address: &str, port: i32) { +fn remove_data_node(conn: &Connection, address: &str, port: i32) -> bool { match conn.execute( "DELETE FROM dnode WHERE address=?1 AND port=?2", &[&address as &ToSql, &port], ) { - Ok(n) => println!("{} rows updated", n), - Err(e) => println!("DELETE error: {}", e), - }; + Ok(n) => n > 0, + Err(_) => false, + } } fn get_data_node(conn: &Connection, address: &str, port: i32) -> Option { @@ -120,7 +143,8 @@ fn get_data_nodes(conn: &Connection) -> Vec { fn add_file(conn: &Connection, fname: &String, fsize: i32) { conn.execute( - "INSERT INTO inode (fname, fsize) VALUES (?1, ?2)", + "INSERT OR REPLACE INTO inode (fid, fname, fsize) +VALUES ((SELECT fid FROM inode WHERE fname = ?1), ?1, ?2)", &[&fname as &ToSql, &fsize]) .unwrap(); } @@ -249,6 +273,13 @@ cid TEXT NOT NULL DEFAULT \"0\")", assert_eq!(dnode.is_none(), true); } + #[test] + fn returns_false_dnode_if_it_doesnt_exist() { + let conn = get_test_db(); + let removed = remove_data_node(&conn, "127.0.0.1", 65533); + assert_eq!(removed, false); + } + #[test] fn gets_all_data_nodes() { let conn = get_test_db(); @@ -270,6 +301,22 @@ cid TEXT NOT NULL DEFAULT \"0\")", } } + #[test] + fn adds_node_multiple_times_but_id_doesnt_change() { + let conn = get_test_db(); + let ip = "127.0.0.1"; + let port = 65533; + add_data_node(&conn, &ip, port); + add_data_node(&conn, &ip, port); + add_data_node(&conn, &ip, port); + let ds = get_data_nodes(&conn); + assert_eq!(ds.len(), 1); + let dn = get_data_node(&conn, ip, port).unwrap(); + assert_eq!(dn.id, 1); + assert_eq!(dn.ip, ip); + assert_eq!(dn.port, port as u32); + } + #[test] fn adds_file() { let conn = get_test_db(); @@ -279,6 +326,22 @@ cid TEXT NOT NULL DEFAULT \"0\")", assert_eq!(files[0], "my_1337_virus 32 bytes"); } + #[test] + fn adds_file_multiple_times_but_id_doesnt_change() { + let conn = get_test_db(); + let fname = String::from("my_1337_virus"); + add_file(&conn, &fname, 32); + add_file(&conn, &fname, 32); + add_file(&conn, &fname, 32); + add_file(&conn, &fname, 32); + let files = get_files(&conn); + assert_eq!(files.len(), 1); + let file = get_file_info(&conn, &fname); + assert_eq!(file.id, 1); + assert_eq!(file.name, "my_1337_virus"); + assert_eq!(file.size, 32); + } + #[test] fn removes_file() { let conn = get_test_db(); diff --git a/src/lib.rs b/src/lib.rs index f16185a..df82560 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,10 +10,10 @@ use std::net::Ipv4Addr; pub enum PacketType { NodeRegistration, ListFiles, - AddFile, - PutFiles, + PutFile, GetFiles, AddDataBlocks, + ShutdownDataNode, Success, } @@ -36,8 +36,16 @@ pub struct NodeRegistration { } #[derive(Serialize, Deserialize, Debug)] -pub struct PutFiles { - pub files: Vec +pub struct PutFile { + pub name: String, + pub size: u32 +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct AvailableNodes { + pub ip: String, + pub port: u32, + pub chunk_id: String } #[derive(Serialize, Deserialize, Debug)]