-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathlpjs_compd.c
848 lines (737 loc) · 26.3 KB
/
lpjs_compd.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
/***************************************************************************
* Description:
* LPJS compute node daemon. Checks in with lpfs-dispatchd to signal
* that node is up and starts computational processes on compute nodes.
*
* History:
* Date Name Modification
* 2021-09-30 Jason Bacon Begin
***************************************************************************/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <sysexits.h>
#include <signal.h>
#include <errno.h>
#include <sys/socket.h>
#include <poll.h>
#include <stdbool.h>
#include <sys/stat.h> // S_ISDIR()
#include <pwd.h> // getpwnam()
#include <grp.h> // getgrnam()
#include <fcntl.h> // open()
#include <signal.h>
#include <sys/wait.h>
#include <xtend/string.h>
#include <xtend/proc.h>
#include <xtend/file.h> // xt_rmkdir()
#include "lpjs.h"
#include "node-list.h"
#include "config.h"
#include "network.h"
#include "misc.h"
#include "job.h"
#include "lpjs_compd.h"
/***************************************************************************
 *  Description:
 *      Daemon entry point.  Parse --daemonize / --log-output, optionally
 *      create a pid file (Linux/systemd), check in with lpjs_dispatchd,
 *      then poll the dispatchd socket forever, handling EOT, new-job,
 *      and cancel messages.  Reconnects indefinitely on lost connections.
 ***************************************************************************/

