diff --git a/src/bin/copy.rs b/src/bin/copy.rs index a0afc2d..f49c392 100644 --- a/src/bin/copy.rs +++ b/src/bin/copy.rs @@ -6,12 +6,13 @@ extern crate serde_derive; use a03::*; use std::net::{TcpStream, Shutdown}; -use std::io::Write; +use std::io::{Write, Read}; +use std::fs::File; use std::fs; fn main() { let args = get_cli_args(); - let file = fs::read(args.filename).expect("File not found!"); + let file = fs::read(&args.filename).expect("File not found!"); let size = file.len(); let mut stream = TcpStream::connect(args.endpoint).unwrap(); let packet_type; @@ -20,46 +21,55 @@ fn main() { packet_type = PacketType::RequestWrite; println!("Requesting Write of {}", args.filepath); json = Some(serde_json::to_string( - &PutFile { name: args.filepath, size: size as u32, }).unwrap()) + &AddFile { name: args.filepath, size: size as u32, }).unwrap()) } else { packet_type = PacketType::RequestRead; println!("Requesting Read of {}", args.filepath); - json = Some(serde_json::to_string( - &GetFile { name: args.filepath, }).unwrap()) + json = Some(serde_json::to_string::(&args.filepath).unwrap()) } - serde_json::to_writer( &mut stream, &Packet { p_type: packet_type, json, }) + serde_json::to_writer( &mut stream, &Packet { p_type: packet_type, json, data: None, }) .unwrap(); stream.flush().unwrap(); stream.shutdown(Shutdown::Write).unwrap(); + let mut nodes: Option> = None; match serde_json::from_reader(&mut stream) { - Ok(packet @ Packet { .. }) => match packet.p_type { - PacketType::Success => { - let nodes = serde_json::from_str::>(&packet.json.unwrap()) - .unwrap(); - for node in nodes { - println!("{}", node.chunk_id); - } - }, - PacketType::Error => { - let unwrapped = &packet.json.unwrap(); - panic!("Meta Data Server Error: {}", unwrapped); - }, - _ => (), + Ok(Packet { p_type: PacketType::Success, json, .. }) => + nodes = Some(serde_json::from_str::>(&json.unwrap()) + .unwrap()), + Ok(Packet { p_type: PacketType::Error, json, .. }) => { + eprintln!("Meta Data Server Error: {}", &json.unwrap()); }, - Err(e) => println!("Error parsing json {}", e.to_string()), + Ok(_) => {}, + Err(e) => eprintln!("Error parsing json {}", e.to_string()), }; + let filename = &args.filename; + if args.is_copy_to_dfs { + nodes.map(|ns| send_file_to_data_nodes(&filename, &ns, &file)); + } else { + nodes.map(|ns| get_file_from_data_nodes(&ns)); + } +} +fn send_file_to_data_nodes(filename: &String, nodes: &Vec, file: &Vec) { + let mut stream = TcpStream::connect("localhost:6771").unwrap(); + println!("Going to send a file! Bytes {}", file.len()); + let chunk = Chunk { + id: nodes[0].chunk_id.clone(), + filename: filename.clone(), + }; + let packet = serde_json::to_writer( + &mut stream, + &Packet { + p_type: PacketType::PutFile, + json: Some(serde_json::to_string(&chunk).unwrap()), + data: Some(file.clone()), + }).unwrap(); + stream.flush().unwrap(); + stream.shutdown(Shutdown::Write).unwrap(); +} -// let files: Vec = serde_json::from_reader(&mut stream).unwrap(); -// for f in files { -// println!("Chunk ID: {}", f.chunk_id); -// } -// println!("{} bytes", file.len()); -// let mut stream = TcpStream::connect("localhost:6771").unwrap(); -// stream.write(&file).unwrap(); -// stream.flush().unwrap(); -// stream.shutdown(Shutdown::Write).unwrap(); +fn get_file_from_data_nodes(nodes: &Vec) { } #[derive(Debug)] diff --git a/src/bin/data_node.rs b/src/bin/data_node.rs index e118a70..1258839 100644 --- a/src/bin/data_node.rs +++ b/src/bin/data_node.rs @@ -6,9 +6,10 @@ extern crate serde_derive; use a03::*; use std::net::{TcpStream, Shutdown}; -use std::io::Write; +use std::io::{Write, Read}; use std::net::TcpListener; use serde_json::from_str; +use std::fs::File; fn main() { let endpoint = parse_endpoint_from_cli(0); @@ -17,23 +18,15 @@ fn main() { for stream in listener.incoming() { let mut stream = stream.unwrap(); -// let mut buf = Vec::new(); -// match stream.read_to_end(&mut buf) { -// Ok(size) => { -// println!("Total bytes: {}", size); -// let mut copy = File::create("new_version").unwrap(); -// copy.write_all(&buf[..]).unwrap(); -// }, -// Err(e) => println!("{}", e), -// } match serde_json::from_reader(&mut stream) { - Ok(packet @ Packet { .. }) => match packet.p_type { -// PacketType::GetFiles => shutdown(&mut stream), -// PacketType::PutFile => put(&mut stream, &packet.json.unwrap(), &mut Vec::new()), - PacketType::ShutdownDataNode => shutdown(&mut stream, &endpoint), - _ => (), - }, - Err(e) => println!("Error parsing json: {}", e.to_string()), + Ok(Packet { p_type: PacketType::GetFile, json, data, }) => + get(&mut stream, &json.unwrap(), &data.unwrap()), + Ok(Packet { p_type: PacketType::PutFile, json, data, }) => + put(&mut stream, &json.unwrap(), &data.unwrap()), + Ok(Packet { p_type: PacketType::ShutdownDataNode, .. }) => + shutdown(&mut stream, &endpoint), + Ok(_) => eprintln!("We don't handle this PacketType"), + Err(e) => eprintln!("Error parsing json: {}", e.to_string()), }; } } @@ -51,6 +44,7 @@ fn register_with_meta_server(endpoint: &String) { ip: String::from(split[0]), port: from_str(split[1]).unwrap() }) .unwrap()), + data: None, }) .unwrap(); println!("Registered myself"); @@ -60,9 +54,17 @@ fn register_with_meta_server(endpoint: &String) { println!("{:?}", result); } -//fn put(stream: &mut TcpStream, json: &String, files: &mut Vec) { -// let files: PutFiles = serde_json::from_str(json).unwrap(); -//} +fn put(stream: &mut TcpStream, json: &String, data: &Vec) { + let chunk_id: Chunk = serde_json::from_str(json).unwrap(); + println!("CId: {:?}", chunk_id); + println!("Data Amount: {:?}", data.len()); + let mut copy = File::create(format!("{}_{}", chunk_id.filename, chunk_id.id)).unwrap(); + copy.write_all(&data[..]).unwrap(); +} + +fn get(stream: &mut TcpStream, json: &String, data: &Vec) { +// let files: String = serde_json::from_str(json).unwrap(); +} fn shutdown(stream: &mut TcpStream, endpoint: &String) { let mut stream = TcpStream::connect("localhost:6770").unwrap(); @@ -77,6 +79,7 @@ fn shutdown(stream: &mut TcpStream, endpoint: &String) { ip: String::from(split[0]), port: from_str(split[1]).unwrap() }) .unwrap()), + data: None, }) .unwrap(); println!("Unregistered myself"); diff --git a/src/bin/ls.rs b/src/bin/ls.rs index 8aa8434..6df2a76 100644 --- a/src/bin/ls.rs +++ b/src/bin/ls.rs @@ -19,6 +19,7 @@ fn main() { &Packet { p_type: PacketType::ListFiles, json: None, + data: None, }) .unwrap(); stream.flush().unwrap(); diff --git a/src/bin/meta_data.rs b/src/bin/meta_data.rs index c5652ba..e483b25 100644 --- a/src/bin/meta_data.rs +++ b/src/bin/meta_data.rs @@ -37,14 +37,15 @@ fn main() { } fn request_read(stream: &mut TcpStream, conn: &Connection, message: &str) { - let file: GetFile = serde_json::from_str(message).unwrap(); - let file_info = get_file_info(&conn, &file.name); + let filename: String = serde_json::from_str(message).unwrap(); + let file_info = get_file_info(&conn, &filename); if file_info.is_none() { match serde_json::to_writer( stream, &Packet { p_type: PacketType::Error, json: Some(String::from("File not found")), + data: None, }) { Ok(_) => println!("{}", "Copy client attempted to read non-existing file"), Err(e) => println!("{}", e), @@ -63,28 +64,36 @@ fn request_read(stream: &mut TcpStream, conn: &Connection, message: &str) { } match serde_json::to_writer( stream, - &Packet { p_type: PacketType::Success, json: Some(serde_json::to_string(&nodes).unwrap()) }) { + &Packet { + p_type: PacketType::Success, + json: Some(serde_json::to_string(&nodes).unwrap()), + data: None, + }) { Ok(_) => println!("{}", "Sent nodes with chunks"), Err(e) => println!("{}", e), }; } fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) { - let file: PutFile = serde_json::from_str(message).unwrap(); - let file_already_exists = add_file(&conn, &file.name, file.size as i32); + let file: AddFile = serde_json::from_str(message).unwrap(); +// let file_already_exists = add_file(&conn, &file.name, file.size as i32); + let file_already_exists = false; if file_already_exists { match serde_json::to_writer( stream, &Packet { p_type: PacketType::Error, json: Some(String::from("File already exists, please remove before re-uploading")), + data: None, }) { Ok(_) => println!("{}", "Copy client attempted to add an existing file"), Err(e) => println!("{}", e), }; return; } - let file_info = get_file_info(&conn, &file.name).unwrap(); +// let file_info = get_file_info(&conn, &file.name).unwrap(); + let file_info = INode { id: 1, name: file.name, size: file.size }; + println!("{:?}", file_info); let mut blocks: Vec = Vec::new(); let mut nodes: Vec = Vec::new(); let dnodes = get_data_nodes(&conn); @@ -103,10 +112,14 @@ fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) { chunk_id: uuid.clone(), }); } - add_blocks_to_inode(&conn, file_info.id, &blocks); +// add_blocks_to_inode(&conn, file_info.id, &blocks); match serde_json::to_writer( stream, - &Packet { p_type: PacketType::Success, json: Some(serde_json::to_string(&nodes).unwrap()) }) { + &Packet { + p_type: PacketType::Success, + json: Some(serde_json::to_string(&nodes).unwrap()), + data: None, + }) { Ok(_) => println!("{}", "Sent nodes with chunks"), Err(e) => println!("{}", e), }; @@ -134,7 +147,11 @@ fn node_registration(stream: &mut TcpStream, conn: &Connection, json: &String) { } fn report_success(stream: &mut TcpStream, message: &str) { - match serde_json::to_writer(stream, &Packet { p_type: PacketType::Success, json: None }) { + match serde_json::to_writer(stream, &Packet { + p_type: PacketType::Success, + json: None, + data: None, + }) { Ok(_) => println!("{}", message), Err(e) => println!("{}", e), }; diff --git a/src/lib.rs b/src/lib.rs index 7ec4008..207ef2e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,6 +28,7 @@ pub enum PacketType { pub struct Packet { pub p_type: PacketType, pub json: Option, + pub data: Option>, } #[derive(Serialize, Deserialize, Debug)] @@ -43,7 +44,7 @@ pub struct NodeRegistration { } #[derive(Serialize, Deserialize, Debug)] -pub struct PutFile { +pub struct AddFile { pub name: String, pub size: u32 } @@ -56,13 +57,11 @@ pub struct AvailableNodes { } #[derive(Serialize, Deserialize, Debug)] -pub struct GetFile { - pub name: String, +pub struct Chunk { + pub id: String, + pub filename: String, } -#[derive(Serialize, Deserialize, Debug)] -pub struct AddDataBlocks {} - #[derive(Debug)] pub struct DataNode { pub id: u32,