diff --git a/clean_db b/clean_db index 48d06e7..39f00cd 100755 --- a/clean_db +++ b/clean_db @@ -1 +1 @@ -rm dfs.db && python dfs_skel/createdb.py +rm dfs.db && python createdb.py diff --git a/createdb.py b/createdb.py new file mode 100644 index 0000000..f197917 --- /dev/null +++ b/createdb.py @@ -0,0 +1,20 @@ +import sqlite3 + +conn = sqlite3.connect("dfs.db") + +c = conn.cursor() + +# Create inode table +c.execute("""CREATE TABLE inode (fid INTEGER PRIMARY KEY ASC AUTOINCREMENT, fname TEXT UNIQUE NOT NULL DEFAULT " ", fsize INTEGER NOT NULL default "0")""") + +# Create data node table +c.execute("""CREATE TABLE dnode(nid INTEGER PRIMARY KEY ASC AUTOINCREMENT, address TEXT NOT NULL default " ", port INTEGER NOT NULL DEFAULT "0")""") + +# Create UNIQUE tuple for data node +c.execute("""CREATE UNIQUE INDEX dnodeA ON dnode(address, port)""") + +# Create block table +c.execute("""CREATE TABLE block (bid INTEGER PRIMARY KEY ASC AUTOINCREMENT, fid INTEGER NOT NULL DEFAULT "0", nid INTEGER NOT NULL DEFAULT "0", cid INTEGER NOT NULL DEFAULT "0")""") + +# Create UNIQUE tuple for block +# c.execute("""CREATE UNIQUE INDEX blocknc ON block(nid, cid)""") diff --git a/src/bin/copy.rs b/src/bin/copy.rs index aadb2ad..3d6ba69 100644 --- a/src/bin/copy.rs +++ b/src/bin/copy.rs @@ -12,7 +12,7 @@ use std::fs; fn main() { let args = get_cli_args(); - let mut stream = TcpStream::connect(args.endpoint).unwrap(); + let mut stream = TcpStream::connect(&args.endpoint).unwrap(); let packet_type; let json; if args.is_copy_to_dfs { @@ -21,13 +21,13 @@ fn main() { let size = file.len(); println!("Requesting Write of {}", args.filepath); json = Some(serde_json::to_string( - &AddFile { name: args.filepath.clone(), size: size as u32, }).unwrap()) + &AddFile { name: args.filepath.clone(), size: size as u32 }).unwrap()) } else { packet_type = PacketType::RequestRead; println!("Requesting Read of {}", args.filepath); json = Some(serde_json::to_string::(&args.filepath).unwrap()) } - serde_json::to_writer( &mut stream, &Packet { p_type: packet_type, json, data: None, }) + serde_json::to_writer(&mut stream, &Packet { p_type: packet_type, json, data: None }) .unwrap(); stream.flush().unwrap(); stream.shutdown(Shutdown::Write).unwrap(); @@ -39,8 +39,8 @@ fn main() { .unwrap()), Ok(Packet { p_type: PacketType::Error, json, .. }) => { eprintln!("Meta Data Server Error: {}", &json.unwrap()); - }, - Ok(_) => {}, + } + Ok(_) => {} Err(e) => eprintln!("Error parsing json {}", e.to_string()), }; let filename = &args.filepath; @@ -53,8 +53,13 @@ fn main() { } } -fn send_file_to_data_nodes(filename: &String, nodes: &Vec, file: &Vec) { - let mut stream = TcpStream::connect("localhost:6771").unwrap(); +fn send_file_to_data_nodes( + filename: &String, + nodes: &Vec, + file: &Vec) +{ + let endpoint = format!("{}:{}", nodes[0].ip, nodes[0].port); + let mut stream = TcpStream::connect(endpoint).unwrap(); println!("Going to send a file! Bytes {}", file.len()); let chunk = Chunk { index: nodes[0].chunk_index, @@ -71,13 +76,17 @@ fn send_file_to_data_nodes(filename: &String, nodes: &Vec, file: stream.shutdown(Shutdown::Write).unwrap(); } -fn get_file_from_data_nodes(destination_path: &String, filename: &String, nodes: &Vec) { +fn get_file_from_data_nodes( + destination_path: &String, + filename: &String, + nodes: &Vec) +{ let chunk = Chunk { index: nodes[0].chunk_index, filename: filename.clone(), }; - let mut stream = TcpStream::connect("localhost:6771").unwrap(); - println!("The filename is {}", filename); + let endpoint = format!("{}:{}", nodes[0].ip, nodes[0].port); + let mut stream = TcpStream::connect(endpoint).unwrap(); let packet = serde_json::to_writer( &stream, &Packet { @@ -94,11 +103,11 @@ fn get_file_from_data_nodes(destination_path: &String, filename: &String, nodes: // TODO: Here we have to rebuild the chunks let mut copy = File::create(destination_path).unwrap(); copy.write_all(&data[..]).unwrap(); - }, + } Ok(Packet { p_type: PacketType::Error, json, .. }) => { eprintln!("Data Node Server Error: {}", &json.unwrap()); - }, - Ok(_) => {}, + } + Ok(_) => {} Err(e) => eprintln!("Error parsing json {}", e.to_string()), }; } @@ -118,8 +127,6 @@ pub fn get_cli_args() -> CliArgs { } let mut endpoint_arg: String = args.get(0).unwrap().clone(); - println!("Endpoint Arg {}", endpoint_arg); - let endpoint; let filepath; let filename; @@ -143,8 +150,6 @@ pub fn get_cli_args() -> CliArgs { endpoint = format!("{}:{}", splits[0], splits[1]); filepath = String::from(splits[2]); - let a = CliArgs { endpoint, filepath, filename, is_copy_to_dfs }; - println!("{:?}", a); - a + CliArgs { endpoint, filepath, filename, is_copy_to_dfs } } diff --git a/src/bin/data_node.rs b/src/bin/data_node.rs index f984600..746fdec 100644 --- a/src/bin/data_node.rs +++ b/src/bin/data_node.rs @@ -14,38 +14,43 @@ use std::fs; use std::error::Error; fn main() { - let endpoint = parse_endpoint_from_cli(0); - let listener = TcpListener::bind(&endpoint).unwrap(); - register_with_meta_server(&endpoint); + let node_endpoint = parse_endpoint_from_cli(0); + let metadata_endpoint = parse_endpoint_from_cli(1); + let data_path = std::env::args().skip(3).next() + .expect("Missing data path"); + let listener = TcpListener::bind(&node_endpoint).unwrap(); + register_with_meta_server(&metadata_endpoint, &node_endpoint); for stream in listener.incoming() { let mut stream = stream.unwrap(); match serde_json::from_reader(&mut stream) { Ok(Packet { p_type: PacketType::GetFile, json, .. }) => { - send_file(&mut stream, &json.unwrap()); + send_file(&data_path, &mut stream, &json.unwrap()); stream.flush().unwrap(); stream.shutdown(Shutdown::Write).unwrap(); } Ok(Packet { p_type: PacketType::PutFile, json, data, }) => - receive_file(&json.unwrap(), &data.unwrap()), + receive_file(&data_path, &json.unwrap(), &data.unwrap()), Ok(Packet { p_type: PacketType::ShutdownDataNode, .. }) => - shutdown(&mut stream, &endpoint), + shutdown(&mut stream, &metadata_endpoint, &node_endpoint), Ok(_) => eprintln!("We don't handle this PacketType"), Err(e) => eprintln!("Error parsing json: {}", e.to_string()), }; } } -fn receive_file(json: &String, data: &Vec) { +fn receive_file(base_path: &String, json: &String, data: &Vec) { let chunk: Chunk = serde_json::from_str(json).unwrap(); - let mut copy = File::create(format!("{}_{}", chunk.filename, chunk.index)).unwrap(); + let filepath = format!("{}/{}_{}", base_path, chunk.filename, chunk.index); + println!("{}", filepath); + let mut copy = File::create(filepath).unwrap(); copy.write_all(&data[..]).unwrap(); } -fn send_file(stream: &mut TcpStream, json: &String) { +fn send_file(base_path: &String, stream: &mut TcpStream, json: &String) { let chunk: Chunk = serde_json::from_str(json).unwrap(); println!("{}", chunk.filename); - match fs::read(format!("{}_{}", &chunk.filename, &chunk.index)) { + match fs::read(format!("{}/{}_{}", base_path, &chunk.filename, &chunk.index)) { Ok(f) => { serde_json::to_writer( stream, @@ -70,9 +75,10 @@ fn send_file(stream: &mut TcpStream, json: &String) { }; } -fn register_with_meta_server(endpoint: &String) { - let mut stream = TcpStream::connect("localhost:6770").unwrap(); - let split: Vec<&str> = endpoint.split(":").collect(); +fn register_with_meta_server(metadata_endpoint: &String, node_endpoint: &String) { + println!("{}", metadata_endpoint); + let mut stream = TcpStream::connect(&metadata_endpoint).unwrap(); + let split: Vec<&str> = node_endpoint.split(":").collect(); serde_json::to_writer( &mut stream, &Packet { @@ -94,9 +100,9 @@ fn register_with_meta_server(endpoint: &String) { println!("{:?}", result); } -fn shutdown(stream: &mut TcpStream, endpoint: &String) { - let mut stream = TcpStream::connect("localhost:6770").unwrap(); - let split: Vec<&str> = endpoint.split(":").collect(); +fn shutdown(stream: &mut TcpStream, metadata_endpoint: &String, node_endpoint: &String) { + let mut stream = TcpStream::connect(&metadata_endpoint).unwrap(); + let split: Vec<&str> = node_endpoint.split(":").collect(); serde_json::to_writer( &mut stream, &Packet { diff --git a/src/bin/meta_data.rs b/src/bin/meta_data.rs index 8b87eaf..b1d685f 100644 --- a/src/bin/meta_data.rs +++ b/src/bin/meta_data.rs @@ -14,8 +14,8 @@ use std::net::{TcpListener, TcpStream}; fn main() { let conn = Connection::open("dfs.db") .expect("Error opening 'dfs.db', consider running 'python createdb.py'"); - - let listener = TcpListener::bind("localhost:6770").unwrap(); + let port = std::env::args().skip(1).next().unwrap_or(String::from(DEFAULT_PORT)); + let listener = TcpListener::bind(format!("localhost:{}", port)).unwrap(); for stream in listener.incoming() { let mut stream = stream.unwrap(); match serde_json::from_reader(&mut stream) { diff --git a/src/lib.rs b/src/lib.rs index 04895bf..5965e4c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,7 +8,7 @@ use std::net::Ipv4Addr; use std::net::SocketAddrV4; use std::str::FromStr; -const DEFAULT_PORT: &str = "8000"; +pub const DEFAULT_PORT: &str = "8000"; #[derive(Serialize, Deserialize, Debug)] pub enum PacketType {