new file mode 100755
@@ -0,0 +1,376 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2023 Robin Jarry
+
+'''
+DPDK telemetry exporter.
+
+It uses dynamically loaded endpoint exporters which are basic python files that
+must implement two functions:
+
+ def info() -> dict[MetricName, MetricInfo]:
+ """
+ Mapping of metric names to their description and type.
+ """
+
+ def metrics(sock: TelemetrySocket) -> list[MetricValue]:
+ """
+ Request data from sock and return it as metric values. A metric value
+ is a 3-tuple: (name: str, value: any, labels: dict). Each name must be
+ present in info().
+ """
+
+The sock argument passed to metrics() has a single method:
+
+ def cmd(self, uri, arg=None) -> dict | list:
+ """
+ Request JSON data to the telemetry socket and parse it to python
+ values.
+ """
+
+See existing endpoints for examples.
+
+The exporter supports multiple output formats:
+
+prometheus://ADDRESS:PORT
+openmetrics://ADDRESS:PORT
+ Expose the enabled endpoints via a local HTTP server listening on the
+ specified address and port. GET requests on that server are served with
+ text/plain responses in the prometheus/openmetrics format.
+
+ More details:
+ https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format
+
+carbon://ADDRESS:PORT
+graphite://ADDRESS:PORT
+ Export all enabled endpoints to the specified TCP ADDRESS:PORT in the pickle
+ carbon format.
+
+ More details:
+ https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-pickle-protocol
+'''
+
+import argparse
+from http import HTTPStatus, server
+import importlib.util
+import json
+import logging
+import os
+import pickle
+import re
+import socket
+import struct
+import sys
+import time
+import typing
+from urllib.parse import urlparse
+
+
LOG = logging.getLogger(__name__)
DEFAULT_OUTPUT = "openmetrics://:9876"

# Endpoint modules shipped next to this script. The directory only exists when
# running from a source checkout; when present it is searched first, so its
# modules take precedence over installed ones with the same name.
LOCAL = os.path.join(os.path.dirname(__file__), "telemetry-endpoints")
DEFAULT_LOAD_PATHS = [LOCAL] if os.path.isdir(LOCAL) else []
DEFAULT_LOAD_PATHS.extend(
    f"{prefix}/share/dpdk/telemetry-endpoints" for prefix in ("/usr/local", "/usr")
)
+
+
def main():
    """
    Command line entry point.

    Parse the arguments, load the telemetry endpoints and hand them to the
    function registered in OUTPUT_FORMATS for the scheme of the --output URL.
    Ctrl-C terminates quietly. Any other unexpected error is logged with its
    traceback and the process exits with status 1 so that supervisors notice
    the failure.
    """
    logging.basicConfig(
        stream=sys.stdout,
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "-o",
        "--output",
        metavar="FORMAT://PARAMETERS",
        default=urlparse(DEFAULT_OUTPUT),
        type=urlparse,
        help=f"""
        Output format (default: "{DEFAULT_OUTPUT}"). Depending on the format,
        URL elements have different meanings. By default, the exporter starts a
        local HTTP server on port 9876 that serves requests in the
        prometheus/openmetrics plain text format.
        """,
    )
    parser.add_argument(
        "-p",
        "--load-path",
        dest="load_paths",
        type=lambda v: v.split(os.pathsep),
        default=DEFAULT_LOAD_PATHS,
        help=f"""
        The list of paths from which to discover endpoints.
        (default: "{os.pathsep.join(DEFAULT_LOAD_PATHS)}").
        """,
    )
    parser.add_argument(
        "-e",
        "--endpoint",
        dest="endpoints",
        action="append",
        help="""
        Telemetry endpoint to export (by default, all discovered endpoints are
        enabled). This option can be specified more than once.
        """,
    )
    parser.add_argument(
        "-l",
        "--list",
        action="store_true",
        help="""
        Only list detected endpoints and exit.
        """,
    )
    parser.add_argument(
        "-s",
        "--socket-path",
        default="/run/dpdk/rte/dpdk_telemetry.v2",
        help="""
        The DPDK telemetry socket path (default: "%(default)s").
        """,
    )
    args = parser.parse_args()
    output = OUTPUT_FORMATS.get(args.output.scheme)
    if output is None:
        parser.error(f"unsupported output format: {args.output.scheme}://")
    try:
        # load_endpoints() logs every endpoint it uses, which is what --list
        # relies on to display them.
        endpoints = load_endpoints(args.load_paths, args.endpoints)
        if args.list:
            return
        if not endpoints:
            LOG.warning("no telemetry endpoint loaded, nothing to export")
        output(args, endpoints)
    except KeyboardInterrupt:
        pass
    except Exception:
        LOG.exception("unexpected error")
        sys.exit(1)
