Skip to content

Commit

Permalink
minor fixes + better log
Browse files Browse the repository at this point in the history
  • Loading branch information
manatlan committed Oct 5, 2023
1 parent 2ecd791 commit 92f105c
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 119 deletions.
2 changes: 2 additions & 0 deletions example.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def clllll(o):
self <= Tag.button("inc integer",_onclick=inc_test_session)
self <= Tag.button("add list",_onclick=addd)
self <= Tag.button("clear",_onclick=clllll)
self <= Tag.button("X",_onclick=lambda o: self.exit(),_style="float:right")
self <= TagSession()

self+=Tag.li(Tag.a("t0",_href="/"))
Expand All @@ -58,6 +59,7 @@ def clllll(o):
self+=Tag.li(Tag.a("Other app",_href="/jo"))
self+=self.place


async def loop_timer(self):
while 1:
await asyncio.sleep(0.5)
Expand Down
252 changes: 138 additions & 114 deletions htagweb/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from htag import Tag
from htag.render import HRenderer

import logging
logger = logging.getLogger(__name__)


EVENT_SERVER="EVENT_SERVER"
Expand Down Expand Up @@ -50,7 +52,9 @@ def importClassFromFqn(fqn_norm:str) -> type:



##################################################################################
def process(uid,hid,event_response,event_interact,fqn,js,init,sesprovidername,force):
##################################################################################
#''''''''''''''''''''''''''''''''''''''''''''''''''''
if sesprovidername is None:
sesprovidername="MemDict"
Expand All @@ -59,150 +63,170 @@ def process(uid,hid,event_response,event_interact,fqn,js,init,sesprovidername,fo
#''''''''''''''''''''''''''''''''''''''''''''''''''''

pid = os.getpid()
async def loop():
with redys.v2.AClient() as bus:
try:
if os.getcwd() not in sys.path: sys.path.insert(0,os.getcwd())
klass=importClassFromFqn(fqn)
except Exception as e:
print(f">Process {pid} ERROR :",hid,e)
#TODO: do better here
assert await bus.publish(event_response,str(e))
return

RUNNING=True
def exit():
RUNNING=False
def log(*a):
txt=f">PID {pid} {hid}: %s" % (" ".join([str(i) for i in a]))
print(txt,flush=True,file=sys.stdout)
logger.info(txt)

session = FactorySession(uid)
async def loop():
RRR={"1":"1"} #TODO: find something better ;-)

styles=Tag.style("body.htagoff * {cursor:not-allowed !important;}")
def suicide():
log("suicide")
RRR.clear()

hr=HRenderer( klass ,js, init=init, exit_callback=exit, fullerror=True, statics=[styles,],session = session)
bus = redys.v2.AClient()
try:
if os.getcwd() not in sys.path: sys.path.insert(0,os.getcwd())
klass=importClassFromFqn(fqn)
except Exception as e:
log("importClassFromFqn -->",e)
#TODO: do better here
assert await bus.publish(event_response,str(e))
return

print(f">Process {pid} started with :",hid,init)
session = FactorySession(uid)

styles=Tag.style("body.htagoff * {cursor:not-allowed !important;}")

# subscribe for interaction
await bus.subscribe( event_interact )
hr=HRenderer( klass ,js, init=init, exit_callback=suicide, fullerror=True, statics=[styles,],session = session)

# publish the 1st rendering
assert await bus.publish(event_response,str(hr))
log(f"Start with params:",init)

# register tag.update feature
#======================================
async def update(actions):
try:
await bus.publish(event_response+"_update",actions)
except:
print("!!! concurrent write/read on redys !!!")
return True

hr.sendactions=update
#======================================
# subscribe for interaction
await bus.subscribe( event_interact )

while RUNNING:
params = await bus.get_event( event_interact )
if params is not None: # sometimes it's not a dict ?!? (bool ?!)
if params.get("cmd") == CMD_RENDER:
# just a false start, just need the current render
print(f">Process {pid} render {hid}")
hr.session = FactorySession(uid) # reload session
assert await bus.publish(event_response,str(hr))
else:
print(f">Process {pid} interact {hid}:")
#-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\- UT
if params["oid"]=="ut": params["oid"]=id(hr.tag)
#-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-
# publish the 1st rendering
assert await bus.publish(event_response,str(hr))

actions = await hr.interact(**params)

# always save session after interaction
hr.session._save()

assert await bus.publish(event_response+"_interact",actions)
# register tag.update feature
#======================================
async def update(actions):
try:
r=await bus.publish(event_response+"_update",actions)
except:
log("!!! concurrent write/read on redys !!!")
r=False
return r

hr.sendactions=update
#======================================

while RRR:
params = await bus.get_event( event_interact )
if params is not None: # sometimes it's not a dict ?!? (bool ?!)
if params.get("cmd") == CMD_RENDER:
# just a false start, just need the current render
log("RERENDER")
hr.session = FactorySession(uid) # reload session
assert await bus.publish(event_response,str(hr))
else:
log("INTERACT")
#-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\- UT
if params["oid"]=="ut": params["oid"]=id(hr.tag)
#-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-

actions = await hr.interact(**params)

# always save session after interaction
hr.session._save()

assert await bus.publish(event_response+"_interact",actions)

await asyncio.sleep(0.1)
await asyncio.sleep(0.1)

#consume all pending events
assert await bus.unsubscribe( event_interact )
#consume all pending events
assert await bus.unsubscribe( event_interact )

asyncio.run( loop() )
print(f">Process {pid} ended")
log("end")



##################################################################################
async def hrserver_orchestrator():
with redys.v2.AClient() as bus:
##################################################################################
bus=redys.v2.AClient()

# prevent multi orchestrators
if await bus.get("hrserver_orchestrator_running")==True:
print("hrserver_orchestrator is already running")
return
else:
print("hrserver_orchestrator started")
await bus.set("hrserver_orchestrator_running",True)

# register its main event
await bus.subscribe( EVENT_SERVER )

ps={}

def killall(ps:dict):
# try to send a EXIT CMD to all running ps
for hid,infos in ps.items():
ps[hid]["process"].kill()

while 1:
params = await bus.get_event( EVENT_SERVER )
if params is not None:
if params.get("cmd") == CMD_EXIT:
print(EVENT_SERVER, params.get("cmd") )
break
elif params.get("cmd") == "CLEAN":
print(EVENT_SERVER, params.get("cmd") )
killall(ps)
continue
elif params.get("cmd") == "PS":
print(EVENT_SERVER, params.get("cmd") )
from pprint import pprint
pprint(ps)
continue
def log(*a):
txt=f"-ORCHESTRATOR- %s" % (" ".join([str(i) for i in a]))
print(txt,flush=True,file=sys.stdout)
logger.info(txt)

hid=params["hid"]
key_init=str(params["init"])

if hid in ps and ps[hid]["process"].is_alive():
# process is already running
# prevent multiple orchestrators
if await bus.get("hrserver_orchestrator_running")==True:
log("already running")
return
else:
log("started")
await bus.set("hrserver_orchestrator_running",True)

if params["force"] or key_init != ps[hid]["key"]:
# kill itself because it's not the same init params, or force recreate
if params["force"]:
print("Recreate a new process (forced)",hid)
else:
print("Recreate a new process (qp changed)",hid)
ps[hid]["process"].kill()
# and recreate another one later
else:
# it's the same initialization process
# register its main event
await bus.subscribe( EVENT_SERVER )

ps={}

# so ask process to send back its render
assert await bus.publish(params["event_interact"],dict(cmd=CMD_RENDER))
continue
def killall(ps:dict):
# try to send a EXIT CMD to all running ps
for hid,infos in ps.items():
ps[hid]["process"].kill()

# create the process
p=multiprocessing.Process(target=process, args=[],kwargs=params)
p.start()
while 1:
params = await bus.get_event( EVENT_SERVER )
if params is not None:
if params.get("cmd") == CMD_EXIT:
log(EVENT_SERVER, params.get("cmd") )
break
elif params.get("cmd") == "CLEAN":
log(EVENT_SERVER, params.get("cmd") )
killall(ps)
continue
elif params.get("cmd") == "PS":
log(EVENT_SERVER, params.get("cmd") )
log(ps)
continue

hid=params["hid"]
key_init=str(params["init"])

if hid in ps and ps[hid]["process"].is_alive():
# process is already running

if params["force"] or key_init != ps[hid]["key"]:
# kill itself because it's not the same init params, or force recreate
if params["force"]:
log("Destroy/Recreate a new process (forced)",hid)
else:
log("Destroy/Recreate a new process (qp changed)",hid)
ps[hid]["process"].kill()
# and recreate another one later
else:
# it's the same initialization process

log("Reuse process",hid)
# so ask process to send back its render
assert await bus.publish(params["event_interact"],dict(cmd=CMD_RENDER))
continue
else:
log("Start a new process",hid)

# and save it in pool ps
ps[hid]=dict( process=p, key=key_init, event_interact=params["event_interact"])
# create the process
p=multiprocessing.Process(target=process, args=[],kwargs=params)
p.start()

await asyncio.sleep(0.1)
# and save it in pool ps
ps[hid]=dict( process=p, key=key_init, event_interact=params["event_interact"])

assert await bus.unsubscribe( EVENT_SERVER )
await asyncio.sleep(0.1)

killall(ps)
assert await bus.unsubscribe( EVENT_SERVER )

killall(ps)

print("hrserver_orchestrator stopped")
log("stopped")

async def wait_redys():
bus=redys.v2.AClient()
Expand Down
22 changes: 17 additions & 5 deletions htagweb/server/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@
# https://github.com/manatlan/htagweb
# #############################################################################

import uuid,asyncio,time
import uuid,asyncio,time,sys
import redys
import redys.v2
from htagweb.server import EVENT_SERVER
import logging
logger = logging.getLogger(__name__)

TIMEOUT=2*60 # A interaction can take 2min max

TIMEOUT=20 # sec to wait answer from redys server #TODO: set better

class HrClient:
def __init__(self,uid:str,fqn:str,js:str=None,sesprovidername=None,recreate=False):
Expand All @@ -28,6 +31,12 @@ def __init__(self,uid:str,fqn:str,js:str=None,sesprovidername=None,recreate=Fals
self.event_response = f"response_{self.hid}"
self.event_interact = f"interact_{self.hid}"

def error(self, *a):
txt=f".HrClient {self.uid} {self.fqn}: %s" % (" ".join([str(i) for i in a]))
print(txt,flush=True,file=sys.stderr)
logger.error(txt)


async def _wait(self,event, s=TIMEOUT):
# wait for a response
t1=time.monotonic()
Expand All @@ -36,6 +45,7 @@ async def _wait(self,event, s=TIMEOUT):
if message is not None:
return message

self.error(f"Event TIMEOUT ({s}s) on {event} !!!")
return None

async def start(self,*a,**k) -> str:
Expand Down Expand Up @@ -78,10 +88,12 @@ async def interact(self,**params) -> dict:
await self.bus.subscribe( self.event_response+"_interact" )

# post the interaction
assert await self.bus.publish( self.event_interact, params )
if await self.bus.publish( self.event_interact, params ):
# wait actions
return await self._wait(self.event_response+"_interact") or {}
else:
self.error(f"Can't publish {self.event_interact} !!!")

# wait actions
return await self._wait(self.event_response+"_interact") or {}


@staticmethod
Expand Down

0 comments on commit 92f105c

Please sign in to comment.