Task#

top.core.task.Task class.

class Task[source]#

Bases: object

A generic structure for tracking tasks across workers.

All fields are optional for maximum flexibility.

dataclasses_json library is used to automatically convert Python dataclasses structures to JSON and back.

__init__(task_id=None, task_name=None, host_name=None, process_id=None, thread_id=None, process_internal_id=None, processor_name=None, started_at=None, updated_at=None, ended_at=None, recorded_successfully=None, tags=None, command_line=None)#
Parameters:
Return type:

None

Methods

__init__([task_id, task_name, host_name, ...])

create_from_current_thread(task_id[, ...])

Create a task and assuming the processor is the current OS thread.

deserialise(blob)

Serialise using dataclasses_json

from_dict(kvs, *[, infer_missing])

from_json(s, *[, parse_float, parse_int, ...])

get_ago()

Get how long ago this task finished.

get_duration()

Get the duration of this task.

get_processor_tracking_id()

Get the key used in our tracking table (Redis) for this task.

schema(*[, infer_missing, only, exclude, ...])

serialise()

Serialise using dataclasS_json

to_dict([encode_json])

to_json(*[, skipkeys, ensure_ascii, ...])

Attributes

command_line

Application command line

ended_at

When this task was ended.

host_name

Host name on multiserver deployments.

process_id

OS process id that started this task.

process_internal_id

If the application provides further ids for the processes.

processor_name

Human readable of the processor name is available

recorded_successfully

Did this task success?

started_at

When this task was started.

tags

Generic tracking tags that can be associated with tasks.

task_id

Unique identified for this task.

task_name

method and path.

thread_id

OS thread that started this task.

updated_at

When this task was last updated.

task_id: Optional[Union[int, str]] = None#

Unique identified for this task.

E.g. web request id if available. Depends on the application. Can be int or str depending on the context.

task_name: Optional[str] = None#

method and path.

host_name: Optional[str] = None#

Host name on multiserver deployments.

E.g. the web server DNS name if multiple servers behind a load balancer.

process_id: Optional[int] = None#

OS process id that started this task.

thread_id: Optional[int] = None#

OS thread that started this task.

process_internal_id: Optional[str] = None#

If the application provides further ids for the processes.

E.g. connection id in PostgreSQL

processor_name: Optional[str] = None#

Human readable of the processor name is available

started_at: Optional[datetime] = None#

When this task was started.

UTC timestamp serialised as ISO 8601. Automatically filled by create_from_current_thread().

updated_at: Optional[datetime] = None#

When this task was last updated.

UTC timestamp serialised as ISO 8601.

ended_at: Optional[datetime] = None#

When this task was ended.

UTC timestamp serialised as ISO 8601. Automatically filled by top.core.tracker.Tracker.end_task().

recorded_successfully: Optional[bool] = None#

Did this task success?

None = we do not know yet. True = task received its end_task() call. False = task was cleaned up by monitor/timeout.

tags: Optional[dict] = None#

Generic tracking tags that can be associated with tasks.

Frameworks like OpenTelemetry and statsd support tagging sources and events. Usually these are used to detect the server production mode, deployed version, Kubernetes/Docker/other deployment information and such,

Here you can add any tags to the request. When top.core.tracker.Tracker.start_task() is called, the tracker specific tags are automatically applied here.

command_line: Optional[List[str]] = None#

Application command line

Space separated command that was used to start this process. Same as sys.argv.

get_processor_tracking_id()[source]#

Get the key used in our tracking table (Redis) for this task.

You can also override this function to have specific processor id scheme for your application.

Return type:

str

get_duration()[source]#

Get the duration of this task.

  • For completed tasks, return the actual duration

  • For active tasks, return how much time has passed since start

  • If start is missing, return None

Return type:

Optional[timedelta]

get_ago()[source]#

Get how long ago this task finished.

Returns:

Relative to now, or None if the task is not yet finished

Return type:

Optional[timedelta]

serialise()[source]#

Serialise using dataclasS_json

Return type:

bytes

classmethod deserialise(blob)[source]#

Serialise using dataclasses_json

Parameters:

blob (bytes) –

Return type:

Task

classmethod create_from_current_thread(task_id, processor_name=None, **kwargs)[source]#

Create a task and assuming the processor is the current OS thread.

Automatically labels the task to belong to the OS current process/thread. This will fill the following fields:

Parameters:
  • task_id (Union[str, int]) – Something unique to identify this task. If nothing else then use Python object hash.

  • processor_name (Optional[str]) – Framework specific name for this processor

  • kwargs – Passed to Task dataclass constructor.

Return type:

Task

__init__(task_id=None, task_name=None, host_name=None, process_id=None, thread_id=None, process_internal_id=None, processor_name=None, started_at=None, updated_at=None, ended_at=None, recorded_successfully=None, tags=None, command_line=None)#
Parameters:
Return type:

None