[RFC,5/6] ring: introduce HTS ring mode
diff mbox series

Message ID 20200224113515.1744-6-konstantin.ananyev@intel.com
State Superseded
Delegated to: Thomas Monjalon
Headers show
Series
  • New sync modes for ring
Related show

Checks

Context Check Description
ci/Intel-compilation success Compilation OK
ci/checkpatch success coding style OK

Commit Message

Ananyev, Konstantin Feb. 24, 2020, 11:35 a.m. UTC
Introduce head/tail sync mode for MT ring synchronization.
In that mode enqueue/dequeue operation is fully serialized:
only one thread at a time is allowed to perform given op.
Supposed to reduce stall times in cases when the ring is used on
overcommitted cpus (multiple active threads on the same cpu).
As another enhancement provide ability to split enqueue/dequeue
operation into two phases:
  - enqueue/dequeue start
  - enqueue/dequeue finish
That allows user to inspect objects in the ring without removing
them from it (aka MT safe peek).

Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
---
 lib/librte_ring/Makefile               |   1 +
 lib/librte_ring/meson.build            |   1 +
 lib/librte_ring/rte_ring.c             |  15 +-
 lib/librte_ring/rte_ring.h             | 259 ++++++++++++++++++++++++-
 lib/librte_ring/rte_ring_hts_generic.h | 228 ++++++++++++++++++++++
 5 files changed, 500 insertions(+), 4 deletions(-)
 create mode 100644 lib/librte_ring/rte_ring_hts_generic.h

Comments

Honnappa Nagarahalli March 25, 2020, 8:44 p.m. UTC | #1
<snip>

> 
> Introduce head/tail sync mode for MT ring synchronization.
> In that mode enqueue/dequeue operation is fully serialized:
> only one thread at a time is allowed to perform given op.
> Suppose to reduce stall times in case when ring is used on overcommitted
> cpus (multiple active threads on the same cpu).
> As another enhancement provide ability to split enqueue/dequeue operation
> into two phases:
>   - enqueue/dequeue start
>   - enqueue/dequeue finish
> That allows user to inspect objects in the ring without removing them from it
> (aka MT safe peek).
> 
> Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
> ---
>  lib/librte_ring/Makefile               |   1 +
>  lib/librte_ring/meson.build            |   1 +
>  lib/librte_ring/rte_ring.c             |  15 +-
>  lib/librte_ring/rte_ring.h             | 259 ++++++++++++++++++++++++-
>  lib/librte_ring/rte_ring_hts_generic.h | 228 ++++++++++++++++++++++
>  5 files changed, 500 insertions(+), 4 deletions(-)  create mode 100644
> lib/librte_ring/rte_ring_hts_generic.h
> 
> diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index
> 4f90344f4..0c7f8f918 100644
> --- a/lib/librte_ring/Makefile
> +++ b/lib/librte_ring/Makefile
> @@ -19,6 +19,7 @@ SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include :=
> rte_ring.h \
>  					rte_ring_elem.h \
>  					rte_ring_generic.h \
>  					rte_ring_c11_mem.h \
> +					rte_ring_hts_generic.h \
>  					rte_ring_rts_generic.h
> 
>  include $(RTE_SDK)/mk/rte.lib.mk
> diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build index
> dc8d7dbea..5aa673199 100644
> --- a/lib/librte_ring/meson.build
> +++ b/lib/librte_ring/meson.build
> @@ -6,6 +6,7 @@ headers = files('rte_ring.h',
>  		'rte_ring_elem.h',
>  		'rte_ring_c11_mem.h',
>  		'rte_ring_generic.h',
> +		'rte_ring_hts_generic.h',
>  		'rte_ring_rts_generic.h')
> 
>  # rte_ring_create_elem and rte_ring_get_memsize_elem are experimental
> diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index
> 1ce0af3e5..d3b948667 100644
> --- a/lib/librte_ring/rte_ring.c
> +++ b/lib/librte_ring/rte_ring.c
> @@ -102,9 +102,9 @@ static int
>  get_sync_type(uint32_t flags, uint32_t *prod_st, uint32_t *cons_st)  {
>  	static const uint32_t prod_st_flags =
> -		(RING_F_SP_ENQ | RING_F_MP_RTS_ENQ);
> +		(RING_F_SP_ENQ | RING_F_MP_RTS_ENQ |
> RING_F_MP_HTS_ENQ);
>  	static const uint32_t cons_st_flags =
> -		(RING_F_SC_DEQ | RING_F_MC_RTS_DEQ);
> +		(RING_F_SC_DEQ | RING_F_MC_RTS_DEQ |
> RING_F_MC_HTS_DEQ);
> 
>  	switch (flags & prod_st_flags) {
>  	case 0:
> @@ -116,6 +116,9 @@ get_sync_type(uint32_t flags, uint32_t *prod_st,
> uint32_t *cons_st)
>  	case RING_F_MP_RTS_ENQ:
>  		*prod_st = RTE_RING_SYNC_MT_RTS;
>  		break;
> +	case RING_F_MP_HTS_ENQ:
> +		*prod_st = RTE_RING_SYNC_MT_HTS;
> +		break;
>  	default:
>  		return -EINVAL;
>  	}
> @@ -130,6 +133,9 @@ get_sync_type(uint32_t flags, uint32_t *prod_st,
> uint32_t *cons_st)
>  	case RING_F_MC_RTS_DEQ:
>  		*cons_st = RTE_RING_SYNC_MT_RTS;
>  		break;
> +	case RING_F_MC_HTS_DEQ:
> +		*cons_st = RTE_RING_SYNC_MT_HTS;
> +		break;
>  	default:
>  		return -EINVAL;
>  	}
> @@ -151,6 +157,11 @@ rte_ring_init(struct rte_ring *r, const char *name,
> unsigned count,
>  	RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) &
>  			  RTE_CACHE_LINE_MASK) != 0);
> 
> +	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) !=
> +		offsetof(struct rte_ring_hts_headtail, sync_type));
> +	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) !=
> +		offsetof(struct rte_ring_hts_headtail, ht.pos.tail));
> +
>  	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) !=
>  		offsetof(struct rte_ring_rts_headtail, sync_type));
>  	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) != diff --git
> a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index
> a130aeb9d..52edcea11 100644
> --- a/lib/librte_ring/rte_ring.h
> +++ b/lib/librte_ring/rte_ring.h
> @@ -66,11 +66,11 @@ enum {
>  	RTE_RING_SYNC_MT,     /**< multi-thread safe (default mode) */
>  	RTE_RING_SYNC_ST,     /**< single thread only */
>  	RTE_RING_SYNC_MT_RTS, /**< multi-thread relaxed tail sync */
> +	RTE_RING_SYNC_MT_HTS, /**< multi-thread head/tail sync */
>  };
> 
>  /**
> - * structure to hold a pair of head/tail values and other metadata.
> - * used by RTE_RING_SYNC_MT, RTE_RING_SYNC_ST sync types.
> + * Structure to hold a pair of head/tail values and other metadata.
>   * Depending on sync_type format of that structure might differ
>   * depending on the sync mechanism selelcted, but offsets for
>   * *sync_type* and *tail* values should always remain the same.
> @@ -96,6 +96,19 @@ struct rte_ring_rts_headtail {
>  	volatile union rte_ring_ht_poscnt head;  };
> 
> +union rte_ring_ht_pos {
> +	uint64_t raw;
> +	struct {
> +		uint32_t tail; /**< tail position */
> +		uint32_t head; /**< head position */
> +	} pos;
> +};
> +
> +struct rte_ring_hts_headtail {
> +	uint32_t sync_type; /**< sync type of prod/cons */
> +	volatile union rte_ring_ht_pos ht __rte_aligned(8); };
> +
>  /**
>   * An RTE ring structure.
>   *
> @@ -126,6 +139,7 @@ struct rte_ring {
>  	RTE_STD_C11
>  	union {
>  		struct rte_ring_headtail prod;
> +		struct rte_ring_hts_headtail hts_prod;
>  		struct rte_ring_rts_headtail rts_prod;
>  	}  __rte_cache_aligned;
> 
> @@ -135,6 +149,7 @@ struct rte_ring {
>  	RTE_STD_C11
>  	union {
>  		struct rte_ring_headtail cons;
> +		struct rte_ring_hts_headtail hts_cons;
>  		struct rte_ring_rts_headtail rts_cons;
>  	}  __rte_cache_aligned;
> 
> @@ -157,6 +172,9 @@ struct rte_ring {
>  #define RING_F_MP_RTS_ENQ 0x0008 /**< The default enqueue is "MP RTS".
> */  #define RING_F_MC_RTS_DEQ 0x0010 /**< The default dequeue is "MC
> RTS". */
> 
> +#define RING_F_MP_HTS_ENQ 0x0020 /**< The default enqueue is "MP
> HTS".
> +*/ #define RING_F_MC_HTS_DEQ 0x0040 /**< The default dequeue is "MC
> +HTS". */
> +
>  #define __IS_SP RTE_RING_SYNC_ST
>  #define __IS_MP RTE_RING_SYNC_MT
>  #define __IS_SC RTE_RING_SYNC_ST
> @@ -513,6 +531,82 @@ __rte_ring_do_rts_dequeue(struct rte_ring *r, void
> **obj_table,
>  	return n;
>  }
> 
> +#include <rte_ring_hts_generic.h>
> +
> +/**
> + * @internal Start to enqueue several objects on the HTS ring.
> + * Note that user has to call appropriate enqueue_finish()
> + * to complete given enqueue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from
> ring
> + * @param free_space
> + *   returns the amount of space after the enqueue operation has finished
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_hts_enqueue_start(struct rte_ring *r, void * const *obj_table,
> +		uint32_t n, enum rte_ring_queue_behavior behavior,
> +		uint32_t *free_space)
> +{
> +	uint32_t free, head;
> +
> +	n =  __rte_ring_hts_move_prod_head(r, n, behavior, &head, &free);
> +
> +	if (n != 0)
> +		ENQUEUE_PTRS(r, &r[1], head, obj_table, n, void *);
> +
> +	if (free_space != NULL)
> +		*free_space = free - n;
> +	return n;
> +}
rte_ring.h is becoming too big. Maybe we should move these functions to another HTS-specific file, but leave the top-level API in rte_ring.h. Similarly for RTS.

> +
> +/**
> + * @internal Start to dequeue several objects from the HTS ring.
> + * Note that user has to call appropriate dequeue_finish()
> + * to complete given dequeue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to pull from the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from
> ring
> + * @param available
> + *   returns the number of remaining ring entries after the dequeue has
> finished
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_hts_dequeue_start(struct rte_ring *r, void **obj_table,
> +		unsigned int n, enum rte_ring_queue_behavior behavior,
> +		unsigned int *available)
> +{
> +	uint32_t entries, head;
> +
> +	n = __rte_ring_hts_move_cons_head(r, n, behavior, &head, &entries);
> +
> +	if (n != 0)
> +		DEQUEUE_PTRS(r, &r[1], head, obj_table, n, void *);
> +
> +	if (available != NULL)
> +		*available = entries - n;
> +	return n;
> +}
> +
>  /**
>   * Enqueue several objects on the ring (multi-producers safe).
>   *
> @@ -585,6 +679,47 @@ rte_ring_rts_enqueue_bulk(struct rte_ring *r, void *
> const *obj_table,
>  			free_space);
>  }
> 
> +/**
> + * Start to enqueue several objects on the HTS ring (multi-producers safe).
> + * Note that user has to call appropriate dequeue_finish()
> + * to complete given dequeue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   The number of objects enqueued, either 0 or n
> + */
> +static __rte_always_inline unsigned int
> +rte_ring_hts_enqueue_bulk_start(struct rte_ring *r, void * const *obj_table,
> +			 unsigned int n, unsigned int *free_space) {
> +	return __rte_ring_do_hts_enqueue_start(r, obj_table, n,
> +		RTE_RING_QUEUE_FIXED, free_space);
> +}
I do not clearly understand the requirements on the enqueue_start and enqueue_finish in the form they are here.
IMO, only requirement for these APIs is to provide the ability to avoid intermediate memcpys.

