Getting rid of data in the Packet struct

This commit is contained in:
Joseph Ferano 2018-12-18 19:02:35 -04:00
parent 2602a97892
commit 6b792a95ac
5 changed files with 31 additions and 31 deletions

View File

@ -8,6 +8,7 @@ use std::net::{TcpStream, Shutdown};
use std::io::Write; use std::io::Write;
use std::fs::File; use std::fs::File;
use std::fs; use std::fs;
use std::io::Read;
fn main() { fn main() {
let args = get_cli_args(); let args = get_cli_args();
@ -26,7 +27,7 @@ fn main() {
println!("Requesting Read of {}", args.filepath); println!("Requesting Read of {}", args.filepath);
json = Some(serde_json::to_string::<String>(&args.filepath).unwrap()) json = Some(serde_json::to_string::<String>(&args.filepath).unwrap())
} }
serde_json::to_writer(&mut stream, &Packet { p_type: packet_type, json, data: None }) serde_json::to_writer(&mut stream, &Packet { p_type: packet_type, json, })
.unwrap(); .unwrap();
stream.flush().unwrap(); stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap(); stream.shutdown(Shutdown::Write).unwrap();
@ -59,26 +60,27 @@ fn send_file_to_data_nodes(
{ {
let div: usize = ((file.len() as f32) / (nodes.len() as f32)).ceil() as usize; let div: usize = ((file.len() as f32) / (nodes.len() as f32)).ceil() as usize;
let chunks: Vec<_> = file.chunks(div).collect(); let chunks: Vec<_> = file.chunks(div).collect();
println!("Going to send a file! Bytes {}", file.len());
for node in nodes { for node in nodes {
let endpoint = format!("{}:{}", node.ip, node.port); let endpoint = format!("{}:{}", node.ip, node.port);
let mut stream = TcpStream::connect(endpoint).unwrap(); println!("{}", endpoint);
let mut stream = TcpStream::connect(&endpoint).unwrap();
let file_size = chunks[node.chunk_index as usize].len() as i64;
let chunk = Chunk { let chunk = Chunk {
index: node.chunk_index, index: node.chunk_index,
filename: filename.clone(), filename: filename.clone(),
file_size,
}; };
println!("Sending");
serde_json::to_writer( serde_json::to_writer(
&mut stream, &mut stream,
&Packet { &Packet {
p_type: PacketType::PutFile, p_type: PacketType::PutFile,
json: Some(serde_json::to_string(&chunk).unwrap()), json: Some(serde_json::to_string(&chunk).unwrap()),
data: Some(chunks[node.chunk_index as usize].to_vec()),
// data: None,
}).unwrap(); }).unwrap();
stream.flush().unwrap();
stream.write(chunks[node.chunk_index as usize]).unwrap();
stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap();
} }
// stream.flush().unwrap();
// stream.shutdown(Shutdown::Write).unwrap();
} }
fn get_file_from_data_nodes( fn get_file_from_data_nodes(
@ -92,6 +94,7 @@ fn get_file_from_data_nodes(
let chunk = Chunk { let chunk = Chunk {
index: node.chunk_index, index: node.chunk_index,
filename: filename.clone(), filename: filename.clone(),
file_size: 128,
}; };
let endpoint = format!("{}:{}", node.ip, node.port); let endpoint = format!("{}:{}", node.ip, node.port);
let mut stream = TcpStream::connect(endpoint).unwrap(); let mut stream = TcpStream::connect(endpoint).unwrap();
@ -100,15 +103,14 @@ fn get_file_from_data_nodes(
&Packet { &Packet {
p_type: PacketType::GetFile, p_type: PacketType::GetFile,
json: Some(serde_json::to_string(&chunk).unwrap()), json: Some(serde_json::to_string(&chunk).unwrap()),
data: None,
}).unwrap(); }).unwrap();
stream.flush().unwrap(); stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap(); stream.shutdown(Shutdown::Write).unwrap();
match serde_json::from_reader(stream) { match serde_json::from_reader(stream) {
Ok(Packet { p_type: PacketType::GetFile, json, data }) => { Ok(Packet { p_type: PacketType::GetFile, json, }) => {
let data = data.unwrap(); // let data = data.unwrap();
let chunk: Chunk = serde_json::from_str(&json.unwrap()).unwrap(); let chunk: Chunk = serde_json::from_str(&json.unwrap()).unwrap();
chunks.insert(chunk.index as usize, data); // chunks.insert(chunk.index as usize, data);
successful += 1; successful += 1;
} }
Ok(Packet { p_type: PacketType::Error, json, .. }) => { Ok(Packet { p_type: PacketType::Error, json, .. }) => {

View File

@ -12,6 +12,7 @@ use serde_json::from_str;
use std::fs::File; use std::fs::File;
use std::fs; use std::fs;
use std::error::Error; use std::error::Error;
use std::io::BufWriter;
fn main() { fn main() {
let node_endpoint = parse_endpoint_from_cli(0); let node_endpoint = parse_endpoint_from_cli(0);
@ -24,13 +25,15 @@ fn main() {
for stream in listener.incoming() { for stream in listener.incoming() {
let mut stream = stream.unwrap(); let mut stream = stream.unwrap();
match serde_json::from_reader(&mut stream) { match serde_json::from_reader(&mut stream) {
Ok(Packet { p_type: PacketType::GetFile, json, .. }) => { Ok(Packet { p_type: PacketType::GetFile, json }) => {
send_chunk(&data_path, &mut stream, &json.unwrap()); send_chunk(&data_path, &mut stream, &json.unwrap());
stream.flush().unwrap(); stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap(); stream.shutdown(Shutdown::Write).unwrap();
} }
Ok(Packet { p_type: PacketType::PutFile, json, data, }) => Ok(Packet { p_type: PacketType::PutFile, json }) => {
receive_chunk(&data_path, &json.unwrap(), &data.unwrap()), println!("Receiving chunk");
receive_chunk(&mut stream, &data_path, &json.unwrap());
},
Ok(Packet { p_type: PacketType::ShutdownDataNode, .. }) => Ok(Packet { p_type: PacketType::ShutdownDataNode, .. }) =>
shutdown(&mut stream, &metadata_endpoint, &node_endpoint), shutdown(&mut stream, &metadata_endpoint, &node_endpoint),
Ok(_) => eprintln!("We don't handle this PacketType"), Ok(_) => eprintln!("We don't handle this PacketType"),
@ -39,11 +42,16 @@ fn main() {
} }
} }
fn receive_chunk(base_path: &String, json: &String, data: &Vec<u8>) { fn receive_chunk(stream: &mut TcpStream, base_path: &String, json: &String) {
let chunk: Chunk = serde_json::from_str(json).unwrap(); let chunk: Chunk = serde_json::from_str(json).unwrap();
let mut buf = [0u8; 1];
let filepath = format!("{}/{}_{}", base_path, chunk.filename, chunk.index); let filepath = format!("{}/{}_{}", base_path, chunk.filename, chunk.index);
let mut copy = File::create(filepath).unwrap(); let mut copy = BufWriter::new(File::create(filepath).unwrap());
copy.write_all(&data[..]).unwrap(); for i in 0..chunk.file_size as usize {
stream.read(&mut buf).unwrap();
copy.write(&buf).unwrap();
copy.flush().unwrap();
}
} }
fn send_chunk(base_path: &String, stream: &mut TcpStream, json: &String) { fn send_chunk(base_path: &String, stream: &mut TcpStream, json: &String) {
@ -56,21 +64,19 @@ fn send_chunk(base_path: &String, stream: &mut TcpStream, json: &String) {
&Packet { &Packet {
p_type: PacketType::GetFile, p_type: PacketType::GetFile,
json: Some(json.clone()), json: Some(json.clone()),
data: Some(Vec::from(f)),
}).unwrap(); }).unwrap();
}, }
Err(e) => { Err(e) => {
match serde_json::to_writer( match serde_json::to_writer(
stream, stream,
&Packet { &Packet {
p_type: PacketType::Error, p_type: PacketType::Error,
json: Some(String::from(e.description())), json: Some(String::from(e.description())),
data: None,
}) { }) {
Ok(_) => println!("{}", "Copy client attempted to read non-existing file"), Ok(_) => println!("{}", "Copy client attempted to read non-existing file"),
Err(e) => println!("{}", e), Err(e) => println!("{}", e),
} }
}, }
}; };
} }
@ -89,7 +95,6 @@ fn register_with_meta_server(metadata_endpoint: &String, node_endpoint: &String)
port: from_str(split[1]).unwrap(), port: from_str(split[1]).unwrap(),
}) })
.unwrap()), .unwrap()),
data: None,
}) })
.unwrap(); .unwrap();
println!("Registered myself"); println!("Registered myself");
@ -113,7 +118,6 @@ fn shutdown(stream: &mut TcpStream, metadata_endpoint: &String, node_endpoint: &
port: from_str(split[1]).unwrap(), port: from_str(split[1]).unwrap(),
}) })
.unwrap()), .unwrap()),
data: None,
}) })
.unwrap(); .unwrap();
println!("Unregistered myself"); println!("Unregistered myself");