int     main (int argc, char *argv[])
{
    // Terminates process if malloc() fails, no check required
    node_list_t *node_list = node_list_new();

    // Terminates process if malloc() fails, no check required
    node_t      *node = node_new();

    char        *munge_payload,                 // Decoded message from dispatchd
                vis_msg[LPJS_MSG_LEN_MAX + 1];  // Printable copy for debugging
    ssize_t     bytes;
    int         compd_msg_fd;   // Persistent socket connection to dispatchd
    struct pollfd   poll_fd;
    extern FILE *Log_stream;
    uid_t       uid;            // Credentials of sender, reported by munge
    gid_t       gid;

    if ( argc > 2 )
    {
        fprintf (stderr, "Usage: %s [--daemonize|--log-output]\n", argv[0]);
        return EX_USAGE;
    }
    else if ( (argc == 2) && (strcmp(argv[1],"--daemonize") == 0 ) )
    {
        if ( (Log_stream = lpjs_log_output(LPJS_COMPD_LOG, "w")) == NULL )
            return EX_CANTCREAT;

        /*
         *  Code run after this must not attempt to write to stdout or stderr
         *  since they will be closed.  Use lpjs_log() for all informative
         *  messages.
         *  FIXME: Prevent unchecked log growth
         */
        xt_daemonize(0, 0);
    }
    else if ( (argc == 2) && (strcmp(argv[1],"--log-output") == 0 ) )
    {
        // Like --daemonize, but stay in the foreground
        // FIXME: Log_stream should use close on exec (fork chaperone)
        if ( (Log_stream = lpjs_log_output(LPJS_COMPD_LOG, "w")) == NULL )
            return EX_CANTCREAT;
    }
    else
        Log_stream = stderr;

#ifdef __linux__    // systemd needs a pid file for forking daemons
    // FIXME: Make sure Pid_path is removed no matter where the program exits
    int     status;
    extern char Pid_path[PATH_MAX + 1];

    if ( xt_rmkdir(LPJS_RUN_DIR, 0755) != 0 )
        return EX_CANTCREAT;

    snprintf(Pid_path, PATH_MAX + 1, "%s/lpjs_compd.pid", LPJS_RUN_DIR);
    status = xt_create_pid_file(Pid_path, Log_stream);
    if ( status != EX_OK )
        return status;
#endif

    // Get hostname of head node
    lpjs_load_config(node_list, LPJS_CONFIG_HEAD_ONLY, Log_stream);

    // Blocks (retrying forever) until checked in with dispatchd
    compd_msg_fd = lpjs_compd_checkin_loop(node_list, node);

    poll_fd.fd = compd_msg_fd;
    // POLLERR and POLLHUP are actually always set.  Listing POLLHUP here just
    // for documentation.
    poll_fd.events = POLLIN | POLLHUP;

    // Now keep daemon running, awaiting jobs
    // Almost correct: https://unix.stackexchange.com/questions/581426/how-to-get-notified-when-the-other-end-of-a-socketpair-is-closed
    while ( true )
    {
        // Just poll the dedicated socket connection with dispatchd
        // Time out after 2 seconds
        poll(&poll_fd, 1, 2000);

        // dispatchd closed its end of the socket?
        if (poll_fd.revents & POLLHUP)
        {
            // Clear the event bit by hand; poll() only sets bits in revents
            poll_fd.revents &= ~POLLHUP;

            // Close this end, or dispatchd gets "address already in use"
            // When trying to restart
            close(compd_msg_fd);
            lpjs_log("%s(): Error: Lost connection to dispatchd: HUP received.\n",
                    __FUNCTION__);
            sleep(LPJS_RETRY_TIME); // No point trying immediately after drop
            compd_msg_fd = lpjs_compd_checkin_loop(node_list, node);
        }

        if (poll_fd.revents & POLLERR)
        {
            poll_fd.revents &= ~POLLERR;
            // NOTE(review): this terminates the daemon (break, then return
            // EX_IOERR below) rather than reconnecting — confirm intentional
            lpjs_log("%s(): Error: Problem polling dispatchd: %s\n",
                    __FUNCTION__, strerror(errno));
            break;
        }

        if (poll_fd.revents & POLLIN)
        {
            poll_fd.revents &= ~POLLIN;

            // FIXME: Add a timeout and handling code
            lpjs_log("%s(): New message from dispatchd.\n", __FUNCTION__);
            bytes = lpjs_recv_munge(compd_msg_fd, &munge_payload, 0, 0,
                                    &uid, &gid, close);
            if ( bytes < 0 )
            {
                // FIXME: Not sure what this actually means
                // Do more digging and decide what to do about negative codes
                close(compd_msg_fd);
                lpjs_log("%s(): Error: Got %zd bytes from dispatchd. Something is wrong.\n",
                        __FUNCTION__, bytes);
                poll_fd.revents = 0;
                compd_msg_fd = lpjs_compd_checkin_loop(node_list, node);
            }
            else if ( bytes == 0 )
            {
                /*
                 *  Likely lost connection due to crash or other ungraceful
                 *  event.  Close connection so that dispatchd doesn't hang
                 *  with "address already in use".
                 */
                lpjs_log("%s(): Error: 0 bytes received from dispatchd. Disconnecting...\n",
                        __FUNCTION__);
                close(compd_msg_fd);
                poll_fd.revents = 0;
                compd_msg_fd = lpjs_compd_checkin_loop(node_list, node);
            }
            else
            {
                // NOTE(review): assumes lpjs_recv_munge() allocates at least
                // bytes + 1 so this terminator write is in bounds — confirm
                munge_payload[bytes] = '\0';
                xt_strviscpy((unsigned char *)vis_msg,
                            (unsigned char *)munge_payload, LPJS_MSG_LEN_MAX + 1);
                // lpjs_debug("Received %zd bytes from dispatchd: \"%s\"\n", bytes, vis_msg);

                // First payload byte is the message-type code
                if ( munge_payload[0] == LPJS_EOT )
                {
                    // Close this socket end first, or dispatchd gets
                    // "address already in use" when trying to restart
                    lpjs_log("%s(): Dispatchd sent EOT. Closing connection.\n",
                            __FUNCTION__);
                    close(compd_msg_fd);
                    sleep(LPJS_RETRY_TIME); // No point trying immediately after drop
                    // Ignore HUP that follows EOT
                    // FIXME: This might be bad timing
                    poll_fd.revents &= ~POLLHUP;
                    compd_msg_fd = lpjs_compd_checkin_loop(node_list, node);
                }
                else if ( munge_payload[0] == LPJS_COMPD_REQUEST_NEW_JOB )
                {
                    // Terminates process if malloc() fails, no check required
                    // NOTE(review): job is never freed here — leaks one
                    // job_t per dispatched job in this process; verify
                    job_t   *job = job_new();
                    char    *script_buff;

                    lpjs_log("%s(): LPJS_COMPD_REQUEST_NEW_JOB\n", __FUNCTION__);

                    /*
                     *  Message from dispatch contains the job specs
                     *  followed by the job script text.
                     */
                    job_read_from_string(job, munge_payload + 1, &script_buff);
                    job_print_full_specs(job, Log_stream);

                    /*
                     *  lpjs_run_chaperone() forks, and the child process
                     *  sends a response to lpjs_compd, depending on
                     *  whether the dispatch succeeds.
                     */
                    lpjs_run_chaperone(job, script_buff, compd_msg_fd, node_list);
                }
                else if ( munge_payload[0] == LPJS_COMPD_REQUEST_CANCEL )
                {
                    pid_t   chaperone_pid;
                    char    *end;

                    lpjs_log("%s(): LPJS_COMPD_REQUEST_CANCEL\n", __FUNCTION__);
                    lpjs_debug("%s(): Payload = %s\n", __FUNCTION__,
                            munge_payload + 1);
                    // Payload after the code byte is the chaperone PID in decimal
                    chaperone_pid = strtoul(munge_payload + 1, &end, 10);
                    if ( *end != '\0' )
                        lpjs_log("%s(): Bug: Malformed cancel payload.\n",
                                __FUNCTION__);
                    else
                    {
                        lpjs_log("%s(): Sending SIGHUP to %d...\n",
                                __FUNCTION__, chaperone_pid);
                        kill(chaperone_pid, SIGHUP);
                        // FIXME: Verify termination
                    }
                }
                free(munge_payload);
            }
        }
    }

    // Only reached via the POLLERR break above
    close(compd_msg_fd);
    return EX_IOERR;
}
/***************************************************************************
 *  Description:
 *      Send a checkin request (version + node specs) to dispatchd over
 *      compd_msg_fd and validate the response.
 *
 *  Returns:
 *      EX_OK       on successful authorization
 *      EX_IOERR    on send failure or version mismatch (after sleeping
 *                  so the caller can retry)
 *      Exits the process if no response can be read or the node is not
 *      authorized in the head node's config.
 ***************************************************************************/

