Compare commits

..

No commits in common. "6d5096cf4fede6cba396a214a7a3ab0bc2bdf729" and "65c6bffd857890d604bc0797b252dc7e8429202d" have entirely different histories.

12 changed files with 191 additions and 211 deletions

2
.gitignore vendored
View File

@ -8,5 +8,5 @@
*.db *.db
dfs_skel/ dfs_skel/
venv/ venv/
dn*/ data_node/
copy_dir/ copy_dir/

63
Cargo.lock generated
View File

@ -6,6 +6,7 @@ dependencies = [
"serde 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.33 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.33 (registry+https://github.com/rust-lang/crates.io-index)",
"uuid 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]] [[package]]
@ -13,6 +14,28 @@ name = "bitflags"
version = "1.0.4" version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "cloudabi"
version = "0.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "fuchsia-zircon"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
"fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "fuchsia-zircon-sys"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "itoa" name = "itoa"
version = "0.4.3" version = "0.4.3"
@ -66,6 +89,31 @@ dependencies = [
"proc-macro2 0.4.24 (registry+https://github.com/rust-lang/crates.io-index)", "proc-macro2 0.4.24 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "rand"
version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
"fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.44 (registry+https://github.com/rust-lang/crates.io-index)",
"rand_core 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rand_core"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"rand_core 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rand_core"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.1.40" version = "0.1.40"
@ -137,6 +185,14 @@ name = "unicode-xid"
version = "0.1.0" version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "uuid"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "vcpkg" name = "vcpkg"
version = "0.2.6" version = "0.2.6"
@ -163,6 +219,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[metadata] [metadata]
"checksum bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "228047a76f468627ca71776ecdebd732a3423081fcf5125585bcd7c49886ce12" "checksum bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "228047a76f468627ca71776ecdebd732a3423081fcf5125585bcd7c49886ce12"
"checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
"checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82"
"checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
"checksum itoa 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "1306f3464951f30e30d12373d31c79fbd52d236e5e896fd92f96ec7babbbe60b" "checksum itoa 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "1306f3464951f30e30d12373d31c79fbd52d236e5e896fd92f96ec7babbbe60b"
"checksum libc 0.2.44 (registry+https://github.com/rust-lang/crates.io-index)" = "10923947f84a519a45c8fefb7dd1b3e8c08747993381adee176d7a82b4195311" "checksum libc 0.2.44 (registry+https://github.com/rust-lang/crates.io-index)" = "10923947f84a519a45c8fefb7dd1b3e8c08747993381adee176d7a82b4195311"
"checksum libsqlite3-sys 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "742b695cbfb89e549dca6960a55e6802f67d352e33e97859ee46dee835211b0f" "checksum libsqlite3-sys 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "742b695cbfb89e549dca6960a55e6802f67d352e33e97859ee46dee835211b0f"
@ -171,6 +230,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum pkg-config 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "676e8eb2b1b4c9043511a9b7bea0915320d7e502b0a079fb03f9635a5252b18c" "checksum pkg-config 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "676e8eb2b1b4c9043511a9b7bea0915320d7e502b0a079fb03f9635a5252b18c"
"checksum proc-macro2 0.4.24 (registry+https://github.com/rust-lang/crates.io-index)" = "77619697826f31a02ae974457af0b29b723e5619e113e9397b8b82c6bd253f09" "checksum proc-macro2 0.4.24 (registry+https://github.com/rust-lang/crates.io-index)" = "77619697826f31a02ae974457af0b29b723e5619e113e9397b8b82c6bd253f09"
"checksum quote 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)" = "53fa22a1994bd0f9372d7a816207d8a2677ad0325b073f5c5332760f0fb62b5c" "checksum quote 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)" = "53fa22a1994bd0f9372d7a816207d8a2677ad0325b073f5c5332760f0fb62b5c"
"checksum rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e464cd887e869cddcae8792a4ee31d23c7edd516700695608f5b98c67ee0131c"
"checksum rand_core 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1961a422c4d189dfb50ffa9320bf1f2a9bd54ecb92792fb9477f99a1045f3372"
"checksum rand_core 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0905b6b7079ec73b314d4c748701f6931eb79fd97c668caa3f1899b22b32c6db"
"checksum redox_syscall 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "c214e91d3ecf43e9a4e41e578973adeb14b474f2bee858742d127af75a0112b1" "checksum redox_syscall 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "c214e91d3ecf43e9a4e41e578973adeb14b474f2bee858742d127af75a0112b1"
"checksum rusqlite 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)" = "39bae767eb27866f5c0be918635ae54af705bc09db11be2c43a3c6b361cf3462" "checksum rusqlite 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)" = "39bae767eb27866f5c0be918635ae54af705bc09db11be2c43a3c6b361cf3462"
"checksum ryu 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "eb9e9b8cde282a9fe6a42dd4681319bfb63f121b8a8ee9439c6f4107e58a46f7" "checksum ryu 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "eb9e9b8cde282a9fe6a42dd4681319bfb63f121b8a8ee9439c6f4107e58a46f7"
@ -180,6 +242,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum syn 0.15.22 (registry+https://github.com/rust-lang/crates.io-index)" = "ae8b29eb5210bc5cf63ed6149cbf9adfc82ac0be023d8735c176ee74a2db4da7" "checksum syn 0.15.22 (registry+https://github.com/rust-lang/crates.io-index)" = "ae8b29eb5210bc5cf63ed6149cbf9adfc82ac0be023d8735c176ee74a2db4da7"
"checksum time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "d825be0eb33fda1a7e68012d51e9c7f451dc1a69391e7fdc197060bb8c56667b" "checksum time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "d825be0eb33fda1a7e68012d51e9c7f451dc1a69391e7fdc197060bb8c56667b"
"checksum unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc" "checksum unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc"
"checksum uuid 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "dab5c5526c5caa3d106653401a267fed923e7046f35895ffcb5ca42db64942e6"
"checksum vcpkg 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "def296d3eb3b12371b2c7d0e83bfe1403e4db2d7a0bba324a12b21c4ee13143d" "checksum vcpkg 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "def296d3eb3b12371b2c7d0e83bfe1403e4db2d7a0bba324a12b21c4ee13143d"
"checksum winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "92c1eb33641e276cfa214a0522acad57be5c56b10cb348b3c5117db75f3ac4b0" "checksum winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "92c1eb33641e276cfa214a0522acad57be5c56b10cb348b3c5117db75f3ac4b0"
"checksum winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" "checksum winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"

View File

@ -1,5 +1,5 @@
[package] [package]
name = "distributed-fs" name = "a03"
version = "0.1.0" version = "0.1.0"
authors = ["Joseph Ferano <joseph@ferano.io>"] authors = ["Joseph Ferano <joseph@ferano.io>"]
@ -8,3 +8,4 @@ rusqlite = "0.15.0"
serde = "1.0" serde = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
serde_json = "1.0" serde_json = "1.0"
uuid = { version = "0.7", features = ["v4"] }

View File

@ -1,19 +1,10 @@
all: build all: build
dist: clean build dist: build
@cp target/release/copy .
@cp target/release/ls .
@cp target/release/data_node .
@cp target/release/meta_data .
@tar -czf assig-03-dfs.tar.gz src/ README.md copy ls data_node meta_data Cargo.* clean_db createdb.py
@rm copy
@rm ls
@rm data_node
@rm meta_data
clean:
@rm -f assig-03-dfs.tar.gz @rm -f assig-03-dfs.tar.gz
@cp target/release/copy copy
@tar -czf assig-03-dfs.tar.gz src/ README.md copy
@rm copy
build: build:
cargo build --release cargo build --release

View File

@ -1,79 +1,21 @@
# Distributed File System in Rust ## Distributed File System in Rust for CCOM4017
A distributed file system implementation with client-server architecture, built as one of my early projects exploring Rust and distributed systems concepts. #### Running
## Architecture ```./copy <PARAM1> <PARAM2>```
The system consists of four main components: #### Building
- **copy** - Client for reading and writing files to the distributed system If you wish to compile the code, install rust and cargo
- **ls** - Client for listing files stored in the system [Link](https://www.rust-lang.org/en-US/install.html)
- **data_node** - Storage server that handles file chunks
- **meta_data** - Metadata server managing file locations and node registry
## How It Works Then just run build
The metadata server uses SQLite to track connected data nodes and file locations. When storing a file, the client connects to the metadata server, which provides a list of available data nodes. The client then divides the file into chunks and distributes them across multiple data nodes, transferring 256 bytes at a time. ```cargo build```
The system uses JSON serialization via `serde_json` for communication between components. All network communication happens over TCP with a custom protocol for coordinating file operations. If you wish to run a specific algorithm;
## Usage ```cargo run --bin copy ```
### Starting the Metadata Server #### Testing
```bash
cargo run --bin meta_data [port]
# Defaults to port 8000 if not specified
```
### Starting Data Nodes
```bash
cargo run --bin data_node <node_endpoint> <metadata_endpoint> [base_path]
# Example: cargo run --bin data_node localhost:6771 127.0.0.1:8000 ./data
```
### Listing Files
```bash
cargo run --bin ls <metadata_endpoint>
# Example: cargo run --bin ls 127.0.0.1:8000
```
### Copying Files
**Upload to distributed system:**
```bash
cargo run --bin copy <local_file> <endpoint:remote_path>
# Example: cargo run --bin copy ./document.pdf localhost:8000:docs/document.pdf
```
**Download from distributed system:**
```bash
cargo run --bin copy <endpoint:remote_path> <local_file>
# Example: cargo run --bin copy localhost:8000:docs/document.pdf ./document.pdf
```
## Database Setup
Run the included Python script to initialize the SQLite database:
```bash
python3 createdb.py
```
The database schema includes tables for file metadata (inodes), data node registry, and block location tracking.
## Building
```bash
cargo build
```
## Testing
```bash
cargo test --bin meta_data
```
## Dependencies
- **rusqlite** - SQLite database interface
- **serde** - Serialization framework
- **serde_json** - JSON support for network protocol

BIN
pug.jpg

Binary file not shown.

Before

Width:  |  Height:  |  Size: 21 KiB

2
sm
View File

@ -1 +1 @@
echo $1 | nc -v -N localhost $2 echo $1 | nc -v -N localhost 6770

View File

@ -1,14 +1,14 @@
extern crate distributed_fs; extern crate a03;
extern crate serde; extern crate serde;
extern crate serde_json; extern crate serde_json;
#[macro_use]
extern crate serde_derive; extern crate serde_derive;
use distributed_fs::*; use a03::*;
use std::net::{TcpStream, Shutdown}; use std::net::{TcpStream, Shutdown};
use std::io::{Write, Read};
use std::fs::File; use std::fs::File;
use std::fs; use std::fs;
use std::io::{Write, BufWriter};
use std::time::Instant;
fn main() { fn main() {
let args = get_cli_args(); let args = get_cli_args();
@ -19,15 +19,15 @@ fn main() {
packet_type = PacketType::RequestWrite; packet_type = PacketType::RequestWrite;
let file = fs::read(&args.filename).unwrap(); let file = fs::read(&args.filename).unwrap();
let size = file.len(); let size = file.len();
println!("Sending file of size {}", size); println!("Requesting Write of {}", args.filepath);
json = Some(serde_json::to_string( json = Some(serde_json::to_string(
&AddFile { name: args.filepath.clone(), size: size as u64 }).unwrap()) &AddFile { name: args.filepath.clone(), size: size as u32 }).unwrap())
} else { } else {
packet_type = PacketType::RequestRead; packet_type = PacketType::RequestRead;
println!("Requesting Read of {}", args.filepath); println!("Requesting Read of {}", args.filepath);
json = Some(serde_json::to_string::<String>(&args.filepath).unwrap()) json = Some(serde_json::to_string::<String>(&args.filepath).unwrap())
} }
serde_json::to_writer(&mut stream, &Packet { p_type: packet_type, json, }) serde_json::to_writer(&mut stream, &Packet { p_type: packet_type, json, data: None })
.unwrap(); .unwrap();
stream.flush().unwrap(); stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap(); stream.shutdown(Shutdown::Write).unwrap();
@ -49,7 +49,7 @@ fn main() {
let file = fs::read(&args.filename).unwrap(); let file = fs::read(&args.filename).unwrap();
nodes.map(|ns| send_file_to_data_nodes(&filename, &ns, &file)); nodes.map(|ns| send_file_to_data_nodes(&filename, &ns, &file));
} else { } else {
nodes.map(|mut ns| get_file_from_data_nodes(&destination, &filename, &mut ns)); nodes.map(|ns| get_file_from_data_nodes(&destination, &filename, &ns));
} }
} }
@ -58,70 +58,58 @@ fn send_file_to_data_nodes(
nodes: &Vec<AvailableNodes>, nodes: &Vec<AvailableNodes>,
file: &Vec<u8>) file: &Vec<u8>)
{ {
let div: usize = ((file.len() as f32) / (nodes.len() as f32)).ceil() as usize; let endpoint = format!("{}:{}", nodes[0].ip, nodes[0].port);
let chunks: Vec<_> = file.chunks(div).collect(); let mut stream = TcpStream::connect(endpoint).unwrap();
for node in nodes { println!("Going to send a file! Bytes {}", file.len());
let endpoint = format!("{}:{}", node.ip, node.port); let chunk = Chunk {
println!("{}", endpoint); index: nodes[0].chunk_index,
let mut stream = TcpStream::connect(&endpoint).unwrap(); filename: filename.clone(),
let file_size = chunks[node.chunk_index as usize].len() as i64; };
let chunk = Chunk { let packet = serde_json::to_writer(
index: node.chunk_index, &mut stream,
filename: filename.clone(), &Packet {
file_size, p_type: PacketType::PutFile,
}; json: Some(serde_json::to_string(&chunk).unwrap()),
serde_json::to_writer( data: Some(file.clone()),
&mut stream, }).unwrap();
&Packet { stream.flush().unwrap();
p_type: PacketType::PutFile, stream.shutdown(Shutdown::Write).unwrap();
json: Some(serde_json::to_string(&chunk).unwrap()),
}).unwrap();
stream.flush().unwrap();
stream.write(chunks[node.chunk_index as usize]).unwrap();
stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap();
}
} }
fn get_file_from_data_nodes( fn get_file_from_data_nodes(
destination_path: &String, destination_path: &String,
filename: &String, filename: &String,
nodes: &mut Vec<AvailableNodes>) nodes: &Vec<AvailableNodes>)
{ {
nodes.sort_by_key(|n| n.chunk_index); let chunk = Chunk {
let mut file = BufWriter::new(File::create(destination_path).unwrap()); index: nodes[0].chunk_index,
for node in nodes { filename: filename.clone(),
let chunk = Chunk { };
index: node.chunk_index, let endpoint = format!("{}:{}", nodes[0].ip, nodes[0].port);
filename: filename.clone(), let mut stream = TcpStream::connect(endpoint).unwrap();
file_size: 0, let packet = serde_json::to_writer(
}; &stream,
let endpoint = format!("{}:{}", node.ip, node.port); &Packet {
println!("Connecting to endpoint: {}", endpoint); p_type: PacketType::GetFile,
let mut stream = TcpStream::connect(endpoint).unwrap(); json: Some(serde_json::to_string(&chunk).unwrap()),
serde_json::to_writer( data: None,
&stream, }).unwrap();
&Packet { stream.flush().unwrap();
p_type: PacketType::GetFile, stream.shutdown(Shutdown::Write).unwrap();
json: Some(serde_json::to_string(&chunk).unwrap()), match serde_json::from_reader(stream) {
}).unwrap(); Ok(Packet { p_type: PacketType::GetFile, json, data }) => {
stream.flush().unwrap(); let data = data.unwrap();
match serde_json::Deserializer::from_reader(&mut stream).into_iter().next().unwrap() { let chunk: Chunk = serde_json::from_str(&json.unwrap()).unwrap();
Ok(Packet { p_type: PacketType::GetFile, json, }) => { // TODO: Here we have to rebuild the chunks
let chunk: Chunk = serde_json::from_str(&json.unwrap()).unwrap(); let mut copy = File::create(destination_path).unwrap();
println!("Getting chunk: {:?}", chunk); copy.write_all(&data[..]).unwrap();
let start = Instant::now(); }
receive_chunk(&mut stream, &chunk, &mut file); Ok(Packet { p_type: PacketType::Error, json, .. }) => {
let elapsed = start.elapsed(); eprintln!("Data Node Server Error: {}", &json.unwrap());
println!("Elapsed: {} ms", (elapsed.as_secs() * 1_000) + (elapsed.subsec_nanos() / 1_000_000) as u64); }
}, Ok(_) => {}
Ok(Packet { p_type: PacketType::Error, json, .. }) => { Err(e) => eprintln!("Error parsing json {}", e.to_string()),
eprintln!("Data Node Server Error: {}", &json.unwrap()); };
}
Ok(_) => {}
Err(e) => eprintln!("Error parsing json {}", e.to_string()),
};
}
} }
#[derive(Debug)] #[derive(Debug)]
@ -133,7 +121,7 @@ pub struct CliArgs {
} }
pub fn get_cli_args() -> CliArgs { pub fn get_cli_args() -> CliArgs {
let args: Vec<String> = std::env::args().skip(1).collect(); let mut args: Vec<String> = std::env::args().skip(1).collect();
if args.len() < 2 { if args.len() < 2 {
panic!("Requires 2 arguments; IP:PORT:FILEPATH and a Local filename/filepath") panic!("Requires 2 arguments; IP:PORT:FILEPATH and a Local filename/filepath")
} }

View File

@ -1,83 +1,77 @@
extern crate distributed_fs; extern crate a03;
extern crate serde; extern crate serde;
extern crate serde_json; extern crate serde_json;
#[macro_use]
extern crate serde_derive; extern crate serde_derive;
use distributed_fs::*; use a03::*;
use std::net::{TcpStream, Shutdown}; use std::net::{TcpStream, Shutdown};
use std::io::{Write, BufWriter}; use std::io::{Write, Read};
use std::net::TcpListener; use std::net::TcpListener;
use serde_json::from_str; use serde_json::from_str;
use std::fs::File; use std::fs::File;
use std::fs; use std::fs;
use std::error::Error; use std::error::Error;
use std::time::Instant;
fn main() { fn main() {
let node_endpoint = parse_endpoint_from_cli(0); let node_endpoint = parse_endpoint_from_cli(0);
let metadata_endpoint = parse_endpoint_from_cli(1); let metadata_endpoint = parse_endpoint_from_cli(1);
let data_path = std::env::args().skip(3).next() let data_path = std::env::args().skip(3).next()
.unwrap_or(String::from(".")); .expect("Missing data path");
let listener = TcpListener::bind(&node_endpoint).unwrap(); let listener = TcpListener::bind(&node_endpoint).unwrap();
register_with_meta_server(&metadata_endpoint, &node_endpoint); register_with_meta_server(&metadata_endpoint, &node_endpoint);
for stream in listener.incoming() { for stream in listener.incoming() {
let mut stream = stream.unwrap(); let mut stream = stream.unwrap();
match serde_json::Deserializer::from_reader(&mut stream).into_iter().next().unwrap() { match serde_json::from_reader(&mut stream) {
Ok(Packet { p_type: PacketType::GetFile, json }) => { Ok(Packet { p_type: PacketType::GetFile, json, .. }) => {
send_chunk(&data_path, &mut stream, &json.unwrap()); send_file(&data_path, &mut stream, &json.unwrap());
stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap();
} }
Ok(Packet { p_type: PacketType::PutFile, json }) => { Ok(Packet { p_type: PacketType::PutFile, json, data, }) =>
println!("Receiving chunk"); receive_file(&data_path, &json.unwrap(), &data.unwrap()),
receive_chunk_from_copy(&mut stream, &data_path, &json.unwrap());
},
Ok(Packet { p_type: PacketType::ShutdownDataNode, .. }) => Ok(Packet { p_type: PacketType::ShutdownDataNode, .. }) =>
shutdown(&metadata_endpoint, &node_endpoint), shutdown(&mut stream, &metadata_endpoint, &node_endpoint),
Ok(_) => eprintln!("We don't handle this PacketType"), Ok(_) => eprintln!("We don't handle this PacketType"),
Err(e) => eprintln!("Error parsing json: {}", e.to_string()), Err(e) => eprintln!("Error parsing json: {}", e.to_string()),
}; };
} }
} }
fn receive_chunk_from_copy(stream: &mut TcpStream, base_path: &String, json: &String) { fn receive_file(base_path: &String, json: &String, data: &Vec<u8>) {
let chunk: Chunk = serde_json::from_str(json).unwrap(); let chunk: Chunk = serde_json::from_str(json).unwrap();
let filepath = format!("{}/{}_{}", base_path, chunk.filename, chunk.index); let filepath = format!("{}/{}_{}", base_path, chunk.filename, chunk.index);
let mut file = BufWriter::new(File::create(filepath).unwrap()); println!("{}", filepath);
let start = Instant::now(); let mut copy = File::create(filepath).unwrap();
receive_chunk(&mut *stream, &chunk, &mut file); copy.write_all(&data[..]).unwrap();
let elapsed = start.elapsed();
println!("Elapsed: {} ms", (elapsed.as_secs() * 1_000) + (elapsed.subsec_nanos() / 1_000_000) as u64);
file.flush().unwrap();
} }
fn send_chunk(base_path: &String, stream: &mut TcpStream, json: &String) { fn send_file(base_path: &String, stream: &mut TcpStream, json: &String) {
let chunk: Chunk = serde_json::from_str(json).unwrap(); let chunk: Chunk = serde_json::from_str(json).unwrap();
println!("Sending {}", chunk.filename); println!("{}", chunk.filename);
match fs::read(format!("{}/{}_{}", base_path, &chunk.filename, &chunk.index)) { match fs::read(format!("{}/{}_{}", base_path, &chunk.filename, &chunk.index)) {
Ok(file) => { Ok(f) => {
serde_json::to_writer( serde_json::to_writer(
&mut *stream, stream,
&Packet { &Packet {
p_type: PacketType::GetFile, p_type: PacketType::GetFile,
json: Some(serde_json::to_string( json: Some(json.clone()),
&Chunk { file_size: file.len() as i64, ..chunk}).unwrap()), data: Some(Vec::from(f)),
}).unwrap(); }).unwrap();
stream.flush().unwrap(); },
stream.write(&file).unwrap();
stream.flush().unwrap();
stream.shutdown(Shutdown::Write).unwrap();
}
Err(e) => { Err(e) => {
match serde_json::to_writer( match serde_json::to_writer(
stream, stream,
&Packet { &Packet {
p_type: PacketType::Error, p_type: PacketType::Error,
json: Some(String::from(e.description())), json: Some(String::from(e.description())),
data: None,
}) { }) {
Ok(_) => println!("{}", "Copy client attempted to read non-existing file"), Ok(_) => println!("{}", "Copy client attempted to read non-existing file"),
Err(e) => println!("{}", e), Err(e) => println!("{}", e),
} }
} },
}; };
} }
@ -96,6 +90,7 @@ fn register_with_meta_server(metadata_endpoint: &String, node_endpoint: &String)
port: from_str(split[1]).unwrap(), port: from_str(split[1]).unwrap(),
}) })
.unwrap()), .unwrap()),
data: None,
}) })
.unwrap(); .unwrap();
println!("Registered myself"); println!("Registered myself");
@ -105,7 +100,7 @@ fn register_with_meta_server(metadata_endpoint: &String, node_endpoint: &String)
println!("{:?}", result); println!("{:?}", result);
} }
fn shutdown(metadata_endpoint: &String, node_endpoint: &String) { fn shutdown(stream: &mut TcpStream, metadata_endpoint: &String, node_endpoint: &String) {
let mut stream = TcpStream::connect(&metadata_endpoint).unwrap(); let mut stream = TcpStream::connect(&metadata_endpoint).unwrap();
let split: Vec<&str> = node_endpoint.split(":").collect(); let split: Vec<&str> = node_endpoint.split(":").collect();
serde_json::to_writer( serde_json::to_writer(
@ -119,6 +114,7 @@ fn shutdown(metadata_endpoint: &String, node_endpoint: &String) {
port: from_str(split[1]).unwrap(), port: from_str(split[1]).unwrap(),
}) })
.unwrap()), .unwrap()),
data: None,
}) })
.unwrap(); .unwrap();
println!("Unregistered myself"); println!("Unregistered myself");

