Now able to send large data over TCP. Implemented a lot of the remaining SQL related calls

This commit is contained in:
Joseph Ferano 2018-12-09 15:33:50 -04:00
parent 84bd0e5985
commit 1f7d26f9bd
4 changed files with 277 additions and 91 deletions

View File

@ -13,7 +13,9 @@ use std::fs::File;
use std::fs; use std::fs;
fn main() { fn main() {
let mut file = fs::read("dfs.db").unwrap(); let mut file = fs::read("/home/joe/Downloads/ideaIU-2018.3.tar.gz").unwrap();
let mut copy = File::create("copy").unwrap(); let mut stream = TcpStream::connect("localhost:6771").unwrap();
copy.write_all(&file[..]).unwrap(); stream.write(&file).unwrap();
stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap();
} }

View File

@ -10,20 +10,33 @@ use std::thread;
use std::io::Read; use std::io::Read;
use std::io::Write; use std::io::Write;
use std::net::TcpListener; use std::net::TcpListener;
use std::fs::File;
fn main() { fn main() {
register_with_meta_server(); // register_with_meta_server();
let listener = TcpListener::bind("localhost:6771").unwrap(); let listener = TcpListener::bind("localhost:6771").unwrap();
for stream in listener.incoming() { for stream in listener.incoming() {
let mut stream = stream.unwrap(); let mut stream = stream.unwrap();
match serde_json::from_reader(&mut stream) { let mut buf = Vec::new();
Ok(packet @ Packet { .. }) => match packet.p_type { match stream.read_to_end(&mut buf) {
PacketType::ShutdownDataNode => shutdown(&mut stream), Ok(size) => {
_ => (), println!("Total bytes: {}", size);
let mut copy = File::create("new_version").unwrap();
copy.write_all(&buf[..]).unwrap();
}, },
Err(e) => println!("Error parsing json {}", e.to_string()), Err(e) => println!("{}", e),
}; }
// match serde_json::from_reader(&mut stream) {
// Ok(packet @ Packet { .. }) => match packet.p_type {
// PacketType::GetFiles => shutdown(&mut stream),
// PacketType::PutFiles => put(&mut stream, &packet.json.unwrap(), &mut Vec::new()),
// PacketType::ShutdownDataNode => shutdown(&mut stream),
// _ => (),
// },
// Err(e) => println!("Error parsing json {}", e.to_string()),
// };
} }
} }
@ -44,6 +57,10 @@ fn register_with_meta_server() {
println!("{:?}", result); println!("{:?}", result);
} }
fn put(stream: &mut TcpStream, json: &String, files: &mut Vec<String>) {
let files: PutFiles = serde_json::from_str(json).unwrap();
}
fn shutdown(stream: &mut TcpStream) { fn shutdown(stream: &mut TcpStream) {
let mut stream = TcpStream::connect("localhost:6770").unwrap(); let mut stream = TcpStream::connect("localhost:6770").unwrap();
serde_json::to_writer( serde_json::to_writer(

View File

@ -12,22 +12,20 @@ use std::io::Write;
use std::net::{TcpListener, TcpStream}; use std::net::{TcpListener, TcpStream};
fn main() { fn main() {
let mut data_nodes: Vec<DataNode> = Vec::new(); // TODO: We need to check if the DB exists, if not, ask the user to run the python script
// Maybe we can even run it for them
let conn = Connection::open("dfs.db").unwrap();
let mut file_list: Vec<String> = Vec::new(); let mut file_list: Vec<String> = Vec::new();
file_list.push(String::from("/"));
file_list.push(String::from("/home"));
file_list.push(String::from("/home/joe"));
file_list.push(String::from("/bin"));
let listener = TcpListener::bind("localhost:6770").unwrap(); let listener = TcpListener::bind("localhost:6770").unwrap();
for stream in listener.incoming() { for stream in listener.incoming() {
let mut stream = stream.unwrap(); let mut stream = stream.unwrap();
match serde_json::from_reader(&mut stream) { match serde_json::from_reader(&mut stream) {
Ok(packet @ Packet { .. }) => match packet.p_type { Ok(packet @ Packet { .. }) => match packet.p_type {
PacketType::ListFiles => list(&mut stream, &file_list[..]), PacketType::ListFiles => list(&mut stream, &conn),
PacketType::PutFiles => put(&mut stream, &packet.json.unwrap(), &mut file_list),
PacketType::NodeRegistration => PacketType::NodeRegistration =>
node_registration(&mut stream, &packet.json.unwrap(), &mut data_nodes), node_registration(&mut stream, &packet.json.unwrap(), &conn),
// PacketType::AddFile =>
_ => (), _ => (),
}, },
Err(e) => println!("Error parsing json {}", e.to_string()), Err(e) => println!("Error parsing json {}", e.to_string()),
@ -36,11 +34,11 @@ fn main() {
} }
} }
fn list(stream: &mut TcpStream, files: &[String]) { fn list(stream: &mut TcpStream, conn: &Connection) {
match serde_json::to_writer( match serde_json::to_writer(
stream, stream,
&FilePaths { &FilePaths {
paths: Cow::from(files), paths: Cow::from(get_files(&conn)),
}, },
) { ) {
Ok(_) => println!("{}", "Sent file paths"), Ok(_) => println!("{}", "Sent file paths"),
@ -48,33 +46,18 @@ fn list(stream: &mut TcpStream, files: &[String]) {
}; };
} }
fn put(stream: &mut TcpStream, json: &String, files: &mut Vec<String>) { fn node_registration(stream: &mut TcpStream, json: &String, conn: &Connection) {
let files: PutFiles = serde_json::from_str(json).unwrap();
report_success(stream, "Successfully Put Files");
}
fn node_registration(stream: &mut TcpStream, json: &String, data_nodes: &mut Vec<DataNode>) {
let endpoint : NodeRegistration = serde_json::from_str(json).unwrap(); let endpoint : NodeRegistration = serde_json::from_str(json).unwrap();
let message = if endpoint.register { let message = if endpoint.register {
data_nodes.push(DataNode { ip: endpoint.ip, port: endpoint.port, id: 1 }); // TODO: We probably should check if the endpoint already exists!
"You were successfully registering" add_data_node(&conn, &endpoint.ip, endpoint.port as i32);
"You were successfully registered"
} }
else { else {
match data_nodes.iter() // TODO: We should check if the endpoint exists!
.position(|dn| dn.ip == endpoint.ip && dn.port == endpoint.port) { remove_data_node(&conn, &endpoint.ip, endpoint.port as i32);
Some(index) => { "You were successfully unregistered"
data_nodes.remove(index);
"You were successfully unregistering"
},
None => {
println!("Data Node at {}:{} does not exit", endpoint.ip, endpoint.port);
"You weren't found"
}
}
}; };
for dn in data_nodes {
println!("{:?}", dn);
}
report_success(stream, message); report_success(stream, message);
} }
@ -91,37 +74,6 @@ fn report_success(stream: &mut TcpStream, message: &str) {
}; };
} }
fn create_tables(conn: &Connection) {
conn.execute(
"CREATE TABLE inode (
fid INTEGER PRIMARY KEY ASC AUTOINCREMENT,
fname TEXT UNIQUE NOT NULL DEFAULT \" \",
fsize INTEGER NOT NULL default \"0\")",
NO_PARAMS,
).unwrap();
conn.execute(
"CREATE TABLE dnode (
nid INTEGER PRIMARY KEY ASC AUTOINCREMENT,
address TEXT NOT NULL DEFAULT \" \",
port INTEGER NOT NULL DEFAULT \"0\")",
NO_PARAMS,
).unwrap();
conn.execute(
"CREATE TABLE block (
bid INTEGER PRIMARY KEY ASC AUTOINCREMENT,
fid INTEGER NOT NULL DEFAULT \"0\",
nid INTEGER NOT NULL DEFAULT \"0\",
cid TEXT NOT NULL DEFAULT \"0\")",
NO_PARAMS,
).unwrap();
// Create UNIQUE tuple for block
conn.execute("CREATE UNIQUE INDEX blocknc ON block(nid, cid)", NO_PARAMS)
.unwrap();
}
fn add_data_node(conn: &Connection, address: &str, port: i32) { fn add_data_node(conn: &Connection, address: &str, port: i32) {
match conn.execute( match conn.execute(
"INSERT INTO dnode (address, port) VALUES (?1, ?2)", "INSERT INTO dnode (address, port) VALUES (?1, ?2)",
@ -132,7 +84,17 @@ fn add_data_node(conn: &Connection, address: &str, port: i32) {
}; };
} }
fn check_node(conn: &Connection, address: &str, port: i32) -> DataNode { fn remove_data_node(conn: &Connection, address: &str, port: i32) {
match conn.execute(
"DELETE FROM dnode WHERE address=?1 AND port=?2)",
&[&address as &ToSql, &port],
) {
Ok(n) => println!("{} rows updated", n),
Err(e) => println!("DELETE error: {}", e),
};
}
fn get_data_node(conn: &Connection, address: &str, port: i32) -> DataNode {
let mut stmt = conn let mut stmt = conn
.prepare("SELECT nid, address, port FROM dnode WHERE address=?1 AND port=?2") .prepare("SELECT nid, address, port FROM dnode WHERE address=?1 AND port=?2")
.unwrap(); .unwrap();
@ -143,26 +105,127 @@ fn check_node(conn: &Connection, address: &str, port: i32) -> DataNode {
}).unwrap() }).unwrap()
} }
fn get_data_nodes(conn: &Connection) { fn get_data_nodes(conn: &Connection) -> Vec<DataNode> {
let mut stmt = conn.prepare("SELECT address, port FROM dnode WHERE 1"); let mut stmt = conn.prepare("SELECT nid, address, port FROM dnode").unwrap();
let iter = stmt.query_map(NO_PARAMS, |row| DataNode {
id: row.get(0),
ip: row.get(1),
port: row.get(2),
}).unwrap();
let mut nodes = Vec::new();
for n in iter {
let n = n.unwrap();
nodes.push(n);
}
nodes
} }
fn insert_file(conn: &Connection, fname: String, fsize: usize) { fn add_file(conn: &Connection, fname: &String, fsize: i32) {
let mut stmt = conn.prepare("INSERT INTO inode (fname, fsize) VALUES (\"?1\", ?2)"); conn.execute(
"INSERT INTO inode (fname, fsize) VALUES (?1, ?2)",
&[&fname as &ToSql, &fsize])
.unwrap();
} }
// fn get_file_info(conn: &Connection, fname: String) {} fn remove_file(conn: &Connection, fname: String) {
conn.execute(
"DELETE FROM inode WHERE fname=?1",
&[&fname as &ToSql])
.unwrap();
}
// fn get_files(conn: &Connection) {} fn get_file_info(conn: &Connection, fname: &String) -> INode {
conn.query_row(
"SELECT fid, fsize FROM inode where fname=?1",
&[&fname as &ToSql],
|row| INode { id: row.get(0), size: row.get(1), name: fname.clone() }
).unwrap()
}
// fn add_block_to_inode(conn: &Connection, fname: String, blocks: usize) {} fn get_files(conn: &Connection) -> Vec<String> {
let mut stmt = conn
.prepare("SELECT fname, fsize FROM inode")
.unwrap();
let iter = stmt.query_map(NO_PARAMS, |row| INode {
id: 0,
name: row.get(0),
size: row.get(1),
}).unwrap();
let mut files = Vec::new();
for f in iter {
let f = f.unwrap();
files.push(format!("{} {} bytes", f.name, f.size));
}
files
}
// fn get_file_inode(conn: &Connection, fname: String) {} fn add_blocks_to_inode(conn: &Connection, fname: &String, blocks: &Vec<Block>) {
let fid : u32 = get_file_info(&conn, fname).id;
for block in blocks {
conn.execute(
"INSERT INTO block (nid, fid, cid) VALUES (?1, ?2, ?3)",
&[&block.node_id as &ToSql, &fid, &block.chunk_id]).unwrap();
}
}
fn get_file_inode(conn: &Connection, fname: &String) -> (INode, Vec<BlockQuery>) {
let file = get_file_info(&conn, &fname);
let mut stmt = conn.prepare(
"SELECT address, port, cid FROM dnode, block WHERE dnode.nid = block.nid AND block.fid = ?1")
.unwrap();
let iter = stmt.query_map(
&[&file.id],
|row| BlockQuery {
data_node: DataNode { id: 0, ip: row.get(0), port: row.get(1) },
chunk_id: row.get(2),
id: 0,
}).unwrap();
let mut blocks: Vec<BlockQuery> = Vec::new();
for b in iter {
blocks.push(b.unwrap());
}
(file, blocks)
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
fn create_tables(conn: &Connection) {
conn.execute(
"CREATE TABLE inode (
fid INTEGER PRIMARY KEY ASC AUTOINCREMENT,
fname TEXT UNIQUE NOT NULL DEFAULT \" \",
fsize INTEGER NOT NULL default \"0\")",
NO_PARAMS
).unwrap();
conn.execute(
"CREATE TABLE dnode (
nid INTEGER PRIMARY KEY ASC AUTOINCREMENT,
address TEXT NOT NULL DEFAULT \" \",
port INTEGER NOT NULL DEFAULT \"0\")",
NO_PARAMS,
).unwrap();
conn.execute(
"CREATE TABLE block (
bid INTEGER PRIMARY KEY ASC AUTOINCREMENT,
fid INTEGER NOT NULL DEFAULT \"0\",
nid INTEGER NOT NULL DEFAULT \"0\",
cid TEXT NOT NULL DEFAULT \"0\")",
NO_PARAMS
).unwrap();
// Create UNIQUE tuple for block
conn.execute("CREATE UNIQUE INDEX dnodeA ON dnode(address, port)", NO_PARAMS)
.unwrap();
// Create UNIQUE tuple for block
conn.execute("CREATE UNIQUE INDEX blocknc ON block(nid, cid)", NO_PARAMS)
.unwrap();
}
fn get_test_db() -> Connection { fn get_test_db() -> Connection {
let conn = Connection::open_in_memory().unwrap(); let conn = Connection::open_in_memory().unwrap();
create_tables(&conn); create_tables(&conn);
@ -170,13 +233,112 @@ mod tests {
} }
#[test] #[test]
fn inserts_dnode_with_correct_ip_and_port() { fn add_dnode_with_correct_ip_and_port() {
let conn = get_test_db(); let conn = get_test_db();
let ip = "127.0.0.1"; let ip = "127.0.0.1";
add_data_node(&conn, &ip, 65533); let port = 65533;
let dnode = check_node(&conn, &ip, 65533); add_data_node(&conn, &ip, port as i32);
let dnode = get_data_node(&conn, &ip, port as i32);
assert_eq!(dnode.id, 1); assert_eq!(dnode.id, 1);
assert_eq!(dnode.address, ip); assert_eq!(dnode.ip, ip);
assert_eq!(dnode.port, 65533); assert_eq!(dnode.port, port);
} }
#[test]
fn removes_dnode_with_correct_ip_and_port() {
// TODO: I don't know how to test a delete
let conn = get_test_db();
let ip = "127.0.0.1";
let port = 65533;
add_data_node(&conn, &ip, port as i32);
let dnode = get_data_node(&conn, &ip, port as i32);
assert_eq!(dnode.id, 1);
assert_eq!(dnode.ip, ip);
assert_eq!(dnode.port, port);
}
#[test]
fn gets_all_data_nodes() {
let conn = get_test_db();
let ip1 = "127.0.0.1";
let port1 = 65533;
let ip2 = "127.0.0.2";
let port2 = port1 + 1;
let ip3 = "127.0.0.3";
let port3 = port2 + 1;
add_data_node(&conn, &ip1, port1 as i32);
add_data_node(&conn, &ip2, port2 as i32);
add_data_node(&conn, &ip3, port3 as i32);
let ds = get_data_nodes(&conn);
for i in 0..ds.len() {
let d = &ds[i];
assert_eq!(d.ip, format!("127.0.0.{}", i + 1));
assert_eq!(d.port, 65533 + i as u32);
}
}
#[test]
fn adds_file() {
let conn = get_test_db();
add_file(&conn, &String::from("my_1337_virus"), 32);
let files = get_files(&conn);
assert_eq!(files.len(), 1);
assert_eq!(files[0], "my_1337_virus 32 bytes");
}
#[test]
fn deletes_file() {
let conn = get_test_db();
add_file(&conn, &String::from("my_1337_virus"), 32);
let files = get_files(&conn);
assert_eq!(files.len(), 1);
assert_eq!(files[0], "my_1337_virus 32 bytes");
}
#[test]
fn gets_all_file() {
let conn = get_test_db();
add_file(&conn, &String::from("file1"), 32);
add_file(&conn, &String::from("file2"), 64);
add_file(&conn, &String::from("file3"), 128);
let files = get_files(&conn);
assert_eq!(files.len(), 3);
assert_eq!(files[0], "file1 32 bytes");
assert_eq!(files[1], "file2 64 bytes");
assert_eq!(files[2], "file3 128 bytes");
}
#[test]
fn adds_blocks_to_inode() {
let conn = get_test_db();
let filename = String::from("main_file");
add_file(&conn, &filename, 128);
add_data_node(&conn, "127.0.0.1", 1337);
add_data_node(&conn, "127.0.0.2", 1338);
let inode = get_file_info(&conn, &filename);
let blocks = vec!(
Block {
file_id: inode.id,
id: 0,
node_id: 1,
chunk_id: String::from("c1"),
},
Block {
file_id: inode.id,
id: 0,
node_id: 2,
chunk_id: String::from("c2"),
},
);
add_blocks_to_inode(&conn, &filename, &blocks);
let (inode, blocks) = get_file_inode(&conn, &filename);
assert_eq!(inode.name, "main_file");
assert_eq!(inode.size, 128);
assert_eq!(blocks.len(), 2);
let dn1 = get_data_node(&conn, "127.0.0.1", 1337);
let dn2 = get_data_node(&conn, "127.0.0.2", 1338);
assert_eq!(dn1.id, 1);
assert_eq!(dn2.id, 2);
}
} }