int     lpjs_compd_checkin(int compd_msg_fd, node_t *node)
{
    char        outgoing_msg[LPJS_MSG_LEN_MAX + 1],
                *munge_payload,
                specs[NODE_SPECS_LEN + 1];
    ssize_t     bytes;
    uid_t       uid;
    gid_t       gid;
    extern FILE *Log_stream;

    /* Send a message to the server */
    /* Need to send \0, so xt_dprintf() doesn't work here */
    node_detect_specs(node);
    snprintf(outgoing_msg, LPJS_MSG_LEN_MAX + 1,
            "%c%s %s", LPJS_DISPATCHD_REQUEST_COMPD_CHECKIN,
            VERSION, node_specs_to_str(node, specs, NODE_SPECS_LEN + 1));
    lpjs_log("%s(): Sending node specs:\n", __FUNCTION__);
    node_print_specs_header(Log_stream);
    fprintf(Log_stream, "%s\n", outgoing_msg + 1);   // Skip the code byte

    if ( lpjs_send_munge(compd_msg_fd, outgoing_msg, close) != LPJS_MSG_SENT )
    {
        close(compd_msg_fd);
        lpjs_log("%s(): Error: Failed to send checkin message to dispatchd: %s\n",
                __FUNCTION__, strerror(errno));
        lpjs_log("%s(): Sleeping %d seconds...\n",
                __FUNCTION__, LPJS_RETRY_TIME);
        sleep(LPJS_RETRY_TIME);
        return EX_IOERR;
    }
    lpjs_log("%s(): Sent checkin request.\n", __FUNCTION__);

    // FIXME: Add a timeout and handling code
    bytes = lpjs_recv_munge(compd_msg_fd, &munge_payload, 0, 0, &uid, &gid, close);
    if ( bytes < 1 )
    {
        lpjs_log("%s(): Error: Unable to read response.\nExiting.\n",
                __FUNCTION__);
        exit(EX_IOERR);     // FIXME: Should we retry?
    }
    else if ( strcmp(munge_payload, LPJS_WRONG_VERSION_MSG) == 0 )
    {
        close(compd_msg_fd);
        // Bug fix: free the payload before returning, so the caller's
        // indefinite retry loop doesn't leak on every attempt
        free(munge_payload);
        lpjs_log("%s(): Error: This node is running an incompatible LPJS version.\n",
                __FUNCTION__);

        // Sleep a long time, since the only fix is an upgrade on one
        // end or the other.
        lpjs_log("%s(): Sleeping %d seconds...\n",
                __FUNCTION__, LPJS_WRONG_VERSION_RETRY_TIME);
        sleep(LPJS_WRONG_VERSION_RETRY_TIME);
        return EX_IOERR;
    }
    // FIXME: Assuming LPJS_NODE_NOT_AUTHORIZED_MSG here
    else if ( strcmp(munge_payload, LPJS_NODE_AUTHORIZED_MSG) != 0 )
    {
        // Bug fix: "Exiting.\n" was passed as a vararg (after a comma),
        // so the %s consumed it instead of __FUNCTION__.  It is now part
        // of the format string as intended.
        lpjs_log("%s(): Error: This node is not authorized to connect.\n"
                "It must be added to the etc/lpjs/config on the head node.\n"
                "Exiting.\n",
                __FUNCTION__);
        exit(EX_NOPERM);
    }
    else
        lpjs_log("%s(): Received authorization from lpjs_dispatchd.\n",
                __FUNCTION__);
    free(munge_payload);

    return EX_OK;
}
/***************************************************************************
 *  Description:
 *      Establish a connection to dispatchd and complete the checkin
 *      handshake, retrying both steps forever until they succeed.
 *
 *  Returns:
 *      File descriptor for the ongoing connection to dispatchd.
 ***************************************************************************/