View File

@ -1,10 +1,14 @@
extern crate distributed_fs; extern crate a03;
extern crate serde; extern crate serde;
extern crate serde_json; extern crate serde_json;
#[macro_use]
extern crate serde_derive; extern crate serde_derive;
use distributed_fs::*; use a03::*;
use std::net::{TcpStream, Shutdown }; use std::net::{TcpListener, TcpStream, Shutdown, SocketAddrV4, Ipv4Addr};
use std::borrow::Cow;
use std::thread;
use std::io::Read;
use std::io::Write; use std::io::Write;
fn main() { fn main() {
@ -15,6 +19,7 @@ fn main() {
&Packet { &Packet {
p_type: PacketType::ListFiles, p_type: PacketType::ListFiles,
json: None, json: None,
data: None,
}) })
.unwrap(); .unwrap();
stream.flush().unwrap(); stream.flush().unwrap();

View File

@ -1,10 +1,10 @@
extern crate distributed_fs; extern crate a03;
extern crate rusqlite; extern crate rusqlite;
extern crate serde; extern crate serde;
extern crate serde_json; extern crate serde_json;
extern crate serde_derive; extern crate serde_derive;
use distributed_fs::*; use a03::*;
use rusqlite::types::ToSql; use rusqlite::types::ToSql;
use rusqlite::{Connection, NO_PARAMS}; use rusqlite::{Connection, NO_PARAMS};
use std::borrow::Cow; use std::borrow::Cow;
@ -44,6 +44,7 @@ fn request_read(stream: &mut TcpStream, conn: &Connection, message: &str) {
&Packet { &Packet {
p_type: PacketType::Error, p_type: PacketType::Error,
json: Some(String::from("File not found")), json: Some(String::from("File not found")),
data: None,
}) { }) {
Ok(_) => println!("{}", "Copy client attempted to read non-existing file"), Ok(_) => println!("{}", "Copy client attempted to read non-existing file"),
Err(e) => println!("{}", e), Err(e) => println!("{}", e),
@ -65,6 +66,7 @@ fn request_read(stream: &mut TcpStream, conn: &Connection, message: &str) {
&Packet { &Packet {
p_type: PacketType::Success, p_type: PacketType::Success,
json: Some(serde_json::to_string(&nodes).unwrap()), json: Some(serde_json::to_string(&nodes).unwrap()),
data: None,
}) { }) {
Ok(_) => println!("{}", "Sent nodes with chunks"), Ok(_) => println!("{}", "Sent nodes with chunks"),
Err(e) => println!("{}", e), Err(e) => println!("{}", e),
@ -73,13 +75,14 @@ fn request_read(stream: &mut TcpStream, conn: &Connection, message: &str) {
fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) { fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) {
let file: AddFile = serde_json::from_str(message).unwrap(); let file: AddFile = serde_json::from_str(message).unwrap();
let file_already_exists = add_file(&conn, &file.name, file.size as i64); let file_already_exists = add_file(&conn, &file.name, file.size as i32);
if file_already_exists { if file_already_exists {
match serde_json::to_writer( match serde_json::to_writer(
stream, stream,
&Packet { &Packet {
p_type: PacketType::Error, p_type: PacketType::Error,
json: Some(String::from("File already exists, please remove before re-uploading")), json: Some(String::from("File already exists, please remove before re-uploading")),
data: None,
}) { }) {
Ok(_) => println!("{}", "Copy client attempted to add an existing file"), Ok(_) => println!("{}", "Copy client attempted to add an existing file"),
Err(e) => println!("{}", e), Err(e) => println!("{}", e),
@ -87,6 +90,8 @@ fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) {
return; return;
} }
let file_info = get_file_info(&conn, &file.name).unwrap(); let file_info = get_file_info(&conn, &file.name).unwrap();
// let file_info = INode { id: 1, name: file.name, size: file.size };
// println!("{:?}", file_info);
let mut blocks: Vec<Block> = Vec::new(); let mut blocks: Vec<Block> = Vec::new();
let mut nodes: Vec<AvailableNodes> = Vec::new(); let mut nodes: Vec<AvailableNodes> = Vec::new();
let dnodes = get_data_nodes(&conn); let dnodes = get_data_nodes(&conn);
@ -110,6 +115,7 @@ fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) {
&Packet { &Packet {
p_type: PacketType::Success, p_type: PacketType::Success,
json: Some(serde_json::to_string(&nodes).unwrap()), json: Some(serde_json::to_string(&nodes).unwrap()),
data: None,
}) { }) {
Ok(_) => println!("{}", "Sent nodes with chunks"), Ok(_) => println!("{}", "Sent nodes with chunks"),
Err(e) => println!("{}", e), Err(e) => println!("{}", e),
@ -141,6 +147,7 @@ fn report_success(stream: &mut TcpStream, message: &str) {
match serde_json::to_writer(stream, &Packet { match serde_json::to_writer(stream, &Packet {
p_type: PacketType::Success, p_type: PacketType::Success,
json: None, json: None,
data: None,
}) { }) {
Ok(_) => println!("{}", message), Ok(_) => println!("{}", message),
Err(e) => println!("{}", e), Err(e) => println!("{}", e),
@ -194,7 +201,7 @@ fn get_data_nodes(conn: &Connection) -> Vec<DataNode> {
nodes nodes
} }
fn add_file(conn: &Connection, fname: &String, fsize: i64) -> bool { fn add_file(conn: &Connection, fname: &String, fsize: i32) -> bool {
let file_exists = conn.query_row( let file_exists = conn.query_row(
"SELECT fid FROM inode WHERE fname = ?1", "SELECT fid FROM inode WHERE fname = ?1",
&[&fname as &ToSql], &[&fname as &ToSql],

View File

@ -4,9 +4,9 @@ extern crate serde_json;
extern crate serde_derive; extern crate serde_derive;
use std::borrow::Cow; use std::borrow::Cow;
use std::net::TcpStream; use std::net::Ipv4Addr;
use std::fs::File; use std::net::SocketAddrV4;
use std::io::{Write, Read, BufWriter}; use std::str::FromStr;
pub const DEFAULT_PORT: &str = "8000"; pub const DEFAULT_PORT: &str = "8000";
@ -28,6 +28,7 @@ pub enum PacketType {
pub struct Packet { pub struct Packet {
pub p_type: PacketType, pub p_type: PacketType,
pub json: Option<String>, pub json: Option<String>,
pub data: Option<Vec<u8>>,
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
@ -45,7 +46,7 @@ pub struct NodeRegistration {
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct AddFile { pub struct AddFile {
pub name: String, pub name: String,
pub size: u64, pub size: u32
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
@ -59,7 +60,6 @@ pub struct AvailableNodes {
pub struct Chunk { pub struct Chunk {
pub index: u32, pub index: u32,
pub filename: String, pub filename: String,
pub file_size: i64,
} }
#[derive(Debug)] #[derive(Debug)]
@ -91,7 +91,7 @@ pub struct BlockQuery {
} }
pub fn parse_endpoint_from_cli(arg_index : usize) -> String { pub fn parse_endpoint_from_cli(arg_index : usize) -> String {
let args: Vec<String> = std::env::args().skip(1).collect(); let mut args: Vec<String> = std::env::args().skip(1).collect();
let endpoint_arg: String = args.get(arg_index).expect("No IP provided").clone(); let endpoint_arg: String = args.get(arg_index).expect("No IP provided").clone();
if endpoint_arg.contains(":") { if endpoint_arg.contains(":") {
@ -101,16 +101,3 @@ pub fn parse_endpoint_from_cli(arg_index : usize) -> String {
} }
} }
pub fn receive_chunk(stream: &mut TcpStream, chunk: &Chunk, chunk_buf: &mut BufWriter<File>) {
let mut buf = [0u8; 256];
let mut bytes_read = 0;
while bytes_read < chunk.file_size as usize {
let bytes = stream.read(&mut buf).unwrap();
chunk_buf.write_all(&buf[0..bytes]).unwrap();
bytes_read += bytes;
}
chunk_buf.flush().unwrap();
}