[RFC] usertools: add telemetry exporter

Message ID 20230926163442.844006-2-rjarry@redhat.com (mailing list archive)
State New
Delegated to: Thomas Monjalon
Headers
Series [RFC] usertools: add telemetry exporter |

Checks

Context Check Description
ci/checkpatch warning coding style issues
ci/loongarch-compilation success Compilation OK
ci/loongarch-unit-testing success Unit Testing PASS
ci/Intel-compilation success Compilation OK
ci/intel-Testing success Testing PASS
ci/intel-Functional success Functional PASS

Commit Message

Robin Jarry Sept. 26, 2023, 4:34 p.m. UTC
  For now the telemetry socket is local to the machine running a DPDK
application. Also, there is no official "schema" for the exposed
metrics. Add a framework and a script to collect and expose these
metrics to telemetry and observability agree gators such as Prometheus,
Carbon or Influxdb. The exposed data must be done with end-users in
mind, some DPDK terminology or internals may not make sense to everyone.

The script only serves as an entry point and does not know anything
about any specific metrics nor JSON data structures exposed in the
telemetry socket.

It uses dynamically loaded endpoint exporters which are basic python
files that must implement two functions:

 def info() -> dict[MetricName, MetricInfo]:
     Mapping of metric names to their description and type.

 def metrics(sock: TelemetrySocket) -> list[MetricValue]:
     Request data from sock and return it as metric values. A metric
     value is a 3-tuple: (name: str, value: any, labels: dict). Each
     name must be present in info().

The sock argument passed to metrics() has a single method:

 def cmd(self, uri: str, arg: any = None) -> dict | list:
     Request JSON data to the telemetry socket and parse it to python
     values.

The main script invokes endpoints and exports the data into an output
format. For now, only two formats are implemented:

* openmetrics/prometheus: text based format exported via a local HTTP
  server.
* carbon/graphite: binary (python pickle) format exported to a distant
  carbon TCP server.

As a starting point, 3 built-in endpoints are implemented:

* counters: ethdev hardware counters
* cpu: lcore usage
* memory: overall memory usage

The goal is to keep all built-in endpoints in the DPDK repository so
that they can be updated along with the telemetry JSON data structures.

Example output for the openmetrics:// format:

 ~# dpdk-telemetry-exporter.py -o openmetrics://:9876 &
 INFO using endpoint: counters (from .../telemetry-endpoints/counters.py)
 INFO using endpoint: cpu (from .../telemetry-endpoints/cpu.py)
 INFO using endpoint: memory (from .../telemetry-endpoints/memory.py)
 INFO listening on port 9876
 [1] 838829

 ~$ curl http://127.0.0.1:9876/
 # HELP dpdk_cpu_total_cycles Total number of CPU cycles.
 # TYPE dpdk_cpu_total_cycles counter
 # HELP dpdk_cpu_busy_cycles Number of busy CPU cycles.
 # TYPE dpdk_cpu_busy_cycles counter
 dpdk_cpu_total_cycles{cpu="73", numa="0"} 4353385274702980
 dpdk_cpu_busy_cycles{cpu="73", numa="0"} 6215932860
 dpdk_cpu_total_cycles{cpu="9", numa="0"} 4353385274745740
 dpdk_cpu_busy_cycles{cpu="9", numa="0"} 6215932860
 dpdk_cpu_total_cycles{cpu="8", numa="0"} 4353383451895540
 dpdk_cpu_busy_cycles{cpu="8", numa="0"} 6171923160
 dpdk_cpu_total_cycles{cpu="72", numa="0"} 4353385274817320
 dpdk_cpu_busy_cycles{cpu="72", numa="0"} 6215932860
 # HELP dpdk_memory_total_bytes The total size of reserved memory in bytes.
 # TYPE dpdk_memory_total_bytes gauge
 # HELP dpdk_memory_used_bytes The currently used memory in bytes.
 # TYPE dpdk_memory_used_bytes gauge
 dpdk_memory_total_bytes 1073741824
 dpdk_memory_used_bytes 794197376

