diff --git a/src/bin/copy.rs b/src/bin/copy.rs index 03d0635..f62bad1 100644 --- a/src/bin/copy.rs +++ b/src/bin/copy.rs @@ -77,7 +77,7 @@ fn send_file_to_data_nodes( json: Some(serde_json::to_string(&chunk).unwrap()), }).unwrap(); stream.flush().unwrap(); - stream.write(chunks[node.chunk_index as usize]).unwrap(); + stream.write_all(chunks[node.chunk_index as usize]).unwrap(); stream.flush().unwrap(); stream.shutdown(Shutdown::Write).unwrap(); } diff --git a/src/bin/data_node.rs b/src/bin/data_node.rs index dd7dfa0..8722228 100644 --- a/src/bin/data_node.rs +++ b/src/bin/data_node.rs @@ -24,7 +24,7 @@ fn main() { for stream in listener.incoming() { let mut stream = stream.unwrap(); - match serde_json::from_reader(&mut stream) { + match serde_json::Deserializer::from_reader(&mut stream).into_iter().next().unwrap() { Ok(Packet { p_type: PacketType::GetFile, json }) => { send_chunk(&data_path, &mut stream, &json.unwrap()); stream.flush().unwrap(); @@ -35,7 +35,7 @@ fn main() { receive_chunk(&mut stream, &data_path, &json.unwrap()); }, Ok(Packet { p_type: PacketType::ShutdownDataNode, .. }) => - shutdown(&mut stream, &metadata_endpoint, &node_endpoint), + shutdown(&metadata_endpoint, &node_endpoint), Ok(_) => eprintln!("We don't handle this PacketType"), Err(e) => eprintln!("Error parsing json: {}", e.to_string()), }; @@ -44,12 +44,12 @@ fn main() { fn receive_chunk(stream: &mut TcpStream, base_path: &String, json: &String) { let chunk: Chunk = serde_json::from_str(json).unwrap(); - let mut buf = [0u8; 1]; + let mut buf = [0u8; 8]; let filepath = format!("{}/{}_{}", base_path, chunk.filename, chunk.index); let mut copy = BufWriter::new(File::create(filepath).unwrap()); - for i in 0..chunk.file_size as usize { + for _ in 0..(chunk.file_size / 8 + 1) as usize { stream.read(&mut buf).unwrap(); - copy.write(&buf).unwrap(); + copy.write_all(&buf).unwrap(); copy.flush().unwrap(); } } @@ -58,7 +58,7 @@ fn send_chunk(base_path: &String, stream: &mut TcpStream, json: &String) { let chunk: Chunk = serde_json::from_str(json).unwrap(); println!("{}", chunk.filename); match fs::read(format!("{}/{}_{}", base_path, &chunk.filename, &chunk.index)) { - Ok(f) => { + Ok(_) => { serde_json::to_writer( stream, &Packet { @@ -104,7 +104,7 @@ fn register_with_meta_server(metadata_endpoint: &String, node_endpoint: &String) println!("{:?}", result); } -fn shutdown(stream: &mut TcpStream, metadata_endpoint: &String, node_endpoint: &String) { +fn shutdown(metadata_endpoint: &String, node_endpoint: &String) { let mut stream = TcpStream::connect(&metadata_endpoint).unwrap(); let split: Vec<&str> = node_endpoint.split(":").collect(); serde_json::to_writer(