# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
from typing import Any
from urllib import parse as parser
from oslo_config import cfg
from oslo_serialization import jsonutils
from osprofiler import _utils as utils
from osprofiler.drivers import base
from osprofiler import exc
[docs]
class OTLP(base.Driver):
def __init__(
    self,
    connection_str: str,
    project: str | None = None,
    service: str | None = None,
    host: str | None = None,
    conf: cfg.ConfigOpts = cfg.CONF,
    **kwargs: Any,
) -> None:
    """Set up an OpenTelemetry tracer that exports spans over OTLP/HTTP.

    :param connection_str: driver connection string; its scheme is
        replaced with ``http`` and ``/v1/traces`` is appended to build the
        exporter endpoint.
    :param project: project name, used to build the ``service.name``
        resource attribute.
    :param service: service name, used to build the ``service.name``
        resource attribute.
    :param host: forwarded unchanged to the base driver.
    :param conf: configuration object; forwarded to the base driver and
        read for ``profiler_otlp.service_name_prefix``.
    :param kwargs: forwarded unchanged to the base driver.
    :raises exc.CommandError: if the OpenTelemetry packages cannot be
        imported.
    """
    super().__init__(
        connection_str,
        project=project,
        service=service,
        host=host,
        conf=conf,
        **kwargs,
    )

    # The OpenTelemetry packages are imported here rather than at module
    # level so that a missing install is reported as a CommandError with
    # installation instructions.
    try:
        from opentelemetry import trace as trace_api
        from opentelemetry.exporter.otlp.proto.http.trace_exporter import (  # noqa
            OTLPSpanExporter,
        )
        from opentelemetry.sdk.resources import Resource
        from opentelemetry.sdk.trace.export import BatchSpanProcessor
        from opentelemetry.sdk.trace import TracerProvider
    except ImportError:
        raise exc.CommandError(
            "To use OSProfiler with OTLP exporters, "
            "please install `opentelemetry-sdk` and "
            "opentelemetry-exporter-otlp libraries. "
            "To install with pip:\n `pip install opentelemetry-sdk "
            "opentelemetry-exporter-otlp`."
        )
    self.trace_api = trace_api

    otlp_resource = Resource(
        attributes={
            "service.name": self._get_service_name(conf, project, service)
        }
    )

    # TODO("sahid"): We also want to handle https scheme?
    endpoint_base = (
        parser.urlparse(connection_str)._replace(scheme="http").geturl()
    )

    self.trace_api.set_tracer_provider(
        TracerProvider(resource=otlp_resource)
    )
    self.tracer = self.trace_api.get_tracer(__name__)

    span_exporter = OTLPSpanExporter(f"{endpoint_base}/v1/traces")
    # The processor is attached to the provider returned by
    # get_tracer_provider(), not directly to the instance created above.
    self.trace_api.get_tracer_provider().add_span_processor(  # type: ignore[attr-defined]
        BatchSpanProcessor(span_exporter)
    )

    # Spans opened by notify() and not yet ended, most recent last.
    self.spans: collections.deque[Any] = collections.deque()
def _get_service_name(
self, conf: cfg.ConfigOpts, project: str | None, service: str | None
) -> str:
prefix = conf.profiler_otlp.service_name_prefix
if prefix:
return f"{prefix}-{project}-{service}"
return f"{project}-{service}"
[docs]
@classmethod
def get_name(cls) -> str:
return "otlp"
def _kind(self, name: str) -> Any:
if "wsgi" in name:
return self.trace_api.SpanKind.SERVER
elif "db" in name or "http" in name or "api" in name:
return self.trace_api.SpanKind.CLIENT
return self.trace_api.SpanKind.INTERNAL
def _name(self, payload: dict[str, Any]) -> str:
info = payload["info"]
if info.get("request"):
return "WSGI_{}_{}".format(
info["request"]["method"], info["request"]["path"]
)
elif info.get("db"):
return "SQL_{}".format(
info["db"]["statement"].split(' ', 1)[0].upper()
)
elif info.get("requests"):
return "REQUESTS_{}_{}".format(
info["requests"]["method"], info["requests"]["hostname"]
)
return str(payload["name"]).rstrip("-start")
[docs]
def notify(self, info: dict[str, Any], **kwargs: Any) -> None:
    """Open or close a span for a profiler event.

    An event whose ``name`` ends with ``start`` opens a new span and pushes
    it onto ``self.spans``. Any other event pops the most recently pushed
    span, records result, status code and error details on it, and ends
    it.

    :param info: event payload. Start events must carry ``name``,
        ``base_id``, ``parent_id``, ``trace_id`` and ``info``. Other events
        only need ``name``; ``info`` is optional for them.
    :param kwargs: accepted and ignored.
    :raises IndexError: if a non-start event arrives while ``self.spans``
        is empty.
    """
    payload = info

    if payload["name"].endswith("start"):
        api = self.trace_api
        trace_id = utils.uuid_to_int128(payload["base_id"])

        ctx = None
        if payload["base_id"] != payload["parent_id"]:
            # Only non-root spans get a parent span and context.
            parent = api.SpanContext(
                trace_id=trace_id,
                span_id=utils.shorten_id(payload["parent_id"]),
                is_remote=False,
                trace_flags=api.TraceFlags(api.TraceFlags.SAMPLED),
            )
            ctx = api.set_span_in_context(api.NonRecordingSpan(parent))

        # OTLP tracing span
        span = self.tracer.start_span(
            name=self._name(payload),
            kind=self._kind(payload["name"]),
            attributes=self.create_span_tags(payload),
            context=ctx,
        )
        # Overwrite the span's context so that its trace and span ids are
        # derived from the profiler ids; the remaining fields are copied
        # from the context the tracer created.
        span._context = api.SpanContext(  # type: ignore[attr-defined]
            trace_id=trace_id,
            span_id=utils.shorten_id(payload["trace_id"]),
            is_remote=span.context.is_remote,  # type: ignore[attr-defined]
            trace_flags=span.context.trace_flags,  # type: ignore[attr-defined]
            trace_state=span.context.trace_state,  # type: ignore[attr-defined]
        )
        self.spans.append(span)
        return

    span = self.spans.pop()
    # Read "info" once and tolerate its absence for every check below,
    # including the error check, so a bare stop event still ends the span.
    details = payload.get("info", {})

    # Store result of db call and function call.
    for call in ("db", "function"):
        result = details.get(call, {}).get("result")
        if result:
            span.set_attribute("result", result)

    # Store result of requests.
    if details.get("requests"):
        span.set_attribute(
            "status_code", details["requests"]["status_code"]
        )

    # Span error tag and log.
    if details.get("etype"):
        span.set_attribute("error", True)
        span.add_event(
            "log",
            {
                "error.kind": details["etype"],
                "message": details["message"],
            },
        )

    span.end()
[docs]
def get_report(self, base_id: str) -> dict[str, Any]:
    """Return whatever ``self._parse_results()`` produces.

    :param base_id: not used by this implementation.
    """
    report = self._parse_results()
    return report
[docs]
def list_traces(
self, fields: set[str] | None = None
) -> list[dict[str, Any]]:
return []
[docs]
def list_error_traces(self) -> list[dict[str, Any]]:
    """Return an empty list; this driver does not list error traces."""
    no_traces: list[dict[str, Any]] = []
    return no_traces