> +
> +static __rte_always_inline void
> +rte_ring_hts_enqueue_finish(struct rte_ring *r, unsigned int n) {
> +	__rte_ring_hts_update_tail(&r->hts_prod, n, 1); }
> +
> +static __rte_always_inline unsigned int
> +rte_ring_hts_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
> +			 unsigned int n, unsigned int *free_space) {
> +	n = rte_ring_hts_enqueue_bulk_start(r, obj_table, n, free_space);
> +	if (n != 0)
> +		rte_ring_hts_enqueue_finish(r, n);
> +	return n;
> +}
> +
>  /**
>   * Enqueue several objects on a ring.
>   *
> @@ -615,6 +750,8 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void * const
> *obj_table,
>  		return rte_ring_sp_enqueue_bulk(r, obj_table, n, free_space);
>  	case RTE_RING_SYNC_MT_RTS:
>  		return rte_ring_rts_enqueue_bulk(r, obj_table, n, free_space);
> +	case RTE_RING_SYNC_MT_HTS:
> +		return rte_ring_hts_enqueue_bulk(r, obj_table, n,
> free_space);
>  	}
> 
>  	/* valid ring should never reach this point */ @@ -753,6 +890,47 @@
> rte_ring_rts_dequeue_bulk(struct rte_ring *r, void **obj_table,
>  			available);
>  }
> 
> +/**
> + * Start to dequeue several objects from an HTS ring (multi-consumers safe).
> + * Note that user has to call appropriate dequeue_finish()
> + * to complete given dequeue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects) that will be filled.
> + * @param n
> + *   The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   The number of objects dequeued, either 0 or n
> + */
> +static __rte_always_inline unsigned int
> +rte_ring_hts_dequeue_bulk_start(struct rte_ring *r, void **obj_table,
> +		unsigned int n, unsigned int *available) {
> +	return __rte_ring_do_hts_dequeue_start(r, obj_table, n,
> +		RTE_RING_QUEUE_FIXED, available);
> +}
IMO, we should look to provide the ability to avoid intermediate copies when the data from the ring needs to be distributed to different locations.
My proposal in its current form is complicated. But, I am thinking that, if the return values are abstracted in a structure, it might look much simpler.

> +
> +static __rte_always_inline void
> +rte_ring_hts_dequeue_finish(struct rte_ring *r, unsigned int n) {
> +	__rte_ring_hts_update_tail(&r->hts_cons, n, 0); }
> +
> +static __rte_always_inline unsigned int
> +rte_ring_hts_dequeue_bulk(struct rte_ring *r, void **obj_table,
> +		unsigned int n, unsigned int *available) {
> +	n = rte_ring_hts_dequeue_bulk_start(r, obj_table, n, available);
> +	if (n != 0)
> +		rte_ring_hts_dequeue_finish(r, n);
> +	return n;
> +}
> +
>  /**
>   * Dequeue several objects from a ring.
>   *
> @@ -783,6 +961,8 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void
> **obj_table, unsigned int n,
>  		return rte_ring_sc_dequeue_bulk(r, obj_table, n, available);
>  	case RTE_RING_SYNC_MT_RTS:
>  		return rte_ring_rts_dequeue_bulk(r, obj_table, n, available);
> +	case RTE_RING_SYNC_MT_HTS:
> +		return rte_ring_hts_dequeue_bulk(r, obj_table, n, available);
>  	}
> 
>  	/* valid ring should never reach this point */ @@ -1111,6 +1291,41
> @@ rte_ring_rts_enqueue_burst(struct rte_ring *r, void * const *obj_table,
>  			RTE_RING_QUEUE_VARIABLE, free_space);  }
> 
> +/**
> + * Start to enqueue several objects on the HTS ring (multi-producers safe).
> + * Note that user has to call appropriate dequeue_finish()
> + * to complete given dequeue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   The number of objects enqueued, either 0 or n
> + */
> +static __rte_always_inline unsigned int
> +rte_ring_hts_enqueue_burst_start(struct rte_ring *r, void * const *obj_table,
> +			 unsigned int n, unsigned int *free_space) {
> +	return __rte_ring_do_hts_enqueue_start(r, obj_table, n,
> +		RTE_RING_QUEUE_VARIABLE, free_space); }
> +
rte_ring_hts_enqueue_burst_finish is not implemented. It requires the 'n' returned from ' rte_ring_hts_enqueue_burst_start' to be passed. We can't completely avoid passing correct information between xxx_start and xxx_finish APIs.

