Sending and receiving files now writes correctly

This commit is contained in:
Joseph Ferano 2018-12-16 21:01:14 -04:00
parent 1dafc23828
commit c669fa796b
2 changed files with 19 additions and 15 deletions

View File

@ -12,13 +12,13 @@ use std::fs;
fn main() { fn main() {
let args = get_cli_args(); let args = get_cli_args();
let file = fs::read(&args.filename).expect("File not found!");
let size = file.len();
let mut stream = TcpStream::connect(args.endpoint).unwrap(); let mut stream = TcpStream::connect(args.endpoint).unwrap();
let packet_type; let packet_type;
let json; let json;
if args.is_copy_to_dfs { if args.is_copy_to_dfs {
packet_type = PacketType::RequestWrite; packet_type = PacketType::RequestWrite;
let file = fs::read(&args.filename).unwrap();
let size = file.len();
println!("Requesting Write of {}", args.filepath); println!("Requesting Write of {}", args.filepath);
json = Some(serde_json::to_string( json = Some(serde_json::to_string(
&AddFile { name: args.filepath.clone(), size: size as u32, }).unwrap()) &AddFile { name: args.filepath.clone(), size: size as u32, }).unwrap())
@ -44,10 +44,12 @@ fn main() {
Err(e) => eprintln!("Error parsing json {}", e.to_string()), Err(e) => eprintln!("Error parsing json {}", e.to_string()),
}; };
let filename = &args.filepath; let filename = &args.filepath;
let destination = &args.filename;
if args.is_copy_to_dfs { if args.is_copy_to_dfs {
let file = fs::read(&args.filename).unwrap();
nodes.map(|ns| send_file_to_data_nodes(&filename, &ns, &file)); nodes.map(|ns| send_file_to_data_nodes(&filename, &ns, &file));
} else { } else {
nodes.map(|ns| get_file_from_data_nodes(&filename, &ns)); nodes.map(|ns| get_file_from_data_nodes(&destination, &filename, &ns));
} }
} }
@ -69,12 +71,13 @@ fn send_file_to_data_nodes(filename: &String, nodes: &Vec<AvailableNodes>, file:
stream.shutdown(Shutdown::Write).unwrap(); stream.shutdown(Shutdown::Write).unwrap();
} }
fn get_file_from_data_nodes(filename: &String, nodes: &Vec<AvailableNodes>) { fn get_file_from_data_nodes(destination_path: &String, filename: &String, nodes: &Vec<AvailableNodes>) {
let chunk = Chunk { let chunk = Chunk {
index: nodes[0].chunk_index, index: nodes[0].chunk_index,
filename: filename.clone(), filename: filename.clone(),
}; };
let mut stream = TcpStream::connect("localhost:6771").unwrap(); let mut stream = TcpStream::connect("localhost:6771").unwrap();
println!("The filename is {}", filename);
let packet = serde_json::to_writer( let packet = serde_json::to_writer(
&stream, &stream,
&Packet { &Packet {
@ -89,7 +92,7 @@ fn get_file_from_data_nodes(filename: &String, nodes: &Vec<AvailableNodes>) {
let data = data.unwrap(); let data = data.unwrap();
let chunk: Chunk = serde_json::from_str(&json.unwrap()).unwrap(); let chunk: Chunk = serde_json::from_str(&json.unwrap()).unwrap();
// TODO: Here we have to rebuild the chunks // TODO: Here we have to rebuild the chunks
let mut copy = File::create(chunk.filename).unwrap(); let mut copy = File::create(destination_path).unwrap();
copy.write_all(&data[..]).unwrap(); copy.write_all(&data[..]).unwrap();
}, },
Ok(Packet { p_type: PacketType::Error, json, .. }) => { Ok(Packet { p_type: PacketType::Error, json, .. }) => {
@ -140,6 +143,8 @@ pub fn get_cli_args() -> CliArgs {
endpoint = format!("{}:{}", splits[0], splits[1]); endpoint = format!("{}:{}", splits[0], splits[1]);
filepath = String::from(splits[2]); filepath = String::from(splits[2]);
CliArgs { endpoint, filepath, filename, is_copy_to_dfs } let a = CliArgs { endpoint, filepath, filename, is_copy_to_dfs };
println!("{:?}", a);
a
} }

View File

@ -21,8 +21,8 @@ fn main() {
for stream in listener.incoming() { for stream in listener.incoming() {
let mut stream = stream.unwrap(); let mut stream = stream.unwrap();
match serde_json::from_reader(&mut stream) { match serde_json::from_reader(&mut stream) {
Ok(Packet { p_type: PacketType::GetFile, json, data, }) => { Ok(Packet { p_type: PacketType::GetFile, json, .. }) => {
send_file(&mut stream, &json.unwrap(), &data.unwrap()); send_file(&mut stream, &json.unwrap());
stream.flush().unwrap(); stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap(); stream.shutdown(Shutdown::Write).unwrap();
} }
@ -42,18 +42,19 @@ fn receive_file(json: &String, data: &Vec<u8>) {
copy.write_all(&data[..]).unwrap(); copy.write_all(&data[..]).unwrap();
} }
fn send_file(stream: &mut TcpStream, json: &String, data: &Vec<u8>) { fn send_file(stream: &mut TcpStream, json: &String) {
let chunk: Chunk = serde_json::from_str(json).unwrap(); let chunk: Chunk = serde_json::from_str(json).unwrap();
match fs::read(&chunk.filename) { println!("{}", chunk.filename);
match fs::read(format!("{}_{}", &chunk.filename, &chunk.index)) {
Ok(f) => { Ok(f) => {
let packet = serde_json::to_writer( serde_json::to_writer(
stream, stream,
&Packet { &Packet {
p_type: PacketType::GetFile, p_type: PacketType::GetFile,
json: Some(json.clone()), json: Some(json.clone()),
data: Some(Vec::from(f)), data: Some(Vec::from(f)),
}).unwrap(); }).unwrap();
} },
Err(e) => { Err(e) => {
match serde_json::to_writer( match serde_json::to_writer(
stream, stream,
@ -65,10 +66,8 @@ fn send_file(stream: &mut TcpStream, json: &String, data: &Vec<u8>) {
Ok(_) => println!("{}", "Copy client attempted to read non-existing file"), Ok(_) => println!("{}", "Copy client attempted to read non-existing file"),
Err(e) => println!("{}", e), Err(e) => println!("{}", e),
} }
} },
}; };
stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap();
} }
fn register_with_meta_server(endpoint: &String) { fn register_with_meta_server(endpoint: &String) {