From a281bd2a9e94c088867e7c8cf4093e9833f223db Mon Sep 17 00:00:00 2001 From: Joseph Ferano Date: Wed, 12 Dec 2018 22:30:55 -0400 Subject: [PATCH] Properly configuring read/write requests between copy and meta_data --- clean => clean_db | 0 src/bin/copy.rs | 52 ++++++++++++++++---------------- src/bin/data_node.rs | 7 ++--- src/bin/meta_data.rs | 70 +++++++++++++++++++++++++++----------------- src/lib.rs | 5 ++-- 5 files changed, 73 insertions(+), 61 deletions(-) rename clean => clean_db (100%) diff --git a/clean b/clean_db similarity index 100% rename from clean rename to clean_db diff --git a/src/bin/copy.rs b/src/bin/copy.rs index 238cec3..2b24619 100644 --- a/src/bin/copy.rs +++ b/src/bin/copy.rs @@ -11,26 +11,28 @@ use std::fs; fn main() { let args = get_cli_args(); + let file = fs::read(args.filename).expect("File not found!"); + let size = file.len(); let mut stream = TcpStream::connect(args.endpoint).unwrap(); + let packet_type; + let json; if args.is_copy_to_dfs { - serde_json::to_writer( - &mut stream, - &Packet { - p_type: PacketType::GetFile, - json: Some(serde_json::to_string( - &PutFile { name: args.filepath, size: 32 }).unwrap()), + packet_type = PacketType::PutFile; + json = Some(serde_json::to_string( + &PutFile { + name: args.filepath, + size: size as u32, }) - .unwrap(); + .unwrap()) } else { - serde_json::to_writer( - &mut stream, - &Packet { - p_type: PacketType::PutFile, - json: Some(serde_json::to_string( - &PutFile { name: args.filepath, size: 32 }).unwrap()), + packet_type = PacketType::GetFile; + json = Some(serde_json::to_string( + &GetFile { }) - .unwrap(); + .unwrap()) } + serde_json::to_writer( &mut stream, &Packet { p_type: packet_type, json, }) + .unwrap(); println!("Sent file"); stream.flush().unwrap(); stream.shutdown(Shutdown::Write).unwrap(); @@ -40,7 +42,6 @@ fn main() { println!("Chunk ID: {}", f.chunk_id); } - let mut file = fs::read(args.filename).expect("File not found!"); println!("{} bytes", file.len()); // let mut stream = TcpStream::connect("localhost:6771").unwrap(); // stream.write(&file).unwrap(); @@ -61,33 +62,30 @@ pub fn get_cli_args() -> CliArgs { if args.len() < 2 { panic!("Requires 2 arguments; IP:PORT:FILEPATH and a Local filename/filepath") } - let endpoint_arg: String = args.get(0).unwrap().clone(); + let mut endpoint_arg: String = args.get(0).unwrap().clone(); let endpoint; let filepath; let filename; - let is_copy_to_dfs; + let splits: Vec<&str>; - if endpoint_arg.contains(":") { - let splits: Vec<&str> = endpoint_arg.split(':').collect(); + let is_copy_to_dfs = endpoint_arg.contains(":"); + if is_copy_to_dfs { + splits = endpoint_arg.split(':').collect(); if splits.len() < 3 { panic!("Incorrect endpoint argument format! Please provide IP:PORT:FILE"); } - endpoint = format!("{}:{}", splits[0], splits[1]); - filepath = String::from(splits[2]); filename = args.get(1).unwrap().clone(); - is_copy_to_dfs = true; } else { - let endpoint_arg: String = args.get(1).unwrap().clone(); - let splits: Vec<&str> = endpoint_arg.split(':').collect(); + endpoint_arg = args.get(1).unwrap().clone(); + splits = endpoint_arg.split(':').collect(); if splits.len() < 3 { panic!("Incorrect endpoint argument format! Please provide IP:PORT:FILE"); } - endpoint = format!("{}:{}", splits[0], splits[1]); - filepath = String::from(splits[2]); filename = args.get(0).unwrap().clone(); - is_copy_to_dfs = false; } + endpoint = format!("{}:{}", splits[0], splits[1]); + filepath = String::from(splits[2]); CliArgs { endpoint, filepath, filename, is_copy_to_dfs } } diff --git a/src/bin/data_node.rs b/src/bin/data_node.rs index d958930..cf8beb9 100644 --- a/src/bin/data_node.rs +++ b/src/bin/data_node.rs @@ -6,16 +6,13 @@ extern crate serde_derive; use a03::*; use std::net::{TcpStream, Shutdown}; -use std::thread; -use std::io::Read; use std::io::Write; use std::net::TcpListener; -use std::fs::File; - fn main() { + let endpoint = parse_endpoint_from_cli(0); + let listener = TcpListener::bind(endpoint).unwrap(); register_with_meta_server(); - let listener = TcpListener::bind("localhost:6771").unwrap(); for stream in listener.incoming() { let mut stream = stream.unwrap(); diff --git a/src/bin/meta_data.rs b/src/bin/meta_data.rs index f0b0df9..148d8c7 100644 --- a/src/bin/meta_data.rs +++ b/src/bin/meta_data.rs @@ -24,8 +24,10 @@ fn main() { PacketType::ListFiles => list(&mut stream, &conn), PacketType::NodeRegistration => node_registration(&mut stream, &conn, &packet.json.unwrap()), - PacketType::PutFile => - put_file(&mut stream, &conn, &packet.json.unwrap()), + PacketType::RequestRead => + request_read(&mut stream, &conn, &packet.json.unwrap()), + PacketType::RequestWrite => + request_write(&mut stream, &conn, &packet.json.unwrap()), _ => (), }, Err(e) => println!("Error parsing json {}", e.to_string()), @@ -34,36 +36,56 @@ fn main() { } } -fn put_file(stream: &mut TcpStream, conn: &Connection, message: &str) { +fn request_read(stream: &mut TcpStream, conn: &Connection, message: &str) { let file : PutFile = serde_json::from_str(message).unwrap(); add_file(&conn, &file.name, file.size as i32); - let mut blocks: Vec = Vec::new(); - // Divide the blocks up into the amount of nodes available - add_blocks_to_inode(&conn, &file.name, &blocks); + let (inode, blocks) = get_file_inode(&conn, &file.name); let mut nodes: Vec = Vec::new(); - for dn in get_data_nodes(&conn) { + for b in blocks { nodes.push(AvailableNodes { - ip: dn.ip, - port: dn.port, - chunk_id: uuid::Uuid::new_v4().to_string(), + ip: b.data_node.ip, + port: b.data_node.port, + chunk_id: b.chunk_id, }); } - match serde_json::to_writer( - stream, - &nodes - ) { + match serde_json::to_writer( stream, &nodes) { + Ok(_) => println!("{}", "Sent nodes with chunks"), + Err(e) => println!("{}", e), + }; +} + +fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) { + let file : PutFile = serde_json::from_str(message).unwrap(); + add_file(&conn, &file.name, file.size as i32); + // TODO: Need to ensure that replaces also work + let fid = conn.last_insert_rowid(); + let mut blocks: Vec = Vec::new(); + let mut nodes: Vec = Vec::new(); + let dnodes = get_data_nodes(&conn); + for i in 0..dnodes.len() { + let dn = &dnodes[i]; + let uuid = uuid::Uuid::new_v4().to_string(); + blocks.push(Block { + chunk_id: uuid.clone(), + node_id: dn.id, + file_id: fid as u32, + id: 0, + }); + nodes.push(AvailableNodes { + ip: dn.ip.clone(), + port: dn.port, + chunk_id: uuid.clone(), + }); + } + add_blocks_to_inode(&conn, &file.name, &blocks); + match serde_json::to_writer(stream, &nodes) { Ok(_) => println!("{}", "Sent nodes with chunks"), Err(e) => println!("{}", e), }; } fn list(stream: &mut TcpStream, conn: &Connection) { - match serde_json::to_writer( - stream, - &FilePaths { - paths: Cow::from(get_files(&conn)), - }, - ) { + match serde_json::to_writer(stream, &FilePaths { paths: Cow::from(get_files(&conn)), }) { Ok(_) => println!("{}", "Sent file paths"), Err(e) => println!("{}", e), }; @@ -85,13 +107,7 @@ fn node_registration(stream: &mut TcpStream, conn: &Connection, json: &String) { } fn report_success(stream: &mut TcpStream, message: &str) { - match serde_json::to_writer( - stream, - &Packet { - p_type: PacketType::Success, - json: None, - }, - ) { + match serde_json::to_writer(stream, &Packet { p_type: PacketType::Success, json: None, }) { Ok(_) => println!("{}", message), Err(e) => println!("{}", e), }; diff --git a/src/lib.rs b/src/lib.rs index 93e15c5..ea93271 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,7 +7,6 @@ use std::borrow::Cow; use std::net::Ipv4Addr; use std::net::SocketAddrV4; use std::str::FromStr; -//use std:: const DEFAULT_PORT: &str = "8000"; @@ -17,6 +16,8 @@ pub enum PacketType { ListFiles, PutFile, GetFile, + RequestRead, + RequestWrite, AddDataBlocks, ShutdownDataNode, Success, @@ -54,7 +55,7 @@ pub struct AvailableNodes { } #[derive(Serialize, Deserialize, Debug)] -pub struct GetFiles {} +pub struct GetFile {} #[derive(Serialize, Deserialize, Debug)] pub struct AddDataBlocks {}