Source code for top.core.task

"""Task core structure definition."""

import datetime
import os
import socket
import sys
import threading
from dataclasses import dataclass, field
from typing import Optional, Union, List

from dataclasses_json import dataclass_json, config
from marshmallow import fields

from top.core.encoding import encode_date, decode_date


[docs]@dataclass_json @dataclass class Task: """A generic structure for tracking tasks across workers. All fields are optional for maximum flexibility. `dataclasses_json library <https://github.com/lidatong/dataclasses-json>`_ is used to automatically convert Python :py:mod:`dataclasses` structures to JSON and back. """ #: Unique identified for this task. #: #: E.g. web request id if available. #: Depends on the application. #: Can be int or str depending on the context. #: task_id: Optional[Union[int, str]] = None #: Human readable name for this task. #: #: From cron-like jobs this can be the cron job name. #: For :py:class:`~top.core.web.task.HTTPTask` #: this is not set, but tasks are identified by URI that consists of # protocol, host, #: :py:data:`~top.core.web.task.HTTPTask.method` and :py:data:`~top.core.web.task.HTTPTask.path`. task_name: Optional[str] = None #: Host name on multiserver deployments. #: #: E.g. the web server DNS name if multiple #: servers behind a load balancer. host_name: Optional[str] = None #: OS process id that started this task. #: process_id: Optional[int] = None #: OS thread that started this task. #: thread_id: Optional[int] = None #: If the application provides further ids for the processes. #: #: E.g. connection id in PostgreSQL process_internal_id: Optional[str] = None #: Human readable of the processor name is available processor_name: Optional[str] = None #: When this task was started. #: #: UTC timestamp serialised as ISO 8601. #: Automatically filled by :py:meth:`create_from_current_thread`. started_at: Optional[datetime.datetime] = field( default=None, metadata=config( encoder=encode_date, decoder=decode_date, mm_field=fields.DateTime(format='iso') ) ) #: When this task was last updated. #: #: UTC timestamp serialised as ISO 8601. updated_at: Optional[datetime.datetime] = field( default=None, metadata=config( encoder=encode_date, decoder=decode_date, mm_field=fields.DateTime(format='iso') ) ) #: When this task was ended. #: #: UTC timestamp serialised as ISO 8601. #: Automatically filled by :py:meth:`top.core.tracker.Tracker.end_task`. ended_at: Optional[datetime.datetime] = field( default=None, metadata=config( encoder=encode_date, decoder=decode_date, mm_field=fields.DateTime(format='iso') ) ) #: Did this task success? #: #: None = we do not know yet. #: True = task received its end_task() call. #: False = task was cleaned up by monitor/timeout. recorded_successfully: Optional[bool] = None #: Generic tracking tags that can be associated with tasks. #: #: Frameworks like `OpenTelemetry <https://opentelemetry.io/>`__ #: and statsd support tagging sources and events. #: Usually these are used to detect the server production mode, #: deployed version, #: Kubernetes/Docker/other deployment information and such, #: #: Here you can add any tags to the request. #: When :py:meth:`top.core.tracker.Tracker.start_task` #: is called, the tracker specific tags are automatically #: applied here. #: #: - `See OpenTelemetry specification <https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/common/README.md#attribute>`_ #: #: - `See some example OpenTelemetry attributes and labels <https://lightstep.com/opentelemetry/attributes-and-labels>`_ #: tags: Optional[dict] = None #: Application command line #: #: Space separated command that was used to start this process. #: Same as sys.argv. #: command_line: Optional[List[str]] = None def __eq__(self, other: "Task"): """All tasks are identified by their task_id attribute. """ return self.task_id == other.task_id def __hash__(self): """Allows easily create index (hash map) of all tasks.""" return hash((self.task_id))
[docs] def get_processor_tracking_id(self) -> str: """Get the key used in our tracking table (Redis) for this task. You can also override this function to have specific processor id scheme for your application. """ if self.process_internal_id: return f"{self.process_id}-{self.thread_id}-{self.process_internal_id}" else: return f"pid:{self.process_id} tid:{self.thread_id}"
[docs] def get_duration(self) -> Optional[datetime.timedelta]: """Get the duration of this task. - For completed tasks, return the actual duration - For active tasks, return how much time has passed since start - If start is missing, return None """ if self.started_at: if self.ended_at: return self.ended_at - self.started_at else: return datetime.datetime.now(datetime.timezone.utc) - self.started_at return None
[docs] def get_ago(self) -> Optional[datetime.timedelta]: """Get how long ago this task finished. :return: Relative to now, or None if the task is not yet finished """ if self.ended_at: return datetime.datetime.now(datetime.timezone.utc) - self.ended_at return None
[docs] def serialise(self) -> bytes: """Serialise using dataclasS_json""" blob = self.to_json().encode("utf-8") return blob
[docs] @classmethod def deserialise(cls, blob: bytes) -> "Task": """Serialise using dataclasses_json""" return cls.from_json(blob)
[docs] @classmethod def create_from_current_thread(cls, task_id: Union[str, int], processor_name: Optional[str] = None, **kwargs) -> "Task": """Create a task and assuming the processor is the current OS thread. Automatically labels the task to belong to the OS current process/thread. This will fill the following fields: - :py:attr:`process_id` - :py:attr:`thread_id` - :py:attr:`processor_name` - :py:attr:`host_name` (`socket.gethostname()`) - Command line (`sys.argv`) :param task_id: Something unique to identify this task. If nothing else then use Python object hash. :param processor_name: Framework specific name for this processor :param kwargs: Passed to :py:class:`Task` dataclass constructor. """ pid = os.getpid() tid = threading.get_ident() thread_name = threading.current_thread().name # https://stackoverflow.com/a/49610911/315168 host_name = socket.gethostname() if not processor_name: processor_name = f"{pid}:{thread_name}" return cls( task_id=task_id, process_id=pid, thread_id=tid, host_name=host_name, started_at=datetime.datetime.now(datetime.timezone.utc), processor_name=processor_name, command_line=sys.argv, **kwargs, )