From 1f7d26f9bd192ca612a39b95d2ab1a6bcd1e9ff7 Mon Sep 17 00:00:00 2001 From: Joseph Ferano Date: Sun, 9 Dec 2018 15:33:50 -0400 Subject: [PATCH] Now able to send large data over TCP. Implemented a lot of the remaining SQL related calls --- src/bin/copy.rs | 8 +- src/bin/data_node.rs | 31 ++++- src/bin/meta_data.rs | 316 ++++++++++++++++++++++++++++++++----------- src/lib.rs | 13 +- 4 files changed, 277 insertions(+), 91 deletions(-) diff --git a/src/bin/copy.rs b/src/bin/copy.rs index a508afa..d353c2e 100644 --- a/src/bin/copy.rs +++ b/src/bin/copy.rs @@ -13,7 +13,9 @@ use std::fs::File; use std::fs; fn main() { - let mut file = fs::read("dfs.db").unwrap(); - let mut copy = File::create("copy").unwrap(); - copy.write_all(&file[..]).unwrap(); + let mut file = fs::read("/home/joe/Downloads/ideaIU-2018.3.tar.gz").unwrap(); + let mut stream = TcpStream::connect("localhost:6771").unwrap(); + stream.write(&file).unwrap(); + stream.flush().unwrap(); + stream.shutdown(Shutdown::Write).unwrap(); } diff --git a/src/bin/data_node.rs b/src/bin/data_node.rs index 21f1591..3b2300d 100644 --- a/src/bin/data_node.rs +++ b/src/bin/data_node.rs @@ -10,20 +10,33 @@ use std::thread; use std::io::Read; use std::io::Write; use std::net::TcpListener; +use std::fs::File; fn main() { - register_with_meta_server(); +// register_with_meta_server(); let listener = TcpListener::bind("localhost:6771").unwrap(); + for stream in listener.incoming() { let mut stream = stream.unwrap(); - match serde_json::from_reader(&mut stream) { - Ok(packet @ Packet { .. }) => match packet.p_type { - PacketType::ShutdownDataNode => shutdown(&mut stream), - _ => (), + let mut buf = Vec::new(); + match stream.read_to_end(&mut buf) { + Ok(size) => { + println!("Total bytes: {}", size); + let mut copy = File::create("new_version").unwrap(); + copy.write_all(&buf[..]).unwrap(); }, - Err(e) => println!("Error parsing json {}", e.to_string()), - }; + Err(e) => println!("{}", e), + } +// match serde_json::from_reader(&mut stream) { +// Ok(packet @ Packet { .. }) => match packet.p_type { +// PacketType::GetFiles => shutdown(&mut stream), +// PacketType::PutFiles => put(&mut stream, &packet.json.unwrap(), &mut Vec::new()), +// PacketType::ShutdownDataNode => shutdown(&mut stream), +// _ => (), +// }, +// Err(e) => println!("Error parsing json {}", e.to_string()), +// }; } } @@ -44,6 +57,10 @@ fn register_with_meta_server() { println!("{:?}", result); } +fn put(stream: &mut TcpStream, json: &String, files: &mut Vec) { + let files: PutFiles = serde_json::from_str(json).unwrap(); +} + fn shutdown(stream: &mut TcpStream) { let mut stream = TcpStream::connect("localhost:6770").unwrap(); serde_json::to_writer( diff --git a/src/bin/meta_data.rs b/src/bin/meta_data.rs index a692d77..28600c8 100644 --- a/src/bin/meta_data.rs +++ b/src/bin/meta_data.rs @@ -12,22 +12,20 @@ use std::io::Write; use std::net::{TcpListener, TcpStream}; fn main() { - let mut data_nodes: Vec = Vec::new(); + // TODO: We need to check if the DB exists, if not, ask the user to run the python script + // Maybe we can even run it for them + let conn = Connection::open("dfs.db").unwrap(); let mut file_list: Vec = Vec::new(); - file_list.push(String::from("/")); - file_list.push(String::from("/home")); - file_list.push(String::from("/home/joe")); - file_list.push(String::from("/bin")); let listener = TcpListener::bind("localhost:6770").unwrap(); for stream in listener.incoming() { let mut stream = stream.unwrap(); match serde_json::from_reader(&mut stream) { Ok(packet @ Packet { .. }) => match packet.p_type { - PacketType::ListFiles => list(&mut stream, &file_list[..]), - PacketType::PutFiles => put(&mut stream, &packet.json.unwrap(), &mut file_list), + PacketType::ListFiles => list(&mut stream, &conn), PacketType::NodeRegistration => - node_registration(&mut stream, &packet.json.unwrap(), &mut data_nodes), + node_registration(&mut stream, &packet.json.unwrap(), &conn), +// PacketType::AddFile => _ => (), }, Err(e) => println!("Error parsing json {}", e.to_string()), @@ -36,11 +34,11 @@ fn main() { } } -fn list(stream: &mut TcpStream, files: &[String]) { +fn list(stream: &mut TcpStream, conn: &Connection) { match serde_json::to_writer( stream, &FilePaths { - paths: Cow::from(files), + paths: Cow::from(get_files(&conn)), }, ) { Ok(_) => println!("{}", "Sent file paths"), @@ -48,33 +46,18 @@ fn list(stream: &mut TcpStream, files: &[String]) { }; } -fn put(stream: &mut TcpStream, json: &String, files: &mut Vec) { - let files: PutFiles = serde_json::from_str(json).unwrap(); - report_success(stream, "Successfully Put Files"); -} - -fn node_registration(stream: &mut TcpStream, json: &String, data_nodes: &mut Vec) { +fn node_registration(stream: &mut TcpStream, json: &String, conn: &Connection) { let endpoint : NodeRegistration = serde_json::from_str(json).unwrap(); let message = if endpoint.register { - data_nodes.push(DataNode { ip: endpoint.ip, port: endpoint.port, id: 1 }); - "You were successfully registering" + // TODO: We probably should check if the endpoint already exists! + add_data_node(&conn, &endpoint.ip, endpoint.port as i32); + "You were successfully registered" } else { - match data_nodes.iter() - .position(|dn| dn.ip == endpoint.ip && dn.port == endpoint.port) { - Some(index) => { - data_nodes.remove(index); - "You were successfully unregistering" - }, - None => { - println!("Data Node at {}:{} does not exit", endpoint.ip, endpoint.port); - "You weren't found" - } - } + // TODO: We should check if the endpoint exists! + remove_data_node(&conn, &endpoint.ip, endpoint.port as i32); + "You were successfully unregistered" }; - for dn in data_nodes { - println!("{:?}", dn); - } report_success(stream, message); } @@ -91,37 +74,6 @@ fn report_success(stream: &mut TcpStream, message: &str) { }; } -fn create_tables(conn: &Connection) { - conn.execute( - "CREATE TABLE inode ( -fid INTEGER PRIMARY KEY ASC AUTOINCREMENT, -fname TEXT UNIQUE NOT NULL DEFAULT \" \", -fsize INTEGER NOT NULL default \"0\")", - NO_PARAMS, - ).unwrap(); - - conn.execute( - "CREATE TABLE dnode ( -nid INTEGER PRIMARY KEY ASC AUTOINCREMENT, -address TEXT NOT NULL DEFAULT \" \", -port INTEGER NOT NULL DEFAULT \"0\")", - NO_PARAMS, - ).unwrap(); - - conn.execute( - "CREATE TABLE block ( -bid INTEGER PRIMARY KEY ASC AUTOINCREMENT, -fid INTEGER NOT NULL DEFAULT \"0\", -nid INTEGER NOT NULL DEFAULT \"0\", -cid TEXT NOT NULL DEFAULT \"0\")", - NO_PARAMS, - ).unwrap(); - - // Create UNIQUE tuple for block - conn.execute("CREATE UNIQUE INDEX blocknc ON block(nid, cid)", NO_PARAMS) - .unwrap(); -} - fn add_data_node(conn: &Connection, address: &str, port: i32) { match conn.execute( "INSERT INTO dnode (address, port) VALUES (?1, ?2)", @@ -132,7 +84,17 @@ fn add_data_node(conn: &Connection, address: &str, port: i32) { }; } -fn check_node(conn: &Connection, address: &str, port: i32) -> DataNode { +fn remove_data_node(conn: &Connection, address: &str, port: i32) { + match conn.execute( + "DELETE FROM dnode WHERE address=?1 AND port=?2)", + &[&address as &ToSql, &port], + ) { + Ok(n) => println!("{} rows updated", n), + Err(e) => println!("DELETE error: {}", e), + }; +} + +fn get_data_node(conn: &Connection, address: &str, port: i32) -> DataNode { let mut stmt = conn .prepare("SELECT nid, address, port FROM dnode WHERE address=?1 AND port=?2") .unwrap(); @@ -143,26 +105,127 @@ fn check_node(conn: &Connection, address: &str, port: i32) -> DataNode { }).unwrap() } -fn get_data_nodes(conn: &Connection) { - let mut stmt = conn.prepare("SELECT address, port FROM dnode WHERE 1"); +fn get_data_nodes(conn: &Connection) -> Vec { + let mut stmt = conn.prepare("SELECT nid, address, port FROM dnode").unwrap(); + let iter = stmt.query_map(NO_PARAMS, |row| DataNode { + id: row.get(0), + ip: row.get(1), + port: row.get(2), + }).unwrap(); + let mut nodes = Vec::new(); + for n in iter { + let n = n.unwrap(); + nodes.push(n); + } + nodes } -fn insert_file(conn: &Connection, fname: String, fsize: usize) { - let mut stmt = conn.prepare("INSERT INTO inode (fname, fsize) VALUES (\"?1\", ?2)"); +fn add_file(conn: &Connection, fname: &String, fsize: i32) { + conn.execute( + "INSERT INTO inode (fname, fsize) VALUES (?1, ?2)", + &[&fname as &ToSql, &fsize]) + .unwrap(); } -// fn get_file_info(conn: &Connection, fname: String) {} +fn remove_file(conn: &Connection, fname: String) { + conn.execute( + "DELETE FROM inode WHERE fname=?1", + &[&fname as &ToSql]) + .unwrap(); +} -// fn get_files(conn: &Connection) {} + fn get_file_info(conn: &Connection, fname: &String) -> INode { + conn.query_row( + "SELECT fid, fsize FROM inode where fname=?1", + &[&fname as &ToSql], + |row| INode { id: row.get(0), size: row.get(1), name: fname.clone() } + ).unwrap() + } -// fn add_block_to_inode(conn: &Connection, fname: String, blocks: usize) {} + fn get_files(conn: &Connection) -> Vec { + let mut stmt = conn + .prepare("SELECT fname, fsize FROM inode") + .unwrap(); + let iter = stmt.query_map(NO_PARAMS, |row| INode { + id: 0, + name: row.get(0), + size: row.get(1), + }).unwrap(); + let mut files = Vec::new(); + for f in iter { + let f = f.unwrap(); + files.push(format!("{} {} bytes", f.name, f.size)); + } + files + } -// fn get_file_inode(conn: &Connection, fname: String) {} + fn add_blocks_to_inode(conn: &Connection, fname: &String, blocks: &Vec) { + let fid : u32 = get_file_info(&conn, fname).id; + for block in blocks { + conn.execute( + "INSERT INTO block (nid, fid, cid) VALUES (?1, ?2, ?3)", + &[&block.node_id as &ToSql, &fid, &block.chunk_id]).unwrap(); + } + } + + fn get_file_inode(conn: &Connection, fname: &String) -> (INode, Vec) { + let file = get_file_info(&conn, &fname); + let mut stmt = conn.prepare( + "SELECT address, port, cid FROM dnode, block WHERE dnode.nid = block.nid AND block.fid = ?1") + .unwrap(); + let iter = stmt.query_map( + &[&file.id], + |row| BlockQuery { + data_node: DataNode { id: 0, ip: row.get(0), port: row.get(1) }, + chunk_id: row.get(2), + id: 0, + }).unwrap(); + let mut blocks: Vec = Vec::new(); + for b in iter { + blocks.push(b.unwrap()); + } + (file, blocks) + } #[cfg(test)] mod tests { use super::*; + fn create_tables(conn: &Connection) { + conn.execute( +"CREATE TABLE inode ( +fid INTEGER PRIMARY KEY ASC AUTOINCREMENT, +fname TEXT UNIQUE NOT NULL DEFAULT \" \", +fsize INTEGER NOT NULL default \"0\")", + NO_PARAMS + ).unwrap(); + + conn.execute( +"CREATE TABLE dnode ( +nid INTEGER PRIMARY KEY ASC AUTOINCREMENT, +address TEXT NOT NULL DEFAULT \" \", +port INTEGER NOT NULL DEFAULT \"0\")", + NO_PARAMS, + ).unwrap(); + + conn.execute( +"CREATE TABLE block ( +bid INTEGER PRIMARY KEY ASC AUTOINCREMENT, +fid INTEGER NOT NULL DEFAULT \"0\", +nid INTEGER NOT NULL DEFAULT \"0\", +cid TEXT NOT NULL DEFAULT \"0\")", + NO_PARAMS + ).unwrap(); + + // Create UNIQUE tuple for block + conn.execute("CREATE UNIQUE INDEX dnodeA ON dnode(address, port)", NO_PARAMS) + .unwrap(); + + // Create UNIQUE tuple for block + conn.execute("CREATE UNIQUE INDEX blocknc ON block(nid, cid)", NO_PARAMS) + .unwrap(); + } + fn get_test_db() -> Connection { let conn = Connection::open_in_memory().unwrap(); create_tables(&conn); @@ -170,13 +233,112 @@ mod tests { } #[test] - fn inserts_dnode_with_correct_ip_and_port() { + fn add_dnode_with_correct_ip_and_port() { let conn = get_test_db(); let ip = "127.0.0.1"; - add_data_node(&conn, &ip, 65533); - let dnode = check_node(&conn, &ip, 65533); + let port = 65533; + add_data_node(&conn, &ip, port as i32); + let dnode = get_data_node(&conn, &ip, port as i32); assert_eq!(dnode.id, 1); - assert_eq!(dnode.address, ip); - assert_eq!(dnode.port, 65533); + assert_eq!(dnode.ip, ip); + assert_eq!(dnode.port, port); } + + #[test] + fn removes_dnode_with_correct_ip_and_port() { + // TODO: I don't know how to test a delete + let conn = get_test_db(); + let ip = "127.0.0.1"; + let port = 65533; + add_data_node(&conn, &ip, port as i32); + let dnode = get_data_node(&conn, &ip, port as i32); + assert_eq!(dnode.id, 1); + assert_eq!(dnode.ip, ip); + assert_eq!(dnode.port, port); + } + + #[test] + fn gets_all_data_nodes() { + let conn = get_test_db(); + let ip1 = "127.0.0.1"; + let port1 = 65533; + let ip2 = "127.0.0.2"; + let port2 = port1 + 1; + let ip3 = "127.0.0.3"; + let port3 = port2 + 1; + add_data_node(&conn, &ip1, port1 as i32); + add_data_node(&conn, &ip2, port2 as i32); + add_data_node(&conn, &ip3, port3 as i32); + let ds = get_data_nodes(&conn); + for i in 0..ds.len() { + let d = &ds[i]; + assert_eq!(d.ip, format!("127.0.0.{}", i + 1)); + assert_eq!(d.port, 65533 + i as u32); + } + } + + #[test] + fn adds_file() { + let conn = get_test_db(); + add_file(&conn, &String::from("my_1337_virus"), 32); + let files = get_files(&conn); + assert_eq!(files.len(), 1); + assert_eq!(files[0], "my_1337_virus 32 bytes"); + } + + #[test] + fn deletes_file() { + let conn = get_test_db(); + add_file(&conn, &String::from("my_1337_virus"), 32); + let files = get_files(&conn); + assert_eq!(files.len(), 1); + assert_eq!(files[0], "my_1337_virus 32 bytes"); + } + + #[test] + fn gets_all_file() { + let conn = get_test_db(); + add_file(&conn, &String::from("file1"), 32); + add_file(&conn, &String::from("file2"), 64); + add_file(&conn, &String::from("file3"), 128); + let files = get_files(&conn); + assert_eq!(files.len(), 3); + assert_eq!(files[0], "file1 32 bytes"); + assert_eq!(files[1], "file2 64 bytes"); + assert_eq!(files[2], "file3 128 bytes"); + } + + #[test] + fn adds_blocks_to_inode() { + let conn = get_test_db(); + let filename = String::from("main_file"); + add_file(&conn, &filename, 128); + add_data_node(&conn, "127.0.0.1", 1337); + add_data_node(&conn, "127.0.0.2", 1338); + let inode = get_file_info(&conn, &filename); + let blocks = vec!( + Block { + file_id: inode.id, + id: 0, + node_id: 1, + chunk_id: String::from("c1"), + }, + Block { + file_id: inode.id, + id: 0, + node_id: 2, + chunk_id: String::from("c2"), + }, + ); + add_blocks_to_inode(&conn, &filename, &blocks); + let (inode, blocks) = get_file_inode(&conn, &filename); + assert_eq!(inode.name, "main_file"); + assert_eq!(inode.size, 128); + assert_eq!(blocks.len(), 2); + let dn1 = get_data_node(&conn, "127.0.0.1", 1337); + let dn2 = get_data_node(&conn, "127.0.0.2", 1338); + assert_eq!(dn1.id, 1); + assert_eq!(dn2.id, 2); + } + } diff --git a/src/lib.rs b/src/lib.rs index 073e3c6..8e9ed7a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,10 +10,10 @@ use std::net::Ipv4Addr; pub enum PacketType { NodeRegistration, ListFiles, + AddFile, PutFiles, GetFiles, AddDataBlocks, - ShutdownDataNode, Success, } @@ -30,8 +30,8 @@ pub struct FilePaths<'a> { #[derive(Serialize, Deserialize, Debug)] pub struct NodeRegistration { - pub ip: String, pub register: bool, + pub ip: String, pub port: u32, } @@ -58,7 +58,6 @@ pub struct INode { pub id: u32, pub name: String, pub size: u32, - pub blocks: Vec, } #[derive(Debug)] @@ -66,6 +65,12 @@ pub struct Block { pub id: u32, pub file_id: u32, pub node_id: u32, - pub c_id: String, + pub chunk_id: String, } +#[derive(Debug)] +pub struct BlockQuery { + pub id: u32, + pub data_node: DataNode, + pub chunk_id: String +}