Skip to content

Commit

Permalink
Merge pull request #45 from DICL/connecting-dfsrm
Browse files Browse the repository at this point in the history
Connecting dfsrm
  • Loading branch information
wonbaekimys committed Mar 14, 2016
2 parents 290b4b0 + 05d0356 commit 9e7d0a5
Show file tree
Hide file tree
Showing 12 changed files with 282 additions and 21 deletions.
11 changes: 9 additions & 2 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ include tests/Makefile.am
AM_CPPFLAGS = -I@srcdir@/src/common -I@srcdir@/src -include ./config.h
AM_CXXFLAGS = -Wall -std=c++14 -ggdb3

bin_PROGRAMS = eclipse_node dfsput dfsget dfsls dfsinit
bin_PROGRAMS = eclipse_node dfsput dfsget dfsls dfsinit dfsrm


messages_files = src/messages/boundaries.cc \
Expand All @@ -21,7 +21,9 @@ messages_files = src/messages/boundaries.cc \
src/messages/blockrequest.cc \
src/messages/filelist.cc \
src/messages/filedescription.cc \
src/messages/cacheinfo.cc
src/messages/cacheinfo.cc \
src/messages/blockdel.cc \
src/messages/filedel.cc
# libs -----
lib_LTLIBRARIES = libecfs.la
libecfs_la_SOURCES = src/common/hash.cc src/common/settings.cc \
Expand Down Expand Up @@ -73,6 +75,11 @@ dfsinit_SOURCES = src/fs/dfsinit.cc src/fs/directory.cc \

dfsinit_LDADD = $(LDADD) -lsqlite3

dfsrm_SOURCES = src/fs/dfsrm.cc src/fs/directory.cc \
$(messages_files)

dfsrm_LDADD = $(LDADD) -lsqlite3

pkginclude_HEADERS = src/common/ecfs.hh src/common/settings.hh

# Scripts -------
Expand Down
135 changes: 117 additions & 18 deletions src/fs/dfsrm.cc
Original file line number Diff line number Diff line change
@@ -1,14 +1,78 @@
#include <iostream>
#include <fstream>
#include <string>
#include "common/hash.hh"
#include "common/context.hh"
#include "fileinfo.hh"
#include "blockinfo.hh"
//#include "directory.hh" metadata save/load
#include <iomanip>
#include <sstream>
#include "../common/hash.hh"
#include "../common/context.hh"
#include "../messages/filedel.hh"
#include "../messages/blockdel.hh"
#include "../messages/filedescription.hh"
#include "../messages/filerequest.hh"
#include "../messages/factory.hh"
#include "../messages/reply.hh"
#include "directory.hh"
#include <boost/asio.hpp>

using namespace std;
using namespace eclipse;
using namespace eclipse::messages;
using boost::asio::ip::tcp;
using vec_str = std::vector<std::string>;

boost::asio::io_service iosvc;

tcp::socket* connect (int hash_value) {
tcp::socket* socket = new tcp::socket (iosvc);
Settings setted = Settings().load();

int port = setted.get<int> ("network.port_mapreduce");
vec_str nodes = setted.get<vec_str> ("network.nodes");

string host = nodes[ hash_value % nodes.size() ];

tcp::resolver resolver (iosvc);
tcp::resolver::query query (host, to_string(port));
tcp::resolver::iterator it (resolver.resolve(query));
auto ep = new tcp::endpoint (*it);
socket->connect(*ep);
delete ep;
return socket;
}

void send_message (tcp::socket* socket, eclipse::messages::Message* msg) {
string out = save_message(msg);
stringstream ss;
ss << setfill('0') << setw(16) << out.length() << out;

socket->send(boost::asio::buffer(ss.str()));
}

eclipse::messages::Reply* read_reply(tcp::socket* socket) {
char header[17] = {0};
header[16] = '\0';
socket->receive(boost::asio::buffer(header, 16));
size_t size_of_msg = atoi(header);
char* body = new char[size_of_msg];
socket->receive(boost::asio::buffer(body, size_of_msg));
string recv_msg(body, size_of_msg);
eclipse::messages::Message* m = load_message(recv_msg);
delete[] body;
return dynamic_cast<eclipse::messages::Reply*>(m);
}

eclipse::messages::FileDescription* read_fd(tcp::socket* socket) {
char header[17] = {0};
header[16] = '\0';
socket->receive(boost::asio::buffer(header, 16));
size_t size_of_msg = atoi(header);
char* body = new char[size_of_msg];
socket->receive(boost::asio::buffer(body, size_of_msg));
string recv_msg(body, size_of_msg);
eclipse::messages::Message* m = load_message(recv_msg);
delete[] body;
return dynamic_cast<eclipse::messages::FileDescription*>(m);
}