Link: https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format
Link: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#text-format
Link: https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-pickle-protocol
Link: https://github.com/influxdata/telegraf/tree/master/plugins/inputs/prometheus
Signed-off-by: Robin Jarry <rjarry@redhat.com>
---

Notes:
    v1:
    
    * Ideally, this script should be tested in CI to avoid breakage when the
      telemetry data structures change. I don't know where such a test could
      be done.
    
    * There was work done 3/4 years ago in collectd to add DPDK telemetry
      support but I think this has now been abandoned.
    
      https://github.com/collectd/collectd/blob/main/src/dpdk_telemetry.c
    
      I think that keeping the exporters in the DPDK repository makes more
      sense from a maintainability perspective.

 usertools/dpdk-telemetry-exporter.py      | 376 ++++++++++++++++++++++
 usertools/meson.build                     |   6 +
 usertools/telemetry-endpoints/counters.py |  47 +++
 usertools/telemetry-endpoints/cpu.py      |  29 ++
 usertools/telemetry-endpoints/memory.py   |  37 +++
 5 files changed, 495 insertions(+)
 create mode 100755 usertools/dpdk-telemetry-exporter.py
 create mode 100644 usertools/telemetry-endpoints/counters.py
 create mode 100644 usertools/telemetry-endpoints/cpu.py
 create mode 100644 usertools/telemetry-endpoints/memory.py
  

Comments

Robin Jarry Nov. 20, 2023, 1:26 p.m. UTC | #1
Gentle ping.

Is anybody interested in this?
  
Anthony Harivel March 27, 2024, 3:18 p.m. UTC | #2
Unaddressed
Hi Robin, 

Thanks for this patch. I did test it and it works as expected. 
Nonetheless, maybe we can improve on some parts. 

In 'class  TelemetrySocket', there is:
...
self.sock.connect(path)
data = json.loads(self.sock.recv(1024).decode())
...

Maybe we can improve with something like: 

        try:
	    rcv_data = self.sock.recv(1024)

            if rcv_data:
                data = json.loads(rcv_data.decode())
            else:
                print("No data received from socket.")
        except json.JSONDecodeError as e:
                print("Error decoding JSON:", e)
        except Exception as e:
                print("An error occurred:", e)

So that it handles a bit better the error cases.

In the same way to implement more robust error handling mechanisms in:
def load_endpoints
...
except Exception as e:
    LOG.error("Failed to load endpoint module '%s' from '%s': %s", name, f, e)
...

For example, you might catch FileNotFoundError, ImportError, or SyntaxError.
That could help to debug!


About TelemetryEndpoint I would see something like: 

class TelemetryEndpoint:
    """
    Placeholder class only used for typing annotations.
    """

    @staticmethod
    def info() -> typing.Dict[MetricName, MetricInfo]:
        """
        Mapping of metric names to their description and type.
        """
        raise NotImplementedError()

    @staticmethod
    def metrics(sock: TelemetrySocket) -> typing.List[MetricValue]:
        """
        Request data from sock and return it as metric values. Each metric
        name must be present in info().
        """
        try:
            metrics = []
            metrics_data = sock.fetch_metrics_data()
            for metric_name, metric_value in metrics_data.items():
                metrics.append((metric_name, metric_value, {}))
            return metrics
        except Exception as e:
            LOG.error("Failed to fetch metrics data: %s", e)
            # If unable to fetch metrics data, return an empty list
            return []

With these changes, the metrics method of the TelemetryEndpoint class could
handle errors better and the exporter can continue functioning even if there
are issues with fetching metrics data.

I don't know if all of that makes sens or if it's just nitpicking !
I can also propose an enhanced version of your patch if you prefer.

Regards,
Anthony
  
Robin Jarry April 1, 2024, 9:28 p.m. UTC | #3
Anthony Harivel, Mar 27, 2024 at 16:18:
> Hi Robin,
>
> Thanks for this patch. I did test it and it works as expected. 
> Nonetheless, maybe we can improve on some parts.

Hey Anthony, thanks a lot for testing!