> +static __rte_always_inline unsigned int
> +rte_ring_hts_enqueue_burst(struct rte_ring *r, void * const *obj_table,
> +			 unsigned int n, unsigned int *free_space) {
> +	n = rte_ring_hts_enqueue_burst_start(r, obj_table, n, free_space);
> +	if (n != 0)
> +		rte_ring_hts_enqueue_finish(r, n);
> +	return n;
> +}
> +
>  /**
>   * Enqueue several objects on a ring.
>   *
> @@ -1141,6 +1356,8 @@ rte_ring_enqueue_burst(struct rte_ring *r, void *
> const *obj_table,
>  		return rte_ring_sp_enqueue_burst(r, obj_table, n,
> free_space);
>  	case RTE_RING_SYNC_MT_RTS:
>  		return rte_ring_rts_enqueue_burst(r, obj_table, n,
> free_space);
> +	case RTE_RING_SYNC_MT_HTS:
> +		return rte_ring_hts_enqueue_burst(r, obj_table, n,
> free_space);
>  	}
> 
>  	/* valid ring should never reach this point */ @@ -1225,6 +1442,42
> @@ rte_ring_rts_dequeue_burst(struct rte_ring *r, void **obj_table,
>  	return __rte_ring_do_rts_dequeue(r, obj_table, n,
>  			RTE_RING_QUEUE_VARIABLE, available);  }
> +
> +/**
> + * Start to dequeue several objects from an HTS ring (multi-consumers safe).
> + * Note that user has to call appropriate dequeue_finish()
> + * to complete given dequeue operation.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects) that will be filled.
> + * @param n
> + *   The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   The number of objects dequeued, either 0 or n
> + */
> +static __rte_always_inline unsigned int
> +rte_ring_hts_dequeue_burst_start(struct rte_ring *r, void **obj_table,
> +		unsigned int n, unsigned int *available) {
> +	return __rte_ring_do_hts_dequeue_start(r, obj_table, n,
> +		RTE_RING_QUEUE_VARIABLE, available);
> +}
> +
> +static __rte_always_inline unsigned int
> +rte_ring_hts_dequeue_burst(struct rte_ring *r, void **obj_table,
> +		unsigned int n, unsigned int *available) {
> +	n = rte_ring_hts_dequeue_burst_start(r, obj_table, n, available);
> +	if (n != 0)
> +		rte_ring_hts_dequeue_finish(r, n);
> +	return n;
> +}
> +
>  /**
>   * Dequeue multiple objects from a ring up to a maximum number.
>   *
> @@ -1255,6 +1508,8 @@ rte_ring_dequeue_burst(struct rte_ring *r, void
> **obj_table,
>  		return rte_ring_sc_dequeue_burst(r, obj_table, n, available);
>  	case RTE_RING_SYNC_MT_RTS:
>  		return rte_ring_rts_dequeue_burst(r, obj_table, n, available);
> +	case RTE_RING_SYNC_MT_HTS:
> +		return rte_ring_hts_dequeue_burst(r, obj_table, n, available);
>  	}
> 
>  	/* valid ring should never reach this point */ diff --git
> a/lib/librte_ring/rte_ring_hts_generic.h
> b/lib/librte_ring/rte_ring_hts_generic.h
> new file mode 100644
> index 000000000..7e447e30b
> --- /dev/null
> +++ b/lib/librte_ring/rte_ring_hts_generic.h
> @@ -0,0 +1,228 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2010-2020 Intel Corporation
> + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> + * All rights reserved.
> + * Derived from FreeBSD's bufring.h
> + * Used as BSD-3 Licensed with permission from Kip Macy.
> + */
> +
> +#ifndef _RTE_RING_HTS_GENERIC_H_
> +#define _RTE_RING_HTS_GENERIC_H_
> +
> +/**
> + * @file rte_ring_hts_generic.h
> + * It is not recommended to include this file directly,
> + * include <rte_ring.h> instead.
> + * Contains internal helper functions for head/tail sync (HTS) ring mode.
> + * In that mode enqueue/dequeue operation is fully serialized:
> + * only one thread at a time is allowed to perform given op.
> + * This is achieved by thread is allowed to proceed with changing
> +head.value
> + * only when head.value == tail.value.
> + * Both head and tail values are updated atomically (as one 64-bit value).
> + * As another enhancement that provides ability to split
> +enqueue/dequeue
> + * operation into two phases:
> + * - enqueue/dequeue start
> + * - enqueue/dequeue finish
> + * That allows user to inspect objects in the ring without removing
> + * them from it (aka MT safe peek).
> + * As an example:
> + * // read 1 elem from the ring:
> + * n = rte_ring_hts_dequeue_bulk_start(ring, &obj, 1, NULL);
> + * if (n != 0) {
> + *    //examined object
> + *    if (object_examine(obj) == KEEP)
> + *       //decided to keep it in the ring.
> + *       rte_ring_hts_dequeue_finish(ring, 0);
> + *    else
> + *       //decided to remove it in the ring.
> + *       rte_ring_hts_dequeue_finish(ring, n);
> + * }
> + * Note that between _start_ and _finish_ the ring is sort of locked -
> + * none other thread can proceed with enqueue(/dequeue) operation till
> + * _finish_ will complete.
This means it does not solve the problem for overcommitted systems. Do you agree?

> + */
> +
> +static __rte_always_inline void
> +__rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t num,
> +	uint32_t enqueue)
> +{
> +	uint32_t n;
> +	union rte_ring_ht_pos p;
> +
> +	if (enqueue)
> +		rte_smp_wmb();
> +	else
> +		rte_smp_rmb();
> +
> +	p.raw = rte_atomic64_read((rte_atomic64_t *)(uintptr_t)&ht->ht.raw);
> +
> +	n = p.pos.head - p.pos.tail;
> +	RTE_ASSERT(n >= num);
> +	RTE_SET_USED(n);
> +
> +	p.pos.head = p.pos.tail + num;
> +	p.pos.tail = p.pos.head;
> +
> +	rte_atomic64_set((rte_atomic64_t *)(uintptr_t)&ht->ht.raw, p.raw); }
> +
> +/**
> + * @internal waits till tail will become equal to head.
> + * Means no writer/reader is active for that ring.
> + * Suppose to work as serialization point.
> + */
> +static __rte_always_inline void
> +__rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht,
> +		union rte_ring_ht_pos *p)
> +{
> +	p->raw = rte_atomic64_read((rte_atomic64_t *)
> +			(uintptr_t)&ht->ht.raw);
> +
> +	while (p->pos.head != p->pos.tail) {
> +		rte_pause();
> +		p->raw = rte_atomic64_read((rte_atomic64_t *)
> +				(uintptr_t)&ht->ht.raw);
> +	}
> +}
> +
> +/**
> + * @internal This function updates the producer head for enqueue
> + *
> + * @param r
> + *   A pointer to the ring structure
> + * @param is_sp
> + *   Indicates whether multi-producer path is needed or not
> + * @param n
> + *   The number of elements we will want to enqueue, i.e. how far should the
> + *   head be moved
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from
> ring
> + * @param old_head
> + *   Returns head value as it was before the move, i.e. where enqueue starts
> + * @param new_head
> + *   Returns the current/new head value i.e. where enqueue finishes
> + * @param free_entries
> + *   Returns the amount of free space in the ring BEFORE head was moved
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
> +	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
> +	uint32_t *free_entries)
> +{
> +	uint32_t n;
> +	union rte_ring_ht_pos np, op;
> +
> +	const uint32_t capacity = r->capacity;
> +
> +	do {
> +		/* Reset n to the initial burst count */
> +		n = num;
> +
> +		/* wait for tail to be equal to head */
> +		__rte_ring_hts_head_wait(&r->hts_prod, &op);
> +
> +		/* add rmb barrier to avoid load/load reorder in weak
> +		 * memory model. It is noop on x86
> +		 */
> +		rte_smp_rmb();
> +
> +		/*
> +		 *  The subtraction is done between two unsigned 32bits value
> +		 * (the result is always modulo 32 bits even if we have
> +		 * *old_head > cons_tail). So 'free_entries' is always between
> 0
> +		 * and capacity (which is < size).
> +		 */
> +		*free_entries = capacity + r->cons.tail - op.pos.head;
> +
> +		/* check that we have enough room in ring */
> +		if (unlikely(n > *free_entries))
> +			n = (behavior == RTE_RING_QUEUE_FIXED) ?
> +					0 : *free_entries;
> +
> +		if (n == 0)
> +			return 0;
> +
> +		np.pos.tail = op.pos.tail;
> +		np.pos.head = op.pos.head + n;
> +
> +	} while (rte_atomic64_cmpset(&r->hts_prod.ht.raw,
> +			op.raw, np.raw) == 0);
> +
> +	*old_head = op.pos.head;
> +	return n;
> +}
> +
> +/**
> + * @internal This function updates the consumer head for dequeue
> + *
> + * @param r
> + *   A pointer to the ring structure
> + * @param is_sc
> + *   Indicates whether multi-consumer path is needed or not
> + * @param n
> + *   The number of elements we will want to enqueue, i.e. how far should the
> + *   head be moved
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from
> ring
> + * @param old_head
> + *   Returns head value as it was before the move, i.e. where dequeue starts
> + * @param new_head
> + *   Returns the current/new head value i.e. where dequeue finishes
> + * @param entries
> + *   Returns the number of entries in the ring BEFORE head was moved
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num,
> +	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
> +	uint32_t *entries)
> +{
> +	uint32_t n;
> +	union rte_ring_ht_pos np, op;
> +
> +	/* move cons.head atomically */
> +	do {
> +		/* Restore n as it may change every loop */
> +		n = num;
> +
> +		/* wait for tail to be equal to head */
> +		__rte_ring_hts_head_wait(&r->hts_cons, &op);
> +
> +		/* add rmb barrier to avoid load/load reorder in weak
> +		 * memory model. It is noop on x86
> +		 */
> +		rte_smp_rmb();
> +
> +		/* The subtraction is done between two unsigned 32bits value
> +		 * (the result is always modulo 32 bits even if we have
> +		 * cons_head > prod_tail). So 'entries' is always between 0
> +		 * and size(ring)-1.
> +		 */
> +		*entries = r->prod.tail - op.pos.head;
> +
> +		/* Set the actual entries for dequeue */
> +		if (n > *entries)
> +			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 :
> *entries;
> +
> +		if (unlikely(n == 0))
> +			return 0;
> +
> +		np.pos.tail = op.pos.tail;
> +		np.pos.head = op.pos.head + n;
> +
> +	} while (rte_atomic64_cmpset(&r->hts_cons.ht.raw,
> +			op.raw, np.raw) == 0);
> +
> +	*old_head = op.pos.head;
> +	return n;
> +}
> +
> +#endif /* _RTE_RING_HTS_GENERIC_H_ */
> --
> 2.17.1
Ananyev, Konstantin March 26, 2020, 12:26 p.m. UTC | #2
> > Introduce head/tail sync mode for MT ring synchronization.
> > In that mode enqueue/dequeue operation is fully serialized:
> > only one thread at a time is allowed to perform given op.
> > Suppose to reduce stall times in case when ring is used on overcommitted
> > cpus (multiple active threads on the same cpu).
> > As another enhancement provide ability to split enqueue/dequeue operation
> > into two phases:
> >   - enqueue/dequeue start
> >   - enqueue/dequeue finish
> > That allows user to inspect objects in the ring without removing them from it
> > (aka MT safe peek).
> >
> > Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
> > ---
> >  lib/librte_ring/Makefile               |   1 +
> >  lib/librte_ring/meson.build            |   1 +
> >  lib/librte_ring/rte_ring.c             |  15 +-
> >  lib/librte_ring/rte_ring.h             | 259 ++++++++++++++++++++++++-
> >  lib/librte_ring/rte_ring_hts_generic.h | 228 ++++++++++++++++++++++
> >  5 files changed, 500 insertions(+), 4 deletions(-)  create mode 100644
> > lib/librte_ring/rte_ring_hts_generic.h
> >
> > diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index
> > 4f90344f4..0c7f8f918 100644
> > --- a/lib/librte_ring/Makefile
> > +++ b/lib/librte_ring/Makefile
> > @@ -19,6 +19,7 @@ SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include :=
> > rte_ring.h \
> >  					rte_ring_elem.h \
> >  					rte_ring_generic.h \
> >  					rte_ring_c11_mem.h \
> > +					rte_ring_hts_generic.h \
> >  					rte_ring_rts_generic.h
> >
> >  include $(RTE_SDK)/mk/rte.lib.mk
> > diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build index
> > dc8d7dbea..5aa673199 100644
> > --- a/lib/librte_ring/meson.build
> > +++ b/lib/librte_ring/meson.build
> > @@ -6,6 +6,7 @@ headers = files('rte_ring.h',
> >  		'rte_ring_elem.h',
> >  		'rte_ring_c11_mem.h',
> >  		'rte_ring_generic.h',
> > +		'rte_ring_hts_generic.h',
> >  		'rte_ring_rts_generic.h')
> >
> >  # rte_ring_create_elem and rte_ring_get_memsize_elem are experimental
> > diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index
> > 1ce0af3e5..d3b948667 100644
> > --- a/lib/librte_ring/rte_ring.c
> > +++ b/lib/librte_ring/rte_ring.c
> > @@ -102,9 +102,9 @@ static int
> >  get_sync_type(uint32_t flags, uint32_t *prod_st, uint32_t *cons_st)  {
> >  	static const uint32_t prod_st_flags =
> > -		(RING_F_SP_ENQ | RING_F_MP_RTS_ENQ);
> > +		(RING_F_SP_ENQ | RING_F_MP_RTS_ENQ |
> > RING_F_MP_HTS_ENQ);
> >  	static const uint32_t cons_st_flags =
> > -		(RING_F_SC_DEQ | RING_F_MC_RTS_DEQ);
> > +		(RING_F_SC_DEQ | RING_F_MC_RTS_DEQ |
> > RING_F_MC_HTS_DEQ);
> >
> >  	switch (flags & prod_st_flags) {
> >  	case 0:
> > @@ -116,6 +116,9 @@ get_sync_type(uint32_t flags, uint32_t *prod_st,
> > uint32_t *cons_st)
> >  	case RING_F_MP_RTS_ENQ:
> >  		*prod_st = RTE_RING_SYNC_MT_RTS;
> >  		break;
> > +	case RING_F_MP_HTS_ENQ:
> > +		*prod_st = RTE_RING_SYNC_MT_HTS;
> > +		break;
> >  	default:
> >  		return -EINVAL;
> >  	}
> > @@ -130,6 +133,9 @@ get_sync_type(uint32_t flags, uint32_t *prod_st,
> > uint32_t *cons_st)
> >  	case RING_F_MC_RTS_DEQ:
> >  		*cons_st = RTE_RING_SYNC_MT_RTS;
> >  		break;
> > +	case RING_F_MC_HTS_DEQ:
> > +		*cons_st = RTE_RING_SYNC_MT_HTS;
> > +		break;
> >  	default:
> >  		return -EINVAL;
> >  	}
> > @@ -151,6 +157,11 @@ rte_ring_init(struct rte_ring *r, const char *name,
> > unsigned count,
> >  	RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) &
> >  			  RTE_CACHE_LINE_MASK) != 0);
> >
> > +	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) !=
> > +		offsetof(struct rte_ring_hts_headtail, sync_type));
> > +	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) !=
> > +		offsetof(struct rte_ring_hts_headtail, ht.pos.tail));
> > +
> >  	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) !=
> >  		offsetof(struct rte_ring_rts_headtail, sync_type));
> >  	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) != diff --git
> > a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index
> > a130aeb9d..52edcea11 100644
> > --- a/lib/librte_ring/rte_ring.h
> > +++ b/lib/librte_ring/rte_ring.h
> > @@ -66,11 +66,11 @@ enum {
> >  	RTE_RING_SYNC_MT,     /**< multi-thread safe (default mode) */
> >  	RTE_RING_SYNC_ST,     /**< single thread only */
> >  	RTE_RING_SYNC_MT_RTS, /**< multi-thread relaxed tail sync */
> > +	RTE_RING_SYNC_MT_HTS, /**< multi-thread head/tail sync */
> >  };
> >
> >  /**
> > - * structure to hold a pair of head/tail values and other metadata.
> > - * used by RTE_RING_SYNC_MT, RTE_RING_SYNC_ST sync types.
> > + * Structure to hold a pair of head/tail values and other metadata.
> >   * Depending on sync_type format of that structure might differ
> >   * depending on the sync mechanism selelcted, but offsets for
> >   * *sync_type* and *tail* values should always remain the same.
> > @@ -96,6 +96,19 @@ struct rte_ring_rts_headtail {
> >  	volatile union rte_ring_ht_poscnt head;  };
> >
> > +union rte_ring_ht_pos {
> > +	uint64_t raw;
> > +	struct {
> > +		uint32_t tail; /**< tail position */
> > +		uint32_t head; /**< head position */
> > +	} pos;
> > +};
> > +
> > +struct rte_ring_hts_headtail {
> > +	uint32_t sync_type; /**< sync type of prod/cons */
> > +	volatile union rte_ring_ht_pos ht __rte_aligned(8); };
> > +
> >  /**
> >   * An RTE ring structure.
> >   *
> > @@ -126,6 +139,7 @@ struct rte_ring {
> >  	RTE_STD_C11
> >  	union {
> >  		struct rte_ring_headtail prod;
> > +		struct rte_ring_hts_headtail hts_prod;
> >  		struct rte_ring_rts_headtail rts_prod;
> >  	}  __rte_cache_aligned;
> >
> > @@ -135,6 +149,7 @@ struct rte_ring {
> >  	RTE_STD_C11
> >  	union {
> >  		struct rte_ring_headtail cons;
> > +		struct rte_ring_hts_headtail hts_cons;
> >  		struct rte_ring_rts_headtail rts_cons;
> >  	}  __rte_cache_aligned;
> >
> > @@ -157,6 +172,9 @@ struct rte_ring {
> >  #define RING_F_MP_RTS_ENQ 0x0008 /**< The default enqueue is "MP RTS".
> > */  #define RING_F_MC_RTS_DEQ 0x0010 /**< The default dequeue is "MC
> > RTS". */
> >
> > +#define RING_F_MP_HTS_ENQ 0x0020 /**< The default enqueue is "MP
> > HTS".
> > +*/ #define RING_F_MC_HTS_DEQ 0x0040 /**< The default dequeue is "MC
> > +HTS". */
> > +
> >  #define __IS_SP RTE_RING_SYNC_ST
> >  #define __IS_MP RTE_RING_SYNC_MT
> >  #define __IS_SC RTE_RING_SYNC_ST
> > @@ -513,6 +531,82 @@ __rte_ring_do_rts_dequeue(struct rte_ring *r, void
> > **obj_table,
> >  	return n;
> >  }
> >
> > +#include <rte_ring_hts_generic.h>
> > +
> > +/**
> > + * @internal Start to enqueue several objects on the HTS ring.
> > + * Note that user has to call appropriate enqueue_finish()
> > + * to complete given enqueue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param n
> > + *   The number of objects to add in the ring from the obj_table.
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
> > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from
> > ring
> > + * @param free_space
> > + *   returns the amount of space after the enqueue operation has finished
> > + * @return
> > + *   Actual number of objects enqueued.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_hts_enqueue_start(struct rte_ring *r, void * const *obj_table,
> > +		uint32_t n, enum rte_ring_queue_behavior behavior,
> > +		uint32_t *free_space)
> > +{
> > +	uint32_t free, head;
> > +
> > +	n =  __rte_ring_hts_move_prod_head(r, n, behavior, &head, &free);
> > +
> > +	if (n != 0)
> > +		ENQUEUE_PTRS(r, &r[1], head, obj_table, n, void *);
> > +
> > +	if (free_space != NULL)
> > +		*free_space = free - n;
> > +	return n;
> > +}
> rte_ring.h is becoming too big. May be we should move these functions to another HTS specific file. But leave the top level API in rte_ring.h.
> Similarly for RTS.

Good point, will try in v1.

> 
> > +
> > +/**
> > + * @internal Start to dequeue several objects from the HTS ring.
> > + * Note that user has to call appropriate dequeue_finish()
> > + * to complete given dequeue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param n
> > + *   The number of objects to pull from the ring.
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> > + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from
> > ring
> > + * @param available
> > + *   returns the number of remaining ring entries after the dequeue has
> > finished
> > + * @return
> > + *   - Actual number of objects dequeued.
> > + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_do_hts_dequeue_start(struct rte_ring *r, void **obj_table,
> > +		unsigned int n, enum rte_ring_queue_behavior behavior,
> > +		unsigned int *available)
> > +{
> > +	uint32_t entries, head;
> > +
> > +	n = __rte_ring_hts_move_cons_head(r, n, behavior, &head, &entries);
> > +
> > +	if (n != 0)
> > +		DEQUEUE_PTRS(r, &r[1], head, obj_table, n, void *);
> > +
> > +	if (available != NULL)
> > +		*available = entries - n;
> > +	return n;
> > +}
> > +
> >  /**
> >   * Enqueue several objects on the ring (multi-producers safe).
> >   *
> > @@ -585,6 +679,47 @@ rte_ring_rts_enqueue_bulk(struct rte_ring *r, void *
> > const *obj_table,
> >  			free_space);
> >  }
> >
> > +/**
> > + * Start to enqueue several objects on the HTS ring (multi-producers safe).
> > + * Note that user has to call appropriate dequeue_finish()
> > + * to complete given dequeue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param n
> > + *   The number of objects to add in the ring from the obj_table.
> > + * @param free_space
> > + *   if non-NULL, returns the amount of space in the ring after the
> > + *   enqueue operation has finished.
> > + * @return
> > + *   The number of objects enqueued, either 0 or n
> > + */
> > +static __rte_always_inline unsigned int
> > +rte_ring_hts_enqueue_bulk_start(struct rte_ring *r, void * const *obj_table,
> > +			 unsigned int n, unsigned int *free_space) {
> > +	return __rte_ring_do_hts_enqueue_start(r, obj_table, n,
> > +		RTE_RING_QUEUE_FIXED, free_space);
> > +}
> I do not clearly understand the requirements on the enqueue_start and enqueue_finish in the form they are here.
> IMO, only requirement for these APIs is to provide the ability to avoid intermediate memcpys.

