Skip to content

main

ies_pi_worker.main

Task

A task is a function that's executed in background. It's defined by an id and a Future. When the task is initialized, started_at is populated with the current timestamp, and as soon as the future is done, finished_at is populated in the same way.

Source code in src/ies_pi_worker/main.py
class Task:
    """
    A task is a function that's executed in background.
    It's defined by an `id` and a `Future`.
    When the task is initialized, `started_at` is populated with the current
    timestamp, and as soon as the future is done, `finished_at` is populated in
    the same way.
    """    
    def __init__(self, id: str, future: Future):
        self.id = id
        self.future = future
        self.started_at = datetime.now()
        self.finished_at = None

        self.future.add_done_callback(self._finish())

    def finished(self) -> bool:
        """Returns whether the task has been finished"""
        return self.finished_at is not None

    def elapsed(self) -> timedelta:
        """Returns the amount of time it took for the task to complete"""
        return self.finished_at - self.started_at

    def to_dict(self, timeout=0.01) -> dict:
        """
        to_dict returns a dict representation of the task. Along with `id`, 
        `started_at` and `finished_at` it also returns the result or the error
        of the task, if it has finished.

        Args:
            timeout (float, optional): The number of seconds to wait for the 
                                       result if the task isn't finished yet.
                                       Defaults to 0.01

        Returns:
            dict: {
                "id": str,
                "started_at": datetime,
                "finished_at": datetime,
                "result": any | None,
                "error": str | None,
            }
        """        
        result = None
        error = None

        try:
            result = self.future.result(timeout)
        except(TimeoutError):
            pass
        except Exception as e:
            import traceback
            traceback.print_exception(e)
            error = str(e)

        return {
            "id": self.id,
            "started_at": self.started_at.strftime(date_format),
            "finished_at": None if self.finished_at is None else 
                self.finished_at.strftime(date_format),
            "result": result,
            "error": error,
        }

    def _finish(self):
        """Returns an unbounded method used to set `finished_at`"""        
        def finish(_):
            self.finished_at = datetime.now()
        return finish

elapsed

elapsed() -> timedelta

Returns the amount of time it took for the task to complete

Source code in src/ies_pi_worker/main.py
def elapsed(self) -> timedelta:
    """Returns the amount of time it took for the task to complete"""
    return self.finished_at - self.started_at

finished

finished() -> bool

Returns whether the task has been finished

Source code in src/ies_pi_worker/main.py
def finished(self) -> bool:
    """Returns whether the task has been finished"""
    return self.finished_at is not None

to_dict

to_dict(timeout=0.01) -> dict

to_dict returns a dict representation of the task. Along with id, started_at and finished_at it also returns the result or the error of the task, if it has finished.

Parameters:

Name Type Description Default
timeout float

The number of seconds to wait for the result if the task isn't finished yet. Defaults to 0.01

0.01

Returns:

Name Type Description
dict dict

{ "id": str, "started_at": datetime, "finished_at": datetime, "result": any | None, "error": str | None,

dict

}

Source code in src/ies_pi_worker/main.py
def to_dict(self, timeout=0.01) -> dict:
    """
    to_dict returns a dict representation of the task. Along with `id`, 
    `started_at` and `finished_at` it also returns the result or the error
    of the task, if it has finished.

    Args:
        timeout (float, optional): The number of seconds to wait for the 
                                   result if the task isn't finished yet.
                                   Defaults to 0.01

    Returns:
        dict: {
            "id": str,
            "started_at": datetime,
            "finished_at": datetime,
            "result": any | None,
            "error": str | None,
        }
    """        
    result = None
    error = None

    try:
        result = self.future.result(timeout)
    except(TimeoutError):
        pass
    except Exception as e:
        import traceback
        traceback.print_exception(e)
        error = str(e)

    return {
        "id": self.id,
        "started_at": self.started_at.strftime(date_format),
        "finished_at": None if self.finished_at is None else 
            self.finished_at.strftime(date_format),
        "result": result,
        "error": error,
    }

