Skip to content

Commit

Permalink
BF: CS-746 Improve automatic session performance with 10k of executio…
Browse files Browse the repository at this point in the history
…n hosts

EH: CS-744 DRMAA requests can be handled by secondary DS if sessions are enabled
EH: CS-743 All execds should fetch configuration updates from reader DS/threads
  • Loading branch information
ernst-bablick committed Oct 29, 2024
1 parent bab3f92 commit 20dfbea
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 69 deletions.
2 changes: 1 addition & 1 deletion source/daemons/qmaster/ocs_MirrorDataStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ namespace ocs {

[[noreturn]] virtual void *main([[maybe_unused]] void *arg);
virtual void subscribe_events() = 0;
virtual void update_sessions_and_move_requests(u_long64 unique_id) = 0;
virtual void update_sessions_and_move_requests(const u_long64 unique_id) = 0;

static void event_mirror_update_func([[maybe_unused]] u_long32 ec_id, [[maybe_unused]] lList **answer_list, lList *event_list, void *arg);

Expand Down
15 changes: 7 additions & 8 deletions source/daemons/qmaster/ocs_MirrorReaderDataStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,23 @@ namespace ocs {
evc->ec_set_edtime(evc, 1);
}

void MirrorReaderDataStore::update_sessions_and_move_requests(u_long unique_id){
void MirrorReaderDataStore::update_sessions_and_move_requests(const u_long unique_id){
DENTER(TOP_LAYER);

// Update the session with the unique ID of the last event
SessionManager::set_process_unique_id(GDI_SESSION_NONE, unique_id);
SessionManager::set_process_unique_id(unique_id);

// Move waiting reader requests that can now be handled
// to the reader request queue (or global request queue if readers are disabled)
int moved_elements = sge_tq_move_from_to_if(ReaderWaitingRequestQueue, ReaderRequestQueue,
const int moved_elements = sge_tq_move_from_to_if(ReaderWaitingRequestQueue, ReaderRequestQueue,
[](const void *always_nullptr, const void *task_void) -> int {
// Find the packet stored in the task of the TQ
auto *task = *static_cast<sge_tq_task_t *const *>(task_void);
auto *packet = static_cast<sge_gdi_packet_class_t *>(task->data);
const auto *task = *static_cast<sge_tq_task_t *const *>(task_void);
const auto *packet = static_cast<sge_gdi_packet_class_t *>(task->data);

// Check if the session is up-to-date
u_long64 session_id = SessionManager::get_session_id(packet);
bool is_uptodate = false;
SessionManager::is_uptodate(session_id, &is_uptodate);
const u_long64 session_id = SessionManager::get_session_id(packet->user);
const bool is_uptodate = SessionManager::is_uptodate(session_id);

// Return the outcome so that the task is moved
return is_uptodate ? 0 : -1;
Expand Down
24 changes: 14 additions & 10 deletions source/daemons/qmaster/sge_qmaster_process_message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,16 +270,22 @@ static ocs::DataStore::Id
get_gdi_executor_ds(sge_gdi_packet_class_t *packet) {
DENTER(TOP_LAYER);

// check the packet itself
// Usually the request type defines which DS to be used but there are some exceptions where the global ds
// is enforced:
// - Internal GDI requests
// - DRMAA requests if automatic sessions are disabled (because DRMAA 1 must have a consistent view of the data,
// otherwise the lib will fail).
// - Execd requests if secondary DS are disabled for execd (only for test purpose)
if (packet->is_intern_request) {
// Internal GDI requests will always be executed with access to the GLOBAL data store
DRETURN(ocs::DataStore::GLOBAL);
} else if (strcmp(packet->commproc, prognames[DRMAA]) == 0) {
// DRMAA requests will always be executed with access to the GLOBAL data store
// (@todo EB: as long as we do not have automatic GDI sessions)
DRETURN(ocs::DataStore::GLOBAL);
// DRMAA-requests will only be handled by reader if automatic sessions are enabled
if (mconf_get_disable_automatic_session()) {
DRETURN(ocs::DataStore::GLOBAL);
}
} else if (strcmp(packet->commproc, prognames[EXECD]) == 0 && mconf_get_disable_secondary_ds_execd()) {
// request coming from execd should be handled with GLOBAL DS if secondary DS are disabled for execd
// request coming from execd should be handled with global DS if secondary DS are disabled for execd
DRETURN(ocs::DataStore::GLOBAL);
}

Expand Down Expand Up @@ -362,7 +368,7 @@ do_gdi_packet(struct_msg_t *aMsg, monitoring_t *monitor) {
&(packet->gid), packet->group, sizeof(packet->group),
&packet->amount, &packet->grp_array);

packet->gdi_session = ocs::SessionManager::get_session_id(packet);
packet->gdi_session = ocs::SessionManager::get_session_id(packet->user);
}

// check CSP mode if enabled
Expand Down Expand Up @@ -445,16 +451,14 @@ do_gdi_packet(struct_msg_t *aMsg, monitoring_t *monitor) {
// Default is the global request queue unless readers are enabled
sge_tq_queue_t *queue = GlobalRequestQueue;
if (!mconf_get_disable_secondary_ds_reader()) {
u_long64 session_id = ocs::SessionManager::get_session_id(packet);
u_long64 session_id = ocs::SessionManager::get_session_id(packet->user);

// Reader DS is enabled so as default we will use the ReaderRequestQueue unless the auto sessions are enabled
queue = ReaderRequestQueue;
if (!mconf_get_disable_automatic_session()) {

// Sessions are enabled so we have to check if the session is up-to-date
bool is_uptodate = false;
ocs::SessionManager::is_uptodate(session_id, &is_uptodate);
if (!is_uptodate) {
if (!ocs::SessionManager::is_uptodate(session_id)) {
queue = ReaderWaitingRequestQueue;
}
}
Expand Down
4 changes: 2 additions & 2 deletions source/daemons/qmaster/sge_thread_event_master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ sge_event_master_terminate() {

[[noreturn]] void *
sge_event_master_main(void *arg) {
auto *thread_config = (cl_thread_settings_t *) arg;
auto *thread_config = static_cast<cl_thread_settings_t *>(arg);
monitoring_t monitor;
monitoring_t *p_monitor = &monitor;
u_long64 next_prof_output = 0;
Expand All @@ -129,7 +129,7 @@ sge_event_master_main(void *arg) {

// set thread name and id used by logging and others
const char *thread_name = thread_config->thread_name;
int thread_id = thread_config->thread_id;
const int thread_id = thread_config->thread_id;
component_set_thread_name(thread_name);
component_set_thread_id(thread_id);
DPRINTF(SFN "(%d) started\n", thread_name, thread_id);
Expand Down
18 changes: 16 additions & 2 deletions source/libs/evm/sge_event_master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1513,6 +1513,11 @@ sge_create_event(u_long32 number, u_long64 timestamp, ev_event type, u_long32 in
DRETURN(etp);
}

/** @brief Add a unique ID to all events part of the evr
*
* @param evr - the event request object
* @param unique_id - the unique ID to add
*/
static void
add_unique_id_to_evr(lListElem *evr, u_long64 unique_id) {
lListElem *ev;
Expand All @@ -1531,7 +1536,8 @@ add_list_event_for_client_after_commit(lListElem *evr, lList *evr_list, u_long64
// Find the next unique ID for the event
u_long64 unique_id = oge_get_next_unique_event_id();

// Add a unique ID to to events part of the commited evr or evr_list.
// Add a unique ID to the events that are part of the committed evr or evr_list.
// Check also if the event is execd related (like the config events)
if (single_evr) {
add_unique_id_to_evr(evr, unique_id);
} else {
Expand All @@ -1542,9 +1548,17 @@ add_list_event_for_client_after_commit(lListElem *evr, lList *evr_list, u_long64
}
}

// Process the unique ID for the event that was triggered
// Process the unique ID for the event that was triggered. The session is the same as that of the initiator of the request.
ocs::SessionManager::set_write_unique_id(gdi_session, unique_id);

// All execds send GDI requests as the admin user. To ensure that they see the most current information
// via the reader DS, we need to set the unique ID for the admin user session.
static const u_long64 admin_user_session = ocs::SessionManager::get_session_id(bootstrap_get_admin_user());
ocs::SessionManager::set_write_unique_id(admin_user_session, unique_id);

// @TODO: EB remove this
ocs::SessionManager::dump_all();

// Add the request to the event master request list
sge_mutex_lock("event_master_request_mutex", __func__, __LINE__, &Event_Master_Control.request_mutex);
if (single_evr) {
Expand Down
59 changes: 18 additions & 41 deletions source/libs/sgeobj/ocs_Session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

pthread_mutex_t ocs::SessionManager::mutex = PTHREAD_MUTEX_INITIALIZER;
std::unordered_map<u_long64, ocs::SessionManager::Session> ocs::SessionManager::session_map;
u_long64 ocs::SessionManager::process_unique_id = 0;

/**
* @brief Get a session ID that is unique for all requests of a user.
Expand All @@ -40,13 +41,13 @@ std::unordered_map<u_long64, ocs::SessionManager::Session> ocs::SessionManager::
* to identify requests for a user therefore we use only one session for the
* users whose hash would be 0 and 1.
*
* @param packet GDI packet
* @param user username for whom the session ID is generated
* @return session ID
*/
u_long64
ocs::SessionManager::get_session_id(sge_gdi_packet_class_t *packet) {
std::hash<std::string> hasher;
std::string hash_input(packet->user);
ocs::SessionManager::get_session_id(const char *user) {
constexpr std::hash<std::string> hasher;
const std::string hash_input(user);
u_long64 session_id = hasher(hash_input);

// avoid the use of session ID 0 (GDI_SESSION_NONE)
Expand All @@ -71,13 +72,12 @@ ocs::SessionManager::get_session_id(sge_gdi_packet_class_t *packet) {
* @param write_event_id unique ID for the last write event
*/
void
ocs::SessionManager::set_write_unique_id(u_long64 session_id, u_long64 write_event_id) {
ocs::SessionManager::set_write_unique_id(const u_long64 session_id, const u_long64 write_event_id) {
Session s{};
u_long64 time = sge_get_gmt64();
const u_long64 time = sge_get_gmt64();

pthread_mutex_lock(&mutex);
auto it = session_map.find(session_id);
if (it != session_map.end()) {
if (const auto it = session_map.find(session_id); it != session_map.end()) {
s = it->second;
}
s.write_unique_id = write_event_id;
Expand All @@ -103,58 +103,35 @@ ocs::SessionManager::set_write_unique_id(u_long64 session_id, u_long64 write_eve
* Updates the time when the session was accessed the last time but only for
* specific sessions.
*
* @param session_id session ID
* @param process_event_id unique ID for the last event
*/
void
ocs::SessionManager::set_process_unique_id(u_long64 session_id, u_long64 process_event_id) {
Session s{};

ocs::SessionManager::set_process_unique_id(const u_long64 process_event_id) {
pthread_mutex_lock(&mutex);
if (session_id != GDI_SESSION_NONE) {
auto it = session_map.find(session_id);
if (it != session_map.end()) {
s = it->second;
}
s.process_unique_id = process_event_id;
session_map[session_id] = s;
} else {
for (auto & it : session_map) {
s = it.second;
s.process_unique_id = process_event_id;
session_map[it.first] = s;
}
}
ocs::SessionManager::process_unique_id = process_event_id;
pthread_mutex_unlock(&mutex);
}

/**
* @brief Check if a session allows access for a reader
*
* @param session_id session ID
* @param is_uptodate true if the access is allowed
* @return true if the session exists
*/
bool
ocs::SessionManager::is_uptodate(u_long64 session_id, bool *is_uptodate) {
bool ret = true;

ocs::SessionManager::is_uptodate(const u_long64 session_id) {
bool ret;
pthread_mutex_lock(&mutex);
auto it = session_map.find(session_id);
if (it != session_map.end()) {
auto [write_time, write_event_id, process_event_id] = it->second;

if (const auto it = session_map.find(session_id); it != session_map.end()) {
// all write events have been processed (==) => true
// more events are processed than there are expected write events (<) => true
// there are still write events that we expect that have not been processed (>) => false
*is_uptodate = (write_event_id <= process_event_id);
ret = true;
ret = (it->second.write_unique_id <= process_unique_id);
} else {
// session does not exist, and we have to handle a RO-request
// We can be sure that there is no previous write request because
// otherwise the session would exist => session is up-to-date
*is_uptodate = true;
ret = false;
ret = true;
}
pthread_mutex_unlock(&mutex);
return ret;
Expand All @@ -165,8 +142,8 @@ ocs::SessionManager::is_uptodate(u_long64 session_id, bool *is_uptodate) {
*/
void
ocs::SessionManager::remove_unused() {
u_long64 time = sge_get_gmt64();
u_long64 time_threshold = time - 15 * 60 * 1000000;
const u_long64 time = sge_get_gmt64();
const u_long64 time_threshold = time - 15 * 60 * 1000000;

pthread_mutex_lock(&mutex);
for (auto it = session_map.begin(); it != session_map.end(); ) {
Expand Down Expand Up @@ -204,7 +181,7 @@ ocs::SessionManager::dump_all() {
pthread_mutex_lock(&mutex);
for (auto & [session_id, session] : session_map) {
DPRINTF("session %20lu: write_time=%16lu, write_unique_id=%lu, process_unique_id=%lu\n",
session_id, session.write_time, session.write_unique_id, session.process_unique_id);
session_id, session.write_time, session.write_unique_id, process_unique_id);
}
pthread_mutex_unlock(&mutex);
DRETURN_VOID;
Expand Down
10 changes: 5 additions & 5 deletions source/libs/sgeobj/ocs_Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,20 @@ namespace ocs {
struct Session {
u_long64 write_time; //< time when write_unique_id was set
u_long64 write_unique_id; //< unique id for the last write event
u_long64 process_unique_id; //< unique id for the last processed event
};

static pthread_mutex_t mutex; //< mutex that saves access to the session_map
static std::unordered_map<u_long64, Session> session_map; //< hashtable for sessions
static u_long64 process_unique_id; //< unique id for the last processed event

static void remove_unused();

public:
static u_long64 get_session_id(sge_gdi_packet_class_t *packet);
static u_long64 get_session_id(const char *user);

static void set_write_unique_id(u_long64 session_id, u_long64 write_event_id);
static void set_process_unique_id(u_long64 session_id, u_long64 process_event_id);
static bool is_uptodate(u_long64 session_id, bool *is_uptodate);
static void set_write_unique_id(const u_long64 session_id, const u_long64 write_event_id);
static void set_process_unique_id(const u_long64 process_event_id);
static bool is_uptodate(const u_long64 session_id);

static void session_cleanup_handler(te_event_t anEvent, monitoring_t *monitor);

Expand Down

0 comments on commit 20dfbea

Please sign in to comment.