Compare commits
10 Commits
65c6bffd85
...
6d5096cf4f
Author | SHA1 | Date | |
---|---|---|---|
6d5096cf4f | |||
b2db3cb046 | |||
d947a147c7 | |||
f935529b47 | |||
e70859ef17 | |||
99cbb4166b | |||
0f2582c37a | |||
6b792a95ac | |||
2602a97892 | |||
b363fe5585 |
2
.gitignore
vendored
2
.gitignore
vendored
@ -8,5 +8,5 @@
|
||||
*.db
|
||||
dfs_skel/
|
||||
venv/
|
||||
data_node/
|
||||
dn*/
|
||||
copy_dir/
|
||||
|
63
Cargo.lock
generated
63
Cargo.lock
generated
@ -6,7 +6,6 @@ dependencies = [
|
||||
"serde 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"serde_derive 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"serde_json 1.0.33 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"uuid 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -14,28 +13,6 @@ name = "bitflags"
|
||||
version = "1.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "cloudabi"
|
||||
version = "0.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fuchsia-zircon"
|
||||
version = "0.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fuchsia-zircon-sys"
|
||||
version = "0.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "itoa"
|
||||
version = "0.4.3"
|
||||
@ -89,31 +66,6 @@ dependencies = [
|
||||
"proc-macro2 0.4.24 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand"
|
||||
version = "0.5.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"libc 0.2.44 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rand_core 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_core"
|
||||
version = "0.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"rand_core 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_core"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "redox_syscall"
|
||||
version = "0.1.40"
|
||||
@ -185,14 +137,6 @@ name = "unicode-xid"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "uuid"
|
||||
version = "0.7.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "vcpkg"
|
||||
version = "0.2.6"
|
||||
@ -219,9 +163,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[metadata]
|
||||
"checksum bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "228047a76f468627ca71776ecdebd732a3423081fcf5125585bcd7c49886ce12"
|
||||
"checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
|
||||
"checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82"
|
||||
"checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
|
||||
"checksum itoa 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "1306f3464951f30e30d12373d31c79fbd52d236e5e896fd92f96ec7babbbe60b"
|
||||
"checksum libc 0.2.44 (registry+https://github.com/rust-lang/crates.io-index)" = "10923947f84a519a45c8fefb7dd1b3e8c08747993381adee176d7a82b4195311"
|
||||
"checksum libsqlite3-sys 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "742b695cbfb89e549dca6960a55e6802f67d352e33e97859ee46dee835211b0f"
|
||||
@ -230,9 +171,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
"checksum pkg-config 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "676e8eb2b1b4c9043511a9b7bea0915320d7e502b0a079fb03f9635a5252b18c"
|
||||
"checksum proc-macro2 0.4.24 (registry+https://github.com/rust-lang/crates.io-index)" = "77619697826f31a02ae974457af0b29b723e5619e113e9397b8b82c6bd253f09"
|
||||
"checksum quote 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)" = "53fa22a1994bd0f9372d7a816207d8a2677ad0325b073f5c5332760f0fb62b5c"
|
||||
"checksum rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e464cd887e869cddcae8792a4ee31d23c7edd516700695608f5b98c67ee0131c"
|
||||
"checksum rand_core 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1961a422c4d189dfb50ffa9320bf1f2a9bd54ecb92792fb9477f99a1045f3372"
|
||||
"checksum rand_core 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0905b6b7079ec73b314d4c748701f6931eb79fd97c668caa3f1899b22b32c6db"
|
||||
"checksum redox_syscall 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "c214e91d3ecf43e9a4e41e578973adeb14b474f2bee858742d127af75a0112b1"
|
||||
"checksum rusqlite 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)" = "39bae767eb27866f5c0be918635ae54af705bc09db11be2c43a3c6b361cf3462"
|
||||
"checksum ryu 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "eb9e9b8cde282a9fe6a42dd4681319bfb63f121b8a8ee9439c6f4107e58a46f7"
|
||||
@ -242,7 +180,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
"checksum syn 0.15.22 (registry+https://github.com/rust-lang/crates.io-index)" = "ae8b29eb5210bc5cf63ed6149cbf9adfc82ac0be023d8735c176ee74a2db4da7"
|
||||
"checksum time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "d825be0eb33fda1a7e68012d51e9c7f451dc1a69391e7fdc197060bb8c56667b"
|
||||
"checksum unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc"
|
||||
"checksum uuid 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "dab5c5526c5caa3d106653401a267fed923e7046f35895ffcb5ca42db64942e6"
|
||||
"checksum vcpkg 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "def296d3eb3b12371b2c7d0e83bfe1403e4db2d7a0bba324a12b21c4ee13143d"
|
||||
"checksum winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "92c1eb33641e276cfa214a0522acad57be5c56b10cb348b3c5117db75f3ac4b0"
|
||||
"checksum winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
|
||||
|
@ -1,5 +1,5 @@
|
||||
[package]
|
||||
name = "a03"
|
||||
name = "distributed-fs"
|
||||
version = "0.1.0"
|
||||
authors = ["Joseph Ferano <joseph@ferano.io>"]
|
||||
|
||||
@ -8,4 +8,3 @@ rusqlite = "0.15.0"
|
||||
serde = "1.0"
|
||||
serde_derive = "1.0"
|
||||
serde_json = "1.0"
|
||||
uuid = { version = "0.7", features = ["v4"] }
|
||||
|
17
Makefile
17
Makefile
@ -1,10 +1,19 @@
|
||||
all: build
|
||||
|
||||
dist: build
|
||||
@rm -f assig-03-dfs.tar.gz
|
||||
@cp target/release/copy copy
|
||||
@tar -czf assig-03-dfs.tar.gz src/ README.md copy
|
||||
dist: clean build
|
||||
@cp target/release/copy .
|
||||
@cp target/release/ls .
|
||||
@cp target/release/data_node .
|
||||
@cp target/release/meta_data .
|
||||
@tar -czf assig-03-dfs.tar.gz src/ README.md copy ls data_node meta_data Cargo.* clean_db createdb.py
|
||||
@rm copy
|
||||
@rm ls
|
||||
@rm data_node
|
||||
@rm meta_data
|
||||
|
||||
clean:
|
||||
@rm -f assig-03-dfs.tar.gz
|
||||
|
||||
|
||||
build:
|
||||
cargo build --release
|
||||
|
80
README.md
80
README.md
@ -1,21 +1,79 @@
|
||||
## Distributed File System in Rust for CCOM4017
|
||||
# Distributed File System in Rust
|
||||
|
||||
#### Running
|
||||
A distributed file system implementation with client-server architecture, built as one of my early projects exploring Rust and distributed systems concepts.
|
||||
|
||||
```./copy <PARAM1> <PARAM2>```
|
||||
## Architecture
|
||||
|
||||
#### Building
|
||||
The system consists of four main components:
|
||||
|
||||
If you wish to compile the code, install rust and cargo
|
||||
[Link](https://www.rust-lang.org/en-US/install.html)
|
||||
- **copy** - Client for reading and writing files to the distributed system
|
||||
- **ls** - Client for listing files stored in the system
|
||||
- **data_node** - Storage server that handles file chunks
|
||||
- **meta_data** - Metadata server managing file locations and node registry
|
||||
|
||||
Then just run build
|
||||
## How It Works
|
||||
|
||||
```cargo build```
|
||||
The metadata server uses SQLite to track connected data nodes and file locations. When storing a file, the client connects to the metadata server, which provides a list of available data nodes. The client then divides the file into chunks and distributes them across multiple data nodes, transferring 256 bytes at a time.
|
||||
|
||||
If you wish to run a specific algorithm;
|
||||
The system uses JSON serialization via `serde_json` for communication between components. All network communication happens over TCP with a custom protocol for coordinating file operations.
|
||||
|
||||
```cargo run --bin copy ```
|
||||
## Usage
|
||||
|
||||
#### Testing
|
||||
### Starting the Metadata Server
|
||||
```bash
|
||||
cargo run --bin meta_data [port]
|
||||
# Defaults to port 8000 if not specified
|
||||
```
|
||||
|
||||
### Starting Data Nodes
|
||||
```bash
|
||||
cargo run --bin data_node <node_endpoint> <metadata_endpoint> [base_path]
|
||||
# Example: cargo run --bin data_node localhost:6771 127.0.0.1:8000 ./data
|
||||
```
|
||||
|
||||
### Listing Files
|
||||
```bash
|
||||
cargo run --bin ls <metadata_endpoint>
|
||||
# Example: cargo run --bin ls 127.0.0.1:8000
|
||||
```
|
||||
|
||||
### Copying Files
|
||||
|
||||
**Upload to distributed system:**
|
||||
```bash
|
||||
cargo run --bin copy <local_file> <endpoint:remote_path>
|
||||
# Example: cargo run --bin copy ./document.pdf localhost:8000:docs/document.pdf
|
||||
```
|
||||
|
||||
**Download from distributed system:**
|
||||
```bash
|
||||
cargo run --bin copy <endpoint:remote_path> <local_file>
|
||||
# Example: cargo run --bin copy localhost:8000:docs/document.pdf ./document.pdf
|
||||
```
|
||||
|
||||
## Database Setup
|
||||
|
||||
Run the included Python script to initialize the SQLite database:
|
||||
```bash
|
||||
python3 createdb.py
|
||||
```
|
||||
|
||||
The database schema includes tables for file metadata (inodes), data node registry, and block location tracking.
|
||||
|
||||
## Building
|
||||
|
||||
```bash
|
||||
cargo build
|
||||
```
|
||||
|
||||
## Testing
|
||||
|
||||
```bash
|
||||
cargo test --bin meta_data
|
||||
```
|
||||
|
||||
## Dependencies
|
||||
|
||||
- **rusqlite** - SQLite database interface
|
||||
- **serde** - Serialization framework
|
||||
- **serde_json** - JSON support for network protocol
|
@ -1,14 +1,14 @@
|
||||
extern crate a03;
|
||||
extern crate distributed_fs;
|
||||
extern crate serde;
|
||||
extern crate serde_json;
|
||||
#[macro_use]
|
||||
extern crate serde_derive;
|
||||
|
||||
use a03::*;
|
||||
use distributed_fs::*;
|
||||
use std::net::{TcpStream, Shutdown};
|
||||
use std::io::{Write, Read};
|
||||
use std::fs::File;
|
||||
use std::fs;
|
||||
use std::io::{Write, BufWriter};
|
||||
use std::time::Instant;
|
||||
|
||||
fn main() {
|
||||
let args = get_cli_args();
|
||||
@ -19,15 +19,15 @@ fn main() {
|
||||
packet_type = PacketType::RequestWrite;
|
||||
let file = fs::read(&args.filename).unwrap();
|
||||
let size = file.len();
|
||||
println!("Requesting Write of {}", args.filepath);
|
||||
println!("Sending file of size {}", size);
|
||||
json = Some(serde_json::to_string(
|
||||
&AddFile { name: args.filepath.clone(), size: size as u32 }).unwrap())
|
||||
&AddFile { name: args.filepath.clone(), size: size as u64 }).unwrap())
|
||||
} else {
|
||||
packet_type = PacketType::RequestRead;
|
||||
println!("Requesting Read of {}", args.filepath);
|
||||
json = Some(serde_json::to_string::<String>(&args.filepath).unwrap())
|
||||
}
|
||||
serde_json::to_writer(&mut stream, &Packet { p_type: packet_type, json, data: None })
|
||||
serde_json::to_writer(&mut stream, &Packet { p_type: packet_type, json, })
|
||||
.unwrap();
|
||||
stream.flush().unwrap();
|
||||
stream.shutdown(Shutdown::Write).unwrap();
|
||||
@ -49,7 +49,7 @@ fn main() {
|
||||
let file = fs::read(&args.filename).unwrap();
|
||||
nodes.map(|ns| send_file_to_data_nodes(&filename, &ns, &file));
|
||||
} else {
|
||||
nodes.map(|ns| get_file_from_data_nodes(&destination, &filename, &ns));
|
||||
nodes.map(|mut ns| get_file_from_data_nodes(&destination, &filename, &mut ns));
|
||||
}
|
||||
}
|
||||
|
||||
@ -58,58 +58,70 @@ fn send_file_to_data_nodes(
|
||||
nodes: &Vec<AvailableNodes>,
|
||||
file: &Vec<u8>)
|
||||
{
|
||||
let endpoint = format!("{}:{}", nodes[0].ip, nodes[0].port);
|
||||
let mut stream = TcpStream::connect(endpoint).unwrap();
|
||||
println!("Going to send a file! Bytes {}", file.len());
|
||||
let div: usize = ((file.len() as f32) / (nodes.len() as f32)).ceil() as usize;
|
||||
let chunks: Vec<_> = file.chunks(div).collect();
|
||||
for node in nodes {
|
||||
let endpoint = format!("{}:{}", node.ip, node.port);
|
||||
println!("{}", endpoint);
|
||||
let mut stream = TcpStream::connect(&endpoint).unwrap();
|
||||
let file_size = chunks[node.chunk_index as usize].len() as i64;
|
||||
let chunk = Chunk {
|
||||
index: nodes[0].chunk_index,
|
||||
index: node.chunk_index,
|
||||
filename: filename.clone(),
|
||||
file_size,
|
||||
};
|
||||
let packet = serde_json::to_writer(
|
||||
serde_json::to_writer(
|
||||
&mut stream,
|
||||
&Packet {
|
||||
p_type: PacketType::PutFile,
|
||||
json: Some(serde_json::to_string(&chunk).unwrap()),
|
||||
data: Some(file.clone()),
|
||||
}).unwrap();
|
||||
stream.flush().unwrap();
|
||||
stream.write(chunks[node.chunk_index as usize]).unwrap();
|
||||
stream.flush().unwrap();
|
||||
stream.shutdown(Shutdown::Write).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
fn get_file_from_data_nodes(
|
||||
destination_path: &String,
|
||||
filename: &String,
|
||||
nodes: &Vec<AvailableNodes>)
|
||||
nodes: &mut Vec<AvailableNodes>)
|
||||
{
|
||||
nodes.sort_by_key(|n| n.chunk_index);
|
||||
let mut file = BufWriter::new(File::create(destination_path).unwrap());
|
||||
for node in nodes {
|
||||
let chunk = Chunk {
|
||||
index: nodes[0].chunk_index,
|
||||
index: node.chunk_index,
|
||||
filename: filename.clone(),
|
||||
file_size: 0,
|
||||
};
|
||||
let endpoint = format!("{}:{}", nodes[0].ip, nodes[0].port);
|
||||
let endpoint = format!("{}:{}", node.ip, node.port);
|
||||
println!("Connecting to endpoint: {}", endpoint);
|
||||
let mut stream = TcpStream::connect(endpoint).unwrap();
|
||||
let packet = serde_json::to_writer(
|
||||
serde_json::to_writer(
|
||||
&stream,
|
||||
&Packet {
|
||||
p_type: PacketType::GetFile,
|
||||
json: Some(serde_json::to_string(&chunk).unwrap()),
|
||||
data: None,
|
||||
}).unwrap();
|
||||
stream.flush().unwrap();
|
||||
stream.shutdown(Shutdown::Write).unwrap();
|
||||
match serde_json::from_reader(stream) {
|
||||
Ok(Packet { p_type: PacketType::GetFile, json, data }) => {
|
||||
let data = data.unwrap();
|
||||
match serde_json::Deserializer::from_reader(&mut stream).into_iter().next().unwrap() {
|
||||
Ok(Packet { p_type: PacketType::GetFile, json, }) => {
|
||||
let chunk: Chunk = serde_json::from_str(&json.unwrap()).unwrap();
|
||||
// TODO: Here we have to rebuild the chunks
|
||||
let mut copy = File::create(destination_path).unwrap();
|
||||
copy.write_all(&data[..]).unwrap();
|
||||
}
|
||||
println!("Getting chunk: {:?}", chunk);
|
||||
let start = Instant::now();
|
||||
receive_chunk(&mut stream, &chunk, &mut file);
|
||||
let elapsed = start.elapsed();
|
||||
println!("Elapsed: {} ms", (elapsed.as_secs() * 1_000) + (elapsed.subsec_nanos() / 1_000_000) as u64);
|
||||
},
|
||||
Ok(Packet { p_type: PacketType::Error, json, .. }) => {
|
||||
eprintln!("Data Node Server Error: {}", &json.unwrap());
|
||||
}
|
||||
Ok(_) => {}
|
||||
Err(e) => eprintln!("Error parsing json {}", e.to_string()),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@ -121,7 +133,7 @@ pub struct CliArgs {
|
||||
}
|
||||
|
||||
pub fn get_cli_args() -> CliArgs {
|
||||
let mut args: Vec<String> = std::env::args().skip(1).collect();
|
||||
let args: Vec<String> = std::env::args().skip(1).collect();
|
||||
if args.len() < 2 {
|
||||
panic!("Requires 2 arguments; IP:PORT:FILEPATH and a Local filename/filepath")
|
||||
}
|
||||
|
@ -1,77 +1,83 @@
|
||||
extern crate a03;
|
||||
extern crate distributed_fs;
|
||||
extern crate serde;
|
||||
extern crate serde_json;
|
||||
#[macro_use]
|
||||
extern crate serde_derive;
|
||||
|
||||
use a03::*;
|
||||
use distributed_fs::*;
|
||||
use std::net::{TcpStream, Shutdown};
|
||||
use std::io::{Write, Read};
|
||||
use std::io::{Write, BufWriter};
|
||||
use std::net::TcpListener;
|
||||
use serde_json::from_str;
|
||||
use std::fs::File;
|
||||
use std::fs;
|
||||
use std::error::Error;
|
||||
use std::time::Instant;
|
||||
|
||||
fn main() {
|
||||
let node_endpoint = parse_endpoint_from_cli(0);
|
||||
let metadata_endpoint = parse_endpoint_from_cli(1);
|
||||
let data_path = std::env::args().skip(3).next()
|
||||
.expect("Missing data path");
|
||||
.unwrap_or(String::from("."));
|
||||
let listener = TcpListener::bind(&node_endpoint).unwrap();
|
||||
register_with_meta_server(&metadata_endpoint, &node_endpoint);
|
||||
|
||||
for stream in listener.incoming() {
|
||||
let mut stream = stream.unwrap();
|
||||
match serde_json::from_reader(&mut stream) {
|
||||
Ok(Packet { p_type: PacketType::GetFile, json, .. }) => {
|
||||
send_file(&data_path, &mut stream, &json.unwrap());
|
||||
stream.flush().unwrap();
|
||||
stream.shutdown(Shutdown::Write).unwrap();
|
||||
match serde_json::Deserializer::from_reader(&mut stream).into_iter().next().unwrap() {
|
||||
Ok(Packet { p_type: PacketType::GetFile, json }) => {
|
||||
send_chunk(&data_path, &mut stream, &json.unwrap());
|
||||
}
|
||||
Ok(Packet { p_type: PacketType::PutFile, json, data, }) =>
|
||||
receive_file(&data_path, &json.unwrap(), &data.unwrap()),
|
||||
Ok(Packet { p_type: PacketType::PutFile, json }) => {
|
||||
println!("Receiving chunk");
|
||||
receive_chunk_from_copy(&mut stream, &data_path, &json.unwrap());
|
||||
},
|
||||
Ok(Packet { p_type: PacketType::ShutdownDataNode, .. }) =>
|
||||
shutdown(&mut stream, &metadata_endpoint, &node_endpoint),
|
||||
shutdown(&metadata_endpoint, &node_endpoint),
|
||||
Ok(_) => eprintln!("We don't handle this PacketType"),
|
||||
Err(e) => eprintln!("Error parsing json: {}", e.to_string()),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
fn receive_file(base_path: &String, json: &String, data: &Vec<u8>) {
|
||||
fn receive_chunk_from_copy(stream: &mut TcpStream, base_path: &String, json: &String) {
|
||||
let chunk: Chunk = serde_json::from_str(json).unwrap();
|
||||
let filepath = format!("{}/{}_{}", base_path, chunk.filename, chunk.index);
|
||||
println!("{}", filepath);
|
||||
let mut copy = File::create(filepath).unwrap();
|
||||
copy.write_all(&data[..]).unwrap();
|
||||
let mut file = BufWriter::new(File::create(filepath).unwrap());
|
||||
let start = Instant::now();
|
||||
receive_chunk(&mut *stream, &chunk, &mut file);
|
||||
let elapsed = start.elapsed();
|
||||
println!("Elapsed: {} ms", (elapsed.as_secs() * 1_000) + (elapsed.subsec_nanos() / 1_000_000) as u64);
|
||||
file.flush().unwrap();
|
||||
}
|
||||
|
||||
fn send_file(base_path: &String, stream: &mut TcpStream, json: &String) {
|
||||
fn send_chunk(base_path: &String, stream: &mut TcpStream, json: &String) {
|
||||
let chunk: Chunk = serde_json::from_str(json).unwrap();
|
||||
println!("{}", chunk.filename);
|
||||
println!("Sending {}", chunk.filename);
|
||||
match fs::read(format!("{}/{}_{}", base_path, &chunk.filename, &chunk.index)) {
|
||||
Ok(f) => {
|
||||
Ok(file) => {
|
||||
serde_json::to_writer(
|
||||
stream,
|
||||
&mut *stream,
|
||||
&Packet {
|
||||
p_type: PacketType::GetFile,
|
||||
json: Some(json.clone()),
|
||||
data: Some(Vec::from(f)),
|
||||
json: Some(serde_json::to_string(
|
||||
&Chunk { file_size: file.len() as i64, ..chunk}).unwrap()),
|
||||
}).unwrap();
|
||||
},
|
||||
stream.flush().unwrap();
|
||||
stream.write(&file).unwrap();
|
||||
stream.flush().unwrap();
|
||||
stream.shutdown(Shutdown::Write).unwrap();
|
||||
}
|
||||
Err(e) => {
|
||||
match serde_json::to_writer(
|
||||
stream,
|
||||
&Packet {
|
||||
p_type: PacketType::Error,
|
||||
json: Some(String::from(e.description())),
|
||||
data: None,
|
||||
}) {
|
||||
Ok(_) => println!("{}", "Copy client attempted to read non-existing file"),
|
||||
Err(e) => println!("{}", e),
|
||||
}
|
||||
},
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@ -90,7 +96,6 @@ fn register_with_meta_server(metadata_endpoint: &String, node_endpoint: &String)
|
||||
port: from_str(split[1]).unwrap(),
|
||||
})
|
||||
.unwrap()),
|
||||
data: None,
|
||||
})
|
||||
.unwrap();
|
||||
println!("Registered myself");
|
||||
@ -100,7 +105,7 @@ fn register_with_meta_server(metadata_endpoint: &String, node_endpoint: &String)
|
||||
println!("{:?}", result);
|
||||
}
|
||||
|
||||
fn shutdown(stream: &mut TcpStream, metadata_endpoint: &String, node_endpoint: &String) {
|
||||
fn shutdown(metadata_endpoint: &String, node_endpoint: &String) {
|
||||
let mut stream = TcpStream::connect(&metadata_endpoint).unwrap();
|
||||
let split: Vec<&str> = node_endpoint.split(":").collect();
|
||||
serde_json::to_writer(
|
||||
@ -114,7 +119,6 @@ fn shutdown(stream: &mut TcpStream, metadata_endpoint: &String, node_endpoint: &
|
||||
port: from_str(split[1]).unwrap(),
|
||||
})
|
||||
.unwrap()),
|
||||
data: None,
|
||||
})
|
||||
.unwrap();
|
||||
println!("Unregistered myself");
|
||||
|
@ -1,14 +1,10 @@
|
||||
extern crate a03;
|
||||
extern crate distributed_fs;
|
||||
extern crate serde;
|
||||
extern crate serde_json;
|
||||
#[macro_use]
|
||||
extern crate serde_derive;
|
||||
|
||||
use a03::*;
|
||||
use std::net::{TcpListener, TcpStream, Shutdown, SocketAddrV4, Ipv4Addr};
|
||||
use std::borrow::Cow;
|
||||
use std::thread;
|
||||
use std::io::Read;
|
||||
use distributed_fs::*;
|
||||
use std::net::{TcpStream, Shutdown };
|
||||
use std::io::Write;
|
||||
|
||||
fn main() {
|
||||
@ -19,7 +15,6 @@ fn main() {
|
||||
&Packet {
|
||||
p_type: PacketType::ListFiles,
|
||||
json: None,
|
||||
data: None,
|
||||
})
|
||||
.unwrap();
|
||||
stream.flush().unwrap();
|
||||
|
@ -1,10 +1,10 @@
|
||||
extern crate a03;
|
||||
extern crate distributed_fs;
|
||||
extern crate rusqlite;
|
||||
extern crate serde;
|
||||
extern crate serde_json;
|
||||
extern crate serde_derive;
|
||||
|
||||
use a03::*;
|
||||
use distributed_fs::*;
|
||||
use rusqlite::types::ToSql;
|
||||
use rusqlite::{Connection, NO_PARAMS};
|
||||
use std::borrow::Cow;
|
||||
@ -44,7 +44,6 @@ fn request_read(stream: &mut TcpStream, conn: &Connection, message: &str) {
|
||||
&Packet {
|
||||
p_type: PacketType::Error,
|
||||
json: Some(String::from("File not found")),
|
||||
data: None,
|
||||
}) {
|
||||
Ok(_) => println!("{}", "Copy client attempted to read non-existing file"),
|
||||
Err(e) => println!("{}", e),
|
||||
@ -66,7 +65,6 @@ fn request_read(stream: &mut TcpStream, conn: &Connection, message: &str) {
|
||||
&Packet {
|
||||
p_type: PacketType::Success,
|
||||
json: Some(serde_json::to_string(&nodes).unwrap()),
|
||||
data: None,
|
||||
}) {
|
||||
Ok(_) => println!("{}", "Sent nodes with chunks"),
|
||||
Err(e) => println!("{}", e),
|
||||
@ -75,14 +73,13 @@ fn request_read(stream: &mut TcpStream, conn: &Connection, message: &str) {
|
||||
|
||||
fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) {
|
||||
let file: AddFile = serde_json::from_str(message).unwrap();
|
||||
let file_already_exists = add_file(&conn, &file.name, file.size as i32);
|
||||
let file_already_exists = add_file(&conn, &file.name, file.size as i64);
|
||||
if file_already_exists {
|
||||
match serde_json::to_writer(
|
||||
stream,
|
||||
&Packet {
|
||||
p_type: PacketType::Error,
|
||||
json: Some(String::from("File already exists, please remove before re-uploading")),
|
||||
data: None,
|
||||
}) {
|
||||
Ok(_) => println!("{}", "Copy client attempted to add an existing file"),
|
||||
Err(e) => println!("{}", e),
|
||||
@ -90,8 +87,6 @@ fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) {
|
||||
return;
|
||||
}
|
||||
let file_info = get_file_info(&conn, &file.name).unwrap();
|
||||
// let file_info = INode { id: 1, name: file.name, size: file.size };
|
||||
// println!("{:?}", file_info);
|
||||
let mut blocks: Vec<Block> = Vec::new();
|
||||
let mut nodes: Vec<AvailableNodes> = Vec::new();
|
||||
let dnodes = get_data_nodes(&conn);
|
||||
@ -115,7 +110,6 @@ fn request_write(stream: &mut TcpStream, conn: &Connection, message: &str) {
|
||||
&Packet {
|
||||
p_type: PacketType::Success,
|
||||
json: Some(serde_json::to_string(&nodes).unwrap()),
|
||||
data: None,
|
||||
}) {
|
||||
Ok(_) => println!("{}", "Sent nodes with chunks"),
|
||||
Err(e) => println!("{}", e),
|
||||
@ -147,7 +141,6 @@ fn report_success(stream: &mut TcpStream, message: &str) {
|
||||
match serde_json::to_writer(stream, &Packet {
|
||||
p_type: PacketType::Success,
|
||||
json: None,
|
||||
data: None,
|
||||
}) {
|
||||
Ok(_) => println!("{}", message),
|
||||
Err(e) => println!("{}", e),
|
||||
@ -201,7 +194,7 @@ fn get_data_nodes(conn: &Connection) -> Vec<DataNode> {
|
||||
nodes
|
||||
}
|
||||
|
||||
fn add_file(conn: &Connection, fname: &String, fsize: i32) -> bool {
|
||||
fn add_file(conn: &Connection, fname: &String, fsize: i64) -> bool {
|
||||
let file_exists = conn.query_row(
|
||||
"SELECT fid FROM inode WHERE fname = ?1",
|
||||
&[&fname as &ToSql],
|
||||
|
25
src/lib.rs
25
src/lib.rs
@ -4,9 +4,9 @@ extern crate serde_json;
|
||||
extern crate serde_derive;
|
||||
|
||||
use std::borrow::Cow;
|
||||
use std::net::Ipv4Addr;
|
||||
use std::net::SocketAddrV4;
|
||||
use std::str::FromStr;
|
||||
use std::net::TcpStream;
|
||||
use std::fs::File;
|
||||
use std::io::{Write, Read, BufWriter};
|
||||
|
||||
pub const DEFAULT_PORT: &str = "8000";
|
||||
|
||||
@ -28,7 +28,6 @@ pub enum PacketType {
|
||||
pub struct Packet {
|
||||
pub p_type: PacketType,
|
||||
pub json: Option<String>,
|
||||
pub data: Option<Vec<u8>>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
@ -46,7 +45,7 @@ pub struct NodeRegistration {
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct AddFile {
|
||||
pub name: String,
|
||||
pub size: u32
|
||||
pub size: u64,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
@ -60,6 +59,7 @@ pub struct AvailableNodes {
|
||||
pub struct Chunk {
|
||||
pub index: u32,
|
||||
pub filename: String,
|
||||
pub file_size: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@ -91,7 +91,7 @@ pub struct BlockQuery {
|
||||
}
|
||||
|
||||
pub fn parse_endpoint_from_cli(arg_index : usize) -> String {
|
||||
let mut args: Vec<String> = std::env::args().skip(1).collect();
|
||||
let args: Vec<String> = std::env::args().skip(1).collect();
|
||||
let endpoint_arg: String = args.get(arg_index).expect("No IP provided").clone();
|
||||
|
||||
if endpoint_arg.contains(":") {
|
||||
@ -101,3 +101,16 @@ pub fn parse_endpoint_from_cli(arg_index : usize) -> String {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn receive_chunk(stream: &mut TcpStream, chunk: &Chunk, chunk_buf: &mut BufWriter<File>) {
|
||||
let mut buf = [0u8; 256];
|
||||
let mut bytes_read = 0;
|
||||
while bytes_read < chunk.file_size as usize {
|
||||
let bytes = stream.read(&mut buf).unwrap();
|
||||
chunk_buf.write_all(&buf[0..bytes]).unwrap();
|
||||
bytes_read += bytes;
|
||||
}
|
||||
chunk_buf.flush().unwrap();
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user