diff --git a/Cargo.lock b/Cargo.lock index 77d2b1f..667cab3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20,7 +20,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "libc" -version = "0.2.43" +version = "0.2.44" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -127,7 +127,7 @@ name = "time" version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.44 (registry+https://github.com/rust-lang/crates.io-index)", "redox_syscall 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -164,7 +164,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [metadata] "checksum bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "228047a76f468627ca71776ecdebd732a3423081fcf5125585bcd7c49886ce12" "checksum itoa 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "1306f3464951f30e30d12373d31c79fbd52d236e5e896fd92f96ec7babbbe60b" -"checksum libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)" = "76e3a3ef172f1a0b9a9ff0dd1491ae5e6c948b94479a3021819ba7d860c8645d" +"checksum libc 0.2.44 (registry+https://github.com/rust-lang/crates.io-index)" = "10923947f84a519a45c8fefb7dd1b3e8c08747993381adee176d7a82b4195311" "checksum libsqlite3-sys 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "742b695cbfb89e549dca6960a55e6802f67d352e33e97859ee46dee835211b0f" "checksum linked-hash-map 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7860ec297f7008ff7a1e3382d7f7e1dcd69efc94751a2284bafc3d013c2aa939" "checksum lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4d06ff7ff06f729ce5f4e227876cb88d10bc59cd4ae1e09fbb2bde15c850dc21" diff --git a/shutdown_node b/shutdown_node new file mode 100755 index 0000000..09a7d89 --- /dev/null +++ b/shutdown_node @@ -0,0 +1,2 @@ +if [ -z "$1" ]; then echo Requires port argument. Exiting; exit 1; fi +echo '{"p_type":"ShutdownDataNode","json":null}' | nc -v -N localhost $1 diff --git a/src/bin/copy.rs b/src/bin/copy.rs index e200b9f..a508afa 100644 --- a/src/bin/copy.rs +++ b/src/bin/copy.rs @@ -9,5 +9,11 @@ use std::net::{TcpListener, TcpStream, Shutdown}; use std::thread; use std::io::Read; use std::io::Write; +use std::fs::File; +use std::fs; -fn main() {} +fn main() { + let mut file = fs::read("dfs.db").unwrap(); + let mut copy = File::create("copy").unwrap(); + copy.write_all(&file[..]).unwrap(); +} diff --git a/src/bin/data_node.rs b/src/bin/data_node.rs index be2c110..21f1591 100644 --- a/src/bin/data_node.rs +++ b/src/bin/data_node.rs @@ -9,15 +9,32 @@ use std::net::{TcpStream, Shutdown}; use std::thread; use std::io::Read; use std::io::Write; +use std::net::TcpListener; + fn main() { + register_with_meta_server(); + let listener = TcpListener::bind("localhost:6771").unwrap(); + for stream in listener.incoming() { + let mut stream = stream.unwrap(); + match serde_json::from_reader(&mut stream) { + Ok(packet @ Packet { .. }) => match packet.p_type { + PacketType::ShutdownDataNode => shutdown(&mut stream), + _ => (), + }, + Err(e) => println!("Error parsing json {}", e.to_string()), + }; + } +} + +fn register_with_meta_server() { let mut stream = TcpStream::connect("localhost:6770").unwrap(); serde_json::to_writer( &mut stream, &Packet { - p_type: PacketType::RegisterNode, + p_type: PacketType::NodeRegistration, json: Some(serde_json::to_string( - &RegisterNode { ip: String::from("localhost"), port: 6770 }).unwrap()), + &NodeRegistration { register: true, ip: String::from("localhost"), port: 6770 }).unwrap()), }) .unwrap(); println!("Registered myself"); @@ -26,3 +43,21 @@ fn main() { let result: Packet = serde_json::from_reader(&mut stream).unwrap(); println!("{:?}", result); } + +fn shutdown(stream: &mut TcpStream) { + let mut stream = TcpStream::connect("localhost:6770").unwrap(); + serde_json::to_writer( + &mut stream, + &Packet { + p_type: PacketType::NodeRegistration, + json: Some(serde_json::to_string( + &NodeRegistration { register: false, ip: String::from("localhost"), port: 6770 }).unwrap()), + }) + .unwrap(); + println!("Unregistered myself"); + stream.flush().unwrap(); + stream.shutdown(Shutdown::Write).unwrap(); + let result: Packet = serde_json::from_reader(&mut stream).unwrap(); + println!("{:?}", result); + std::process::exit(0); +} diff --git a/src/bin/meta_data.rs b/src/bin/meta_data.rs index 263ab2c..a692d77 100644 --- a/src/bin/meta_data.rs +++ b/src/bin/meta_data.rs @@ -2,17 +2,14 @@ extern crate a03; extern crate rusqlite; extern crate serde; extern crate serde_json; -#[macro_use] extern crate serde_derive; use a03::*; use rusqlite::types::ToSql; use rusqlite::{Connection, NO_PARAMS}; use std::borrow::Cow; -use std::io::Read; use std::io::Write; -use std::net::{Shutdown, TcpListener, TcpStream}; -use std::thread; +use std::net::{TcpListener, TcpStream}; fn main() { let mut data_nodes: Vec = Vec::new(); @@ -29,8 +26,8 @@ fn main() { Ok(packet @ Packet { .. }) => match packet.p_type { PacketType::ListFiles => list(&mut stream, &file_list[..]), PacketType::PutFiles => put(&mut stream, &packet.json.unwrap(), &mut file_list), - PacketType::RegisterNode => - register_node(&mut stream, &packet.json.unwrap(), &mut data_nodes), + PacketType::NodeRegistration => + node_registration(&mut stream, &packet.json.unwrap(), &mut data_nodes), _ => (), }, Err(e) => println!("Error parsing json {}", e.to_string()), @@ -53,19 +50,35 @@ fn list(stream: &mut TcpStream, files: &[String]) { fn put(stream: &mut TcpStream, json: &String, files: &mut Vec) { let files: PutFiles = serde_json::from_str(json).unwrap(); - report_success(stream); + report_success(stream, "Successfully Put Files"); } -fn register_node(stream: &mut TcpStream, json: &String, data_nodes: &mut Vec) { - let endpoint : RegisterNode = serde_json::from_str(json).unwrap(); - data_nodes.push(DataNode { address: endpoint.ip, port: endpoint.port, id: 1 }); +fn node_registration(stream: &mut TcpStream, json: &String, data_nodes: &mut Vec) { + let endpoint : NodeRegistration = serde_json::from_str(json).unwrap(); + let message = if endpoint.register { + data_nodes.push(DataNode { ip: endpoint.ip, port: endpoint.port, id: 1 }); + "You were successfully registering" + } + else { + match data_nodes.iter() + .position(|dn| dn.ip == endpoint.ip && dn.port == endpoint.port) { + Some(index) => { + data_nodes.remove(index); + "You were successfully unregistering" + }, + None => { + println!("Data Node at {}:{} does not exit", endpoint.ip, endpoint.port); + "You weren't found" + } + } + }; for dn in data_nodes { println!("{:?}", dn); } - report_success(stream); + report_success(stream, message); } -fn report_success(stream: &mut TcpStream) { +fn report_success(stream: &mut TcpStream, message: &str) { match serde_json::to_writer( stream, &Packet { @@ -73,7 +86,7 @@ fn report_success(stream: &mut TcpStream) { json: None, }, ) { - Ok(_) => println!("{}", "Success Registering Data Node"), + Ok(_) => println!("{}", message), Err(e) => println!("{}", e), }; } @@ -125,7 +138,7 @@ fn check_node(conn: &Connection, address: &str, port: i32) -> DataNode { .unwrap(); stmt.query_row(&[&address as &ToSql, &port], |row| DataNode { id: row.get(0), - address: row.get(1), + ip: row.get(1), port: row.get(2), }).unwrap() } diff --git a/src/lib.rs b/src/lib.rs index e7dfc94..073e3c6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,11 +8,12 @@ use std::net::Ipv4Addr; #[derive(Serialize, Deserialize, Debug)] pub enum PacketType { - RegisterNode, + NodeRegistration, ListFiles, PutFiles, GetFiles, AddDataBlocks, + ShutdownDataNode, Success, } @@ -28,8 +29,9 @@ pub struct FilePaths<'a> { } #[derive(Serialize, Deserialize, Debug)] -pub struct RegisterNode { +pub struct NodeRegistration { pub ip: String, + pub register: bool, pub port: u32, } @@ -47,7 +49,7 @@ pub struct AddDataBlocks {} #[derive(Debug)] pub struct DataNode { pub id: u32, - pub address: String, + pub ip: String, pub port: u32, } @@ -56,6 +58,7 @@ pub struct INode { pub id: u32, pub name: String, pub size: u32, + pub blocks: Vec, } #[derive(Debug)]