> In 'class  TelemetrySocket', there is:
> ...
> self.sock.connect(path)
> data = json.loads(self.sock.recv(1024).decode())
> ...
>
> Maybe we can improve with something like:
>
>         try:
>             rcv_data = self.sock.recv(1024)
>             if rcv_data:
>                 data = json.loads(rcv_data.decode())
>             else:
>                 print("No data received from socket.")
>         except json.JSONDecodeError as e:
>                 print("Error decoding JSON:", e)
>         except Exception as e:
>                 print("An error occurred:", e)
>
> So that it handles a bit better the error cases.

This is undesired as it would actually mask the errors from the calling 
code. Unless you can do something about the error, printing it should be 
left to the calling code. I will handle these errors better in v2.

> In the same way to implement more robust error handling mechanisms in:
> def load_endpoints
> ...
> except Exception as e:
>     LOG.error("Failed to load endpoint module '%s' from '%s': %s", name, f, e)
> ...
>
> For example, you might catch FileNotFoundError, ImportError, or SyntaxError.
> That could help to debug!

We could print the whole stack trace but I don't see what special 
handling could be done depending on the exception. I will try to make 
this better for v2.

> About TelemetryEndpoint I would see something like:
>
> class TelemetryEndpoint:
>     """
>     Placeholder class only used for typing annotations.
>     """
>
>     @staticmethod
>     def info() -> typing.Dict[MetricName, MetricInfo]:
>         """
>         Mapping of metric names to their description and type.
>         """
>         raise NotImplementedError()
>
>     @staticmethod
>     def metrics(sock: TelemetrySocket) -> typing.List[MetricValue]:
>         """
>         Request data from sock and return it as metric values. Each metric
>         name must be present in info().
>         """
>         try:
>             metrics = []
>             metrics_data = sock.fetch_metrics_data()
>             for metric_name, metric_value in metrics_data.items():
>                 metrics.append((metric_name, metric_value, {}))
>             return metrics
>         except Exception as e:
>             LOG.error("Failed to fetch metrics data: %s", e)
>             # If unable to fetch metrics data, return an empty list
>             return []
>
> With these changes, the metrics method of the TelemetryEndpoint class 
> could handle errors better and the exporter can continue functioning 
> even if there are issues with fetching metrics data.
>
> I don't know if all of that makes sens or if it's just nitpicking! 
> I can also propose an enhanced version of your patch if you prefer.

As indicated in the docstring, this class is merely a placeholder. Its 
code is never executed. It only stands to represent what functions must 
be implemented in endpoints.

However, your point is valid. I will update my code to handle errors in 
endpoints more gracefully and avoid failing the whole request if there 
is only one error.

Thanks for the thorough review!
  

Patch