View File

@ -19,7 +19,6 @@ fn main() {
&Packet { &Packet {
p_type: PacketType::ListFiles, p_type: PacketType::ListFiles,
json: None, json: None,
data: None,
}) })
.unwrap(); .unwrap();
stream.flush().unwrap(); stream.flush().unwrap();

View File

@ -44,7 +44,6 @@ fn request_read(stream: &mut TcpStream, conn: &Connection, message: &str) {
&Packet { &Packet {
p_type: PacketType::Error, p_type: PacketType::Error,
json: Some(String::from("File not found")), json: Some(String::from("File not found")),
data: None,
}) { }) {
Ok(_) => println!("{}", "Copy client attempted to read non-existing file"), Ok(_) => println!("{}", "Copy client attempted to read non-existing file"),
Err(e) => println!("{}", e), Err(e) => println!("{}", e),
@ -66,7 +65,6 @@ fn request_read(stream: &mut TcpStream, conn: &Connection, message: &str) {
&Packet { &Packet {
p_type: PacketType::Success, p_type: PacketType::Success,
json: Some(serde_json::to_string(&nodes).unwrap()), json: Some(serde_json::to_string(&nodes).unwrap()),
data: None,
}) { }) {
Ok(_) => println!("{}", "Sent nodes with chunks"), Ok(_) => println!("{}", "Sent nodes with chunks"),
Err(e) => println!("{}", e), Err(e) => println!("{}", e),
@ -82,7 +80,6 @@ fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) {
&Packet { &Packet {
p_type: PacketType::Error, p_type: PacketType::Error,
json: Some(String::from("File already exists, please remove before re-uploading")), json: Some(String::from("File already exists, please remove before re-uploading")),
data: None,
}) { }) {
Ok(_) => println!("{}", "Copy client attempted to add an existing file"), Ok(_) => println!("{}", "Copy client attempted to add an existing file"),
Err(e) => println!("{}", e), Err(e) => println!("{}", e),
@ -113,7 +110,6 @@ fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) {
&Packet { &Packet {
p_type: PacketType::Success, p_type: PacketType::Success,
json: Some(serde_json::to_string(&nodes).unwrap()), json: Some(serde_json::to_string(&nodes).unwrap()),
data: None,
}) { }) {
Ok(_) => println!("{}", "Sent nodes with chunks"), Ok(_) => println!("{}", "Sent nodes with chunks"),
Err(e) => println!("{}", e), Err(e) => println!("{}", e),
@ -145,7 +141,6 @@ fn report_success(stream: &mut TcpStream, message: &str) {
match serde_json::to_writer(stream, &Packet { match serde_json::to_writer(stream, &Packet {
p_type: PacketType::Success, p_type: PacketType::Success,
json: None, json: None,
data: None,
}) { }) {
Ok(_) => println!("{}", message), Ok(_) => println!("{}", message),
Err(e) => println!("{}", e), Err(e) => println!("{}", e),

View File

@ -28,7 +28,6 @@ pub enum PacketType {
pub struct Packet { pub struct Packet {
pub p_type: PacketType, pub p_type: PacketType,
pub json: Option<String>, pub json: Option<String>,
pub data: Option<Vec<u8>>,
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
@ -60,6 +59,7 @@ pub struct AvailableNodes {
pub struct Chunk { pub struct Chunk {
pub index: u32, pub index: u32,
pub filename: String, pub filename: String,
pub file_size: i64,
} }
#[derive(Debug)] #[derive(Debug)]