diff --git a/src/bin/copy.rs b/src/bin/copy.rs index da2a1eb..aadb2ad 100644 --- a/src/bin/copy.rs +++ b/src/bin/copy.rs @@ -12,13 +12,13 @@ use std::fs; fn main() { let args = get_cli_args(); - let file = fs::read(&args.filename).expect("File not found!"); - let size = file.len(); let mut stream = TcpStream::connect(args.endpoint).unwrap(); let packet_type; let json; if args.is_copy_to_dfs { packet_type = PacketType::RequestWrite; + let file = fs::read(&args.filename).unwrap(); + let size = file.len(); println!("Requesting Write of {}", args.filepath); json = Some(serde_json::to_string( &AddFile { name: args.filepath.clone(), size: size as u32, }).unwrap()) @@ -44,10 +44,12 @@ fn main() { Err(e) => eprintln!("Error parsing json {}", e.to_string()), }; let filename = &args.filepath; + let destination = &args.filename; if args.is_copy_to_dfs { + let file = fs::read(&args.filename).unwrap(); nodes.map(|ns| send_file_to_data_nodes(&filename, &ns, &file)); } else { - nodes.map(|ns| get_file_from_data_nodes(&filename, &ns)); + nodes.map(|ns| get_file_from_data_nodes(&destination, &filename, &ns)); } } @@ -69,12 +71,13 @@ fn send_file_to_data_nodes(filename: &String, nodes: &Vec, file: stream.shutdown(Shutdown::Write).unwrap(); } -fn get_file_from_data_nodes(filename: &String, nodes: &Vec) { +fn get_file_from_data_nodes(destination_path: &String, filename: &String, nodes: &Vec) { let chunk = Chunk { index: nodes[0].chunk_index, filename: filename.clone(), }; let mut stream = TcpStream::connect("localhost:6771").unwrap(); + println!("The filename is {}", filename); let packet = serde_json::to_writer( &stream, &Packet { @@ -89,7 +92,7 @@ fn get_file_from_data_nodes(filename: &String, nodes: &Vec) { let data = data.unwrap(); let chunk: Chunk = serde_json::from_str(&json.unwrap()).unwrap(); // TODO: Here we have to rebuild the chunks - let mut copy = File::create(chunk.filename).unwrap(); + let mut copy = File::create(destination_path).unwrap(); copy.write_all(&data[..]).unwrap(); }, Ok(Packet { p_type: PacketType::Error, json, .. }) => { @@ -140,6 +143,8 @@ pub fn get_cli_args() -> CliArgs { endpoint = format!("{}:{}", splits[0], splits[1]); filepath = String::from(splits[2]); - CliArgs { endpoint, filepath, filename, is_copy_to_dfs } + let a = CliArgs { endpoint, filepath, filename, is_copy_to_dfs }; + println!("{:?}", a); + a } diff --git a/src/bin/data_node.rs b/src/bin/data_node.rs index f7584cb..f984600 100644 --- a/src/bin/data_node.rs +++ b/src/bin/data_node.rs @@ -21,8 +21,8 @@ fn main() { for stream in listener.incoming() { let mut stream = stream.unwrap(); match serde_json::from_reader(&mut stream) { - Ok(Packet { p_type: PacketType::GetFile, json, data, }) => { - send_file(&mut stream, &json.unwrap(), &data.unwrap()); + Ok(Packet { p_type: PacketType::GetFile, json, .. }) => { + send_file(&mut stream, &json.unwrap()); stream.flush().unwrap(); stream.shutdown(Shutdown::Write).unwrap(); } @@ -42,18 +42,19 @@ fn receive_file(json: &String, data: &Vec) { copy.write_all(&data[..]).unwrap(); } -fn send_file(stream: &mut TcpStream, json: &String, data: &Vec) { +fn send_file(stream: &mut TcpStream, json: &String) { let chunk: Chunk = serde_json::from_str(json).unwrap(); - match fs::read(&chunk.filename) { + println!("{}", chunk.filename); + match fs::read(format!("{}_{}", &chunk.filename, &chunk.index)) { Ok(f) => { - let packet = serde_json::to_writer( + serde_json::to_writer( stream, &Packet { p_type: PacketType::GetFile, json: Some(json.clone()), data: Some(Vec::from(f)), }).unwrap(); - } + }, Err(e) => { match serde_json::to_writer( stream, @@ -65,10 +66,8 @@ fn send_file(stream: &mut TcpStream, json: &String, data: &Vec) { Ok(_) => println!("{}", "Copy client attempted to read non-existing file"), Err(e) => println!("{}", e), } - } + }, }; - stream.flush().unwrap(); - stream.shutdown(Shutdown::Write).unwrap(); } fn register_with_meta_server(endpoint: &String) {