diff --git a/usertools/dpdk-telemetry-exporter.py b/usertools/dpdk-telemetry-exporter.py
new file mode 100755
index 000000000000..6c1495a9e1e8
--- /dev/null
+++ b/usertools/dpdk-telemetry-exporter.py
@@ -0,0 +1,376 @@ 
+#!/usr/bin/env python3
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2023 Robin Jarry
+
+'''
+DPDK telemetry exporter.
+
+It uses dynamically loaded endpoint exporters which are basic python files that
+must implement two functions:
+
+    def info() -> dict[MetricName, MetricInfo]:
+        """
+        Mapping of metric names to their description and type.
+        """
+
+    def metrics(sock: TelemetrySocket) -> list[MetricValue]:
+        """
+        Request data from sock and return it as metric values. A metric value
+        is a 3-tuple: (name: str, value: any, labels: dict). Each name must be
+        present in info().
+        """
+
+The sock argument passed to metrics() has a single method:
+
+    def cmd(self, uri, arg=None) -> dict | list:
+        """
+        Request JSON data to the telemetry socket and parse it to python
+        values.
+        """
+
+See existing endpoints for examples.
+
+The exporter supports multiple output formats:
+
+prometheus://ADDRESS:PORT
+openmetrics://ADDRESS:PORT
+  Expose the enabled endpoints via a local HTTP server listening on the
+  specified address and port. GET requests on that server are served with
+  text/plain responses in the prometheus/openmetrics format.
+
+  More details:
+  https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format
+
+carbon://ADDRESS:PORT
+graphite://ADDRESS:PORT
+  Export all enabled endpoints to the specified TCP ADDRESS:PORT in the pickle
+  carbon format.
+
+  More details:
+  https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-pickle-protocol
+'''
+
+import argparse
+from http import HTTPStatus, server
+import importlib.util
+import json
+import logging
+import os
+import pickle
+import re
+import socket
+import struct
+import sys
+import time
+import typing
+from urllib.parse import urlparse
+
+
+LOG = logging.getLogger(__name__)
+# Use local endpoints path only when running from source
+LOCAL = os.path.join(os.path.dirname(__file__), "telemetry-endpoints")
+DEFAULT_LOAD_PATHS = []
+if os.path.isdir(LOCAL):
+    DEFAULT_LOAD_PATHS.append(LOCAL)
+DEFAULT_LOAD_PATHS += [
+    "/usr/local/share/dpdk/telemetry-endpoints",
+    "/usr/share/dpdk/telemetry-endpoints",
+]
+DEFAULT_OUTPUT = "openmetrics://:9876"
+
+
+def main():
+    logging.basicConfig(
+        stream=sys.stdout,
+        level=logging.INFO,
+        format="%(asctime)s %(levelname)s %(message)s",
+        datefmt="%Y-%m-%d %H:%M:%S",
+    )
+    parser = argparse.ArgumentParser(
+        description=__doc__,
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+    )
+    parser.add_argument(
+        "-o",
+        "--output",
+        metavar="FORMAT://PARAMETERS",
+        default=urlparse(DEFAULT_OUTPUT),
+        type=urlparse,
+        help=f"""
+        Output format (default: "{DEFAULT_OUTPUT}"). Depending on the format,
+        URL elements have different meanings. By default, the exporter starts a
+        local HTTP server on port 9876 that serves requests in the
+        prometheus/openmetrics plain text format.
+        """,
+    )
+    parser.add_argument(
+        "-p",
+        "--load-path",
+        dest="load_paths",
+        type=lambda v: v.split(os.pathsep),
+        default=DEFAULT_LOAD_PATHS,
+        help=f"""
+        The list of paths from which to disvover endpoints.
+        (default: "{os.pathsep.join(DEFAULT_LOAD_PATHS)}").
+        """,
+    )
+    parser.add_argument(
+        "-e",
+        "--endpoint",
+        dest="endpoints",
+        action="append",
+        help="""
+        Telemetry endpoint to export (by default, all discovered endpoints are
+        enabled). This option can be specified more than once.
+        """,
+    )
+    parser.add_argument(
+        "-l",
+        "--list",
+        action="store_true",
+        help="""
+        Only list detected endpoints and exit.
+        """,
+    )
+    parser.add_argument(
+        "-s",
+        "--socket-path",
+        default="/run/dpdk/rte/dpdk_telemetry.v2",
+        help="""
+        The DPDK telemetry socket path (default: "%(default)s").
+        """,
+    )
+    args = parser.parse_args()
+    output = OUTPUT_FORMATS.get(args.output.scheme)
+    if output is None:
+        parser.error(f"unsupported output format: {args.output.scheme}://")
+    try:
+        endpoints = load_endpoints(args.load_paths, args.endpoints)
+        if args.list:
+            return
+        output(args, endpoints)
+    except KeyboardInterrupt:
+        pass
+    except Exception:
+        LOG.exception("")
+
+
+class TelemetrySocket:
+    """
+    Abstraction of the DPDK telemetry socket.
+    """
+
+    def __init__(self, path: str):
+        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
+        self.sock.connect(path)
+        data = json.loads(self.sock.recv(1024).decode())
+        self.max_output_len = data["max_output_len"]
+
+    def cmd(
+        self, uri: str, arg: typing.Any = None
+    ) -> typing.Optional[typing.Union[dict, list]]:
+        """
+        Request JSON data to the telemetry socket and parse it to python
+        values.
+        """
+        if arg is not None:
+            u = f"{uri},{arg}"
+        else:
+            u = uri
+        self.sock.send(u.encode("utf-8"))
+        data = self.sock.recv(self.max_output_len)
+        return json.loads(data.decode("utf-8"))[uri]
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args, **kwargs):
+        self.sock.close()
+
+
+MetricDescription = str
+MetricType = str
+MetricName = str
+MetricLabels = typing.Dict[str, typing.Any]
+MetricInfo = typing.Tuple[MetricDescription, MetricType]
+MetricValue = typing.Tuple[MetricName, typing.Any, MetricLabels]
+
+
+class TelemetryEndpoint:
+    """
+    Placeholder class only used for typing annotations.
+    """
+
+    @staticmethod
+    def info() -> typing.Dict[MetricName, MetricInfo]:
+        """
+        Mapping of metric names to their description and type.
+        """
+        raise NotImplementedError()
+
+    @staticmethod
+    def metrics(sock: TelemetrySocket) -> typing.List[MetricValue]:
+        """
+        Request data from sock and return it as metric values. Each metric
+        name must be present in info().
+        """
+        raise NotImplementedError()
+
+
+def load_endpoints(
+    paths: typing.List[str], names: typing.List[str]
+) -> typing.List[TelemetryEndpoint]:
+    """
+    Load selected telemetry endpoints from the specified paths.
+    """
+
+    endpoints = {}
+    dwb = sys.dont_write_bytecode
+    sys.dont_write_bytecode = True  # never generate .pyc files for endpoints
+
+    for p in paths:
+        if not os.path.isdir(p):
+            continue
+        for fname in os.listdir(p):
+            f = os.path.join(p, fname)
+            if os.path.isdir(f):
+                continue
+            try:
+                name, _ = os.path.splitext(fname)
+                if names is not None and name not in names:
+                    # not selected by user
+                    continue
+                if name in endpoints:
+                    # endpoint with same name already loaded
+                    continue
+                spec = importlib.util.spec_from_file_location(name, f)
+                module = importlib.util.module_from_spec(spec)
+                spec.loader.exec_module(module)
+                endpoints[name] = module
+            except Exception:
+                LOG.exception("parsing endpoint: %s", f)
+
+    sys.dont_write_bytecode = dwb
+
+    modules = []
+    info = {}
+    for name, module in sorted(endpoints.items()):
+        LOG.info("using endpoint: %s (from %s)", name, module.__file__)
+        try:
+            for metric, (description, type_) in module.info().items():
+                info[(name, metric)] = (description, type_)
+            modules.append(module)
+        except Exception:
+            LOG.exception("getting endpoint info: %s", name)
+    return modules
+
+
+def serve_openmetrics(
+    args: argparse.Namespace, endpoints: typing.List[TelemetryEndpoint]
+):
+    """
+    Start an HTTP server and serve requests in the openmetrics/prometheus
+    format.
+    """
+    listen = (args.output.hostname or "", int(args.output.port or 80))
+    with server.HTTPServer(listen, OpenmetricsHandler) as httpd:
+        httpd.dpdk_socket_path = args.socket_path
+        httpd.telemetry_endpoints = endpoints
+        LOG.info("listening on port %s", httpd.server_port)
+        httpd.serve_forever()
+
+
+class OpenmetricsHandler(server.BaseHTTPRequestHandler):
+    """
+    Basic HTTP handler that returns prometheus/openmetrics formatted responses.
+    """
+
+    CONTENT_TYPE = "text/plain; version=0.0.4; charset=utf-8"
+
+    def escape(self, value: typing.Any) -> str:
+        """
+        Escape a metric label value.
+        """
+        value = str(value)
+        value = value.replace('"', '\\"')
+        value = value.replace("\\", "\\\\")
+        return value.replace("\n", "\\n")
+
+    def do_GET(self):
+        """
+        Called uppon GET requests.
+        """
+        try:
+            lines = []
+            metrics_names = set()
+            with TelemetrySocket(self.server.dpdk_socket_path) as sock:
+                for e in self.server.telemetry_endpoints:
+                    info = e.info()
+                    metrics_lines = []
+                    for name, value, labels in e.metrics(sock):
+                        fullname = re.sub(r"\W", "_", f"dpdk_{e.__name__}_{name}")
+                        labels = ", ".join(
+                            f'{k}="{self.escape(v)}"' for k, v in labels.items()
+                        )
+                        if labels:
+                            labels = f"{{{labels}}}"
+                        metrics_lines.append(f"{fullname}{labels} {value}")
+                        if fullname not in metrics_names:
+                            metrics_names.add(fullname)
+                            desc, metric_type = info[name]
+                            lines += [
+                                f"# HELP {fullname} {desc}",
+                                f"# TYPE {fullname} {metric_type}",
+                            ]
+                    lines += metrics_lines
+            body = "\n".join(lines).encode("utf-8") + b"\n"
+            self.send_response(HTTPStatus.OK)
+            self.send_header("Content-Type", self.CONTENT_TYPE)
+            self.send_header("Content-Length", str(len(body)))
+            self.end_headers()
+            self.wfile.write(body)
+            LOG.info("%s %s", self.address_string(), self.requestline)
+
+        except Exception as e:
+            if isinstance(e, (FileNotFoundError, ConnectionRefusedError)):
+                self.send_error(HTTPStatus.SERVICE_UNAVAILABLE)
+            else:
+                self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR)
+            LOG.exception("%s %s", self.address_string(), self.requestline)
+
+    def log_message(self, fmt, *args):
+        pass  # disable built-in logger
+
+
+def export_carbon(args: argparse.Namespace, endpoints: typing.List[TelemetryEndpoint]):
+    """
+    Collect all metrics and export them to a carbon server in the pickle format.
+    """
+    addr = (args.output.hostname or "", int(args.output.port or 80))
+    with TelemetrySocket(args.socket_path) as dpdk:
+        with socket.socket() as carbon:
+            carbon.connect(addr)
+            metrics = []
+            for e in endpoints:
+                for name, value, labels in e.metrics(dpdk):
+                    fullname = re.sub(r"\W", ".", f"dpdk.{e.__name__}.{name}")
+                    for key, val in labels.items():
+                        val = str(val).replace(";", "")
+                        fullname += f";{key}={val}"
+                    metrics.append((fullname, (time.time(), value)))
+            payload = pickle.dumps(metrics, protocol=2)
+            header = struct.pack("!L", len(payload))
+            buf = header + payload
+            carbon.sendall(buf)
+
+
+OUTPUT_FORMATS = {
+    "openmetrics": serve_openmetrics,
+    "prometheus": serve_openmetrics,
+    "carbon": export_carbon,
+    "graphite": export_carbon,
+}
+
+
+if __name__ == "__main__":
+    main()
diff --git a/usertools/meson.build b/usertools/meson.build
index 740b4832f36d..eb48e2f4403f 100644
--- a/usertools/meson.build
+++ b/usertools/meson.build
@@ -11,5 +11,11 @@  install_data([
             'dpdk-telemetry.py',
             'dpdk-hugepages.py',
             'dpdk-rss-flows.py',
+            'dpdk-telemetry-exporter.py',
         ],
         install_dir: 'bin')
