From e70859ef1720f139febc00c00c367edf39970de6 Mon Sep 17 00:00:00 2001 From: Joseph Ferano Date: Thu, 20 Dec 2018 21:01:39 -0400 Subject: [PATCH] Reverting to 1 byte buf, it works --- src/bin/copy.rs | 11 ++++++----- src/bin/data_node.rs | 8 ++++---- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/bin/copy.rs b/src/bin/copy.rs index b50cca1..48762d3 100644 --- a/src/bin/copy.rs +++ b/src/bin/copy.rs @@ -78,7 +78,7 @@ fn send_file_to_data_nodes( json: Some(serde_json::to_string(&chunk).unwrap()), }).unwrap(); stream.flush().unwrap(); - stream.write_all(chunks[node.chunk_index as usize]).unwrap(); + stream.write(chunks[node.chunk_index as usize]).unwrap(); stream.flush().unwrap(); stream.shutdown(Shutdown::Write).unwrap(); } @@ -98,6 +98,7 @@ fn get_file_from_data_nodes( file_size: 0, }; let endpoint = format!("{}:{}", node.ip, node.port); + println!("Connecting to endpoint: {}", endpoint); let mut stream = TcpStream::connect(endpoint).unwrap(); serde_json::to_writer( &stream, @@ -106,10 +107,10 @@ fn get_file_from_data_nodes( json: Some(serde_json::to_string(&chunk).unwrap()), }).unwrap(); stream.flush().unwrap(); - stream.shutdown(Shutdown::Write).unwrap(); match serde_json::Deserializer::from_reader(&mut stream).into_iter().next().unwrap() { Ok(Packet { p_type: PacketType::GetFile, json, }) => { let chunk: Chunk = serde_json::from_str(&json.unwrap()).unwrap(); + println!("Getting chunk: {:?}", chunk); receive_chunk(&mut stream, &chunk, &mut file); }, Ok(Packet { p_type: PacketType::Error, json, .. }) => { @@ -122,10 +123,10 @@ fn get_file_from_data_nodes( } fn receive_chunk(stream: &mut TcpStream, chunk: &Chunk, chunk_buf: &mut BufWriter) { - let mut buf = [0u8; 256]; - for _ in 0..(chunk.file_size / 256 + 1) as usize { + let mut buf = [0u8; 1]; + for _ in 0..(chunk.file_size) as usize { stream.read(&mut buf).unwrap(); - chunk_buf.write_all(&buf).unwrap(); + chunk_buf.write(&buf).unwrap(); chunk_buf.flush().unwrap(); } } diff --git a/src/bin/data_node.rs b/src/bin/data_node.rs index f74aac5..efbf9c0 100644 --- a/src/bin/data_node.rs +++ b/src/bin/data_node.rs @@ -44,12 +44,12 @@ fn main() { fn receive_chunk(stream: &mut TcpStream, base_path: &String, json: &String) { let chunk: Chunk = serde_json::from_str(json).unwrap(); - let mut buf = [0u8; 256]; + let mut buf = [0u8; 1]; let filepath = format!("{}/{}_{}", base_path, chunk.filename, chunk.index); let mut copy = BufWriter::new(File::create(filepath).unwrap()); - for _ in 0..(chunk.file_size / 256 + 1) as usize { + for _ in 0..(chunk.file_size) as usize { stream.read(&mut buf).unwrap(); - copy.write_all(&buf).unwrap(); + copy.write(&buf).unwrap(); copy.flush().unwrap(); } } @@ -67,7 +67,7 @@ fn send_chunk(base_path: &String, stream: &mut TcpStream, json: &String) { &Chunk { file_size: file.len() as i64, ..chunk}).unwrap()), }).unwrap(); stream.flush().unwrap(); - stream.write_all(&file).unwrap(); + stream.write(&file).unwrap(); stream.flush().unwrap(); stream.shutdown(Shutdown::Write).unwrap(); }