This commit is contained in:
Joseph Ferano 2018-12-18 20:53:56 -04:00
parent 6b792a95ac
commit 0f2582c37a
2 changed files with 8 additions and 8 deletions

View File

@ -77,7 +77,7 @@ fn send_file_to_data_nodes(
json: Some(serde_json::to_string(&chunk).unwrap()), json: Some(serde_json::to_string(&chunk).unwrap()),
}).unwrap(); }).unwrap();
stream.flush().unwrap(); stream.flush().unwrap();
stream.write(chunks[node.chunk_index as usize]).unwrap(); stream.write_all(chunks[node.chunk_index as usize]).unwrap();
stream.flush().unwrap(); stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap(); stream.shutdown(Shutdown::Write).unwrap();
} }

View File

@ -24,7 +24,7 @@ fn main() {
for stream in listener.incoming() { for stream in listener.incoming() {
let mut stream = stream.unwrap(); let mut stream = stream.unwrap();
match serde_json::from_reader(&mut stream) { match serde_json::Deserializer::from_reader(&mut stream).into_iter().next().unwrap() {
Ok(Packet { p_type: PacketType::GetFile, json }) => { Ok(Packet { p_type: PacketType::GetFile, json }) => {
send_chunk(&data_path, &mut stream, &json.unwrap()); send_chunk(&data_path, &mut stream, &json.unwrap());
stream.flush().unwrap(); stream.flush().unwrap();
@ -35,7 +35,7 @@ fn main() {
receive_chunk(&mut stream, &data_path, &json.unwrap()); receive_chunk(&mut stream, &data_path, &json.unwrap());
}, },
Ok(Packet { p_type: PacketType::ShutdownDataNode, .. }) => Ok(Packet { p_type: PacketType::ShutdownDataNode, .. }) =>
shutdown(&mut stream, &metadata_endpoint, &node_endpoint), shutdown(&metadata_endpoint, &node_endpoint),
Ok(_) => eprintln!("We don't handle this PacketType"), Ok(_) => eprintln!("We don't handle this PacketType"),
Err(e) => eprintln!("Error parsing json: {}", e.to_string()), Err(e) => eprintln!("Error parsing json: {}", e.to_string()),
}; };
@ -44,12 +44,12 @@ fn main() {
fn receive_chunk(stream: &mut TcpStream, base_path: &String, json: &String) { fn receive_chunk(stream: &mut TcpStream, base_path: &String, json: &String) {
let chunk: Chunk = serde_json::from_str(json).unwrap(); let chunk: Chunk = serde_json::from_str(json).unwrap();
let mut buf = [0u8; 1]; let mut buf = [0u8; 8];
let filepath = format!("{}/{}_{}", base_path, chunk.filename, chunk.index); let filepath = format!("{}/{}_{}", base_path, chunk.filename, chunk.index);
let mut copy = BufWriter::new(File::create(filepath).unwrap()); let mut copy = BufWriter::new(File::create(filepath).unwrap());
for i in 0..chunk.file_size as usize { for _ in 0..(chunk.file_size / 8 + 1) as usize {
stream.read(&mut buf).unwrap(); stream.read(&mut buf).unwrap();
copy.write(&buf).unwrap(); copy.write_all(&buf).unwrap();
copy.flush().unwrap(); copy.flush().unwrap();
} }
} }
@ -58,7 +58,7 @@ fn send_chunk(base_path: &String, stream: &mut TcpStream, json: &String) {
let chunk: Chunk = serde_json::from_str(json).unwrap(); let chunk: Chunk = serde_json::from_str(json).unwrap();
println!("{}", chunk.filename); println!("{}", chunk.filename);
match fs::read(format!("{}/{}_{}", base_path, &chunk.filename, &chunk.index)) { match fs::read(format!("{}/{}_{}", base_path, &chunk.filename, &chunk.index)) {
Ok(f) => { Ok(_) => {
serde_json::to_writer( serde_json::to_writer(
stream, stream,
&Packet { &Packet {
@ -104,7 +104,7 @@ fn register_with_meta_server(metadata_endpoint: &String, node_endpoint: &String)
println!("{:?}", result); println!("{:?}", result);
} }
fn shutdown(stream: &mut TcpStream, metadata_endpoint: &String, node_endpoint: &String) { fn shutdown(metadata_endpoint: &String, node_endpoint: &String) {
let mut stream = TcpStream::connect(&metadata_endpoint).unwrap(); let mut stream = TcpStream::connect(&metadata_endpoint).unwrap();
let split: Vec<&str> = node_endpoint.split(":").collect(); let split: Vec<&str> = node_endpoint.split(":").collect();
serde_json::to_writer( serde_json::to_writer(