+
+install_subdir(
+        'telemetry-endpoints',
+        install_dir: 'share/dpdk',
+        strip_directory: false)
diff --git a/usertools/telemetry-endpoints/counters.py b/usertools/telemetry-endpoints/counters.py
new file mode 100644
index 000000000000..e17cffb43b2c
--- /dev/null
+++ b/usertools/telemetry-endpoints/counters.py
@@ -0,0 +1,47 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2023 Robin Jarry
+
+RX_PACKETS = "rx_packets"
+RX_BYTES = "rx_bytes"
+RX_MISSED = "rx_missed"
+RX_NOMBUF = "rx_nombuf"
+RX_ERRORS = "rx_errors"
+TX_PACKETS = "tx_packets"
+TX_BYTES = "tx_bytes"
+TX_ERRORS = "tx_errors"
+
+
+def info() -> "dict[Name, tuple[Description, Type]]":
+    return {
+        RX_PACKETS: ("Number of successfully received packets.", "counter"),
+        RX_BYTES: ("Number of successfully received bytes.", "counter"),
+        RX_MISSED: (
+            "Number of packets dropped by the HW because Rx queues are full.",
+            "counter",
+        ),
+        RX_NOMBUF: ("Number of Rx mbuf allocation failures.", "counter"),
+        RX_ERRORS: ("Number of erroneous received packets.", "counter"),
+        TX_PACKETS: ("Number of successfully transmitted packets.", "counter"),
+        TX_BYTES: ("Number of successfully transmitted bytes.", "counter"),
+        TX_ERRORS: ("Number of packet transmission failures.", "counter"),
+    }
+
+
+def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]":
+    out = []
+    for port_id in sock.cmd("/ethdev/list"):
+        port = sock.cmd("/ethdev/info", port_id)
+        stats = sock.cmd("/ethdev/stats", port_id)
+        labels = {"port": port["name"]}
+        out += [
+            (RX_PACKETS, stats["ipackets"], labels),
+            (RX_PACKETS, stats["ipackets"], labels),
+            (RX_BYTES, stats["ibytes"], labels),
+            (RX_MISSED, stats["imissed"], labels),
+            (RX_NOMBUF, stats["rx_nombuf"], labels),
+            (RX_ERRORS, stats["ierrors"], labels),
+            (TX_PACKETS, stats["opackets"], labels),
+            (TX_BYTES, stats["obytes"], labels),
+            (TX_ERRORS, stats["oerrors"], labels),
+        ]
+    return out
diff --git a/usertools/telemetry-endpoints/cpu.py b/usertools/telemetry-endpoints/cpu.py
new file mode 100644
index 000000000000..d38d8d6e2558
--- /dev/null
+++ b/usertools/telemetry-endpoints/cpu.py
@@ -0,0 +1,29 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2023 Robin Jarry
+
+CPU_TOTAL = "total_cycles"
+CPU_BUSY = "busy_cycles"
+
+
+def info() -> "dict[Name, tuple[Description, Type]]":
+    return {
+        CPU_TOTAL: ("Total number of CPU cycles.", "counter"),
+        CPU_BUSY: ("Number of busy CPU cycles.", "counter"),
+    }
+
+
+def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]":
+    out = []
+    for lcore_id in sock.cmd("/eal/lcore/list"):
+        lcore = sock.cmd("/eal/lcore/info", lcore_id)
+        cpu = ",".join(str(c) for c in lcore.get("cpuset", []))
+        total = lcore.get("total_cycles")
+        busy = lcore.get("busy_cycles", 0)
+        if not (cpu and total):
+            continue
+        labels = {"cpu": cpu, "numa": lcore.get("socket", 0)}
+        out += [
+            (CPU_TOTAL, total, labels),
+            (CPU_BUSY, busy, labels),
+        ]
+    return out
diff --git a/usertools/telemetry-endpoints/memory.py b/usertools/telemetry-endpoints/memory.py
new file mode 100644
index 000000000000..32cce1e59382
--- /dev/null
+++ b/usertools/telemetry-endpoints/memory.py
@@ -0,0 +1,37 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2023 Robin Jarry
+
+MEM_TOTAL = "total_bytes"
+MEM_USED = "used_bytes"
+
+
+def info() -> "dict[Name, tuple[Description, Type]]":
+    return {
+        MEM_TOTAL: ("The total size of reserved memory in bytes.", "gauge"),
+        MEM_USED: ("The currently used memory in bytes.", "gauge"),
+    }
+
+
+def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]":
+    zones = {}
+    used = 0
+    for zone in sock.cmd("/eal/memzone_list") or []:
+        z = sock.cmd("/eal/memzone_info", zone)
+        start = int(z["Hugepage_base"], 16)
+        end = start + (z["Hugepage_size"] * z["Hugepage_used"])
+        used += z["Length"]
+        for s, e in list(zones.items()):
+            if s < start < e < end:
+                zones[s] = end
+                break
+            if start < s < end < e:
+                del zones[s]
+                zones[start] = e
+                break
+        else:
+            zones[start] = end
+
+    return [
+        (MEM_TOTAL, sum(end - start for (start, end) in zones.items()), {}),
+        (MEM_USED, max(0, used), {}),
+    ]