diff --git a/.gitignore b/.gitignore index 91ca2aa..6b53060 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,5 @@ *.db dfs_skel/ venv/ +data_node/ +copy_dir/ diff --git a/src/bin/copy.rs b/src/bin/copy.rs index f49c392..da2a1eb 100644 --- a/src/bin/copy.rs +++ b/src/bin/copy.rs @@ -21,7 +21,7 @@ fn main() { packet_type = PacketType::RequestWrite; println!("Requesting Write of {}", args.filepath); json = Some(serde_json::to_string( - &AddFile { name: args.filepath, size: size as u32, }).unwrap()) + &AddFile { name: args.filepath.clone(), size: size as u32, }).unwrap()) } else { packet_type = PacketType::RequestRead; println!("Requesting Read of {}", args.filepath); @@ -43,11 +43,11 @@ fn main() { Ok(_) => {}, Err(e) => eprintln!("Error parsing json {}", e.to_string()), }; - let filename = &args.filename; + let filename = &args.filepath; if args.is_copy_to_dfs { nodes.map(|ns| send_file_to_data_nodes(&filename, &ns, &file)); } else { - nodes.map(|ns| get_file_from_data_nodes(&ns)); + nodes.map(|ns| get_file_from_data_nodes(&filename, &ns)); } } @@ -55,7 +55,7 @@ fn send_file_to_data_nodes(filename: &String, nodes: &Vec, file: let mut stream = TcpStream::connect("localhost:6771").unwrap(); println!("Going to send a file! Bytes {}", file.len()); let chunk = Chunk { - id: nodes[0].chunk_id.clone(), + index: nodes[0].chunk_index, filename: filename.clone(), }; let packet = serde_json::to_writer( @@ -69,7 +69,35 @@ fn send_file_to_data_nodes(filename: &String, nodes: &Vec, file: stream.shutdown(Shutdown::Write).unwrap(); } -fn get_file_from_data_nodes(nodes: &Vec) { +fn get_file_from_data_nodes(filename: &String, nodes: &Vec) { + let chunk = Chunk { + index: nodes[0].chunk_index, + filename: filename.clone(), + }; + let mut stream = TcpStream::connect("localhost:6771").unwrap(); + let packet = serde_json::to_writer( + &stream, + &Packet { + p_type: PacketType::GetFile, + json: Some(serde_json::to_string(&chunk).unwrap()), + data: None, + }).unwrap(); + stream.flush().unwrap(); + stream.shutdown(Shutdown::Write).unwrap(); + match serde_json::from_reader(stream) { + Ok(Packet { p_type: PacketType::GetFile, json, data }) => { + let data = data.unwrap(); + let chunk: Chunk = serde_json::from_str(&json.unwrap()).unwrap(); + // TODO: Here we have to rebuild the chunks + let mut copy = File::create(chunk.filename).unwrap(); + copy.write_all(&data[..]).unwrap(); + }, + Ok(Packet { p_type: PacketType::Error, json, .. }) => { + eprintln!("Data Node Server Error: {}", &json.unwrap()); + }, + Ok(_) => {}, + Err(e) => eprintln!("Error parsing json {}", e.to_string()), + }; } #[derive(Debug)] diff --git a/src/bin/data_node.rs b/src/bin/data_node.rs index 1258839..f7584cb 100644 --- a/src/bin/data_node.rs +++ b/src/bin/data_node.rs @@ -10,6 +10,8 @@ use std::io::{Write, Read}; use std::net::TcpListener; use serde_json::from_str; use std::fs::File; +use std::fs; +use std::error::Error; fn main() { let endpoint = parse_endpoint_from_cli(0); @@ -19,10 +21,13 @@ fn main() { for stream in listener.incoming() { let mut stream = stream.unwrap(); match serde_json::from_reader(&mut stream) { - Ok(Packet { p_type: PacketType::GetFile, json, data, }) => - get(&mut stream, &json.unwrap(), &data.unwrap()), - Ok(Packet { p_type: PacketType::PutFile, json, data, }) => - put(&mut stream, &json.unwrap(), &data.unwrap()), + Ok(Packet { p_type: PacketType::GetFile, json, data, }) => { + send_file(&mut stream, &json.unwrap(), &data.unwrap()); + stream.flush().unwrap(); + stream.shutdown(Shutdown::Write).unwrap(); + } + Ok(Packet { p_type: PacketType::PutFile, json, data, }) => + receive_file(&json.unwrap(), &data.unwrap()), Ok(Packet { p_type: PacketType::ShutdownDataNode, .. }) => shutdown(&mut stream, &endpoint), Ok(_) => eprintln!("We don't handle this PacketType"), @@ -31,6 +36,41 @@ fn main() { } } +fn receive_file(json: &String, data: &Vec) { + let chunk: Chunk = serde_json::from_str(json).unwrap(); + let mut copy = File::create(format!("{}_{}", chunk.filename, chunk.index)).unwrap(); + copy.write_all(&data[..]).unwrap(); +} + +fn send_file(stream: &mut TcpStream, json: &String, data: &Vec) { + let chunk: Chunk = serde_json::from_str(json).unwrap(); + match fs::read(&chunk.filename) { + Ok(f) => { + let packet = serde_json::to_writer( + stream, + &Packet { + p_type: PacketType::GetFile, + json: Some(json.clone()), + data: Some(Vec::from(f)), + }).unwrap(); + } + Err(e) => { + match serde_json::to_writer( + stream, + &Packet { + p_type: PacketType::Error, + json: Some(String::from(e.description())), + data: None, + }) { + Ok(_) => println!("{}", "Copy client attempted to read non-existing file"), + Err(e) => println!("{}", e), + } + } + }; + stream.flush().unwrap(); + stream.shutdown(Shutdown::Write).unwrap(); +} + fn register_with_meta_server(endpoint: &String) { let mut stream = TcpStream::connect("localhost:6770").unwrap(); let split: Vec<&str> = endpoint.split(":").collect(); @@ -42,7 +82,8 @@ fn register_with_meta_server(endpoint: &String) { &NodeRegistration { register: true, ip: String::from(split[0]), - port: from_str(split[1]).unwrap() }) + port: from_str(split[1]).unwrap(), + }) .unwrap()), data: None, }) @@ -54,18 +95,6 @@ fn register_with_meta_server(endpoint: &String) { println!("{:?}", result); } -fn put(stream: &mut TcpStream, json: &String, data: &Vec) { - let chunk_id: Chunk = serde_json::from_str(json).unwrap(); - println!("CId: {:?}", chunk_id); - println!("Data Amount: {:?}", data.len()); - let mut copy = File::create(format!("{}_{}", chunk_id.filename, chunk_id.id)).unwrap(); - copy.write_all(&data[..]).unwrap(); -} - -fn get(stream: &mut TcpStream, json: &String, data: &Vec) { -// let files: String = serde_json::from_str(json).unwrap(); -} - fn shutdown(stream: &mut TcpStream, endpoint: &String) { let mut stream = TcpStream::connect("localhost:6770").unwrap(); let split: Vec<&str> = endpoint.split(":").collect(); @@ -77,7 +106,8 @@ fn shutdown(stream: &mut TcpStream, endpoint: &String) { &NodeRegistration { register: false, ip: String::from(split[0]), - port: from_str(split[1]).unwrap() }) + port: from_str(split[1]).unwrap(), + }) .unwrap()), data: None, }) diff --git a/src/bin/meta_data.rs b/src/bin/meta_data.rs index e483b25..8b87eaf 100644 --- a/src/bin/meta_data.rs +++ b/src/bin/meta_data.rs @@ -3,7 +3,6 @@ extern crate rusqlite; extern crate serde; extern crate serde_json; extern crate serde_derive; -extern crate uuid; use a03::*; use rusqlite::types::ToSql; @@ -59,7 +58,7 @@ fn request_read(stream: &mut TcpStream, conn: &Connection, message: &str) { nodes.push(AvailableNodes { ip: b.data_node.ip, port: b.data_node.port, - chunk_id: b.chunk_id, + chunk_index: b.chunk_index, }); } match serde_json::to_writer( @@ -76,8 +75,7 @@ fn request_read(stream: &mut TcpStream, conn: &Connection, message: &str) { fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) { let file: AddFile = serde_json::from_str(message).unwrap(); -// let file_already_exists = add_file(&conn, &file.name, file.size as i32); - let file_already_exists = false; + let file_already_exists = add_file(&conn, &file.name, file.size as i32); if file_already_exists { match serde_json::to_writer( stream, @@ -91,17 +89,16 @@ fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) { }; return; } -// let file_info = get_file_info(&conn, &file.name).unwrap(); - let file_info = INode { id: 1, name: file.name, size: file.size }; - println!("{:?}", file_info); + let file_info = get_file_info(&conn, &file.name).unwrap(); +// let file_info = INode { id: 1, name: file.name, size: file.size }; +// println!("{:?}", file_info); let mut blocks: Vec = Vec::new(); let mut nodes: Vec = Vec::new(); let dnodes = get_data_nodes(&conn); for i in 0..dnodes.len() { let dn = &dnodes[i]; - let uuid = uuid::Uuid::new_v4().to_string(); blocks.push(Block { - chunk_id: uuid.clone(), + chunk_index: i as u32, node_id: dn.id, file_id: file_info.id as u32, id: 0, @@ -109,10 +106,10 @@ fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) { nodes.push(AvailableNodes { ip: dn.ip.clone(), port: dn.port, - chunk_id: uuid.clone(), + chunk_index: i as u32, }); } -// add_blocks_to_inode(&conn, file_info.id, &blocks); + add_blocks_to_inode(&conn, file_info.id, &blocks); match serde_json::to_writer( stream, &Packet { @@ -260,7 +257,7 @@ fn add_blocks_to_inode(conn: &Connection, fid: u32, blocks: &Vec) { for block in blocks { match conn.execute( "INSERT INTO block (nid, fid, cid) VALUES (?1, ?2, ?3)", - &[&block.node_id as &ToSql, &fid, &block.chunk_id]) { + &[&block.node_id as &ToSql, &fid, &block.chunk_index]) { Ok(n) => println!("Updated {}", n), Err(e) => println!("Error: {}", e), } @@ -275,7 +272,7 @@ fn get_file_inode(conn: &Connection, fid: u32) -> Vec { &[&fid], |row| BlockQuery { data_node: DataNode { id: row.get(0), ip: row.get(1), port: row.get(2) }, - chunk_id: row.get(3), + chunk_index: row.get(3), }).unwrap(); let mut blocks: Vec = Vec::new(); for b in iter { @@ -311,7 +308,7 @@ port INTEGER NOT NULL DEFAULT \"0\")", bid INTEGER PRIMARY KEY ASC AUTOINCREMENT, fid INTEGER NOT NULL DEFAULT \"0\", nid INTEGER NOT NULL DEFAULT \"0\", -cid TEXT NOT NULL DEFAULT \"0\")", +cid INTEGER NOT NULL DEFAULT \"0\")", NO_PARAMS, ).unwrap(); conn @@ -463,19 +460,19 @@ cid TEXT NOT NULL DEFAULT \"0\")", file_id: inode.id, id: 0, node_id: 1, - chunk_id: String::from("c1"), + chunk_index: 0, }, Block { file_id: inode.id, id: 0, node_id: 2, - chunk_id: String::from("c2"), + chunk_index: 1, }, Block { file_id: inode.id, id: 0, node_id: 3, - chunk_id: String::from("c3"), + chunk_index: 2, }, ); add_blocks_to_inode(&conn, inode.id, &blocks); @@ -483,15 +480,15 @@ cid TEXT NOT NULL DEFAULT \"0\")", assert_eq!(inode.name, "main_file"); assert_eq!(inode.size, 128); assert_eq!(blocks.len(), 3); - assert_eq!(blocks[0].chunk_id, "c1"); + assert_eq!(blocks[0].chunk_index, 0); assert_eq!(blocks[0].data_node.id, 1); assert_eq!(blocks[0].data_node.ip, "127.0.0.1"); assert_eq!(blocks[0].data_node.port, 1337); - assert_eq!(blocks[1].chunk_id, "c2"); + assert_eq!(blocks[1].chunk_index, 1); assert_eq!(blocks[1].data_node.id, 2); assert_eq!(blocks[1].data_node.ip, "127.0.0.2"); assert_eq!(blocks[1].data_node.port, 1338); - assert_eq!(blocks[2].chunk_id, "c3"); + assert_eq!(blocks[2].chunk_index, 2); assert_eq!(blocks[2].data_node.id, 3); assert_eq!(blocks[2].data_node.ip, "127.0.0.2"); assert_eq!(blocks[2].data_node.port, 1339); diff --git a/src/lib.rs b/src/lib.rs index 207ef2e..04895bf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,12 +53,12 @@ pub struct AddFile { pub struct AvailableNodes { pub ip: String, pub port: u32, - pub chunk_id: String + pub chunk_index: u32 } #[derive(Serialize, Deserialize, Debug)] pub struct Chunk { - pub id: String, + pub index: u32, pub filename: String, } @@ -81,13 +81,13 @@ pub struct Block { pub id: u32, pub file_id: u32, pub node_id: u32, - pub chunk_id: String, + pub chunk_index: u32, } #[derive(Debug)] pub struct BlockQuery { pub data_node: DataNode, - pub chunk_id: String + pub chunk_index: u32 } pub fn parse_endpoint_from_cli(arg_index : usize) -> String {