Getting node transfer working

This commit is contained in:
Joseph Ferano 2018-12-20 14:25:05 -04:00
parent 0f2582c37a
commit 99cbb4166b
2 changed files with 29 additions and 21 deletions

View File

@ -9,6 +9,7 @@ use std::io::Write;
use std::fs::File; use std::fs::File;
use std::fs; use std::fs;
use std::io::Read; use std::io::Read;
use std::io::BufWriter;
fn main() { fn main() {
let args = get_cli_args(); let args = get_cli_args();
@ -49,7 +50,7 @@ fn main() {
let file = fs::read(&args.filename).unwrap(); let file = fs::read(&args.filename).unwrap();
nodes.map(|ns| send_file_to_data_nodes(&filename, &ns, &file)); nodes.map(|ns| send_file_to_data_nodes(&filename, &ns, &file));
} else { } else {
nodes.map(|ns| get_file_from_data_nodes(&destination, &filename, &ns)); nodes.map(|mut ns| get_file_from_data_nodes(&destination, &filename, &mut ns));
} }
} }
@ -86,15 +87,15 @@ fn send_file_to_data_nodes(
fn get_file_from_data_nodes( fn get_file_from_data_nodes(
destination_path: &String, destination_path: &String,
filename: &String, filename: &String,
nodes: &Vec<AvailableNodes>) nodes: &mut Vec<AvailableNodes>)
{ {
let mut chunks: Vec<Vec<u8>> = Vec::with_capacity(nodes.len()); nodes.sort_by_key(|n| n.chunk_index);
let mut successful = 0; let mut file = BufWriter::new(File::create(destination_path).unwrap());
for node in nodes { for node in nodes {
let chunk = Chunk { let chunk = Chunk {
index: node.chunk_index, index: node.chunk_index,
filename: filename.clone(), filename: filename.clone(),
file_size: 128, file_size: 0,
}; };
let endpoint = format!("{}:{}", node.ip, node.port); let endpoint = format!("{}:{}", node.ip, node.port);
let mut stream = TcpStream::connect(endpoint).unwrap(); let mut stream = TcpStream::connect(endpoint).unwrap();
@ -106,13 +107,11 @@ fn get_file_from_data_nodes(
}).unwrap(); }).unwrap();
stream.flush().unwrap(); stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap(); stream.shutdown(Shutdown::Write).unwrap();
match serde_json::from_reader(stream) { match serde_json::Deserializer::from_reader(&mut stream).into_iter().next().unwrap() {
Ok(Packet { p_type: PacketType::GetFile, json, }) => { Ok(Packet { p_type: PacketType::GetFile, json, }) => {
// let data = data.unwrap();
let chunk: Chunk = serde_json::from_str(&json.unwrap()).unwrap(); let chunk: Chunk = serde_json::from_str(&json.unwrap()).unwrap();
// chunks.insert(chunk.index as usize, data); receive_chunk(&mut stream, &chunk, &mut file);
successful += 1; },
}
Ok(Packet { p_type: PacketType::Error, json, .. }) => { Ok(Packet { p_type: PacketType::Error, json, .. }) => {
eprintln!("Data Node Server Error: {}", &json.unwrap()); eprintln!("Data Node Server Error: {}", &json.unwrap());
} }
@ -120,14 +119,18 @@ fn get_file_from_data_nodes(
Err(e) => eprintln!("Error parsing json {}", e.to_string()), Err(e) => eprintln!("Error parsing json {}", e.to_string()),
}; };
} }
if successful == nodes.len() { }
let mut copy = File::create(destination_path).unwrap();
for chunk in chunks { fn receive_chunk(stream: &mut TcpStream, chunk: &Chunk, chunk_buf: &mut BufWriter<File>) {
copy.write(&chunk[..]).unwrap(); let mut buf = [0u8; 256];
} for _ in 0..(chunk.file_size / 256 + 1) as usize {
stream.read(&mut buf).unwrap();
chunk_buf.write_all(&buf).unwrap();
chunk_buf.flush().unwrap();
} }
} }
#[derive(Debug)] #[derive(Debug)]
pub struct CliArgs { pub struct CliArgs {
pub endpoint: String, pub endpoint: String,

View File

@ -44,10 +44,10 @@ fn main() {
fn receive_chunk(stream: &mut TcpStream, base_path: &String, json: &String) { fn receive_chunk(stream: &mut TcpStream, base_path: &String, json: &String) {
let chunk: Chunk = serde_json::from_str(json).unwrap(); let chunk: Chunk = serde_json::from_str(json).unwrap();
let mut buf = [0u8; 8]; let mut buf = [0u8; 256];
let filepath = format!("{}/{}_{}", base_path, chunk.filename, chunk.index); let filepath = format!("{}/{}_{}", base_path, chunk.filename, chunk.index);
let mut copy = BufWriter::new(File::create(filepath).unwrap()); let mut copy = BufWriter::new(File::create(filepath).unwrap());
for _ in 0..(chunk.file_size / 8 + 1) as usize { for _ in 0..(chunk.file_size / 256 + 1) as usize {
stream.read(&mut buf).unwrap(); stream.read(&mut buf).unwrap();
copy.write_all(&buf).unwrap(); copy.write_all(&buf).unwrap();
copy.flush().unwrap(); copy.flush().unwrap();
@ -56,15 +56,20 @@ fn receive_chunk(stream: &mut TcpStream, base_path: &String, json: &String) {
fn send_chunk(base_path: &String, stream: &mut TcpStream, json: &String) { fn send_chunk(base_path: &String, stream: &mut TcpStream, json: &String) {
let chunk: Chunk = serde_json::from_str(json).unwrap(); let chunk: Chunk = serde_json::from_str(json).unwrap();
println!("{}", chunk.filename); println!("Sending {}", chunk.filename);
match fs::read(format!("{}/{}_{}", base_path, &chunk.filename, &chunk.index)) { match fs::read(format!("{}/{}_{}", base_path, &chunk.filename, &chunk.index)) {
Ok(_) => { Ok(file) => {
serde_json::to_writer( serde_json::to_writer(
stream, &mut *stream,
&Packet { &Packet {
p_type: PacketType::GetFile, p_type: PacketType::GetFile,
json: Some(json.clone()), json: Some(serde_json::to_string(
&Chunk { file_size: file.len() as i64, ..chunk}).unwrap()),
}).unwrap(); }).unwrap();
stream.flush().unwrap();
stream.write_all(&file).unwrap();
stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap();
} }
Err(e) => { Err(e) => {
match serde_json::to_writer( match serde_json::to_writer(