int     lpjs_compd_checkin_loop(node_list_t *node_list, node_t *node)
{
    int     fd;

    for (;;)
    {
        // Does not return until a connection is established
        fd = lpjs_dispatchd_connect_loop(node_list);
        if ( lpjs_compd_checkin(fd, node) == EX_OK )
            break;

        // Checkin failed, possibly due to disconnect: discard this
        // connection and start over
        close(fd);
    }
    lpjs_log("%s(): Checkin successful.\n", __FUNCTION__);

    return fd;
}
/***************************************************************************
 *  Description:
 *      Set up the working directory for a job: enter the submit directory
 *      if it is visible on this node (shared FS marker present), otherwise
 *      create a temporary working dir under the user's home.  Then create
 *      the log dir and save a copy of the job script there.
 *
 *  Arguments:
 *      job             Job specs received from dispatchd
 *      script_buff     Full text of the job script
 *      job_script_name Output: path of the saved script copy
 *      maxlen          Size of job_script_name buffer
 *
 *  Returns:
 *      LPJS_SUCCESS, LPJS_CHAPERONE_CANTCREAT, LPJS_CHAPERONE_OSERR,
 *      or EX_NOPERM on failure to enter the working dir.
 *
 *  History:
 *  Date        Name        Modification
 *  2024-05-10  Jason Bacon Factored out from lpjs_run_script()
 ***************************************************************************/

int     lpjs_working_dir_setup(job_t *job, const char *script_buff,
                               char *job_script_name, size_t maxlen)
{
    char        temp_wd[PATH_MAX + 1 - 20],
                start_wd[PATH_MAX + 1 - 20],
                log_dir[PATH_MAX + 1],
                shared_fs_marker[LPJS_SHARED_FS_MARKER_MAX + 1],
                shared_fs_marker_path[PATH_MAX + 1],
                *working_dir,
                marker[PATH_MAX + 1];
    int         fd;
    // FIXME: Break out new functions for this
    struct stat st;
    extern FILE *Log_stream;

    /*
     *  Go to same directory from which job was submitted
     *  if it exists here (likely using NFS), otherwise
     *  go to user's home dir.
     */

    working_dir = job_get_submit_dir(job);
    // Bug fix: pass the actual size of shared_fs_marker, not PATH_MAX + 1,
    // to prevent a potential buffer overflow
    lpjs_get_marker_filename(shared_fs_marker, job_get_submit_node(job),
                             LPJS_SHARED_FS_MARKER_MAX + 1);
    snprintf(shared_fs_marker_path, PATH_MAX + 1, "%s/%s",
            working_dir, shared_fs_marker);
    lpjs_log("%s(): Checking for %s...\n", __FUNCTION__, shared_fs_marker_path);
    if ( stat(shared_fs_marker_path, &st) != 0 )
    {
        struct passwd *pw_ent;

        // Use pwnam_r() if multithreading, not likely
        if ( (pw_ent = getpwnam(job_get_user_name(job))) == NULL )
        {
            lpjs_log("%s(): Error: No such user: %s\n",
                    __FUNCTION__, job_get_user_name(job));
            // FIXME: Report job failure to dispatchd
        }
        else
        {
            // Place temp working dirs in user's home dir
            // FIXME: Check for failures
            chdir(pw_ent->pw_dir);

            // FIXME: Remove LPJS-job-* from previous submissions
            // This should replace temp workdir removal in chaperone
            snprintf(temp_wd, PATH_MAX + 1 - 20, "LPJS-job-%lu",
                    job_get_job_id(job));
            lpjs_log("%s(): %s does not exist. Using temp dir %s.\n",
                    __FUNCTION__, working_dir, temp_wd);

            // If temp_wd exists, rename it first
            // This will only happen if the job ID is duplicated
            if ( stat(temp_wd, &st) == 0 )
            {
                char    save_wd[PATH_MAX + 1];
                int     c = 0;

                // Find the first unused LPJS-job-N.c name
                do
                {
                    snprintf(save_wd, PATH_MAX + 1, "%s.%d", temp_wd, c++);
                }   while ( stat(save_wd, &st) == 0 );
                rename(temp_wd, save_wd);
            }
            mkdir(temp_wd, 0700);

            // Mark this directory as removable by LPJS cleanup
            // FIXME: Check time stamps on markers and remove them if expired
            snprintf(marker, PATH_MAX + 1, "%s/lpjs-remove-me", temp_wd);
            if ( (fd = open(marker, O_WRONLY|O_CREAT, 0644)) != -1 )
                close(fd);

            working_dir = temp_wd;
        }
    }

    // FIXME: getcwd() hangs on NetBSD when running as a service
    // chdir() to $HOME beforehand fixes the issue
    xt_get_home_dir(start_wd, PATH_MAX + 1 - 20);
    chdir(start_wd);

    lpjs_log("%s(): Changing from %s to %s...\n",
            __FUNCTION__, start_wd, working_dir);
    if ( chdir(working_dir) != 0 )
    {
        lpjs_log("%s(): Error: Failed to enter working dir: %s\n",
                __FUNCTION__, working_dir);
        // FIXME: Check for actual reason
        return EX_NOPERM;
    }

    // Oddly, chdir() indicates success, but getcwd() fails on macOS
    // due to lpjs_compd not having full disk access permission
    if ( getcwd(temp_wd, PATH_MAX + 1 - 20) == NULL )
    {
        lpjs_log("%s(): Error: getcwd() failed: errno = %s\n",
                __FUNCTION__, strerror(errno));
#ifdef __APPLE__
        lpjs_log("You may need to grant lpjs_compd full disk access in\n"
                "System Preferences -> Privacy and Security. This access\n"
                "will be revoked when LPJS is updated. If you find\n"
                "that you have to repeatedly reset it, please report the\n"
                "problem to Apple via the developer feedback assistant.\n"
                "They need to hear from multiple people before they will\n"
                "take the issue seriously.\n");
#endif
        return LPJS_CHAPERONE_OSERR;
    }
    else
        lpjs_log("%s(): Confirmed in %s.\n", __FUNCTION__, temp_wd);

    /*
     *  Save script
     */

    lpjs_job_log_dir(job_get_log_dir(job), job_get_job_id(job),
                     log_dir, PATH_MAX + 1);
    xt_rmkdir(log_dir, 0700);
    snprintf(job_script_name, maxlen, "%s/%s",
            log_dir, job_get_script_name(job));
    lpjs_log("%s(): Saving job script to %s.\n",
            __FUNCTION__, job_script_name);
    if ( (fd = open(job_script_name, O_WRONLY|O_CREAT|O_TRUNC, 0700)) == -1 )
    {
        lpjs_log("%s(): Error: Cannot create %s: %s\n",
                __FUNCTION__, job_script_name, strerror(errno));
        // Bug fix: previously fell through and called write() with fd == -1
        // FIXME: Report job failure to dispatchd
        return LPJS_CHAPERONE_CANTCREAT;
    }
    // Bug fix: check for short/failed writes instead of ignoring the result
    if ( write(fd, script_buff, strlen(script_buff))
            != (ssize_t)strlen(script_buff) )
    {
        lpjs_log("%s(): Error: Cannot write %s: %s\n",
                __FUNCTION__, job_script_name, strerror(errno));
        close(fd);
        return LPJS_CHAPERONE_CANTCREAT;
    }
    close(fd);

    /*
     *  FIXME: Update node status (keep a copy here in case
     *  dispatchd is restarted)
     */

    return LPJS_SUCCESS;
}
/***************************************************************************
 *  Description:
 *      Send a single chaperone status report for job_id to dispatchd
 *      over an already-open connection.
 *
 *  Returns:
 *      EX_OK if the message was sent, LPJS_WRITE_FAILED otherwise
 *      (in which case msg_fd has been closed).
 ***************************************************************************/

