From 99cbb4166b11a68a64d8d9babbb0ca153381c45c Mon Sep 17 00:00:00 2001 From: Joseph Ferano Date: Thu, 20 Dec 2018 14:25:05 -0400 Subject: [PATCH] Getting node transfer working --- src/bin/copy.rs | 33 ++++++++++++++++++--------------- src/bin/data_node.rs | 17 +++++++++++------ 2 files changed, 29 insertions(+), 21 deletions(-) diff --git a/src/bin/copy.rs b/src/bin/copy.rs index f62bad1..b50cca1 100644 --- a/src/bin/copy.rs +++ b/src/bin/copy.rs @@ -9,6 +9,7 @@ use std::io::Write; use std::fs::File; use std::fs; use std::io::Read; +use std::io::BufWriter; fn main() { let args = get_cli_args(); @@ -49,7 +50,7 @@ fn main() { let file = fs::read(&args.filename).unwrap(); nodes.map(|ns| send_file_to_data_nodes(&filename, &ns, &file)); } else { - nodes.map(|ns| get_file_from_data_nodes(&destination, &filename, &ns)); + nodes.map(|mut ns| get_file_from_data_nodes(&destination, &filename, &mut ns)); } } @@ -86,15 +87,15 @@ fn send_file_to_data_nodes( fn get_file_from_data_nodes( destination_path: &String, filename: &String, - nodes: &Vec) + nodes: &mut Vec) { - let mut chunks: Vec> = Vec::with_capacity(nodes.len()); - let mut successful = 0; + nodes.sort_by_key(|n| n.chunk_index); + let mut file = BufWriter::new(File::create(destination_path).unwrap()); for node in nodes { let chunk = Chunk { index: node.chunk_index, filename: filename.clone(), - file_size: 128, + file_size: 0, }; let endpoint = format!("{}:{}", node.ip, node.port); let mut stream = TcpStream::connect(endpoint).unwrap(); @@ -106,13 +107,11 @@ fn get_file_from_data_nodes( }).unwrap(); stream.flush().unwrap(); stream.shutdown(Shutdown::Write).unwrap(); - match serde_json::from_reader(stream) { + match serde_json::Deserializer::from_reader(&mut stream).into_iter().next().unwrap() { Ok(Packet { p_type: PacketType::GetFile, json, }) => { -// let data = data.unwrap(); let chunk: Chunk = serde_json::from_str(&json.unwrap()).unwrap(); -// chunks.insert(chunk.index as usize, data); - successful += 1; - } + receive_chunk(&mut stream, &chunk, &mut file); + }, Ok(Packet { p_type: PacketType::Error, json, .. }) => { eprintln!("Data Node Server Error: {}", &json.unwrap()); } @@ -120,14 +119,18 @@ fn get_file_from_data_nodes( Err(e) => eprintln!("Error parsing json {}", e.to_string()), }; } - if successful == nodes.len() { - let mut copy = File::create(destination_path).unwrap(); - for chunk in chunks { - copy.write(&chunk[..]).unwrap(); - } +} + +fn receive_chunk(stream: &mut TcpStream, chunk: &Chunk, chunk_buf: &mut BufWriter) { + let mut buf = [0u8; 256]; + for _ in 0..(chunk.file_size / 256 + 1) as usize { + stream.read(&mut buf).unwrap(); + chunk_buf.write_all(&buf).unwrap(); + chunk_buf.flush().unwrap(); } } + #[derive(Debug)] pub struct CliArgs { pub endpoint: String, diff --git a/src/bin/data_node.rs b/src/bin/data_node.rs index 8722228..f74aac5 100644 --- a/src/bin/data_node.rs +++ b/src/bin/data_node.rs @@ -44,10 +44,10 @@ fn main() { fn receive_chunk(stream: &mut TcpStream, base_path: &String, json: &String) { let chunk: Chunk = serde_json::from_str(json).unwrap(); - let mut buf = [0u8; 8]; + let mut buf = [0u8; 256]; let filepath = format!("{}/{}_{}", base_path, chunk.filename, chunk.index); let mut copy = BufWriter::new(File::create(filepath).unwrap()); - for _ in 0..(chunk.file_size / 8 + 1) as usize { + for _ in 0..(chunk.file_size / 256 + 1) as usize { stream.read(&mut buf).unwrap(); copy.write_all(&buf).unwrap(); copy.flush().unwrap(); @@ -56,15 +56,20 @@ fn receive_chunk(stream: &mut TcpStream, base_path: &String, json: &String) { fn send_chunk(base_path: &String, stream: &mut TcpStream, json: &String) { let chunk: Chunk = serde_json::from_str(json).unwrap(); - println!("{}", chunk.filename); + println!("Sending {}", chunk.filename); match fs::read(format!("{}/{}_{}", base_path, &chunk.filename, &chunk.index)) { - Ok(_) => { + Ok(file) => { serde_json::to_writer( - stream, + &mut *stream, &Packet { p_type: PacketType::GetFile, - json: Some(json.clone()), + json: Some(serde_json::to_string( + &Chunk { file_size: file.len() as i64, ..chunk}).unwrap()), }).unwrap(); + stream.flush().unwrap(); + stream.write_all(&file).unwrap(); + stream.flush().unwrap(); + stream.shutdown(Shutdown::Write).unwrap(); } Err(e) => { match serde_json::to_writer(