I think the main objective here is to provide 'MT safe peek' functionality.
The requirement is to split let say dequeue operation into two parts:
1. start - copy N elems into provided by user data buffer and guarantee that
    these elems will remain in the ring till finish().
2. finish - remove M(<=N) elems from the ring. 

For enqueue it is a mirror:
1. start - reserve space for N elems in the ring.
2. finish - copy M (<=N) to the ring. 

> 
> > +
> > +static __rte_always_inline void
> > +rte_ring_hts_enqueue_finish(struct rte_ring *r, unsigned int n) {
> > +	__rte_ring_hts_update_tail(&r->hts_prod, n, 1); }
> > +
> > +static __rte_always_inline unsigned int
> > +rte_ring_hts_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
> > +			 unsigned int n, unsigned int *free_space) {
> > +	n = rte_ring_hts_enqueue_bulk_start(r, obj_table, n, free_space);
> > +	if (n != 0)
> > +		rte_ring_hts_enqueue_finish(r, n);
> > +	return n;
> > +}
> > +
> >  /**
> >   * Enqueue several objects on a ring.
> >   *
> > @@ -615,6 +750,8 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void * const
> > *obj_table,
> >  		return rte_ring_sp_enqueue_bulk(r, obj_table, n, free_space);
> >  	case RTE_RING_SYNC_MT_RTS:
> >  		return rte_ring_rts_enqueue_bulk(r, obj_table, n, free_space);
> > +	case RTE_RING_SYNC_MT_HTS:
> > +		return rte_ring_hts_enqueue_bulk(r, obj_table, n,
> > free_space);
> >  	}
> >
> >  	/* valid ring should never reach this point */ @@ -753,6 +890,47 @@
> > rte_ring_rts_dequeue_bulk(struct rte_ring *r, void **obj_table,
> >  			available);
> >  }
> >
> > +/**
> > + * Start to dequeue several objects from an HTS ring (multi-consumers safe).
> > + * Note that user has to call appropriate dequeue_finish()
> > + * to complete given dequeue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects) that will be filled.
> > + * @param n
> > + *   The number of objects to dequeue from the ring to the obj_table.
> > + * @param available
> > + *   If non-NULL, returns the number of remaining ring entries after the
> > + *   dequeue has finished.
> > + * @return
> > + *   The number of objects dequeued, either 0 or n
> > + */
> > +static __rte_always_inline unsigned int
> > +rte_ring_hts_dequeue_bulk_start(struct rte_ring *r, void **obj_table,
> > +		unsigned int n, unsigned int *available) {
> > +	return __rte_ring_do_hts_dequeue_start(r, obj_table, n,
> > +		RTE_RING_QUEUE_FIXED, available);
> > +}
> IMO, we should look to provide the ability to avoid intermediate copies when the data from the ring needs to be distributed to different
> locations.