int     lpjs_send_chaperone_status(int msg_fd, unsigned long job_id,
                                   chaperone_status_t chaperone_status)
{
    long    host_name_max = sysconf(_SC_HOST_NAME_MAX);
    char    hostname[host_name_max + 1];
    char    outgoing_msg[LPJS_MSG_LEN_MAX + 1];

    lpjs_log("%s(): job_id %lu sending status %d on fd %d\n", __FUNCTION__,
            job_id, chaperone_status, msg_fd);

    /* Send job completion message to dispatchd */
    gethostname(hostname, host_name_max);
    snprintf(outgoing_msg, LPJS_MSG_LEN_MAX + 1, "%c%lu %d %s",
            LPJS_DISPATCHD_REQUEST_CHAPERONE_STATUS, job_id,
            chaperone_status, hostname);
    lpjs_debug("%s(): msg = %s\n", __FUNCTION__, outgoing_msg + 1);

    if ( lpjs_send_munge(msg_fd, outgoing_msg, close) == LPJS_MSG_SENT )
    {
        lpjs_debug("%s(): Status %d sent by job_id %lu.\n",
                __FUNCTION__, chaperone_status, job_id);
        return EX_OK;   // FIXME: Use LPJS return values
    }

    // Send failed: give up on this connection
    lpjs_log("%s(): Error: Failed to send message to dispatchd: %s\n",
            __FUNCTION__, strerror(errno));
    close(msg_fd);
    return LPJS_WRITE_FAILED;
}
/***************************************************************************
 *  Description:
 *      Open a new connection to dispatchd and send a chaperone status
 *      report for job_id, retrying indefinitely until both the connect
 *      and the send succeed.
 *
 *  Returns:
 *      0 on (eventual) success.  Does not return until the status
 *      has been delivered.
 *
 *  History:
 *  Date        Name        Modification
 *  2024-12-07  Jason Bacon Adapt from lpjs_chaperone_completion_loop
 ***************************************************************************/

