diff --git a/src/bin/copy.rs b/src/bin/copy.rs index 2b24619..a0afc2d 100644 --- a/src/bin/copy.rs +++ b/src/bin/copy.rs @@ -17,32 +17,45 @@ fn main() { let packet_type; let json; if args.is_copy_to_dfs { - packet_type = PacketType::PutFile; + packet_type = PacketType::RequestWrite; + println!("Requesting Write of {}", args.filepath); json = Some(serde_json::to_string( - &PutFile { - name: args.filepath, - size: size as u32, - }) - .unwrap()) + &PutFile { name: args.filepath, size: size as u32, }).unwrap()) } else { - packet_type = PacketType::GetFile; + packet_type = PacketType::RequestRead; + println!("Requesting Read of {}", args.filepath); json = Some(serde_json::to_string( - &GetFile { - }) - .unwrap()) + &GetFile { name: args.filepath, }).unwrap()) } serde_json::to_writer( &mut stream, &Packet { p_type: packet_type, json, }) .unwrap(); - println!("Sent file"); stream.flush().unwrap(); stream.shutdown(Shutdown::Write).unwrap(); - let files: Vec = serde_json::from_reader(&mut stream).unwrap(); - for f in files { - println!("Chunk ID: {}", f.chunk_id); - } + match serde_json::from_reader(&mut stream) { + Ok(packet @ Packet { .. }) => match packet.p_type { + PacketType::Success => { + let nodes = serde_json::from_str::>(&packet.json.unwrap()) + .unwrap(); + for node in nodes { + println!("{}", node.chunk_id); + } + }, + PacketType::Error => { + let unwrapped = &packet.json.unwrap(); + panic!("Meta Data Server Error: {}", unwrapped); + }, + _ => (), + }, + Err(e) => println!("Error parsing json {}", e.to_string()), + }; - println!("{} bytes", file.len()); + +// let files: Vec = serde_json::from_reader(&mut stream).unwrap(); +// for f in files { +// println!("Chunk ID: {}", f.chunk_id); +// } +// println!("{} bytes", file.len()); // let mut stream = TcpStream::connect("localhost:6771").unwrap(); // stream.write(&file).unwrap(); // stream.flush().unwrap(); @@ -64,25 +77,27 @@ pub fn get_cli_args() -> CliArgs { } let mut endpoint_arg: String = args.get(0).unwrap().clone(); + println!("Endpoint Arg {}", endpoint_arg); + let endpoint; let filepath; let filename; let splits: Vec<&str>; - let is_copy_to_dfs = endpoint_arg.contains(":"); + let is_copy_to_dfs = !endpoint_arg.contains(":"); if is_copy_to_dfs { - splits = endpoint_arg.split(':').collect(); - if splits.len() < 3 { - panic!("Incorrect endpoint argument format! Please provide IP:PORT:FILE"); - } - filename = args.get(1).unwrap().clone(); - } else { endpoint_arg = args.get(1).unwrap().clone(); splits = endpoint_arg.split(':').collect(); if splits.len() < 3 { panic!("Incorrect endpoint argument format! Please provide IP:PORT:FILE"); } filename = args.get(0).unwrap().clone(); + } else { + splits = endpoint_arg.split(':').collect(); + if splits.len() < 3 { + panic!("Incorrect endpoint argument format! Please provide IP:PORT:FILE"); + } + filename = args.get(1).unwrap().clone(); } endpoint = format!("{}:{}", splits[0], splits[1]); filepath = String::from(splits[2]); diff --git a/src/bin/meta_data.rs b/src/bin/meta_data.rs index 00d6079..c5652ba 100644 --- a/src/bin/meta_data.rs +++ b/src/bin/meta_data.rs @@ -37,9 +37,22 @@ fn main() { } fn request_read(stream: &mut TcpStream, conn: &Connection, message: &str) { - let file : PutFile = serde_json::from_str(message).unwrap(); - add_file(&conn, &file.name, file.size as i32); - let (inode, blocks) = get_file_inode(&conn, &file.name); + let file: GetFile = serde_json::from_str(message).unwrap(); + let file_info = get_file_info(&conn, &file.name); + if file_info.is_none() { + match serde_json::to_writer( + stream, + &Packet { + p_type: PacketType::Error, + json: Some(String::from("File not found")), + }) { + Ok(_) => println!("{}", "Copy client attempted to read non-existing file"), + Err(e) => println!("{}", e), + }; + return; + } + let file_info = file_info.unwrap(); + let blocks = get_file_inode(&conn, file_info.id); let mut nodes: Vec = Vec::new(); for b in blocks { nodes.push(AvailableNodes { @@ -48,17 +61,30 @@ fn request_read(stream: &mut TcpStream, conn: &Connection, message: &str) { chunk_id: b.chunk_id, }); } - match serde_json::to_writer( stream, &nodes) { + match serde_json::to_writer( + stream, + &Packet { p_type: PacketType::Success, json: Some(serde_json::to_string(&nodes).unwrap()) }) { Ok(_) => println!("{}", "Sent nodes with chunks"), Err(e) => println!("{}", e), }; } fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) { - let file : PutFile = serde_json::from_str(message).unwrap(); - add_file(&conn, &file.name, file.size as i32); - // TODO: Need to ensure that replaces also work - let fid = conn.last_insert_rowid(); + let file: PutFile = serde_json::from_str(message).unwrap(); + let file_already_exists = add_file(&conn, &file.name, file.size as i32); + if file_already_exists { + match serde_json::to_writer( + stream, + &Packet { + p_type: PacketType::Error, + json: Some(String::from("File already exists, please remove before re-uploading")), + }) { + Ok(_) => println!("{}", "Copy client attempted to add an existing file"), + Err(e) => println!("{}", e), + }; + return; + } + let file_info = get_file_info(&conn, &file.name).unwrap(); let mut blocks: Vec = Vec::new(); let mut nodes: Vec = Vec::new(); let dnodes = get_data_nodes(&conn); @@ -68,7 +94,7 @@ fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) { blocks.push(Block { chunk_id: uuid.clone(), node_id: dn.id, - file_id: fid as u32, + file_id: file_info.id as u32, id: 0, }); nodes.push(AvailableNodes { @@ -77,28 +103,29 @@ fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) { chunk_id: uuid.clone(), }); } - add_blocks_to_inode(&conn, &file.name, &blocks); - match serde_json::to_writer(stream, &nodes) { + add_blocks_to_inode(&conn, file_info.id, &blocks); + match serde_json::to_writer( + stream, + &Packet { p_type: PacketType::Success, json: Some(serde_json::to_string(&nodes).unwrap()) }) { Ok(_) => println!("{}", "Sent nodes with chunks"), Err(e) => println!("{}", e), }; } fn list(stream: &mut TcpStream, conn: &Connection) { - match serde_json::to_writer(stream, &FilePaths { paths: Cow::from(get_files(&conn)), }) { + match serde_json::to_writer(stream, &FilePaths { paths: Cow::from(get_files(&conn)) }) { Ok(_) => println!("{}", "Sent file paths"), Err(e) => println!("{}", e), }; } fn node_registration(stream: &mut TcpStream, conn: &Connection, json: &String) { - let endpoint : NodeRegistration = serde_json::from_str(json).unwrap(); + let endpoint: NodeRegistration = serde_json::from_str(json).unwrap(); let message = if endpoint.register { // TODO: We probably should check if the endpoint already exists! add_data_node(&conn, &endpoint.ip, endpoint.port as i32); "You were successfully registered" - } - else { + } else { // TODO: We should check if the endpoint exists! remove_data_node(&conn, &endpoint.ip, endpoint.port as i32); "You were successfully unregistered" @@ -107,7 +134,7 @@ fn node_registration(stream: &mut TcpStream, conn: &Connection, json: &String) { } fn report_success(stream: &mut TcpStream, message: &str) { - match serde_json::to_writer(stream, &Packet { p_type: PacketType::Success, json: None, }) { + match serde_json::to_writer(stream, &Packet { p_type: PacketType::Success, json: None }) { Ok(_) => println!("{}", message), Err(e) => println!("{}", e), }; @@ -160,12 +187,20 @@ fn get_data_nodes(conn: &Connection) -> Vec { nodes } -fn add_file(conn: &Connection, fname: &String, fsize: i32) { +fn add_file(conn: &Connection, fname: &String, fsize: i32) -> bool { + let file_exists = conn.query_row( + "SELECT fid FROM inode WHERE fname = ?1", + &[&fname as &ToSql], + |_| {}) + .is_ok(); + if file_exists { + return true; + } conn.execute( - "INSERT OR REPLACE INTO inode (fid, fname, fsize) -VALUES ((SELECT fid FROM inode WHERE fname = ?1), ?1, ?2)", + "INSERT INTO inode (fname, fsize) VALUES (?1, ?2)", &[&fname as &ToSql, &fsize]) .unwrap(); + false } fn remove_file(conn: &Connection, fname: String) { @@ -175,61 +210,62 @@ fn remove_file(conn: &Connection, fname: String) { .unwrap(); } - fn get_file_info(conn: &Connection, fname: &String) -> INode { - conn.query_row( - "SELECT fid, fsize FROM inode where fname=?1", - &[&fname as &ToSql], - |row| INode { id: row.get(0), size: row.get(1), name: fname.clone() } - ).unwrap() - } +fn get_file_info(conn: &Connection, fname: &String) -> Option { + conn.query_row( + "SELECT fid, fsize FROM inode WHERE fname=?1", + &[&fname as &ToSql], + |row| INode { id: row.get(0), size: row.get(1), name: fname.clone() }, + ).ok() +} - fn get_files(conn: &Connection) -> Vec { - let mut stmt = conn - .prepare("SELECT fname, fsize FROM inode") - .unwrap(); - let mut files = Vec::new(); - match stmt.query_map(NO_PARAMS, |row| INode { - id: 0, - name: row.get(0), - size: row.get(1), - }) { - Ok(iter) => { - for f in iter { - let f = f.unwrap(); - files.push(format!("{} {} bytes", f.name, f.size)); - } - }, - Err(e) => println!("Error! {}", e), - } - files - } +fn get_files(conn: &Connection) -> Vec { + let mut stmt = conn + .prepare("SELECT fname, fsize FROM inode") + .unwrap(); + let mut files = Vec::new(); + match stmt.query_map(NO_PARAMS, |row| INode { + id: 0, + name: row.get(0), + size: row.get(1), + }) { + Ok(iter) => { + for f in iter { + let f = f.unwrap(); + files.push(format!("{} {} bytes", f.name, f.size)); + } + } + Err(e) => println!("Error! {}", e), + } + files +} - fn add_blocks_to_inode(conn: &Connection, fname: &String, blocks: &Vec) { - let fid : u32 = get_file_info(&conn, fname).id; - for block in blocks { - conn.execute( - "INSERT INTO block (nid, fid, cid) VALUES (?1, ?2, ?3)", - &[&block.node_id as &ToSql, &fid, &block.chunk_id]).unwrap(); - } - } +fn add_blocks_to_inode(conn: &Connection, fid: u32, blocks: &Vec) { + for block in blocks { + match conn.execute( + "INSERT INTO block (nid, fid, cid) VALUES (?1, ?2, ?3)", + &[&block.node_id as &ToSql, &fid, &block.chunk_id]) { + Ok(n) => println!("Updated {}", n), + Err(e) => println!("Error: {}", e), + } + } +} - fn get_file_inode(conn: &Connection, fname: &String) -> (INode, Vec) { - let file = get_file_info(&conn, &fname); - let mut stmt = conn.prepare( - "SELECT dnode.nid, address, port, cid FROM dnode, block WHERE dnode.nid = block.nid AND block.fid = ?1") - .unwrap(); - let iter = stmt.query_map( - &[&file.id], - |row| BlockQuery { - data_node: DataNode { id: row.get(0), ip: row.get(1), port: row.get(2) }, - chunk_id: row.get(3), - }).unwrap(); - let mut blocks: Vec = Vec::new(); - for b in iter { - blocks.push(b.unwrap()); - } - (file, blocks) - } +fn get_file_inode(conn: &Connection, fid: u32) -> Vec { + let mut stmt = conn.prepare( + "SELECT dnode.nid, address, port, cid FROM dnode, block WHERE dnode.nid = block.nid AND block.fid = ?1") + .unwrap(); + let iter = stmt.query_map( + &[&fid], + |row| BlockQuery { + data_node: DataNode { id: row.get(0), ip: row.get(1), port: row.get(2) }, + chunk_id: row.get(3), + }).unwrap(); + let mut blocks: Vec = Vec::new(); + for b in iter { + blocks.push(b.unwrap()); + } + blocks +} #[cfg(test)] mod tests { @@ -242,7 +278,7 @@ mod tests { fid INTEGER PRIMARY KEY ASC AUTOINCREMENT, fname TEXT UNIQUE NOT NULL DEFAULT \" \", fsize INTEGER NOT NULL default \"0\")", - NO_PARAMS + NO_PARAMS, ).unwrap(); conn.execute( @@ -259,7 +295,7 @@ bid INTEGER PRIMARY KEY ASC AUTOINCREMENT, fid INTEGER NOT NULL DEFAULT \"0\", nid INTEGER NOT NULL DEFAULT \"0\", cid TEXT NOT NULL DEFAULT \"0\")", - NO_PARAMS + NO_PARAMS, ).unwrap(); conn } @@ -355,12 +391,18 @@ cid TEXT NOT NULL DEFAULT \"0\")", add_file(&conn, &fname, 32); let files = get_files(&conn); assert_eq!(files.len(), 1); - let file = get_file_info(&conn, &fname); + let file = get_file_info(&conn, &fname).unwrap(); assert_eq!(file.id, 1); assert_eq!(file.name, "my_1337_virus"); assert_eq!(file.size, 32); } + #[test] + fn adding_new_file_returns_true() {} + + #[test] + fn adding_same_file_returns_false() {} + #[test] fn removes_file() { let conn = get_test_db(); @@ -398,7 +440,7 @@ cid TEXT NOT NULL DEFAULT \"0\")", add_data_node(&conn, "127.0.0.1", 1337); add_data_node(&conn, "127.0.0.2", 1338); add_data_node(&conn, "127.0.0.2", 1339); - let inode = get_file_info(&conn, &filename); + let inode = get_file_info(&conn, &filename).unwrap(); let blocks = vec!( Block { file_id: inode.id, @@ -419,8 +461,8 @@ cid TEXT NOT NULL DEFAULT \"0\")", chunk_id: String::from("c3"), }, ); - add_blocks_to_inode(&conn, &filename, &blocks); - let (inode, blocks) = get_file_inode(&conn, &filename); + add_blocks_to_inode(&conn, inode.id, &blocks); + let blocks = get_file_inode(&conn, inode.id); assert_eq!(inode.name, "main_file"); assert_eq!(inode.size, 128); assert_eq!(blocks.len(), 3); @@ -437,5 +479,4 @@ cid TEXT NOT NULL DEFAULT \"0\")", assert_eq!(blocks[2].data_node.ip, "127.0.0.2"); assert_eq!(blocks[2].data_node.port, 1339); } - } diff --git a/src/lib.rs b/src/lib.rs index ea93271..7ec4008 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,6 +21,7 @@ pub enum PacketType { AddDataBlocks, ShutdownDataNode, Success, + Error, } #[derive(Serialize, Deserialize, Debug)] @@ -55,7 +56,9 @@ pub struct AvailableNodes { } #[derive(Serialize, Deserialize, Debug)] -pub struct GetFile {} +pub struct GetFile { + pub name: String, +} #[derive(Serialize, Deserialize, Debug)] pub struct AddDataBlocks {}