diff --git a/src/bin/copy.rs b/src/bin/copy.rs index 530675f..03d0635 100644 --- a/src/bin/copy.rs +++ b/src/bin/copy.rs @@ -8,6 +8,7 @@ use std::net::{TcpStream, Shutdown}; use std::io::Write; use std::fs::File; use std::fs; +use std::io::Read; fn main() { let args = get_cli_args(); @@ -26,7 +27,7 @@ fn main() { println!("Requesting Read of {}", args.filepath); json = Some(serde_json::to_string::(&args.filepath).unwrap()) } - serde_json::to_writer(&mut stream, &Packet { p_type: packet_type, json, data: None }) + serde_json::to_writer(&mut stream, &Packet { p_type: packet_type, json, }) .unwrap(); stream.flush().unwrap(); stream.shutdown(Shutdown::Write).unwrap(); @@ -59,26 +60,27 @@ fn send_file_to_data_nodes( { let div: usize = ((file.len() as f32) / (nodes.len() as f32)).ceil() as usize; let chunks: Vec<_> = file.chunks(div).collect(); - println!("Going to send a file! Bytes {}", file.len()); for node in nodes { let endpoint = format!("{}:{}", node.ip, node.port); - let mut stream = TcpStream::connect(endpoint).unwrap(); + println!("{}", endpoint); + let mut stream = TcpStream::connect(&endpoint).unwrap(); + let file_size = chunks[node.chunk_index as usize].len() as i64; let chunk = Chunk { index: node.chunk_index, filename: filename.clone(), + file_size, }; - println!("Sending"); serde_json::to_writer( &mut stream, &Packet { p_type: PacketType::PutFile, json: Some(serde_json::to_string(&chunk).unwrap()), - data: Some(chunks[node.chunk_index as usize].to_vec()), -// data: None, }).unwrap(); + stream.flush().unwrap(); + stream.write(chunks[node.chunk_index as usize]).unwrap(); + stream.flush().unwrap(); + stream.shutdown(Shutdown::Write).unwrap(); } -// stream.flush().unwrap(); -// stream.shutdown(Shutdown::Write).unwrap(); } fn get_file_from_data_nodes( @@ -92,6 +94,7 @@ fn get_file_from_data_nodes( let chunk = Chunk { index: node.chunk_index, filename: filename.clone(), + file_size: 128, }; let endpoint = format!("{}:{}", node.ip, node.port); let mut stream = TcpStream::connect(endpoint).unwrap(); @@ -100,15 +103,14 @@ fn get_file_from_data_nodes( &Packet { p_type: PacketType::GetFile, json: Some(serde_json::to_string(&chunk).unwrap()), - data: None, }).unwrap(); stream.flush().unwrap(); stream.shutdown(Shutdown::Write).unwrap(); match serde_json::from_reader(stream) { - Ok(Packet { p_type: PacketType::GetFile, json, data }) => { - let data = data.unwrap(); + Ok(Packet { p_type: PacketType::GetFile, json, }) => { +// let data = data.unwrap(); let chunk: Chunk = serde_json::from_str(&json.unwrap()).unwrap(); - chunks.insert(chunk.index as usize, data); +// chunks.insert(chunk.index as usize, data); successful += 1; } Ok(Packet { p_type: PacketType::Error, json, .. }) => { diff --git a/src/bin/data_node.rs b/src/bin/data_node.rs index 6c2cf11..dd7dfa0 100644 --- a/src/bin/data_node.rs +++ b/src/bin/data_node.rs @@ -12,6 +12,7 @@ use serde_json::from_str; use std::fs::File; use std::fs; use std::error::Error; +use std::io::BufWriter; fn main() { let node_endpoint = parse_endpoint_from_cli(0); @@ -24,13 +25,15 @@ fn main() { for stream in listener.incoming() { let mut stream = stream.unwrap(); match serde_json::from_reader(&mut stream) { - Ok(Packet { p_type: PacketType::GetFile, json, .. }) => { + Ok(Packet { p_type: PacketType::GetFile, json }) => { send_chunk(&data_path, &mut stream, &json.unwrap()); stream.flush().unwrap(); stream.shutdown(Shutdown::Write).unwrap(); } - Ok(Packet { p_type: PacketType::PutFile, json, data, }) => - receive_chunk(&data_path, &json.unwrap(), &data.unwrap()), + Ok(Packet { p_type: PacketType::PutFile, json }) => { + println!("Receiving chunk"); + receive_chunk(&mut stream, &data_path, &json.unwrap()); + }, Ok(Packet { p_type: PacketType::ShutdownDataNode, .. }) => shutdown(&mut stream, &metadata_endpoint, &node_endpoint), Ok(_) => eprintln!("We don't handle this PacketType"), @@ -39,11 +42,16 @@ fn main() { } } -fn receive_chunk(base_path: &String, json: &String, data: &Vec) { +fn receive_chunk(stream: &mut TcpStream, base_path: &String, json: &String) { let chunk: Chunk = serde_json::from_str(json).unwrap(); + let mut buf = [0u8; 1]; let filepath = format!("{}/{}_{}", base_path, chunk.filename, chunk.index); - let mut copy = File::create(filepath).unwrap(); - copy.write_all(&data[..]).unwrap(); + let mut copy = BufWriter::new(File::create(filepath).unwrap()); + for i in 0..chunk.file_size as usize { + stream.read(&mut buf).unwrap(); + copy.write(&buf).unwrap(); + copy.flush().unwrap(); + } } fn send_chunk(base_path: &String, stream: &mut TcpStream, json: &String) { @@ -56,21 +64,19 @@ fn send_chunk(base_path: &String, stream: &mut TcpStream, json: &String) { &Packet { p_type: PacketType::GetFile, json: Some(json.clone()), - data: Some(Vec::from(f)), }).unwrap(); - }, + } Err(e) => { match serde_json::to_writer( stream, &Packet { p_type: PacketType::Error, json: Some(String::from(e.description())), - data: None, }) { Ok(_) => println!("{}", "Copy client attempted to read non-existing file"), Err(e) => println!("{}", e), } - }, + } }; } @@ -89,7 +95,6 @@ fn register_with_meta_server(metadata_endpoint: &String, node_endpoint: &String) port: from_str(split[1]).unwrap(), }) .unwrap()), - data: None, }) .unwrap(); println!("Registered myself"); @@ -113,7 +118,6 @@ fn shutdown(stream: &mut TcpStream, metadata_endpoint: &String, node_endpoint: & port: from_str(split[1]).unwrap(), }) .unwrap()), - data: None, }) .unwrap(); println!("Unregistered myself"); diff --git a/src/bin/ls.rs b/src/bin/ls.rs index 6df2a76..8aa8434 100644 --- a/src/bin/ls.rs +++ b/src/bin/ls.rs @@ -19,7 +19,6 @@ fn main() { &Packet { p_type: PacketType::ListFiles, json: None, - data: None, }) .unwrap(); stream.flush().unwrap(); diff --git a/src/bin/meta_data.rs b/src/bin/meta_data.rs index f891ca3..f1cd952 100644 --- a/src/bin/meta_data.rs +++ b/src/bin/meta_data.rs @@ -44,7 +44,6 @@ fn request_read(stream: &mut TcpStream, conn: &Connection, message: &str) { &Packet { p_type: PacketType::Error, json: Some(String::from("File not found")), - data: None, }) { Ok(_) => println!("{}", "Copy client attempted to read non-existing file"), Err(e) => println!("{}", e), @@ -66,7 +65,6 @@ fn request_read(stream: &mut TcpStream, conn: &Connection, message: &str) { &Packet { p_type: PacketType::Success, json: Some(serde_json::to_string(&nodes).unwrap()), - data: None, }) { Ok(_) => println!("{}", "Sent nodes with chunks"), Err(e) => println!("{}", e), @@ -82,7 +80,6 @@ fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) { &Packet { p_type: PacketType::Error, json: Some(String::from("File already exists, please remove before re-uploading")), - data: None, }) { Ok(_) => println!("{}", "Copy client attempted to add an existing file"), Err(e) => println!("{}", e), @@ -113,7 +110,6 @@ fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) { &Packet { p_type: PacketType::Success, json: Some(serde_json::to_string(&nodes).unwrap()), - data: None, }) { Ok(_) => println!("{}", "Sent nodes with chunks"), Err(e) => println!("{}", e), @@ -145,7 +141,6 @@ fn report_success(stream: &mut TcpStream, message: &str) { match serde_json::to_writer(stream, &Packet { p_type: PacketType::Success, json: None, - data: None, }) { Ok(_) => println!("{}", message), Err(e) => println!("{}", e), diff --git a/src/lib.rs b/src/lib.rs index bbf4b99..268e4a5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,7 +28,6 @@ pub enum PacketType { pub struct Packet { pub p_type: PacketType, pub json: Option, - pub data: Option>, } #[derive(Serialize, Deserialize, Debug)] @@ -60,6 +59,7 @@ pub struct AvailableNodes { pub struct Chunk { pub index: u32, pub filename: String, + pub file_size: i64, } #[derive(Debug)]