int main(int argc, char* argv[])
{
Expand All @@ -20,39 +84,72 @@ int main(int argc, char* argv[])
}
else
{
//uint32_t BLOCK_SIZE = con.settings.get<int>("filesystem.block");
//uint32_t NUMSERVERS = con.settings.get<vector<string>>(
//"network.nodes").size();
string path = con.settings.get<string>("path.scratch");
for(int i=1; i<argc; i++)
{
string file_name = argv[i];
//uint32_t file_hash_key = h(file_name);
uint32_t file_hash_key = h(file_name);
auto socket = connect(file_hash_key);
FileRequest fr;
fr.file_name = file_name;

//TODO: remote_metadata_server = lookup(file_hash_key);
send_message(socket, &fr);
auto fd = read_fd(socket);

// TODO: if(!remote_metadata_server.is_exist(file_name))
if(0)
unsigned int block_seq = 0;
for (auto block_name : fd->nodes) {
auto *tmp_socket = connect(h(block_name.c_str()));
BlockDel bd;
bd.block_name = block_name;
bd.file_name = file_name;
bd.block_seq = block_seq++;
send_message(tmp_socket, &bd);
auto msg = read_reply(tmp_socket);
if (msg->message != "OK") {
cerr << "[ERROR]: block " << block_name << "doesn't exist" << endl;
return EXIT_FAILURE;
}
tmp_socket->close();
}

FileDel file_del;
file_del.file_name = file_name;
send_message(socket, &file_del);
auto reply = read_reply(socket);
if (reply->message != "OK") {
cerr << "[ERROR]: file " << file_name << " does not exist" << endl;
return EXIT_FAILURE;
}
/*
if (reply->message != "OK") {
{
cout << "[ERROR]: file " << file_name << " does not exist" << endl;
cerr << "[ERROR]: file " << file_name << " does not exist" << endl;
return EXIT_FAILURE;
}
else
{
FileInfo file_info;
// TODO: remote_metadata_server.select_file_metadata(file_name, &file_info);
cout << "remote_metadata_server.select_file_metadata(file_name, &file_info);" << endl;
FileRequest file_request;
send_message(scoket, &file_reqeust);
FileDescription file_description;
file_description = read_fd(scoket);
//cout << "remote_metadata_server.select_file_metadata(file_name, &file_info);" << endl;
// for test
file_info.num_block = 16;
file_del.num_block = file_description.nodes.size();
for(unsigned int block_seq=0; block_seq<file_info.num_block; block_seq++)
for(unsigned int block_seq=0; block_seq<file_del.num_block; block_seq++)
{
BlockInfo block_info;
// TODO: remote_metadata_server.select_block_metadata(file_name, block_seq, &block_info);
//cout << "remote_metadata_server.select_block_metadata(file_name, block_seq, &block_info)" << endl;
// TODO: remote_block_server.lookup(block_info.block_hash_key);
//cout << "remote_block_server.lookup(block_info.block_hash_key)" << endl;
// for test
block_info.block_name = file_name + "_" + to_string(block_seq);
//block_info.block_name = file_name + "_" + to_string(block_seq);
BlockRequest
string rmblock = path + "/" + block_info.block_name;
Expand All @@ -70,6 +167,8 @@ int main(int argc, char* argv[])
}
}
}
*/
}
}
return 0;
}
6 changes: 6 additions & 0 deletions src/messages/blockdel.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#include "blockdel.hh"
namespace eclipse {
namespace messages {
std::string BlockDel::get_type() const { return "BlockDel"; }
}
}
25 changes: 25 additions & 0 deletions src/messages/blockdel.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once
#include "message.hh"

namespace eclipse {
namespace messages {

struct BlockDel: public Message {
std::string get_type() const override;

//uint32_t file_id;
std::string file_name;
unsigned int block_seq;
//uint32_t block_hash_key;
std::string block_name;
//uint32_t block_size;
//unsigned int is_inter;
//std::string node;
//std::string l_node;
//std::string r_node;
//unsigned int is_commit;
//std::string content;
};

}
}
2 changes: 2 additions & 0 deletions src/messages/boost_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ BOOST_CLASS_EXPORT(eclipse::messages::CacheInfo);
BOOST_CLASS_EXPORT(eclipse::messages::FileRequest);
BOOST_CLASS_EXPORT(eclipse::messages::BlockRequest);
BOOST_CLASS_EXPORT(eclipse::messages::FileDescription);
BOOST_CLASS_EXPORT(eclipse::messages::FileDel);
BOOST_CLASS_EXPORT(eclipse::messages::BlockDel);
18 changes: 18 additions & 0 deletions src/messages/boost_impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include "filerequest.hh"
#include "blockrequest.hh"
#include "filedescription.hh"
#include "blockdel.hh"
#include "filedel.hh"