int     lpjs_send_chaperone_status_loop(node_list_t *node_list,
                                        unsigned long job_id,
                                        chaperone_status_t chaperone_status)
{
    int     msg_fd,
            // Bug fix: send_status was uninitialized.  When the first
            // connect failed, the debug log below read it before any
            // assignment (undefined behavior).
            send_status = LPJS_WRITE_FAILED;

    // Typo fix: "sendind" -> "sending"
    lpjs_debug("%s(): job_id %lu sending status %d\n", __FUNCTION__,
            job_id, chaperone_status);

    // Retry socket connection and message send indefinitely
    do
    {
        msg_fd = lpjs_connect_to_dispatchd(node_list);
        if ( msg_fd == -1 )
        {
            lpjs_log("%s(): Error: Failed to connect to dispatchd: %s\n",
                    __FUNCTION__, strerror(errno));
            lpjs_log("%s(): Retry in %d seconds...\n",
                    __FUNCTION__, LPJS_RETRY_TIME);
            sleep(LPJS_RETRY_TIME);
        }
        else
        {
            send_status = lpjs_send_chaperone_status(msg_fd, job_id,
                                                     chaperone_status);
            if ( send_status != EX_OK )
            {
                lpjs_log("%s(): Error: Message send failed. Retry in %d seconds...\n",
                        __FUNCTION__, LPJS_RETRY_TIME);
                sleep(LPJS_RETRY_TIME);
            }
            // lpjs_send_chaperone_status() closes msg_fd on failure,
            // but a second close() of an invalid fd is harmless here
            close(msg_fd);
        }
        lpjs_debug("%s(): msg_fd = %d send_status = %d\n",
                __FUNCTION__, msg_fd, send_status);
    }   while ( (msg_fd == -1) || (send_status != EX_OK) );
    lpjs_debug("%s(): Chaperone status %d sent.\n", __FUNCTION__, chaperone_status);

    return 0;   // FIXME: Define return codes
}
/***************************************************************************
 *  Description:
 *      Fork a child that becomes the chaperone process for a new job:
 *      drop privileges to the submitting user (if running as root),
 *      set up the working dir, redirect stdout/stderr to log files,
 *      and exec the chaperone binary with the saved job script.
 *      The parent immediately sends a LPJS_CHAPERONE_FORKED verification
 *      to dispatchd and resumes listening.
 *
 *  Returns:
 *      EX_OK in the parent.  The child never returns (exec or exit).
 *
 *  History:
 *  Date        Name        Modification
 *  2024-03-10  Jason Bacon Begin
 ***************************************************************************/

