Skip to content

Commit

Permalink
Merge branch 'patches/interruptable_stdio' into 8.0
Browse files Browse the repository at this point in the history
* patches/interruptable_stdio:
  Stdio.File: Support closing from a concurrent thread.
  Threads: Fix clean up on exit for late threads.
  Threads: interrupt() and kill() now actually attempt to interrupt the thread.
  • Loading branch information
grubba committed Dec 4, 2024
2 parents b4c28a7 + 66a0391 commit 472577b
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 16 deletions.
177 changes: 167 additions & 10 deletions src/modules/_Stdio/file.c
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,23 @@ static void close_fd_quietly(void)
}
break;
}

#ifdef _REENTRANT
#ifdef SIGCHLD
if (THIS->flags & FILE_BUSY) {
/* The fd appears to be busy in some other thread.
*
* Force that syscall to fail with EINTR.
*/
if (THIS->flags & FILE_RBUSY) {
th_kill(THIS->rthread, SIGCHLD);
}
if (THIS->flags & FILE_WBUSY) {
th_kill(THIS->wthread, SIGCHLD);
}
}
#endif
#endif
}

static void close_fd(void)
Expand Down Expand Up @@ -626,6 +643,23 @@ static void close_fd(void)
}
break;
}

#ifdef _REENTRANT
#ifdef SIGCHLD
if (THIS->flags & FILE_BUSY) {
/* The fd appears to be busy in some other thread.
*
* Force that syscall to fail with EINTR.
*/
if (THIS->flags & FILE_RBUSY) {
th_kill(THIS->rthread, SIGCHLD);
}
if (THIS->flags & FILE_WBUSY) {
th_kill(THIS->wthread, SIGCHLD);
}
}
#endif
#endif
}

