diff --git a/example.py b/example.py index c52ce67..0d22f61 100644 --- a/example.py +++ b/example.py @@ -1,8 +1,75 @@ from htag import Tag +import json,asyncio,time,os -class App(Tag.div): +""" +Complex htag's app to test: + + - a dynamic object (TagSession), which got a render method (new way) + - using tag.state (in session) + - using tag.update with a task in a loop + - can recreate itself (when init params change) + +""" + +class TagSession(Tag.div): #dynamic component (compliant htag >= 0.30) !!!! FIRST IN THE WORLD !!!! def init(self): - self+= "hello world" + self["style"]="border:1px solid black" + self.otitle = Tag.h3(_style="padding:0px;margin:0px;float:right") + self.orendu = Tag.pre(_style="padding:0px;margin:0px") + + # draw ui + self+=self.otitle + self+=self.orendu + + def render(self): + self.otitle.set( "Live Session" ) + self.orendu.set( json.dumps( dict(self.session.items()), indent=1)) + +class App(Tag.body): + imports=[TagSession] + statics=b"window.error=alert" + def init(self,v="0"): + self.place = Tag.div(js="console.log('I update myself')") + asyncio.ensure_future( self.loop_timer() ) + + def inc_test_session(o): + v=int(self.state.get("integer","0")) + v=v+1 + self.state["integer"]=v + def addd(o): + if "list" in self.state: + self.state["list"].append("x") # <= this workd because tag.state.save() called in interaction (before guess rendering) + else: + self.state["list"]=[] + def clllll(o): + self.state.clear() + + + self <= Tag.div(f"V{v} (pid:{os.getpid()})") + self <= Tag.button("inc integer",_onclick=inc_test_session) + self <= Tag.button("add list",_onclick=addd) + self <= Tag.button("clear",_onclick=clllll) + #~ self <= Tag.button("yield",_onclick=self.yielder) + self <= TagSession() + + self+=Tag.li(Tag.a("t0",_href="/")) + self+=Tag.li(Tag.a("t1",_href="/?v=1")) + self+=Tag.li(Tag.a("t2",_href="/?v=2")) + self+=self.place + + #~ async def yielder(self,o): + #~ for i in "ABCDEF": + #~ await asyncio.sleep(0.3) + #~ self+=i + + async def loop_timer(self): + while 1: + await asyncio.sleep(0.5) + self.place.set(time.time() ) + if not await self.place.update(): # update component using current websocket + # break if can't (<- good practice to kill this asyncio/loop) + break + # With Web http runner provided by htag #------------------------------------------------------ @@ -11,5 +78,13 @@ def init(self): # With htagweb.WebServer runner provided by htagweb #------------------------------------------------------ -from htagweb import AppServer -AppServer( App ).run(openBrowser=True) +from htagweb import SimpleServer,AppServer +app=AppServer( App ,parano=False,httponly=False) + +if __name__=="__main__": + #~ import logging + #~ logging.basicConfig(format='[%(levelname)-5s] %(name)s: %(message)s',level=logging.DEBUG) + #~ logging.getLogger("redys.servone").setLevel( logging.INFO ) + + + app.run(openBrowser=True) diff --git a/examples/main.py b/examples/main.py index 29a7d4c..bcd3fd1 100644 --- a/examples/main.py +++ b/examples/main.py @@ -1,6 +1,6 @@ import os,sys; sys.path.insert(0,os.path.realpath(os.path.dirname(os.path.dirname(__file__)))) -from htagweb import AppServer +from htagweb import SimpleServer,AppServer from starlette.responses import HTMLResponse import app1 @@ -43,6 +43,7 @@ async def handlePath(request): return HTMLResponse("404",404) +#app=SimpleServer() app=AppServer() app.add_route("/{path:path}", handlePath ) diff --git a/examples/oauth_example.py b/examples/oauth_example.py index 804c02b..63dc8e8 100644 --- a/examples/oauth_example.py +++ b/examples/oauth_example.py @@ -1,7 +1,7 @@ import os,sys; sys.path.insert(0,os.path.realpath(os.path.dirname(os.path.dirname(__file__)))) from htag import Tag -from htagweb import AppServer +from htagweb import SimpleServer from authlib.integrations.starlette_client import OAuth from starlette.responses import Response,RedirectResponse import time @@ -92,7 +92,7 @@ def render(self): # dynamic rendering #========================================= # IT WORKS FOR THE 3 runners of htagweb ;-) (should work with old webhttp/webws runners from htag too) -app=AppServer(App) +app=SimpleServer(App) app.add_route("/oauth_{action}", oauth_request_action ) diff --git a/htagweb/__init__.py b/htagweb/__init__.py index aef9a23..7942e18 100644 --- a/htagweb/__init__.py +++ b/htagweb/__init__.py @@ -7,10 +7,11 @@ # https://github.com/manatlan/htagweb # ############################################################################# -from .appserver import AppServer # a completly different beast, but compatible with ^^ +__version__ = "0.0.0" # auto updated + +from .appserver import AppServer +from .simpleserver import SimpleServer # a completly different beast, but compatible with ^^ from .htagserver import HtagServer # a completly different beast. -from .usot import Usot -__all__= ["AppServer"] +__all__= ["AppServer","SimpleServer"] -__version__ = "0.0.0" # auto updated diff --git a/htagweb/__main__.py b/htagweb/__main__.py index e464c76..f993ca1 100644 --- a/htagweb/__main__.py +++ b/htagweb/__main__.py @@ -13,9 +13,9 @@ if __name__=="__main__": if len(sys.argv)==1: - app=HtagServer(None, debug=True,ssl=False) + app=HtagServer(None, debug=True) elif len(sys.argv)==2: - app=HtagServer(sys.argv[1], debug=True,ssl=False) + app=HtagServer(sys.argv[1], debug=True) else: print("bad call (only one paremeter is possible (a fqn, ex: 'main:App'))") sys.exit(-1) diff --git a/htagweb/appserver.py b/htagweb/appserver.py index 9177415..5f3a3e5 100644 --- a/htagweb/appserver.py +++ b/htagweb/appserver.py @@ -9,40 +9,19 @@ # gunicorn -w 4 -k uvicorn.workers.UvicornWorker -b localhost:8000 --preload basic:app -""" -IDEM que htagweb.AppServer -- mais sans le SHARED MEMORY DICT (donc compat py3.7) ... grace au sesprovider ! -- fichier solo -- !!! utilise crypto de htagweb !!! - - - -This thing is completly new different beast, and doesn't work as all classic runners. - -It's a runner between "WebHTTP/WebServer" & "HtagServer" : the best of two worlds - - - It's fully compatible with WebHTTP/WebServer (provide a serve method) - - it use same techs as HTagServer (WS only, parano mode, simple/#workers, etc ...) - - and the SEO trouble is faked by a pre-fake-rendering (it create a hr on http, for seo ... and recreate a real one at WS connect) - -Like HTagServer, as lifespan of htag instances is completly changed : -htag instances should base their state on "self.root.state" only! -Because a F5 will always destroy/recreate the instance. -""" import os import sys import json import uuid -import pickle -import inspect import logging import uvicorn -import importlib -import contextlib +import asyncio +import hashlib +import multiprocessing from htag import Tag from starlette.applications import Starlette -from starlette.responses import HTMLResponse +from starlette.responses import HTMLResponse,PlainTextResponse from starlette.applications import Starlette from starlette.routing import Route,WebSocketRoute from starlette.endpoints import WebSocketEndpoint @@ -51,9 +30,12 @@ from starlette.datastructures import MutableHeaders from starlette.types import ASGIApp, Message, Receive, Scope, Send -from htag.render import HRenderer from htag.runners import commons -from . import crypto,usot +from . import crypto +import redys.v2 + +from htagweb.server import hrserver +from htagweb.server.client import HrClient logger = logging.getLogger(__name__) #################################################### @@ -80,32 +62,7 @@ def findfqn(x) -> str: return tagClass.__module__+"."+tagClass.__qualname__ -def getClass(fqn_norm:str) -> type: - assert ":" in fqn_norm - #--------------------------- fqn -> module, name - modulename,name = fqn_norm.split(":",1) - if modulename in sys.modules: - module=sys.modules[modulename] - try: - module=importlib.reload( module ) - except ModuleNotFoundError: - """ can't be (really) reloaded if the component is in the - same module as the instance htag server""" - pass - else: - module=importlib.import_module(modulename) - #--------------------------- - klass= getattr(module,name) - if not ( inspect.isclass(klass) and issubclass(klass,Tag) ): - raise Exception(f"'{fqn_norm}' is not a htag.Tag subclass") - - if not hasattr(klass,"imports"): - # if klass doesn't declare its imports - # we prefer to set them empty - # to avoid clutering - klass.imports=[] - return klass - +parano_seed = lambda uid: hashlib.md5(uid.encode()).hexdigest() class WebServerSession: # ASGI Middleware, for starlette def __init__(self, app:ASGIApp, https_only:bool = False, sesprovider:"async method(uid)"=None ) -> None: @@ -132,7 +89,7 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: #!!!!!!!!!!!!!!!!!!!!!!!!!!! scope["uid"] = uid - scope["session"] = await self.cbsesprovider(uid) + scope["session"] = self.cbsesprovider(uid) #!!!!!!!!!!!!!!!!!!!!!!!!!!! logger.debug("request for %s, scope=%s",uid,scope) @@ -154,92 +111,121 @@ async def send_wrapper(message: Message) -> None: await self.app(scope, receive, send_wrapper) -def fqn2hr(fqn:str,js:str,init,session,fullerror=False): # fqn is a "full qualified name", full ! +def normalize(fqn): if ":" not in fqn: # replace last "." by ":" fqn="".join( reversed("".join(reversed(fqn)).replace(".",":",1))) + return fqn - klass=getClass(fqn) - - styles=Tag.style("body.htagoff * {cursor:not-allowed !important;}") - return HRenderer( klass, js, init=init, session = session, fullerror=fullerror, statics=[styles,]) class HRSocket(WebSocketEndpoint): encoding = "text" - async def _sendback(self,ws, txt:str) -> bool: + async def _sendback(self,websocket, txt:str) -> bool: try: - if ws.app.parano: - txt = crypto.encrypt(txt.encode(),ws.app.parano) + if websocket.app.parano: + seed = parano_seed( websocket.scope["uid"]) + txt = crypto.encrypt(txt.encode(),seed) - await ws.send_text( txt ) + await websocket.send_text( txt ) return True except Exception as e: logger.error("Can't send to socket, error: %s",e) return False + async def loop_tag_update(self, event, websocket): + #TODO: there is trouble here sometimes ... to fix ! + with redys.v2.AClient() as bus: + await bus.subscribe(event) + + ok=True + while ok: + actions = await bus.get_event( event ) + if actions is not None: + ok=await self._sendback(websocket,json.dumps(actions)) + await asyncio.sleep(0.1) + async def on_connect(self, websocket): + #====================================================== get the event fqn=websocket.path_params.get("fqn","") + uid=websocket.scope["uid"] + event=HrClient(uid,fqn).event_response+"_update" + #====================================================== await websocket.accept() - js=""" -// rewrite the onmessage of the _WS_ to interpret json action now ! -_WS_.onmessage = async function(e) { - let actions = await _read_(e.data) - action(actions) -} -// declare the interact js method to communicate thru the WS -async function interact( o ) { - _WS_.send( await _write_(JSON.stringify(o)) ); -} + # add the loop to tag.update feature + asyncio.ensure_future(self.loop_tag_update(event,websocket)) -console.log("started") -""" + async def on_receive(self, websocket, data): + fqn=websocket.path_params.get("fqn","") + uid=websocket.scope["uid"] - try: - hr=fqn2hr(fqn,js,commons.url2ak(str(websocket.url)),websocket.session,fullerror=websocket.app.debug) - except Exception as e: - await self._sendback( websocket, str(e) ) - await websocket.close() - return + if websocket.app.parano: + data = crypto.decrypt(data.encode(),parano_seed( uid )).decode() + data=json.loads(data) - self.hr=hr + p=HrClient(uid,fqn) - # send back the full rendering (1st onmessage after js connection) - await self._sendback( websocket, str(self.hr) ) + actions=await p.interact( oid=data["id"], method_name=data["method"], args=data["args"], kargs=data["kargs"], event=data.get("event") ) - # register the hr.sendactions, for tag.update feature - self.hr.sendactions=lambda actions: self._sendback(websocket,json.dumps(actions)) + await self._sendback( websocket, json.dumps(actions) ) - async def on_receive(self, websocket, data): - if websocket.app.parano: - data = crypto.decrypt(data.encode(),websocket.app.parano).decode() + async def on_disconnect(self, websocket, close_code): + #====================================================== get the event + fqn=websocket.path_params.get("fqn","") + uid=websocket.scope["uid"] + event=HrClient(uid,fqn).event_response+"_update" + #====================================================== - data=json.loads(data) + with redys.v2.AClient() as bus: + await bus.unsubscribe(event) - #=================================== for UT only - if data["id"]=="ut": - data["id"]=id(self.hr.tag) - #=================================== - actions = await self.hr.interact(data["id"],data["method"],data["args"],data["kargs"],data.get("event")) - await self._sendback( websocket, json.dumps(actions) ) +def processHrServer(): + asyncio.run( hrserver() ) + + +async def lifespan(app): + process_hrserver=multiprocessing.Process(target=processHrServer) + process_hrserver.start() + yield + process_hrserver.terminate() - async def on_disconnect(self, websocket, close_code): - del self.hr class AppServer(Starlette): - def __init__(self,obj:"htag.Tag class|fqn|None"=None, debug:bool=True,ssl:bool=False,parano:bool=False,sesprovider:"htagweb.sessions.create*|None"=None): + def __init__(self, + obj:"htag.Tag class|fqn|None"=None, + debug:bool=True, + ssl:bool=False, + parano:bool=False, + httponly:bool=False, + sesprovider:"sessions.MemDict|sessions.FileDict|sessions.FilePersistentDict|None"=None, + ): self.ssl=ssl - self.parano = str(uuid.uuid4()) if parano else None + self.parano=parano + self.httponly=httponly + if sesprovider is None: - sesprovider = sessions.createFile + self.sesprovider = sessions.MemDict + else: + self.sesprovider = sesprovider + + print("Session with:",self.sesprovider.__name__) + ################################################################### + + if httponly: + routes=[Route("/_/{fqn}", self.HRHttp, methods=["POST"])] + else: + routes=[WebSocketRoute("/_/{fqn}", HRSocket)] + + ################################################################# Starlette.__init__( self, debug=debug, - routes=[WebSocketRoute("/_/{fqn}", HRSocket)], - middleware=[Middleware(WebServerSession,https_only=ssl,sesprovider=sesprovider)], + routes=routes, + middleware=[Middleware(WebServerSession,https_only=ssl,sesprovider=self.sesprovider)], + lifespan=lifespan, ) if obj: @@ -248,88 +234,105 @@ async def handleHome(request): self.add_route( '/', handleHome ) async def serve(self, request, obj ) -> HTMLResponse: - fqn=findfqn(obj) - protocol = "wss" if self.ssl else "ws" + uid = request.scope["uid"] + fqn=normalize(findfqn(obj)) + if self.parano: - jsparano = crypto.JSCRYPTO - jsparano += f"\nvar _PARANO_='{self.parano}'\n" - jsparano += "\nasync function _read_(x) {return await decrypt(x,_PARANO_)}\n" - jsparano += "\nasync function _write_(x) {return await encrypt(x,_PARANO_)}\n" + seed = parano_seed( uid ) + + jstunnel = crypto.JSCRYPTO + jstunnel += f"\nvar _PARANO_='{seed}'\n" + jstunnel += "\nasync function _read_(x) {return await decrypt(x,_PARANO_)}\n" + jstunnel += "\nasync function _write_(x) {return await encrypt(x,_PARANO_)}\n" else: - jsparano = "" - jsparano += "\nasync function _read_(x) {return x}\n" - jsparano += "\nasync function _write_(x) {return x}\n" - #TODO: consider https://developer.chrome.com/blog/removing-document-write/ - - jsbootstrap=""" - %(jsparano)s - // instanciate the WEBSOCKET - let _WS_=null; - let retryms=500; - - function connect() { - _WS_= new WebSocket("%(protocol)s://"+location.host+"/_/%(fqn)s"+location.search); - _WS_.onopen=function(evt) { - console.log("** WS connected") - document.body.classList.remove("htagoff"); - retryms=500; - - _WS_.onmessage = async function(e) { - // when connected -> the full HTML page is returned, installed & start'ed !!! - - let html = await _read_(e.data); - html = html.replace("
- # - # - # - # - # loading - # - # """ % locals() - - # return HTMLResponse( bootstrapHtmlPage ) + jstunnel = "" + jstunnel += "\nasync function _read_(x) {return x}\n" + jstunnel += "\nasync function _write_(x) {return x}\n" + + + if self.httponly: + # interactions use HTTP POST + js = """ +%(jstunnel)s + +async function interact( o ) { + let body = await _write_(JSON.stringify(o)); + let req=await window.fetch("/_/%(fqn)s",{method:"POST", body: body}); + let actions=await req.text(); + action( await _read_(actions) ); +} + +window.addEventListener('DOMContentLoaded', start ); +""" % locals() + else: + # interactions use WS + protocol = "wss" if self.ssl else "ws" + + js = """ + %(jstunnel)s + + async function interact( o ) { + _WS_.send( await _write_(JSON.stringify(o)) ); + } + + // instanciate the WEBSOCKET + let _WS_=null; + let retryms=500; + + function connect() { + _WS_= new WebSocket("%(protocol)s://"+location.host+"/_/%(fqn)s"); + _WS_.onopen=function(evt) { + console.log("** WS connected") + document.body.classList.remove("htagoff"); + retryms=500; + start(); + + _WS_.onmessage = async function(e) { + let actions = await _read_(e.data) + action(actions) + }; + + } + + _WS_.onclose = function(evt) { + console.log("** WS disconnected, retry in (ms):",retryms); + document.body.classList.add("htagoff"); + + setTimeout( function() { + connect(); + retryms=retryms*2; + }, retryms); + }; + } + connect(); + + """ % locals() + + p = HrClient(uid,fqn,js,self.sesprovider.__name__) + + args,kargs = commons.url2ak(str(request.url)) + html=await p.start(*args,**kargs) + return HTMLResponse(html) + + async def HRHttp(self,request) -> PlainTextResponse: + uid = request.scope["uid"] + fqn = request.path_params.get("fqn","") + seed = parano_seed( uid ) + + p=HrClient(uid,fqn) + data = await request.body() + + if self.parano: + data = crypto.decrypt(data,seed).decode() + + data=json.loads(data) + actions=await p.interact( oid=data["id"], method_name=data["method"], args=data["args"], kargs=data["kargs"], event=data.get("event") ) + txt=json.dumps(actions) + + if self.parano: + txt = crypto.encrypt(txt.encode(),seed) + + return PlainTextResponse(txt) def run(self, host="0.0.0.0", port=8000, openBrowser=False): # localhost, by default !! if openBrowser: diff --git a/htagweb/htagserver.py b/htagweb/htagserver.py index 4ac9a58..76e85a3 100644 --- a/htagweb/htagserver.py +++ b/htagweb/htagserver.py @@ -10,7 +10,7 @@ # gunicorn -w 4 -k uvicorn.workers.UvicornWorker -b localhost:8000 --preload basic:app """ -This thing is AppServer, but with 2 majors behaviour +This thing is HtagServer, but with 2 majors behaviour - If "no klass"(None) is defined -> will hook "/" on IndexApp (a browser of folders/files) - every others routes -> will try to instanciate an htag app @@ -23,7 +23,8 @@ from starlette.responses import HTMLResponse,Response from htag import Tag -from .appserver import AppServer,getClass +from .simpleserver import SimpleServer +from .server import importClassFromFqn #################################################### class IndexApp(Tag.body): @@ -60,11 +61,11 @@ def init(self,path="."): #################################################### -class HtagServer(AppServer): +class HtagServer(SimpleServer): def __init__(self,obj:"htag.Tag class|fqn|None"=None, *a,**k): if obj is None: obj = IndexApp - AppServer.__init__(self,obj,*a,**k) + SimpleServer.__init__(self,obj,*a,**k) self.add_route('/{path}', self._serve) @@ -74,10 +75,10 @@ async def _serve(self, request) -> HTMLResponse: fqn_norm="".join( reversed("".join(reversed(fqn)).replace(".",":",1))) try: - klass=getClass(fqn_norm) + klass=importClassFromFqn(fqn_norm) except: try: - klass=getClass(fqn+":App") + klass=importClassFromFqn(fqn+":App") except ModuleNotFoundError: return HTMLResponse("Not Found (%s)" % fqn,404,media_type="text/plain") diff --git a/htagweb/server/__init__.py b/htagweb/server/__init__.py new file mode 100644 index 0000000..84ba67a --- /dev/null +++ b/htagweb/server/__init__.py @@ -0,0 +1,241 @@ +# -*- coding: utf-8 -*- +# ############################################################################# +# Copyright (C) 2023 manatlan manatlan[at]gmail(dot)com +# +# MIT licence +# +# https://github.com/manatlan/htagweb +# ############################################################################# + +import asyncio +import redys +import redys.v2 +import os,sys,importlib,inspect +import multiprocessing +from htag import Tag +from htag.render import HRenderer + + + +EVENT_SERVER="EVENT_SERVER" + +CMD_EXIT="EXIT" +CMD_RENDER="RENDER" + +def importClassFromFqn(fqn_norm:str) -> type: + assert ":" in fqn_norm + #--------------------------- fqn -> module, name + modulename,name = fqn_norm.split(":",1) + if modulename in sys.modules: + module=sys.modules[modulename] + try: + module=importlib.reload( module ) + except ModuleNotFoundError as e: + """ can't be (really) reloaded if the component is in the + same module as the instance htag server""" + print("*WARNING* can't force module reload:",e) + else: + module=importlib.import_module(modulename) + #--------------------------- + klass= getattr(module,name) + if not ( inspect.isclass(klass) and issubclass(klass,Tag) ): + raise Exception(f"'{fqn_norm}' is not a htag.Tag subclass") + + if not hasattr(klass,"imports"): + # if klass doesn't declare its imports + # we prefer to set them empty + # to avoid clutering + klass.imports=[] + return klass + + + +def process(uid,hid,event_response,event_interact,fqn,js,init,sesprovidername): + #'''''''''''''''''''''''''''''''''''''''''''''''''''' + if sesprovidername is None: + sesprovidername="MemDict" + import htagweb.sessions + FactorySession=getattr(htagweb.sessions,sesprovidername) + #'''''''''''''''''''''''''''''''''''''''''''''''''''' + + pid = os.getpid() + async def loop(): + with redys.v2.AClient() as bus: + try: + if os.getcwd() not in sys.path: sys.path.insert(0,os.getcwd()) + klass=importClassFromFqn(fqn) + except Exception as e: + print(f">Process {pid} ERROR :",hid,e) + #TODO: do better here + assert await bus.publish(event_response,str(e)) + return + + RUNNING=True + def exit(): + RUNNING=False + + session = FactorySession(uid) + + styles=Tag.style("body.htagoff * {cursor:not-allowed !important;}") + + hr=HRenderer( klass ,js, init=init, exit_callback=exit, fullerror=True, statics=[styles,],session = session) + + print(f">Process {pid} started with :",hid,init) + + + # subscribe for interaction + await bus.subscribe( event_interact ) + + # publish the 1st rendering + assert await bus.publish(event_response,str(hr)) + + # register tag.update feature + #====================================== + async def update(actions): + try: + await bus.publish(event_response+"_update",actions) + except: + print("!!! concurrent write/read on redys !!!") + return True + + hr.sendactions=update + #====================================== + + while RUNNING: + params = await bus.get_event( event_interact ) + if params is not None: # sometimes it's not a dict ?!? (bool ?!) + if params.get("cmd") == CMD_RENDER: + # just a false start, just need the current render + print(f">Process {pid} render {hid}") + assert await bus.publish(event_response,str(hr)) + else: + print(f">Process {pid} interact {hid}:") + #-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\- UT + if params["oid"]=="ut": params["oid"]=id(hr.tag) + #-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\- + + actions = await hr.interact(**params) + + assert await bus.publish(event_response+"_interact",actions) + + await asyncio.sleep(0.1) + + #consume all pending events + assert await bus.unsubscribe( event_interact ) + + asyncio.run( loop() ) + print(f">Process {pid} ended") + +async def hrserver_orchestrator(): + with redys.v2.AClient() as bus: + + # prevent multi orchestrators + if await bus.get("hrserver_orchestrator_running")==True: + print("hrserver_orchestrator is already running") + return + else: + print("hrserver_orchestrator started") + await bus.set("hrserver_orchestrator_running",True) + + # register its main event + await bus.subscribe( EVENT_SERVER ) + + ps={} + + def killall(ps:dict): + # try to send a EXIT CMD to all running ps + for hid,infos in ps.items(): + ps[hid]["process"].kill() + + while 1: + params = await bus.get_event( EVENT_SERVER ) + if params is not None: + if params.get("cmd") == CMD_EXIT: + print(EVENT_SERVER, params.get("cmd") ) + break + elif params.get("cmd") == "CLEAN": + print(EVENT_SERVER, params.get("cmd") ) + killall(ps) + continue + elif params.get("cmd") == "PS": + print(EVENT_SERVER, params.get("cmd") ) + from pprint import pprint + pprint(ps) + continue + + hid=params["hid"] + key_init=str(params["init"]) + + if hid in ps and ps[hid]["process"].is_alive(): + # process is already running + + if key_init == ps[hid]["key"]: + # it's the same initialization process + + # so ask process to send back its render + assert await bus.publish(params["event_interact"],dict(cmd=CMD_RENDER)) + continue + else: + # kill itself because it's not the same init params + print("Reload a new process",hid) + ps[hid]["process"].kill() + + # and recreate another one later + + # create the process + p=multiprocessing.Process(target=process, args=[],kwargs=params) + p.start() + + # and save it in pool ps + ps[hid]=dict( process=p, key=key_init, event_interact=params["event_interact"]) + + await asyncio.sleep(0.1) + + assert await bus.unsubscribe( EVENT_SERVER ) + + killall(ps) + + + print("hrserver_orchestrator stopped") + +async def wait_redys(): + bus=redys.v2.AClient() + while 1: + try: + if await bus.ping()=="pong": + break + except: + pass + await asyncio.sleep(0.1) + +async def wait_hrserver(): + bus=redys.v2.AClient() + while 1: + try: + if await bus.get("hrserver_orchestrator_running"): + break + except Exception as e: + print(e) + await asyncio.sleep(0.5) + + +async def kill_hrserver(): + bus=redys.v2.AClient() + await bus.publish( EVENT_SERVER, dict(cmd=CMD_EXIT) ) # kill orchestrator loop + + +async def hrserver(): + s=redys.v2.Server() + s.start() + + await wait_redys() + + await hrserver_orchestrator() + + s.stop() + + + + +if __name__=="__main__": + asyncio.run( hrserver() ) diff --git a/htagweb/server/client.py b/htagweb/server/client.py new file mode 100644 index 0000000..b7dff15 --- /dev/null +++ b/htagweb/server/client.py @@ -0,0 +1,120 @@ +# -*- coding: utf-8 -*- +# ############################################################################# +# Copyright (C) 2023 manatlan manatlan[at]gmail(dot)com +# +# MIT licence +# +# https://github.com/manatlan/htagweb +# ############################################################################# + +import uuid,asyncio,time +import redys +import redys.v2 +from htagweb.server import EVENT_SERVER + +TIMEOUT=20 # sec to wait answer from redys server #TODO: set better + +class HrClient: + def __init__(self,uid:str,fqn:str,js:str=None,sesprovidername=None): + """ !!!!!!!!!!!!!!!!!!!! if js|sesprovidername is None : can't do a start() !!!!!!!!!!!!!!!!!!!!!!""" + self.uid=uid + self.fqn=fqn + self.js=js + self.bus = redys.v2.AClient() + self.sesprovidername=sesprovidername + + self.hid=f"{uid}_{fqn}" + self.event_response = f"response_{self.hid}" + self.event_interact = f"interact_{self.hid}" + + async def _wait(self,event, s=TIMEOUT): + # wait for a response + t1=time.monotonic() + while time.monotonic() - t1 < s: + message = await self.bus.get_event( event ) + if message is not None: + return message + + return None + + async def start(self,*a,**k) -> str: + """ Start the defined app with this params (a,k) + (dialog with server event) + """ + assert self.js, "You should define the js in HrPilot() !!!!!!" + + # subscribe for response + await self.bus.subscribe( self.event_response ) + + # start the process app + assert await self.bus.publish( EVENT_SERVER , dict( + uid=self.uid, + hid=self.hid, + event_response=self.event_response, + event_interact=self.event_interact, + fqn=self.fqn, + js=self.js, + init= (a,k), + sesprovidername=self.sesprovidername, + )) + + # wait 1st rendering + return await self._wait(self.event_response) or "?!" + + # async def kill(self): + # """ Kill the process + # (dialog with process event) + # """ + # assert await self.bus.publish( self.event_interact, dict(cmd="EXIT") ) + + + async def interact(self,**params) -> dict: + """ return htag'actions or None (if process doesn't answer, after timeout) + (dialog with process event) + """ + # subscribe for response + await self.bus.subscribe( self.event_response+"_interact" ) + + # post the interaction + assert await self.bus.publish( self.event_interact, params ) + + # wait actions + return await self._wait(self.event_response+"_interact") or {} + + + @staticmethod + async def list(): + """ SERVER COMMAND + (dialog with server event) + """ + with redys.v2.AClient() as bus: + assert await bus.publish( EVENT_SERVER, dict(cmd="PS") ) + + @staticmethod + async def clean(): + """ SERVER COMMAND + (dialog with server event) + """ + with redys.v2.AClient() as bus: + assert await bus.publish( EVENT_SERVER, dict(cmd="CLEAN") ) + + +async def main(): + uid ="u1" + p=HrClient(uid,"obj:App","//") + #~ html=await p.start() + #~ print(html) + + #~ actions=await p.interact( oid="ut", method_name="doit", args=[], kargs={}, event={} ) + #~ print(actions) + + await p.kill() + await p.kill() + await p.kill() + #~ await p.kill() + #~ await p.kill() + #~ await HrPilot.list() + #~ await HrPilot.clean() + +if __name__=="__main__": + asyncio.run( main() ) diff --git a/htagweb/sessions/__init__.py b/htagweb/sessions/__init__.py index f1235d2..9c1dbd4 100644 --- a/htagweb/sessions/__init__.py +++ b/htagweb/sessions/__init__.py @@ -6,21 +6,8 @@ # # https://github.com/manatlan/htagweb # ############################################################################# +from .file import FileDict +from .file import FilePersistentDict +from .mem import MemDict -async def createFile(uid): - from . import file - return await file.create(uid) - -async def createFilePersistent(uid): # <- persistent after server reboot - from . import file - return await file.create(uid,True) - -async def createShm(uid): - from . import shm - return await shm.create(uid) - -async def createMem(uid): - from . import memory - return await memory.create(uid) - -__all__= ["createFile","createFilePersistent","createShm","createMem"] \ No newline at end of file +__all__= ["FileDict","FilePersistentDict","MemDict"] \ No newline at end of file diff --git a/htagweb/sessions/file.py b/htagweb/sessions/file.py index c5a43d5..571239d 100644 --- a/htagweb/sessions/file.py +++ b/htagweb/sessions/file.py @@ -12,7 +12,7 @@ class FileDict: # default """ mimic a dict (with minimal methods), unique source of truth, based on FS""" - def __init__(self,uid:str,persistent:bool): + def __init__(self,uid:str,persistent:bool=False): self._uid=uid if persistent: name="" @@ -61,5 +61,6 @@ def clear(self): if os.path.isfile(self._file): os.unlink(self._file) -async def create(uid,persistent=False) -> FileDict: - return FileDict(uid,persistent) +class FilePersistentDict(FileDict): # default + def __init__(self,uid): + FileDict.__init__(self,uid,persistent=True) diff --git a/htagweb/sessions/mem.py b/htagweb/sessions/mem.py new file mode 100644 index 0000000..8d9abaf --- /dev/null +++ b/htagweb/sessions/mem.py @@ -0,0 +1,49 @@ +# -*- coding: utf-8 -*- +# ############################################################################# +# Copyright (C) 2023 manatlan manatlan[at]gmail(dot)com +# +# MIT licence +# +# https://github.com/manatlan/htagweb +# ############################################################################# + +import os,pickle,tempfile +import redys.v2 + +class MemDict: # default + """ mimic a dict (with minimal methods), unique source of truth, based on redys.v2""" + def __init__(self,uid:str): + self._uid=uid + self._bus=redys.v2.Client() + self._d=self._bus.get(self._uid) or {} + + def __len__(self): + return len(self._d.keys()) + + def __contains__(self,key): + return key in self._d.keys() + + def items(self): + return self._d.items() + + def get(self,k:str,default=None): + return self._d.get(k,default) + + def __getitem__(self,k:str): + return self._d[k] + + def __delitem__(self,k:str): + """ save session """ + del self._d[k] + self._bus.set(self._uid, self._d) + + def __setitem__(self,k:str,v): + """ save session """ + self._d[k]=v + self._bus.set(self._uid, self._d) + + def clear(self): + """ save session """ + self._d.clear() + self._bus.delete(self._uid) + diff --git a/htagweb/sessions/memory.py b/htagweb/sessions/memory.py deleted file mode 100644 index a923554..0000000 --- a/htagweb/sessions/memory.py +++ /dev/null @@ -1,67 +0,0 @@ -# -*- coding: utf-8 -*- -# ############################################################################# -# Copyright (C) 2023 manatlan manatlan[at]gmail(dot)com -# -# MIT licence -# -# https://github.com/manatlan/htagweb -# ############################################################################# -from ..usot import Usot - - -class ServerMemDict: # proxy between app and ServerUnique - def __init__(self,uid,dico:dict): - self._uid=uid - self._dico=dico - - def __len__(self): - return len(self._dico.keys()) - - def __contains__(self,key): - return key in self._dico.keys() - - def items(self): - return self._dico.items() - - def get(self,k:str,default=None): # could be inplemented in SessionMem - return self._dico.get(k,default) - - def __getitem__(self,k:str): # could be inplemented in SessionMem - return self._dico[k] - - def __setitem__(self,k:str,v): # could be inplemented in SessionMem - self._dico[k]=v - PX.clientsync.set( self._uid, self._dico) - - def __delitem__(self,k:str): - del self._dico[k] - PX.clientsync.set( self._uid, self._dico) - - def clear(self): - self._dico.clear() - PX.clientsync.set( self._uid, {}) - - def __repr__(self): - return f"