int     lpjs_run_chaperone(job_t *job, const char *script_buff,
                           int compd_msg_fd, node_list_t *node_list)
{
    char        *chaperone_bin = PREFIX "/libexec/lpjs/chaperone",
                chaperone_response[LPJS_MSG_LEN_MAX + 1],
                job_script_name[PATH_MAX + 1],
                out_file[PATH_MAX + 1],
                err_file[PATH_MAX + 1];
    unsigned long   job_id = job_get_job_id(job);
    extern FILE *Log_stream;
    pid_t       chaperone_pid;

    // Reap chaperone processes as they exit, to avoid zombies
    signal(SIGCHLD, sigchld_handler);

    /*
     *  Child process must tell lpjs_compd whether chaperone was
     *  successfully launched.  Script failures are reported by
     *  chaperone directly to lpjs_dispatchd.
     */

    chaperone_pid = fork();
    // Bug fix: a failed fork() previously fell into the parent branch
    // and sent pid -1 to dispatchd as a "forked" verification
    if ( chaperone_pid == -1 )
    {
        lpjs_log("%s(): Error: fork() failed: %s\n",
                __FUNCTION__, strerror(errno));
        lpjs_send_chaperone_status_loop(node_list, job_id,
                                        LPJS_CHAPERONE_OSERR);
        return EX_OSERR;
    }
    if ( chaperone_pid == 0 )
    {
        /*
         *  Child: This is now the chaperone process.
         *  exec() the chaperone command with the script as an argument.
         *  The chaperone runs in the background, monitoring the job,
         *  enforcing resource limits, and reporting exit status and
         *  stats to dispatchd.
         *
         *  Exit with EX_OSERR to indicate a problem with the node.
         *  This will cause LPJS to take the node down.
         *  Permission problems, etc. should return other codes, which
         *  merely lead to job failure.
         */

        // We don't want chaperone and its children to inherit
        // the socket connection between dispatchd and compd.
        // The parent process lpjs_compd will continue to use it,
        // but we're done with it here.
        close(compd_msg_fd);

        // If lpjs_compd is running as root, use setuid() to switch
        // to submitting user
        if ( getuid() == 0 )
        {
            struct passwd   *pw_ent;
            uid_t           uid;
            struct group    *gr_ent;
            gid_t           gid;
            char            *user_name, *group_name;

            uid = getuid();
            gid = getgid();

            user_name = job_get_user_name(job);
            if ( (pw_ent = getpwnam(user_name)) == NULL )
            {
                lpjs_log("%s(): Error: %s: No such user.\n", __FUNCTION__, user_name);
                lpjs_send_chaperone_status_loop(node_list, job_id,
                                                LPJS_CHAPERONE_OSERR);
                exit(EX_OSERR);
            }

            group_name = job_get_primary_group_name(job);
            if ( (gr_ent = getgrnam(group_name)) == NULL )
            {
                // Fall back on the user's primary gid from the passwd db
                lpjs_log("%s(): Info: %s: No such group.\n", __FUNCTION__, group_name);
                gid = pw_ent->pw_gid;
            }
            else
                gid = gr_ent->gr_gid;

            // Set gid before uid, while still running as root
            if ( setgid(gid) != 0 )
                lpjs_log("%s(): Info: Failed to set gid to %u.\n", __FUNCTION__, gid);

            uid = pw_ent->pw_uid;
            if ( setuid(uid) != 0 )
            {
                lpjs_log("%s(): Error: Failed to set uid to %u.\n", __FUNCTION__, uid);
                lpjs_send_chaperone_status_loop(node_list, job_id,
                                                LPJS_CHAPERONE_OSERR);
                exit(EX_OSERR);
            }

            lpjs_log("%s(): user = %s  group = %s\n", __FUNCTION__,
                    user_name, group_name);
            lpjs_log("%s(): uid = %u  gid = %u\n", __FUNCTION__, uid, gid);
        }

        /*
         *  Set LPJS_USER, LPJS_SUBMIT_HOST, etc. for chaperone and
         *  job scripts
         */
        job_setenv(job);

        if ( lpjs_working_dir_setup(job, script_buff, job_script_name,
                                    PATH_MAX + 1) != LPJS_SUCCESS )
        {
            // FIXME: Take node down and reschedule jobs elsewhere
            // FIXME: Terminating here causes dispatchd to crash
            // dispatchd should be able to tolerate lost connections at any time
            lpjs_log("%s(): Error: lpjs_working_dir_setup() failed.\n", __FUNCTION__);
            lpjs_send_chaperone_status_loop(node_list, job_id,
                                            LPJS_CHAPERONE_OSERR);
            exit(EX_OSERR);
        }

        // FIXME: Make sure filenames are not truncated

        // Redirect stdout: open() after close(1) reuses fd 1
        strlcpy(out_file, job_script_name, PATH_MAX + 1);
        strlcat(out_file, ".stdout", PATH_MAX + 1);
        close(1);
        if ( open(out_file, O_WRONLY|O_CREAT, 0644) == -1 )
        {
            lpjs_log("%s(): Error: Could not open %s: %s\n", __FUNCTION__,
                    out_file, strerror(errno));
            lpjs_send_chaperone_status_loop(node_list, job_id,
                                            LPJS_CHAPERONE_CANTCREAT);
            exit(EX_CANTCREAT);
        }

        // Redirect stderr: open() after close(2) reuses fd 2
        strlcpy(err_file, job_script_name, PATH_MAX + 1);
        strlcat(err_file, ".stderr", PATH_MAX + 1);
        close(2);
        if ( open(err_file, O_WRONLY|O_CREAT, 0644) == -1 )
        {
            lpjs_log("%s(): Error: Could not open %s: %s\n", __FUNCTION__,
                    err_file, strerror(errno));
            lpjs_send_chaperone_status_loop(node_list, job_id,
                                            LPJS_CHAPERONE_CANTCREAT);
            exit(EX_CANTCREAT);
        }

        // Note: This will be redirected to err_file
        // lpjs_log("%s(): Running chaperone: %s %s...\n", __FUNCTION__,
        //         chaperone_bin, job_script_name);

        // FIXME: Build should use realpath
        // FIXME: This assumes execl() will succeed, which is all but certain.
        // It would be better to send msg_fd value to chaperone and let
        // it respond to dispatchd, or send a failure message after execl().
        lpjs_send_chaperone_status_loop(node_list, job_id, LPJS_CHAPERONE_OK);
        lpjs_debug("%s(): Execing %s\n", __FUNCTION__, chaperone_bin);
        // Bug fix: execl() is variadic, so the terminator must be a
        // char * null pointer, not a bare NULL (which may be int 0)
        execl(chaperone_bin, chaperone_bin, job_script_name, (char *)NULL);

        // We only get here if execl() failed
        // Note: This will be redirected to err_file
        // Bug fix: format string had five conversions ("%s %u %u %s") but
        // only three arguments, which is undefined behavior
        lpjs_log("%s(): Error: Failed to exec %s %s: %s\n",
                __FUNCTION__, chaperone_bin, job_script_name,
                strerror(errno));
        // See FIXME above
        lpjs_send_chaperone_status_loop(node_list, job_id, LPJS_CHAPERONE_EXEC_FAILED);
        exit(EX_SOFTWARE);
    }
    else
    {
        /*
         *  Send verification that the chaperone process started
         *  back to lpjs_dispatchd immediately, so it can resume
         *  listening for new events.  The work that follows
         *  (creating directories, redirecting, running the script,
         *  etc) can take a while on a busy compute node, and we
         *  don't want dispatchd stuck waiting.
         */
        lpjs_debug("%s(): Sending chaperone forked verification, pid = %d.\n",
                __FUNCTION__, chaperone_pid);
        snprintf(chaperone_response, LPJS_MSG_LEN_MAX + 1,
                "%c%d", LPJS_CHAPERONE_FORKED, chaperone_pid);
        if ( lpjs_send_munge(compd_msg_fd, chaperone_response, close)
                != LPJS_MSG_SENT )
        {
            lpjs_log("%s(): Error: Failed to send chaperone forked verification.\n",
                    __FUNCTION__);
            close(compd_msg_fd);
            exit(EX_UNAVAILABLE);
        }
        lpjs_debug("%s(): Verification sent.\n", __FUNCTION__);
    }

    /*
     *  lpjs_compd does not wait for chaperone, but resumes listening
     *  for more jobs.
     */

    return EX_OK;
}
/***************************************************************************
 *  Description:
 *      Change ownership of path to the job's submitting user, and to the
 *      job's primary group if that group exists on this host.  Must be
 *      run as root for the chown() calls to succeed.
 ***************************************************************************/