void my_set_close_on_exec(int fd, int to)
Expand Down Expand Up @@ -727,6 +761,11 @@ static struct pike_string *do_read(int fd,
do{
int fd=FD;
int e;

if (fd < 0) {
/* Closed from another thread? */
break;
}
THREADS_ALLOW();

try_read = i;
Expand Down Expand Up @@ -805,9 +844,15 @@ static struct pike_string *do_read(int fd,
SET_ONERROR(ebuf, free_dynamic_buffer, &b);
i = all && !INT32_MUL_OVERFLOW(r, 2) ? DIRECT_BUFSIZE : READ_BUFFER;
do{
int fd=FD;
int e;
char *buf;

if (fd < 0) {
/* Closed from another thread? */
break;
}

try_read = i;

buf = low_make_buf_space(try_read, &b);
Expand Down Expand Up @@ -1068,6 +1113,11 @@ static struct pike_string *do_recvmsg(INT32 r, int all)
do{
int fd=FD;
int e;

if (fd < 0) {
/* Closed from another thread? */
break;
}
#ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
/* XPG 4.2 */
message.msg.msg_control = &message.cmsgbuf;
Expand Down Expand Up @@ -1145,7 +1195,13 @@ static struct pike_string *do_recvmsg(INT32 r, int all)
initialize_buf(&b);
SET_ONERROR(ebuf, free_dynamic_buffer, &b);
do{
int fd=FD;
int e;

if (fd < 0) {
/* Closed from another thread? */
break;
}
try_read=MINIMUM(CHUNK,r);

#ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
Expand Down Expand Up @@ -1290,6 +1346,11 @@ static struct pike_string *do_read_oob(int UNUSED(fd),
do{
int e;
int fd=FD;

if (fd < 0) {
/* Closed from another thread? */
break;
}
THREADS_ALLOW();
i=fd_recv(fd, str->str+bytes_read, r, MSG_OOB);
e=errno;
Expand Down Expand Up @@ -1431,6 +1492,14 @@ static void file_read(INT32 args)
all=1;
}

#ifdef _REENTRANT
if (THIS->flags & FILE_RBUSY) {
Pike_error("File in use by another thread.\n");
}
THIS->flags |= FILE_RBUSY;
THIS->rthread = th_self();
#endif

pop_n_elems(args);

#ifdef HAVE_PIKE_SEND_FD
Expand All @@ -1452,6 +1521,10 @@ static void file_read(INT32 args)
push_int(0);
}

#ifdef _REENTRANT
THIS->flags &= ~FILE_RBUSY;
#endif

if (!(THIS->open_mode & FILE_NONBLOCKING))
INVALIDATE_CURRENT_TIME();

Expand Down Expand Up @@ -1553,6 +1626,14 @@ static void file_peek(INT32 args)
fds.events=POLLIN;
fds.revents=0;

#ifdef _REENTRANT
if (THIS->flags & FILE_RBUSY) {
Pike_error("File in use by another thread.\n");
}
THIS->flags |= FILE_RBUSY;
THIS->rthread = th_self();
#endif

if (timeout) {
THREADS_ALLOW();
ret=poll(&fds, 1, timeout);
Expand All @@ -1561,6 +1642,10 @@ static void file_peek(INT32 args)
ret=poll(&fds, 1, 0);
}

#ifdef _REENTRANT
THIS->flags &= ~FILE_RBUSY;
#endif

if(ret < 0)
{
ERRNO=errno;
Expand Down Expand Up @@ -1595,6 +1680,14 @@ static void file_peek(INT32 args)
tv.tv_usec=(int)(1000000*(tf-tv.tv_sec));
}

#ifdef _REENTRANT
if (THIS->flags & FILE_RBUSY) {
Pike_error("File in use by another thread.\n");
}
THIS->flags |= FILE_RBUSY;
THIS->rthread = th_self();
#endif

/* FIXME: Handling of EOF and not_eof */

if(tv.tv_sec || tv.tv_usec) {
Expand All @@ -1605,6 +1698,10 @@ static void file_peek(INT32 args)
else
ret = fd_select(ret+1,&tmp,0,0,&tv);

#ifdef _REENTRANT
THIS->flags &= ~FILE_RBUSY;
#endif

if(ret < 0)
{
ERRNO=errno;
Expand Down Expand Up @@ -1711,6 +1808,14 @@ static void file_read_oob(INT32 args)
all=1;
}

#ifdef _REENTRANT
if (THIS->flags & FILE_RBUSY) {
Pike_error("File in use by another thread.\n");
}
THIS->flags |= FILE_RBUSY;
THIS->rthread = th_self();
#endif

pop_n_elems(args);

if((tmp=do_read_oob(FD, len, all, & ERRNO)))
Expand All @@ -1720,6 +1825,10 @@ static void file_read_oob(INT32 args)
push_int(0);
}

#ifdef _REENTRANT
THIS->flags &= ~FILE_RBUSY;
#endif

if (!(THIS->open_mode & FILE_NONBLOCKING))
INVALIDATE_CURRENT_TIME();

Expand Down Expand Up @@ -1921,6 +2030,12 @@ static void file_write(INT32 args)
if(FD < 0)
Pike_error("File not open for write.\n");

#ifdef _REENTRANT
if (THIS->flags & FILE_WBUSY) {
Pike_error("File in use by another thread.\n");
}
#endif

if (TYPEOF(Pike_sp[-args]) == PIKE_T_ARRAY) {
struct array *a = Pike_sp[-args].u.array;

Expand Down Expand Up @@ -1977,6 +2092,10 @@ static void file_write(INT32 args)
}
}

#ifdef _REENTRANT
THIS->flags |= FILE_WBUSY;
THIS->wthread = th_self();
#endif
for(written = 0; iovcnt; check_signals(0,0,0)) {
int fd = FD;
int e;
Expand All @@ -1989,6 +2108,10 @@ static void file_write(INT32 args)
THIS->fd_info = NULL;
}
#endif
if (fd < 0) {
/* Closed from another thread? */
break;
}
THREADS_ALLOW();

#ifdef IOV_MAX
Expand Down Expand Up @@ -2068,13 +2191,11 @@ static void file_write(INT32 args)
}
}
}
}

#ifdef _REENTRANT
if (FD<0) {
free(iovbase);
Pike_error("File closed while in file->write.\n");
}
THIS->flags &= ~FILE_WBUSY;
#endif
}

