Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EVPath upstream for release branch #4429

Merged
merged 2 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion thirdparty/EVPath/EVPath/.indent.pro
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-nbad -bap -nbc -br -c33 -cd33 -ncdb -ce -ci4 -brs
-cli0 -cp33 -d0 -di1 -fc1 -fca -i4 -ip0 -l75 -lp
-cli0 -cp33 -d0 -di1 -fc1 -fca -i4 -ip0 -l120 -lp
-npcs -psl -sc -nsob -nss -ts8 -TIOFile -TIOFormat -TIOConversionPtr
-TIOFieldList -TIORecordType -TIOFieldPtr
99 changes: 54 additions & 45 deletions thirdparty/EVPath/EVPath/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,28 +1,10 @@
cmake_minimum_required(VERSION 3.5)
cmake_minimum_required(VERSION 3.14)

# The directory label is used for CDash to treat EVPath as a subproject of
# GTKorvo
set(CMAKE_DIRECTORY_LABELS EVPath)

project(EVPath VERSION 4.5.0 LANGUAGES C CXX)

# Enable <PackageName>_ROOT variables for dependency searching
# CMake v3.12
if(POLICY CMP0074)
cmake_policy(SET CMP0074 NEW)
endif()

# Enable imported targets as library dependencies for CHECK_INCLUDE_FILE
if(POLICY CMP0075)
cmake_policy(SET CMP0075 NEW)
endif()

# Let option(...) setings work with non-cache variables. Useful for including
# EVPath as a third party dependency in another project.
# CMake v3.13
if(POLICY CMP0077)
cmake_policy(SET CMP0077 NEW)
endif()
project(EVPath VERSION 4.5.4 LANGUAGES C CXX)

# Some boilerplate to setup nice output directories
include(GNUInstallDirs)
Expand Down Expand Up @@ -87,7 +69,6 @@ if(WIN32)
-D_SCL_SECURE_NO_DEPRECATE
-D_WINSOCK_DEPRECATED_NO_WARNINGS
-D_CRT_NONSTDC_NO_DEPRECATE)
set (MSVC_PERL_FLAGS "-msvc-long")
endif()
endif()

Expand Down Expand Up @@ -138,7 +119,7 @@ add_library(EVPath
cm.c cm_control.c cm_formats.c cm_util.c cm_transport.c
cm_lock.c cm_perf.c cm_pbio.c cm_interface.c version.c
cm_threadio.c cm_evol.c evp.c response.c metrics.c
dlloader.c ip_config.c chr_time.c
dlloader.c ip_config.c chr_time.c revpath.h revp_internal.h
revp.c evp_compat.c thin_server.c evp_threads.c ev_dfg.c)
add_library(EVPath::EVPath ALIAS EVPath)
add_library(evpath ALIAS EVPath)
Expand Down Expand Up @@ -297,10 +278,13 @@ if(EVPATH_TRANSPORT_MODULES)

add_library(cmselect MODULE cmselect.c)
add_library(cmsockets MODULE cmsockets.c ip_config.c)
add_library(cmudp MODULE cmudp.c)
add_library(cmmulticast MODULE cmmulticast.c)

