Adding checks for existing and missing files

This commit is contained in:
Joseph Ferano 2018-12-14 20:28:22 -04:00
parent 3bd0e60169
commit 30d68cd0ee
3 changed files with 161 additions and 102 deletions

View File

@ -17,32 +17,45 @@ fn main() {
let packet_type;
let json;
if args.is_copy_to_dfs {
packet_type = PacketType::PutFile;
packet_type = PacketType::RequestWrite;
println!("Requesting Write of {}", args.filepath);
json = Some(serde_json::to_string(
&PutFile {
name: args.filepath,
size: size as u32,
})
.unwrap())
&PutFile { name: args.filepath, size: size as u32, }).unwrap())
} else {
packet_type = PacketType::GetFile;
packet_type = PacketType::RequestRead;
println!("Requesting Read of {}", args.filepath);
json = Some(serde_json::to_string(
&GetFile {
})
.unwrap())
&GetFile { name: args.filepath, }).unwrap())
}
serde_json::to_writer( &mut stream, &Packet { p_type: packet_type, json, })
.unwrap();
println!("Sent file");
stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap();
let files: Vec<AvailableNodes> = serde_json::from_reader(&mut stream).unwrap();
for f in files {
println!("Chunk ID: {}", f.chunk_id);
}
match serde_json::from_reader(&mut stream) {
Ok(packet @ Packet { .. }) => match packet.p_type {
PacketType::Success => {
let nodes = serde_json::from_str::<Vec<AvailableNodes>>(&packet.json.unwrap())
.unwrap();
for node in nodes {
println!("{}", node.chunk_id);
}
},
PacketType::Error => {
let unwrapped = &packet.json.unwrap();
panic!("Meta Data Server Error: {}", unwrapped);
},
_ => (),
},
Err(e) => println!("Error parsing json {}", e.to_string()),
};
println!("{} bytes", file.len());
// let files: Vec<AvailableNodes> = serde_json::from_reader(&mut stream).unwrap();
// for f in files {
// println!("Chunk ID: {}", f.chunk_id);
// }
// println!("{} bytes", file.len());
// let mut stream = TcpStream::connect("localhost:6771").unwrap();
// stream.write(&file).unwrap();
// stream.flush().unwrap();
@ -64,25 +77,27 @@ pub fn get_cli_args() -> CliArgs {
}
let mut endpoint_arg: String = args.get(0).unwrap().clone();
println!("Endpoint Arg {}", endpoint_arg);
let endpoint;
let filepath;
let filename;
let splits: Vec<&str>;
let is_copy_to_dfs = endpoint_arg.contains(":");
let is_copy_to_dfs = !endpoint_arg.contains(":");
if is_copy_to_dfs {
splits = endpoint_arg.split(':').collect();
if splits.len() < 3 {
panic!("Incorrect endpoint argument format! Please provide IP:PORT:FILE");
}
filename = args.get(1).unwrap().clone();
} else {
endpoint_arg = args.get(1).unwrap().clone();
splits = endpoint_arg.split(':').collect();
if splits.len() < 3 {
panic!("Incorrect endpoint argument format! Please provide IP:PORT:FILE");
}
filename = args.get(0).unwrap().clone();
} else {
splits = endpoint_arg.split(':').collect();
if splits.len() < 3 {
panic!("Incorrect endpoint argument format! Please provide IP:PORT:FILE");
}
filename = args.get(1).unwrap().clone();
}
endpoint = format!("{}:{}", splits[0], splits[1]);
filepath = String::from(splits[2]);

View File