As I said in the other thread - I am not sure it would provide any gain in terms of performance,
unless we have a case with bulk transfers and big-size elems.
If you still strongly feel SG is needed here, I think it should be an add-on API, not the main and only one.

> My proposal in its form is complicated. But, I am thinking that, if the return values are abstracted in a structure, it might look much simple.
> 
> > +
> > +static __rte_always_inline void
> > +rte_ring_hts_dequeue_finish(struct rte_ring *r, unsigned int n) {
> > +	__rte_ring_hts_update_tail(&r->hts_cons, n, 0); }
> > +
> > +static __rte_always_inline unsigned int
> > +rte_ring_hts_dequeue_bulk(struct rte_ring *r, void **obj_table,
> > +		unsigned int n, unsigned int *available) {
> > +	n = rte_ring_hts_dequeue_bulk_start(r, obj_table, n, available);
> > +	if (n != 0)
> > +		rte_ring_hts_dequeue_finish(r, n);
> > +	return n;
> > +}
> > +
> >  /**
> >   * Dequeue several objects from a ring.
> >   *
> > @@ -783,6 +961,8 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void
> > **obj_table, unsigned int n,
> >  		return rte_ring_sc_dequeue_bulk(r, obj_table, n, available);
> >  	case RTE_RING_SYNC_MT_RTS:
> >  		return rte_ring_rts_dequeue_bulk(r, obj_table, n, available);
> > +	case RTE_RING_SYNC_MT_HTS:
> > +		return rte_ring_hts_dequeue_bulk(r, obj_table, n, available);
> >  	}
> >
> >  	/* valid ring should never reach this point */ @@ -1111,6 +1291,41
> > @@ rte_ring_rts_enqueue_burst(struct rte_ring *r, void * const *obj_table,
> >  			RTE_RING_QUEUE_VARIABLE, free_space);  }
> >
> > +/**
> > + * Start to enqueue several objects on the HTS ring (multi-producers safe).
> > + * Note that user has to call appropriate dequeue_finish()
> > + * to complete given dequeue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects).
> > + * @param n
> > + *   The number of objects to add in the ring from the obj_table.
> > + * @param free_space
> > + *   if non-NULL, returns the amount of space in the ring after the
> > + *   enqueue operation has finished.
> > + * @return
> > + *   The number of objects enqueued, either 0 or n
> > + */
> > +static __rte_always_inline unsigned int
> > +rte_ring_hts_enqueue_burst_start(struct rte_ring *r, void * const *obj_table,
> > +			 unsigned int n, unsigned int *free_space) {
> > +	return __rte_ring_do_hts_enqueue_start(r, obj_table, n,
> > +		RTE_RING_QUEUE_VARIABLE, free_space); }
> > +
> rte_ring_hts_enqueue_burst_finish is not implemented.

No need to, finish() is identical for both _bulk and _burst.
That's why we have 2 starts:
rte_ring_hts_enqueue_bulk_start()
rte_ring_hts_enqueue_burst_start()
and one finish:
rte_ring_hts_enqueue_finish().

Same story for dequeue.

> It requires the 'n' returned from ' rte_ring_hts_enqueue_burst_start' to be passed.

Yes, it requires some m <= n to be passed.
That's the whole point of peek - we want to be able to inspect N elems
possibly without retrieving them from the ring.
I.E.: inspect N, retrieve M <=N.
 

> We can't completely avoid passing correct information between xxx_start and xxx_finish APIs.

