From 25bac8a62b275d52cfdeecd56cc2324dbbc94491 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Grubbstr=C3=B6m=20=28Grubba=29?= Date: Wed, 4 Dec 2024 11:04:50 +0100 Subject: [PATCH 1/3] Threads: interrupt() and kill() now actually attempt to interrupt the thread. Use SIGCHLD to interrupt the other thread. This will typically cause any blocking system calls in the target thread to fail with EINTR, but not affect anything else. --- src/pike_threadlib.h | 2 ++ src/threads.c | 6 ++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/pike_threadlib.h b/src/pike_threadlib.h index 710dce2a58..c666cb2b28 100644 --- a/src/pike_threadlib.h +++ b/src/pike_threadlib.h @@ -322,6 +322,7 @@ PMOD_EXPORT extern pthread_attr_t small_pattr; #define low_th_yield() Sleep(0) #define th_equal(X,Y) ((X)==(Y)) #define th_hash(X) (X) +#define th_kill(ID, sig) /* FIXME: Check if we can switch to the cheaper CRITICAL_SECTION objects. */ @@ -700,6 +701,7 @@ PMOD_EXPORT void pike_threads_disallow_ext (struct thread_state *ts #define th_cleanup() #define th_init_programs() #define th_self() NULL +#define th_kill(ID, sig) #define co_wait(X,Y) #define co_wait_timeout(X,Y,S,N) #define co_signal(X) diff --git a/src/threads.c b/src/threads.c index 5e7d2c5eec..ea70461640 100644 --- a/src/threads.c +++ b/src/threads.c @@ -2794,7 +2794,8 @@ static void f_thread_id_interrupt(INT32 args) thread_interrupt_callback = add_to_callback(&evaluator_callbacks, check_thread_interrupt, 0, 0); } - /* FIXME: Actually interrupt the thread. */ + /* Actually interrupt the thread. */ + th_kill(THIS_THREAD->id, SIGCHLD); } THIS_THREAD->flags |= THREAD_FLAG_INTR; push_int(0); @@ -2808,7 +2809,8 @@ static void low_thread_kill (struct thread_state *th) thread_interrupt_callback = add_to_callback(&evaluator_callbacks, check_thread_interrupt, 0, 0); } - /* FIXME: Actually interrupt the thread. */ + /* Actually interrupt the thread. */ + th_kill(th->id, SIGCHLD); } th->flags |= THREAD_FLAG_TERM; } From dec85e98afe7e0f97530ce030a164ecfac1bcd87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Grubbstr=C3=B6m=20=28Grubba=29?= Date: Wed, 4 Dec 2024 11:09:08 +0100 Subject: [PATCH 2/3] Threads: Fix clean up on exit for late threads. --- src/threads.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/threads.c b/src/threads.c index ea70461640..fdc4e37e7b 100644 --- a/src/threads.c +++ b/src/threads.c @@ -2856,16 +2856,16 @@ static void cleanup_thread_state (struct thread_state *th) if (th->status == THREAD_RUNNING || th->waiting) return; - if (THIS_THREAD->flags & THREAD_FLAG_SIGNAL_MASK) { + if (th->flags & THREAD_FLAG_SIGNAL_MASK) { Pike_interpreter.thread_state->flags &= ~THREAD_FLAG_SIGNAL_MASK; if (!--num_pending_interrupts) { remove_callback(thread_interrupt_callback); thread_interrupt_callback = NULL; - } + } } - co_destroy(& THIS_THREAD->status_change); - th_destroy(& THIS_THREAD->id); + co_destroy(& th->status_change); + th_destroy(& th->id); } void exit_thread_obj(struct object *UNUSED(o)) From 66a0391e2124ff29cddfce19624b0d607ea00f52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Grubbstr=C3=B6m=20=28Grubba=29?= Date: Wed, 4 Dec 2024 11:11:03 +0100 Subject: [PATCH 3/3] Stdio.File: Support closing from a concurrent thread. Pending read() and write() operations will now be interrupted if the file is concurrently closed by another thread. Typical intended use is for timeouts of blocking operations. --- src/modules/_Stdio/file.c | 177 +++++++++++++++++++++++++++++++++++--- src/modules/_Stdio/file.h | 13 +++ 2 files changed, 180 insertions(+), 10 deletions(-) diff --git a/src/modules/_Stdio/file.c b/src/modules/_Stdio/file.c index 9de978539c..72215c827d 100644 --- a/src/modules/_Stdio/file.c +++ b/src/modules/_Stdio/file.c @@ -542,6 +542,23 @@ static void close_fd_quietly(void) } break; } + +#ifdef _REENTRANT +#ifdef SIGCHLD + if (THIS->flags & FILE_BUSY) { + /* The fd appears to be busy in some other thread. + * + * Force that syscall to fail with EINTR. + */ + if (THIS->flags & FILE_RBUSY) { + th_kill(THIS->rthread, SIGCHLD); + } + if (THIS->flags & FILE_WBUSY) { + th_kill(THIS->wthread, SIGCHLD); + } + } +#endif +#endif } static void close_fd(void) @@ -594,6 +611,23 @@ static void close_fd(void) } break; } + +#ifdef _REENTRANT +#ifdef SIGCHLD + if (THIS->flags & FILE_BUSY) { + /* The fd appears to be busy in some other thread. + * + * Force that syscall to fail with EINTR. + */ + if (THIS->flags & FILE_RBUSY) { + th_kill(THIS->rthread, SIGCHLD); + } + if (THIS->flags & FILE_WBUSY) { + th_kill(THIS->wthread, SIGCHLD); + } + } +#endif +#endif } void my_set_close_on_exec(int fd, int to) @@ -695,6 +729,11 @@ static struct pike_string *do_read(int fd, do{ int fd=FD; int e; + + if (fd < 0) { + /* Closed from another thread? */ + break; + } THREADS_ALLOW(); try_read = i; @@ -773,9 +812,15 @@ static struct pike_string *do_read(int fd, SET_ONERROR(ebuf, free_dynamic_buffer, &b); i = all && !INT32_MUL_OVERFLOW(r, 2) ? DIRECT_BUFSIZE : READ_BUFFER; do{ + int fd=FD; int e; char *buf; + if (fd < 0) { + /* Closed from another thread? */ + break; + } + try_read = i; buf = low_make_buf_space(try_read, &b); @@ -1036,6 +1081,11 @@ static struct pike_string *do_recvmsg(INT32 r, int all) do{ int fd=FD; int e; + + if (fd < 0) { + /* Closed from another thread? */ + break; + } #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL /* XPG 4.2 */ message.msg.msg_control = &message.cmsgbuf; @@ -1113,7 +1163,13 @@ static struct pike_string *do_recvmsg(INT32 r, int all) initialize_buf(&b); SET_ONERROR(ebuf, free_dynamic_buffer, &b); do{ + int fd=FD; int e; + + if (fd < 0) { + /* Closed from another thread? */ + break; + } try_read=MINIMUM(CHUNK,r); #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL @@ -1258,6 +1314,11 @@ static struct pike_string *do_read_oob(int UNUSED(fd), do{ int e; int fd=FD; + + if (fd < 0) { + /* Closed from another thread? */ + break; + } THREADS_ALLOW(); i=fd_recv(fd, str->str+bytes_read, r, MSG_OOB); e=errno; @@ -1396,6 +1457,14 @@ static void file_read(INT32 args) all=1; } +#ifdef _REENTRANT + if (THIS->flags & FILE_RBUSY) { + Pike_error("File in use by another thread.\n"); + } + THIS->flags |= FILE_RBUSY; + THIS->rthread = th_self(); +#endif + pop_n_elems(args); #ifdef HAVE_PIKE_SEND_FD @@ -1417,6 +1486,10 @@ static void file_read(INT32 args) push_int(0); } +#ifdef _REENTRANT + THIS->flags &= ~FILE_RBUSY; +#endif + if (!(THIS->open_mode & FILE_NONBLOCKING)) INVALIDATE_CURRENT_TIME(); @@ -1518,6 +1591,14 @@ static void file_peek(INT32 args) fds.events=POLLIN; fds.revents=0; +#ifdef _REENTRANT + if (THIS->flags & FILE_RBUSY) { + Pike_error("File in use by another thread.\n"); + } + THIS->flags |= FILE_RBUSY; + THIS->rthread = th_self(); +#endif + if (timeout) { THREADS_ALLOW(); ret=poll(&fds, 1, timeout); @@ -1526,6 +1607,10 @@ static void file_peek(INT32 args) ret=poll(&fds, 1, 0); } +#ifdef _REENTRANT + THIS->flags &= ~FILE_RBUSY; +#endif + if(ret < 0) { ERRNO=errno; @@ -1560,6 +1645,14 @@ static void file_peek(INT32 args) tv.tv_usec=(int)(1000000*(tf-tv.tv_sec)); } +#ifdef _REENTRANT + if (THIS->flags & FILE_RBUSY) { + Pike_error("File in use by another thread.\n"); + } + THIS->flags |= FILE_RBUSY; + THIS->rthread = th_self(); +#endif + /* FIXME: Handling of EOF and not_eof */ if(tv.tv_sec || tv.tv_usec) { @@ -1570,6 +1663,10 @@ static void file_peek(INT32 args) else ret = fd_select(ret+1,&tmp,0,0,&tv); +#ifdef _REENTRANT + THIS->flags &= ~FILE_RBUSY; +#endif + if(ret < 0) { ERRNO=errno; @@ -1676,6 +1773,14 @@ static void file_read_oob(INT32 args) all=1; } +#ifdef _REENTRANT + if (THIS->flags & FILE_RBUSY) { + Pike_error("File in use by another thread.\n"); + } + THIS->flags |= FILE_RBUSY; + THIS->rthread = th_self(); +#endif + pop_n_elems(args); if((tmp=do_read_oob(FD, len, all, & ERRNO))) @@ -1685,6 +1790,10 @@ static void file_read_oob(INT32 args) push_int(0); } +#ifdef _REENTRANT + THIS->flags &= ~FILE_RBUSY; +#endif + if (!(THIS->open_mode & FILE_NONBLOCKING)) INVALIDATE_CURRENT_TIME(); @@ -1859,6 +1968,12 @@ static void file_write(INT32 args) if(FD < 0) Pike_error("File not open for write.\n"); +#ifdef _REENTRANT + if (THIS->flags & FILE_WBUSY) { + Pike_error("File in use by another thread.\n"); + } +#endif + if (TYPEOF(Pike_sp[-args]) == PIKE_T_ARRAY) { struct array *a = Pike_sp[-args].u.array; @@ -1915,6 +2030,10 @@ static void file_write(INT32 args) } } +#ifdef _REENTRANT + THIS->flags |= FILE_WBUSY; + THIS->wthread = th_self(); +#endif for(written = 0; iovcnt; check_signals(0,0,0)) { int fd = FD; int e; @@ -1927,6 +2046,10 @@ static void file_write(INT32 args) THIS->fd_info = NULL; } #endif + if (fd < 0) { + /* Closed from another thread? */ + break; + } THREADS_ALLOW(); #ifdef IOV_MAX @@ -2005,13 +2128,11 @@ static void file_write(INT32 args) } } } + } + #ifdef _REENTRANT - if (FD<0) { - free(iovbase); - Pike_error("File closed while in file->write.\n"); - } + THIS->flags &= ~FILE_WBUSY; #endif - } free(iovbase); @@ -2038,6 +2159,10 @@ static void file_write(INT32 args) if(str->size_shift) Pike_error("Stdio.File->write(): cannot output wide strings.\n"); +#ifdef _REENTRANT + THIS->flags |= FILE_WBUSY; + THIS->wthread = th_self(); +#endif for(written=0;written < str->len;check_signals(0,0,0)) { int fd=FD; @@ -2051,6 +2176,10 @@ static void file_write(INT32 args) THIS->fd_info = NULL; } #endif + if (fd < 0) { + /* Closed from another thread? */ + break; + } THREADS_ALLOW(); #ifdef HAVE_PIKE_SEND_FD if (fd_info) { @@ -2107,15 +2236,12 @@ static void file_write(INT32 args) if(THIS->open_mode & FILE_NONBLOCKING) break; } -#ifdef _REENTRANT - if(FD<0) Pike_error("File closed while in file->write.\n"); -#endif } #ifdef _REENTRANT - /* check_signals() may have done something... */ - if(FD<0) Pike_error("File closed while in file->write.\n"); + THIS->flags &= ~FILE_WBUSY; #endif + /* Race: A backend in another thread might have managed to set these * again for buffer space available after the write above. Not that * bad - it will get through in a later backend round. */ @@ -2186,10 +2312,23 @@ static void file_write_oob(INT32 args) if(str->size_shift) Pike_error("Stdio.File->write_oob(): cannot output wide strings.\n"); +#ifdef _REENTRANT + if (THIS->flags & FILE_WBUSY) { + Pike_error("File in use by another thread.\n"); + } + THIS->flags |= FILE_WBUSY; + THIS->wthread = th_self(); +#endif + while(written < str->len) { int fd=FD; int e; + + if (fd < 0) { + /* Closed from another thread? */ + break; + } THREADS_ALLOW(); i = fd_send(fd, str->str + written, str->len - written, MSG_OOB); e=errno; @@ -2230,6 +2369,10 @@ static void file_write_oob(INT32 args) } } +#ifdef _REENTRANT + THIS->flags &= ~FILE_WBUSY; +#endif + /* Race: A backend in another thread might have managed to set these * again for buffer space available after the write above. Not that * bad - it will get through in a later backend round. */ @@ -2904,6 +3047,13 @@ static void file_openat(INT32 args) Pike_error("Must open file for at least one of read and write.\n"); do { + dir_fd = FD; + if (dir_fd < 0) { + /* Unlikely, but... */ + fd = -1; + err = EBADF; + break; + } THREADS_ALLOW_UID(); fd = openat(dir_fd, str->str, map(flags), access); err = errno; @@ -3353,6 +3503,7 @@ static void file_unlinkat(INT32 args) THREADS_ALLOW_UID(); do { + /* FIXME: Handle concurrent close of dir_fd. */ i = fstatat(dir_fd, str->str, &st, AT_SYMLINK_NOFOLLOW); } while ((i < 0) && (errno == EINTR)); if (i >= 0) { @@ -3361,6 +3512,7 @@ static void file_unlinkat(INT32 args) flag = AT_REMOVEDIR; } do { + /* FIXME: Handle concurrent close of dir_fd. */ i = unlinkat(dir_fd, str->str, flag); } while ((i < 0) && (errno == EINTR)); } @@ -3430,6 +3582,11 @@ static void file_get_dir(INT32 args) if ((dfd == -1) && (errno == EINTR)) { check_threads_etc(); + fd = FD; + if (fd < 0) { + errno = EBADF; + break; + } continue; } break; diff --git a/src/modules/_Stdio/file.h b/src/modules/_Stdio/file.h index a40028676b..be0dba0665 100644 --- a/src/modules/_Stdio/file.h +++ b/src/modules/_Stdio/file.h @@ -53,6 +53,7 @@ #endif #include "pike_netlib.h" +#include "pike_threadlib.h" #include "backend.h" #if defined(HAVE_IPPROTO_IPv6) && !defined(IPPROTO_IPV6) @@ -90,6 +91,15 @@ struct my_file #if defined(HAVE_FD_FLOCK) || defined(HAVE_FD_LOCKF) struct object *key; #endif +#ifdef _REENTRANT + /* Threads that need to be woken up on concurrent close(2). + * + * NB: May be stale. Only valid if the corresponding FILE_BUSY + * bit is set in flags. + */ + THREAD_T rthread; + THREAD_T wthread; +#endif }; #ifdef _REENTRANT @@ -210,5 +220,8 @@ void low_get_dir(DIR *dir, ptrdiff_t name_max); #define FILE_LOCK_FD 0x0004 #define FILE_NOT_OPENED 0x0010 #define FILE_HAVE_RECV_FD 0x0020 +#define FILE_RBUSY 0x0040 +#define FILE_WBUSY 0x0080 +#define FILE_BUSY 0x00c0 #endif