Worker

Worker is a wrapper around concurrent.futures that allows to run tasks and retrieve their results later.

It saves the tasks in memory, so they don't persist.

The tasks are short-lived, and after they're finished, they are available for a limited amount of time.

Source code in src/ies_pi_worker/main.py
class Worker:
    """
    Worker is a wrapper around `concurrent.futures` that allows to run tasks
    and retrieve their results later.

    It saves the tasks in memory, so they don't persist.

    The tasks are short-lived, and after they're finished, they are available 
    for a limited amount of time.
    """    
    def __init__(self, delete_tasks_after=default_delete_tasks_after):
        self.delete_tasks_after=delete_tasks_after
        self.lock = Lock()
        self.executor = ProcessPoolExecutor()
        self.tasks: dict[str, Task] = {}

    def shutdown(self):
        """shutdown cleans up the resources of the executor"""        
        self.executor.shutdown()

    def run(self, fn, *args, **kwargs) -> Task:
        """
        run allows to run any arbitrary task, much like the `submit` method of
        a `concurrent.futures` executor, but returns a `Task`, that will automatically 
        update with the result of the task.

        Args:
            fn (function): a callable to be executed as fn(*args, **kwargs).
                           its return values will be stored in the returned Task.

        Returns:
            Task: a Task that will automatically update with the result of the function.
        """        
        id = str(uuid4())
        future = self.executor.submit(fn, *args, **kwargs)

        with self.lock:
            task = Task(id, future)
            self.tasks[id] = task
            return task

    def get(self, id: str) -> Task:
        """
        get retrieves a Task by their id.

        Args:
            id (str): The id of the task.

        Returns:
            Task: The Task obtained
        """        
        self._cleanup()
        with self.lock:
            task = self.tasks[id]
            return task

    def _cleanup(self):
        """cleanup removes tasks older than self.delete_tasks_after"""        
        with self.lock:
            for id, task in self.tasks.copy().items():
                if not task.finished():
                    continue
                if task.elapsed() > self.delete_tasks_after:
                    self.tasks.pop(id)                                  

get

get(id: str) -> Task

get retrieves a Task by their id.

Parameters:

Name Type Description Default
id str

The id of the task.

required

Returns:

Name Type Description
Task Task

The Task obtained

Source code in src/ies_pi_worker/main.py
def get(self, id: str) -> Task:
    """
    get retrieves a Task by their id.

    Args:
        id (str): The id of the task.

    Returns:
        Task: The Task obtained
    """        
    self._cleanup()
    with self.lock:
        task = self.tasks[id]
        return task

run

run(fn, *args, **kwargs) -> Task

run allows to run any arbitrary task, much like the submit method of a concurrent.futures executor, but returns a Task, that will automatically update with the result of the task.

Parameters:

Name Type Description Default
fn function

a callable to be executed as fn(args, *kwargs). its return values will be stored in the returned Task.

required

Returns:

Name Type Description
Task Task

a Task that will automatically update with the result of the function.

Source code in src/ies_pi_worker/main.py
def run(self, fn, *args, **kwargs) -> Task:
    """
    run allows to run any arbitrary task, much like the `submit` method of
    a `concurrent.futures` executor, but returns a `Task`, that will automatically 
    update with the result of the task.

    Args:
        fn (function): a callable to be executed as fn(*args, **kwargs).
                       its return values will be stored in the returned Task.

    Returns:
        Task: a Task that will automatically update with the result of the function.
    """        
    id = str(uuid4())
    future = self.executor.submit(fn, *args, **kwargs)

    with self.lock:
        task = Task(id, future)
        self.tasks[id] = task
        return task

shutdown

shutdown()

shutdown cleans up the resources of the executor

Source code in src/ies_pi_worker/main.py
def shutdown(self):
    """shutdown cleans up the resources of the executor"""        
    self.executor.shutdown()