RedisTracker

top.redis.tracker.RedisTracker class.

class RedisTracker

Bases: Tracker

Manage task status in Redis.

  • Tasks are stored as UTF-8 encoded JSON blobs in Redis

  • Internally uses redis-py

  • Give the tracker its own Redis database; sharing a database with other data is not recommended (even though it works in theory)
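The storage format described in the list above can be illustrated with the standard library alone. This is a minimal sketch, not the library's own serialiser: `SketchTask`, its fields, and the `encode_task`/`decode_task` helpers are hypothetical stand-ins for a real `Task` subclass.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class SketchTask:
    """Hypothetical stand-in for a Task subclass (not part of the library)."""

    task_id: int
    path: str
    method: str


def encode_task(task: SketchTask) -> bytes:
    """Serialise a task to a UTF-8 encoded JSON blob."""
    return json.dumps(asdict(task)).encode("utf-8")


def decode_task(blob: bytes) -> SketchTask:
    """Rebuild a task from the bytes read back from storage."""
    return SketchTask(**json.loads(blob.decode("utf-8")))


blob = encode_task(SketchTask(task_id=1, path="/api/ping", method="GET"))
restored = decode_task(blob)
```

Because the blob is plain JSON, it can be inspected with any Redis client, which helps when debugging a stuck task.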

How to start tracking a (web) task:

task_id = id(req)
task = HTTPTask.create_from_current_thread(
    task_id,
    path=req.path,
    method=req.method,
    processor_name=str(worker),
    request_headers=req.headers,
)
tracker.start_task(task)

How to finish tracking a (web) task:

task.status_code = resp.status_code
task.status_message = resp.status
task.response_headers = resp.headers

tracker.end_task(task)
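
If the request handler raises between the two snippets above, `end_task` is never reached and the processor may keep showing a stale task. One possible way to guard against that is a small context manager. This is a sketch under assumptions: `tracked` and `RecordingTracker` are hypothetical helpers, and `RecordingTracker` only mimics the `start_task`/`end_task` calls so the example runs without Redis.

```python
from contextlib import contextmanager


class RecordingTracker:
    """Hypothetical stand-in exposing the same start_task/end_task calls."""

    def __init__(self):
        self.calls = []

    def start_task(self, task):
        self.calls.append(("start", task))

    def end_task(self, task):
        self.calls.append(("end", task))


@contextmanager
def tracked(tracker, task):
    """Start the task and always end it, even if the body raises."""
    tracker.start_task(task)
    try:
        yield task
    finally:
        tracker.end_task(task)


tracker = RecordingTracker()
try:
    with tracked(tracker, "task-1"):
        raise RuntimeError("handler failed")
except RuntimeError:
    pass  # the task was still ended by the finally clause
```

With a real tracker, response fields such as `status_code` would be set on the task inside the `with` body before it exits.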

__init__(redis, task_type, max_past_tasks=50)

Create a new tracker.

Parameters:
  • redis (StrictRedis) – Redis instance

  • task_type (Type[Task]) – Subclass of Task or Task class itself. Used to serialise/deserialise data to Redis.

  • max_past_tasks – How many tasks to keep in the past events log

Methods

__init__(redis, task_type[, max_past_tasks])

Create a new tracker.

clear()

Clear out whatever Redis database we are connected to.

create_default_instance(task_type[, ...])

Creates a connection to the Redis database.

end_task(task)

Finish an existing task.

get_active_tasks()

Get currently active tasks.

get_completed_tasks()

Get the backlog of completed tasks.

start_task(task)

Start a new task.

update_task(task)

Attributes

processors_hkey

Redis hash (written with HSET), maps processor -> current task

past_tasks_list

Redis LIST of past tasks, the latest task on the right

task_updates_channel

Pub/sub channel key for task updates

last_cleared_at_key

Key recording when the tracker was last cleared

processors_hkey

Redis hash (written with HSET), maps processor -> current task

past_tasks_list

Redis LIST of past tasks, the latest task on the right

task_updates_channel

Pub/sub channel key for task updates

last_cleared_at_key

Key recording when the tracker was last cleared

clear()

Clear out whatever Redis database we are connected to.

  • Call when your system (e.g. a web server) restarts to clear any dangling processors

  • Call at the start of the tests when you need to clear the previous test database

start_task(task)

Start a new task.

Record a task started in the tracker backend.

Parameters:

task (Task) –

end_task(task)

Finish an existing task.

Mark task completed.

Parameters:

task (Task) –

get_active_tasks()

Get currently active tasks.

Returns:

Map of (Processor id -> Task)

Return type:

Dict[str, Task]
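
To make the shape of the returned mapping concrete, the sketch below decodes a raw hash reply into a processor id -> task data dictionary. It is an illustration under assumptions, not the library's code: `decode_active_tasks` is a hypothetical helper, the sample reply is hand-written in the bytes-to-bytes shape that redis-py returns for a hash when `decode_responses` is off, and plain dicts stand in for `Task` objects.

```python
import json
from typing import Dict


def decode_active_tasks(raw: Dict[bytes, bytes]) -> Dict[str, dict]:
    """Turn a raw hash reply (bytes -> bytes) into processor id -> task data."""
    return {
        processor.decode("utf-8"): json.loads(blob.decode("utf-8"))
        for processor, blob in raw.items()
    }


# Hand-written sample shaped like a hash reply; field names are made up.
raw_reply = {
    b"worker-1": b'{"task_id": 1, "path": "/a"}',
    b"worker-2": b'{"task_id": 2, "path": "/b"}',
}
active = decode_active_tasks(raw_reply)
```

Each processor appears at most once in the result, since a hash holds a single value per field.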

get_completed_tasks()

Get the backlog of completed tasks.

Each backend keeps up to N of the most recently completed tasks in a ring buffer.

Tasks are in the completion order. The most recently completed task is the first item in the list.

Returns:

List of past completed tasks that are in our past tasks buffer. The most recently completed task is the first item in the list.

Return type:

List[Task]
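
The ring buffer behaviour and the newest-first ordering can be modelled in memory with `collections.deque`. This is only a sketch of the semantics described above: `CompletedBuffer` is a hypothetical class, strings stand in for `Task` objects, and how the Redis-backed tracker trims its list is not shown here.

```python
from collections import deque
from typing import Deque, List


class CompletedBuffer:
    """In-memory model of the completed-task ring buffer (illustration only)."""

    def __init__(self, max_past_tasks: int = 50):
        # A deque with maxlen drops the oldest entry on the left when full.
        self._tasks: Deque[str] = deque(maxlen=max_past_tasks)

    def add(self, task: str) -> None:
        # The latest completed task goes on the right.
        self._tasks.append(task)

    def get_completed(self) -> List[str]:
        # Reverse so the most recently completed task comes first.
        return list(reversed(self._tasks))


buffer = CompletedBuffer(max_past_tasks=3)
for name in ["t1", "t2", "t3", "t4"]:
    buffer.add(name)
completed = buffer.get_completed()
```

With a capacity of three, the oldest task `t1` has been dropped and `completed` starts with `t4`.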

static create_default_instance(task_type, redis_url=None, redis_url_env='TOP_TRACKER_URL', max_past_tasks_env='TOP_MAX_COMPLETED_TASKS', max_past_tasks=None)

Creates a connection to the Redis database.

Parameters:
  • task_type (Type[Task]) – Subclass of Task or Task class itself. Used to serialise/deserialise data to Redis.

  • redis_url (Optional[str]) – Redis connection URL to connect to

  • redis_url_env – Name of the environment variable that contains the Redis URL to connect to

  • max_past_tasks – How many tasks to keep in the past events log. If not given, read the max_past_tasks_env environment variable. If that is not set, default to DEFAULT_MAX_COMPLETED_TASKS.

Return type:

RedisTracker
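
The parameter fallback described above can be sketched as a small resolver. Several things here are assumptions rather than documented behaviour: `resolve_settings` is a hypothetical helper, the placeholder default of 50 is not the confirmed value of DEFAULT_MAX_COMPLETED_TASKS, and the rule that an explicit `redis_url` takes precedence over the environment variable is inferred from the signature.

```python
import os
from typing import Mapping, Optional, Tuple

# Assumed placeholder; the real DEFAULT_MAX_COMPLETED_TASKS value is not stated here.
SKETCH_DEFAULT_MAX_COMPLETED_TASKS = 50


def resolve_settings(
    redis_url: Optional[str] = None,
    max_past_tasks: Optional[int] = None,
    redis_url_env: str = "TOP_TRACKER_URL",
    max_past_tasks_env: str = "TOP_MAX_COMPLETED_TASKS",
    environ: Mapping[str, str] = os.environ,
) -> Tuple[Optional[str], int]:
    """Explicit arguments win, then environment variables, then the default."""
    if redis_url is None:
        redis_url = environ.get(redis_url_env)
    if max_past_tasks is None:
        raw = environ.get(max_past_tasks_env)
        max_past_tasks = int(raw) if raw else SKETCH_DEFAULT_MAX_COMPLETED_TASKS
    return redis_url, max_past_tasks


# Values taken from a fake environment mapping instead of the real os.environ.
from_env = resolve_settings(
    environ={
        "TOP_TRACKER_URL": "redis://localhost:6379/15",
        "TOP_MAX_COMPLETED_TASKS": "200",
    }
)
explicit = resolve_settings(
    redis_url="redis://localhost:6379/1", max_past_tasks=10, environ={}
)
fallback = resolve_settings(environ={})
```

Passing `environ` explicitly keeps the sketch testable without touching the process environment.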