From 20dfbea676d20414a882d29482f45c58d8d5089f Mon Sep 17 00:00:00 2001 From: Ernst Bablick Date: Tue, 29 Oct 2024 07:47:52 +0100 Subject: [PATCH] BF: CS-746 Improve automatic session performance with 10k of execution hosts EH: CS-744 DRMAA requests can be handled by secondary DS if sessions are enabled EH: CS-743 All execds should fetch configuration updates from reader DS/threads --- source/daemons/qmaster/ocs_MirrorDataStore.h | 2 +- .../qmaster/ocs_MirrorReaderDataStore.cc | 15 +++-- .../qmaster/sge_qmaster_process_message.cc | 24 ++++---- .../qmaster/sge_thread_event_master.cc | 4 +- source/libs/evm/sge_event_master.cc | 18 +++++- source/libs/sgeobj/ocs_Session.cc | 59 ++++++------------- source/libs/sgeobj/ocs_Session.h | 10 ++-- 7 files changed, 63 insertions(+), 69 deletions(-) diff --git a/source/daemons/qmaster/ocs_MirrorDataStore.h b/source/daemons/qmaster/ocs_MirrorDataStore.h index cd32c2ae7..f88fa28c2 100644 --- a/source/daemons/qmaster/ocs_MirrorDataStore.h +++ b/source/daemons/qmaster/ocs_MirrorDataStore.h @@ -61,7 +61,7 @@ namespace ocs { [[noreturn]] virtual void *main([[maybe_unused]] void *arg); virtual void subscribe_events() = 0; - virtual void update_sessions_and_move_requests(u_long64 unique_id) = 0; + virtual void update_sessions_and_move_requests(const u_long64 unique_id) = 0; static void event_mirror_update_func([[maybe_unused]] u_long32 ec_id, [[maybe_unused]] lList **answer_list, lList *event_list, void *arg); diff --git a/source/daemons/qmaster/ocs_MirrorReaderDataStore.cc b/source/daemons/qmaster/ocs_MirrorReaderDataStore.cc index c95ee191d..8ec91c83f 100644 --- a/source/daemons/qmaster/ocs_MirrorReaderDataStore.cc +++ b/source/daemons/qmaster/ocs_MirrorReaderDataStore.cc @@ -37,24 +37,23 @@ namespace ocs { evc->ec_set_edtime(evc, 1); } - void MirrorReaderDataStore::update_sessions_and_move_requests(u_long unique_id){ + void MirrorReaderDataStore::update_sessions_and_move_requests(const u_long unique_id){ DENTER(TOP_LAYER); // Update the session with the unique ID of the last event - SessionManager::set_process_unique_id(GDI_SESSION_NONE, unique_id); + SessionManager::set_process_unique_id(unique_id); // Move waiting reader requests that can now be handled // to the reader request queue (or global request queue if readers are disabled) - int moved_elements = sge_tq_move_from_to_if(ReaderWaitingRequestQueue, ReaderRequestQueue, + const int moved_elements = sge_tq_move_from_to_if(ReaderWaitingRequestQueue, ReaderRequestQueue, [](const void *always_nullptr, const void *task_void) -> int { // Find the packet stored in the task of the TQ - auto *task = *static_cast(task_void); - auto *packet = static_cast(task->data); + const auto *task = *static_cast(task_void); + const auto *packet = static_cast(task->data); // Check if the session is up-to-date - u_long64 session_id = SessionManager::get_session_id(packet); - bool is_uptodate = false; - SessionManager::is_uptodate(session_id, &is_uptodate); + const u_long64 session_id = SessionManager::get_session_id(packet->user); + const bool is_uptodate = SessionManager::is_uptodate(session_id); // Return the outcome so that the task is moved return is_uptodate ? 0 : -1; diff --git a/source/daemons/qmaster/sge_qmaster_process_message.cc b/source/daemons/qmaster/sge_qmaster_process_message.cc index 8d5368d60..69d969836 100644 --- a/source/daemons/qmaster/sge_qmaster_process_message.cc +++ b/source/daemons/qmaster/sge_qmaster_process_message.cc @@ -270,16 +270,22 @@ static ocs::DataStore::Id get_gdi_executor_ds(sge_gdi_packet_class_t *packet) { DENTER(TOP_LAYER); - // check the packet itself + // Usually the request type defines which DS to be used but there are some exceptions where the global ds + // is enforced: + // - Internal GDI requests + // - DRMAA requests if automatic sessions are disabled (because DRMAA 1 must have a concise view on the data + // otherwise the lib will fail). + // - Execd requests if secondary DS are disabled for execd (only for test purpose) if (packet->is_intern_request) { // Internal GDI requests will always be executed with access to the GLOBAL data store DRETURN(ocs::DataStore::GLOBAL); } else if (strcmp(packet->commproc, prognames[DRMAA]) == 0) { - // DRMAA requests will always be executed with access to the GLOBAL data store - // (@todo EB: as long as we do not have automatic GDI sessions) - DRETURN(ocs::DataStore::GLOBAL); + // DRMAA-requests will only be handled by reader if automatic sessions are enabled + if (mconf_get_disable_automatic_session()) { + DRETURN(ocs::DataStore::GLOBAL); + } } else if (strcmp(packet->commproc, prognames[EXECD]) == 0 && mconf_get_disable_secondary_ds_execd()) { - // request coming from execd should be handled with GLOBAL DS if secondary DS are disabled for execd + // request coming from execd should be handled with global DS if secondary DS are disabled for execd DRETURN(ocs::DataStore::GLOBAL); } @@ -362,7 +368,7 @@ do_gdi_packet(struct_msg_t *aMsg, monitoring_t *monitor) { &(packet->gid), packet->group, sizeof(packet->group), &packet->amount, &packet->grp_array); - packet->gdi_session = ocs::SessionManager::get_session_id(packet); + packet->gdi_session = ocs::SessionManager::get_session_id(packet->user); } // check CSP mode if enabled @@ -445,16 +451,14 @@ do_gdi_packet(struct_msg_t *aMsg, monitoring_t *monitor) { // Default is the global request queue unless readers are enabled sge_tq_queue_t *queue = GlobalRequestQueue; if (!mconf_get_disable_secondary_ds_reader()) { - u_long64 session_id = ocs::SessionManager::get_session_id(packet); + u_long64 session_id = ocs::SessionManager::get_session_id(packet->user); // Reader DS is enabled so as default we will use the ReaderRequestQueue unless the auto sessions are enabled queue = ReaderRequestQueue; if (!mconf_get_disable_automatic_session()) { // Sessions are enabled so we have to check if the session is up-to-date - bool is_uptodate = false; - ocs::SessionManager::is_uptodate(session_id, &is_uptodate); - if (!is_uptodate) { + if (!ocs::SessionManager::is_uptodate(session_id)) { queue = ReaderWaitingRequestQueue; } } diff --git a/source/daemons/qmaster/sge_thread_event_master.cc b/source/daemons/qmaster/sge_thread_event_master.cc index 54935422d..244116ed3 100644 --- a/source/daemons/qmaster/sge_thread_event_master.cc +++ b/source/daemons/qmaster/sge_thread_event_master.cc @@ -120,7 +120,7 @@ sge_event_master_terminate() { [[noreturn]] void * sge_event_master_main(void *arg) { - auto *thread_config = (cl_thread_settings_t *) arg; + auto *thread_config = static_cast(arg); monitoring_t monitor; monitoring_t *p_monitor = &monitor; u_long64 next_prof_output = 0; @@ -129,7 +129,7 @@ sge_event_master_main(void *arg) { // set thread name and id used by logging an others const char *thread_name = thread_config->thread_name; - int thread_id = thread_config->thread_id; + const int thread_id = thread_config->thread_id; component_set_thread_name(thread_name); component_set_thread_id(thread_id); DPRINTF(SFN "(%d) started\n", thread_name, thread_id); diff --git a/source/libs/evm/sge_event_master.cc b/source/libs/evm/sge_event_master.cc index 9659733e9..f9d099228 100644 --- a/source/libs/evm/sge_event_master.cc +++ b/source/libs/evm/sge_event_master.cc @@ -1513,6 +1513,11 @@ sge_create_event(u_long32 number, u_long64 timestamp, ev_event type, u_long32 in DRETURN(etp); } +/** @brief Add a unique ID to all events part of the evr + * + * @param evr - the event request object + * @param unique_id - the unique ID to add + */ static void add_unique_id_to_evr(lListElem *evr, u_long64 unique_id) { lListElem *ev; @@ -1531,7 +1536,8 @@ add_list_event_for_client_after_commit(lListElem *evr, lList *evr_list, u_long64 // Find the next unique ID for the event u_long64 unique_id = oge_get_next_unique_event_id(); - // Add a unique ID to to events part of the commited evr or evr_list. + // Add a unique ID to events part of the commited evr or evr_list. + // Check also if the event is execd related (like the config events) if (single_evr) { add_unique_id_to_evr(evr, unique_id); } else { @@ -1542,9 +1548,17 @@ add_list_event_for_client_after_commit(lListElem *evr, lList *evr_list, u_long64 } } - // Process the unique ID for the event that was triggered + // Process the unique ID for the event that was triggered. session is the same as for the initiator of the request. ocs::SessionManager::set_write_unique_id(gdi_session, unique_id); + // All execd send GDI requests as admin user. In order to enforce them to see most current information + // via reader DS, we need to set the unique ID for the admin user session. + static const u_long64 admin_user_session = ocs::SessionManager::get_session_id(bootstrap_get_admin_user()); + ocs::SessionManager::set_write_unique_id(admin_user_session, unique_id); + + // @TODO: EB remove this + ocs::SessionManager::dump_all(); + // Add the request to the event master request list sge_mutex_lock("event_master_request_mutex", __func__, __LINE__, &Event_Master_Control.request_mutex); if (single_evr) { diff --git a/source/libs/sgeobj/ocs_Session.cc b/source/libs/sgeobj/ocs_Session.cc index b1d04d490..44555e89b 100644 --- a/source/libs/sgeobj/ocs_Session.cc +++ b/source/libs/sgeobj/ocs_Session.cc @@ -32,6 +32,7 @@ pthread_mutex_t ocs::SessionManager::mutex = PTHREAD_MUTEX_INITIALIZER; std::unordered_map ocs::SessionManager::session_map; +u_long64 ocs::SessionManager::process_unique_id = 0; /** * @brief Get a session ID that is unique for all requests of a user. @@ -40,13 +41,13 @@ std::unordered_map ocs::SessionManager:: * to identify requests for a user therefore we use only one session for the * users whose hash would be 0 and 1. * - * @param packet GDI packet + * @param user username for whom the session ID is generated * @return session ID */ u_long64 -ocs::SessionManager::get_session_id(sge_gdi_packet_class_t *packet) { - std::hash hasher; - std::string hash_input(packet->user); +ocs::SessionManager::get_session_id(const char *user) { + constexpr std::hash hasher; + const std::string hash_input(user); u_long64 session_id = hasher(hash_input); // avoid the use of session ID 0 (GDI_SESSION_NONE) @@ -71,13 +72,12 @@ ocs::SessionManager::get_session_id(sge_gdi_packet_class_t *packet) { * @param write_event_id unique ID for the last write event */ void -ocs::SessionManager::set_write_unique_id(u_long64 session_id, u_long64 write_event_id) { +ocs::SessionManager::set_write_unique_id(const u_long64 session_id, const u_long64 write_event_id) { Session s{}; - u_long64 time = sge_get_gmt64(); + const u_long64 time = sge_get_gmt64(); pthread_mutex_lock(&mutex); - auto it = session_map.find(session_id); - if (it != session_map.end()) { + if (const auto it = session_map.find(session_id); it != session_map.end()) { s = it->second; } s.write_unique_id = write_event_id; @@ -103,28 +103,12 @@ ocs::SessionManager::set_write_unique_id(u_long64 session_id, u_long64 write_eve * Updates the time when the session was accessed the last time but only for * specific sessions. * - * @param session_id session ID * @param process_event_id unique ID for the last event */ void -ocs::SessionManager::set_process_unique_id(u_long64 session_id, u_long64 process_event_id) { - Session s{}; - +ocs::SessionManager::set_process_unique_id(const u_long64 process_event_id) { pthread_mutex_lock(&mutex); - if (session_id != GDI_SESSION_NONE) { - auto it = session_map.find(session_id); - if (it != session_map.end()) { - s = it->second; - } - s.process_unique_id = process_event_id; - session_map[session_id] = s; - } else { - for (auto & it : session_map) { - s = it.second; - s.process_unique_id = process_event_id; - session_map[it.first] = s; - } - } + ocs::SessionManager::process_unique_id = process_event_id; pthread_mutex_unlock(&mutex); } @@ -132,29 +116,22 @@ ocs::SessionManager::set_process_unique_id(u_long64 session_id, u_long64 process * @brief Check if a sessions allows access for a reader * * @param session_id session ID - * @param is_uptodate true if the access is allowed * @return true if the session exists */ bool -ocs::SessionManager::is_uptodate(u_long64 session_id, bool *is_uptodate) { - bool ret = true; - +ocs::SessionManager::is_uptodate(const u_long64 session_id) { + bool ret; pthread_mutex_lock(&mutex); - auto it = session_map.find(session_id); - if (it != session_map.end()) { - auto [write_time, write_event_id, process_event_id] = it->second; - + if (const auto it = session_map.find(session_id); it != session_map.end()) { // all write events have been processed (==) => true // more events are processed than there are expected write events (<) => true // there are still write events that we expect that have not been processed (>) => false - *is_uptodate = (write_event_id <= process_event_id); - ret = true; + ret = (it->second.write_unique_id <= process_unique_id); } else { // session does not exist, and we have to handle a RO-request // We can be sure that there is no previous write request because // otherwise the session would exit => session is up-to-date - *is_uptodate = true; - ret = false; + ret = true; } pthread_mutex_unlock(&mutex); return ret; @@ -165,8 +142,8 @@ ocs::SessionManager::is_uptodate(u_long64 session_id, bool *is_uptodate) { */ void ocs::SessionManager::remove_unused() { - u_long64 time = sge_get_gmt64(); - u_long64 time_threshold = time - 15 * 60 * 1000000; + const u_long64 time = sge_get_gmt64(); + const u_long64 time_threshold = time - 15 * 60 * 1000000; pthread_mutex_lock(&mutex); for (auto it = session_map.begin(); it != session_map.end(); ) { @@ -204,7 +181,7 @@ ocs::SessionManager::dump_all() { pthread_mutex_lock(&mutex); for (auto & [session_id, session] : session_map) { DPRINTF("session %20lu: write_time=%16lu, write_unique_id=%lu, process_unique_id=%lu\n", - session_id, session.write_time, session.write_unique_id, session.process_unique_id); + session_id, session.write_time, session.write_unique_id, process_unique_id); } pthread_mutex_unlock(&mutex); DRETURN_VOID; diff --git a/source/libs/sgeobj/ocs_Session.h b/source/libs/sgeobj/ocs_Session.h index 4568633ad..111996730 100644 --- a/source/libs/sgeobj/ocs_Session.h +++ b/source/libs/sgeobj/ocs_Session.h @@ -35,20 +35,20 @@ namespace ocs { struct Session { u_long64 write_time; //< time when write_unique_id was set u_long64 write_unique_id; //< unique id for the last write event - u_long64 process_unique_id; //< unique id for the last processed event }; static pthread_mutex_t mutex; //< mutex that saves access to the session_map static std::unordered_map session_map; //< hashtable for sessions + static u_long64 process_unique_id; //< unique id for the last processed event static void remove_unused(); public: - static u_long64 get_session_id(sge_gdi_packet_class_t *packet); + static u_long64 get_session_id(const char *user); - static void set_write_unique_id(u_long64 session_id, u_long64 write_event_id); - static void set_process_unique_id(u_long64 session_id, u_long64 process_event_id); - static bool is_uptodate(u_long64 session_id, bool *is_uptodate); + static void set_write_unique_id(const u_long64 session_id, const u_long64 write_event_id); + static void set_process_unique_id(const u_long64 process_event_id); + static bool is_uptodate(const u_long64 session_id); static void session_cleanup_handler(te_event_t anEvent, monitoring_t *monitor);