Compare commits

...

10 Commits

12 changed files with 211 additions and 191 deletions

2
.gitignore vendored
View File

@ -8,5 +8,5 @@
*.db *.db
dfs_skel/ dfs_skel/
venv/ venv/
data_node/ dn*/
copy_dir/ copy_dir/

63
Cargo.lock generated
View File

@ -6,7 +6,6 @@ dependencies = [
"serde 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.33 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.33 (registry+https://github.com/rust-lang/crates.io-index)",
"uuid 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]] [[package]]
@ -14,28 +13,6 @@ name = "bitflags"
version = "1.0.4" version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "cloudabi"
version = "0.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "fuchsia-zircon"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
"fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "fuchsia-zircon-sys"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "itoa" name = "itoa"
version = "0.4.3" version = "0.4.3"
@ -89,31 +66,6 @@ dependencies = [
"proc-macro2 0.4.24 (registry+https://github.com/rust-lang/crates.io-index)", "proc-macro2 0.4.24 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "rand"
version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
"fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.44 (registry+https://github.com/rust-lang/crates.io-index)",
"rand_core 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rand_core"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"rand_core 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rand_core"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.1.40" version = "0.1.40"
@ -185,14 +137,6 @@ name = "unicode-xid"
version = "0.1.0" version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "uuid"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "vcpkg" name = "vcpkg"
version = "0.2.6" version = "0.2.6"
@ -219,9 +163,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[metadata] [metadata]
"checksum bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "228047a76f468627ca71776ecdebd732a3423081fcf5125585bcd7c49886ce12" "checksum bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "228047a76f468627ca71776ecdebd732a3423081fcf5125585bcd7c49886ce12"
"checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
"checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82"
"checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
"checksum itoa 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "1306f3464951f30e30d12373d31c79fbd52d236e5e896fd92f96ec7babbbe60b" "checksum itoa 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "1306f3464951f30e30d12373d31c79fbd52d236e5e896fd92f96ec7babbbe60b"
"checksum libc 0.2.44 (registry+https://github.com/rust-lang/crates.io-index)" = "10923947f84a519a45c8fefb7dd1b3e8c08747993381adee176d7a82b4195311" "checksum libc 0.2.44 (registry+https://github.com/rust-lang/crates.io-index)" = "10923947f84a519a45c8fefb7dd1b3e8c08747993381adee176d7a82b4195311"
"checksum libsqlite3-sys 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "742b695cbfb89e549dca6960a55e6802f67d352e33e97859ee46dee835211b0f" "checksum libsqlite3-sys 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "742b695cbfb89e549dca6960a55e6802f67d352e33e97859ee46dee835211b0f"
@ -230,9 +171,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum pkg-config 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "676e8eb2b1b4c9043511a9b7bea0915320d7e502b0a079fb03f9635a5252b18c" "checksum pkg-config 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "676e8eb2b1b4c9043511a9b7bea0915320d7e502b0a079fb03f9635a5252b18c"
"checksum proc-macro2 0.4.24 (registry+https://github.com/rust-lang/crates.io-index)" = "77619697826f31a02ae974457af0b29b723e5619e113e9397b8b82c6bd253f09" "checksum proc-macro2 0.4.24 (registry+https://github.com/rust-lang/crates.io-index)" = "77619697826f31a02ae974457af0b29b723e5619e113e9397b8b82c6bd253f09"
"checksum quote 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)" = "53fa22a1994bd0f9372d7a816207d8a2677ad0325b073f5c5332760f0fb62b5c" "checksum quote 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)" = "53fa22a1994bd0f9372d7a816207d8a2677ad0325b073f5c5332760f0fb62b5c"
"checksum rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e464cd887e869cddcae8792a4ee31d23c7edd516700695608f5b98c67ee0131c"
"checksum rand_core 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1961a422c4d189dfb50ffa9320bf1f2a9bd54ecb92792fb9477f99a1045f3372"
"checksum rand_core 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0905b6b7079ec73b314d4c748701f6931eb79fd97c668caa3f1899b22b32c6db"
"checksum redox_syscall 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "c214e91d3ecf43e9a4e41e578973adeb14b474f2bee858742d127af75a0112b1" "checksum redox_syscall 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "c214e91d3ecf43e9a4e41e578973adeb14b474f2bee858742d127af75a0112b1"
"checksum rusqlite 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)" = "39bae767eb27866f5c0be918635ae54af705bc09db11be2c43a3c6b361cf3462" "checksum rusqlite 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)" = "39bae767eb27866f5c0be918635ae54af705bc09db11be2c43a3c6b361cf3462"
"checksum ryu 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "eb9e9b8cde282a9fe6a42dd4681319bfb63f121b8a8ee9439c6f4107e58a46f7" "checksum ryu 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "eb9e9b8cde282a9fe6a42dd4681319bfb63f121b8a8ee9439c6f4107e58a46f7"
@ -242,7 +180,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum syn 0.15.22 (registry+https://github.com/rust-lang/crates.io-index)" = "ae8b29eb5210bc5cf63ed6149cbf9adfc82ac0be023d8735c176ee74a2db4da7" "checksum syn 0.15.22 (registry+https://github.com/rust-lang/crates.io-index)" = "ae8b29eb5210bc5cf63ed6149cbf9adfc82ac0be023d8735c176ee74a2db4da7"
"checksum time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "d825be0eb33fda1a7e68012d51e9c7f451dc1a69391e7fdc197060bb8c56667b" "checksum time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "d825be0eb33fda1a7e68012d51e9c7f451dc1a69391e7fdc197060bb8c56667b"
"checksum unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc" "checksum unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc"
"checksum uuid 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "dab5c5526c5caa3d106653401a267fed923e7046f35895ffcb5ca42db64942e6"
"checksum vcpkg 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "def296d3eb3b12371b2c7d0e83bfe1403e4db2d7a0bba324a12b21c4ee13143d" "checksum vcpkg 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "def296d3eb3b12371b2c7d0e83bfe1403e4db2d7a0bba324a12b21c4ee13143d"
"checksum winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "92c1eb33641e276cfa214a0522acad57be5c56b10cb348b3c5117db75f3ac4b0" "checksum winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "92c1eb33641e276cfa214a0522acad57be5c56b10cb348b3c5117db75f3ac4b0"
"checksum winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" "checksum winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"

View File

@ -1,5 +1,5 @@
[package] [package]
name = "a03" name = "distributed-fs"
version = "0.1.0" version = "0.1.0"
authors = ["Joseph Ferano <joseph@ferano.io>"] authors = ["Joseph Ferano <joseph@ferano.io>"]
@ -8,4 +8,3 @@ rusqlite = "0.15.0"
serde = "1.0" serde = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
serde_json = "1.0" serde_json = "1.0"
uuid = { version = "0.7", features = ["v4"] }

View File

@ -1,10 +1,19 @@
all: build all: build
dist: build dist: clean build
@rm -f assig-03-dfs.tar.gz @cp target/release/copy .
@cp target/release/copy copy @cp target/release/ls .
@tar -czf assig-03-dfs.tar.gz src/ README.md copy @cp target/release/data_node .
@cp target/release/meta_data .
@tar -czf assig-03-dfs.tar.gz src/ README.md copy ls data_node meta_data Cargo.* clean_db createdb.py
@rm copy @rm copy
@rm ls
@rm data_node
@rm meta_data
clean:
@rm -f assig-03-dfs.tar.gz
build: build:
cargo build --release cargo build --release

View File

@ -1,21 +1,79 @@
## Distributed File System in Rust for CCOM4017 # Distributed File System in Rust
#### Running A distributed file system implementation with client-server architecture, built as one of my early projects exploring Rust and distributed systems concepts.
```./copy <PARAM1> <PARAM2>``` ## Architecture
#### Building The system consists of four main components:
If you wish to compile the code, install rust and cargo - **copy** - Client for reading and writing files to the distributed system
[Link](https://www.rust-lang.org/en-US/install.html) - **ls** - Client for listing files stored in the system
- **data_node** - Storage server that handles file chunks
- **meta_data** - Metadata server managing file locations and node registry
Then just run build ## How It Works
```cargo build``` The metadata server uses SQLite to track connected data nodes and file locations. When storing a file, the client connects to the metadata server, which provides a list of available data nodes. The client then divides the file into chunks and distributes them across multiple data nodes, transferring 256 bytes at a time.
If you wish to run a specific algorithm; The system uses JSON serialization via `serde_json` for communication between components. All network communication happens over TCP with a custom protocol for coordinating file operations.
```cargo run --bin copy ``` ## Usage
#### Testing ### Starting the Metadata Server
```bash
cargo run --bin meta_data [port]
# Defaults to port 8000 if not specified
```
### Starting Data Nodes
```bash
cargo run --bin data_node <node_endpoint> <metadata_endpoint> [base_path]
# Example: cargo run --bin data_node localhost:6771 127.0.0.1:8000 ./data
```
### Listing Files
```bash
cargo run --bin ls <metadata_endpoint>
# Example: cargo run --bin ls 127.0.0.1:8000
```
### Copying Files
**Upload to distributed system:**
```bash
cargo run --bin copy <local_file> <endpoint:remote_path>
# Example: cargo run --bin copy ./document.pdf localhost:8000:docs/document.pdf
```
**Download from distributed system:**
```bash
cargo run --bin copy <endpoint:remote_path> <local_file>
# Example: cargo run --bin copy localhost:8000:docs/document.pdf ./document.pdf
```
## Database Setup
Run the included Python script to initialize the SQLite database:
```bash
python3 createdb.py
```
The database schema includes tables for file metadata (inodes), data node registry, and block location tracking.
## Building
```bash
cargo build
```
## Testing
```bash
cargo test --bin meta_data
```
## Dependencies
- **rusqlite** - SQLite database interface
- **serde** - Serialization framework
- **serde_json** - JSON support for network protocol

BIN
pug.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 21 KiB

2
sm
View File

@ -1 +1 @@
echo $1 | nc -v -N localhost 6770 echo $1 | nc -v -N localhost $2

View File

@ -1,14 +1,14 @@
extern crate a03; extern crate distributed_fs;
extern crate serde; extern crate serde;
extern crate serde_json; extern crate serde_json;
#[macro_use]
extern crate serde_derive; extern crate serde_derive;
use a03::*; use distributed_fs::*;
use std::net::{TcpStream, Shutdown}; use std::net::{TcpStream, Shutdown};
use std::io::{Write, Read};
use std::fs::File; use std::fs::File;
use std::fs; use std::fs;
use std::io::{Write, BufWriter};
use std::time::Instant;
fn main() { fn main() {
let args = get_cli_args(); let args = get_cli_args();
@ -19,15 +19,15 @@ fn main() {
packet_type = PacketType::RequestWrite; packet_type = PacketType::RequestWrite;
let file = fs::read(&args.filename).unwrap(); let file = fs::read(&args.filename).unwrap();
let size = file.len(); let size = file.len();
println!("Requesting Write of {}", args.filepath); println!("Sending file of size {}", size);
json = Some(serde_json::to_string( json = Some(serde_json::to_string(
&AddFile { name: args.filepath.clone(), size: size as u32 }).unwrap()) &AddFile { name: args.filepath.clone(), size: size as u64 }).unwrap())
} else { } else {
packet_type = PacketType::RequestRead; packet_type = PacketType::RequestRead;
println!("Requesting Read of {}", args.filepath); println!("Requesting Read of {}", args.filepath);
json = Some(serde_json::to_string::<String>(&args.filepath).unwrap()) json = Some(serde_json::to_string::<String>(&args.filepath).unwrap())
} }
serde_json::to_writer(&mut stream, &Packet { p_type: packet_type, json, data: None }) serde_json::to_writer(&mut stream, &Packet { p_type: packet_type, json, })
.unwrap(); .unwrap();
stream.flush().unwrap(); stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap(); stream.shutdown(Shutdown::Write).unwrap();
@ -49,7 +49,7 @@ fn main() {
let file = fs::read(&args.filename).unwrap(); let file = fs::read(&args.filename).unwrap();
nodes.map(|ns| send_file_to_data_nodes(&filename, &ns, &file)); nodes.map(|ns| send_file_to_data_nodes(&filename, &ns, &file));
} else { } else {
nodes.map(|ns| get_file_from_data_nodes(&destination, &filename, &ns)); nodes.map(|mut ns| get_file_from_data_nodes(&destination, &filename, &mut ns));
} }
} }
@ -58,58 +58,70 @@ fn send_file_to_data_nodes(
nodes: &Vec<AvailableNodes>, nodes: &Vec<AvailableNodes>,
file: &Vec<u8>) file: &Vec<u8>)
{ {
let endpoint = format!("{}:{}", nodes[0].ip, nodes[0].port); let div: usize = ((file.len() as f32) / (nodes.len() as f32)).ceil() as usize;
let mut stream = TcpStream::connect(endpoint).unwrap(); let chunks: Vec<_> = file.chunks(div).collect();
println!("Going to send a file! Bytes {}", file.len()); for node in nodes {
let chunk = Chunk { let endpoint = format!("{}:{}", node.ip, node.port);
index: nodes[0].chunk_index, println!("{}", endpoint);
filename: filename.clone(), let mut stream = TcpStream::connect(&endpoint).unwrap();
}; let file_size = chunks[node.chunk_index as usize].len() as i64;
let packet = serde_json::to_writer( let chunk = Chunk {
&mut stream, index: node.chunk_index,
&Packet { filename: filename.clone(),
p_type: PacketType::PutFile, file_size,
json: Some(serde_json::to_string(&chunk).unwrap()), };
data: Some(file.clone()), serde_json::to_writer(
}).unwrap(); &mut stream,
stream.flush().unwrap(); &Packet {
stream.shutdown(Shutdown::Write).unwrap(); p_type: PacketType::PutFile,
json: Some(serde_json::to_string(&chunk).unwrap()),
}).unwrap();
stream.flush().unwrap();
stream.write(chunks[node.chunk_index as usize]).unwrap();
stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap();
}
} }
fn get_file_from_data_nodes( fn get_file_from_data_nodes(
destination_path: &String, destination_path: &String,
filename: &String, filename: &String,
nodes: &Vec<AvailableNodes>) nodes: &mut Vec<AvailableNodes>)
{ {
let chunk = Chunk { nodes.sort_by_key(|n| n.chunk_index);
index: nodes[0].chunk_index, let mut file = BufWriter::new(File::create(destination_path).unwrap());
filename: filename.clone(), for node in nodes {
}; let chunk = Chunk {
let endpoint = format!("{}:{}", nodes[0].ip, nodes[0].port); index: node.chunk_index,
let mut stream = TcpStream::connect(endpoint).unwrap(); filename: filename.clone(),
let packet = serde_json::to_writer( file_size: 0,
&stream, };
&Packet { let endpoint = format!("{}:{}", node.ip, node.port);
p_type: PacketType::GetFile, println!("Connecting to endpoint: {}", endpoint);
json: Some(serde_json::to_string(&chunk).unwrap()), let mut stream = TcpStream::connect(endpoint).unwrap();
data: None, serde_json::to_writer(
}).unwrap(); &stream,
stream.flush().unwrap(); &Packet {
stream.shutdown(Shutdown::Write).unwrap(); p_type: PacketType::GetFile,
match serde_json::from_reader(stream) { json: Some(serde_json::to_string(&chunk).unwrap()),
Ok(Packet { p_type: PacketType::GetFile, json, data }) => { }).unwrap();
let data = data.unwrap(); stream.flush().unwrap();
let chunk: Chunk = serde_json::from_str(&json.unwrap()).unwrap(); match serde_json::Deserializer::from_reader(&mut stream).into_iter().next().unwrap() {
// TODO: Here we have to rebuild the chunks Ok(Packet { p_type: PacketType::GetFile, json, }) => {
let mut copy = File::create(destination_path).unwrap(); let chunk: Chunk = serde_json::from_str(&json.unwrap()).unwrap();
copy.write_all(&data[..]).unwrap(); println!("Getting chunk: {:?}", chunk);
} let start = Instant::now();
Ok(Packet { p_type: PacketType::Error, json, .. }) => { receive_chunk(&mut stream, &chunk, &mut file);
eprintln!("Data Node Server Error: {}", &json.unwrap()); let elapsed = start.elapsed();
} println!("Elapsed: {} ms", (elapsed.as_secs() * 1_000) + (elapsed.subsec_nanos() / 1_000_000) as u64);
Ok(_) => {} },
Err(e) => eprintln!("Error parsing json {}", e.to_string()), Ok(Packet { p_type: PacketType::Error, json, .. }) => {
}; eprintln!("Data Node Server Error: {}", &json.unwrap());
}
Ok(_) => {}
Err(e) => eprintln!("Error parsing json {}", e.to_string()),
};
}
} }
#[derive(Debug)] #[derive(Debug)]
@ -121,7 +133,7 @@ pub struct CliArgs {
} }
pub fn get_cli_args() -> CliArgs { pub fn get_cli_args() -> CliArgs {
let mut args: Vec<String> = std::env::args().skip(1).collect(); let args: Vec<String> = std::env::args().skip(1).collect();
if args.len() < 2 { if args.len() < 2 {
panic!("Requires 2 arguments; IP:PORT:FILEPATH and a Local filename/filepath") panic!("Requires 2 arguments; IP:PORT:FILEPATH and a Local filename/filepath")
} }

View File

@ -1,77 +1,83 @@
extern crate a03; extern crate distributed_fs;
extern crate serde; extern crate serde;
extern crate serde_json; extern crate serde_json;
#[macro_use]
extern crate serde_derive; extern crate serde_derive;
use a03::*; use distributed_fs::*;
use std::net::{TcpStream, Shutdown}; use std::net::{TcpStream, Shutdown};
use std::io::{Write, Read}; use std::io::{Write, BufWriter};
use std::net::TcpListener; use std::net::TcpListener;
use serde_json::from_str; use serde_json::from_str;
use std::fs::File; use std::fs::File;
use std::fs; use std::fs;
use std::error::Error; use std::error::Error;
use std::time::Instant;
fn main() { fn main() {
let node_endpoint = parse_endpoint_from_cli(0); let node_endpoint = parse_endpoint_from_cli(0);
let metadata_endpoint = parse_endpoint_from_cli(1); let metadata_endpoint = parse_endpoint_from_cli(1);
let data_path = std::env::args().skip(3).next() let data_path = std::env::args().skip(3).next()
.expect("Missing data path"); .unwrap_or(String::from("."));
let listener = TcpListener::bind(&node_endpoint).unwrap(); let listener = TcpListener::bind(&node_endpoint).unwrap();
register_with_meta_server(&metadata_endpoint, &node_endpoint); register_with_meta_server(&metadata_endpoint, &node_endpoint);
for stream in listener.incoming() { for stream in listener.incoming() {
let mut stream = stream.unwrap(); let mut stream = stream.unwrap();
match serde_json::from_reader(&mut stream) { match serde_json::Deserializer::from_reader(&mut stream).into_iter().next().unwrap() {
Ok(Packet { p_type: PacketType::GetFile, json, .. }) => { Ok(Packet { p_type: PacketType::GetFile, json }) => {
send_file(&data_path, &mut stream, &json.unwrap()); send_chunk(&data_path, &mut stream, &json.unwrap());
stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap();
} }
Ok(Packet { p_type: PacketType::PutFile, json, data, }) => Ok(Packet { p_type: PacketType::PutFile, json }) => {
receive_file(&data_path, &json.unwrap(), &data.unwrap()), println!("Receiving chunk");
receive_chunk_from_copy(&mut stream, &data_path, &json.unwrap());
},
Ok(Packet { p_type: PacketType::ShutdownDataNode, .. }) => Ok(Packet { p_type: PacketType::ShutdownDataNode, .. }) =>
shutdown(&mut stream, &metadata_endpoint, &node_endpoint), shutdown(&metadata_endpoint, &node_endpoint),
Ok(_) => eprintln!("We don't handle this PacketType"), Ok(_) => eprintln!("We don't handle this PacketType"),
Err(e) => eprintln!("Error parsing json: {}", e.to_string()), Err(e) => eprintln!("Error parsing json: {}", e.to_string()),
}; };
} }
} }
fn receive_file(base_path: &String, json: &String, data: &Vec<u8>) { fn receive_chunk_from_copy(stream: &mut TcpStream, base_path: &String, json: &String) {
let chunk: Chunk = serde_json::from_str(json).unwrap(); let chunk: Chunk = serde_json::from_str(json).unwrap();
let filepath = format!("{}/{}_{}", base_path, chunk.filename, chunk.index); let filepath = format!("{}/{}_{}", base_path, chunk.filename, chunk.index);
println!("{}", filepath); let mut file = BufWriter::new(File::create(filepath).unwrap());
let mut copy = File::create(filepath).unwrap(); let start = Instant::now();
copy.write_all(&data[..]).unwrap(); receive_chunk(&mut *stream, &chunk, &mut file);
let elapsed = start.elapsed();
println!("Elapsed: {} ms", (elapsed.as_secs() * 1_000) + (elapsed.subsec_nanos() / 1_000_000) as u64);
file.flush().unwrap();
} }
fn send_file(base_path: &String, stream: &mut TcpStream, json: &String) { fn send_chunk(base_path: &String, stream: &mut TcpStream, json: &String) {
let chunk: Chunk = serde_json::from_str(json).unwrap(); let chunk: Chunk = serde_json::from_str(json).unwrap();
println!("{}", chunk.filename); println!("Sending {}", chunk.filename);
match fs::read(format!("{}/{}_{}", base_path, &chunk.filename, &chunk.index)) { match fs::read(format!("{}/{}_{}", base_path, &chunk.filename, &chunk.index)) {
Ok(f) => { Ok(file) => {
serde_json::to_writer( serde_json::to_writer(
stream, &mut *stream,
&Packet { &Packet {
p_type: PacketType::GetFile, p_type: PacketType::GetFile,
json: Some(json.clone()), json: Some(serde_json::to_string(
data: Some(Vec::from(f)), &Chunk { file_size: file.len() as i64, ..chunk}).unwrap()),
}).unwrap(); }).unwrap();
}, stream.flush().unwrap();
stream.write(&file).unwrap();
stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap();
}
Err(e) => { Err(e) => {
match serde_json::to_writer( match serde_json::to_writer(
stream, stream,
&Packet { &Packet {
p_type: PacketType::Error, p_type: PacketType::Error,
json: Some(String::from(e.description())), json: Some(String::from(e.description())),
data: None,
}) { }) {
Ok(_) => println!("{}", "Copy client attempted to read non-existing file"), Ok(_) => println!("{}", "Copy client attempted to read non-existing file"),
Err(e) => println!("{}", e), Err(e) => println!("{}", e),
} }
}, }
}; };
} }
@ -90,7 +96,6 @@ fn register_with_meta_server(metadata_endpoint: &String, node_endpoint: &String)
port: from_str(split[1]).unwrap(), port: from_str(split[1]).unwrap(),
}) })
.unwrap()), .unwrap()),
data: None,
}) })
.unwrap(); .unwrap();
println!("Registered myself"); println!("Registered myself");
@ -100,7 +105,7 @@ fn register_with_meta_server(metadata_endpoint: &String, node_endpoint: &String)
println!("{:?}", result); println!("{:?}", result);
} }
fn shutdown(stream: &mut TcpStream, metadata_endpoint: &String, node_endpoint: &String) { fn shutdown(metadata_endpoint: &String, node_endpoint: &String) {
let mut stream = TcpStream::connect(&metadata_endpoint).unwrap(); let mut stream = TcpStream::connect(&metadata_endpoint).unwrap();
let split: Vec<&str> = node_endpoint.split(":").collect(); let split: Vec<&str> = node_endpoint.split(":").collect();
serde_json::to_writer( serde_json::to_writer(
@ -114,7 +119,6 @@ fn shutdown(stream: &mut TcpStream, metadata_endpoint: &String, node_endpoint: &
port: from_str(split[1]).unwrap(), port: from_str(split[1]).unwrap(),
}) })
.unwrap()), .unwrap()),
data: None,
}) })
.unwrap(); .unwrap();
println!("Unregistered myself"); println!("Unregistered myself");