Yes, we can't.
But in that model we can check that the value provided to finish() is valid,
plus we don't provide the user direct access to the contents of the ring,
and don't require him to specify head/tail values directly. 

 
> > +static __rte_always_inline unsigned int
> > +rte_ring_hts_enqueue_burst(struct rte_ring *r, void * const *obj_table,
> > +			 unsigned int n, unsigned int *free_space) {
> > +	n = rte_ring_hts_enqueue_burst_start(r, obj_table, n, free_space);
> > +	if (n != 0)
> > +		rte_ring_hts_enqueue_finish(r, n);
> > +	return n;
> > +}
> > +
> >  /**
> >   * Enqueue several objects on a ring.
> >   *
> > @@ -1141,6 +1356,8 @@ rte_ring_enqueue_burst(struct rte_ring *r, void *
> > const *obj_table,
> >  		return rte_ring_sp_enqueue_burst(r, obj_table, n,
> > free_space);
> >  	case RTE_RING_SYNC_MT_RTS:
> >  		return rte_ring_rts_enqueue_burst(r, obj_table, n,
> > free_space);
> > +	case RTE_RING_SYNC_MT_HTS:
> > +		return rte_ring_hts_enqueue_burst(r, obj_table, n,
> > free_space);
> >  	}
> >
> >  	/* valid ring should never reach this point */ @@ -1225,6 +1442,42
> > @@ rte_ring_rts_dequeue_burst(struct rte_ring *r, void **obj_table,
> >  	return __rte_ring_do_rts_dequeue(r, obj_table, n,
> >  			RTE_RING_QUEUE_VARIABLE, available);  }
> > +
> > +/**
> > + * Start to dequeue several objects from an HTS ring (multi-consumers safe).
> > + * Note that user has to call appropriate dequeue_finish()
> > + * to complete given dequeue operation.
> > + *
> > + * @param r
> > + *   A pointer to the ring structure.
> > + * @param obj_table
> > + *   A pointer to a table of void * pointers (objects) that will be filled.
> > + * @param n
> > + *   The number of objects to dequeue from the ring to the obj_table.
> > + * @param available
> > + *   If non-NULL, returns the number of remaining ring entries after the
> > + *   dequeue has finished.
> > + * @return
> > + *   The number of objects dequeued, either 0 or n
> > + */
> > +static __rte_always_inline unsigned int
> > +rte_ring_hts_dequeue_burst_start(struct rte_ring *r, void **obj_table,
> > +		unsigned int n, unsigned int *available) {
> > +	return __rte_ring_do_hts_dequeue_start(r, obj_table, n,
> > +		RTE_RING_QUEUE_VARIABLE, available);
> > +}
> > +
> > +static __rte_always_inline unsigned int
> > +rte_ring_hts_dequeue_burst(struct rte_ring *r, void **obj_table,
> > +		unsigned int n, unsigned int *available) {
> > +	n = rte_ring_hts_dequeue_burst_start(r, obj_table, n, available);
> > +	if (n != 0)
> > +		rte_ring_hts_dequeue_finish(r, n);
> > +	return n;
> > +}
> > +
> >  /**
> >   * Dequeue multiple objects from a ring up to a maximum number.
> >   *
> > @@ -1255,6 +1508,8 @@ rte_ring_dequeue_burst(struct rte_ring *r, void
> > **obj_table,
> >  		return rte_ring_sc_dequeue_burst(r, obj_table, n, available);
> >  	case RTE_RING_SYNC_MT_RTS:
> >  		return rte_ring_rts_dequeue_burst(r, obj_table, n, available);
> > +	case RTE_RING_SYNC_MT_HTS:
> > +		return rte_ring_hts_dequeue_burst(r, obj_table, n, available);
> >  	}
> >
> >  	/* valid ring should never reach this point */ diff --git
> > a/lib/librte_ring/rte_ring_hts_generic.h
> > b/lib/librte_ring/rte_ring_hts_generic.h
> > new file mode 100644
> > index 000000000..7e447e30b
> > --- /dev/null
> > +++ b/lib/librte_ring/rte_ring_hts_generic.h
> > @@ -0,0 +1,228 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + *
> > + * Copyright (c) 2010-2020 Intel Corporation
> > + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
> > + * All rights reserved.
> > + * Derived from FreeBSD's bufring.h
> > + * Used as BSD-3 Licensed with permission from Kip Macy.
> > + */
> > +
> > +#ifndef _RTE_RING_HTS_GENERIC_H_
> > +#define _RTE_RING_HTS_GENERIC_H_
> > +
> > +/**
> > + * @file rte_ring_hts_generic.h
> > + * It is not recommended to include this file directly,
> > + * include <rte_ring.h> instead.
> > + * Contains internal helper functions for head/tail sync (HTS) ring mode.
> > + * In that mode enqueue/dequeue operation is fully serialized:
> > + * only one thread at a time is allowed to perform given op.
> > + * This is achieved by thread is allowed to proceed with changing
> > +head.value
> > + * only when head.value == tail.value.
> > + * Both head and tail values are updated atomically (as one 64-bit value).
> > + * As another enhancement that provides ability to split
> > +enqueue/dequeue
> > + * operation into two phases:
> > + * - enqueue/dequeue start
> > + * - enqueue/dequeue finish
> > + * That allows user to inspect objects in the ring without removing
> > + * them from it (aka MT safe peek).
> > + * As an example:
> > + * // read 1 elem from the ring:
> > + * n = rte_ring_hts_dequeue_bulk_start(ring, &obj, 1, NULL);
> > + * if (n != 0) {
> > + *    //examined object
> > + *    if (object_examine(obj) == KEEP)
> > + *       //decided to keep it in the ring.
> > + *       rte_ring_hts_dequeue_finish(ring, 0);
> > + *    else
> > + *       //decided to remove it in the ring.
> > + *       rte_ring_hts_dequeue_finish(ring, n);
> > + * }
> > + * Note that between _start_ and _finish_ the ring is sort of locked -
> > + * none other thread can proceed with enqueue(/dequeue) operation till
> > + * _finish_ will complete.
> This means it does not solve the problem for over committed systems. Do you agree?

I never stated that the serialized ring fixes the problem completely.
I said that the current approach mitigates it quite well.
And yes, I still believe that statement is correct.
See other thread for more detailed discussion.

> 
> > + */
> > +
> > +static __rte_always_inline void
> > +__rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t num,
> > +	uint32_t enqueue)
> > +{
> > +	uint32_t n;
> > +	union rte_ring_ht_pos p;
> > +
> > +	if (enqueue)
> > +		rte_smp_wmb();
> > +	else
> > +		rte_smp_rmb();
> > +
> > +	p.raw = rte_atomic64_read((rte_atomic64_t *)(uintptr_t)&ht->ht.raw);
> > +
> > +	n = p.pos.head - p.pos.tail;
> > +	RTE_ASSERT(n >= num);
> > +	RTE_SET_USED(n);
> > +
> > +	p.pos.head = p.pos.tail + num;
> > +	p.pos.tail = p.pos.head;
> > +
> > +	rte_atomic64_set((rte_atomic64_t *)(uintptr_t)&ht->ht.raw, p.raw); }
> > +
> > +/**
> > + * @internal waits till tail will become equal to head.
> > + * Means no writer/reader is active for that ring.
> > + * Suppose to work as serialization point.
> > + */
> > +static __rte_always_inline void
> > +__rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht,
> > +		union rte_ring_ht_pos *p)
> > +{
> > +	p->raw = rte_atomic64_read((rte_atomic64_t *)
> > +			(uintptr_t)&ht->ht.raw);
> > +
> > +	while (p->pos.head != p->pos.tail) {
> > +		rte_pause();
> > +		p->raw = rte_atomic64_read((rte_atomic64_t *)
> > +				(uintptr_t)&ht->ht.raw);
> > +	}
> > +}
> > +
> > +/**
> > + * @internal This function updates the producer head for enqueue
> > + *
> > + * @param r
> > + *   A pointer to the ring structure
> > + * @param is_sp
> > + *   Indicates whether multi-producer path is needed or not
> > + * @param n
> > + *   The number of elements we will want to enqueue, i.e. how far should the
> > + *   head be moved
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
> > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from
> > ring
> > + * @param old_head
> > + *   Returns head value as it was before the move, i.e. where enqueue starts
> > + * @param new_head
> > + *   Returns the current/new head value i.e. where enqueue finishes
> > + * @param free_entries
> > + *   Returns the amount of free space in the ring BEFORE head was moved
> > + * @return
> > + *   Actual number of objects enqueued.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
> > +	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
> > +	uint32_t *free_entries)
> > +{
> > +	uint32_t n;
> > +	union rte_ring_ht_pos np, op;
> > +
> > +	const uint32_t capacity = r->capacity;
> > +
> > +	do {
> > +		/* Reset n to the initial burst count */
> > +		n = num;
> > +
> > +		/* wait for tail to be equal to head */
> > +		__rte_ring_hts_head_wait(&r->hts_prod, &op);
> > +
> > +		/* add rmb barrier to avoid load/load reorder in weak
> > +		 * memory model. It is noop on x86
> > +		 */
> > +		rte_smp_rmb();
> > +
> > +		/*
> > +		 *  The subtraction is done between two unsigned 32bits value
> > +		 * (the result is always modulo 32 bits even if we have
> > +		 * *old_head > cons_tail). So 'free_entries' is always between
> > 0
> > +		 * and capacity (which is < size).
> > +		 */
> > +		*free_entries = capacity + r->cons.tail - op.pos.head;
> > +
> > +		/* check that we have enough room in ring */
> > +		if (unlikely(n > *free_entries))
> > +			n = (behavior == RTE_RING_QUEUE_FIXED) ?
> > +					0 : *free_entries;
> > +
> > +		if (n == 0)
> > +			return 0;
> > +
> > +		np.pos.tail = op.pos.tail;
> > +		np.pos.head = op.pos.head + n;
> > +
> > +	} while (rte_atomic64_cmpset(&r->hts_prod.ht.raw,
> > +			op.raw, np.raw) == 0);
> > +
> > +	*old_head = op.pos.head;
> > +	return n;
> > +}
> > +
> > +/**
> > + * @internal This function updates the consumer head for dequeue
> > + *
> > + * @param r
> > + *   A pointer to the ring structure
> > + * @param is_sc
> > + *   Indicates whether multi-consumer path is needed or not
> > + * @param n
> > + *   The number of elements we will want to enqueue, i.e. how far should the
> > + *   head be moved
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> > + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from
> > ring
> > + * @param old_head
> > + *   Returns head value as it was before the move, i.e. where dequeue starts
> > + * @param new_head
> > + *   Returns the current/new head value i.e. where dequeue finishes
> > + * @param entries
> > + *   Returns the number of entries in the ring BEFORE head was moved
> > + * @return
> > + *   - Actual number of objects dequeued.
> > + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num,
> > +	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
> > +	uint32_t *entries)
> > +{
> > +	uint32_t n;
> > +	union rte_ring_ht_pos np, op;
> > +
> > +	/* move cons.head atomically */
> > +	do {
> > +		/* Restore n as it may change every loop */
> > +		n = num;
> > +
> > +		/* wait for tail to be equal to head */
> > +		__rte_ring_hts_head_wait(&r->hts_cons, &op);
> > +
> > +		/* add rmb barrier to avoid load/load reorder in weak
> > +		 * memory model. It is noop on x86
> > +		 */
> > +		rte_smp_rmb();
> > +
> > +		/* The subtraction is done between two unsigned 32bits value
> > +		 * (the result is always modulo 32 bits even if we have
> > +		 * cons_head > prod_tail). So 'entries' is always between 0
> > +		 * and size(ring)-1.
> > +		 */
> > +		*entries = r->prod.tail - op.pos.head;
> > +
> > +		/* Set the actual entries for dequeue */
> > +		if (n > *entries)
> > +			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 :
> > *entries;
> > +
> > +		if (unlikely(n == 0))
> > +			return 0;
> > +
> > +		np.pos.tail = op.pos.tail;
> > +		np.pos.head = op.pos.head + n;
> > +
> > +	} while (rte_atomic64_cmpset(&r->hts_cons.ht.raw,
> > +			op.raw, np.raw) == 0);
> > +
> > +	*old_head = op.pos.head;
> > +	return n;
> > +}
> > +
> > +#endif /* _RTE_RING_HTS_GENERIC_H_ */
> > --
> > 2.17.1

Patch
diff mbox series

diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
index 4f90344f4..0c7f8f918 100644
--- a/lib/librte_ring/Makefile
+++ b/lib/librte_ring/Makefile
@@ -19,6 +19,7 @@  SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
 					rte_ring_elem.h \
 					rte_ring_generic.h \
 					rte_ring_c11_mem.h \
+					rte_ring_hts_generic.h \
 					rte_ring_rts_generic.h
 
 include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build
index dc8d7dbea..5aa673199 100644
--- a/lib/librte_ring/meson.build
+++ b/lib/librte_ring/meson.build
@@ -6,6 +6,7 @@  headers = files('rte_ring.h',
 		'rte_ring_elem.h',
 		'rte_ring_c11_mem.h',
 		'rte_ring_generic.h',
+		'rte_ring_hts_generic.h',
 		'rte_ring_rts_generic.h')
 
 # rte_ring_create_elem and rte_ring_get_memsize_elem are experimental
diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
index 1ce0af3e5..d3b948667 100644
--- a/lib/librte_ring/rte_ring.c
+++ b/lib/librte_ring/rte_ring.c
@@ -102,9 +102,9 @@  static int
 get_sync_type(uint32_t flags, uint32_t *prod_st, uint32_t *cons_st)
 {
 	static const uint32_t prod_st_flags =
-		(RING_F_SP_ENQ | RING_F_MP_RTS_ENQ);
+		(RING_F_SP_ENQ | RING_F_MP_RTS_ENQ | RING_F_MP_HTS_ENQ);
 	static const uint32_t cons_st_flags =
-		(RING_F_SC_DEQ | RING_F_MC_RTS_DEQ);
+		(RING_F_SC_DEQ | RING_F_MC_RTS_DEQ | RING_F_MC_HTS_DEQ);
 
 	switch (flags & prod_st_flags) {
 	case 0:
@@ -116,6 +116,9 @@  get_sync_type(uint32_t flags, uint32_t *prod_st, uint32_t *cons_st)
 	case RING_F_MP_RTS_ENQ:
 		*prod_st = RTE_RING_SYNC_MT_RTS;
 		break;
+	case RING_F_MP_HTS_ENQ:
+		*prod_st = RTE_RING_SYNC_MT_HTS;
+		break;
 	default:
 		return -EINVAL;
 	}
@@ -130,6 +133,9 @@  get_sync_type(uint32_t flags, uint32_t *prod_st, uint32_t *cons_st)
 	case RING_F_MC_RTS_DEQ:
 		*cons_st = RTE_RING_SYNC_MT_RTS;
 		break;
+	case RING_F_MC_HTS_DEQ:
+		*cons_st = RTE_RING_SYNC_MT_HTS;
+		break;
 	default:
 		return -EINVAL;
 	}