+
+
class TelemetrySocket:
    """
    Abstraction of the DPDK telemetry socket.

    Usable as a context manager: the underlying socket is closed on exit.
    It can also be closed explicitly with close().
    """

    def __init__(self, path: str):
        """
        Connect to the telemetry socket at path and read the initial JSON
        message sent by the server, which carries max_output_len.

        If connecting or reading that message fails, the socket is closed
        before the exception propagates, so no file descriptor is leaked.
        """
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
        try:
            self.sock.connect(path)
            data = json.loads(self.sock.recv(1024).decode())
            # Largest reply the server may send; used as the recv() size.
            self.max_output_len = data["max_output_len"]
        except BaseException:
            self.sock.close()
            raise

    def cmd(
        self, uri: str, arg: typing.Any = None
    ) -> typing.Optional[typing.Union[dict, list]]:
        """
        Request JSON data to the telemetry socket and parse it to python
        values.

        :param uri: Telemetry command, e.g. "/ethdev/list".
        :param arg: Optional command parameter, sent as "uri,arg".
        :return: The value stored under uri in the JSON reply.
        """
        u = uri if arg is None else f"{uri},{arg}"
        self.sock.send(u.encode("utf-8"))
        data = self.sock.recv(self.max_output_len)
        # The reply object is keyed by the command without its parameter.
        return json.loads(data.decode("utf-8"))[uri]

    def close(self):
        """
        Close the underlying socket.
        """
        self.sock.close()

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()
+
+
# Type aliases used to annotate the endpoint interface.
MetricName = str  # key of an endpoint's info() mapping
MetricDescription = str  # human readable help text
MetricType = str  # e.g. "counter" or "gauge"
MetricLabels = typing.Dict[str, typing.Any]  # label name -> label value
MetricInfo = typing.Tuple[MetricDescription, MetricType]
MetricValue = typing.Tuple[MetricName, typing.Any, MetricLabels]
+
+
class TelemetryEndpoint:
    """
    Interface of an endpoint, declared only so that type annotations have a
    name to refer to. Endpoints are plain modules loaded from files; they are
    never instances of this class.
    """

    @staticmethod
    def info() -> typing.Dict[MetricName, MetricInfo]:
        """
        Return a dict mapping every metric name the endpoint reports to a
        (description, type) pair.
        """
        raise NotImplementedError

    @staticmethod
    def metrics(sock: TelemetrySocket) -> typing.List[MetricValue]:
        """
        Query sock and return (name, value, labels) tuples, where every name
        is a key of the info() mapping.
        """
        raise NotImplementedError
+
+
def load_endpoints(
    paths: typing.List[str], names: typing.Optional[typing.List[str]]
) -> typing.List[TelemetryEndpoint]:
    """
    Load selected telemetry endpoints from the specified paths.

    Every regular, non-hidden ``*.py`` file found directly in one of the paths
    is an endpoint named after the file (without extension). When the same
    name exists in several paths, the first path that loads it wins.

    :param paths: Directories to search, in order of precedence. Missing
        directories are skipped silently; unreadable ones with a warning.
    :param names: Endpoint names to load, or None to load all of them.
    :return: The loaded endpoint modules, sorted by name. Modules that fail to
        import, lack a metrics() function or return a malformed info() are
        logged and skipped.
    """
    endpoints = {}
    dwb = sys.dont_write_bytecode
    sys.dont_write_bytecode = True  # never generate .pyc files for endpoints
    try:
        for p in paths:
            if not os.path.isdir(p):
                continue
            try:
                fnames = sorted(os.listdir(p))  # sorted for deterministic loading
            except OSError as e:
                LOG.warning("listing endpoints in %s: %s", p, e)
                continue
            for fname in fnames:
                name, ext = os.path.splitext(fname)
                f = os.path.join(p, fname)
                # Only load python sources: READMEs, editor backups or stale
                # .pyc files would otherwise be imported (or fail noisily).
                if ext != ".py" or name.startswith(".") or not os.path.isfile(f):
                    continue
                if names is not None and name not in names:
                    continue  # not selected by user
                if name in endpoints:
                    continue  # endpoint with same name already loaded
                try:
                    spec = importlib.util.spec_from_file_location(name, f)
                    module = importlib.util.module_from_spec(spec)
                    spec.loader.exec_module(module)
                except Exception:
                    LOG.exception("parsing endpoint: %s", f)
                    continue
                endpoints[name] = module
    finally:
        # restore the caller's setting even if listing/loading raised
        sys.dont_write_bytecode = dwb

    modules = []
    for name, module in sorted(endpoints.items()):
        try:
            if not callable(getattr(module, "metrics", None)):
                raise TypeError("missing metrics() function")
            for value in module.info().values():
                # unpacking validates that each value is a (description, type) pair
                _description, _type = value
        except Exception:
            LOG.exception("getting endpoint info: %s", name)
            continue
        LOG.info("using endpoint: %s (from %s)", name, module.__file__)
        modules.append(module)

    if names is not None:
        for name in sorted(set(names) - set(endpoints)):
            LOG.warning("requested endpoint not loaded: %s", name)
    return modules
