2015-02-18 17:42, John McNamara:
> From: Richardson, Bruce <bruce.richardson@intel.com>
>
> Add in support for inline processing of packets inside the RX or
> TX call. For an RX callback, what happens is that we get a set of
> packets from the NIC and then pass them to a callback function, if
> configured, to allow additional processing to be done on them, e.g.
> filling in more mbuf fields, before passing back to the application.
> On TX, the packets are similarly post-processed before being handed
> to the NIC for transmission.
>
> Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> Signed-off-by: John McNamara <john.mcnamara@intel.com>
[...]
> @@ -2393,7 +2447,18 @@ rte_eth_rx_burst(uint8_t port_id, uint16_t queue_id,
> struct rte_eth_dev *dev;
>
> dev = &rte_eth_devices[port_id];
> - return (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id], rx_pkts, nb_pkts);
> + nb_pkts = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id], rx_pkts,
> + nb_pkts);
> + struct rte_eth_rxtx_callback *cb = dev->post_rx_burst_cbs[queue_id];
> +
> + if (unlikely(cb != NULL)) {
> + do {
> + nb_pkts = cb->fn(port_id, queue_id, rx_pkts, nb_pkts,
> + cb->param);
> + cb = cb->next;
> + } while (cb != NULL);
> + }
Excuse me, it wasn't very clear to me, but I understood from the following
email that the consensus was to use a compile-time option:
http://dpdk.org/ml/archives/dev/2015-February/013450.html
@@ -337,6 +337,16 @@ rte_eth_dev_rx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
dev->data->nb_rx_queues = 0;
return -(ENOMEM);
}
+ dev->post_rx_burst_cbs = rte_zmalloc(
+ "ethdev->post_rx_burst_cbs",
+ sizeof(*dev->post_rx_burst_cbs) * nb_queues,
+ RTE_CACHE_LINE_SIZE);
+ if (dev->post_rx_burst_cbs == NULL) {
+ rte_free(dev->data->rx_queues);
+ dev->data->rx_queues = NULL;
+ dev->data->nb_rx_queues = 0;
+ return -(ENOMEM);
+ }
} else { /* re-configure */
FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rx_queue_release, -ENOTSUP);
@@ -348,10 +358,20 @@ rte_eth_dev_rx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
RTE_CACHE_LINE_SIZE);
if (rxq == NULL)
return -(ENOMEM);
+ dev->post_rx_burst_cbs = rte_realloc(
+ dev->post_rx_burst_cbs,
+ sizeof(*dev->post_rx_burst_cbs) *
+ nb_queues, RTE_CACHE_LINE_SIZE);
+ if (dev->post_rx_burst_cbs == NULL)
+ return -(ENOMEM);
- if (nb_queues > old_nb_queues)
+ if (nb_queues > old_nb_queues) {
+ uint16_t new_qs = nb_queues - old_nb_queues;
memset(rxq + old_nb_queues, 0,
- sizeof(rxq[0]) * (nb_queues - old_nb_queues));
+ sizeof(rxq[0]) * new_qs);
+ memset(dev->post_rx_burst_cbs + old_nb_queues, 0,
+ sizeof(dev->post_rx_burst_cbs[0]) * new_qs);
+ }
dev->data->rx_queues = rxq;
@@ -479,6 +499,16 @@ rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
dev->data->nb_tx_queues = 0;
return -(ENOMEM);
}
+ dev->pre_tx_burst_cbs = rte_zmalloc(
+ "ethdev->pre_tx_burst_cbs",
+ sizeof(*dev->pre_tx_burst_cbs) * nb_queues,
+ RTE_CACHE_LINE_SIZE);
+ if (dev->pre_tx_burst_cbs == NULL) {
+ rte_free(dev->data->tx_queues);
+ dev->data->tx_queues = NULL;
+ dev->data->nb_tx_queues = 0;
+ return -(ENOMEM);
+ }
} else { /* re-configure */
FUNC_PTR_OR_ERR_RET(*dev->dev_ops->tx_queue_release, -ENOTSUP);
@@ -490,10 +520,21 @@ rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
RTE_CACHE_LINE_SIZE);
if (txq == NULL)
return -(ENOMEM);
+ dev->pre_tx_burst_cbs = rte_realloc(
+ dev->pre_tx_burst_cbs,
+ sizeof(*dev->pre_tx_burst_cbs) *
+ nb_queues, RTE_CACHE_LINE_SIZE);
+ if (dev->pre_tx_burst_cbs == NULL)
+ return -(ENOMEM);
- if (nb_queues > old_nb_queues)
+
+ if (nb_queues > old_nb_queues) {
+ uint16_t new_qs = nb_queues - old_nb_queues;
memset(txq + old_nb_queues, 0,
- sizeof(txq[0]) * (nb_queues - old_nb_queues));
+ sizeof(txq[0]) * new_qs);
+ memset(dev->pre_tx_burst_cbs + old_nb_queues, 0,
+ sizeof(dev->pre_tx_burst_cbs[0]) * new_qs);
+ }
dev->data->tx_queues = txq;
@@ -3258,3 +3299,125 @@ rte_eth_dev_filter_ctrl(uint8_t port_id, enum rte_filter_type filter_type,
FUNC_PTR_OR_ERR_RET(*dev->dev_ops->filter_ctrl, -ENOTSUP);
return (*dev->dev_ops->filter_ctrl)(dev, filter_type, filter_op, arg);
}
+
+/*
+ * Register a callback to be run on each burst received on the given
+ * port/queue.
+ *
+ * The new entry is pushed onto the front of the queue's callback list, so
+ * callbacks run in reverse order of registration.
+ *
+ * Returns the newly allocated callback entry (usable later as the handle for
+ * rte_eth_remove_rx_callback()), or NULL with rte_errno set to EINVAL for a
+ * bad port/queue/function or ENOMEM if the entry could not be allocated.
+ */
+void *
+rte_eth_add_rx_callback(uint8_t port_id, uint16_t queue_id,
+		rte_rxtx_callback_fn fn, void *user_param)
+{
+	struct rte_eth_rxtx_callback **head;
+	struct rte_eth_rxtx_callback *node;
+
+	/* Validate the port first so the queue-count lookup is in bounds. */
+	if (port_id >= nb_ports || fn == NULL) {
+		rte_errno = EINVAL;
+		return NULL;
+	}
+	if (queue_id >= rte_eth_devices[port_id].data->nb_rx_queues) {
+		rte_errno = EINVAL;
+		return NULL;
+	}
+
+	node = rte_zmalloc(NULL, sizeof(*node), 0);
+	if (node == NULL) {
+		rte_errno = ENOMEM;
+		return NULL;
+	}
+
+	/* Link the new entry in as the first callback for this queue. */
+	head = &rte_eth_devices[port_id].post_rx_burst_cbs[queue_id];
+	node->param = user_param;
+	node->fn = fn;
+	node->next = *head;
+	*head = node;
+	return node;
+}
+
+/*
+ * Register a callback to be run on each burst about to be transmitted on the
+ * given port/queue.
+ *
+ * The new entry is pushed onto the front of the queue's callback list, so
+ * callbacks run in reverse order of registration.
+ *
+ * Returns the newly allocated callback entry (usable later as the handle for
+ * rte_eth_remove_tx_callback()), or NULL with rte_errno set to EINVAL for a
+ * bad port/queue/function or ENOMEM if the entry could not be allocated.
+ */
+void *
+rte_eth_add_tx_callback(uint8_t port_id, uint16_t queue_id,
+		rte_rxtx_callback_fn fn, void *user_param)
+{
+	struct rte_eth_rxtx_callback **head;
+	struct rte_eth_rxtx_callback *node;
+
+	/* Validate the port first so the queue-count lookup is in bounds. */
+	if (port_id >= nb_ports || fn == NULL) {
+		rte_errno = EINVAL;
+		return NULL;
+	}
+	if (queue_id >= rte_eth_devices[port_id].data->nb_tx_queues) {
+		rte_errno = EINVAL;
+		return NULL;
+	}
+
+	node = rte_zmalloc(NULL, sizeof(*node), 0);
+	if (node == NULL) {
+		rte_errno = ENOMEM;
+		return NULL;
+	}
+
+	/* Link the new entry in as the first callback for this queue. */
+	head = &rte_eth_devices[port_id].pre_tx_burst_cbs[queue_id];
+	node->param = user_param;
+	node->fn = fn;
+	node->next = *head;
+	*head = node;
+	return node;
+}
+
+/*
+ * Unlink an RX callback from the callback list of the given port/queue.
+ *
+ * The entry is only unlinked, never freed here: the caller keeps ownership
+ * of user_cb and decides when it is safe to release it.
+ *
+ * Returns 0 when the callback was found and unlinked, or -EINVAL when the
+ * port/queue is out of range, user_cb is NULL, or user_cb is not on this
+ * queue's list (which includes the case where the list is empty).
+ */
+int
+rte_eth_remove_rx_callback(uint8_t port_id, uint16_t queue_id,
+		struct rte_eth_rxtx_callback *user_cb)
+{
+	/* Check input parameters. */
+	if (port_id >= nb_ports || user_cb == NULL ||
+			queue_id >= rte_eth_devices[port_id].data->nb_rx_queues) {
+		return (-EINVAL);
+	}
+
+	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
+	struct rte_eth_rxtx_callback **link;
+
+	/*
+	 * Walk the list through the address of each "next" link (starting at
+	 * the head slot), so the head and interior cases are handled the same
+	 * way and an empty list is never dereferenced.
+	 */
+	for (link = &dev->post_rx_burst_cbs[queue_id]; *link != NULL;
+			link = &(*link)->next) {
+		if (*link == user_cb) {
+			*link = user_cb->next;
+			return 0;
+		}
+	}
+
+	/* Callback wasn't found. */
+	return (-EINVAL);
+}
+
+/*
+ * Unlink a TX callback from the callback list of the given port/queue.
+ *
+ * The entry is only unlinked, never freed here: the caller keeps ownership
+ * of user_cb and decides when it is safe to release it.
+ *
+ * Returns 0 when the callback was found and unlinked, or -EINVAL when the
+ * port/queue is out of range, user_cb is NULL, or user_cb is not on this
+ * queue's list (which includes the case where the list is empty).
+ */
+int
+rte_eth_remove_tx_callback(uint8_t port_id, uint16_t queue_id,
+		struct rte_eth_rxtx_callback *user_cb)
+{
+	/* Check input parameters. */
+	if (port_id >= nb_ports || user_cb == NULL ||
+			queue_id >= rte_eth_devices[port_id].data->nb_tx_queues) {
+		return (-EINVAL);
+	}
+
+	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
+	struct rte_eth_rxtx_callback **link;
+
+	/*
+	 * Walk the list through the address of each "next" link (starting at
+	 * the head slot), so the head and interior cases are handled the same
+	 * way and an empty list is never dereferenced.
+	 */
+	for (link = &dev->pre_tx_burst_cbs[queue_id]; *link != NULL;
+			link = &(*link)->next) {
+		if (*link == user_cb) {
+			*link = user_cb->next;
+			return 0;
+		}
+	}
+
+	/* Callback wasn't found. */
+	return (-EINVAL);
+}
@@ -1523,6 +1523,47 @@ struct eth_dev_ops {
};
/**
+ * Function type used for callbacks for processing packets on RX and TX
+ *
+ * If configured for RX, it is called with a burst of packets that have just
+ * been received on the given port and queue. On TX, it is called with a burst
+ * of packets immediately before those packets are put onto the hardware queue
+ * for transmission.
+ *
+ * @param port
+ * The ethernet port on which rx or tx is being performed
+ * @param queue
+ * The queue on the ethernet port which is being used to receive or transmit
+ * the packets.
+ * @param pkts
+ * The burst of packets on which processing is to be done. On RX, these
+ * packets have just been received. On TX, they are about to be transmitted.
+ * @param nb_pkts
+ * The number of packets in the burst pointed to by "pkts"
+ * @param user_param
+ * The arbitrary user parameter passed in by the application when the callback
+ * was originally configured.
+ * @return
+ * The number of packets remaining in pkts after processing.
+ * * On RX, this will be returned to the user as the return value from
+ * rte_eth_rx_burst.
+ * * On TX, this will be the number of packets handed to the driver for
+ * transmission.
+ */
+typedef uint16_t (*rte_rxtx_callback_fn)(uint8_t port, uint16_t queue,
+ struct rte_mbuf *pkts[], uint16_t nb_pkts, void *user_param);
+
+/**
+ * @internal
+ * Structure used to hold information about the callbacks to be called for a
+ * queue on RX and TX.
+ */
+struct rte_eth_rxtx_callback {
+ struct rte_eth_rxtx_callback *next; /**< Next callback in this queue's list, NULL at the end */
+ rte_rxtx_callback_fn fn; /**< Function invoked on each burst */
+ void *param; /**< User parameter passed as the last argument to fn */
+};
+
+/**
* @internal
* The generic data structure associated with each ethernet device.
*
@@ -1539,7 +1580,20 @@ struct rte_eth_dev {
const struct eth_driver *driver;/**< Driver for this device */
struct eth_dev_ops *dev_ops; /**< Functions exported by PMD */
struct rte_pci_device *pci_dev; /**< PCI info. supplied by probing */
- struct rte_eth_dev_cb_list link_intr_cbs; /**< User application callbacks on interrupt*/
+ /** User application callbacks for NIC interrupts */
+ struct rte_eth_dev_cb_list link_intr_cbs;
+
+ /**
+ * User-supplied functions called from rx_burst to post-process
+ * received packets before passing them to the user
+ */
+ struct rte_eth_rxtx_callback **post_rx_burst_cbs;
+
+ /**
+ * User-supplied functions called from tx_burst to pre-process
+ * packets before passing them to the driver for transmission.
+ */
+ struct rte_eth_rxtx_callback **pre_tx_burst_cbs;
};
struct rte_eth_dev_sriov {
@@ -2393,7 +2447,18 @@ rte_eth_rx_burst(uint8_t port_id, uint16_t queue_id,
struct rte_eth_dev *dev;
dev = &rte_eth_devices[port_id];
- return (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id], rx_pkts, nb_pkts);
+ nb_pkts = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id], rx_pkts,
+ nb_pkts);
+ struct rte_eth_rxtx_callback *cb = dev->post_rx_burst_cbs[queue_id];
+
+ if (unlikely(cb != NULL)) {
+ do {
+ nb_pkts = cb->fn(port_id, queue_id, rx_pkts, nb_pkts,
+ cb->param);
+ cb = cb->next;
+ } while (cb != NULL);
+ }
+ return nb_pkts;
}
#endif
@@ -2520,6 +2585,15 @@ rte_eth_tx_burst(uint8_t port_id, uint16_t queue_id,
struct rte_eth_dev *dev;
dev = &rte_eth_devices[port_id];
+ struct rte_eth_rxtx_callback *cb = dev->pre_tx_burst_cbs[queue_id];
+
+ if (unlikely(cb != NULL)) {
+ do {
+ nb_pkts = cb->fn(port_id, queue_id, tx_pkts, nb_pkts,
+ cb->param);
+ cb = cb->next;
+ } while (cb != NULL);
+ }
return (*dev->tx_pkt_burst)(dev->data->tx_queues[queue_id], tx_pkts, nb_pkts);
}
#endif
@@ -3667,6 +3741,120 @@ int rte_eth_dev_filter_supported(uint8_t port_id, enum rte_filter_type filter_ty
int rte_eth_dev_filter_ctrl(uint8_t port_id, enum rte_filter_type filter_type,
enum rte_filter_op filter_op, void *arg);
+/**
+ * Add a callback to be called on packet RX on a given port and queue.
+ *
+ * This API configures a function to be called for each burst of
+ * packets received on a given NIC port queue. The return value is a pointer
+ * that can be used to later remove the callback using
+ * rte_eth_remove_rx_callback().
+ *
+ * @param port_id
+ * The port identifier of the Ethernet device.
+ * @param queue_id
+ * The queue on the Ethernet device on which the callback is to be added.
+ * @param fn
+ * The callback function
+ * @param user_param
+ * A generic pointer parameter which will be passed to each invocation of the
+ * callback function on this port and queue.
+ *
+ * @return
+ * NULL on error.
+ * On success, a pointer value which can later be used to remove the callback.
+ */
+void *rte_eth_add_rx_callback(uint8_t port_id, uint16_t queue_id,
+ rte_rxtx_callback_fn fn, void *user_param);
+
+/**
+ * Add a callback to be called on packet TX on a given port and queue.
+ *
+ * This API configures a function to be called for each burst of
+ * packets sent on a given NIC port queue. The return value is a pointer
+ * that can be used to later remove the callback using
+ * rte_eth_remove_tx_callback().
+ *
+ * @param port_id
+ * The port identifier of the Ethernet device.
+ * @param queue_id
+ * The queue on the Ethernet device on which the callback is to be added.
+ * @param fn
+ * The callback function
+ * @param user_param
+ * A generic pointer parameter which will be passed to each invocation of the
+ * callback function on this port and queue.
+ *
+ * @return
+ * NULL on error.
+ * On success, a pointer value which can later be used to remove the callback.
+ */
+void *rte_eth_add_tx_callback(uint8_t port_id, uint16_t queue_id,
+ rte_rxtx_callback_fn fn, void *user_param);
+
+/**
+ * Remove an RX packet callback from a given port and queue.
+ *
+ * This function is used to remove callbacks that were added to a NIC port
+ * queue using rte_eth_add_rx_callback().
+ *
+ * Note: the callback is removed from the callback list but it isn't freed
+ * since it may still be in use. The memory for the callback can be
+ * subsequently freed by the application by calling rte_free():
+ *
+ * - Immediately - if the port is stopped, or the user knows that no
+ * callbacks are in flight e.g. if called from the thread doing RX/TX
+ * on that queue.
+ *
+ * - After a short delay - where the delay is sufficient to allow any
+ * in-flight callbacks to complete.
+ *
+ * @param port_id
+ * The port identifier of the Ethernet device.
+ * @param queue_id
+ * The queue on the Ethernet device from which the callback is to be removed.
+ * @param user_cb
+ * User supplied callback created via rte_eth_add_rx_callback().
+ *
+ * @return
+ * - 0: Success. Callback was removed.
+ * - -EINVAL: The port_id or the queue_id is out of range, or the callback is
+ * NULL or not found for the port/queue.
+ */
+int rte_eth_remove_rx_callback(uint8_t port_id, uint16_t queue_id,
+ struct rte_eth_rxtx_callback *user_cb);
+
+/**
+ * Remove a TX packet callback from a given port and queue.
+ *
+ * This function is used to remove callbacks that were added to a NIC port
+ * queue using rte_eth_add_tx_callback().
+ *
+ * Note: the callback is removed from the callback list but it isn't freed
+ * since it may still be in use. The memory for the callback can be
+ * subsequently freed by the application by calling rte_free():
+ *
+ * - Immediately - if the port is stopped, or the user knows that no
+ * callbacks are in flight e.g. if called from the thread doing RX/TX
+ * on that queue.
+ *
+ * - After a short delay - where the delay is sufficient to allow any
+ * in-flight callbacks to complete.
+ *
+ * @param port_id
+ * The port identifier of the Ethernet device.
+ * @param queue_id
+ * The queue on the Ethernet device from which the callback is to be removed.
+ * @param user_cb
+ * User supplied callback created via rte_eth_add_tx_callback().
+ *
+ * @return
+ * - 0: Success. Callback was removed.
+ * - -EINVAL: The port_id or the queue_id is out of range, or the callback is
+ * NULL or not found for the port/queue.
+ */
+int rte_eth_remove_tx_callback(uint8_t port_id, uint16_t queue_id,
+ struct rte_eth_rxtx_callback *user_cb);
+
#ifdef __cplusplus
}
#endif
@@ -2,6 +2,8 @@ DPDK_2.0 {
global:
_rte_eth_dev_callback_process;
+ rte_eth_add_rx_callback;
+ rte_eth_add_tx_callback;
rte_eth_allmulticast_disable;
rte_eth_allmulticast_enable;
rte_eth_allmulticast_get;
@@ -96,6 +98,8 @@ DPDK_2.0 {
rte_eth_promiscuous_disable;
rte_eth_promiscuous_enable;
rte_eth_promiscuous_get;
+ rte_eth_remove_rx_callback;
+ rte_eth_remove_tx_callback;
rte_eth_rx_burst;
rte_eth_rx_descriptor_done;
rte_eth_rx_queue_count;