EH: CS-668 add a GDI session store and feed it with data from mirror threads
ernst-bablick committed Oct 28, 2024
1 parent 7eb0957 commit bab3f92
Showing 7 changed files with 281 additions and 16 deletions.
19 changes: 18 additions & 1 deletion doc/markdown/manual/release-notes/03_major_enhancements.md
@@ -1,6 +1,22 @@
# Major Enhancements

## v9.0.1beta
## v9.0.2beta

### Automatic Session Management

Patch 9.0.2 introduces automatic sessions. Sessions allow the xxQS_NAMExx system to synchronise its internal data stores more efficiently and to guarantee that client commands see the most recent data. Session management is enabled by default and can be disabled by setting the `DISABLE_AUTOMATIC_SESSIONS` parameter in the `qmaster_params` of the cluster configuration.

The default for the `qmaster_params` setting `DISABLE_SECONDARY_DS_READER` is now *false*. The reader thread pool is therefore enabled by default and no longer has to be enabled manually as in patch 9.0.1.

What does all this mean? Sessions ensure that commands that trigger changes within the cluster, such as submitting a job, modifying a queue or changing a complex value, are executed consistently: the result of such a command is immediately visible to the user who initiated it. Commands that only read data, such as `qstat`, `qhost` or `qconf -s...`, always return the most recent data, even though all read requests are executed completely in parallel to the xxQS_NAMExx core components.

Unlike other workload management systems, session management in xxQS_NAMExx is automatic. There is no need to manually create or destroy sessions. Session management runs silently in the background to offload the most critical internal components.

All this further enhances cluster performance in large environments and improves cluster responsiveness, especially with tens of thousands of execution nodes, thousands of active users and millions of jobs/day.

(Available in Open Cluster Scheduler and Gridware Cluster Scheduler)

## v9.0.1

### Utilization of additional data stores and activation of new thread pools

@@ -76,6 +92,7 @@ additional data stores that can be utilized by pools of threads.
GDI (g:0.00,t:0.00,p:0.00)/s)
out: 0.00m/s APT: 0.0001s/m idle: 98.54% wait: 0.00% time: 10.00s
```
(Available in Open Cluster Scheduler and Gridware Cluster Scheduler)

## v9.0.0

19 changes: 11 additions & 8 deletions source/daemons/qmaster/sge_qmaster_process_message.cc
@@ -447,17 +447,20 @@ do_gdi_packet(struct_msg_t *aMsg, monitoring_t *monitor) {
if (!mconf_get_disable_secondary_ds_reader()) {
u_long64 session_id = ocs::SessionManager::get_session_id(packet);

// status of the session decides if the request is put into the reader request queue or the waiting queue
bool is_uptodate = false;
ocs::SessionManager::is_uptodate(session_id, &is_uptodate);
if (is_uptodate) {
queue = ReaderRequestQueue;
} else {
queue = ReaderWaitingRequestQueue;
// Reader DS is enabled, so by default we use the ReaderRequestQueue; with automatic sessions enabled, requests of an outdated session are redirected below
queue = ReaderRequestQueue;
if (!mconf_get_disable_automatic_session()) {

// Sessions are enabled so we have to check if the session is up-to-date
bool is_uptodate = false;
ocs::SessionManager::is_uptodate(session_id, &is_uptodate);
if (!is_uptodate) {
queue = ReaderWaitingRequestQueue;
}
}
}

// Store the decision about the queue also in the packet
// Store the decision about the DS also in the packet
packet->ds_type = ds_type;
sge_tq_store_notify(queue, SGE_TQ_GDI_PACKET, packet);
} else {
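
The routing rule introduced in this hunk can be read as a small decision function. The following is a minimal sketch, not the qmaster code: the `Queue` enum, `select_reader_queue` and its three boolean parameters are hypothetical stand-ins for the queue variables, for `mconf_get_disable_secondary_ds_reader()`, for `mconf_get_disable_automatic_session()` and for the result of `ocs::SessionManager::is_uptodate()`.

```cpp
#include <cassert>

// Hypothetical stand-ins for the request queues named in the diff.
// `Default` means: whatever queue was selected before this block ran.
enum class Queue { Default, Reader, ReaderWaiting };

// Decide where a read-only request goes (sketch of the new logic).
Queue select_reader_queue(bool reader_ds_disabled,
                          bool sessions_disabled,
                          bool session_uptodate) {
    if (reader_ds_disabled) {
        return Queue::Default;        // reader data store is not used at all
    }
    if (!sessions_disabled && !session_uptodate) {
        return Queue::ReaderWaiting;  // wait until the mirror thread caught up
    }
    return Queue::Reader;             // sessions off, or session is up to date
}

// Usage example covering all four cases.
void demo_select_reader_queue() {
    assert(select_reader_queue(true, false, false) == Queue::Default);
    assert(select_reader_queue(false, true, false) == Queue::Reader);
    assert(select_reader_queue(false, false, true) == Queue::Reader);
    assert(select_reader_queue(false, false, false) == Queue::ReaderWaiting);
}
```

Note that with sessions disabled the session state is never consulted, which matches the early default assignment of `ReaderRequestQueue` in the hunk.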
4 changes: 2 additions & 2 deletions source/libs/sgeobj/CMakeLists.txt
@@ -23,15 +23,16 @@ set(LIBRARY_NAME sgeobj)
set(LIBRARY_SOURCES
config.cc
cull_parse_util.cc
ocs_binding_io.cc
ocs_DataStore.cc
ocs_Session.cc
parse.cc
sge_ack.cc
sge_advance_reservation.cc
sge_all_listsL.cc
sge_answer.cc
sge_attr.cc
sge_binding.cc
ocs_binding_io.cc
sge_calendar.cc
sge_centry.cc
sge_centry_rsmap.cc
@@ -83,7 +84,6 @@ set(LIBRARY_SOURCES
sge_utility.cc
sge_var.cc
../../../../${PROJECT_FEATURES}/source/libs/sgeobj/ocs_RequestLimits.cc
../../../../${PROJECT_FEATURES}/source/libs/sgeobj/ocs_Session.cc
cull/sge_sub_object.cc)
set(LIBRARY_INCLUDES ${SGE_INCLUDES} "./")

193 changes: 190 additions & 3 deletions source/libs/sgeobj/ocs_Session.cc
@@ -18,8 +18,195 @@
***************************************************************************/
/*___INFO__MARK_END_NEW__*/

#include "sgeobj/ocs_Session.h"
#include <functional>
#include <string>
#include <unordered_map>
#include <vector>

namespace ocs {
#include "basis_types.h"

#include "uti/sge_time.h"
#include "uti/sge_rmon_macros.h"

#include "ocs_Session.h"

pthread_mutex_t ocs::SessionManager::mutex = PTHREAD_MUTEX_INITIALIZER;
std::unordered_map<u_long64, ocs::SessionManager::Session> ocs::SessionManager::session_map;

/**
* @brief Get a session ID that is shared by all requests of a user.
*
* Session ID 0 has a special meaning (GDI_SESSION_NONE) and cannot be used
* to identify the requests of a user. A user whose hash would be 0 therefore
* gets session ID 1 and shares that session with the user whose hash is 1.
*
* @param packet GDI packet
* @return session ID
*/
u_long64
ocs::SessionManager::get_session_id(sge_gdi_packet_class_t *packet) {
std::hash<std::string> hasher;
std::string hash_input(packet->user);
u_long64 session_id = hasher(hash_input);

// avoid the use of session ID 0 (GDI_SESSION_NONE)
// if the hash is 0, we use 1 as session ID. This means
// that the two users whose hashes are 0 and 1 share one session
if (session_id == GDI_SESSION_NONE) {
session_id = 1;
}
return session_id;
}
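
The mapping from user name to session ID can be tried in isolation. This is a minimal sketch under stated assumptions: `kSessionNone`, `remap_reserved` and `session_id_for_user` are hypothetical names, and a plain `std::string` replaces the `user` field of the GDI packet. `std::hash` only guarantees equal results for equal input within one program run, and different users may collide; a collision merely makes two users share one session.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>

// Stand-in for GDI_SESSION_NONE (assumed to be 0, as the comment in the diff says).
constexpr std::uint64_t kSessionNone = 0;

// Map the reserved value to 1 so that it is never handed out as a session ID.
std::uint64_t remap_reserved(std::uint64_t raw_hash) {
    return raw_hash == kSessionNone ? 1 : raw_hash;
}

// Derive the session ID from the user name (hypothetical helper).
std::uint64_t session_id_for_user(const std::string &user) {
    return remap_reserved(std::hash<std::string>{}(user));
}

// Usage example: IDs are stable per user and never the reserved value.
void demo_session_id() {
    assert(session_id_for_user("alice") == session_id_for_user("alice"));
    assert(session_id_for_user("alice") != kSessionNone);
    assert(remap_reserved(0) == 1);
    assert(remap_reserved(1) == 1);  // hashes 0 and 1 end up in one session
}
```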

/**
* @brief Set the unique ID for the last write event processed for the session.
*
* The unique ID is used to check if a session is up-to-date.
*
* Updates also the time when the session was accessed the last time.
*
* Implicitly creates a session if it does not exist.
*
* @param session_id session ID
* @param write_event_id unique ID for the last write event
*/
void
ocs::SessionManager::set_write_unique_id(u_long64 session_id, u_long64 write_event_id) {
Session s{};
u_long64 time = sge_get_gmt64();

pthread_mutex_lock(&mutex);
auto it = session_map.find(session_id);
if (it != session_map.end()) {
s = it->second;
}
s.write_unique_id = write_event_id;

// update the time when the session was accessed the last time
if (session_id != GDI_SESSION_NONE) {
s.write_time = time;
}

session_map[session_id] = s;
pthread_mutex_unlock(&mutex);
}

/**
* @brief Set the unique ID for the last event processed for the session.
*
* *processed* means 'handled by the mirror thread' of the corresponding DS.
* If session_id is GDI_SESSION_NONE, the process_unique_id is set for all sessions.
*
* Implicitly creates a session if it does not exist and if the session_id is
* a real session ID and not GDI_SESSION_NONE.
*
* Does not update the write time of a session; the write time is only set
* by set_write_unique_id().
*
* @param session_id session ID
* @param process_event_id unique ID for the last event
*/
void
ocs::SessionManager::set_process_unique_id(u_long64 session_id, u_long64 process_event_id) {
Session s{};

pthread_mutex_lock(&mutex);
if (session_id != GDI_SESSION_NONE) {
auto it = session_map.find(session_id);
if (it != session_map.end()) {
s = it->second;
}
s.process_unique_id = process_event_id;
session_map[session_id] = s;
} else {
for (auto & it : session_map) {
s = it.second;
s.process_unique_id = process_event_id;
session_map[it.first] = s;
}
}
pthread_mutex_unlock(&mutex);
}

/**
* @brief Check if a session allows access for a reader
*
* @param session_id session ID
* @param is_uptodate true if the access is allowed
* @return true if the session exists
*/
bool
ocs::SessionManager::is_uptodate(u_long64 session_id, bool *is_uptodate) {
bool ret = true;

pthread_mutex_lock(&mutex);
auto it = session_map.find(session_id);
if (it != session_map.end()) {
auto [write_time, write_event_id, process_event_id] = it->second;

// all write events have been processed (==) => true
// more events are processed than there are expected write events (<) => true
// there are still write events that we expect that have not been processed (>) => false
*is_uptodate = (write_event_id <= process_event_id);
ret = true;
} else {
// session does not exist, and we have to handle a RO-request
// We can be sure that there is no previous write request because
// otherwise the session would exist => session is up-to-date
*is_uptodate = true;
ret = false;
}
pthread_mutex_unlock(&mutex);
return ret;
}
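
The comparison of the two event IDs is the core of the session concept: a reader may proceed once the mirror thread has processed at least the last write event of its session. The sketch below illustrates that rule only. `SessionStoreSketch` and its member names are hypothetical, it uses `std::mutex` with `std::lock_guard` instead of the `pthread_mutex_t` calls in the diff, and it leaves out the broadcast for `GDI_SESSION_NONE` as well as the write time.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <unordered_map>

// Simplified, hypothetical session store: tracks the last written and the
// last processed event ID per session.
class SessionStoreSketch {
public:
    void note_write(std::uint64_t session, std::uint64_t event_id) {
        std::lock_guard<std::mutex> guard(mutex_);
        sessions_[session].last_write = event_id;      // creates the entry if needed
    }

    void note_processed(std::uint64_t session, std::uint64_t event_id) {
        std::lock_guard<std::mutex> guard(mutex_);
        sessions_[session].last_processed = event_id;
    }

    // True if no write of this session is still waiting to be mirrored.
    bool is_uptodate(std::uint64_t session) const {
        std::lock_guard<std::mutex> guard(mutex_);
        auto it = sessions_.find(session);
        if (it == sessions_.end()) {
            return true;  // unknown session: nothing was written before
        }
        return it->second.last_write <= it->second.last_processed;
    }

private:
    struct Entry {
        std::uint64_t last_write = 0;
        std::uint64_t last_processed = 0;
    };
    mutable std::mutex mutex_;
    std::unordered_map<std::uint64_t, Entry> sessions_;
};

// Usage example: a write makes the session stale until it is processed.
void demo_session_store() {
    SessionStoreSketch store;
    assert(store.is_uptodate(42));
    store.note_write(42, 7);
    assert(!store.is_uptodate(42));
    store.note_processed(42, 7);
    assert(store.is_uptodate(42));
}
```

The lock guard releases the mutex on every return path, which avoids having to pair each lock call with an explicit unlock.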

/**
* @brief Remove all sessions that have not been used for 15 minutes.
*/
void
ocs::SessionManager::remove_unused() {
u_long64 time = sge_get_gmt64();
u_long64 time_threshold = time - 15 * 60 * 1000000;

pthread_mutex_lock(&mutex);
for (auto it = session_map.begin(); it != session_map.end(); ) {

// delete sessions that have a write time (== all except for GDI_SESSION_NONE)
// and whose last write is older than 15 minutes
if (it->second.write_time != 0 && it->second.write_time < time_threshold) {
it = session_map.erase(it);
} else {
++it;
}
}
pthread_mutex_unlock(&mutex);
}
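
The cleanup relies on the erase-while-iterating idiom for `std::unordered_map`, where `erase` returns the iterator to the next element. A minimal sketch of that idiom follows, assuming a simplified map from session ID to last write time in microseconds; `WriteTimes` and `remove_older_than` are hypothetical names, and unlike the code above the sketch clamps the threshold at 0 so that a small `now` cannot wrap around.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <unordered_map>

// Hypothetical simplification: session ID -> time of last write (0 = never written).
using WriteTimes = std::unordered_map<std::uint64_t, std::uint64_t>;

// Remove all entries whose last write is older than max_age; returns the count.
std::size_t remove_older_than(WriteTimes &write_times,
                              std::uint64_t now, std::uint64_t max_age) {
    // Clamp instead of letting the unsigned subtraction wrap around.
    const std::uint64_t threshold = now > max_age ? now - max_age : 0;
    std::size_t removed = 0;
    for (auto it = write_times.begin(); it != write_times.end(); ) {
        if (it->second != 0 && it->second < threshold) {
            it = write_times.erase(it);  // erase returns the next valid iterator
            ++removed;
        } else {
            ++it;
        }
    }
    return removed;
}

// Usage example: only the entry with an old, non-zero write time is removed.
void demo_remove_older_than() {
    WriteTimes times{{1, 100}, {2, 0}, {3, 1000}};
    assert(remove_older_than(times, 1100, 500) == 1);
    assert(times.count(1) == 0);
    assert(times.size() == 2);
}
```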

/**
* @brief Event handler that removes unused sessions.
*
* @param anEvent event
* @param monitor monitoring object
*/
void
ocs::SessionManager::session_cleanup_handler(te_event_t anEvent, monitoring_t *monitor) {
DENTER(TOP_LAYER);
remove_unused();
DRETURN_VOID;
}

/**
* @brief Dump all sessions to the debug output.
*/
void
ocs::SessionManager::dump_all() {
DENTER(TOP_LAYER);
pthread_mutex_lock(&mutex);
for (auto & [session_id, session] : session_map) {
DPRINTF("session %20lu: write_time=%16lu, write_unique_id=%lu, process_unique_id=%lu\n",
session_id, session.write_time, session.write_unique_id, session.process_unique_id);
}
pthread_mutex_unlock(&mutex);
DRETURN_VOID;
}

}
34 changes: 34 additions & 0 deletions source/libs/sgeobj/ocs_Session.h
@@ -19,5 +19,39 @@
***************************************************************************/
/*___INFO__MARK_END_NEW__*/

#include <unordered_map>

#include <pthread.h>

#include "uti/sge_monitor.h"

#include "gdi/sge_gdi_packet.h"

#include "sge_qmaster_timed_event.h"

namespace ocs {
class SessionManager {
private:
struct Session {
u_long64 write_time; //< time when write_unique_id was set
u_long64 write_unique_id; //< unique id for the last write event
u_long64 process_unique_id; //< unique id for the last processed event
};

static pthread_mutex_t mutex; //< mutex that guards access to the session_map
static std::unordered_map<u_long64, Session> session_map; //< hashtable for sessions

static void remove_unused();

public:
static u_long64 get_session_id(sge_gdi_packet_class_t *packet);

static void set_write_unique_id(u_long64 session_id, u_long64 write_event_id);
static void set_process_unique_id(u_long64 session_id, u_long64 process_event_id);
static bool is_uptodate(u_long64 session_id, bool *is_uptodate);

static void session_cleanup_handler(te_event_t anEvent, monitoring_t *monitor);

static void dump_all();
};
}
27 changes: 25 additions & 2 deletions source/libs/sgeobj/sge_conf.cc
@@ -150,9 +150,16 @@ static bool is_monitor_message = true;
static bool use_qidle = false;
static bool disable_reschedule = false;
static bool disable_secondary_ds = false;

#define DEFAULT_DISABLE_SECONDARY_DS_READER (false)
static bool disable_secondary_ds_reader = DEFAULT_DISABLE_SECONDARY_DS_READER;
static bool disable_secondary_ds_execd = false;

#define DEFAULT_DISABLE_SECONDARY_DS_EXECD (false)
static bool disable_secondary_ds_execd = DEFAULT_DISABLE_SECONDARY_DS_EXECD;

#define DEFAULT_DISABLE_AUTOMATIC_SESSIONS (false)
static bool disable_automatic_sessions = DEFAULT_DISABLE_AUTOMATIC_SESSIONS;

static bool prof_listener_thrd = false;
static bool prof_worker_thrd = false;
static bool prof_signal_thrd = false;
@@ -679,7 +686,8 @@ int merge_configuration(lList **answer_list, u_long32 progid, const char *cell_r
disable_reschedule = false;
disable_secondary_ds = false;
disable_secondary_ds_reader = DEFAULT_DISABLE_SECONDARY_DS_READER;
disable_secondary_ds_execd = false;
disable_secondary_ds_execd = DEFAULT_DISABLE_SECONDARY_DS_EXECD;
disable_automatic_sessions = DEFAULT_DISABLE_AUTOMATIC_SESSIONS;
simulate_execds = false;
simulate_jobs = false;
prof_listener_thrd = false;
@@ -784,6 +792,9 @@ int merge_configuration(lList **answer_list, u_long32 progid, const char *cell_r
if (parse_bool_param(s, "DISABLE_SECONDARY_DS_EXECD", &disable_secondary_ds_execd)) {
continue;
}
if (parse_bool_param(s, "DISABLE_AUTOMATIC_SESSIONS", &disable_automatic_sessions)) {
continue;
}
if (parse_int_param(s, "MAX_DS_DEVIATION", &max_ds_deviation, TYPE_TIM)) {
if (max_ds_deviation < 0 || max_ds_deviation > 5000) {
max_ds_deviation = DEFAULT_DS_DEVIATION;
@@ -2350,6 +2361,18 @@ bool mconf_get_disable_secondary_ds_execd() {
DRETURN(ret);
}

bool mconf_get_disable_automatic_session() {
bool ret;

DENTER(BASIS_LAYER);
SGE_LOCK(LOCK_MASTER_CONF, LOCK_READ);

ret = disable_automatic_sessions;

SGE_UNLOCK(LOCK_MASTER_CONF, LOCK_READ);
DRETURN(ret);
}
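
The getter follows the pattern used for the other `mconf_get_*` functions: take a read lock, copy the flag, release the lock. As a rough illustration of that pattern with standard C++17 facilities only, the sketch below uses `std::shared_mutex` in place of `SGE_LOCK(LOCK_MASTER_CONF, LOCK_READ)`; `MasterConfSketch` and its members are hypothetical and not part of the code base.

```cpp
#include <cassert>
#include <mutex>
#include <shared_mutex>

// Hypothetical configuration holder with one boolean flag.
class MasterConfSketch {
public:
    // Many readers may hold the shared lock at the same time.
    bool disable_automatic_sessions() const {
        std::shared_lock<std::shared_mutex> lock(mutex_);
        return disable_automatic_sessions_;
    }

    // A configuration update needs exclusive access.
    void set_disable_automatic_sessions(bool value) {
        std::unique_lock<std::shared_mutex> lock(mutex_);
        disable_automatic_sessions_ = value;
    }

private:
    mutable std::shared_mutex mutex_;
    bool disable_automatic_sessions_ = false;  // default: sessions enabled
};

// Usage example: the default keeps automatic sessions switched on.
void demo_master_conf() {
    MasterConfSketch conf;
    assert(!conf.disable_automatic_sessions());
    conf.set_disable_automatic_sessions(true);
    assert(conf.disable_automatic_sessions());
}
```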

int mconf_get_scheduler_timeout() {
int timeout;

1 change: 1 addition & 0 deletions source/libs/sgeobj/sge_conf.h
@@ -144,6 +144,7 @@ bool mconf_get_disable_reschedule();
bool mconf_get_disable_secondary_ds();
bool mconf_get_disable_secondary_ds_reader();
bool mconf_get_disable_secondary_ds_execd();
bool mconf_get_disable_automatic_session();
int mconf_get_scheduler_timeout();
int mconf_get_max_dynamic_event_clients();
void mconf_set_max_dynamic_event_clients(int value);