+
+
def serve_openmetrics(
    args: argparse.Namespace, endpoints: typing.List[TelemetryEndpoint]
):
    """
    Start an HTTP server and serve requests in the openmetrics/prometheus
    format.

    The listen address comes from the --output URL, e.g.
    "openmetrics://127.0.0.1:9876" or "openmetrics://[::]:9876". An empty host
    listens on all IPv4 addresses, and a missing port defaults to 80. Blocks
    serving requests until interrupted.
    """
    host = args.output.hostname or ""
    port = args.output.port or 80

    class Server(server.HTTPServer):
        # HTTPServer creates AF_INET sockets, which cannot bind IPv6
        # addresses. urlparse() strips the brackets of IPv6 literals, so a
        # colon in the hostname identifies them.
        address_family = socket.AF_INET6 if ":" in host else socket.AF_INET

    with Server((host, port), OpenmetricsHandler) as httpd:
        # Extra attributes read by OpenmetricsHandler through self.server.
        httpd.dpdk_socket_path = args.socket_path
        httpd.telemetry_endpoints = endpoints
        LOG.info("listening on %s port %s", host or "*", httpd.server_port)
        httpd.serve_forever()
+
+
class OpenmetricsHandler(server.BaseHTTPRequestHandler):
    """
    Basic HTTP handler that returns prometheus/openmetrics formatted responses.

    The server instance must carry two extra attributes: dpdk_socket_path
    (telemetry socket to query) and telemetry_endpoints (endpoint modules).
    """

    CONTENT_TYPE = "text/plain; version=0.0.4; charset=utf-8"

    def escape(self, value: typing.Any) -> str:
        """
        Escape a metric label value.

        Backslashes are escaped first. Otherwise the backslash inserted in
        front of a double quote would itself be doubled, leaving the quote
        unescaped.
        """
        value = str(value)
        value = value.replace("\\", "\\\\")
        value = value.replace('"', '\\"')
        return value.replace("\n", "\\n")

    def format_metrics(self, sock: "TelemetrySocket") -> str:
        """
        Query every enabled endpoint and return the exposition text.

        All samples of a metric family are emitted as one group, right after
        its HELP and TYPE lines, as the exposition format requires. Families
        appear in the order they are first reported.

        :raises KeyError: if an endpoint reports a name missing from its info().
        """
        # fullname -> lines of that family (HELP, TYPE, then samples); dict
        # insertion order preserves the order of first appearance.
        families = {}
        for e in self.server.telemetry_endpoints:
            info = e.info()
            for name, value, labels in e.metrics(sock):
                fullname = re.sub(r"\W", "_", f"dpdk_{e.__name__}_{name}")
                family = families.get(fullname)
                if family is None:
                    desc, metric_type = info[name]
                    desc = str(desc).replace("\\", "\\\\").replace("\n", "\\n")
                    family = families[fullname] = [
                        f"# HELP {fullname} {desc}",
                        f"# TYPE {fullname} {metric_type}",
                    ]
                label_str = ",".join(
                    f'{k}="{self.escape(v)}"' for k, v in labels.items()
                )
                if label_str:
                    label_str = f"{{{label_str}}}"
                family.append(f"{fullname}{label_str} {value}")
        lines = [line for family in families.values() for line in family]
        return "\n".join(lines) + "\n"

    def do_GET(self):
        """
        Called upon GET requests.

        Responds with 503 when the DPDK telemetry socket is missing or
        refuses connections, and with 500 on any other error.
        """
        try:
            with TelemetrySocket(self.server.dpdk_socket_path) as sock:
                body = self.format_metrics(sock).encode("utf-8")
            self.send_response(HTTPStatus.OK)
            self.send_header("Content-Type", self.CONTENT_TYPE)
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
            LOG.info("%s %s", self.address_string(), self.requestline)

        except Exception as e:
            if isinstance(e, (FileNotFoundError, ConnectionRefusedError)):
                self.send_error(HTTPStatus.SERVICE_UNAVAILABLE)
                LOG.warning("%s %s: %s", self.address_string(), self.requestline, e)
            else:
                self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR)
                LOG.exception("%s %s", self.address_string(), self.requestline)

    def log_message(self, fmt, *args):
        pass  # disable built-in logger
