Change the chunk id to an integer (life is better that way), and add sending and receiving of files with the data_node

This commit is contained in:
Joseph Ferano 2018-12-16 20:30:53 -04:00
parent d3ebfd9638
commit 1dafc23828
5 changed files with 104 additions and 47 deletions

2
.gitignore vendored
View File

@ -8,3 +8,5 @@
*.db
dfs_skel/
venv/
data_node/
copy_dir/

View File

@ -21,7 +21,7 @@ fn main() {
packet_type = PacketType::RequestWrite;
println!("Requesting Write of {}", args.filepath);
json = Some(serde_json::to_string(
&AddFile { name: args.filepath, size: size as u32, }).unwrap())
&AddFile { name: args.filepath.clone(), size: size as u32, }).unwrap())
} else {
packet_type = PacketType::RequestRead;
println!("Requesting Read of {}", args.filepath);
@ -43,11 +43,11 @@ fn main() {
Ok(_) => {},
Err(e) => eprintln!("Error parsing json {}", e.to_string()),
};
let filename = &args.filename;
let filename = &args.filepath;
if args.is_copy_to_dfs {
nodes.map(|ns| send_file_to_data_nodes(&filename, &ns, &file));
} else {
nodes.map(|ns| get_file_from_data_nodes(&ns));
nodes.map(|ns| get_file_from_data_nodes(&filename, &ns));
}
}
@ -55,7 +55,7 @@ fn send_file_to_data_nodes(filename: &String, nodes: &Vec<AvailableNodes>, file:
let mut stream = TcpStream::connect("localhost:6771").unwrap();
println!("Going to send a file! Bytes {}", file.len());
let chunk = Chunk {
id: nodes[0].chunk_id.clone(),
index: nodes[0].chunk_index,
filename: filename.clone(),
};
let packet = serde_json::to_writer(
@ -69,7 +69,35 @@ fn send_file_to_data_nodes(filename: &String, nodes: &Vec<AvailableNodes>, file:
stream.shutdown(Shutdown::Write).unwrap();
}
fn get_file_from_data_nodes(nodes: &Vec<AvailableNodes>) {
// Fetches one chunk of `filename` from a data node and writes it to a local file.
// NOTE(review): only nodes[0] is consulted and the endpoint is hard-coded to
// localhost:6771 — the rest of `nodes` (and their ip/port fields) are unused; confirm intended.
fn get_file_from_data_nodes(filename: &String, nodes: &Vec<AvailableNodes>) {
// Identify the chunk to request by (index, filename).
let chunk = Chunk {
index: nodes[0].chunk_index,
filename: filename.clone(),
};
let mut stream = TcpStream::connect("localhost:6771").unwrap();
// Send a GetFile request; the chunk descriptor travels in the `json` field.
let packet = serde_json::to_writer(
&stream,
&Packet {
p_type: PacketType::GetFile,
json: Some(serde_json::to_string(&chunk).unwrap()),
data: None,
}).unwrap();
stream.flush().unwrap();
// Half-close the write side so the server's from_reader sees EOF and can reply.
stream.shutdown(Shutdown::Write).unwrap();
// Read the server's response from the same stream.
match serde_json::from_reader(stream) {
Ok(Packet { p_type: PacketType::GetFile, json, data }) => {
// Success: write the returned bytes to a file named after the chunk.
let data = data.unwrap();
let chunk: Chunk = serde_json::from_str(&json.unwrap()).unwrap();
// TODO: Here we have to rebuild the chunks
let mut copy = File::create(chunk.filename).unwrap();
copy.write_all(&data[..]).unwrap();
},
Ok(Packet { p_type: PacketType::Error, json, .. }) => {
// Server-side failure (e.g. chunk not found); message rides in `json`.
eprintln!("Data Node Server Error: {}", &json.unwrap());
},
Ok(_) => {},
Err(e) => eprintln!("Error parsing json {}", e.to_string()),
};
}
#[derive(Debug)]

View File

@ -10,6 +10,8 @@ use std::io::{Write, Read};
use std::net::TcpListener;
use serde_json::from_str;
use std::fs::File;
use std::fs;
use std::error::Error;
fn main() {
let endpoint = parse_endpoint_from_cli(0);
@ -19,10 +21,13 @@ fn main() {
for stream in listener.incoming() {
let mut stream = stream.unwrap();
match serde_json::from_reader(&mut stream) {
Ok(Packet { p_type: PacketType::GetFile, json, data, }) =>
get(&mut stream, &json.unwrap(), &data.unwrap()),
Ok(Packet { p_type: PacketType::GetFile, json, data, }) => {
send_file(&mut stream, &json.unwrap(), &data.unwrap());
stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap();
}
Ok(Packet { p_type: PacketType::PutFile, json, data, }) =>
put(&mut stream, &json.unwrap(), &data.unwrap()),
receive_file(&json.unwrap(), &data.unwrap()),
Ok(Packet { p_type: PacketType::ShutdownDataNode, .. }) =>
shutdown(&mut stream, &endpoint),
Ok(_) => eprintln!("We don't handle this PacketType"),
@ -31,6 +36,41 @@ fn main() {
}
}
// Handles a PutFile request: deserialize the chunk descriptor from `json` and
// persist the raw chunk bytes to disk as "<filename>_<index>" in the CWD.
fn receive_file(json: &String, data: &Vec<u8>) {
let chunk: Chunk = serde_json::from_str(json).unwrap();
// The index suffix keeps multiple chunks of the same file distinct on disk.
let mut copy = File::create(format!("{}_{}", chunk.filename, chunk.index)).unwrap();
copy.write_all(&data[..]).unwrap();
}
// Handles a GetFile request: read the requested chunk from disk and stream it
// back to the client as a GetFile packet, or reply with an Error packet when
// the read fails. `data` in the request carries no payload and is unused.
fn send_file(stream: &mut TcpStream, json: &String, data: &Vec<u8>) {
let chunk: Chunk = serde_json::from_str(json).unwrap();
// receive_file stores chunks as "<filename>_<index>"; read back under the
// same name — reading chunk.filename alone would never find a stored chunk.
match fs::read(format!("{}_{}", chunk.filename, chunk.index)) {
Ok(f) => {
// Echo the chunk descriptor back so the client knows what it received.
serde_json::to_writer(
stream,
&Packet {
p_type: PacketType::GetFile,
json: Some(json.clone()),
// fs::read already yields Vec<u8>; no Vec::from copy needed.
data: Some(f),
}).unwrap();
}
Err(e) => {
// Report the I/O error to the client instead of crashing the node.
match serde_json::to_writer(
stream,
&Packet {
p_type: PacketType::Error,
// e.to_string() replaces the deprecated Error::description().
json: Some(e.to_string()),
data: None,
}) {
Ok(_) => println!("{}", "Copy client attempted to read non-existing file"),
Err(e) => println!("{}", e),
}
}
};
stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap();
}
fn register_with_meta_server(endpoint: &String) {
let mut stream = TcpStream::connect("localhost:6770").unwrap();
let split: Vec<&str> = endpoint.split(":").collect();
@ -42,7 +82,8 @@ fn register_with_meta_server(endpoint: &String) {
&NodeRegistration {
register: true,
ip: String::from(split[0]),
port: from_str(split[1]).unwrap() })
port: from_str(split[1]).unwrap(),
})
.unwrap()),
data: None,
})
@ -54,18 +95,6 @@ fn register_with_meta_server(endpoint: &String) {
println!("{:?}", result);
}
// (Removed in this commit; superseded by receive_file.) Persisted an incoming
// chunk to disk as "<filename>_<id>" and logged its metadata.
// NOTE(review): `stream` is accepted but never used in this body.
fn put(stream: &mut TcpStream, json: &String, data: &Vec<u8>) {
let chunk_id: Chunk = serde_json::from_str(json).unwrap();
println!("CId: {:?}", chunk_id);
println!("Data Amount: {:?}", data.len());
let mut copy = File::create(format!("{}_{}", chunk_id.filename, chunk_id.id)).unwrap();
copy.write_all(&data[..]).unwrap();
}
// (Removed in this commit.) Stub handler for GetFile requests; the body was
// never implemented — replaced by send_file.
fn get(stream: &mut TcpStream, json: &String, data: &Vec<u8>) {
// let files: String = serde_json::from_str(json).unwrap();
}
fn shutdown(stream: &mut TcpStream, endpoint: &String) {
let mut stream = TcpStream::connect("localhost:6770").unwrap();
let split: Vec<&str> = endpoint.split(":").collect();
@ -77,7 +106,8 @@ fn shutdown(stream: &mut TcpStream, endpoint: &String) {
&NodeRegistration {
register: false,
ip: String::from(split[0]),
port: from_str(split[1]).unwrap() })
port: from_str(split[1]).unwrap(),
})
.unwrap()),
data: None,
})