View File

@ -1,14 +1,10 @@
extern crate a03; extern crate distributed_fs;
extern crate serde; extern crate serde;
extern crate serde_json; extern crate serde_json;
#[macro_use]
extern crate serde_derive; extern crate serde_derive;
use a03::*; use distributed_fs::*;
use std::net::{TcpListener, TcpStream, Shutdown, SocketAddrV4, Ipv4Addr}; use std::net::{TcpStream, Shutdown };
use std::borrow::Cow;
use std::thread;
use std::io::Read;
use std::io::Write; use std::io::Write;
fn main() { fn main() {
@ -19,7 +15,6 @@ fn main() {
&Packet { &Packet {
p_type: PacketType::ListFiles, p_type: PacketType::ListFiles,
json: None, json: None,
data: None,
}) })
.unwrap(); .unwrap();
stream.flush().unwrap(); stream.flush().unwrap();

View File

@ -1,10 +1,10 @@
extern crate a03; extern crate distributed_fs;
extern crate rusqlite; extern crate rusqlite;
extern crate serde; extern crate serde;
extern crate serde_json; extern crate serde_json;
extern crate serde_derive; extern crate serde_derive;
use a03::*; use distributed_fs::*;
use rusqlite::types::ToSql; use rusqlite::types::ToSql;
use rusqlite::{Connection, NO_PARAMS}; use rusqlite::{Connection, NO_PARAMS};
use std::borrow::Cow; use std::borrow::Cow;
@ -44,7 +44,6 @@ fn request_read(stream: &mut TcpStream, conn: &Connection, message: &str) {
&Packet { &Packet {
p_type: PacketType::Error, p_type: PacketType::Error,
json: Some(String::from("File not found")), json: Some(String::from("File not found")),
data: None,
}) { }) {
Ok(_) => println!("{}", "Copy client attempted to read non-existing file"), Ok(_) => println!("{}", "Copy client attempted to read non-existing file"),
Err(e) => println!("{}", e), Err(e) => println!("{}", e),
@ -66,7 +65,6 @@ fn request_read(stream: &mut TcpStream, conn: &Connection, message: &str) {
&Packet { &Packet {
p_type: PacketType::Success, p_type: PacketType::Success,
json: Some(serde_json::to_string(&nodes).unwrap()), json: Some(serde_json::to_string(&nodes).unwrap()),
data: None,
}) { }) {
Ok(_) => println!("{}", "Sent nodes with chunks"), Ok(_) => println!("{}", "Sent nodes with chunks"),
Err(e) => println!("{}", e), Err(e) => println!("{}", e),
@ -75,14 +73,13 @@ fn request_read(stream: &mut TcpStream, conn: &Connection, message: &str) {
fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) { fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) {
let file: AddFile = serde_json::from_str(message).unwrap(); let file: AddFile = serde_json::from_str(message).unwrap();
let file_already_exists = add_file(&conn, &file.name, file.size as i32); let file_already_exists = add_file(&conn, &file.name, file.size as i64);
if file_already_exists { if file_already_exists {
match serde_json::to_writer( match serde_json::to_writer(
stream, stream,
&Packet { &Packet {
p_type: PacketType::Error, p_type: PacketType::Error,
json: Some(String::from("File already exists, please remove before re-uploading")), json: Some(String::from("File already exists, please remove before re-uploading")),
data: None,
}) { }) {
Ok(_) => println!("{}", "Copy client attempted to add an existing file"), Ok(_) => println!("{}", "Copy client attempted to add an existing file"),
Err(e) => println!("{}", e), Err(e) => println!("{}", e),
@ -90,8 +87,6 @@ fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) {
return; return;
} }
let file_info = get_file_info(&conn, &file.name).unwrap(); let file_info = get_file_info(&conn, &file.name).unwrap();
// let file_info = INode { id: 1, name: file.name, size: file.size };
// println!("{:?}", file_info);
let mut blocks: Vec<Block> = Vec::new(); let mut blocks: Vec<Block> = Vec::new();
let mut nodes: Vec<AvailableNodes> = Vec::new(); let mut nodes: Vec<AvailableNodes> = Vec::new();
let dnodes = get_data_nodes(&conn); let dnodes = get_data_nodes(&conn);
@ -115,7 +110,6 @@ fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) {
&Packet { &Packet {
p_type: PacketType::Success, p_type: PacketType::Success,
json: Some(serde_json::to_string(&nodes).unwrap()), json: Some(serde_json::to_string(&nodes).unwrap()),
data: None,
}) { }) {
Ok(_) => println!("{}", "Sent nodes with chunks"), Ok(_) => println!("{}", "Sent nodes with chunks"),
Err(e) => println!("{}", e), Err(e) => println!("{}", e),
@ -147,7 +141,6 @@ fn report_success(stream: &mut TcpStream, message: &str) {
match serde_json::to_writer(stream, &Packet { match serde_json::to_writer(stream, &Packet {
p_type: PacketType::Success, p_type: PacketType::Success,
json: None, json: None,
data: None,
}) { }) {
Ok(_) => println!("{}", message), Ok(_) => println!("{}", message),
Err(e) => println!("{}", e), Err(e) => println!("{}", e),
@ -201,7 +194,7 @@ fn get_data_nodes(conn: &Connection) -> Vec<DataNode> {
nodes nodes
} }
fn add_file(conn: &Connection, fname: &String, fsize: i32) -> bool { fn add_file(conn: &Connection, fname: &String, fsize: i64) -> bool {
let file_exists = conn.query_row( let file_exists = conn.query_row(
"SELECT fid FROM inode WHERE fname = ?1", "SELECT fid FROM inode WHERE fname = ?1",
&[&fname as &ToSql], &[&fname as &ToSql],

View File

@ -4,9 +4,9 @@ extern crate serde_json;
extern crate serde_derive; extern crate serde_derive;
use std::borrow::Cow; use std::borrow::Cow;
use std::net::Ipv4Addr; use std::net::TcpStream;
use std::net::SocketAddrV4; use std::fs::File;
use std::str::FromStr; use std::io::{Write, Read, BufWriter};
pub const DEFAULT_PORT: &str = "8000"; pub const DEFAULT_PORT: &str = "8000";
@ -28,7 +28,6 @@ pub enum PacketType {
pub struct Packet { pub struct Packet {
pub p_type: PacketType, pub p_type: PacketType,
pub json: Option<String>, pub json: Option<String>,
pub data: Option<Vec<u8>>,
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
@ -46,7 +45,7 @@ pub struct NodeRegistration {
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct AddFile { pub struct AddFile {
pub name: String, pub name: String,
pub size: u32 pub size: u64,
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
@ -60,6 +59,7 @@ pub struct AvailableNodes {
pub struct Chunk { pub struct Chunk {
pub index: u32, pub index: u32,
pub filename: String, pub filename: String,
pub file_size: i64,
} }
#[derive(Debug)] #[derive(Debug)]
@ -91,7 +91,7 @@ pub struct BlockQuery {
} }
pub fn parse_endpoint_from_cli(arg_index : usize) -> String { pub fn parse_endpoint_from_cli(arg_index : usize) -> String {
let mut args: Vec<String> = std::env::args().skip(1).collect(); let args: Vec<String> = std::env::args().skip(1).collect();
let endpoint_arg: String = args.get(arg_index).expect("No IP provided").clone(); let endpoint_arg: String = args.get(arg_index).expect("No IP provided").clone();
if endpoint_arg.contains(":") { if endpoint_arg.contains(":") {
@ -101,3 +101,16 @@ pub fn parse_endpoint_from_cli(arg_index : usize) -> String {
} }
} }
pub fn receive_chunk(stream: &mut TcpStream, chunk: &Chunk, chunk_buf: &mut BufWriter<File>) {
let mut buf = [0u8; 256];
let mut bytes_read = 0;
while bytes_read < chunk.file_size as usize {
let bytes = stream.read(&mut buf).unwrap();
chunk_buf.write_all(&buf[0..bytes]).unwrap();
bytes_read += bytes;
}
chunk_buf.flush().unwrap();
}