@ -37,9 +37,22 @@ fn main() {
}
fn request_read(stream: &mut TcpStream, conn: &Connection, message: &str) {
let file : PutFile = serde_json::from_str(message).unwrap();
add_file(&conn, &file.name, file.size as i32);
let (inode, blocks) = get_file_inode(&conn, &file.name);
let file: GetFile = serde_json::from_str(message).unwrap();
let file_info = get_file_info(&conn, &file.name);
if file_info.is_none() {
match serde_json::to_writer(
stream,
&Packet {
p_type: PacketType::Error,
json: Some(String::from("File not found")),
}) {
Ok(_) => println!("{}", "Copy client attempted to read non-existing file"),
Err(e) => println!("{}", e),
};
return;
}
let file_info = file_info.unwrap();
let blocks = get_file_inode(&conn, file_info.id);
let mut nodes: Vec<AvailableNodes> = Vec::new();
for b in blocks {
nodes.push(AvailableNodes {
@ -48,17 +61,30 @@ fn request_read(stream: &mut TcpStream, conn: &Connection, message: &str) {
chunk_id: b.chunk_id,
});
}
match serde_json::to_writer( stream, &nodes) {
match serde_json::to_writer(
stream,
&Packet { p_type: PacketType::Success, json: Some(serde_json::to_string(&nodes).unwrap()) }) {
Ok(_) => println!("{}", "Sent nodes with chunks"),
Err(e) => println!("{}", e),
};
}
fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) {
let file : PutFile = serde_json::from_str(message).unwrap();
add_file(&conn, &file.name, file.size as i32);
// TODO: Need to ensure that replaces also work
let fid = conn.last_insert_rowid();
let file: PutFile = serde_json::from_str(message).unwrap();
let file_already_exists = add_file(&conn, &file.name, file.size as i32);
if file_already_exists {
match serde_json::to_writer(
stream,
&Packet {
p_type: PacketType::Error,
json: Some(String::from("File already exists, please remove before re-uploading")),
}) {
Ok(_) => println!("{}", "Copy client attempted to add an existing file"),
Err(e) => println!("{}", e),
};
return;
}
let file_info = get_file_info(&conn, &file.name).unwrap();
let mut blocks: Vec<Block> = Vec::new();
let mut nodes: Vec<AvailableNodes> = Vec::new();
let dnodes = get_data_nodes(&conn);
@ -68,7 +94,7 @@ fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) {
blocks.push(Block {
chunk_id: uuid.clone(),
node_id: dn.id,
file_id: fid as u32,
file_id: file_info.id as u32,
id: 0,
});
nodes.push(AvailableNodes {
@ -77,28 +103,29 @@ fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) {
chunk_id: uuid.clone(),
});
}
add_blocks_to_inode(&conn, &file.name, &blocks);
match serde_json::to_writer(stream, &nodes) {
add_blocks_to_inode(&conn, file_info.id, &blocks);
match serde_json::to_writer(
stream,
&Packet { p_type: PacketType::Success, json: Some(serde_json::to_string(&nodes).unwrap()) }) {
Ok(_) => println!("{}", "Sent nodes with chunks"),
Err(e) => println!("{}", e),
};
}
fn list(stream: &mut TcpStream, conn: &Connection) {
match serde_json::to_writer(stream, &FilePaths { paths: Cow::from(get_files(&conn)), }) {
match serde_json::to_writer(stream, &FilePaths { paths: Cow::from(get_files(&conn)) }) {
Ok(_) => println!("{}", "Sent file paths"),
Err(e) => println!("{}", e),
};
}
fn node_registration(stream: &mut TcpStream, conn: &Connection, json: &String) {
let endpoint : NodeRegistration = serde_json::from_str(json).unwrap();
let endpoint: NodeRegistration = serde_json::from_str(json).unwrap();
let message = if endpoint.register {
// TODO: We probably should check if the endpoint already exists!
add_data_node(&conn, &endpoint.ip, endpoint.port as i32);
"You were successfully registered"
}
else {
} else {
// TODO: We should check if the endpoint exists!
remove_data_node(&conn, &endpoint.ip, endpoint.port as i32);
"You were successfully unregistered"
@ -107,7 +134,7 @@ fn node_registration(stream: &mut TcpStream, conn: &Connection, json: &String) {
}
fn report_success(stream: &mut TcpStream, message: &str) {
match serde_json::to_writer(stream, &Packet { p_type: PacketType::Success, json: None, }) {
match serde_json::to_writer(stream, &Packet { p_type: PacketType::Success, json: None }) {
Ok(_) => println!("{}", message),
Err(e) => println!("{}", e),
};
@ -160,12 +187,20 @@ fn get_data_nodes(conn: &Connection) -> Vec<DataNode> {
nodes
}
fn add_file(conn: &Connection, fname: &String, fsize: i32) {
fn add_file(conn: &Connection, fname: &String, fsize: i32) -> bool {
let file_exists = conn.query_row(
"SELECT fid FROM inode WHERE fname = ?1",
&[&fname as &ToSql],
|_| {})
.is_ok();
if file_exists {
return true;
}
conn.execute(
"INSERT OR REPLACE INTO inode (fid, fname, fsize)
VALUES ((SELECT fid FROM inode WHERE fname = ?1), ?1, ?2)",
"INSERT INTO inode (fname, fsize) VALUES (?1, ?2)",
&[&fname as &ToSql, &fsize])
.unwrap();
false
}
fn remove_file(conn: &Connection, fname: String) {
@ -175,61 +210,62 @@ fn remove_file(conn: &Connection, fname: String) {
.unwrap();
}
fn get_file_info(conn: &Connection, fname: &String) -> INode {
conn.query_row(
"SELECT fid, fsize FROM inode where fname=?1",
&[&fname as &ToSql],
|row| INode { id: row.get(0), size: row.get(1), name: fname.clone() }
).unwrap()
}
fn get_file_info(conn: &Connection, fname: &String) -> Option<INode> {
conn.query_row(
"SELECT fid, fsize FROM inode WHERE fname=?1",
&[&fname as &ToSql],
|row| INode { id: row.get(0), size: row.get(1), name: fname.clone() },
).ok()
}
fn get_files(conn: &Connection) -> Vec<String> {
let mut stmt = conn
.prepare("SELECT fname, fsize FROM inode")
.unwrap();
let mut files = Vec::new();
match stmt.query_map(NO_PARAMS, |row| INode {
id: 0,
name: row.get(0),
size: row.get(1),
}) {
Ok(iter) => {
for f in iter {
let f = f.unwrap();
files.push(format!("{} {} bytes", f.name, f.size));
}
},
Err(e) => println!("Error! {}", e),
}
files
}
fn get_files(conn: &Connection) -> Vec<String> {
let mut stmt = conn
.prepare("SELECT fname, fsize FROM inode")
.unwrap();
let mut files = Vec::new();
match stmt.query_map(NO_PARAMS, |row| INode {
id: 0,
name: row.get(0),
size: row.get(1),
}) {
Ok(iter) => {
for f in iter {
let f = f.unwrap();
files.push(format!("{} {} bytes", f.name, f.size));
}
}
Err(e) => println!("Error! {}", e),
}
files
}
fn add_blocks_to_inode(conn: &Connection, fname: &String, blocks: &Vec<Block>) {
let fid : u32 = get_file_info(&conn, fname).id;
for block in blocks {
conn.execute(
"INSERT INTO block (nid, fid, cid) VALUES (?1, ?2, ?3)",
&[&block.node_id as &ToSql, &fid, &block.chunk_id]).unwrap();
}
}
fn add_blocks_to_inode(conn: &Connection, fid: u32, blocks: &Vec<Block>) {
for block in blocks {
match conn.execute(
"INSERT INTO block (nid, fid, cid) VALUES (?1, ?2, ?3)",
&[&block.node_id as &ToSql, &fid, &block.chunk_id]) {
Ok(n) => println!("Updated {}", n),
Err(e) => println!("Error: {}", e),
}
}
}
fn get_file_inode(conn: &Connection, fname: &String) -> (INode, Vec<BlockQuery>) {
let file = get_file_info(&conn, &fname);
let mut stmt = conn.prepare(
"SELECT dnode.nid, address, port, cid FROM dnode, block WHERE dnode.nid = block.nid AND block.fid = ?1")
.unwrap();
let iter = stmt.query_map(
&[&file.id],
|row| BlockQuery {
data_node: DataNode { id: row.get(0), ip: row.get(1), port: row.get(2) },
chunk_id: row.get(3),
}).unwrap();
let mut blocks: Vec<BlockQuery> = Vec::new();
for b in iter {
blocks.push(b.unwrap());
}
(file, blocks)
}
fn get_file_inode(conn: &Connection, fid: u32) -> Vec<BlockQuery> {
let mut stmt = conn.prepare(
"SELECT dnode.nid, address, port, cid FROM dnode, block WHERE dnode.nid = block.nid AND block.fid = ?1")
.unwrap();
let iter = stmt.query_map(
&[&fid],
|row| BlockQuery {
data_node: DataNode { id: row.get(0), ip: row.get(1), port: row.get(2) },
chunk_id: row.get(3),
}).unwrap();
let mut blocks: Vec<BlockQuery> = Vec::new();
for b in iter {
blocks.push(b.unwrap());
}
blocks
}
#[cfg(test)]
mod tests {
@ -242,7 +278,7 @@ mod tests {
fid INTEGER PRIMARY KEY ASC AUTOINCREMENT,
fname TEXT UNIQUE NOT NULL DEFAULT \" \",
fsize INTEGER NOT NULL default \"0\")",
NO_PARAMS
NO_PARAMS,
).unwrap();
conn.execute(
@ -259,7 +295,7 @@ bid INTEGER PRIMARY KEY ASC AUTOINCREMENT,
fid INTEGER NOT NULL DEFAULT \"0\",
nid INTEGER NOT NULL DEFAULT \"0\",
cid TEXT NOT NULL DEFAULT \"0\")",
NO_PARAMS
NO_PARAMS,
).unwrap();
conn
}
@ -355,12 +391,18 @@ cid TEXT NOT NULL DEFAULT \"0\")",
add_file(&conn, &fname, 32);
let files = get_files(&conn);
assert_eq!(files.len(), 1);
let file = get_file_info(&conn, &fname);
let file = get_file_info(&conn, &fname).unwrap();
assert_eq!(file.id, 1);
assert_eq!(file.name, "my_1337_virus");
assert_eq!(file.size, 32);
}
#[test]
fn adding_new_file_returns_true() {}
#[test]
fn adding_same_file_returns_false() {}
#[test]
fn removes_file() {
let conn = get_test_db();
@ -398,7 +440,7 @@ cid TEXT NOT NULL DEFAULT \"0\")",
add_data_node(&conn, "127.0.0.1", 1337);
add_data_node(&conn, "127.0.0.2", 1338);
add_data_node(&conn, "127.0.0.2", 1339);
let inode = get_file_info(&conn, &filename);
let inode = get_file_info(&conn, &filename).unwrap();
let blocks = vec!(
Block {
file_id: inode.id,
@ -419,8 +461,8 @@ cid TEXT NOT NULL DEFAULT \"0\")",
chunk_id: String::from("c3"),
},
);
add_blocks_to_inode(&conn, &filename, &blocks);
let (inode, blocks) = get_file_inode(&conn, &filename);
add_blocks_to_inode(&conn, inode.id, &blocks);
let blocks = get_file_inode(&conn, inode.id);
assert_eq!(inode.name, "main_file");
assert_eq!(inode.size, 128);
assert_eq!(blocks.len(), 3);
@ -437,5 +479,4 @@ cid TEXT NOT NULL DEFAULT \"0\")",
assert_eq!(blocks[2].data_node.ip, "127.0.0.2");
assert_eq!(blocks[2].data_node.port, 1339);
}
}

View File

@ -21,6 +21,7 @@ pub enum PacketType {
AddDataBlocks,
ShutdownDataNode,
Success,
Error,
}
#[derive(Serialize, Deserialize, Debug)]
@ -55,7 +56,9 @@ pub struct AvailableNodes {
}
#[derive(Serialize, Deserialize, Debug)]
pub struct GetFile {}
pub struct GetFile {
pub name: String,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct AddDataBlocks {}