106 lines
3.9 KiB
Python
106 lines
3.9 KiB
Python
import inspect
|
|
import logging
|
|
from types import TracebackType
|
|
from typing import Any, Dict, Optional, Type
|
|
|
|
from ._models import Request
|
|
|
|
|
|
class Trace:
|
|
def __init__(
|
|
self,
|
|
name: str,
|
|
logger: logging.Logger,
|
|
request: Optional[Request] = None,
|
|
kwargs: Optional[Dict[str, Any]] = None,
|
|
) -> None:
|
|
self.name = name
|
|
self.logger = logger
|
|
self.trace_extension = (
|
|
None if request is None else request.extensions.get("trace")
|
|
)
|
|
self.debug = self.logger.isEnabledFor(logging.DEBUG)
|
|
self.kwargs = kwargs or {}
|
|
self.return_value: Any = None
|
|
self.should_trace = self.debug or self.trace_extension is not None
|
|
self.prefix = self.logger.name.split(".")[-1]
|
|
|
|
def trace(self, name: str, info: Dict[str, Any]) -> None:
|
|
if self.trace_extension is not None:
|
|
prefix_and_name = f"{self.prefix}.{name}"
|
|
ret = self.trace_extension(prefix_and_name, info)
|
|
if inspect.iscoroutine(ret): # pragma: no cover
|
|
raise TypeError(
|
|
"If you are using a synchronous interface, "
|
|
"the callback of the `trace` extension should "
|
|
"be a normal function instead of an asynchronous function."
|
|
)
|
|
|
|
if self.debug:
|
|
if not info or "return_value" in info and info["return_value"] is None:
|
|
message = name
|
|
else:
|
|
args = " ".join([f"{key}={value!r}" for key, value in info.items()])
|
|
message = f"{name} {args}"
|
|
self.logger.debug(message)
|
|
|
|
def __enter__(self) -> "Trace":
|
|
if self.should_trace:
|
|
info = self.kwargs
|
|
self.trace(f"{self.name}.started", info)
|
|
return self
|
|
|
|
def __exit__(
|
|
self,
|
|
exc_type: Optional[Type[BaseException]] = None,
|
|
exc_value: Optional[BaseException] = None,
|
|
traceback: Optional[TracebackType] = None,
|
|
) -> None:
|
|
if self.should_trace:
|
|
if exc_value is None:
|
|
info = {"return_value": self.return_value}
|
|
self.trace(f"{self.name}.complete", info)
|
|
else:
|
|
info = {"exception": exc_value}
|
|
self.trace(f"{self.name}.failed", info)
|
|
|
|
async def atrace(self, name: str, info: Dict[str, Any]) -> None:
|
|
if self.trace_extension is not None:
|
|
prefix_and_name = f"{self.prefix}.{name}"
|
|
coro = self.trace_extension(prefix_and_name, info)
|
|
if not inspect.iscoroutine(coro): # pragma: no cover
|
|
raise TypeError(
|
|
"If you're using an asynchronous interface, "
|
|
"the callback of the `trace` extension should "
|
|
"be an asynchronous function rather than a normal function."
|
|
)
|
|
await coro
|
|
|
|
if self.debug:
|
|
if not info or "return_value" in info and info["return_value"] is None:
|
|
message = name
|
|
else:
|
|
args = " ".join([f"{key}={value!r}" for key, value in info.items()])
|
|
message = f"{name} {args}"
|
|
self.logger.debug(message)
|
|
|
|
async def __aenter__(self) -> "Trace":
|
|
if self.should_trace:
|
|
info = self.kwargs
|
|
await self.atrace(f"{self.name}.started", info)
|
|
return self
|
|
|
|
async def __aexit__(
|
|
self,
|
|
exc_type: Optional[Type[BaseException]] = None,
|
|
exc_value: Optional[BaseException] = None,
|
|
traceback: Optional[TracebackType] = None,
|
|
) -> None:
|
|
if self.should_trace:
|
|
if exc_value is None:
|
|
info = {"return_value": self.return_value}
|
|
await self.atrace(f"{self.name}.complete", info)
|
|
else:
|
|
info = {"exception": exc_value}
|
|
await self.atrace(f"{self.name}.failed", info)
|