+
+
def export_carbon(args: argparse.Namespace, endpoints: typing.List[TelemetryEndpoint]):
    """
    Collect all metrics and export them to a carbon server in the pickle format.

    The destination comes from the --output URL. A missing host defaults to
    localhost, and a missing port defaults to 2004, the conventional port of
    carbon's pickle receiver. All samples of one export share the timestamp
    taken when collection starts. Labels are appended as graphite tags
    (path;tag=value), with ";" removed from tag values.
    """
    addr = (args.output.hostname or "localhost", args.output.port or 2004)
    timestamp = time.time()
    metrics = []
    with TelemetrySocket(args.socket_path) as dpdk:
        for e in endpoints:
            for name, value, labels in e.metrics(dpdk):
                path = re.sub(r"\W", ".", f"dpdk.{e.__name__}.{name}")
                tags = "".join(
                    f";{key}={str(val).replace(';', '')}"
                    for key, val in labels.items()
                )
                metrics.append((path + tags, (timestamp, value)))
    # Connect only once collection is done, so the carbon connection is not
    # held open while querying DPDK. create_connection() resolves the host
    # and supports IPv6.
    payload = pickle.dumps(metrics, protocol=2)
    with socket.create_connection(addr) as carbon:
        carbon.sendall(struct.pack("!L", len(payload)) + payload)
+
+
# Map each URL scheme accepted by --output to the function implementing it.
# "prometheus" and "graphite" are aliases of "openmetrics" and "carbon".
OUTPUT_FORMATS = dict(
    openmetrics=serve_openmetrics,
    prometheus=serve_openmetrics,
    carbon=export_carbon,
    graphite=export_carbon,
)


if __name__ == "__main__":
    main()
@@ -11,5 +11,11 @@ install_data([
'dpdk-telemetry.py',
'dpdk-hugepages.py',
'dpdk-rss-flows.py',
+ 'dpdk-telemetry-exporter.py',
],
install_dir: 'bin')
+
+# Install the bundled endpoint modules as <prefix>/share/dpdk/telemetry-endpoints
+# (strip_directory: false keeps the directory name). With a /usr or /usr/local
+# prefix, this matches one of the exporter's default load paths.
+install_subdir(
+ 'telemetry-endpoints',
+ install_dir: 'share/dpdk',
+ strip_directory: false)
new file mode 100644
@@ -0,0 +1,47 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2023 Robin Jarry
+
RX_PACKETS = "rx_packets"
RX_BYTES = "rx_bytes"
RX_MISSED = "rx_missed"
RX_NOMBUF = "rx_nombuf"
RX_ERRORS = "rx_errors"
TX_PACKETS = "tx_packets"
TX_BYTES = "tx_bytes"
TX_ERRORS = "tx_errors"

# Exported metric -> field of the /ethdev/stats reply. Listing every metric
# exactly once prevents duplicate samples, which scrapers reject.
_STATS_FIELDS = {
    RX_PACKETS: "ipackets",
    RX_BYTES: "ibytes",
    RX_MISSED: "imissed",
    RX_NOMBUF: "rx_nombuf",
    RX_ERRORS: "ierrors",
    TX_PACKETS: "opackets",
    TX_BYTES: "obytes",
    TX_ERRORS: "oerrors",
}


def info() -> "dict[Name, tuple[Description, Type]]":
    """
    Describe the per-port metrics reported by this endpoint.
    """
    return {
        RX_PACKETS: ("Number of successfully received packets.", "counter"),
        RX_BYTES: ("Number of successfully received bytes.", "counter"),
        RX_MISSED: (
            "Number of packets dropped by the HW because Rx queues are full.",
            "counter",
        ),
        RX_NOMBUF: ("Number of Rx mbuf allocation failures.", "counter"),
        RX_ERRORS: ("Number of erroneous received packets.", "counter"),
        TX_PACKETS: ("Number of successfully transmitted packets.", "counter"),
        TX_BYTES: ("Number of successfully transmitted bytes.", "counter"),
        TX_ERRORS: ("Number of packet transmission failures.", "counter"),
    }


def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]":
    """
    Report the basic statistics of every ethdev port, labelled by port name.

    Ports for which the info or stats request returns no data are skipped.
    Nothing is reported when the port list request returns no data.
    """
    out = []
    for port_id in sock.cmd("/ethdev/list") or []:
        port = sock.cmd("/ethdev/info", port_id)
        stats = sock.cmd("/ethdev/stats", port_id)
        if not (port and stats):
            continue
        labels = {"port": port["name"]}
        out += [
            (metric, stats[field], labels) for metric, field in _STATS_FIELDS.items()
        ]
    return out
new file mode 100644
@@ -0,0 +1,29 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2023 Robin Jarry
+
CPU_TOTAL = "total_cycles"
CPU_BUSY = "busy_cycles"


def info() -> "dict[Name, tuple[Description, Type]]":
    """
    Describe the per-lcore CPU cycle metrics reported by this endpoint.
    """
    return {
        CPU_TOTAL: ("Total number of CPU cycles.", "counter"),
        CPU_BUSY: ("Number of busy CPU cycles.", "counter"),
    }


def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]":
    """
    Report total and busy cycles of every lcore, labelled by CPU set and
    NUMA socket.

    Lcores with an empty CPU set, or without a non-zero total_cycles value,
    are skipped. So are lcores whose info request returns no data. Nothing is
    reported when the lcore list request returns no data (presumably a DPDK
    build without lcore telemetry; to confirm).
    """
    out = []
    for lcore_id in sock.cmd("/eal/lcore/list") or []:
        lcore = sock.cmd("/eal/lcore/info", lcore_id)
        if not lcore:
            continue
        cpu = ",".join(str(c) for c in lcore.get("cpuset", []))
        total = lcore.get("total_cycles")
        busy = lcore.get("busy_cycles", 0)
        if not (cpu and total):
            continue
        labels = {"cpu": cpu, "numa": lcore.get("socket", 0)}
        out += [
            (CPU_TOTAL, total, labels),
            (CPU_BUSY, busy, labels),
        ]
    return out
new file mode 100644
@@ -0,0 +1,37 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2023 Robin Jarry
+
MEM_TOTAL = "total_bytes"
MEM_USED = "used_bytes"


def info() -> "dict[Name, tuple[Description, Type]]":
    """
    Describe the memory metrics reported by this endpoint.
    """
    return {
        MEM_TOTAL: ("The total size of reserved memory in bytes.", "gauge"),
        MEM_USED: ("The currently used memory in bytes.", "gauge"),
    }


def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]":
    """
    Report reserved and used memory summed over all memzones.

    Used memory is the sum of the memzone lengths. Reserved memory is the size
    of the union of the hugepage ranges backing the memzones. Several memzones
    may share hugepages, so overlapping or nested ranges are counted once.
    Memzones whose info request returns no data are skipped.
    """
    ranges = []
    used = 0
    for zone in sock.cmd("/eal/memzone_list") or []:
        z = sock.cmd("/eal/memzone_info", zone)
        if not z:
            continue
        start = int(z["Hugepage_base"], 16)
        ranges.append((start, start + z["Hugepage_size"] * z["Hugepage_used"]))
        used += z["Length"]

    # Size of the union of [start, end) ranges: walk them sorted by start,
    # counting only the part of each range beyond what is already covered.
    total = 0
    covered_end = None
    for start, end in sorted(ranges):
        if covered_end is not None and start < covered_end:
            if end > covered_end:
                total += end - covered_end
                covered_end = end
        else:
            total += end - start
            covered_end = end

    return [
        (MEM_TOTAL, total, {}),
        (MEM_USED, used, {}),
    ]