diff --git a/Makefile.am b/Makefile.am index b5941b18..00270392 100644 --- a/Makefile.am +++ b/Makefile.am @@ -3,7 +3,7 @@ include tests/Makefile.am AM_CPPFLAGS = -I@srcdir@/src/common -I@srcdir@/src -include ./config.h AM_CXXFLAGS = -Wall -std=c++14 -ggdb3 -bin_PROGRAMS = eclipse_node dfsput dfsget dfsls dfsinit +bin_PROGRAMS = eclipse_node dfsput dfsget dfsls dfsinit dfsrm messages_files = src/messages/boundaries.cc \ @@ -21,7 +21,9 @@ messages_files = src/messages/boundaries.cc \ src/messages/blockrequest.cc \ src/messages/filelist.cc \ src/messages/filedescription.cc \ - src/messages/cacheinfo.cc + src/messages/cacheinfo.cc \ + src/messages/blockdel.cc \ + src/messages/filedel.cc # libs ----- lib_LTLIBRARIES = libecfs.la libecfs_la_SOURCES = src/common/hash.cc src/common/settings.cc \ @@ -73,6 +75,11 @@ dfsinit_SOURCES = src/fs/dfsinit.cc src/fs/directory.cc \ dfsinit_LDADD = $(LDADD) -lsqlite3 +dfsrm_SOURCES = src/fs/dfsrm.cc src/fs/directory.cc \ + $(messages_files) + +dfsrm_LDADD = $(LDADD) -lsqlite3 + pkginclude_HEADERS = src/common/ecfs.hh src/common/settings.hh # Scripts ------- diff --git a/src/fs/dfsrm.cc b/src/fs/dfsrm.cc index fda1fdb8..9e8a10da 100644 --- a/src/fs/dfsrm.cc +++ b/src/fs/dfsrm.cc @@ -1,14 +1,78 @@ #include #include #include -#include "common/hash.hh" -#include "common/context.hh" -#include "fileinfo.hh" -#include "blockinfo.hh" -//#include "directory.hh" metadata save/load +#include +#include +#include "../common/hash.hh" +#include "../common/context.hh" +#include "../messages/filedel.hh" +#include "../messages/blockdel.hh" +#include "../messages/filedescription.hh" +#include "../messages/filerequest.hh" +#include "../messages/factory.hh" +#include "../messages/reply.hh" +#include "directory.hh" +#include using namespace std; using namespace eclipse; +using namespace eclipse::messages; +using boost::asio::ip::tcp; +using vec_str = std::vector; + +boost::asio::io_service iosvc; + +tcp::socket* connect (int hash_value) { + tcp::socket* socket = new tcp::socket (iosvc); + Settings setted = Settings().load(); + + int port = setted.get ("network.port_mapreduce"); + vec_str nodes = setted.get ("network.nodes"); + + string host = nodes[ hash_value % nodes.size() ]; + + tcp::resolver resolver (iosvc); + tcp::resolver::query query (host, to_string(port)); + tcp::resolver::iterator it (resolver.resolve(query)); + auto ep = new tcp::endpoint (*it); + socket->connect(*ep); + delete ep; + return socket; +} + +void send_message (tcp::socket* socket, eclipse::messages::Message* msg) { + string out = save_message(msg); + stringstream ss; + ss << setfill('0') << setw(16) << out.length() << out; + + socket->send(boost::asio::buffer(ss.str())); +} + +eclipse::messages::Reply* read_reply(tcp::socket* socket) { + char header[17] = {0}; + header[16] = '\0'; + socket->receive(boost::asio::buffer(header, 16)); + size_t size_of_msg = atoi(header); + char* body = new char[size_of_msg]; + socket->receive(boost::asio::buffer(body, size_of_msg)); + string recv_msg(body, size_of_msg); + eclipse::messages::Message* m = load_message(recv_msg); + delete[] body; + return dynamic_cast(m); +} + +eclipse::messages::FileDescription* read_fd(tcp::socket* socket) { + char header[17] = {0}; + header[16] = '\0'; + socket->receive(boost::asio::buffer(header, 16)); + size_t size_of_msg = atoi(header); + char* body = new char[size_of_msg]; + socket->receive(boost::asio::buffer(body, size_of_msg)); + string recv_msg(body, size_of_msg); + eclipse::messages::Message* m = load_message(recv_msg); + delete[] body; + return dynamic_cast(m); +} int main(int argc, char* argv[]) { @@ -20,31 +84,63 @@ int main(int argc, char* argv[]) } else { + //uint32_t BLOCK_SIZE = con.settings.get("filesystem.block"); + //uint32_t NUMSERVERS = con.settings.get>( + //"network.nodes").size(); string path = con.settings.get("path.scratch"); for(int i=1; inodes) { + auto *tmp_socket = connect(h(block_name.c_str())); + BlockDel bd; + bd.block_name = block_name; + bd.file_name = file_name; + bd.block_seq = block_seq++; + send_message(tmp_socket, &bd); + auto msg = read_reply(tmp_socket); + if (msg->message != "OK") { + cerr << "[ERROR]: block " << block_name << "doesn't exist" << endl; + return EXIT_FAILURE; + } + tmp_socket->close(); + } + + FileDel file_del; + file_del.file_name = file_name; + send_message(socket, &file_del); + auto reply = read_reply(socket); + if (reply->message != "OK") { + cerr << "[ERROR]: file " << file_name << " does not exist" << endl; + return EXIT_FAILURE; + } +/* + if (reply->message != "OK") { { - cout << "[ERROR]: file " << file_name << " does not exist" << endl; + cerr << "[ERROR]: file " << file_name << " does not exist" << endl; + return EXIT_FAILURE; } else { - FileInfo file_info; - // TODO: remote_metadata_server.select_file_metadata(file_name, &file_info); - cout << "remote_metadata_server.select_file_metadata(file_name, &file_info);" << endl; + FileRequest file_request; + send_message(scoket, &file_reqeust); + FileDescription file_description; + file_description = read_fd(scoket); + //cout << "remote_metadata_server.select_file_metadata(file_name, &file_info);" << endl; - // for test - file_info.num_block = 16; + file_del.num_block = file_description.nodes.size(); - for(unsigned int block_seq=0; block_seq #include @@ -143,6 +145,20 @@ template ar & BOOST_SERIALIZATION_NVP(c.file_name); ar & BOOST_SERIALIZATION_NVP(c.nodes); } + +template + void serialize (Archive& ar, eclipse::messages::FileDel& c, unsigned int) { + ar & BASE_OBJECT(Message, c); + ar & BOOST_SERIALIZATION_NVP(c.file_name); + } + +template + void serialize (Archive& ar, eclipse::messages::BlockDel& c, unsigned int) { + ar & BASE_OBJECT(Message, c); + ar & BOOST_SERIALIZATION_NVP(c.file_name); + ar & BOOST_SERIALIZATION_NVP(c.block_seq); + ar & BOOST_SERIALIZATION_NVP(c.block_name); + } } } @@ -163,3 +179,5 @@ BOOST_CLASS_TRACKING(eclipse::messages::CacheInfo, boost::serialization::track_n BOOST_CLASS_TRACKING(eclipse::messages::FileRequest, boost::serialization::track_never); BOOST_CLASS_TRACKING(eclipse::messages::BlockRequest, boost::serialization::track_never); BOOST_CLASS_TRACKING(eclipse::messages::FileDescription, boost::serialization::track_never); +BOOST_CLASS_TRACKING(eclipse::messages::FileDel, boost::serialization::track_never); +BOOST_CLASS_TRACKING(eclipse::messages::BlockDel, boost::serialization::track_never); diff --git a/src/messages/filedel.cc b/src/messages/filedel.cc new file mode 100644 index 00000000..43466319 --- /dev/null +++ b/src/messages/filedel.cc @@ -0,0 +1,7 @@ +#include "filedel.hh" + +namespace eclipse { +namespace messages { +std::string FileDel::get_type() const { return "FileDel"; } +} +} diff --git a/src/messages/filedel.hh b/src/messages/filedel.hh new file mode 100644 index 00000000..e4469e67 --- /dev/null +++ b/src/messages/filedel.hh @@ -0,0 +1,23 @@ +#pragma once + +#include "message.hh" + +namespace eclipse { +namespace messages { + +struct FileDel: public Message { + FileDel() = default; + ~FileDel() = default; + + std::string get_type() const override; + + std::string file_name; + //uint32_t file_id; + //uint32_t file_hash_key; + //uint64_t file_size; + //unsigned int num_block; + //unsigned int replica; +}; + +} +} diff --git a/src/nodes/peerdfs.cc b/src/nodes/peerdfs.cc index ef136d0e..d746499e 100644 --- a/src/nodes/peerdfs.cc +++ b/src/nodes/peerdfs.cc @@ -13,6 +13,7 @@ #include #include #include +#include using namespace eclipse; using namespace eclipse::messages; @@ -56,7 +57,6 @@ void PeerDFS::insert (std::string k, std::string v) { ofstream file (file_path); file << v; file.close(); - sleep(1); } else { logger->info ("[DFS] Forwaring KEY: %s -> %d",k.c_str(), which_node); @@ -184,6 +184,38 @@ bool PeerDFS::insert_block (messages::BlockInfo* m) { return true; } // }}} +// Delete {{{ +void PeerDFS::Delete (std::string k) { + string file_path = disk_path + string("/") + k; + remove(file_path.c_str()); +} +// }}} +// delete_block {{{ +bool PeerDFS::delete_block (messages::BlockDel* m) { + string file_name = m->file_name; + unsigned int block_seq = m->block_seq; + string key = m->block_name; + Delete(key); + directory.delete_block_metadata(file_name, block_seq); + return true; +} +// }}} +// delete_file {{{ +bool PeerDFS::delete_file (messages::FileDel* f) { + bool ret = directory.is_exist(f->file_name.c_str()); + + if (!ret) { + logger->info ("File:%s doesn't exist in db, ret = %i", f->file_name.c_str(), + ret); + return false; + } + + directory.delete_file_metadata(f->file_name); + + logger->info ("Removing from SQLite db"); + return true; +} +// }}} // request_block {{{ FileDescription PeerDFS::request_file (messages::FileRequest* m) { string file_name = m->file_name; diff --git a/src/nodes/peerdfs.hh b/src/nodes/peerdfs.hh index 7b226d58..bc8ef35e 100644 --- a/src/nodes/peerdfs.hh +++ b/src/nodes/peerdfs.hh @@ -8,6 +8,8 @@ #include "../messages/filerequest.hh" #include "../messages/filedescription.hh" #include "../messages/filelist.hh" +#include "../messages/filedel.hh" +#include "../messages/blockdel.hh" #include "../fs/directory.hh" #include @@ -30,9 +32,12 @@ class PeerDFS: public Node, public AsyncNode { void insert (std::string, std::string); void request (std::string, req_func); + void Delete (std::string); void close (); bool insert_block (messages::BlockInfo*); bool insert_file (messages::FileInfo*); + bool delete_block (messages::BlockDel*); + bool delete_file (messages::FileDel*); bool list (messages::FileList*); FileDescription request_file (messages::FileRequest*); diff --git a/src/nodes/remotedfs.cc b/src/nodes/remotedfs.cc index 63dbff3f..1fe29902 100644 --- a/src/nodes/remotedfs.cc +++ b/src/nodes/remotedfs.cc @@ -13,6 +13,8 @@ RemoteDFS::RemoteDFS (Context& c) : Router(c), peer(c) { routing_table.insert({"FileRequest", bind(&RemoteDFS::request_file, this, ph::_1)}); routing_table.insert({"BlockRequest", bind(&RemoteDFS::request_block, this, ph::_1)}); routing_table.insert({"FileList", bind(&RemoteDFS::request_ls, this, ph::_1)}); + routing_table.insert({"BlockDel", bind(&RemoteDFS::delete_block, this, ph::_1)}); + routing_table.insert({"FileDel", bind(&RemoteDFS::delete_file, this, ph::_1)}); } // }}} // establish {{{ @@ -40,6 +42,23 @@ void RemoteDFS::insert_block (messages::Message* m_) { network->send(0, &reply); } +void RemoteDFS::delete_block (messages::Message* m_) { + auto m = dynamic_cast (m_); + logger->info ("BlockDel received"); + + bool ret = peer.delete_block(m); + Reply reply; + + if (ret) { + reply.message = "OK"; + } else { + reply.message = "FAIL"; + reply.details = "Block doesn't exist"; + } + + network->send(0, &reply); +} + // }}} // FileInfo* {{{ void RemoteDFS::insert_file (messages::Message* m_) { @@ -59,6 +78,22 @@ void RemoteDFS::insert_file (messages::Message* m_) { network->send(0, &reply); } +void RemoteDFS::delete_file (messages::Message* m_) { + auto m = dynamic_cast (m_); + logger->info ("FileDel received"); + + bool ret = peer.delete_file (m); + Reply reply; + + if (ret) { + reply.message = "OK"; + } else { + reply.message = "FAIL"; + reply.details = "File doesn't exist"; + } + + network->send(0, &reply); +} // }}} // request_file {{{ void RemoteDFS::request_file (messages::Message* m_) { diff --git a/src/nodes/remotedfs.hh b/src/nodes/remotedfs.hh index ea9beddc..1e3ac8f5 100644 --- a/src/nodes/remotedfs.hh +++ b/src/nodes/remotedfs.hh @@ -19,6 +19,8 @@ class RemoteDFS: public Router { void request_file (messages::Message*); void request_block (messages::Message*); void request_ls (messages::Message*); + void delete_file (messages::Message*); + void delete_block (messages::Message*); void send_block (std::string, std::string); protected: