Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
phlalx committed May 21, 2017
1 parent 1aabe59 commit ca9d097
Show file tree
Hide file tree
Showing 16 changed files with 325 additions and 331 deletions.
42 changes: 17 additions & 25 deletions extent_client.cc
Original file line number Diff line number Diff line change
@@ -1,57 +1,49 @@
// RPC stubs for clients to talk to extent_server

#include "extent_client.h"
#include <sstream>
#include <iostream>
#include <stdio.h>
#include <unistd.h>
#include <time.h>
#include <jsl_log.h>

// The calls assume that the caller holds a lock on the extent

extent_client::extent_client(std::string dst)
{
extent_client::extent_client(const std::string &dst) {
sockaddr_in dstsock;
make_sockaddr(dst.c_str(), &dstsock);
cl = new rpcc(dstsock);
if (cl->bind() != 0) {
printf("extent_client: bind failed\n");
}
cl = std::unique_ptr<rpcc>(new rpcc(dstsock));
VERIFY(cl->bind() == 0);
}

extent_protocol::status
extent_client::get(extent_protocol::extentid_t eid, std::string &buf)
{
extent_protocol::status extent_client::get(extent_protocol::extentid_t eid,
std::string &buf) {
jsl_log(JSL_DBG_ME, "extent_client: get %llu\n", eid);
extent_protocol::status ret = extent_protocol::OK;
ret = cl->call(extent_protocol::get, eid, buf);
jsl_log(JSL_DBG_ME, "extent_client: get %llu buf = %s\n", eid, buf.c_str());
return ret;
}

extent_protocol::status
extent_client::getattr(extent_protocol::extentid_t eid,
extent_protocol::attr &attr)
{
extent_protocol::status extent_client::getattr(extent_protocol::extentid_t eid,
extent_protocol::attr &attr) {
jsl_log(JSL_DBG_ME, "extent_client: getattr %llu\n", eid);
extent_protocol::status ret = extent_protocol::OK;
ret = cl->call(extent_protocol::getattr, eid, attr);
return ret;
}

extent_protocol::status
extent_client::put(extent_protocol::extentid_t eid, std::string buf)
{
extent_protocol::status extent_client::put(extent_protocol::extentid_t eid,
std::string buf) {
jsl_log(JSL_DBG_ME, "extent_client: put %llu %s\n", eid, buf.c_str());
extent_protocol::status ret = extent_protocol::OK;
int r;
ret = cl->call(extent_protocol::put, eid, buf, r);
return ret;
}

extent_protocol::status
extent_client::remove(extent_protocol::extentid_t eid)
extent_protocol::status extent_client::remove(extent_protocol::extentid_t eid)
{
jsl_log(JSL_DBG_ME, "extent_client: remove %llu\n", eid);
extent_protocol::status ret = extent_protocol::OK;
int r;
ret = cl->call(extent_protocol::remove, eid, r);
return ret;
}


}
22 changes: 15 additions & 7 deletions extent_client.h
Original file line number Diff line number Diff line change
@@ -1,25 +1,33 @@
// extent client interface.

#ifndef extent_client_h
#define extent_client_h

#include <string>
#include "extent_protocol.h"
#include "rpc.h"
#include <memory>

/**
* RPC stub to talk to the extent_server
* Assume that the caller always holds a lock on the extent
* Thread-safe (thanks to rpcc)
*/
class extent_client {

private:
rpcc *cl;
std::unique_ptr<rpcc> cl;

public:
extent_client(std::string dst);
extent_client(const std::string &dst);

extent_protocol::status get(extent_protocol::extentid_t eid,
std::string &buf);
std::string &buf);

extent_protocol::status getattr(extent_protocol::extentid_t eid,
extent_protocol::attr &a);
extent_protocol::status put(extent_protocol::extentid_t eid, std::string buf);
extent_protocol::attr &a);

extent_protocol::status put(extent_protocol::extentid_t eid,
std::string buf);

extent_protocol::status remove(extent_protocol::extentid_t eid);
};

Expand Down
106 changes: 47 additions & 59 deletions extent_client_cache.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// RPC stubs for clxents to talk to extent_server

#include "extent_client_cache.h"
#include "extent_client.h"
#include <sstream>
Expand All @@ -10,139 +8,129 @@
#include <slock.h>
#include <jsl_log.h>

// The calls assume that the caller holds a lock on the extent

extent_client_cache::extent_client_cache(std::string dst)
{
ec = new extent_client(dst);
extent_client_cache::extent_client_cache(const std::string &dst) : extent_client(dst) {
}

extent_protocol::status
extent_client_cache::get(extent_protocol::extentid_t eid, std::string &buf)
{
extent_protocol::status extent_client_cache::get(extent_protocol::extentid_t eid,
std::string &buf) {
extent_protocol::status res = extent_protocol::OK;
bool found = true;
{
ScopedLock ml(&mutex);
ScopedLock ml(&mutex_);
jsl_log(JSL_DBG_ME, "extent_client_cache: get %lud %llu\n", pthread_self(), eid);
if (kv_store.find(eid) == kv_store.end()) {
if (store_.find(eid) == store_.end()) {
found = false;
}
}
Value copy;
if (!found) { // not found locally
// TODO -> ajouter une opération sur le serveur qui renvoie tout ?
res = ec->get(eid, copy.buf);
Extent copy;
if (!found) {
// TODO(phil) ajouter une opération sur le serveur qui renvoie tout ?
res = extent_client::get(eid, copy.buf);
if (res != extent_protocol::OK) {
return res;
}
res = ec->getattr(eid, copy.attr);
res = extent_client::getattr(eid, copy.attr);
if (res != extent_protocol::OK) {
return res;
}
}
// qu'est ce qui peut arriver quand on n'a pas le verrou ici ?

// TODO(phil) qu'est ce qui peut arriver quand on n'a pas le verrou ici ?
{
ScopedLock ml(&mutex);
Value &v = kv_store[eid];
ScopedLock ml(&mutex_);
Extent &v = store_[eid];
if (!found) {
kv_store[eid] = copy;
store_[eid] = copy;
}
buf = v.buf;
}
return res;
}

extent_protocol::status
extent_client_cache::getattr(extent_protocol::extentid_t eid,
extent_protocol::attr &attr)
{
extent_protocol::status extent_client_cache::getattr(extent_protocol::extentid_t eid,
extent_protocol::attr &attr) {
// TODO(phil) factorize this
extent_protocol::status res = extent_protocol::OK;
bool found = true;
{
ScopedLock ml(&mutex);
jsl_log(JSL_DBG_ME, "extent_client_cache: getattr %lud %llu\n", pthread_self(), eid);
if (kv_store.find(eid) == kv_store.end()) {
found = false;
{
ScopedLock ml(&mutex_);
jsl_log(JSL_DBG_ME, "extent_client_cache: getattr %lud %llu\n", pthread_self(), eid);
if (store_.find(eid) == store_.end()) {
found = false;
}
}
}
Value copy;
Extent copy;
if (!found) { // not found locally
// TODO -> ajouter une opération sur le serveur qui renvoie tout ?
res = ec->get(eid, copy.buf);
res = extent_client::get(eid, copy.buf);
if (res != extent_protocol::OK) {
return res;
}
res = ec->getattr(eid, copy.attr);
res = extent_client::getattr(eid, copy.attr);
if (res != extent_protocol::OK) {
return res;
}
}
// qu'est ce qui peut arriver quand on n'a pas le verrou ici ?

{
ScopedLock ml(&mutex);
Value &v = kv_store[eid];
ScopedLock ml(&mutex_);
Extent &v = store_[eid];
if (!found) {
kv_store[eid] = copy;
store_[eid] = copy;
}
attr = v.attr;
}
return res;
}

extent_protocol::status
extent_client_cache::put(extent_protocol::extentid_t eid, std::string buf)
{
ScopedLock ml(&mutex);
extent_protocol::status extent_client_cache::put(extent_protocol::extentid_t eid,
std::string buf) {
ScopedLock ml(&mutex_);
jsl_log(JSL_DBG_ME, "extent_client_cache: put %lud %llu\n", pthread_self(), eid);
Value &v = kv_store[eid];
Extent &v = store_[eid];
v.buf = buf;
v.attr.size = buf.size();
v.dirty = true;
time_t cur_time = time(NULL);

// TODO why change ctime at every access?
// TODO(phil) why change ctime at every access?
v.attr.ctime = cur_time;
v.attr.mtime = cur_time;
return extent_protocol::OK;
}

extent_protocol::status
extent_client_cache::remove(extent_protocol::extentid_t eid)
{
ScopedLock ml(&mutex);
extent_protocol::status extent_client_cache::remove(
extent_protocol::extentid_t eid) {
ScopedLock ml(&mutex_);
jsl_log(JSL_DBG_ME, "extent_client_cache remove %lud %llu\n", pthread_self(), eid);
if (kv_store.find(eid) == kv_store.end()) {
if (store_.find(eid) == store_.end()) {
return extent_protocol::NOENT;
}
kv_store.erase(eid);
store_.erase(eid);
return extent_protocol::OK;
}

void extent_client_cache::flush(extent_protocol::extentid_t eid) {
ScopedLock ml(&mutex);
ScopedLock ml(&mutex_);
jsl_log(JSL_DBG_ME, "extent_client_cache: flush %lud %llu %p\n", pthread_self(), eid, this);
extent_protocol::status res = extent_protocol::OK;

if (kv_store.find(eid) == kv_store.end()) {
if (store_.find(eid) == store_.end()) {
jsl_log(JSL_DBG_ME, "extent_client_cache: flush - remove %llu\n", eid);
res = ec->remove(eid);
res = extent_client::remove(eid);
// TODO pas toujours bon... test-lab-3-b
VERIFY(res == extent_protocol::OK);
return;
}

Value &v = kv_store[eid];
Extent &v = store_[eid];
if (v.dirty) {
jsl_log(JSL_DBG_ME, "extent_client_cache: flush - write back %llu\n", eid);
jsl_log(JSL_DBG_ME, "extent_client_cache: flush - write back %llu buf = %s\n ", eid, v.buf.c_str());
// writeback
res = ec->put(eid, v.buf);
res = extent_client::put(eid, v.buf);
VERIFY(res == extent_protocol::OK);
}
jsl_log(JSL_DBG_ME, "extent_client_cache: flush - erase %llu\n", eid);
kv_store.erase(eid);
store_.erase(eid);
return;
}


32 changes: 24 additions & 8 deletions extent_client_cache.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// extent client interface.

#ifndef extent_client_cache_h
#define extent_client_cache_h

Expand All @@ -8,30 +6,48 @@
#include "extent_client.h"
#include "rpc.h"

class extent_client_cache {
/**
* Cached client to the extent server
* Operations are mostly performed locally
* All operations are protected by a mutex
* Sequential consistency is guaranteed as long as
* the caller owns the lock on the extent
*/
class extent_client_cache : extent_client {

private:
extent_client *ec;

struct Value {
struct Extent {
extent_protocol::attr attr;
std::string buf;
bool dirty;
};

std::map<extent_protocol::extentid_t, Value> kv_store;
std::map<extent_protocol::extentid_t, Extent> store_;

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutex_ = PTHREAD_MUTEX_INITIALIZER;

public:
extent_client_cache(std::string dst);
extent_client_cache(const std::string &dst);

/** local operation, unless eid is not present */
extent_protocol::status get(extent_protocol::extentid_t eid,
std::string &buf);

/** local operation, unless eid is not present */
extent_protocol::status getattr(extent_protocol::extentid_t eid,
extent_protocol::attr &a);

/* always local */
extent_protocol::status put(extent_protocol::extentid_t eid, std::string buf);

/* always local */
extent_protocol::status remove(extent_protocol::extentid_t eid);

/**
* Flush writes back eid if it's dirty or has been removed
* Called by the lock client before release the lock
*/
void flush(extent_protocol::extentid_t eid);
};

Expand Down
9 changes: 9 additions & 0 deletions extent_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,20 @@

#include "rpc.h"

/**
* Common types for RPC between extent_client and extent_server
*/

class extent_protocol {
public:
typedef int status;

typedef unsigned long long extentid_t;

static const extentid_t root_inum = 1L;

enum xxstatus { OK, RPCERR, NOENT, IOERR };

enum rpc_numbers {
put = 0x6001,
get,
Expand Down
Loading

0 comments on commit ca9d097

Please sign in to comment.