Reverting to 1 byte buf, it works

This commit is contained in:
Joseph Ferano 2018-12-20 21:01:39 -04:00
parent 99cbb4166b
commit e70859ef17
2 changed files with 10 additions and 9 deletions

View File

@ -78,7 +78,7 @@ fn send_file_to_data_nodes(
json: Some(serde_json::to_string(&chunk).unwrap()), json: Some(serde_json::to_string(&chunk).unwrap()),
}).unwrap(); }).unwrap();
stream.flush().unwrap(); stream.flush().unwrap();
stream.write_all(chunks[node.chunk_index as usize]).unwrap(); stream.write(chunks[node.chunk_index as usize]).unwrap();
stream.flush().unwrap(); stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap(); stream.shutdown(Shutdown::Write).unwrap();
} }
@ -98,6 +98,7 @@ fn get_file_from_data_nodes(
file_size: 0, file_size: 0,
}; };
let endpoint = format!("{}:{}", node.ip, node.port); let endpoint = format!("{}:{}", node.ip, node.port);
println!("Connecting to endpoint: {}", endpoint);
let mut stream = TcpStream::connect(endpoint).unwrap(); let mut stream = TcpStream::connect(endpoint).unwrap();
serde_json::to_writer( serde_json::to_writer(
&stream, &stream,
@ -106,10 +107,10 @@ fn get_file_from_data_nodes(
json: Some(serde_json::to_string(&chunk).unwrap()), json: Some(serde_json::to_string(&chunk).unwrap()),
}).unwrap(); }).unwrap();
stream.flush().unwrap(); stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap();
match serde_json::Deserializer::from_reader(&mut stream).into_iter().next().unwrap() { match serde_json::Deserializer::from_reader(&mut stream).into_iter().next().unwrap() {
Ok(Packet { p_type: PacketType::GetFile, json, }) => { Ok(Packet { p_type: PacketType::GetFile, json, }) => {
let chunk: Chunk = serde_json::from_str(&json.unwrap()).unwrap(); let chunk: Chunk = serde_json::from_str(&json.unwrap()).unwrap();
println!("Getting chunk: {:?}", chunk);
receive_chunk(&mut stream, &chunk, &mut file); receive_chunk(&mut stream, &chunk, &mut file);
}, },
Ok(Packet { p_type: PacketType::Error, json, .. }) => { Ok(Packet { p_type: PacketType::Error, json, .. }) => {
@ -122,10 +123,10 @@ fn get_file_from_data_nodes(
} }
fn receive_chunk(stream: &mut TcpStream, chunk: &Chunk, chunk_buf: &mut BufWriter<File>) { fn receive_chunk(stream: &mut TcpStream, chunk: &Chunk, chunk_buf: &mut BufWriter<File>) {
let mut buf = [0u8; 256]; let mut buf = [0u8; 1];
for _ in 0..(chunk.file_size / 256 + 1) as usize { for _ in 0..(chunk.file_size) as usize {
stream.read(&mut buf).unwrap(); stream.read(&mut buf).unwrap();
chunk_buf.write_all(&buf).unwrap(); chunk_buf.write(&buf).unwrap();
chunk_buf.flush().unwrap(); chunk_buf.flush().unwrap();
} }
} }

View File

@ -44,12 +44,12 @@ fn main() {
fn receive_chunk(stream: &mut TcpStream, base_path: &String, json: &String) { fn receive_chunk(stream: &mut TcpStream, base_path: &String, json: &String) {
let chunk: Chunk = serde_json::from_str(json).unwrap(); let chunk: Chunk = serde_json::from_str(json).unwrap();
let mut buf = [0u8; 256]; let mut buf = [0u8; 1];
let filepath = format!("{}/{}_{}", base_path, chunk.filename, chunk.index); let filepath = format!("{}/{}_{}", base_path, chunk.filename, chunk.index);
let mut copy = BufWriter::new(File::create(filepath).unwrap()); let mut copy = BufWriter::new(File::create(filepath).unwrap());
for _ in 0..(chunk.file_size / 256 + 1) as usize { for _ in 0..(chunk.file_size) as usize {
stream.read(&mut buf).unwrap(); stream.read(&mut buf).unwrap();
copy.write_all(&buf).unwrap(); copy.write(&buf).unwrap();
copy.flush().unwrap(); copy.flush().unwrap();
} }
} }
@ -67,7 +67,7 @@ fn send_chunk(base_path: &String, stream: &mut TcpStream, json: &String) {
&Chunk { file_size: file.len() as i64, ..chunk}).unwrap()), &Chunk { file_size: file.len() as i64, ..chunk}).unwrap()),
}).unwrap(); }).unwrap();
stream.flush().unwrap(); stream.flush().unwrap();
stream.write_all(&file).unwrap(); stream.write(&file).unwrap();
stream.flush().unwrap(); stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap(); stream.shutdown(Shutdown::Write).unwrap();
} }