diff --git a/.gitignore b/.gitignore index 6b53060..7c6e94c 100644 --- a/.gitignore +++ b/.gitignore @@ -8,5 +8,6 @@ *.db dfs_skel/ venv/ -data_node/ +dn1/ +dn2/ copy_dir/ diff --git a/pug.jpg b/pug.jpg new file mode 100644 index 0000000..b5d3978 Binary files /dev/null and b/pug.jpg differ diff --git a/src/bin/copy.rs b/src/bin/copy.rs index 3d6ba69..9f2cd3b 100644 --- a/src/bin/copy.rs +++ b/src/bin/copy.rs @@ -58,22 +58,27 @@ fn send_file_to_data_nodes( nodes: &Vec, file: &Vec) { - let endpoint = format!("{}:{}", nodes[0].ip, nodes[0].port); - let mut stream = TcpStream::connect(endpoint).unwrap(); + let div: usize = ((file.len() as f32) / (nodes.len() as f32)).ceil() as usize; + let chunks: Vec<_> = file.chunks(div).collect(); println!("Going to send a file! Bytes {}", file.len()); - let chunk = Chunk { - index: nodes[0].chunk_index, - filename: filename.clone(), - }; - let packet = serde_json::to_writer( - &mut stream, - &Packet { - p_type: PacketType::PutFile, - json: Some(serde_json::to_string(&chunk).unwrap()), - data: Some(file.clone()), - }).unwrap(); - stream.flush().unwrap(); - stream.shutdown(Shutdown::Write).unwrap(); + for node in nodes { + let endpoint = format!("{}:{}", node.ip, node.port); + let mut stream = TcpStream::connect(endpoint).unwrap(); + let chunk = Chunk { + index: node.chunk_index, + filename: filename.clone(), + }; + let packet = serde_json::to_writer( + &mut stream, + &Packet { + p_type: PacketType::PutFile, + json: Some(serde_json::to_string(&chunk).unwrap()), + data: Some(chunks[node.chunk_index as usize].to_vec()), +// data: None, + }).unwrap(); + stream.flush().unwrap(); + stream.shutdown(Shutdown::Write).unwrap(); + } } fn get_file_from_data_nodes( @@ -81,35 +86,44 @@ fn get_file_from_data_nodes( filename: &String, nodes: &Vec) { - let chunk = Chunk { - index: nodes[0].chunk_index, - filename: filename.clone(), - }; - let endpoint = format!("{}:{}", nodes[0].ip, nodes[0].port); - let mut stream = TcpStream::connect(endpoint).unwrap(); - let packet = serde_json::to_writer( - &stream, - &Packet { - p_type: PacketType::GetFile, - json: Some(serde_json::to_string(&chunk).unwrap()), - data: None, - }).unwrap(); - stream.flush().unwrap(); - stream.shutdown(Shutdown::Write).unwrap(); - match serde_json::from_reader(stream) { - Ok(Packet { p_type: PacketType::GetFile, json, data }) => { - let data = data.unwrap(); - let chunk: Chunk = serde_json::from_str(&json.unwrap()).unwrap(); - // TODO: Here we have to rebuild the chunks - let mut copy = File::create(destination_path).unwrap(); - copy.write_all(&data[..]).unwrap(); + let mut chunks: Vec> = Vec::with_capacity(nodes.len()); + let mut successful = 0; + for node in nodes { + let chunk = Chunk { + index: node.chunk_index, + filename: filename.clone(), + }; + let endpoint = format!("{}:{}", node.ip, node.port); + let mut stream = TcpStream::connect(endpoint).unwrap(); + let packet = serde_json::to_writer( + &stream, + &Packet { + p_type: PacketType::GetFile, + json: Some(serde_json::to_string(&chunk).unwrap()), + data: None, + }).unwrap(); + stream.flush().unwrap(); + stream.shutdown(Shutdown::Write).unwrap(); + match serde_json::from_reader(stream) { + Ok(Packet { p_type: PacketType::GetFile, json, data }) => { + let data = data.unwrap(); + let chunk: Chunk = serde_json::from_str(&json.unwrap()).unwrap(); + chunks.insert(chunk.index as usize, data); + successful += 1; + } + Ok(Packet { p_type: PacketType::Error, json, .. }) => { + eprintln!("Data Node Server Error: {}", &json.unwrap()); + } + Ok(_) => {} + Err(e) => eprintln!("Error parsing json {}", e.to_string()), + }; + } + if successful == nodes.len() { + let mut copy = File::create(destination_path).unwrap(); + for chunk in chunks { + copy.write(&chunk[..]).unwrap(); } - Ok(Packet { p_type: PacketType::Error, json, .. }) => { - eprintln!("Data Node Server Error: {}", &json.unwrap()); - } - Ok(_) => {} - Err(e) => eprintln!("Error parsing json {}", e.to_string()), - }; + } } #[derive(Debug)] diff --git a/src/bin/data_node.rs b/src/bin/data_node.rs index 746fdec..6c2cf11 100644 --- a/src/bin/data_node.rs +++ b/src/bin/data_node.rs @@ -25,12 +25,12 @@ fn main() { let mut stream = stream.unwrap(); match serde_json::from_reader(&mut stream) { Ok(Packet { p_type: PacketType::GetFile, json, .. }) => { - send_file(&data_path, &mut stream, &json.unwrap()); + send_chunk(&data_path, &mut stream, &json.unwrap()); stream.flush().unwrap(); stream.shutdown(Shutdown::Write).unwrap(); } Ok(Packet { p_type: PacketType::PutFile, json, data, }) => - receive_file(&data_path, &json.unwrap(), &data.unwrap()), + receive_chunk(&data_path, &json.unwrap(), &data.unwrap()), Ok(Packet { p_type: PacketType::ShutdownDataNode, .. }) => shutdown(&mut stream, &metadata_endpoint, &node_endpoint), Ok(_) => eprintln!("We don't handle this PacketType"), @@ -39,15 +39,14 @@ fn main() { } } -fn receive_file(base_path: &String, json: &String, data: &Vec) { +fn receive_chunk(base_path: &String, json: &String, data: &Vec) { let chunk: Chunk = serde_json::from_str(json).unwrap(); let filepath = format!("{}/{}_{}", base_path, chunk.filename, chunk.index); - println!("{}", filepath); let mut copy = File::create(filepath).unwrap(); copy.write_all(&data[..]).unwrap(); } -fn send_file(base_path: &String, stream: &mut TcpStream, json: &String) { +fn send_chunk(base_path: &String, stream: &mut TcpStream, json: &String) { let chunk: Chunk = serde_json::from_str(json).unwrap(); println!("{}", chunk.filename); match fs::read(format!("{}/{}_{}", base_path, &chunk.filename, &chunk.index)) {