Getting chunks to work

This commit is contained in:
Joseph Ferano 2018-12-17 18:13:19 -04:00
parent 65c6bffd85
commit b363fe5585
4 changed files with 63 additions and 49 deletions

3
.gitignore vendored
View File

@ -8,5 +8,6 @@
*.db
dfs_skel/
venv/
data_node/
dn1/
dn2/
copy_dir/

BIN
pug.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 21 KiB

View File

@ -58,22 +58,27 @@ fn send_file_to_data_nodes(
nodes: &Vec<AvailableNodes>,
file: &Vec<u8>)
{
let endpoint = format!("{}:{}", nodes[0].ip, nodes[0].port);
let mut stream = TcpStream::connect(endpoint).unwrap();
let div: usize = ((file.len() as f32) / (nodes.len() as f32)).ceil() as usize;
let chunks: Vec<_> = file.chunks(div).collect();
println!("Going to send a file! Bytes {}", file.len());
let chunk = Chunk {
index: nodes[0].chunk_index,
filename: filename.clone(),
};
let packet = serde_json::to_writer(
&mut stream,
&Packet {
p_type: PacketType::PutFile,
json: Some(serde_json::to_string(&chunk).unwrap()),
data: Some(file.clone()),
}).unwrap();
stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap();
for node in nodes {
let endpoint = format!("{}:{}", node.ip, node.port);
let mut stream = TcpStream::connect(endpoint).unwrap();
let chunk = Chunk {
index: node.chunk_index,
filename: filename.clone(),
};
let packet = serde_json::to_writer(
&mut stream,
&Packet {
p_type: PacketType::PutFile,
json: Some(serde_json::to_string(&chunk).unwrap()),
data: Some(chunks[node.chunk_index as usize].to_vec()),
// data: None,
}).unwrap();
stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap();
}
}
fn get_file_from_data_nodes(
@ -81,35 +86,44 @@ fn get_file_from_data_nodes(
filename: &String,
nodes: &Vec<AvailableNodes>)
{
let chunk = Chunk {
index: nodes[0].chunk_index,
filename: filename.clone(),
};
let endpoint = format!("{}:{}", nodes[0].ip, nodes[0].port);
let mut stream = TcpStream::connect(endpoint).unwrap();
let packet = serde_json::to_writer(
&stream,
&Packet {
p_type: PacketType::GetFile,
json: Some(serde_json::to_string(&chunk).unwrap()),
data: None,
}).unwrap();
stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap();
match serde_json::from_reader(stream) {
Ok(Packet { p_type: PacketType::GetFile, json, data }) => {
let data = data.unwrap();
let chunk: Chunk = serde_json::from_str(&json.unwrap()).unwrap();
// TODO: Here we have to rebuild the chunks
let mut copy = File::create(destination_path).unwrap();
copy.write_all(&data[..]).unwrap();
let mut chunks: Vec<Vec<u8>> = Vec::with_capacity(nodes.len());
let mut successful = 0;
for node in nodes {
let chunk = Chunk {
index: node.chunk_index,
filename: filename.clone(),
};
let endpoint = format!("{}:{}", node.ip, node.port);
let mut stream = TcpStream::connect(endpoint).unwrap();
let packet = serde_json::to_writer(
&stream,
&Packet {
p_type: PacketType::GetFile,
json: Some(serde_json::to_string(&chunk).unwrap()),
data: None,
}).unwrap();
stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap();
match serde_json::from_reader(stream) {
Ok(Packet { p_type: PacketType::GetFile, json, data }) => {
let data = data.unwrap();
let chunk: Chunk = serde_json::from_str(&json.unwrap()).unwrap();
chunks.insert(chunk.index as usize, data);
successful += 1;
}
Ok(Packet { p_type: PacketType::Error, json, .. }) => {
eprintln!("Data Node Server Error: {}", &json.unwrap());
}
Ok(_) => {}
Err(e) => eprintln!("Error parsing json {}", e.to_string()),
};
}
if successful == nodes.len() {
let mut copy = File::create(destination_path).unwrap();
for chunk in chunks {
copy.write(&chunk[..]).unwrap();
}
Ok(Packet { p_type: PacketType::Error, json, .. }) => {
eprintln!("Data Node Server Error: {}", &json.unwrap());
}
Ok(_) => {}
Err(e) => eprintln!("Error parsing json {}", e.to_string()),
};
}
}
#[derive(Debug)]

View File

@ -25,12 +25,12 @@ fn main() {
let mut stream = stream.unwrap();
match serde_json::from_reader(&mut stream) {
Ok(Packet { p_type: PacketType::GetFile, json, .. }) => {
send_file(&data_path, &mut stream, &json.unwrap());
send_chunk(&data_path, &mut stream, &json.unwrap());
stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap();
}
Ok(Packet { p_type: PacketType::PutFile, json, data, }) =>
receive_file(&data_path, &json.unwrap(), &data.unwrap()),
receive_chunk(&data_path, &json.unwrap(), &data.unwrap()),
Ok(Packet { p_type: PacketType::ShutdownDataNode, .. }) =>
shutdown(&mut stream, &metadata_endpoint, &node_endpoint),
Ok(_) => eprintln!("We don't handle this PacketType"),
@ -39,15 +39,14 @@ fn main() {
}
}
fn receive_file(base_path: &String, json: &String, data: &Vec<u8>) {
fn receive_chunk(base_path: &String, json: &String, data: &Vec<u8>) {
let chunk: Chunk = serde_json::from_str(json).unwrap();
let filepath = format!("{}/{}_{}", base_path, chunk.filename, chunk.index);
println!("{}", filepath);
let mut copy = File::create(filepath).unwrap();
copy.write_all(&data[..]).unwrap();
}
fn send_file(base_path: &String, stream: &mut TcpStream, json: &String) {
fn send_chunk(base_path: &String, stream: &mut TcpStream, json: &String) {
let chunk: Chunk = serde_json::from_str(json).unwrap();
println!("{}", chunk.filename);
match fs::read(format!("{}/{}_{}", base_path, &chunk.filename, &chunk.index)) {