Source code for osprofiler.drivers.otlp

# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import collections
from typing import Any
from urllib import parse as parser

from oslo_config import cfg
from oslo_serialization import jsonutils

from osprofiler import _utils as utils
from osprofiler.drivers import base
from osprofiler import exc


[docs] class OTLP(base.Driver): def __init__( self, connection_str: str, project: str | None = None, service: str | None = None, host: str | None = None, conf: cfg.ConfigOpts = cfg.CONF, **kwargs: Any, ) -> None: """OTLP driver using OTLP exporters.""" super().__init__( connection_str, project=project, service=service, host=host, conf=conf, **kwargs, ) try: from opentelemetry import trace as trace_api from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( OTLPSpanExporter, ) # noqa from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.sdk.trace import TracerProvider self.trace_api = trace_api except ImportError: raise exc.CommandError( "To use OSProfiler with OTLP exporters, " "please install `opentelemetry-sdk` and " "opentelemetry-exporter-otlp libraries. " "To install with pip:\n `pip install opentelemetry-sdk " "opentelemetry-exporter-otlp`." ) service_name = self._get_service_name(conf, project, service) resource = Resource(attributes={"service.name": service_name}) parsed_url = parser.urlparse(connection_str) # TODO("sahid"): We also want to handle https scheme? parsed_url = parsed_url._replace(scheme="http") self.trace_api.set_tracer_provider(TracerProvider(resource=resource)) self.tracer = self.trace_api.get_tracer(__name__) exporter = OTLPSpanExporter(f"{parsed_url.geturl()}/v1/traces") self.trace_api.get_tracer_provider().add_span_processor( # type: ignore[attr-defined] BatchSpanProcessor(exporter) ) self.spans: collections.deque[Any] = collections.deque() def _get_service_name( self, conf: cfg.ConfigOpts, project: str | None, service: str | None ) -> str: prefix = conf.profiler_otlp.service_name_prefix if prefix: return f"{prefix}-{project}-{service}" return f"{project}-{service}"
[docs] @classmethod def get_name(cls) -> str: return "otlp"
def _kind(self, name: str) -> Any: if "wsgi" in name: return self.trace_api.SpanKind.SERVER elif "db" in name or "http" in name or "api" in name: return self.trace_api.SpanKind.CLIENT return self.trace_api.SpanKind.INTERNAL def _name(self, payload: dict[str, Any]) -> str: info = payload["info"] if info.get("request"): return "WSGI_{}_{}".format( info["request"]["method"], info["request"]["path"] ) elif info.get("db"): return "SQL_{}".format( info["db"]["statement"].split(' ', 1)[0].upper() ) elif info.get("requests"): return "REQUESTS_{}_{}".format( info["requests"]["method"], info["requests"]["hostname"] ) return str(payload["name"]).rstrip("-start")
[docs] def notify(self, info: dict[str, Any], **kwargs: Any) -> None: payload = info if payload["name"].endswith("start"): ctx = None trace_id = utils.uuid_to_int128(payload["base_id"]) if payload["base_id"] != payload["parent_id"]: # only non-root spans should have a parent span and context parent = self.trace_api.SpanContext( trace_id=trace_id, span_id=utils.shorten_id(payload["parent_id"]), is_remote=False, trace_flags=self.trace_api.TraceFlags( self.trace_api.TraceFlags.SAMPLED ), ) ctx = self.trace_api.set_span_in_context( self.trace_api.NonRecordingSpan(parent) ) # OTLP Tracing span span = self.tracer.start_span( name=self._name(payload), kind=self._kind(payload['name']), attributes=self.create_span_tags(payload), context=ctx, ) span._context = self.trace_api.SpanContext( # type: ignore[attr-defined] trace_id=trace_id, span_id=utils.shorten_id(payload["trace_id"]), is_remote=span.context.is_remote, # type: ignore[attr-defined] trace_flags=span.context.trace_flags, # type: ignore[attr-defined] trace_state=span.context.trace_state, # type: ignore[attr-defined] ) self.spans.append(span) else: span = self.spans.pop() # Store result of db call and function call for call in ("db", "function"): if payload.get("info", {}).get(call, {}).get("result"): span.set_attribute( "result", payload["info"][call]["result"] ) # Store result of requests if payload.get("info", {}).get("requests"): span.set_attribute( "status_code", payload["info"]["requests"]["status_code"] ) # Span error tag and log if payload["info"].get("etype"): span.set_attribute("error", True) span.add_event( "log", { "error.kind": payload["info"]["etype"], "message": payload["info"]["message"], }, ) span.end()
[docs] def get_report(self, base_id: str) -> dict[str, Any]: return self._parse_results()
[docs] def list_traces( self, fields: set[str] | None = None ) -> list[dict[str, Any]]: return []
[docs] def list_error_traces(self) -> list[dict[str, Any]]: return []
[docs] def create_span_tags(self, payload: dict[str, Any]) -> dict[str, Any]: """Create tags an OpenTracing compatible span. :param info: Information from OSProfiler trace. :returns tags: A dictionary contains standard tags from OpenTracing sematic conventions, and some other custom tags related to http, db calls. """ tags: dict[str, Any] = {} info = payload["info"] if info.get("db"): # DB calls tags["db.statement"] = info["db"]["statement"] tags["db.params"] = jsonutils.dumps(info["db"]["params"]) elif info.get("request"): # WSGI call tags["http.path"] = info["request"]["path"] tags["http.query"] = info["request"]["query"] tags["http.method"] = info["request"]["method"] tags["http.scheme"] = info["request"]["scheme"] elif info.get("requests"): # requests call tags["http.path"] = info["requests"]["path"] tags["http.query"] = info["requests"]["query"] tags["http.method"] = info["requests"]["method"] tags["http.scheme"] = info["requests"]["scheme"] tags["http.hostname"] = info["requests"]["hostname"] tags["http.port"] = info["requests"]["port"] elif info.get("function"): # RPC, function calls if "args" in info["function"]: tags["args"] = info["function"]["args"] if "kwargs" in info["function"]: tags["kwargs"] = info["function"]["kwargs"] tags["name"] = info["function"]["name"] return tags