#include <boost/serialization/export.hpp>
#include <boost/serialization/access.hpp>
Expand Down Expand Up @@ -143,6 +145,20 @@ template <typename Archive>
ar & BOOST_SERIALIZATION_NVP(c.file_name);
ar & BOOST_SERIALIZATION_NVP(c.nodes);
}

template <typename Archive>
void serialize (Archive& ar, eclipse::messages::FileDel& c, unsigned int) {
ar & BASE_OBJECT(Message, c);
ar & BOOST_SERIALIZATION_NVP(c.file_name);
}

template <typename Archive>
void serialize (Archive& ar, eclipse::messages::BlockDel& c, unsigned int) {
ar & BASE_OBJECT(Message, c);
ar & BOOST_SERIALIZATION_NVP(c.file_name);
ar & BOOST_SERIALIZATION_NVP(c.block_seq);
ar & BOOST_SERIALIZATION_NVP(c.block_name);
}
}
}

Expand All @@ -163,3 +179,5 @@ BOOST_CLASS_TRACKING(eclipse::messages::CacheInfo, boost::serialization::track_n
BOOST_CLASS_TRACKING(eclipse::messages::FileRequest, boost::serialization::track_never);
BOOST_CLASS_TRACKING(eclipse::messages::BlockRequest, boost::serialization::track_never);
BOOST_CLASS_TRACKING(eclipse::messages::FileDescription, boost::serialization::track_never);
BOOST_CLASS_TRACKING(eclipse::messages::FileDel, boost::serialization::track_never);
BOOST_CLASS_TRACKING(eclipse::messages::BlockDel, boost::serialization::track_never);
7 changes: 7 additions & 0 deletions src/messages/filedel.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#include "filedel.hh"

namespace eclipse {
namespace messages {
std::string FileDel::get_type() const { return "FileDel"; }
}
}
23 changes: 23 additions & 0 deletions src/messages/filedel.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#pragma once

#include "message.hh"

namespace eclipse {
namespace messages {

struct FileDel: public Message {
FileDel() = default;
~FileDel() = default;

std::string get_type() const override;

std::string file_name;
//uint32_t file_id;
//uint32_t file_hash_key;
//uint64_t file_size;
//unsigned int num_block;
//unsigned int replica;
};

}
}
34 changes: 33 additions & 1 deletion src/nodes/peerdfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <iterator>
#include <memory>
#include <fstream>
#include <cstdio>

using namespace eclipse;
using namespace eclipse::messages;
Expand Down Expand Up @@ -56,7 +57,6 @@ void PeerDFS::insert (std::string k, std::string v) {
ofstream file (file_path);
file << v;
file.close();
sleep(1);

} else {
logger->info ("[DFS] Forwaring KEY: %s -> %d",k.c_str(), which_node);
Expand Down Expand Up @@ -184,6 +184,38 @@ bool PeerDFS::insert_block (messages::BlockInfo* m) {
return true;
}
// }}}
// Delete {{{
void PeerDFS::Delete (std::string k) {
string file_path = disk_path + string("/") + k;
remove(file_path.c_str());
}
// }}}
// delete_block {{{
bool PeerDFS::delete_block (messages::BlockDel* m) {
string file_name = m->file_name;
unsigned int block_seq = m->block_seq;
string key = m->block_name;
Delete(key);
directory.delete_block_metadata(file_name, block_seq);
return true;
}
// }}}
// delete_file {{{
bool PeerDFS::delete_file (messages::FileDel* f) {
bool ret = directory.is_exist(f->file_name.c_str());

if (!ret) {
logger->info ("File:%s doesn't exist in db, ret = %i", f->file_name.c_str(),
ret);
return false;
}

directory.delete_file_metadata(f->file_name);

logger->info ("Removing from SQLite db");
return true;
}
// }}}
// request_block {{{
FileDescription PeerDFS::request_file (messages::FileRequest* m) {
string file_name = m->file_name;
Expand Down
5 changes: 5 additions & 0 deletions src/nodes/peerdfs.hh
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include "../messages/filerequest.hh"
#include "../messages/filedescription.hh"
#include "../messages/filelist.hh"
#include "../messages/filedel.hh"
#include "../messages/blockdel.hh"
#include "../fs/directory.hh"

#include <string>
Expand All @@ -30,9 +32,12 @@ class PeerDFS: public Node, public AsyncNode {

void insert (std::string, std::string);
void request (std::string, req_func);
void Delete (std::string);
void close ();
bool insert_block (messages::BlockInfo*);
bool insert_file (messages::FileInfo*);
bool delete_block (messages::BlockDel*);
bool delete_file (messages::FileDel*);
bool list (messages::FileList*);
FileDescription request_file (messages::FileRequest*);

Expand Down
Loading

0 comments on commit 9e7d0a5

Please sign in to comment.