void    lpjs_chown(job_t *job, const char *path)
{
    struct passwd   *pw_ent;
    struct group    *gr_ent;

    // FIXME: Use getpwnam_r() if multithreading, unlikely
    // Bug fix: previously dereferenced pw_ent without checking for NULL,
    // crashing if the submitting user does not exist on this node
    if ( (pw_ent = getpwnam(job_get_user_name(job))) == NULL )
    {
        // FIXME: Terminate job if this fails
        lpjs_log("%s(): Error: No such user: %s\n",
                __FUNCTION__, job_get_user_name(job));
        return;
    }
    lpjs_log("%s(): User %u (must be root) changing ownership of %s to user %u.\n",
            __FUNCTION__, getuid(), path, pw_ent->pw_uid);
    // (gid_t)-1 leaves the group unchanged
    if ( chown(path, pw_ent->pw_uid, (gid_t)-1) != 0 )
        lpjs_log("%s(): Error: chown() failed.\n", __FUNCTION__);

    // It's OK if this fails, groups may differ on different nodes
    if ( (gr_ent = getgrnam(job_get_primary_group_name(job))) != NULL )
    {
        lpjs_log("%s(): User %u changing group ownership of %s to %u.\n",
                __FUNCTION__, getuid(), path, gr_ent->gr_gid);
        // (uid_t)-1 leaves the owner unchanged
        if ( chown(path, (uid_t)-1, gr_ent->gr_gid) != 0 )
            lpjs_log("%s(): Error: chown() failed.\n", __FUNCTION__);
    }
    else
        lpjs_log("%s(): Info: No %s group on this host.\n",
                __FUNCTION__, job_get_primary_group_name(job));
}
/***************************************************************************
 *  Description:
 *      SIGCHLD handler: reap terminated chaperone processes so they
 *      don't become zombies.  Per the Alma sigaction man page, catching
 *      SIGCHLD and reaping is the only fully portable approach.
 *
 *      Bug fixes vs. the original wait(&status):
 *      - Loop with waitpid(WNOHANG) to reap ALL exited children;
 *        signals are not queued, so one SIGCHLD may represent several
 *        terminated chaperones, which previously left zombies.
 *      - WNOHANG prevents blocking in the handler if no child has
 *        actually exited.
 *      - errno is saved/restored, since waitpid() may clobber it in
 *        code the signal interrupted.
 *
 *  History:
 *  Date        Name        Modification
 *  2024-05-04  Jason Bacon Begin
 ***************************************************************************/

void    sigchld_handler(int s2)
{
    int     saved_errno = errno;

    (void)s2;   // Signal number unused; required by handler signature

    // Reap every child that has exited, without blocking
    while ( waitpid(-1, NULL, WNOHANG) > 0 )
        ;

    errno = saved_errno;
}