@@ -151,6 +157,11 @@  rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
 	RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) &
 			  RTE_CACHE_LINE_MASK) != 0);
 
+	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) !=
+		offsetof(struct rte_ring_hts_headtail, sync_type));
+	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) !=
+		offsetof(struct rte_ring_hts_headtail, ht.pos.tail));
+
 	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) !=
 		offsetof(struct rte_ring_rts_headtail, sync_type));
 	RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) !=
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index a130aeb9d..52edcea11 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -66,11 +66,11 @@  enum {
 	RTE_RING_SYNC_MT,     /**< multi-thread safe (default mode) */
 	RTE_RING_SYNC_ST,     /**< single thread only */
 	RTE_RING_SYNC_MT_RTS, /**< multi-thread relaxed tail sync */
+	RTE_RING_SYNC_MT_HTS, /**< multi-thread head/tail sync */
 };
 
 /**
- * structure to hold a pair of head/tail values and other metadata.
- * used by RTE_RING_SYNC_MT, RTE_RING_SYNC_ST sync types.
+ * Structure to hold a pair of head/tail values and other metadata.
  * Depending on sync_type format of that structure might differ
  * depending on the sync mechanism selelcted, but offsets for
  * *sync_type* and *tail* values should always remain the same.
@@ -96,6 +96,19 @@  struct rte_ring_rts_headtail {
 	volatile union rte_ring_ht_poscnt head;
 };
 
+union rte_ring_ht_pos {
+	uint64_t raw;
+	struct {
+		uint32_t tail; /**< tail position */
+		uint32_t head; /**< head position */
+	} pos;
+};
+
+struct rte_ring_hts_headtail {
+	uint32_t sync_type; /**< sync type of prod/cons */
+	volatile union rte_ring_ht_pos ht __rte_aligned(8);
+};
+
 /**
  * An RTE ring structure.
  *
@@ -126,6 +139,7 @@  struct rte_ring {
 	RTE_STD_C11
 	union {
 		struct rte_ring_headtail prod;
+		struct rte_ring_hts_headtail hts_prod;
 		struct rte_ring_rts_headtail rts_prod;
 	}  __rte_cache_aligned;
 
@@ -135,6 +149,7 @@  struct rte_ring {
 	RTE_STD_C11
 	union {
 		struct rte_ring_headtail cons;
+		struct rte_ring_hts_headtail hts_cons;
 		struct rte_ring_rts_headtail rts_cons;
 	}  __rte_cache_aligned;
 
@@ -157,6 +172,9 @@  struct rte_ring {
 #define RING_F_MP_RTS_ENQ 0x0008 /**< The default enqueue is "MP RTS". */
 #define RING_F_MC_RTS_DEQ 0x0010 /**< The default dequeue is "MC RTS". */
 
+#define RING_F_MP_HTS_ENQ 0x0020 /**< The default enqueue is "MP HTS". */
+#define RING_F_MC_HTS_DEQ 0x0040 /**< The default dequeue is "MC HTS". */
+
 #define __IS_SP RTE_RING_SYNC_ST
 #define __IS_MP RTE_RING_SYNC_MT
 #define __IS_SC RTE_RING_SYNC_ST
@@ -513,6 +531,82 @@  __rte_ring_do_rts_dequeue(struct rte_ring *r, void **obj_table,
 	return n;
 }
 
+#include <rte_ring_hts_generic.h>
+
+/**
+ * @internal Start to enqueue several objects on the HTS ring.
+ * Note that user has to call appropriate enqueue_finish()
+ * to complete given enqueue operation.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * @param free_space
+ *   returns the amount of space after the enqueue operation has finished
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_hts_enqueue_start(struct rte_ring *r, void * const *obj_table,
+		uint32_t n, enum rte_ring_queue_behavior behavior,
+		uint32_t *free_space)
+{
+	uint32_t free, head;
+
+	n =  __rte_ring_hts_move_prod_head(r, n, behavior, &head, &free);
+
+	if (n != 0)
+		ENQUEUE_PTRS(r, &r[1], head, obj_table, n, void *);
+
+	if (free_space != NULL)
+		*free_space = free - n;
+	return n;
+}
+
+/**
+ * @internal Start to dequeue several objects from the HTS ring.
+ * Note that user has to call appropriate dequeue_finish()
+ * to complete given dequeue operation.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to pull from the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param available
+ *   returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_hts_dequeue_start(struct rte_ring *r, void **obj_table,
+		unsigned int n, enum rte_ring_queue_behavior behavior,
+		unsigned int *available)
+{
+	uint32_t entries, head;
+
+	n = __rte_ring_hts_move_cons_head(r, n, behavior, &head, &entries);
+
+	if (n != 0)
+		DEQUEUE_PTRS(r, &r[1], head, obj_table, n, void *);
+
+	if (available != NULL)
+		*available = entries - n;
+	return n;
+}
+
 /**
  * Enqueue several objects on the ring (multi-producers safe).
  *
@@ -585,6 +679,47 @@  rte_ring_rts_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 			free_space);
 }
 
+/**
+ * Start to enqueue several objects on the HTS ring (multi-producers safe).
+ * Note that user has to call appropriate enqueue_finish()
+ * to complete given enqueue operation.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   The number of objects enqueued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_hts_enqueue_bulk_start(struct rte_ring *r, void * const *obj_table,
+			 unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_hts_enqueue_start(r, obj_table, n,
+		RTE_RING_QUEUE_FIXED, free_space);
+}
+
+static __rte_always_inline void
+rte_ring_hts_enqueue_finish(struct rte_ring *r, unsigned int n)
+{
+	__rte_ring_hts_update_tail(&r->hts_prod, n, 1);
+}
+
+static __rte_always_inline unsigned int
+rte_ring_hts_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
+			 unsigned int n, unsigned int *free_space)
+{
+	n = rte_ring_hts_enqueue_bulk_start(r, obj_table, n, free_space);
+	if (n != 0)
+		rte_ring_hts_enqueue_finish(r, n);
+	return n;
+}
+
 /**
  * Enqueue several objects on a ring.
  *
@@ -615,6 +750,8 @@  rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 		return rte_ring_sp_enqueue_bulk(r, obj_table, n, free_space);
 	case RTE_RING_SYNC_MT_RTS:
 		return rte_ring_rts_enqueue_bulk(r, obj_table, n, free_space);
+	case RTE_RING_SYNC_MT_HTS:
+		return rte_ring_hts_enqueue_bulk(r, obj_table, n, free_space);
 	}
 
 	/* valid ring should never reach this point */
@@ -753,6 +890,47 @@  rte_ring_rts_dequeue_bulk(struct rte_ring *r, void **obj_table,
 			available);
 }
 
+/**
+ * Start to dequeue several objects from an HTS ring (multi-consumers safe).
+ * Note that user has to call appropriate dequeue_finish()
+ * to complete given dequeue operation.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   The number of objects dequeued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_hts_dequeue_bulk_start(struct rte_ring *r, void **obj_table,
+		unsigned int n, unsigned int *available)
+{
+	return __rte_ring_do_hts_dequeue_start(r, obj_table, n,
+		RTE_RING_QUEUE_FIXED, available);
+}
+
+static __rte_always_inline void
+rte_ring_hts_dequeue_finish(struct rte_ring *r, unsigned int n)
+{
+	__rte_ring_hts_update_tail(&r->hts_cons, n, 0);
+}
+
+static __rte_always_inline unsigned int
+rte_ring_hts_dequeue_bulk(struct rte_ring *r, void **obj_table,
+		unsigned int n, unsigned int *available)
+{
+	n = rte_ring_hts_dequeue_bulk_start(r, obj_table, n, available);
+	if (n != 0)
+		rte_ring_hts_dequeue_finish(r, n);
+	return n;
+}
+
 /**
  * Dequeue several objects from a ring.
  *
@@ -783,6 +961,8 @@  rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
 		return rte_ring_sc_dequeue_bulk(r, obj_table, n, available);
 	case RTE_RING_SYNC_MT_RTS:
 		return rte_ring_rts_dequeue_bulk(r, obj_table, n, available);
+	case RTE_RING_SYNC_MT_HTS:
+		return rte_ring_hts_dequeue_bulk(r, obj_table, n, available);
 	}
 
 	/* valid ring should never reach this point */
@@ -1111,6 +1291,41 @@  rte_ring_rts_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 			RTE_RING_QUEUE_VARIABLE, free_space);
 }
 
+/**
+ * Start to enqueue several objects on the HTS ring (multi-producers safe).
+ * Note that user has to call appropriate enqueue_finish()
+ * to complete given enqueue operation.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects).
+ * @param n
+ *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
+ * @return
+ *   The number of objects enqueued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_hts_enqueue_burst_start(struct rte_ring *r, void * const *obj_table,
+			 unsigned int n, unsigned int *free_space)
+{
+	return __rte_ring_do_hts_enqueue_start(r, obj_table, n,
+		RTE_RING_QUEUE_VARIABLE, free_space);
+}
+
+static __rte_always_inline unsigned int
+rte_ring_hts_enqueue_burst(struct rte_ring *r, void * const *obj_table,
+			 unsigned int n, unsigned int *free_space)
+{
+	n = rte_ring_hts_enqueue_burst_start(r, obj_table, n, free_space);
+	if (n != 0)
+		rte_ring_hts_enqueue_finish(r, n);
+	return n;
+}
+
 /**
  * Enqueue several objects on a ring.
  *
@@ -1141,6 +1356,8 @@  rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 		return rte_ring_sp_enqueue_burst(r, obj_table, n, free_space);
 	case RTE_RING_SYNC_MT_RTS:
 		return rte_ring_rts_enqueue_burst(r, obj_table, n, free_space);
+	case RTE_RING_SYNC_MT_HTS:
+		return rte_ring_hts_enqueue_burst(r, obj_table, n, free_space);
 	}
 
 	/* valid ring should never reach this point */