View File

@ -3,7 +3,6 @@ extern crate rusqlite;
extern crate serde;
extern crate serde_json;
extern crate serde_derive;
extern crate uuid;
use a03::*;
use rusqlite::types::ToSql;
@ -59,7 +58,7 @@ fn request_read(stream: &mut TcpStream, conn: &Connection, message: &str) {
nodes.push(AvailableNodes {
ip: b.data_node.ip,
port: b.data_node.port,
chunk_id: b.chunk_id,
chunk_index: b.chunk_index,
});
}
match serde_json::to_writer(
@ -76,8 +75,7 @@ fn request_read(stream: &mut TcpStream, conn: &Connection, message: &str) {
fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) {
let file: AddFile = serde_json::from_str(message).unwrap();
// let file_already_exists = add_file(&conn, &file.name, file.size as i32);
let file_already_exists = false;
let file_already_exists = add_file(&conn, &file.name, file.size as i32);
if file_already_exists {
match serde_json::to_writer(
stream,
@ -91,17 +89,16 @@ fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) {
};
return;
}
// let file_info = get_file_info(&conn, &file.name).unwrap();
let file_info = INode { id: 1, name: file.name, size: file.size };
println!("{:?}", file_info);
let file_info = get_file_info(&conn, &file.name).unwrap();
// let file_info = INode { id: 1, name: file.name, size: file.size };
// println!("{:?}", file_info);
let mut blocks: Vec<Block> = Vec::new();
let mut nodes: Vec<AvailableNodes> = Vec::new();
let dnodes = get_data_nodes(&conn);
for i in 0..dnodes.len() {
let dn = &dnodes[i];
let uuid = uuid::Uuid::new_v4().to_string();
blocks.push(Block {
chunk_id: uuid.clone(),
chunk_index: i as u32,
node_id: dn.id,
file_id: file_info.id as u32,
id: 0,
@ -109,10 +106,10 @@ fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) {
nodes.push(AvailableNodes {
ip: dn.ip.clone(),
port: dn.port,
chunk_id: uuid.clone(),
chunk_index: i as u32,
});
}
// add_blocks_to_inode(&conn, file_info.id, &blocks);
add_blocks_to_inode(&conn, file_info.id, &blocks);
match serde_json::to_writer(
stream,
&Packet {
@ -260,7 +257,7 @@ fn add_blocks_to_inode(conn: &Connection, fid: u32, blocks: &Vec<Block>) {
for block in blocks {
match conn.execute(
"INSERT INTO block (nid, fid, cid) VALUES (?1, ?2, ?3)",
&[&block.node_id as &ToSql, &fid, &block.chunk_id]) {
&[&block.node_id as &ToSql, &fid, &block.chunk_index]) {
Ok(n) => println!("Updated {}", n),
Err(e) => println!("Error: {}", e),
}
@ -275,7 +272,7 @@ fn get_file_inode(conn: &Connection, fid: u32) -> Vec<BlockQuery> {
&[&fid],
|row| BlockQuery {
data_node: DataNode { id: row.get(0), ip: row.get(1), port: row.get(2) },
chunk_id: row.get(3),
chunk_index: row.get(3),
}).unwrap();
let mut blocks: Vec<BlockQuery> = Vec::new();
for b in iter {
@ -311,7 +308,7 @@ port INTEGER NOT NULL DEFAULT \"0\")",
bid INTEGER PRIMARY KEY ASC AUTOINCREMENT,
fid INTEGER NOT NULL DEFAULT \"0\",
nid INTEGER NOT NULL DEFAULT \"0\",
cid TEXT NOT NULL DEFAULT \"0\")",
cid INTEGER NOT NULL DEFAULT \"0\")",
NO_PARAMS,
).unwrap();
conn
@ -463,19 +460,19 @@ cid TEXT NOT NULL DEFAULT \"0\")",
file_id: inode.id,
id: 0,
node_id: 1,
chunk_id: String::from("c1"),
chunk_index: 0,
},
Block {
file_id: inode.id,
id: 0,
node_id: 2,
chunk_id: String::from("c2"),
chunk_index: 1,
},
Block {
file_id: inode.id,
id: 0,
node_id: 3,
chunk_id: String::from("c3"),
chunk_index: 2,
},
);
add_blocks_to_inode(&conn, inode.id, &blocks);
@ -483,15 +480,15 @@ cid TEXT NOT NULL DEFAULT \"0\")",
assert_eq!(inode.name, "main_file");
assert_eq!(inode.size, 128);
assert_eq!(blocks.len(), 3);
assert_eq!(blocks[0].chunk_id, "c1");
assert_eq!(blocks[0].chunk_index, 0);
assert_eq!(blocks[0].data_node.id, 1);
assert_eq!(blocks[0].data_node.ip, "127.0.0.1");
assert_eq!(blocks[0].data_node.port, 1337);
assert_eq!(blocks[1].chunk_id, "c2");
assert_eq!(blocks[1].chunk_index, 1);
assert_eq!(blocks[1].data_node.id, 2);
assert_eq!(blocks[1].data_node.ip, "127.0.0.2");
assert_eq!(blocks[1].data_node.port, 1338);
assert_eq!(blocks[2].chunk_id, "c3");
assert_eq!(blocks[2].chunk_index, 2);
assert_eq!(blocks[2].data_node.id, 3);
assert_eq!(blocks[2].data_node.ip, "127.0.0.2");
assert_eq!(blocks[2].data_node.port, 1339);

View File

@ -53,12 +53,12 @@ pub struct AddFile {
pub struct AvailableNodes {
pub ip: String,
pub port: u32,
pub chunk_id: String
pub chunk_index: u32
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Chunk {
pub id: String,
pub index: u32,
pub filename: String,
}
@ -81,13 +81,13 @@ pub struct Block {
pub id: u32,
pub file_id: u32,
pub node_id: u32,
pub chunk_id: String,
pub chunk_index: u32,
}
#[derive(Debug)]
pub struct BlockQuery {
pub data_node: DataNode,
pub chunk_id: String
pub chunk_index: u32
}
pub fn parse_endpoint_from_cli(arg_index : usize) -> String {