diff --git a/src/modules/_Stdio/file.c b/src/modules/_Stdio/file.c index 541b4bd206..d1d050fb1d 100644 --- a/src/modules/_Stdio/file.c +++ b/src/modules/_Stdio/file.c @@ -574,6 +574,23 @@ static void close_fd_quietly(void) } break; } + +#ifdef _REENTRANT +#ifdef SIGCHLD + if (THIS->flags & FILE_BUSY) { + /* The fd appears to be busy in some other thread. + * + * Force that syscall to fail with EINTR. + */ + if (THIS->flags & FILE_RBUSY) { + th_kill(THIS->rthread, SIGCHLD); + } + if (THIS->flags & FILE_WBUSY) { + th_kill(THIS->wthread, SIGCHLD); + } + } +#endif +#endif } static void close_fd(void) @@ -626,6 +643,23 @@ static void close_fd(void) } break; } + +#ifdef _REENTRANT +#ifdef SIGCHLD + if (THIS->flags & FILE_BUSY) { + /* The fd appears to be busy in some other thread. + * + * Force that syscall to fail with EINTR. + */ + if (THIS->flags & FILE_RBUSY) { + th_kill(THIS->rthread, SIGCHLD); + } + if (THIS->flags & FILE_WBUSY) { + th_kill(THIS->wthread, SIGCHLD); + } + } +#endif +#endif } void my_set_close_on_exec(int fd, int to) @@ -727,6 +761,11 @@ static struct pike_string *do_read(int fd, do{ int fd=FD; int e; + + if (fd < 0) { + /* Closed from another thread? */ + break; + } THREADS_ALLOW(); try_read = i; @@ -805,9 +844,15 @@ static struct pike_string *do_read(int fd, SET_ONERROR(ebuf, free_dynamic_buffer, &b); i = all && !INT32_MUL_OVERFLOW(r, 2) ? DIRECT_BUFSIZE : READ_BUFFER; do{ + int fd=FD; int e; char *buf; + if (fd < 0) { + /* Closed from another thread? */ + break; + } + try_read = i; buf = low_make_buf_space(try_read, &b); @@ -1068,6 +1113,11 @@ static struct pike_string *do_recvmsg(INT32 r, int all) do{ int fd=FD; int e; + + if (fd < 0) { + /* Closed from another thread? */ + break; + } #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL /* XPG 4.2 */ message.msg.msg_control = &message.cmsgbuf; @@ -1145,7 +1195,13 @@ static struct pike_string *do_recvmsg(INT32 r, int all) initialize_buf(&b); SET_ONERROR(ebuf, free_dynamic_buffer, &b); do{ + int fd=FD; int e; + + if (fd < 0) { + /* Closed from another thread? */ + break; + } try_read=MINIMUM(CHUNK,r); #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL @@ -1290,6 +1346,11 @@ static struct pike_string *do_read_oob(int UNUSED(fd), do{ int e; int fd=FD; + + if (fd < 0) { + /* Closed from another thread? */ + break; + } THREADS_ALLOW(); i=fd_recv(fd, str->str+bytes_read, r, MSG_OOB); e=errno; @@ -1431,6 +1492,14 @@ static void file_read(INT32 args) all=1; } +#ifdef _REENTRANT + if (THIS->flags & FILE_RBUSY) { + Pike_error("File in use by another thread.\n"); + } + THIS->flags |= FILE_RBUSY; + THIS->rthread = th_self(); +#endif + pop_n_elems(args); #ifdef HAVE_PIKE_SEND_FD @@ -1452,6 +1521,10 @@ static void file_read(INT32 args) push_int(0); } +#ifdef _REENTRANT + THIS->flags &= ~FILE_RBUSY; +#endif + if (!(THIS->open_mode & FILE_NONBLOCKING)) INVALIDATE_CURRENT_TIME(); @@ -1553,6 +1626,14 @@ static void file_peek(INT32 args) fds.events=POLLIN; fds.revents=0; +#ifdef _REENTRANT + if (THIS->flags & FILE_RBUSY) { + Pike_error("File in use by another thread.\n"); + } + THIS->flags |= FILE_RBUSY; + THIS->rthread = th_self(); +#endif + if (timeout) { THREADS_ALLOW(); ret=poll(&fds, 1, timeout); @@ -1561,6 +1642,10 @@ static void file_peek(INT32 args) ret=poll(&fds, 1, 0); } +#ifdef _REENTRANT + THIS->flags &= ~FILE_RBUSY; +#endif + if(ret < 0) { ERRNO=errno; @@ -1595,6 +1680,14 @@ static void file_peek(INT32 args) tv.tv_usec=(int)(1000000*(tf-tv.tv_sec)); } +#ifdef _REENTRANT + if (THIS->flags & FILE_RBUSY) { + Pike_error("File in use by another thread.\n"); + } + THIS->flags |= FILE_RBUSY; + THIS->rthread = th_self(); +#endif + /* FIXME: Handling of EOF and not_eof */ if(tv.tv_sec || tv.tv_usec) { @@ -1605,6 +1698,10 @@ static void file_peek(INT32 args) else ret = fd_select(ret+1,&tmp,0,0,&tv); +#ifdef _REENTRANT + THIS->flags &= ~FILE_RBUSY; +#endif + if(ret < 0) { ERRNO=errno; @@ -1711,6 +1808,14 @@ static void file_read_oob(INT32 args) all=1; } +#ifdef _REENTRANT + if (THIS->flags & FILE_RBUSY) { + Pike_error("File in use by another thread.\n"); + } + THIS->flags |= FILE_RBUSY; + THIS->rthread = th_self(); +#endif + pop_n_elems(args); if((tmp=do_read_oob(FD, len, all, & ERRNO))) @@ -1720,6 +1825,10 @@ static void file_read_oob(INT32 args) push_int(0); } +#ifdef _REENTRANT + THIS->flags &= ~FILE_RBUSY; +#endif + if (!(THIS->open_mode & FILE_NONBLOCKING)) INVALIDATE_CURRENT_TIME(); @@ -1921,6 +2030,12 @@ static void file_write(INT32 args) if(FD < 0) Pike_error("File not open for write.\n"); +#ifdef _REENTRANT + if (THIS->flags & FILE_WBUSY) { + Pike_error("File in use by another thread.\n"); + } +#endif + if (TYPEOF(Pike_sp[-args]) == PIKE_T_ARRAY) { struct array *a = Pike_sp[-args].u.array; @@ -1977,6 +2092,10 @@ static void file_write(INT32 args) } } +#ifdef _REENTRANT + THIS->flags |= FILE_WBUSY; + THIS->wthread = th_self(); +#endif for(written = 0; iovcnt; check_signals(0,0,0)) { int fd = FD; int e; @@ -1989,6 +2108,10 @@ static void file_write(INT32 args) THIS->fd_info = NULL; } #endif + if (fd < 0) { + /* Closed from another thread? */ + break; + } THREADS_ALLOW(); #ifdef IOV_MAX @@ -2068,13 +2191,11 @@ static void file_write(INT32 args) } } } + } + #ifdef _REENTRANT - if (FD<0) { - free(iovbase); - Pike_error("File closed while in file->write.\n"); - } + THIS->flags &= ~FILE_WBUSY; #endif - } free(iovbase); @@ -2104,6 +2225,10 @@ static void file_write(INT32 args) if(str->size_shift) Pike_error("Stdio.File->write(): cannot output wide strings.\n"); +#ifdef _REENTRANT + THIS->flags |= FILE_WBUSY; + THIS->wthread = th_self(); +#endif for(written=0;written < str->len;check_signals(0,0,0)) { int fd=FD; @@ -2117,6 +2242,10 @@ static void file_write(INT32 args) THIS->fd_info = NULL; } #endif + if (fd < 0) { + /* Closed from another thread? */ + break; + } THREADS_ALLOW(); #ifdef HAVE_PIKE_SEND_FD if (fd_info) { @@ -2173,15 +2302,12 @@ static void file_write(INT32 args) if(THIS->open_mode & FILE_NONBLOCKING) break; } -#ifdef _REENTRANT - if(FD<0) Pike_error("File closed while in file->write.\n"); -#endif } #ifdef _REENTRANT - /* check_signals() may have done something... */ - if(FD<0) Pike_error("File closed while in file->write.\n"); + THIS->flags &= ~FILE_WBUSY; #endif + /* Race: A backend in another thread might have managed to set these * again for buffer space available after the write above. Not that * bad - it will get through in a later backend round. */ @@ -2252,10 +2378,23 @@ static void file_write_oob(INT32 args) if(str->size_shift) Pike_error("Stdio.File->write_oob(): cannot output wide strings.\n"); +#ifdef _REENTRANT + if (THIS->flags & FILE_WBUSY) { + Pike_error("File in use by another thread.\n"); + } + THIS->flags |= FILE_WBUSY; + THIS->wthread = th_self(); +#endif + while(written < str->len) { int fd=FD; int e; + + if (fd < 0) { + /* Closed from another thread? */ + break; + } THREADS_ALLOW(); i = fd_send(fd, str->str + written, str->len - written, MSG_OOB); e=errno; @@ -2296,6 +2435,10 @@ static void file_write_oob(INT32 args) } } +#ifdef _REENTRANT + THIS->flags &= ~FILE_WBUSY; +#endif + /* Race: A backend in another thread might have managed to set these * again for buffer space available after the write above. Not that * bad - it will get through in a later backend round. */ @@ -3028,6 +3171,13 @@ static void file_openat(INT32 args) Pike_error("Must open file for at least one of read and write.\n"); do { + dir_fd = FD; + if (dir_fd < 0) { + /* Unlikely, but... */ + fd = -1; + err = EBADF; + break; + } THREADS_ALLOW_UID(); fd = openat(dir_fd, str->str, map(flags), access); err = errno; @@ -3553,6 +3703,7 @@ static void file_unlinkat(INT32 args) THREADS_ALLOW_UID(); do { + /* FIXME: Handle concurrent close of dir_fd. */ i = fstatat(dir_fd, str->str, &st, AT_SYMLINK_NOFOLLOW); } while ((i < 0) && (errno == EINTR)); if (i >= 0) { @@ -3561,6 +3712,7 @@ static void file_unlinkat(INT32 args) flag = AT_REMOVEDIR; } do { + /* FIXME: Handle concurrent close of dir_fd. */ i = unlinkat(dir_fd, str->str, flag); } while ((i < 0) && (errno == EINTR)); } @@ -3630,6 +3782,11 @@ static void file_get_dir(INT32 args) if ((dfd == -1) && (errno == EINTR)) { check_threads_etc(); + fd = FD; + if (fd < 0) { + errno = EBADF; + break; + } continue; } break; diff --git a/src/modules/_Stdio/file.h b/src/modules/_Stdio/file.h index a659df35ab..6cd2ec8cc0 100644 --- a/src/modules/_Stdio/file.h +++ b/src/modules/_Stdio/file.h @@ -57,6 +57,7 @@ #endif #include "pike_netlib.h" +#include "pike_threadlib.h" #include "backend.h" #if defined(HAVE_IPPROTO_IPv6) && !defined(IPPROTO_IPV6) @@ -108,6 +109,15 @@ struct my_file #if defined(HAVE_FD_FLOCK) || defined(HAVE_FD_LOCKF) struct object *key; #endif +#ifdef _REENTRANT + /* Threads that need to be woken up on concurrent close(2). + * + * NB: May be stale. Only valid if the corresponding FILE_BUSY + * bit is set in flags. + */ + THREAD_T rthread; + THREAD_T wthread; +#endif }; #ifdef _REENTRANT @@ -230,5 +240,8 @@ void low_get_dir(DIR *dir, ptrdiff_t name_max); #define FILE_LOCK_FD 0x0004 #define FILE_NOT_OPENED 0x0010 #define FILE_HAVE_RECV_FD 0x0020 +#define FILE_RBUSY 0x0040 +#define FILE_WBUSY 0x0080 +#define FILE_BUSY 0x00c0 #endif diff --git a/src/pike_threadlib.h b/src/pike_threadlib.h index 84d2270d12..4e66a0a74c 100644 --- a/src/pike_threadlib.h +++ b/src/pike_threadlib.h @@ -324,6 +324,7 @@ PMOD_EXPORT extern pthread_attr_t small_pattr; #define low_th_yield() Sleep(0) #define th_equal(X,Y) ((X)==(Y)) #define th_hash(X) (X) +#define th_kill(ID, sig) /* FIXME: Check if we can switch to the cheaper CRITICAL_SECTION objects. */ @@ -703,6 +704,7 @@ PMOD_EXPORT void pike_threads_disallow_ext (struct thread_state *ts #define th_cleanup() #define th_init_programs() #define th_self() NULL +#define th_kill(ID, sig) #define co_wait(X,Y) #define co_wait_timeout(X,Y,S,N) #define co_signal(X) diff --git a/src/threads.c b/src/threads.c index 4ddc955cca..c7081265b0 100644 --- a/src/threads.c +++ b/src/threads.c @@ -3158,7 +3158,8 @@ static void f_thread_id_interrupt(INT32 args) thread_interrupt_callback = add_to_callback(&evaluator_callbacks, check_thread_interrupt, 0, 0); } - /* FIXME: Actually interrupt the thread. */ + /* Actually interrupt the thread. */ + th_kill(THIS_THREAD->id, SIGCHLD); } THIS_THREAD->flags |= THREAD_FLAG_INTR; push_int(0); @@ -3172,7 +3173,8 @@ static void low_thread_kill (struct thread_state *th) thread_interrupt_callback = add_to_callback(&evaluator_callbacks, check_thread_interrupt, 0, 0); } - /* FIXME: Actually interrupt the thread. */ + /* Actually interrupt the thread. */ + th_kill(th->id, SIGCHLD); } th->flags |= THREAD_FLAG_TERM; } @@ -3218,16 +3220,16 @@ static void cleanup_thread_state (struct thread_state *th) if (th->status == THREAD_RUNNING || th->waiting) return; - if (THIS_THREAD->flags & THREAD_FLAG_SIGNAL_MASK) { + if (th->flags & THREAD_FLAG_SIGNAL_MASK) { Pike_interpreter.thread_state->flags &= ~THREAD_FLAG_SIGNAL_MASK; if (!--num_pending_interrupts) { remove_callback(thread_interrupt_callback); thread_interrupt_callback = NULL; - } + } } - co_destroy(& THIS_THREAD->status_change); - th_destroy(& THIS_THREAD->id); + co_destroy(& th->status_change); + th_destroy(& th->id); } void exit_thread_obj(struct object *UNUSED(o))