From bab3f92c4c6f1ff0b8755a059e31ace341ce7a84 Mon Sep 17 00:00:00 2001 From: Ernst Bablick Date: Mon, 28 Oct 2024 20:22:34 +0100 Subject: [PATCH] EH: CS-668 add a GDI session store and feed it with data from mirror threads --- .../release-notes/03_major_enhancements.md | 19 +- .../qmaster/sge_qmaster_process_message.cc | 19 +- source/libs/sgeobj/CMakeLists.txt | 4 +- source/libs/sgeobj/ocs_Session.cc | 193 +++++++++++++++++- source/libs/sgeobj/ocs_Session.h | 34 +++ source/libs/sgeobj/sge_conf.cc | 27 ++- source/libs/sgeobj/sge_conf.h | 1 + 7 files changed, 281 insertions(+), 16 deletions(-) diff --git a/doc/markdown/manual/release-notes/03_major_enhancements.md b/doc/markdown/manual/release-notes/03_major_enhancements.md index c3a216482..6c3294f51 100644 --- a/doc/markdown/manual/release-notes/03_major_enhancements.md +++ b/doc/markdown/manual/release-notes/03_major_enhancements.md @@ -1,6 +1,22 @@ # Major Enhancements -## v9.0.1beta +## v9.0.2beta + +### Automatic Session Management + +Patch 9.0.2 introduces the new concept of automatic sessions. This concept allows the xxQS_NAMExx system to synchronise internal data stores more efficiently, so that client commands can be forced to retrieve the most recent data. Session management is enabled by default, but can be disabled by setting the `DISABLE_AUTOMATIC_SESSIONS` parameter in the `qmaster_params` of the cluster configuration. + +The default for the `qmaster_params` parameter `DISABLE_SECONDARY_DS_READER` is now *false*. This means that the reader thread pool is enabled by default and does not need to be enabled manually as in patch 9.0.1. + +What does all this mean? Sessions ensure that commands that trigger changes within the cluster, such as submitting a job, modifying a queue or changing a complex value, are executed in a consistent way. They also ensure that the result of a command that changes the cluster is immediately visible to the user who initiated the change. 
Commands that only read data, such as `qstat`, `qhost` or `qconf -s...`, always return the most recent data, even though all read requests in the system are executed completely in parallel to the xxQS_NAMExx core components. + +Unlike other workload management systems, session management in xxQS_NAMExx is automatic. There is no need to manually create or destroy sessions. Session management runs silently in the background to take load off the most critical internal components. + +All this further enhances cluster performance in large environments and improves cluster responsiveness, especially with tens of thousands of execution nodes, thousands of active users and millions of jobs per day. + +(Available in Open Cluster Scheduler and Gridware Cluster Scheduler) + +## v9.0.1 ### Utilization of additional data stores and activation of new thread pools @@ -76,6 +92,7 @@ additional data stores that can be utilized by pools of threads. GDI (g:0.00,t:0.00,p:0.00)/s) out: 0.00m/s APT: 0.0001s/m idle: 98.54% wait: 0.00% time: 10.00s ``` +(Available in Open Cluster Scheduler and Gridware Cluster Scheduler) ## v9.0.0 diff --git a/source/daemons/qmaster/sge_qmaster_process_message.cc b/source/daemons/qmaster/sge_qmaster_process_message.cc index bfca2a7ac..8d5368d60 100644 --- a/source/daemons/qmaster/sge_qmaster_process_message.cc +++ b/source/daemons/qmaster/sge_qmaster_process_message.cc @@ -447,17 +447,20 @@ do_gdi_packet(struct_msg_t *aMsg, monitoring_t *monitor) { if (!mconf_get_disable_secondary_ds_reader()) { u_long64 session_id = ocs::SessionManager::get_session_id(packet); - // status of the session decides if the request is put into the reader request queue or the waiting queue - bool is_uptodate = false; - ocs::SessionManager::is_uptodate(session_id, &is_uptodate); - if (is_uptodate) { - queue = ReaderRequestQueue; - } else { - queue = ReaderWaitingRequestQueue; + // Reader DS is enabled so as default we will use the ReaderRequestQueue unless the auto sessions are enabled + 
queue = ReaderRequestQueue; + if (!mconf_get_disable_automatic_session()) { + + // Sessions are enabled so we have to check if the session is up-to-date + bool is_uptodate = false; + ocs::SessionManager::is_uptodate(session_id, &is_uptodate); + if (!is_uptodate) { + queue = ReaderWaitingRequestQueue; + } } } - // Store the decision about the queue also in the packet + // Store the decision about the DS also in the packet packet->ds_type = ds_type; sge_tq_store_notify(queue, SGE_TQ_GDI_PACKET, packet); } else { diff --git a/source/libs/sgeobj/CMakeLists.txt b/source/libs/sgeobj/CMakeLists.txt index 87852744b..183757de1 100644 --- a/source/libs/sgeobj/CMakeLists.txt +++ b/source/libs/sgeobj/CMakeLists.txt @@ -23,7 +23,9 @@ set(LIBRARY_NAME sgeobj) set(LIBRARY_SOURCES config.cc cull_parse_util.cc + ocs_binding_io.cc ocs_DataStore.cc + ocs_Session.cc parse.cc sge_ack.cc sge_advance_reservation.cc @@ -31,7 +33,6 @@ set(LIBRARY_SOURCES sge_answer.cc sge_attr.cc sge_binding.cc - ocs_binding_io.cc sge_calendar.cc sge_centry.cc sge_centry_rsmap.cc @@ -83,7 +84,6 @@ set(LIBRARY_SOURCES sge_utility.cc sge_var.cc ../../../../${PROJECT_FEATURES}/source/libs/sgeobj/ocs_RequestLimits.cc - ../../../../${PROJECT_FEATURES}/source/libs/sgeobj/ocs_Session.cc cull/sge_sub_object.cc) set(LIBRARY_INCLUDES ${SGE_INCLUDES} "./") diff --git a/source/libs/sgeobj/ocs_Session.cc b/source/libs/sgeobj/ocs_Session.cc index 92709823f..b1d04d490 100644 --- a/source/libs/sgeobj/ocs_Session.cc +++ b/source/libs/sgeobj/ocs_Session.cc @@ -18,8 +18,195 @@ ***************************************************************************/ /*___INFO__MARK_END_NEW__*/ -#include "sgeobj/ocs_Session.h" +#include +#include +#include +#include -namespace ocs { +#include "basis_types.h" + +#include "uti/sge_time.h" +#include "uti/sge_rmon_macros.h" + +#include "ocs_Session.h" + +pthread_mutex_t ocs::SessionManager::mutex = PTHREAD_MUTEX_INITIALIZER; +std::unordered_map ocs::SessionManager::session_map; + +/** + * 
@brief Get a session ID that is unique for all requests of a user. + * + * Session ID 0 has a special meaning (GDI_SESSION_NONE). It cannot be used + * to identify requests for a user therefore we use only one session for the + * users whose hash would be 0 and 1. + * + * @param packet GDI packet + * @return session ID + */ +u_long64 +ocs::SessionManager::get_session_id(sge_gdi_packet_class_t *packet) { + std::hash hasher; + std::string hash_input(packet->user); + u_long64 session_id = hasher(hash_input); + + // avoid the use of session ID 0 (GDI_SESSION_NONE) + // in case a hash is 0, we use 1 as session ID. This means + // that there might be only one session for the two users whose hash is 0 and 1 + if (session_id == GDI_SESSION_NONE) { + session_id = 1; + } + return session_id; +} + +/** + * @brief Set the unique ID for the last write event processed for the session. + * + * The unique ID is used to check if a session is up-to-date. + * + * Also updates the time of the last write, except for GDI_SESSION_NONE. + * + * Implicitly creates a session if it does not exist. + * + * @param session_id session ID + * @param write_event_id unique ID for the last write event + */ +void +ocs::SessionManager::set_write_unique_id(u_long64 session_id, u_long64 write_event_id) { + Session s{}; + u_long64 time = sge_get_gmt64(); + + pthread_mutex_lock(&mutex); + auto it = session_map.find(session_id); + if (it != session_map.end()) { + s = it->second; + } + s.write_unique_id = write_event_id; + + // remember when the last write happened; the write time is not set for GDI_SESSION_NONE + if (session_id != GDI_SESSION_NONE) { + s.write_time = time; + } + + session_map[session_id] = s; + pthread_mutex_unlock(&mutex); +} + +/** + * @brief Set the unique ID for the last event processed for the session. + * + * *processed* means 'handled by the mirror thread' of the corresponding DS. + * If session_id is GDI_SESSION_NONE, the process_unique_id is set for all sessions. 
+ * + * Implicitly creates a session if it does not exist and if the session_id is + * a real session ID and not GDI_SESSION_NONE. + * + * Does not update the write time. NOTE(review): a session created here keeps + * write_time 0 and is therefore skipped by remove_unused() - confirm this is intended. + * + * @param session_id session ID + * @param process_event_id unique ID for the last event + */ +void +ocs::SessionManager::set_process_unique_id(u_long64 session_id, u_long64 process_event_id) { + Session s{}; + + pthread_mutex_lock(&mutex); + if (session_id != GDI_SESSION_NONE) { + auto it = session_map.find(session_id); + if (it != session_map.end()) { + s = it->second; + } + s.process_unique_id = process_event_id; + session_map[session_id] = s; + } else { + for (auto & it : session_map) { + s = it.second; + s.process_unique_id = process_event_id; + session_map[it.first] = s; + } + } + pthread_mutex_unlock(&mutex); +} + +/** + * @brief Check if a session allows access for a reader + * + * @param session_id session ID + * @param[out] is_uptodate true if the access is allowed (also true if the session does not exist) + * @return true if the session exists + */ +bool +ocs::SessionManager::is_uptodate(u_long64 session_id, bool *is_uptodate) { + bool ret = true; + + pthread_mutex_lock(&mutex); + auto it = session_map.find(session_id); + if (it != session_map.end()) { + auto [write_time, write_event_id, process_event_id] = it->second; + + // all write events have been processed (==) => true + // more events are processed than there are expected write events (<) => true + // there are still write events that we expect that have not been processed (>) => false + *is_uptodate = (write_event_id <= process_event_id); + ret = true; + } else { + // session does not exist, and we have to handle a RO-request + // We can be sure that there is no previous write request because + // otherwise the session would exist => session is up-to-date + *is_uptodate = true; + ret = false; + } + pthread_mutex_unlock(&mutex); + return ret; +} + +/** + * @brief Remove all sessions that have not been used 
for 15 minutes. + */ +void +ocs::SessionManager::remove_unused() { + u_long64 time = sge_get_gmt64(); + u_long64 time_threshold = time - 15 * 60 * 1000000; + + pthread_mutex_lock(&mutex); + for (auto it = session_map.begin(); it != session_map.end(); ) { + + // delete sessions that have a write time (== all except for GDI_SESSION_NONE) + // and whose last write happened more than 15 minutes ago + if (it->second.write_time != 0 && it->second.write_time < time_threshold) { + it = session_map.erase(it); + } else { + ++it; + } + } + pthread_mutex_unlock(&mutex); +} + +/** + * @brief Event handler that removes unused sessions. + * + * @param anEvent event + * @param monitor monitoring object + */ +void +ocs::SessionManager::session_cleanup_handler(te_event_t anEvent, monitoring_t *monitor) { + DENTER(TOP_LAYER); + remove_unused(); + DRETURN_VOID; +} + +/** + * @brief Dump all sessions to the debug output. + */ +void +ocs::SessionManager::dump_all() { + DENTER(TOP_LAYER); + pthread_mutex_lock(&mutex); + for (auto & [session_id, session] : session_map) { + DPRINTF("session %20lu: write_time=%16lu, write_unique_id=%lu, process_unique_id=%lu\n", + session_id, session.write_time, session.write_unique_id, session.process_unique_id); + } + pthread_mutex_unlock(&mutex); + DRETURN_VOID; +} -} \ No newline at end of file diff --git a/source/libs/sgeobj/ocs_Session.h b/source/libs/sgeobj/ocs_Session.h index 6f332ee94..4568633ad 100644 --- a/source/libs/sgeobj/ocs_Session.h +++ b/source/libs/sgeobj/ocs_Session.h @@ -19,5 +19,39 @@ ***************************************************************************/ /*___INFO__MARK_END_NEW__*/ +#include + +#include + +#include "uti/sge_monitor.h" + +#include "gdi/sge_gdi_packet.h" + +#include "sge_qmaster_timed_event.h" + namespace ocs { + class SessionManager { + private: + struct Session { + u_long64 write_time; ///< time when write_unique_id was set + u_long64 write_unique_id; ///< unique id for the last write event + u_long64 
process_unique_id; ///< unique id for the last processed event + }; + + static pthread_mutex_t mutex; ///< mutex that guards access to the session_map + static std::unordered_map session_map; ///< hashtable for sessions + + static void remove_unused(); + + public: + static u_long64 get_session_id(sge_gdi_packet_class_t *packet); + + static void set_write_unique_id(u_long64 session_id, u_long64 write_event_id); + static void set_process_unique_id(u_long64 session_id, u_long64 process_event_id); + static bool is_uptodate(u_long64 session_id, bool *is_uptodate); + + static void session_cleanup_handler(te_event_t anEvent, monitoring_t *monitor); + + static void dump_all(); + }; } \ No newline at end of file diff --git a/source/libs/sgeobj/sge_conf.cc b/source/libs/sgeobj/sge_conf.cc index 4d92a64a5..2950bddf0 100644 --- a/source/libs/sgeobj/sge_conf.cc +++ b/source/libs/sgeobj/sge_conf.cc @@ -150,9 +150,16 @@ static bool is_monitor_message = true; static bool use_qidle = false; static bool disable_reschedule = false; static bool disable_secondary_ds = false; + #define DEFAULT_DISABLE_SECONDARY_DS_READER (false) static bool disable_secondary_ds_reader = DEFAULT_DISABLE_SECONDARY_DS_READER; -static bool disable_secondary_ds_execd = false; + +#define DEFAULT_DISABLE_SECONDARY_DS_EXECD (false) +static bool disable_secondary_ds_execd = DEFAULT_DISABLE_SECONDARY_DS_EXECD; + +#define DEFAULT_DISABLE_AUTOMATIC_SESSIONS (false) +static bool disable_automatic_sessions = DEFAULT_DISABLE_AUTOMATIC_SESSIONS; + static bool prof_listener_thrd = false; static bool prof_worker_thrd = false; static bool prof_signal_thrd = false; @@ -679,7 +686,8 @@ int merge_configuration(lList **answer_list, u_long32 progid, const char *cell_r disable_reschedule = false; disable_secondary_ds = false; disable_secondary_ds_reader = DEFAULT_DISABLE_SECONDARY_DS_READER; - disable_secondary_ds_execd = false; + disable_secondary_ds_execd = DEFAULT_DISABLE_SECONDARY_DS_EXECD; + disable_automatic_sessions = 
DEFAULT_DISABLE_AUTOMATIC_SESSIONS; simulate_execds = false; simulate_jobs = false; prof_listener_thrd = false; @@ -784,6 +792,9 @@ int merge_configuration(lList **answer_list, u_long32 progid, const char *cell_r if (parse_bool_param(s, "DISABLE_SECONDARY_DS_EXECD", &disable_secondary_ds_execd)) { continue; } + if (parse_bool_param(s, "DISABLE_AUTOMATIC_SESSIONS", &disable_automatic_sessions)) { + continue; + } if (parse_int_param(s, "MAX_DS_DEVIATION", &max_ds_deviation, TYPE_TIM)) { if (max_ds_deviation < 0 || max_ds_deviation > 5000) { max_ds_deviation = DEFAULT_DS_DEVIATION; @@ -2350,6 +2361,18 @@ bool mconf_get_disable_secondary_ds_execd() { DRETURN(ret); } +bool mconf_get_disable_automatic_session() { /* Returns the value parsed from DISABLE_AUTOMATIC_SESSIONS, read under the LOCK_MASTER_CONF read lock. NOTE(review): the getter name is singular ('session') while the variable and parameter are plural - confirm this is intended. */ + bool ret; + + DENTER(BASIS_LAYER); + SGE_LOCK(LOCK_MASTER_CONF, LOCK_READ); + + ret = disable_automatic_sessions; + + SGE_UNLOCK(LOCK_MASTER_CONF, LOCK_READ); + DRETURN(ret); +} + int mconf_get_scheduler_timeout() { int timeout; diff --git a/source/libs/sgeobj/sge_conf.h b/source/libs/sgeobj/sge_conf.h index c69a51ea8..6d8bb92eb 100644 --- a/source/libs/sgeobj/sge_conf.h +++ b/source/libs/sgeobj/sge_conf.h @@ -144,6 +144,7 @@ bool mconf_get_disable_reschedule(); bool mconf_get_disable_secondary_ds(); bool mconf_get_disable_secondary_ds_reader(); bool mconf_get_disable_secondary_ds_execd(); +bool mconf_get_disable_automatic_session(); int mconf_get_scheduler_timeout(); int mconf_get_max_dynamic_event_clients(); void mconf_set_max_dynamic_event_clients(int value);