free(iovbase);

Expand Down Expand Up @@ -2104,6 +2225,10 @@ static void file_write(INT32 args)
if(str->size_shift)
Pike_error("Stdio.File->write(): cannot output wide strings.\n");

#ifdef _REENTRANT
THIS->flags |= FILE_WBUSY;
THIS->wthread = th_self();
#endif
for(written=0;written < str->len;check_signals(0,0,0))
{
int fd=FD;
Expand All @@ -2117,6 +2242,10 @@ static void file_write(INT32 args)
THIS->fd_info = NULL;
}
#endif
if (fd < 0) {
/* Closed from another thread? */
break;
}
THREADS_ALLOW();
#ifdef HAVE_PIKE_SEND_FD
if (fd_info) {
Expand Down Expand Up @@ -2173,15 +2302,12 @@ static void file_write(INT32 args)
if(THIS->open_mode & FILE_NONBLOCKING)
break;
}
#ifdef _REENTRANT
if(FD<0) Pike_error("File closed while in file->write.\n");
#endif
}

#ifdef _REENTRANT
/* check_signals() may have done something... */
if(FD<0) Pike_error("File closed while in file->write.\n");
THIS->flags &= ~FILE_WBUSY;
#endif

/* Race: A backend in another thread might have managed to set these
* again for buffer space available after the write above. Not that
* bad - it will get through in a later backend round. */
Expand Down Expand Up @@ -2252,10 +2378,23 @@ static void file_write_oob(INT32 args)
if(str->size_shift)
Pike_error("Stdio.File->write_oob(): cannot output wide strings.\n");

#ifdef _REENTRANT
if (THIS->flags & FILE_WBUSY) {
Pike_error("File in use by another thread.\n");
}
THIS->flags |= FILE_WBUSY;
THIS->wthread = th_self();
#endif

while(written < str->len)
{
int fd=FD;
int e;

if (fd < 0) {
/* Closed from another thread? */
break;
}
THREADS_ALLOW();
i = fd_send(fd, str->str + written, str->len - written, MSG_OOB);
e=errno;
Expand Down Expand Up @@ -2296,6 +2435,10 @@ static void file_write_oob(INT32 args)
}
}

#ifdef _REENTRANT
THIS->flags &= ~FILE_WBUSY;
#endif

/* Race: A backend in another thread might have managed to set these
* again for buffer space available after the write above. Not that
* bad - it will get through in a later backend round. */
Expand Down Expand Up @@ -3028,6 +3171,13 @@ static void file_openat(INT32 args)
Pike_error("Must open file for at least one of read and write.\n");

do {
dir_fd = FD;
if (dir_fd < 0) {
/* Unlikely, but... */
fd = -1;
err = EBADF;
break;
}
THREADS_ALLOW_UID();
fd = openat(dir_fd, str->str, map(flags), access);
err = errno;
Expand Down Expand Up @@ -3553,6 +3703,7 @@ static void file_unlinkat(INT32 args)

THREADS_ALLOW_UID();
do {
/* FIXME: Handle concurrent close of dir_fd. */
i = fstatat(dir_fd, str->str, &st, AT_SYMLINK_NOFOLLOW);
} while ((i < 0) && (errno == EINTR));
if (i >= 0) {
Expand All @@ -3561,6 +3712,7 @@ static void file_unlinkat(INT32 args)
flag = AT_REMOVEDIR;
}
do {
/* FIXME: Handle concurrent close of dir_fd. */
i = unlinkat(dir_fd, str->str, flag);
} while ((i < 0) && (errno == EINTR));
}
Expand Down Expand Up @@ -3630,6 +3782,11 @@ static void file_get_dir(INT32 args)

if ((dfd == -1) && (errno == EINTR)) {
check_threads_etc();
fd = FD;
if (fd < 0) {
errno = EBADF;
break;
}
continue;
}
break;
Expand Down
Loading

0 comments on commit 472577b

Please sign in to comment.