Source code for top.redis.tracker

"""Redis based task storage backend."""

import datetime
import os
from typing import Dict, List, Type, Optional

from redis import StrictRedis, ConnectionPool

from top.core.tracker import Tracker
from top.core.task import Task


#: Fallback length of the completed-task history. Used by
#: ``RedisTracker.create_default_instance`` when neither the
#: ``max_past_tasks`` argument nor its environment variable provides a value.
DEFAULT_MAX_COMPLETED_TASKS = 50


class RedisTracker(Tracker):
    """Manage task status in Redis.

    - Tasks are stored as UTF-8 encoded JSON blobs in Redis

    - Internally uses `redis-py <https://redis-py.readthedocs.io/en/stable/index.html>`_

    - It is recommended to give its own database for task tracking;
      do not share other databases (even though this works in theory)

    How to start tracking a (web) task:

    .. code-block:: python

        task_id = id(req)
        task = HTTPTask.create_from_current_thread(
            task_id,
            path=req.path,
            method=req.method,
            processor_name=str(worker),
            request_headers=req.headers,
        )
        tracker.start_task(task)

    Example how to finish a (web) task:

    .. code-block:: python

        task.status_code = resp.status_code
        task.status_message = resp.status
        task.response_headers = resp.headers
        tracker.end_task(task)
    """

    def __init__(self, redis: StrictRedis, task_type: Type[Task], max_past_tasks=DEFAULT_MAX_COMPLETED_TASKS):
        """Create a new tracker.

        :param redis: Redis instance
        :param task_type: Subclass of Task or Task class itself.
            Used to serialise/deserialise data to Redis.
        :param max_past_tasks: How many tasks to keep in the past events log.
            A value of zero or less disables the log.
        """
        self.redis = redis
        self.task_type = task_type
        self.max_past_tasks = max_past_tasks

        #: Redis HSET, maps processor -> current task
        self.processors_hkey = "processors"

        #: Redis LIST of past tasks, the latest task first (added with LPUSH)
        self.past_tasks_list = "past_tasks"

        #: Pub sub key for task updates
        self.task_updates_channel = "task_updates"

        #: When we cleared the tracker last time
        self.last_cleared_at_key = "last_cleared_at"

    def clear(self):
        """Remove the tracker's own keys from the Redis database we are connected to.

        Only the active processors hash and the past tasks list are deleted;
        other keys in the database are left alone.

        - Call at the restart of your system e.g. web server to clear any dangling processors

        - Call at the start of the tests when you need to clear the previous test database
        """
        # DEL accepts several keys, so both structures go in one round trip
        self.redis.delete(self.processors_hkey, self.past_tasks_list)

        # Add a marker key about clearing the database
        cleared_at = datetime.datetime.now(datetime.timezone.utc)
        self.redis.set(self.last_cleared_at_key, cleared_at.isoformat())

    def update_task(self, task: Task):
        """Store the current state of a running task and notify subscribers.

        Stamps ``task.updated_at`` with the current UTC time, writes the
        serialised task under its processor id and publishes the same
        payload on the task updates channel.

        :param task: The task whose state is recorded
        """
        task.updated_at = datetime.datetime.now(datetime.timezone.utc)
        processor_id = task.get_processor_tracking_id()
        data = task.serialise()

        # Update the task for the current processor
        self.redis.hset(self.processors_hkey, processor_id, data)

        # Do a notification on update
        self.redis.publish(self.task_updates_channel, data)

    def start_task(self, task: Task):
        """Begin tracking a task; equivalent to the first :py:meth:`update_task` call.

        :param task: The task that has just started
        """
        self.update_task(task)

    def end_task(self, task: Task):
        """Mark a task finished and move it from the active set to the past tasks log.

        Sets ``ended_at`` and ``updated_at`` to the current UTC time and flags
        the task as ``recorded_successfully`` before serialising it.

        :param task: The task that has just finished
        """
        task.ended_at = task.updated_at = datetime.datetime.now(datetime.timezone.utc)
        task.recorded_successfully = True
        data = task.serialise()

        # Delete task from the active processor
        processor_id = task.get_processor_tracking_id()
        self.redis.hdel(self.processors_hkey, processor_id)

        # Do a notification on update
        self.redis.publish(self.task_updates_channel, data)

        # Add task to the past buffer
        # https://stackoverflow.com/a/57776359/315168
        #
        # With a limit of 0 the trim would become ``LTRIM key 0 -1``, which
        # keeps the entire list and lets it grow without bound. Treat a
        # non-positive limit as "keep no history" instead.
        if self.max_past_tasks > 0:
            self.redis.lpush(self.past_tasks_list, data)
            self.redis.ltrim(self.past_tasks_list, 0, self.max_past_tasks - 1)

    def get_active_tasks(self) -> Dict[str, Task]:
        """Return the tasks currently being processed.

        :return: Mapping of processor id -> deserialised task. The ids are
            returned as the Redis client hands them back.
        """
        # HGETALL fetches every field and value in a single command, so there
        # is no window for a processor to disappear between listing the keys
        # and reading the values, and no per-key round trip.
        blobs = self.redis.hgetall(self.processors_hkey)
        return {
            processor_id: self.task_type.deserialise(task_blob)
            for processor_id, task_blob in blobs.items()
        }

    def get_completed_tasks(self) -> List[Task]:
        """Return the past tasks log, most recently finished task first.

        :return: Deserialised tasks in the order they are stored in the Redis list
        """
        task_blobs = self.redis.lrange(self.past_tasks_list, 0, -1)
        return [self.task_type.deserialise(blob) for blob in task_blobs]

    @staticmethod
    def create_default_instance(task_type: Type[Task], redis_url: Optional[str] = None, redis_url_env="TOP_TRACKER_URL", max_past_tasks_env="TOP_MAX_COMPLETED_TASKS", max_past_tasks=None) -> "RedisTracker":
        """Creates a connection to the Redis database.

        :param task_type: Subclass of Task or Task class itself.
            Used to serialise/deserialise data to Redis.
        :param redis_url: Redis database string where to connect to
        :param redis_url_env: This environment variable contains the Redis URL where to connect
        :param max_past_tasks_env: This environment variable contains the size of
            the past events log. Only read when `max_past_tasks` is not given.
        :param max_past_tasks: How many tasks to keep in the past events log.
            If not given read `max_past_tasks_env` environment variable.
            If not available default to :py:data:`DEFAULT_MAX_COMPLETED_TASKS`.
        :raise RuntimeError: If no Redis URL is passed in or found in the environment
        :raise ValueError: If the environment variable value is not an integer
        :return: A tracker backed by a new connection pool
        """
        if not redis_url:
            redis_url = os.environ.get(redis_url_env)
            if not redis_url:
                raise RuntimeError(f"You must configure Redis connection URL with {redis_url_env} environment variable")

        if not max_past_tasks:
            max_past_tasks = os.environ.get(max_past_tasks_env)
            if not max_past_tasks:
                max_past_tasks = DEFAULT_MAX_COMPLETED_TASKS
            else:
                # Environment variables are always strings
                max_past_tasks = int(max_past_tasks)

        pool = ConnectionPool.from_url(redis_url)
        client = StrictRedis(connection_pool=pool)
        return RedisTracker(client, task_type, max_past_tasks)