-
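Hi! And thanks! It's always nice to hear people find it useful!

Rocketry is designed to create tasks at runtime. For example:

    from rocketry import Rocketry
    from rocketry.conds import daily

    app = Rocketry()

    def do_things():
        ...

    # Runs daily in the main thread and registers do_things as a new task.
    @app.task(daily, execution="main")
    def create_do_things():
        app.session.create_task(func=do_things, start_cond=daily)

I used execution="main" in the example above. A thread-based variant that keeps checking for new tasks:

    from rocketry import Rocketry
    from rocketry.conds import daily
    from rocketry.args import TerminationFlag
    from rocketry.exc import TaskTerminationException

    app = Rocketry()

    def do_things():
        ...

    def has_new_task():
        ...

    @app.task(daily, execution="thread", permanent_task=True)
    def create_do_things(flag=TerminationFlag()):
        while True:
            # Register a new task whenever one becomes available.
            if has_new_task():
                app.session.create_task(func=do_things, start_cond=daily)
            # Exit the loop cleanly when the scheduler asks the task to stop.
            if flag.is_set():
                raise TaskTerminationException("Task was terminated")

Note that we used execution="thread" with permanent_task=True, and a TerminationFlag so the loop can exit when the task is terminated. Also note that if you use the same function for multiple tasks, pass the name argument so that each task gets a distinct name.

Is this the direction you are looking for?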
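For the plug-in case, has_new_task could be backed by a check on the plug-in folder. A minimal sketch using only the standard library, assuming plug-ins are .py files dropped into a single folder; PluginFolderWatcher and new_files are hypothetical names, not part of Rocketry:

```python
from pathlib import Path


class PluginFolderWatcher:
    """Hypothetical helper (not part of Rocketry): reports plug-in files not seen before."""

    def __init__(self, folder):
        self.folder = Path(folder)
        self._seen = set()

    def new_files(self):
        """Return the .py files that appeared since the last call, sorted by path."""
        current = {p for p in self.folder.glob("*.py") if p.is_file()}
        fresh = sorted(current - self._seen)
        # Remember everything seen so far so each file is reported only once.
        self._seen |= current
        return fresh
```

Inside the loop, the has_new_task() check could then become a loop over watcher.new_files(), creating one task per new file.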
-
Also, as of 2.4.0, you can build a task queue and an event-based system with Rocketry as well. There are not many tools for those (yet), but you can do something like this:

    import asyncio

    from rocketry import Rocketry
    from rocketry.conds import daily

    app = Rocketry()

    @app.task()
    def do_things(arg):
        ...

    async def get_next_task():
        "Infinite task queue (that waits when there are no tasks to run)"
        while True:
            await asyncio.sleep(2)
            yield 'do_things', {"arg": "run specific arg"}

    @app.task(daily, execution="async")
    async def task_queue():
        # get_next_task is an async generator, so it must be iterated with "async for".
        async for task_name, params in get_next_task():
            task = app.session[task_name]
            task.run(**params)

But of course there are no premade tools for such queues and events (yet).
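Because get_next_task is an async generator, it has to be consumed with async for. A rough sketch of the same pattern backed by an asyncio.Queue, using only the standard library; iter_queue, dispatch_all, run_task and the None sentinel are assumptions made for illustration, not Rocketry APIs:

```python
import asyncio


async def iter_queue(queue, stop=None):
    """Yield items from an asyncio.Queue until the `stop` sentinel arrives."""
    while True:
        item = await queue.get()  # waits while the queue is empty
        if item is stop:
            return
        yield item


async def dispatch_all(queue, run_task):
    """Consume (task_name, params) pairs and hand each one to `run_task`."""
    async for task_name, params in iter_queue(queue):
        run_task(task_name, params)


async def demo():
    queue = asyncio.Queue()
    calls = []
    await queue.put(("do_things", {"arg": "first"}))
    await queue.put(("do_things", {"arg": "second"}))
    await queue.put(None)  # sentinel: no more work
    await dispatch_all(queue, lambda name, params: calls.append((name, params)))
    return calls
```

With Rocketry, run_task could look up the task in app.session by name and call its run method with the parameters, as in the snippet in this reply.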
-
Hi! First, thanks for Rocketry! I'm thinking about using Rocketry in a project that will need to run an event loop that both responds to events and runs scheduled tasks. Rocketry seems like it might be perfect for the latter. However, the project will also need to dynamically load plug-ins at run time. Looking through the docs and examples, I'm not sure if this is something Rocketry can do out of the box.

For example, one plug-in author creates plugin1.py and another plug-in author creates plugin2.py.

I'd like to be able to load the plug-ins at run time (after the session is already running). I could have a task that continually monitors the plug-in folder for new plug-ins and uses create_task, but does that work with the decorators? (E.g., could a plug-in author use @app.task(daily...) even though they don't have access to the app object that's already running?)

Just brainstorming at this point. I'm open to any ideas about implementing such a system and whether using or extending Rocketry might be a good approach.
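To make the decorator idea concrete, here is a rough sketch of what I have in mind, using only the standard library. The task decorator below is a hypothetical stand-in for @app.task that only records the request on the function, and load_plugin injects it into the plug-in module so the plug-in never needs the running app object. None of these names come from Rocketry:

```python
import importlib.util
from pathlib import Path


def task(start_cond=None, **options):
    """Hypothetical stand-in for @app.task: records the request instead of registering it."""
    def decorator(func):
        func.__task_spec__ = {"start_cond": start_cond, **options}
        return func
    return decorator


def load_plugin(path):
    """Import a plug-in file and return the functions it marked with @task."""
    path = Path(path)
    spec = importlib.util.spec_from_file_location(path.stem, path)
    module = importlib.util.module_from_spec(spec)
    # Inject the decorator before executing the module, so the plug-in
    # can write @task(...) without importing the host application.
    module.task = task
    spec.loader.exec_module(module)
    return [obj for obj in vars(module).values()
            if callable(obj) and hasattr(obj, "__task_spec__")]
```

The host would then presumably call create_task for each returned function, passing along whatever was recorded in __task_spec__.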