View File

@ -10,10 +10,10 @@ use std::net::Ipv4Addr;
pub enum PacketType { pub enum PacketType {
NodeRegistration, NodeRegistration,
ListFiles, ListFiles,
AddFile,
PutFiles, PutFiles,
GetFiles, GetFiles,
AddDataBlocks, AddDataBlocks,
ShutdownDataNode,
Success, Success,
} }
@ -30,8 +30,8 @@ pub struct FilePaths<'a> {
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct NodeRegistration { pub struct NodeRegistration {
pub ip: String,
pub register: bool, pub register: bool,
pub ip: String,
pub port: u32, pub port: u32,
} }
@ -58,7 +58,6 @@ pub struct INode {
pub id: u32, pub id: u32,
pub name: String, pub name: String,
pub size: u32, pub size: u32,
pub blocks: Vec<Block>,
} }
#[derive(Debug)] #[derive(Debug)]
@ -66,6 +65,12 @@ pub struct Block {
pub id: u32, pub id: u32,
pub file_id: u32, pub file_id: u32,
pub node_id: u32, pub node_id: u32,
pub c_id: String, pub chunk_id: String,
} }
#[derive(Debug)]
pub struct BlockQuery {
pub id: u32,
pub data_node: DataNode,
pub chunk_id: String
}