From 0e10f88c099c17fe5b880daf722d5d373174dbb1 Mon Sep 17 00:00:00 2001 From: Joseph Ferano Date: Wed, 28 Nov 2018 21:14:14 -0400 Subject: [PATCH] meta_data now handles ls request and simple data node registration --- .gitignore | 1 + sm | 1 + src/bin/copy.rs | 12 +++-- src/bin/data_node.rs | 21 +++++---- src/bin/ls.rs | 30 +++++++++++- src/bin/meta_data.rs | 109 ++++++++++++++++++++++++++----------------- src/lib.rs | 62 +++++++++++++++++++++++- 7 files changed, 178 insertions(+), 58 deletions(-) create mode 100755 sm diff --git a/.gitignore b/.gitignore index 6797165..91ca2aa 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ *.tar.gz *.db dfs_skel/ +venv/ diff --git a/sm b/sm new file mode 100755 index 0000000..ed7592f --- /dev/null +++ b/sm @@ -0,0 +1 @@ +echo $1 | nc -v -N localhost 6770 diff --git a/src/bin/copy.rs b/src/bin/copy.rs index 66f5387..e200b9f 100644 --- a/src/bin/copy.rs +++ b/src/bin/copy.rs @@ -1,7 +1,13 @@ extern crate a03; +extern crate serde; +extern crate serde_json; +#[macro_use] +extern crate serde_derive; use a03::*; +use std::net::{TcpListener, TcpStream, Shutdown}; +use std::thread; +use std::io::Read; +use std::io::Write; -fn main() { - println!("Hello, world!"); -} +fn main() {} diff --git a/src/bin/data_node.rs b/src/bin/data_node.rs index f198f0d..be2c110 100644 --- a/src/bin/data_node.rs +++ b/src/bin/data_node.rs @@ -1,25 +1,28 @@ extern crate a03; - -use a03::*; extern crate serde; extern crate serde_json; #[macro_use] extern crate serde_derive; -use std::net::{TcpListener,TcpStream,Shutdown}; +use a03::*; +use std::net::{TcpStream, Shutdown}; use std::thread; use std::io::Read; use std::io::Write; fn main() { let mut stream = TcpStream::connect("localhost:6770").unwrap(); - let writer = serde_json::to_writer( + serde_json::to_writer( &mut stream, - &TestObj { message : String::from("I can't think of something clever")}) + &Packet { + p_type: PacketType::RegisterNode, + json: Some(serde_json::to_string( + &RegisterNode { ip: String::from("localhost"), port: 6770 }).unwrap()), + }) .unwrap(); - println!("Message sent!"); + println!("Registered myself"); stream.flush().unwrap(); -// stream.shutdown(Shutdown::Write); - let test_obj : TestObj = serde_json::from_reader(&mut stream).unwrap(); - println!("Received message back! {}", test_obj.message); + stream.shutdown(Shutdown::Write).unwrap(); + let result: Packet = serde_json::from_reader(&mut stream).unwrap(); + println!("{:?}", result); } diff --git a/src/bin/ls.rs b/src/bin/ls.rs index e7a11a9..2bde7f3 100644 --- a/src/bin/ls.rs +++ b/src/bin/ls.rs @@ -1,3 +1,31 @@ +extern crate a03; +extern crate serde; +extern crate serde_json; +#[macro_use] +extern crate serde_derive; + +use a03::*; +use std::net::{TcpListener, TcpStream, Shutdown}; +use std::borrow::Cow; +use std::thread; +use std::io::Read; +use std::io::Write; + fn main() { - println!("Hello, world!"); + let mut stream = TcpStream::connect("localhost:6770").unwrap(); + serde_json::to_writer( + &mut stream, + &Packet { + p_type: PacketType::ListFiles, + json: None, + }) + .unwrap(); + println!("Message sent!"); + stream.flush().unwrap(); + stream.shutdown(Shutdown::Write).unwrap(); + let files: FilePaths = serde_json::from_reader(&mut stream).unwrap(); + for path in files.paths.iter() { + println!("Path: {}", path); + } } + diff --git a/src/bin/meta_data.rs b/src/bin/meta_data.rs index 8f3ed1b..263ab2c 100644 --- a/src/bin/meta_data.rs +++ b/src/bin/meta_data.rs @@ -5,50 +5,77 @@ extern crate serde_json; #[macro_use] extern crate serde_derive; - use a03::*; +use a03::*; use rusqlite::types::ToSql; use rusqlite::{Connection, NO_PARAMS}; -use std::net::{TcpListener,TcpStream}; -use std::thread; +use std::borrow::Cow; use std::io::Read; +use std::io::Write; +use std::net::{Shutdown, TcpListener, TcpStream}; +use std::thread; fn main() { + let mut data_nodes: Vec = Vec::new(); + let mut file_list: Vec = Vec::new(); + + file_list.push(String::from("/")); + file_list.push(String::from("/home")); + file_list.push(String::from("/home/joe")); + file_list.push(String::from("/bin")); let listener = TcpListener::bind("localhost:6770").unwrap(); - println!("Binding!"); for stream in listener.incoming() { let mut stream = stream.unwrap(); - println!("Got here!"); - let test_obj : TestObj = serde_json::from_reader(&mut stream).unwrap(); - match serde_json::to_writer( - &mut stream, - &TestObj { message : String::from("I got it dude") }) - { - Ok(_) => println!("{}", test_obj.message), - Err(e) => println!("{}", e), + match serde_json::from_reader(&mut stream) { + Ok(packet @ Packet { .. }) => match packet.p_type { + PacketType::ListFiles => list(&mut stream, &file_list[..]), + PacketType::PutFiles => put(&mut stream, &packet.json.unwrap(), &mut file_list), + PacketType::RegisterNode => + register_node(&mut stream, &packet.json.unwrap(), &mut data_nodes), + _ => (), + }, + Err(e) => println!("Error parsing json {}", e.to_string()), }; + stream.flush().unwrap(); } } -#[derive(Debug)] -struct DataNode { - id: i32, - address: String, - port: i32, +fn list(stream: &mut TcpStream, files: &[String]) { + match serde_json::to_writer( + stream, + &FilePaths { + paths: Cow::from(files), + }, + ) { + Ok(_) => println!("{}", "Sent file paths"), + Err(e) => println!("{}", e), + }; } -#[derive(Debug)] -struct INode { - id: i32, - name: String, - size: i32, +fn put(stream: &mut TcpStream, json: &String, files: &mut Vec) { + let files: PutFiles = serde_json::from_str(json).unwrap(); + report_success(stream); } -#[derive(Debug)] -struct Block { - id : i32, - file_id : i32, - node_id : i32, - c_id : String, +fn register_node(stream: &mut TcpStream, json: &String, data_nodes: &mut Vec) { + let endpoint : RegisterNode = serde_json::from_str(json).unwrap(); + data_nodes.push(DataNode { address: endpoint.ip, port: endpoint.port, id: 1 }); + for dn in data_nodes { + println!("{:?}", dn); + } + report_success(stream); +} + +fn report_success(stream: &mut TcpStream) { + match serde_json::to_writer( + stream, + &Packet { + p_type: PacketType::Success, + json: None, + }, + ) { + Ok(_) => println!("{}", "Success Registering Data Node"), + Err(e) => println!("{}", e), + }; } fn create_tables(conn: &Connection) { @@ -96,23 +123,20 @@ fn check_node(conn: &Connection, address: &str, port: i32) -> DataNode { let mut stmt = conn .prepare("SELECT nid, address, port FROM dnode WHERE address=?1 AND port=?2") .unwrap(); - stmt.query_row( - &[&address as &ToSql, &port], - |row| DataNode { - id: row.get(0), - address: row.get(1), - port: row.get(2), - }) - .unwrap() + stmt.query_row(&[&address as &ToSql, &port], |row| DataNode { + id: row.get(0), + address: row.get(1), + port: row.get(2), + }).unwrap() } - fn get_data_nodes(conn: &Connection) { - let mut stmt = conn.prepare("SELECT address, port FROM dnode WHERE 1"); - } +fn get_data_nodes(conn: &Connection) { + let mut stmt = conn.prepare("SELECT address, port FROM dnode WHERE 1"); +} - fn insert_file(conn: &Connection, fname: String, fsize: usize) { - let mut stmt = conn.prepare("INSERT INTO inode (fname, fsize) VALUES (\"?1\", ?2)"); - } +fn insert_file(conn: &Connection, fname: String, fsize: usize) { + let mut stmt = conn.prepare("INSERT INTO inode (fname, fsize) VALUES (\"?1\", ?2)"); +} // fn get_file_info(conn: &Connection, fname: String) {} @@ -142,5 +166,4 @@ mod tests { assert_eq!(dnode.address, ip); assert_eq!(dnode.port, 65533); } - } diff --git a/src/lib.rs b/src/lib.rs index cd06f73..e7dfc94 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,8 +3,66 @@ extern crate serde_json; #[macro_use] extern crate serde_derive; +use std::borrow::Cow; +use std::net::Ipv4Addr; + #[derive(Serialize, Deserialize, Debug)] -pub struct TestObj { - pub message : String +pub enum PacketType { + RegisterNode, + ListFiles, + PutFiles, + GetFiles, + AddDataBlocks, + Success, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct Packet { + pub p_type: PacketType, + pub json: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct FilePaths<'a> { + pub paths: Cow<'a, [String]>, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct RegisterNode { + pub ip: String, + pub port: u32, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct PutFiles { + pub files: Vec +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct GetFiles {} + +#[derive(Serialize, Deserialize, Debug)] +pub struct AddDataBlocks {} + +#[derive(Debug)] +pub struct DataNode { + pub id: u32, + pub address: String, + pub port: u32, +} + +#[derive(Debug)] +pub struct INode { + pub id: u32, + pub name: String, + pub size: u32, +} + +#[derive(Debug)] +pub struct Block { + pub id: u32, + pub file_id: u32, + pub node_id: u32, + pub c_id: String, }