@@ -1225,6 +1442,42 @@  rte_ring_rts_dequeue_burst(struct rte_ring *r, void **obj_table,
 	return __rte_ring_do_rts_dequeue(r, obj_table, n,
 			RTE_RING_QUEUE_VARIABLE, available);
 }
+
+/**
+ * Start to dequeue several objects from an HTS ring (multi-consumers safe).
+ * Note that user has to call appropriate dequeue_finish()
+ * to complete given dequeue operation.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param obj_table
+ *   A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ *   The number of objects to dequeue from the ring to the obj_table.
+ * @param available
+ *   If non-NULL, returns the number of remaining ring entries after the
+ *   dequeue has finished.
+ * @return
+ *   The number of objects dequeued, either 0 or n
+ */
+static __rte_always_inline unsigned int
+rte_ring_hts_dequeue_burst_start(struct rte_ring *r, void **obj_table,
+		unsigned int n, unsigned int *available)
+{
+	return __rte_ring_do_hts_dequeue_start(r, obj_table, n,
+		RTE_RING_QUEUE_VARIABLE, available);
+}
+
+static __rte_always_inline unsigned int
+rte_ring_hts_dequeue_burst(struct rte_ring *r, void **obj_table,
+		unsigned int n, unsigned int *available)
+{
+	n = rte_ring_hts_dequeue_burst_start(r, obj_table, n, available);
+	if (n != 0)
+		rte_ring_hts_dequeue_finish(r, n);
+	return n;
+}
+
 /**
  * Dequeue multiple objects from a ring up to a maximum number.
  *
@@ -1255,6 +1508,8 @@  rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
 		return rte_ring_sc_dequeue_burst(r, obj_table, n, available);
 	case RTE_RING_SYNC_MT_RTS:
 		return rte_ring_rts_dequeue_burst(r, obj_table, n, available);
+	case RTE_RING_SYNC_MT_HTS:
+		return rte_ring_hts_dequeue_burst(r, obj_table, n, available);
 	}
 
 	/* valid ring should never reach this point */
diff --git a/lib/librte_ring/rte_ring_hts_generic.h b/lib/librte_ring/rte_ring_hts_generic.h
new file mode 100644
index 000000000..7e447e30b
--- /dev/null
+++ b/lib/librte_ring/rte_ring_hts_generic.h
@@ -0,0 +1,228 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2010-2020 Intel Corporation
+ * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
+ * All rights reserved.
+ * Derived from FreeBSD's bufring.h
+ * Used as BSD-3 Licensed with permission from Kip Macy.
+ */
+
+#ifndef _RTE_RING_HTS_GENERIC_H_
+#define _RTE_RING_HTS_GENERIC_H_
+
+/**
+ * @file rte_ring_hts_generic.h
+ * It is not recommended to include this file directly,
+ * include <rte_ring.h> instead.
+ * Contains internal helper functions for head/tail sync (HTS) ring mode.
+ * In that mode enqueue/dequeue operation is fully serialized:
+ * only one thread at a time is allowed to perform given op.
+ * This is achieved by allowing a thread to proceed with changing head.value
+ * only when head.value == tail.value.
+ * Both head and tail values are updated atomically (as one 64-bit value).
+ * As another enhancement, this mode provides the ability to split
+ * enqueue/dequeue operation into two phases:
+ * - enqueue/dequeue start
+ * - enqueue/dequeue finish
+ * That allows user to inspect objects in the ring without removing
+ * them from it (aka MT safe peek).
+ * As an example:
+ * // read 1 elem from the ring:
+ * n = rte_ring_hts_dequeue_bulk_start(ring, &obj, 1, NULL);
+ * if (n != 0) {
+ *    //examined object
+ *    if (object_examine(obj) == KEEP)
+ *       //decided to keep it in the ring.
+ *       rte_ring_hts_dequeue_finish(ring, 0);
+ *    else
+ *       //decided to remove it from the ring.
+ *       rte_ring_hts_dequeue_finish(ring, n);
+ * }
+ * Note that between _start_ and _finish_ the ring is sort of locked -
+ * none other thread can proceed with enqueue(/dequeue) operation till
+ * _finish_ will complete.
+ */
+
+static __rte_always_inline void
+__rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t num,
+	uint32_t enqueue)
+{
+	uint32_t n;
+	union rte_ring_ht_pos p;
+
+	if (enqueue)
+		rte_smp_wmb();
+	else
+		rte_smp_rmb();
+
+	p.raw = rte_atomic64_read((rte_atomic64_t *)(uintptr_t)&ht->ht.raw);
+
+	n = p.pos.head - p.pos.tail;
+	RTE_ASSERT(n >= num);
+	RTE_SET_USED(n);
+
+	p.pos.head = p.pos.tail + num;
+	p.pos.tail = p.pos.head;
+
+	rte_atomic64_set((rte_atomic64_t *)(uintptr_t)&ht->ht.raw, p.raw);
+}
+
+/**
+ * @internal waits till tail becomes equal to head.
+ * Means no writer/reader is active for that ring.
+ * Supposed to work as a serialization point.
+ */
+static __rte_always_inline void
+__rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht,
+		union rte_ring_ht_pos *p)
+{
+	p->raw = rte_atomic64_read((rte_atomic64_t *)
+			(uintptr_t)&ht->ht.raw);
+
+	while (p->pos.head != p->pos.tail) {
+		rte_pause();
+		p->raw = rte_atomic64_read((rte_atomic64_t *)
+				(uintptr_t)&ht->ht.raw);
+	}
+}
+
+/**
+ * @internal This function updates the producer head for enqueue
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param num
+ *   The number of elements we will want to enqueue, i.e. how far should the
+ *   head be moved; note that unlike the generic ring code there is no
+ *   is_sp parameter here - HTS is inherently a multi-producer
+ *   synchronization scheme, so no single-producer path is selected
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where enqueue starts;
+ *   the new head value (i.e. where enqueue finishes) is old_head plus the
+ *   returned number of objects
+ * @param free_entries
+ *   Returns the amount of free space in the ring BEFORE head was moved
+ * @return
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
+	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
+	uint32_t *free_entries)
+{
+	uint32_t n;
+	union rte_ring_ht_pos np, op;
+
+	const uint32_t capacity = r->capacity;
+
+	do {
+		/* Reset n to the initial burst count */
+		n = num;
+
+		/* wait for tail to be equal to head */
+		__rte_ring_hts_head_wait(&r->hts_prod, &op);
+
+		/* add rmb barrier to avoid load/load reorder in weak
+		 * memory model. It is noop on x86
+		 */
+		rte_smp_rmb();
+
+		/*
+		 *  The subtraction is done between two unsigned 32bits value
+		 * (the result is always modulo 32 bits even if we have
+		 * *old_head > cons_tail). So 'free_entries' is always between 0
+		 * and capacity (which is < size).
+		 */
+		*free_entries = capacity + r->cons.tail - op.pos.head;
+
+		/* check that we have enough room in ring */
+		if (unlikely(n > *free_entries))
+			n = (behavior == RTE_RING_QUEUE_FIXED) ?
+					0 : *free_entries;
+
+		if (n == 0)
+			return 0;
+
+		np.pos.tail = op.pos.tail;
+		np.pos.head = op.pos.head + n;
+
+	} while (rte_atomic64_cmpset(&r->hts_prod.ht.raw,
+			op.raw, np.raw) == 0);
+
+	*old_head = op.pos.head;
+	return n;
+}
+
+/**
+ * @internal This function updates the consumer head for dequeue
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param num
+ *   The number of elements we will want to dequeue, i.e. how far should the
+ *   head be moved; note that unlike the generic ring code there is no
+ *   is_sc parameter here - HTS is inherently a multi-consumer
+ *   synchronization scheme, so no single-consumer path is selected
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where dequeue starts;
+ *   the new head value (i.e. where dequeue finishes) is old_head plus the
+ *   returned number of objects
+ * @param entries
+ *   Returns the number of entries in the ring BEFORE head was moved
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num,
+	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
+	uint32_t *entries)
+{
+	uint32_t n;
+	union rte_ring_ht_pos np, op;
+
+	/* move cons.head atomically */
+	do {
+		/* Restore n as it may change every loop */
+		n = num;
+
+		/* wait for tail to be equal to head */
+		__rte_ring_hts_head_wait(&r->hts_cons, &op);
+
+		/* add rmb barrier to avoid load/load reorder in weak
+		 * memory model. It is noop on x86
+		 */
+		rte_smp_rmb();
+
+		/* The subtraction is done between two unsigned 32bits value
+		 * (the result is always modulo 32 bits even if we have
+		 * cons_head > prod_tail). So 'entries' is always between 0
+		 * and size(ring)-1.
+		 */
+		*entries = r->prod.tail - op.pos.head;
+
+		/* Set the actual entries for dequeue */
+		if (n > *entries)
+			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+		if (unlikely(n == 0))
+			return 0;
+
+		np.pos.tail = op.pos.tail;
+		np.pos.head = op.pos.head + n;
+
+	} while (rte_atomic64_cmpset(&r->hts_cons.ht.raw,
+			op.raw, np.raw) == 0);
+
+	*old_head = op.pos.head;
+	return n;
+}
+
+#endif /* _RTE_RING_HTS_GENERIC_H_ */