foreach(M cmselect cmsockets cmudp cmmulticast)
list (APPEND tgts cmselect cmsockets)
if(NOT WIN32)
add_library(cmudp MODULE cmudp.c)
add_library(cmmulticast MODULE cmmulticast.c)
list (APPEND tgts cmudp cmmulticast)
endif()
foreach(M ${tgts})
set_target_properties(${M} PROPERTIES
OUTPUT_NAME ${EVPATH_LIBRARY_PREFIX}${M}
LIBRARY_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/${EVPATH_INSTALL_MODULE_DIR}
Expand All @@ -310,10 +294,12 @@ if(EVPATH_TRANSPORT_MODULES)

target_link_libraries(cmselect PRIVATE evpath_headers atl::atl)
target_link_libraries(cmsockets PRIVATE evpath_headers atl::atl)
target_link_libraries(cmudp PRIVATE evpath_headers atl::atl)
target_link_libraries(cmmulticast PRIVATE evpath_headers atl::atl)

list(APPEND EVPATH_TRANSPORT_TARGETS cmselect cmsockets cmudp cmmulticast)
list(APPEND EVPATH_TRANSPORT_TARGETS cmselect cmsockets)
if(NOT WIN32)
target_link_libraries(cmudp PRIVATE evpath_headers atl::atl)
target_link_libraries(cmmulticast PRIVATE evpath_headers atl::atl)
list (APPEND EVPATH_TRANSPORT_TARGETS cmudp cmmulticast)
endif()

if (HAVE_SYS_EPOLL_H)
add_library(cmepoll MODULE cmepoll.c)
Expand Down Expand Up @@ -580,21 +566,42 @@ install(DIRECTORY ${PROJECT_SOURCE_DIR}/cmake/
FILES_MATCHING PATTERN "Find*.cmake" PATTERN "CMake*.cmake"
)

add_custom_command(
OUTPUT "cm_interface.c" "revp.c" "revpath.h"
SOURCE
${CMAKE_CURRENT_SOURCE_DIR}/evpath.h ${CMAKE_CURRENT_SOURCE_DIR}/ev_dfg.h
COMMAND perl
${CMAKE_CURRENT_SOURCE_DIR}/gen_interface.pl
${CMAKE_CURRENT_SOURCE_DIR}/evpath.h
${CMAKE_CURRENT_SOURCE_DIR}/ev_dfg.h
${CMAKE_CURRENT_SOURCE_DIR}/cm_schedule.h
DEPENDS
${CMAKE_CURRENT_SOURCE_DIR}/gen_interface.pl
${CMAKE_CURRENT_SOURCE_DIR}/evpath.h
${CMAKE_CURRENT_SOURCE_DIR}/ev_dfg.h
${CMAKE_CURRENT_SOURCE_DIR}/cm_schedule.h
)
find_package(Perl)

if (PERL_FOUND)
add_custom_command(
OUTPUT "cm_interface.c" "revp.c" "revpath.h" "revp_internal.h"
COMMAND ${PERL_EXECUTABLE}
${CMAKE_CURRENT_SOURCE_DIR}/gen_interface.pl
${CMAKE_CURRENT_SOURCE_DIR}/evpath.h
${CMAKE_CURRENT_SOURCE_DIR}/ev_dfg.h
${CMAKE_CURRENT_SOURCE_DIR}/cm_schedule.h
DEPENDS
${CMAKE_CURRENT_SOURCE_DIR}/gen_interface.pl
${CMAKE_CURRENT_SOURCE_DIR}/evpath.h
${CMAKE_CURRENT_SOURCE_DIR}/ev_dfg.h
${CMAKE_CURRENT_SOURCE_DIR}/cm_schedule.h
)
else()
add_custom_command(OUTPUT cm_interface.c
COMMAND ${CMAKE_COMMAND} -E copy
${PROJECT_SOURCE_DIR}/pregen-source/cm_interface.c cm_interface.c
COMMENT "Copying pregen cm_interface.c to build area")
add_custom_command(OUTPUT revp.c
COMMAND ${CMAKE_COMMAND} -E copy
${PROJECT_SOURCE_DIR}/pregen-source/revp.c revp.c
COMMENT "Copying pregen revp.c to build area")
add_custom_command(OUTPUT revpath.h
COMMAND ${CMAKE_COMMAND} -E copy
${PROJECT_SOURCE_DIR}/pregen-source/revpath.h revpath.h
COMMENT "Copying pregen revpath.h to build area")

add_custom_command(OUTPUT revp_internal.h
COMMAND ${CMAKE_COMMAND} -E copy
${PROJECT_SOURCE_DIR}/pregen-source/revp_internal.h revp_internal.h
COMMENT "Copying pregen revp_internal.h to build area")

endif()

if(Threads_FOUND AND CMAKE_USE_PTHREADS_INIT)
set(USE_PTHREADS TRUE)
Expand All @@ -604,6 +611,8 @@ CHECK_INCLUDE_FILE(hostlib.h HAVE_HOSTLIB_H)
CHECK_INCLUDE_FILE(malloc.h HAVE_MALLOC_H)
CHECK_INCLUDE_FILE(memory.h HAVE_MEMORY_H)
CHECK_INCLUDE_FILE(netdb.h HAVE_NETDB_H)
CHECK_INCLUDE_FILE(netinet/in.h HAVE_NETINET_IN_H)
CHECK_INCLUDE_FILE(arpa/inet.h HAVE_ARPA_INET_H)
CHECK_INCLUDE_FILE(sockLib.h HAVE_SOCKLIB_H)
CHECK_INCLUDE_FILE(stdarg.h STDC_HEADERS)
CHECK_INCLUDE_FILE(stdint.h HAVE_STDINT_H)
Expand Down
2 changes: 1 addition & 1 deletion thirdparty/EVPath/EVPath/chr_time.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#ifdef _MSC_VER
#ifdef _WIN32
#include <winsock2.h>
#include <time.h>
#include <sys/timeb.h>
Expand Down
33 changes: 24 additions & 9 deletions thirdparty/EVPath/EVPath/cm.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@
#include <stdlib.h>
#include <limits.h>
#ifdef HAVE_WINDOWS_H
#ifndef FD_SETSIZE
#define FD_SETSIZE 1024
#endif
#include <winsock2.h>
#define __ANSI_CPP__
#define lrand48() rand()
#define srand48(x) srand((unsigned int)(x))
#else
#include <netinet/in.h>
#include <arpa/inet.h>
Expand Down Expand Up @@ -60,6 +64,7 @@ extern void libcmselect_LTX_select_stop(CMtrans_services svc,void *client_data);
static void CMinitialize (CManager cm);

static atom_t CM_TRANSPORT = -1;
static atom_t CM_CMANAGER_ID = -1;
static atom_t CM_NETWORK_POSTFIX = -1;
static atom_t CM_CONN_BLOCKING = -1;
atom_t CM_REBWM_RLEN = -1;
Expand Down Expand Up @@ -232,7 +237,7 @@ static thr_thread_t
thr_fork(void*(*func)(void*), void *arg)
{
thr_thread_t new_thread = 0;
int err = thr_thread_create(&new_thread, NULL, (void*(*)(void*))func, arg);
int err = thr_thread_create(&new_thread, NULL, (void*)func, arg);
if (err != 0) {
return (thr_thread_t) (intptr_t)NULL;
} else {
Expand All @@ -258,7 +263,7 @@ INT_CMfork_comm_thread(CManager cm)
if (server_thread == (thr_thread_t)(intptr_t) NULL) {
return 0;
}
cm->control_list->server_thread = server_thread;
cm->control_list->server_thread = thr_get_thread_id(server_thread);
cm->control_list->has_thread = 1;
cm->reference_count++;
CMtrace_out(cm, CMFreeVerbose, "Forked - CManager %p ref count now %d\n",
Expand Down Expand Up @@ -575,6 +580,7 @@ CMinternal_listen(CManager cm, attr_list listen_info, int try_others)
attrs = (*trans_list)->listen(cm, &CMstatic_trans_svcs,
*trans_list,
listen_info);
add_attr(attrs, CM_CMANAGER_ID, Attr_Int4, (intptr_t)cm->CManager_ID);
if (iface) {
add_string_attr(attrs, CM_IP_INTERFACE, strdup(iface));
}
Expand Down Expand Up @@ -752,6 +758,7 @@ INT_CManager_create_control(char *control_module)

if (atom_init == 0) {
CM_TRANSPORT = attr_atom_from_string("CM_TRANSPORT");
CM_CMANAGER_ID = attr_atom_from_string("CM_CMANAGER_ID");
CM_NETWORK_POSTFIX = attr_atom_from_string("CM_NETWORK_POSTFIX");
CM_CONN_BLOCKING = attr_atom_from_string("CM_CONN_BLOCKING");
CM_REBWM_RLEN = attr_atom_from_string("CM_REG_BW_RUN_LEN");
Expand All @@ -772,6 +779,9 @@ INT_CManager_create_control(char *control_module)
cm->transports = NULL;
cm->initialized = 0;
cm->reference_count = 1;
uint64_t seed = getpid() + time(NULL);
srand48(seed);
cm->CManager_ID = (int)lrand48();

char *tmp;
if ((tmp = getenv("CMControlModule"))) {
Expand Down Expand Up @@ -1053,7 +1063,7 @@ CManager_free(CManager cm)
new_list->select_data = NULL;
new_list->add_select = NULL;
new_list->remove_select = NULL;
new_list->server_thread = (thr_thread_t)(intptr_t) NULL;
new_list->server_thread = (thr_thread_id)0;
new_list->network_blocking_function.func = NULL;
new_list->network_polling_function.func = NULL;
new_list->polling_function_list = NULL;
Expand Down Expand Up @@ -1490,7 +1500,7 @@ INT_CMget_ip_config_diagnostics(CManager cm)
msg[0] = 0x434d4800; /* CMH\0 */
msg[1] = (CURRENT_HANDSHAKE_VERSION << 24) + sizeof(msg);
msg[2] = cm->FFSserver_identifier;
msg[3] = 5; /* not implemented yet */
msg[3] = cm->CManager_ID;
msg[4] = 0; /* not implemented yet */
if (conn->remote_format_server_ID != 0) {
/* set high bit if we already have his ID */
Expand Down Expand Up @@ -1691,12 +1701,16 @@ timeout_conn(CManager cm, void *client_data)
fprintf(cm->CMTrace_file, "In CMinternal_get_conn, attrs ");
if (attrs) fdump_attr_list(cm->CMTrace_file, attrs); else fprintf(cm->CMTrace_file, "\n");
}
int target_cm_id = -1;
(void) get_int_attr(attrs, CM_CMANAGER_ID, &target_cm_id);
for (i=0; i<cm->connection_count; i++) {
CMConnection tmp = cm->connections[i];
if (tmp->closed || tmp->failed) continue;
if (tmp->trans->connection_eq(cm, &CMstatic_trans_svcs,
tmp->trans, attrs,
tmp->transport_data)) {

if ((tmp->remote_CManager_ID == target_cm_id) ||
tmp->trans->connection_eq(cm, &CMstatic_trans_svcs,
tmp->trans, attrs,
tmp->transport_data)) {

CMtrace_out(tmp->cm, CMFreeVerbose, "internal_get_conn found conn=%p ref count will be %d\n",
tmp, tmp->conn_ref_count +1);
Expand Down Expand Up @@ -2088,6 +2102,7 @@ timeout_conn(CManager cm, void *client_data)
if (cm_preread_hook) {
do_read = cm_preread_hook(buffer_full_point - buffer_data_end, tmp_message_buffer);
}
CMtrace_out(cm, CMLowLevelVerbose, "P5\n");
if (do_read) {
if (trans->read_to_buffer_func) {
/*
Expand Down Expand Up @@ -3851,7 +3866,7 @@ CM_init_select(CMControlList cl, CManager cm)
}
CMtrace_out(cm, CMLowLevelVerbose,
"CM - Forked comm thread %p\n", (void*)(intptr_t)server_thread);
cm->control_list->server_thread = server_thread;
cm->control_list->server_thread = thr_get_thread_id(server_thread);
cm->control_list->cl_reference_count++;
cm->control_list->free_reference_count++;
cl->has_thread = 1;
Expand Down Expand Up @@ -3968,7 +3983,7 @@ int offset_compare(const void* lhsv, const void* rhsv)
return lhs->offset.tv_usec - rhs->offset.tv_usec;
}

#ifdef _MSC_VER
#ifdef _WIN32
static inline void timeradd(struct timeval *a, struct timeval *b,
struct timeval *res)
{
Expand Down
4 changes: 2 additions & 2 deletions thirdparty/EVPath/EVPath/cm_control.c
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ INT_CMCondition_wait(CManager cm, int condition)
(void*)(intptr_t)cl->server_thread);
}
if (!cl->has_thread) {
if ((cl->server_thread == (thr_thread_t) (intptr_t) NULL) || (cl->server_thread == thr_thread_self())) {
if ((cl->server_thread == (thr_thread_id) 0) || (cl->server_thread == thr_thread_self())) {
cl->cond_polling = 1;
while (!(cond->signaled || cond->failed)) {
if (cm_control_debug_flag) {
Expand All @@ -255,7 +255,7 @@ INT_CMCondition_wait(CManager cm, int condition)
fprintf(cm->CMTrace_file, "CMLowLevel after Polling for CMcondition %d\n", condition);
}
/* the poll and handle will set cl->server_thread, restore it */
cl->server_thread = (thr_thread_t) (intptr_t)NULL;
cl->server_thread = (thr_thread_id) (intptr_t)NULL;
if (cm_control_debug_flag) {
fprintf(cm->CMTrace_file, "CMLowLevel In condition wait, reset server thread = %lx\n",
(long)cl->server_thread);
Expand Down
2 changes: 2 additions & 0 deletions thirdparty/EVPath/EVPath/cm_evol.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
#endif
#include <stdlib.h>
#ifdef HAVE_WINDOWS_H
#ifndef FD_SETSIZE
#define FD_SETSIZE 1024
#endif
#include <winsock2.h>
#define __ANSI_CPP__
#else
Expand Down
44 changes: 29 additions & 15 deletions thirdparty/EVPath/EVPath/cm_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#include <pthread.h>
#define thr_mutex_t pthread_mutex_t
#define thr_thread_t pthread_t
#define thr_thread_id thr_thread_t
#define thr_get_thread_id(t) (t)
#define thr_condition_t pthread_cond_t
#define thr_thread_self() pthread_self()
#define thr_thread_exit(status) pthread_exit(status);
Expand All @@ -34,24 +36,35 @@
#else
//#include <mutex>
#include <Windows.h>
#define thr_mutex_t HANDLE
#define thr_thread_t DWORD
#define thr_condition_t HANDLE
#define thr_thread_create(w,x,y,z) 0
extern int win_thread_create(HANDLE* w, void* x, void* y, void* z);
extern void win_mutex_init(SRWLOCK *m);
extern void win_mutex_lock(SRWLOCK* m);
extern void win_mutex_unlock(SRWLOCK* m);
extern void win_mutex_free(SRWLOCK* m);
extern void win_condition_init(CONDITION_VARIABLE *c);
extern void win_condition_wait(CONDITION_VARIABLE *c, SRWLOCK *m);
extern void win_condition_signal(CONDITION_VARIABLE *c);
extern void win_condition_free(CONDITION_VARIABLE *c);
#define thr_mutex_t SRWLOCK
#define thr_thread_t HANDLE
#define thr_thread_id DWORD
#define thr_condition_t CONDITION_VARIABLE
#define thr_thread_create(w,x,y,z) win_thread_create(w,x,y,z)
#define thr_thread_self() GetCurrentThreadId()
#define thr_thread_exit(status)
#define thr_thread_exit(status) ExitThread((DWORD)(intptr_t)status)
#define thr_get_thread_id(t) GetThreadId(t)
#define thr_thread_detach(thread)
#define thr_thread_yield()
#define thr_thread_join(t, s) (void)s
#define thr_mutex_init(m)
#define thr_mutex_lock(m)
#define thr_mutex_unlock(m)
#define thr_mutex_free(m)
#define thr_condition_init(c)
#define thr_condition_wait(c, m)
#define thr_condition_signal(c)
#define thr_condition_broadcast(c)
#define thr_condition_free(c)
#define thr_mutex_init(m) win_mutex_init(&m)
#define thr_mutex_lock(m) win_mutex_lock(&m)
#define thr_mutex_unlock(m) win_mutex_unlock(&m)
#define thr_mutex_free(m) win_mutex_free(&m)
#define thr_condition_init(c) win_condition_init(&c)
#define thr_condition_wait(c, m) win_condition_wait(&c, &m)
#define thr_condition_signal(c) win_condition_signal(&c)
#define thr_condition_broadcast(c) error
#define thr_condition_free(c) win_condition_free(&c)
#endif

#include <ev_internal.h>
Expand Down Expand Up @@ -138,6 +151,7 @@ typedef struct _CManager {
int initialized;
int reference_count;
char *control_module_choice; /* this is static, doesn't need to be free'd */
int CManager_ID;

CMControlList control_list; /* the control list for this DE */

Expand Down Expand Up @@ -240,7 +254,7 @@ typedef struct _CMControlList {
int closed;
int has_thread;
int cond_polling;
thr_thread_t server_thread;
thr_thread_id server_thread;
} CMControlList_s;

struct queued_data_rec {
Expand Down
Loading
Loading