From f6f2cf199cf743f76f8de7102a5be63b41147065 Mon Sep 17 00:00:00 2001
From: vsoch
Date: Mon, 18 Mar 2024 16:35:01 -0600
Subject: [PATCH] feat: support for batch and stage

Signed-off-by: vsoch
---
 .gitignore                                    |   1 +
 README.md                                     |  67 +++++++++-
 examples/hello-world-batch.yaml               |  37 ++++++
 examples/hello-world-jobspec.yaml             |   5 +-
 .../cli/__pycache__/__init__.cpython-310.pyc  | Bin 2045 -> 0 bytes
 jobspec/cli/__pycache__/run.cpython-310.pyc   | Bin 541 -> 0 bytes
 .../__pycache__/__init__.cpython-310.pyc      | Bin 361 -> 0 bytes
 .../__pycache__/registry.cpython-310.pyc      | Bin 3696 -> 0 bytes
 jobspec/steps/__init__.py                     |   1 +
 .../__pycache__/__init__.cpython-310.pyc      | Bin 179 -> 0 bytes
 .../steps/__pycache__/base.cpython-310.pyc    | Bin 1913 -> 0 bytes
 .../steps/__pycache__/fileio.cpython-310.pyc  | Bin 1519 -> 0 bytes
 jobspec/steps/base.py                         |  21 +++
 jobspec/steps/empty.py                        |  18 +++
 jobspec/steps/helpers.py                      |  21 +++
 jobspec/transform.py                          |   8 +-
 .../__pycache__/__init__.cpython-310.pyc      | Bin 218 -> 0 bytes
 .../__pycache__/flux.cpython-310.pyc          | Bin 2330 -> 0 bytes
 jobspec/transformer/flux.py                   | 120 ++++++++++++++++--
 jobspec/utils.py                              |  38 ++++++
 20 files changed, 312 insertions(+), 25 deletions(-)
 create mode 100644 examples/hello-world-batch.yaml
 delete mode 100644 jobspec/cli/__pycache__/__init__.cpython-310.pyc
 delete mode 100644 jobspec/cli/__pycache__/run.cpython-310.pyc
 delete mode 100644 jobspec/plugin/__pycache__/__init__.cpython-310.pyc
 delete mode 100644 jobspec/plugin/__pycache__/registry.cpython-310.pyc
 delete mode 100644 jobspec/steps/__pycache__/__init__.cpython-310.pyc
 delete mode 100644 jobspec/steps/__pycache__/base.cpython-310.pyc
 delete mode 100644 jobspec/steps/__pycache__/fileio.cpython-310.pyc
 create mode 100644 jobspec/steps/empty.py
 create mode 100644 jobspec/steps/helpers.py
 delete mode 100644 jobspec/transformer/__pycache__/__init__.cpython-310.pyc
 delete mode 100644 jobspec/transformer/__pycache__/flux.cpython-310.pyc

diff --git a/.gitignore b/.gitignore
index f0ddc72..907868f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,4 @@ build
 dist
 env
 .eggs
+__pycache__
diff --git a/README.md b/README.md
index 1e5d6ca..4dc7407 100644
--- a/README.md
+++ b/README.md
@@ -12,6 +12,21 @@ for a specific cluster's scheduler. If you think it looks too simple then I'd say
 
 ## Usage
 
+### Steps
+
+A transformer provides one or more steps that transform a jobspec so it can be understood by a particular
+execution environment. The steps include:
+
+| Name   | Description |
+|--------|-------------|
+| write  | write a file in the staging directory |
+| stage  | stage a file across nodes |
+| submit | submit the job |
+| batch  | submit the job with a batch command (more common in HPC) |
+| auth   | authenticate with some service |
+
+These are the basic steps that @vsoch needs now for scheduling experiments, and more can be added (or tweaked) if needed.
+
 ### Example
 
 Start up the development environment to find yourself in a container with flux. Start a test instance:
@@ -45,9 +60,24 @@ jobspec - it can be a file that the jobspec writes, and then the command is issued. I've kept
 it there for the time being, mostly because it looks nicer. I'm sure someone will disagree with me about that.
 
 ```bash
+# Example showing without watching (waiting) and showing output
+jobspec run ./examples/hello-world-jobspec.yaml
+
+# Example that shows waiting for output
+jobspec run ./examples/hello-world-wait-jobspec.yaml
+# Example with batch using flux
+jobspec run ./examples/hello-world-batch.yaml
 ```
+Note that the default transformer is flux, so the above are equivalent to:
+
+```bash
+jobspec run -t flux ./examples/hello-world-wait-jobspec.yaml
+jobspec run --transformer flux ./examples/hello-world-wait-jobspec.yaml
+```
+
+
 ### Details
 
 As an example, although you *could* submit a job with a command ready to go - assuming your cluster has the
@@ -105,7 +135,9 @@ Jobspec is an entity of [flux-framework](https://flux-framework.org).
 
 #### Why not rely on Flux internals?
 
-We want a Jobspec to be able to handle a transformation of some logic (the above) into an execution that might not involve flux at all. It could be another workload manager (e.g., Slurm) or it could be a service that submits to some cloud batch API.
+If we lived in a universe of just Flux, sure, we wouldn't need this. But the world is more than Flux, and we want to extend our Jobspec to that world.
+So we want a Jobspec to be able to handle a transformation of some logic (the above) into an execution that might not involve Flux at all. It could be another workload manager (e.g., Slurm),
+Kubernetes, or it could be a service that submits to some cloud batch API.
 
 #### What are all the steps allowed?
 
@@ -130,12 +162,6 @@ There are several likely means of interacting with this library:
 
 For the example usage here, and since the project I am working on is concerned with Flux, we will start with the simplest case - a client that is running inside a flux instance (meaning it can import flux) that reads in a jobspec with a section that defines a set of transforms, and then issues the commands to stage the setup and use flux to run the work defined by the jobspec.
 
-## TODO
-
- - write the hello world example with flux
- - add the staging example
- - write the same, but using batch
-
 ## Developer
 
 ### Organization
 
 While you can write an external transformer (as a plugin) a set of core transformers are provided here:
 
 - [jobspec/transformer](jobspec/transformer): core transformer classes that ship internally here.
 
+### Writing a Transformer
+
+For now, the easiest thing to do is add a single file (named by your transformer) to [jobspec/transformer](jobspec/transformer)
+and follow the pattern already there. A transformer is minimally a class with a name, a description, and some number of steps.
+You can then use the provided steps in [jobspec/steps](jobspec/steps) or use the `StepBase` to write your own. At the end of
+your transformer file you simply need to register the steps you want to use:
+
+```python
+# A transformer can register shared steps, or custom steps
+Transformer.register_step(steps.WriterStep)
+Transformer.register_step(batch)
+Transformer.register_step(submit)
+Transformer.register_step(stage)
+```
+
+If there is a step you want the user to be able to define (but skip for your transformer, for whatever reason you might have),
+just register the empty step with the name you want to skip. As an example, let's say my transformer has no concept of a stage
+(sharing a file across separate nodes) given that it has a shared filesystem.
+I might want to do:
+
+```python
+import jobspec.steps as steps
+
+# This will not fail validation that the step is unknown, but skip it
+Transformer.register_step(steps.EmptyStep, name="stage")
+```
+
+
 ## License
 
 HPCIC DevTools is distributed under the terms of the MIT license.
diff --git a/examples/hello-world-batch.yaml b/examples/hello-world-batch.yaml
new file mode 100644
index 0000000..d94e8f0
--- /dev/null
+++ b/examples/hello-world-batch.yaml
@@ -0,0 +1,37 @@
+# TODO test these out on x86, then create arm specs
+version: 1
+resources:
+- count: 4
+  type: node
+  with:
+  - count: 1
+    label: hello-world
+    type: slot
+    with:
+    - count: 4
+      type: core
+
+task:
+  transform:
+    - step: write
+      filename: batch.sh
+      executable: true
+
+    - step: batch
+      filename: batch.sh
+      # wait: true
+
+  scripts:
+    - name: batch.sh
+      content: |
+        #!/bin/bash
+        flux submit -N 1 --watch echo what is the meaning
+        flux submit -N 1 --watch echo of all of this
+  count:
+    per_slot: 1
+  resources:
+    hardware:
+      hardware.gpu.available: 'no'
+    io.archspec:
+      cpu.target: amd64
+  slot: hello-world
diff --git a/examples/hello-world-jobspec.yaml b/examples/hello-world-jobspec.yaml
index 15f2402..faa5f3f 100644
--- a/examples/hello-world-jobspec.yaml
+++ b/examples/hello-world-jobspec.yaml
@@ -19,8 +19,9 @@ task:
       executable: true
 
     # This is only provided as an example - in the devcontainer it's just one physical machine!
-    #- step: stage
-    #  filename: install.sh
+    # and filemap I think will likely not work
+    # - step: stage
+    #   filename: job.sh
 
     - step: submit
       filename: job.sh
diff --git a/jobspec/cli/__pycache__/__init__.cpython-310.pyc b/jobspec/cli/__pycache__/__init__.cpython-310.pyc
deleted file mode 100644
index 9547baf5fe50de82b139a110ac3d752f21db6807..0000000000000000000000000000000000000000
Binary files a/jobspec/cli/__pycache__/__init__.cpython-310.pyc and /dev/null differ
diff --git a/jobspec/plugin/__pycache__/__init__.cpython-310.pyc b/jobspec/plugin/__pycache__/__init__.cpython-310.pyc
deleted file mode 100644
index 4e83048dec119390d3c02f263035e423d98da6f9..0000000000000000000000000000000000000000
Binary files a/jobspec/plugin/__pycache__/__init__.cpython-310.pyc and /dev/null differ
diff --git a/jobspec/plugin/__pycache__/registry.cpython-310.pyc b/jobspec/plugin/__pycache__/registry.cpython-310.pyc
deleted file mode 100644
index 45149dec640e40ac3d1d6063b1d771abe9733a49..0000000000000000000000000000000000000000
Binary files a/jobspec/plugin/__pycache__/registry.cpython-310.pyc and /dev/null differ
diff --git a/jobspec/steps/__init__.py b/jobspec/steps/__init__.py
index ac6cb44..ae636af 100644
--- a/jobspec/steps/__init__.py
+++ b/jobspec/steps/__init__.py
@@ -1 +1,2 @@
+from .empty import EmptyStep
 from .fileio import WriterStep
diff --git a/jobspec/steps/__pycache__/__init__.cpython-310.pyc b/jobspec/steps/__pycache__/__init__.cpython-310.pyc
deleted file mode 100644
index 29353eba2c5b8f3ef36c7e12f8d07f718bb03abe..0000000000000000000000000000000000000000
Binary files a/jobspec/steps/__pycache__/__init__.cpython-310.pyc and /dev/null differ
diff --git a/jobspec/transformer/flux.py b/jobspec/transformer/flux.py
index 920e28c..55dda0a 100644
--- a/jobspec/transformer/flux.py
+++ b/jobspec/transformer/flux.py
@@ -6,6 +6,7 @@
 import shlex
 import subprocess
 import time
+import uuid
 
 import yaml
 
@@ -41,6 +42,108 @@ def __init__(self, *args, **kwargs):
 
 # Custom Flux steps - just write and register!
 
+class stage(StepBase):
+    """
+    A stage step uses flux filemap to stage a file across nodes
+    """
+
+    name = "stage"
+
+    def run(self, stage, *args, **kwargs):
+        """
+        Run the stage step - fall back to the filename for now
+        """
+        name = str(uuid.uuid4())
+        filename = self.options["filename"]
+        cmd = ["flux", "filemap", "map", "--tags", name, "--directory", stage, filename]
+        utils.run_command(cmd, check_output=True)
+
+        # Assume we send to all ranks besides where we've already written it
+        # This will likely fail if the filesystem is shared
+        cmd = [
+            "flux",
+            "exec",
+            "--dir",
+            stage,
+            "-r",
+            "all",
+            "-x",
+            "0",
+            "flux",
+            "filemap",
+            "get",
+            "--tags",
+            name,
+        ]
+        utils.run_command(cmd, check_output=False)
+
+        # Unmap to clear the memory map
+        cmd = ["flux", "filemap", "unmap", "--tags", name]
+        utils.run_command(cmd, check_output=True)
+
+
+class batch(StepBase):
+    name = "batch"
+
+    def run(self, stage, *args, **kwargs):
+        """
+        Run the batch step
+        """
+        slot = self.flatten_slot()
+        nodes = slot.get("node")
+        tasks = slot.get("core")
+
+        # I'm pretty sure we need at least one of these
+        if not nodes and not tasks:
+            raise ValueError("slot is missing node or core, cannot direct to batch.")
+
+        # I don't think batch has python bindings?
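+        # So assemble the command by hand from the flattened slot, e.g.,
+        #   flux batch -N <nodes> -n <cores> <filename>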
+        filename = self.options.get("filename")
+        cmd = ["flux", "batch"]
+        if nodes:
+            cmd += ["-N", str(nodes)]
+        if tasks:
+            cmd += ["-n", str(tasks)]
+        cmd.append(filename)
+
+        # Would be nice if this was exposed as "from jobspec"
+        # https://github.com/flux-framework/flux-core/blob/master/src/bindings/python/flux/cli/batch.py#L109-L120
+        with utils.workdir(stage):
+            res = utils.run_command(cmd, check_output=True)
+
+        # 👀️ 👀️ 👀️
+        jobid = res["message"].strip()
+        wait = self.options.get("wait") is True
+        if wait:
+            # batch was submitted via a subprocess, so create our own handle to watch it
+            import flux
+
+            watch_job(flux.Flux(), jobid)
+        return jobid
+
+
+def watch_job(handle, jobid):
+    """
+    Shared function to watch a job
+    """
+    import flux.job
+
+    if isinstance(jobid, str):
+        jobid = flux.job.JobID(jobid)
+
+    print()
+    watcher = flux.job.watcher.JobWatcher(
+        handle,
+        progress=False,
+        jps=False,  # show throughput with progress
+        log_events=False,
+        log_status=True,
+        labelio=False,
+        wait=True,
+        watch=True,
+    )
+    watcher.start()
+    watcher.add_jobid(jobid)
+    handle.reactor_run()
+
+
 class submit(StepBase):
     name = "submit"
 
@@ -82,23 +185,12 @@ def run(self, stage, *args, **kwargs):
 
         # 👀️ 👀️ 👀️
         if wait:
-            print()
-            watcher = flux.job.watcher.JobWatcher(
-                handle,
-                progress=False,
-                jps=False,  # show throughput with progress
-                log_events=False,
-                log_status=True,
-                labelio=False,
-                wait=True,
-                watch=True,
-            )
-            watcher.start()
-            watcher.add_jobid(jobid)
-            handle.reactor_run()
+            watch_job(handle, jobid)
         return jobid.f58plain
 
 
 # A transformer can register shared steps, or custom steps
 Transformer.register_step(steps.WriterStep)
+Transformer.register_step(batch)
 Transformer.register_step(submit)
+Transformer.register_step(stage)
diff --git a/jobspec/utils.py b/jobspec/utils.py
index 89512e7..f975b63 100644
--- a/jobspec/utils.py
+++ b/jobspec/utils.py
@@ -1,6 +1,8 @@
 import json
 import os
+import subprocess
 import tempfile
+from contextlib import contextmanager
 
 import yaml
 
@@ -59,3 +61,39 @@ def write_yaml(obj, filename):
     """
     with open(filename, "w") as fd:
         yaml.dump(obj, fd)
+
+
+@contextmanager
+def workdir(dirname):
+    """
+    Provide context for a working directory, e.g.,
+
+    with workdir(name):
+        # do stuff
+    """
+    here = os.getcwd()
+    os.chdir(dirname)
+    try:
+        yield
+    finally:
+        os.chdir(here)
+
+
+def run_command(cmd, stream=False, check_output=False, return_code=0):
+    """
+    Use subprocess to send a command to the terminal.
+
+    If check_output is True, check against an expected return code.
+    """
+    stdout = subprocess.PIPE if not stream else None
+    output = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=stdout)
+    t = output.communicate()[0], output.returncode
+    output = {"message": t[0], "return_code": t[1]}
+
+    if isinstance(output["message"], bytes):
+        output["message"] = output["message"].decode("utf-8")
+
+    # Check the output and raise an error if not success
+    if check_output and t[1] != return_code:
+        raise ValueError(output["message"].strip())
+    return output
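
The helpers added to `jobspec/utils.py` above are what a custom step builds on. As a minimal sketch (not part of this patch: the `cleanup` step name and the `jobspec.steps.base` import path are assumptions for illustration only), a step in the style of `stage` and `batch` could combine `workdir` and `run_command` like this:

```python
import jobspec.utils as utils
from jobspec.steps.base import StepBase  # assumed location of StepBase


class cleanup(StepBase):
    # Hypothetical step name, used only for illustration
    name = "cleanup"

    def run(self, stage, *args, **kwargs):
        """
        Remove a scratch file from the staging directory.
        """
        filename = self.options.get("filename")
        # run_command raises ValueError on a nonzero return code when check_output=True
        with utils.workdir(stage):
            utils.run_command(["rm", "-f", filename], check_output=True)


# Registered at the bottom of the transformer file, like the steps in flux.py
# Transformer.register_step(cleanup)
```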