Hi Ola,
Is it possible to add burst enqueue and dequeue functions as well, so we can plug this into a mempool handler? Besides the mempool handler, I think the all-or-nothing semantics would be useful for applications. Besides that, this RFC looks good at a high level.
For a complete submission, a few more changes are needed:
- Builds: Need to add 'lfring' to lib/meson.build and mk/rte.app.mk
- Documentation: need to update release notes, add a new section in the programmer's guide, and update the doxygen configuration files
- Tests: This patchset should add a set of lfring tests as well
Code comments follow.
<snip>
> diff --git a/lib/Makefile b/lib/Makefile index d6239d2..cd46339 100644
> --- a/lib/Makefile
> +++ b/lib/Makefile
> @@ -12,6 +12,8 @@ DIRS-$(CONFIG_RTE_LIBRTE_PCI) += librte_pci
> DEPDIRS-librte_pci := librte_eal
> DIRS-$(CONFIG_RTE_LIBRTE_RING) += librte_ring DEPDIRS-librte_ring :=
> librte_eal
> +DIRS-$(CONFIG_RTE_LIBRTE_RING) += librte_lfring DEPDIRS-librte_lfring
> +:= librte_eal
LIBRTE_RING -> LIBRTE_LFRING
> DIRS-$(CONFIG_RTE_LIBRTE_MEMPOOL) += librte_mempool DEPDIRS-
> librte_mempool := librte_eal librte_ring
> DIRS-$(CONFIG_RTE_LIBRTE_MBUF) += librte_mbuf diff --git
> a/lib/librte_lfring/Makefile b/lib/librte_lfring/Makefile new file mode 100644
> index 0000000..c04d2e9
> --- /dev/null
> +++ b/lib/librte_lfring/Makefile
> @@ -0,0 +1,22 @@
> +# SPDX-License-Identifier: BSD-3-Clause # Copyright(c) 2010-2014 Intel
> +Corporation
Need to correct the copyright
> +
> +include $(RTE_SDK)/mk/rte.vars.mk
> +
> +# library name
> +LIB = librte_lfring.a
> +
> +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 LDLIBS += -lrte_eal
> +
> +EXPORT_MAP := rte_lfring_version.map
> +
> +LIBABIVER := 1
> +
> +# all source are stored in SRCS-y
> +SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_lfring.c
LIBRTE_RING -> LIBRTE_LFRING
> +
> +# install includes
> +SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_lfring.h lockfree16.h
LIBRTE_RING -> LIBRTE_LFRING
> +
> +include $(RTE_SDK)/mk/rte.lib.mk
> diff --git a/lib/librte_lfring/lockfree16.h b/lib/librte_lfring/lockfree16.h new
> file mode 100644 index 0000000..199add9
> --- /dev/null
> +++ b/lib/librte_lfring/lockfree16.h
This needs to be refactored to use the rte_atomic128_cmp_exchange interface.
<snip>
> diff --git a/lib/librte_lfring/meson.build b/lib/librte_lfring/meson.build new
> file mode 100644 index 0000000..c8b90cb
> --- /dev/null
> +++ b/lib/librte_lfring/meson.build
> @@ -0,0 +1,6 @@
> +# SPDX-License-Identifier: BSD-3-Clause # Copyright(c) 2017 Intel
> +Corporation
> +
Correct copyright
> +version = 1
> +sources = files('rte_lfring.c')
> +headers = files('rte_lfring.h','lockfree16.h')
> diff --git a/lib/librte_lfring/rte_lfring.c b/lib/librte_lfring/rte_lfring.c new file
> mode 100644 index 0000000..e130f71
> --- /dev/null
> +++ b/lib/librte_lfring/rte_lfring.c
> @@ -0,0 +1,264 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2010-2015 Intel Corporation
> + * Copyright (c) 2019 Arm Ltd
> + * All rights reserved.
> + */
> +
> +#include <stdio.h>
> +#include <stdarg.h>
> +#include <string.h>
> +#include <stdint.h>
> +#include <inttypes.h>
> +#include <errno.h>
> +#include <sys/queue.h>
> +
> +#include <rte_common.h>
> +#include <rte_log.h>
> +#include <rte_memory.h>
> +#include <rte_memzone.h>
> +#include <rte_malloc.h>
> +#include <rte_launch.h>
> +#include <rte_eal.h>
> +#include <rte_eal_memconfig.h>
> +#include <rte_atomic.h>
> +#include <rte_per_lcore.h>
> +#include <rte_lcore.h>
> +#include <rte_branch_prediction.h>
> +#include <rte_errno.h>
> +#include <rte_string_fns.h>
> +#include <rte_spinlock.h>
> +
You can reduce the system and DPDK includes down to:
#include <inttypes.h>
#include <string.h>
#include <rte_atomic.h>
#include <rte_eal.h>
#include <rte_eal_memconfig.h>
#include <rte_errno.h>
#include <rte_malloc.h>
#include <rte_memzone.h>
#include <rte_rwlock.h>
#include <rte_tailq.h>
> +#include "rte_lfring.h"
> +
> +TAILQ_HEAD(rte_lfring_list, rte_tailq_entry);
> +
> +static struct rte_tailq_elem rte_lfring_tailq = {
> + .name = RTE_TAILQ_LFRING_NAME,
> +};
> +EAL_REGISTER_TAILQ(rte_lfring_tailq)
> +
> +/* true if x is a power of 2 */
> +#define POWEROF2(x) ((((x)-1) & (x)) == 0)
DPDK provides this functionality with RTE_IS_POWER_OF_2(), in rte_common.h
> +
> +/* return the size of memory occupied by a ring */ ssize_t
> +rte_lfring_get_memsize(unsigned int count) {
> + ssize_t sz;
> +
> + /* count must be a power of 2 */
> + if ((!POWEROF2(count)) || (count > 0x80000000U)) {
> + RTE_LOG(ERR, RING,
This library needs to replace the RING logging with its own LFRING logging.
<snip>
> +/* create the ring */
The comments before function definitions (e.g. "create the ring", "free the ring") probably aren't necessary, the names are self-explanatory.
> +struct rte_lfring *
> +rte_lfring_create(const char *name, unsigned int count, int socket_id,
> + unsigned int flags)
> +{
> + char mz_name[RTE_MEMZONE_NAMESIZE];
> + struct rte_lfring *r;
> + struct rte_tailq_entry *te;
> + const struct rte_memzone *mz;
> + ssize_t ring_size;
> + int mz_flags = 0;
> + struct rte_lfring_list *ring_list = NULL;
> + const unsigned int requested_count = count;
> + int ret;
> +
> + ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
> +
> + count = rte_align32pow2(count);
> +
> + ring_size = rte_lfring_get_memsize(count);
> + if (ring_size < 0) {
> + rte_errno = ring_size;
> + return NULL;
> + }
> +
> + ret = snprintf(mz_name, sizeof(mz_name), "%s%s",
> + RTE_LFRING_MZ_PREFIX, name);
> + if (ret < 0 || ret >= (int)sizeof(mz_name)) {
> + rte_errno = ENAMETOOLONG;
> + return NULL;
> + }
> +
> + te = rte_zmalloc("LFRING_TAILQ_ENTRY", sizeof(*te), 0);
> + if (te == NULL) {
> + RTE_LOG(ERR, RING, "Cannot reserve memory for tailq\n");
> + rte_errno = ENOMEM;
> + return NULL;
> + }
> +
> + rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
> +
> + /* reserve a memory zone for this ring. If we can't get rte_config or
> + * we are secondary process, the memzone_reserve function will set
> + * rte_errno for us appropriately - hence no check in this this
> + * function
> + */
The "or we are secondary process" comment is out-of-date; the memzone reserve functions can be called by a secondary process. (For that matter, the documentation in rte_memzone.h is out-of-date as well.) This restriction was removed in commit 916e4f4f4e45.
<snip>
> +/* free the ring */
> +void
> +rte_lfring_free(struct rte_lfring *r)
> +{
> + struct rte_lfring_list *ring_list = NULL;
> + struct rte_tailq_entry *te;
> +
> + if (r == NULL)
> + return;
> +
> + /*
> + * Ring was not created with rte_lfring_create,
> + * therefore, there is no memzone to free.
> + */
> + if (r->memzone == NULL) {
> + RTE_LOG(ERR, RING, "Cannot free ring (not created with
> rte_lfring_create()");
> + return;
> + }
> +
> + if (rte_memzone_free(r->memzone) != 0) {
> + RTE_LOG(ERR, RING, "Cannot free memory\n");
> + return;
> + }
Better to free the ring's memzone after taking it off the list, in the unlikely event another thread looks it up at the same time and attempts to use it.
> +
> + ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
> + rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
> +
> + /* find out tailq entry */
> + TAILQ_FOREACH(te, ring_list, next) {
> + if (te->data == (void *) r)
> + break;
> + }
> +
> + if (te == NULL) {
> + rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
> + return;
> + }
> +
> + TAILQ_REMOVE(ring_list, te, next);
> +
> + rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
> +
> + rte_free(te);
> +}
<snip>
> +/* search a ring from its name */
> +struct rte_lfring *
> +rte_lfring_lookup(const char *name)
> +{
> + struct rte_tailq_entry *te;
> + struct rte_lfring *r = NULL;
> + struct rte_lfring_list *ring_list;
> +
> + ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
> +
> + rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK);
> +
> + TAILQ_FOREACH(te, ring_list, next) {
> + r = (struct rte_lfring *) te->data;
> + if (strncmp(name, r->name, RTE_LFRING_NAMESIZE) == 0)
Missing a NULL pointer check before dereferencing 'name'
> + break;
> + }
> +
> + rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK);
> +
> + if (te == NULL) {
> + rte_errno = ENOENT;
> + return NULL;
> + }
> +
> + return r;
> +}
> diff --git a/lib/librte_lfring/rte_lfring.h b/lib/librte_lfring/rte_lfring.h new file
> mode 100644 index 0000000..22d3e1c
> --- /dev/null
> +++ b/lib/librte_lfring/rte_lfring.h
> @@ -0,0 +1,621 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2010-2017 Intel Corporation
> + * Copyright (c) 2019 Arm Ltd
> + * All rights reserved.
> + */
> +
> +#ifndef _RTE_LFRING_H_
> +#define _RTE_LFRING_H_
> +
> +/**
> + * @file
> + * RTE Lfring
> + *
> + * The Lfring Manager is a fixed-size queue, implemented as a table of
> + * (pointers + counters).
> + *
> + * - FIFO (First In First Out)
> + * - Maximum size is fixed; the pointers are stored in a table.
> + * - Lock-free implementation.
> + * - Multi- or single-consumer dequeue.
> + * - Multi- or single-producer enqueue.
> + *
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <stdio.h>
> +#include <stdint.h>
> +#include <sys/queue.h>
> +#include <errno.h>
> +#include <rte_common.h>
> +#include <rte_config.h>
> +#include <rte_memory.h>
> +#include <rte_lcore.h>
> +#include <rte_atomic.h>
> +#include <rte_branch_prediction.h>
> +#include <rte_memzone.h>
> +#include <rte_pause.h>
> +
You should be able to clean up the includes here too, e.g. rte_pause.h doesn't appear to be used.
> +#include "lockfree16.h"
> +
> +#define RTE_TAILQ_LFRING_NAME "RTE_LFRING"
> +
> +#define RTE_LFRING_MZ_PREFIX "RG_"
> +/**< The maximum length of an lfring name. */ #define
Change "/**<" to "/**", otherwise doxygen applies the comment to RTE_LFRING_MZ_PREFIX
> +RTE_LFRING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
> + sizeof(RTE_LFRING_MZ_PREFIX) + 1)
> +
> +struct rte_memzone; /* forward declaration, so as not to require
> +memzone.h */
This forward declaration is no longer necessary
<snip>
> +/**
> + * Calculate the memory size needed for a lfring
> + *
> + * This function returns the number of bytes needed for a lfring, given
> + * the number of elements in it. This value is the sum of the size of
> + * the structure rte_lfring and the size of the memory needed by the
> + * objects pointers. The value is aligned to a cache line size.
> + *
> + * @param count
> + * The number of elements in the lfring (must be a power of 2).
> + * @return
> + * - The memory size needed for the lfring on success.
> + * - -EINVAL if count is not a power of 2.
Missing error case: -EINVAL if size exceeds size limit
> + */
> +ssize_t rte_lfring_get_memsize(unsigned int count);
> +
> +/**
> + * Initialize a lfring structure.
> + *
> + * Initialize a lfring structure in memory pointed by "r". The size of
> +the
> + * memory area must be large enough to store the lfring structure and
> +the
> + * object table. It is advised to use rte_lfring_get_memsize() to get
> +the
> + * appropriate size.
> + *
> + * The lfring size is set to *count*, which must be a power of two.
> +Water
> + * marking is disabled by default. The real usable lfring size is
> + * *count-1* instead of *count* to differentiate a free lfring from an
> + * empty lfring.
"free lfring" -> should say "full lfring"? Same applies to rte_lfring_create().
> + *
> + * The lfring is not added in RTE_TAILQ_LFRING global list. Indeed, the
> + * memory given by the caller may not be shareable among dpdk
> + * processes.
Here and in rte_lfring_create(), the documentation mentions whether or not the lfring is added to the RTE_TAILQ_LFRING list. I don't think this makes sense in the documentation, as it pertains to the implementation and not the interface.
> + *
> + * @param r
> + * The pointer to the lfring structure followed by the objects table.
> + * @param name
> + * The name of the lfring.
> + * @param count
> + * The number of elements in the lfring (must be a power of 2).
> + * @param flags
> + * @return
> + * 0 on success, or a negative value on error.
> + */
> +int rte_lfring_init(struct rte_lfring *r, const char *name, unsigned int count,
> + unsigned int flags);
> +
> +/**
> + * Create a new lfring named *name* in memory.
> + *
> + * This function uses ``memzone_reserve()`` to allocate memory. Then it
> + * calls rte_lfring_init() to initialize an empty lfring.
> + *
> + * The new lfring size is set to *count*, which must be a power of
> + * two. Water marking is disabled by default. The real usable lfring
> +size
> + * is *count-1* instead of *count* to differentiate a free lfring from
> +an
> + * empty lfring.
> + *
> + * The lfring is added in RTE_TAILQ_LFRING list.
> + *
> + * @param name
> + * The name of the lfring.
> + * @param count
> + * The size of the lfring (must be a power of 2).
> + * @param socket_id
> + * The *socket_id* argument is the socket identifier in case of
> + * NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
> + * constraint for the reserved zone.
> + * @param flags
> + * An OR of the following:
> + * - LFRING_F_SP_ENQ: If this flag is set, the default behavior when
> + * using ``rte_lfring_enqueue()`` or ``rte_lfring_enqueue_bulk()``
> + * is "single-producer". Otherwise, it is "multi-producers".
> + * - LFRING_F_SC_DEQ: If this flag is set, the default behavior when
> + * using ``rte_lfring_dequeue()`` or ``rte_lfring_dequeue_bulk()``
> + * is "single-consumer". Otherwise, it is "multi-consumers".
> + * @return
> + * On success, the pointer to the new allocated lfring. NULL on error with
> + * rte_errno set appropriately. Possible errno values include:
> + * - E_RTE_NO_CONFIG - function could not get pointer to rte_config
> structure
> + * - E_RTE_SECONDARY - function was called from a secondary process
> instance
E_RTE_SECONDARY and E_RTE_NO_CONFIG are not possible error cases
<snip>
> +#define ENQ_RETRY_LIMIT 32
Per the coding guidelines (https://doc.dpdk.org/guides/contributing/coding_style.html#macros), macros should be prefixed with RTE_.
<snip>
> +/**
> + * Return the number of elements which can be stored in the lfring.
> + *
> + * @param r
> + * A pointer to the lfring structure.
> + * @return
> + * The usable size of the lfring.
> + */
> +static inline unsigned int
> +rte_lfring_get_capacity(const struct rte_lfring *r) {
> + return r->size;
I believe this should return r->mask, to account for the one unusable ring entry.
<snip>
> diff --git a/lib/librte_lfring/rte_lfring_version.map
> b/lib/librte_lfring/rte_lfring_version.map
> new file mode 100644
> index 0000000..d935efd
> --- /dev/null
> +++ b/lib/librte_lfring/rte_lfring_version.map
> @@ -0,0 +1,19 @@
> +DPDK_2.0 {
> + global:
> +
> + rte_ring_create;
> + rte_ring_dump;
> + rte_ring_get_memsize;
> + rte_ring_init;
> + rte_ring_list_dump;
> + rte_ring_lookup;
Need to fix function names and DPDK version number
Thanks,
Gage
On Wed, 2019-06-05 at 18:21 +0000, Eads, Gage wrote:
> Hi Ola,
>
> Is it possible to add burst enqueue and dequeue functions as well, so we can
> plug this into a mempool handler?
Proper burst enqueue is difficult or at least not very efficient.
> Besides the mempool handler, I think the all-or-nothing semantics would be
> useful for applications. Besides that, this RFC looks good at a high level.
>
> For a complete submission, a few more changes are needed:
> - Builds: Need to add 'lfring' to lib/meson.build and mk/rte.app.mk
> - Documentation: need to update release notes, add a new section in the
> programmer's guide, and update the doxygen configuration files
> - Tests: This patchset should add a set of lfring tests as well
>
> Code comments follow.
Thanks for the review comments, I only had time to look at a few of them. I will
return with more complete answers and a new version of the patch.
-- Ola
>
> <snip>
>
>
> diff --git a/lib/Makefile b/lib/Makefile index d6239d2..cd46339 100644
> --- a/lib/Makefile
> +++ b/lib/Makefile
> @@ -12,6 +12,8 @@ DIRS-$(CONFIG_RTE_LIBRTE_PCI) += librte_pci
> DEPDIRS-librte_pci := librte_eal
> DIRS-$(CONFIG_RTE_LIBRTE_RING) += librte_ring DEPDIRS-librte_ring :=
> librte_eal
> +DIRS-$(CONFIG_RTE_LIBRTE_RING) += librte_lfring DEPDIRS-librte_lfring
> +:= librte_eal
>
> LIBRTE_RING -> LIBRTE_LFRING
>
>
> DIRS-$(CONFIG_RTE_LIBRTE_MEMPOOL) += librte_mempool DEPDIRS-
> librte_mempool := librte_eal librte_ring
> DIRS-$(CONFIG_RTE_LIBRTE_MBUF) += librte_mbuf diff --git
> a/lib/librte_lfring/Makefile b/lib/librte_lfring/Makefile new file mode 100644
> index 0000000..c04d2e9
> --- /dev/null
> +++ b/lib/librte_lfring/Makefile
> @@ -0,0 +1,22 @@
> +# SPDX-License-Identifier: BSD-3-Clause # Copyright(c) 2010-2014 Intel
> +Corporation
>
> Need to correct the copyright
>
>
> +
> +include $(RTE_SDK)/mk/rte.vars.mk
> +
> +# library name
> +LIB = librte_lfring.a
> +
> +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 LDLIBS += -lrte_eal
> +
> +EXPORT_MAP := rte_lfring_version.map
> +
> +LIBABIVER := 1
> +
> +# all source are stored in SRCS-y
> +SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_lfring.c
>
> LIBRTE_RING -> LIBRTE_LFRING
>
>
> +
> +# install includes
> +SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_lfring.h lockfree16.h
>
> LIBRTE_RING -> LIBRTE_LFRING
>
>
> +
> +include $(RTE_SDK)/mk/rte.lib.mk
> diff --git a/lib/librte_lfring/lockfree16.h b/lib/librte_lfring/lockfree16.h
> new
> file mode 100644 index 0000000..199add9
> --- /dev/null
> +++ b/lib/librte_lfring/lockfree16.h
>
> This needs to be refactored to use the rte_atomic128_cmp_exchange interface.
>
> <snip>
>
>
> diff --git a/lib/librte_lfring/meson.build b/lib/librte_lfring/meson.build new
> file mode 100644 index 0000000..c8b90cb
> --- /dev/null
> +++ b/lib/librte_lfring/meson.build
> @@ -0,0 +1,6 @@
> +# SPDX-License-Identifier: BSD-3-Clause # Copyright(c) 2017 Intel
> +Corporation
> +
>
> Correct copyright
>
>
> +version = 1
> +sources = files('rte_lfring.c')
> +headers = files('rte_lfring.h','lockfree16.h')
> diff --git a/lib/librte_lfring/rte_lfring.c b/lib/librte_lfring/rte_lfring.c
> new file
> mode 100644 index 0000000..e130f71
> --- /dev/null
> +++ b/lib/librte_lfring/rte_lfring.c
> @@ -0,0 +1,264 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2010-2015 Intel Corporation
> + * Copyright (c) 2019 Arm Ltd
> + * All rights reserved.
> + */
> +
> +#include <stdio.h>
> +#include <stdarg.h>
> +#include <string.h>
> +#include <stdint.h>
> +#include <inttypes.h>
> +#include <errno.h>
> +#include <sys/queue.h>
> +
> +#include <rte_common.h>
> +#include <rte_log.h>
> +#include <rte_memory.h>
> +#include <rte_memzone.h>
> +#include <rte_malloc.h>
> +#include <rte_launch.h>
> +#include <rte_eal.h>
> +#include <rte_eal_memconfig.h>
> +#include <rte_atomic.h>
> +#include <rte_per_lcore.h>
> +#include <rte_lcore.h>
> +#include <rte_branch_prediction.h>
> +#include <rte_errno.h>
> +#include <rte_string_fns.h>
> +#include <rte_spinlock.h>
> +
>
> You can reduce the system and DPDK includes down to:
>
> #include <inttypes.h>
> #include <string.h>
>
> #include <rte_atomic.h>
> #include <rte_eal.h>
> #include <rte_eal_memconfig.h>
> #include <rte_errno.h>
> #include <rte_malloc.h>
> #include <rte_memzone.h>
> #include <rte_rwlock.h>
> #include <rte_tailq.h>
>
>
> +#include "rte_lfring.h"
> +
> +TAILQ_HEAD(rte_lfring_list, rte_tailq_entry);
> +
> +static struct rte_tailq_elem rte_lfring_tailq = {
> + .name = RTE_TAILQ_LFRING_NAME,
> +};
> +EAL_REGISTER_TAILQ(rte_lfring_tailq)
> +
> +/* true if x is a power of 2 */
> +#define POWEROF2(x) ((((x)-1) & (x)) == 0)
>
> DPDK provides this functionality with RTE_IS_POWER_OF_2(), in rte_common.h
>
>
> +
> +/* return the size of memory occupied by a ring */ ssize_t
> +rte_lfring_get_memsize(unsigned int count) {
> + ssize_t sz;
> +
> + /* count must be a power of 2 */
> + if ((!POWEROF2(count)) || (count > 0x80000000U)) {
> + RTE_LOG(ERR, RING,
>
> This library needs to replace the RING logging with its own LFRING logging.
>
> <snip>
>
>
> +/* create the ring */
>
> The comments before function definitions (e.g. "create the ring", "free the
> ring") probably aren't necessary, the names are self-explanatory.
>
>
> +struct rte_lfring *
> +rte_lfring_create(const char *name, unsigned int count, int socket_id,
> + unsigned int flags)
> +{
> + char mz_name[RTE_MEMZONE_NAMESIZE];
> + struct rte_lfring *r;
> + struct rte_tailq_entry *te;
> + const struct rte_memzone *mz;
> + ssize_t ring_size;
> + int mz_flags = 0;
> + struct rte_lfring_list *ring_list = NULL;
> + const unsigned int requested_count = count;
> + int ret;
> +
> + ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
> +
> + count = rte_align32pow2(count);
> +
> + ring_size = rte_lfring_get_memsize(count);
> + if (ring_size < 0) {
> + rte_errno = ring_size;
> + return NULL;
> + }
> +
> + ret = snprintf(mz_name, sizeof(mz_name), "%s%s",
> + RTE_LFRING_MZ_PREFIX, name);
> + if (ret < 0 || ret >= (int)sizeof(mz_name)) {
> + rte_errno = ENAMETOOLONG;
> + return NULL;
> + }
> +
> + te = rte_zmalloc("LFRING_TAILQ_ENTRY", sizeof(*te), 0);
> + if (te == NULL) {
> + RTE_LOG(ERR, RING, "Cannot reserve memory for tailq\n");
> + rte_errno = ENOMEM;
> + return NULL;
> + }
> +
> + rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
> +
> + /* reserve a memory zone for this ring. If we can't get rte_config or
> + * we are secondary process, the memzone_reserve function will set
> + * rte_errno for us appropriately - hence no check in this this
> + * function
> + */
>
> The "or we are secondary process" comment is out-of-date; the memzone reserve
> functions can be called by a secondary process. (For that matter, the
> documentation in rte_memzone.h is out-of-date as well.) This restriction was
> removed in commit 916e4f4f4e45.
>
> <snip>
>
>
> +/* free the ring */
> +void
> +rte_lfring_free(struct rte_lfring *r)
> +{
> + struct rte_lfring_list *ring_list = NULL;
> + struct rte_tailq_entry *te;
> +
> + if (r == NULL)
> + return;
> +
> + /*
> + * Ring was not created with rte_lfring_create,
> + * therefore, there is no memzone to free.
> + */
> + if (r->memzone == NULL) {
> + RTE_LOG(ERR, RING, "Cannot free ring (not created with
> rte_lfring_create()");
> + return;
> + }
> +
> + if (rte_memzone_free(r->memzone) != 0) {
> + RTE_LOG(ERR, RING, "Cannot free memory\n");
> + return;
> + }
>
> Better to free the ring's memzone after taking it off the list, in the
> unlikely event another thread looks it up at the same time and attempts to use
> it.
>
>
> +
> + ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
> + rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
> +
> + /* find out tailq entry */
> + TAILQ_FOREACH(te, ring_list, next) {
> + if (te->data == (void *) r)
> + break;
> + }
> +
> + if (te == NULL) {
> + rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
> + return;
> + }
> +
> + TAILQ_REMOVE(ring_list, te, next);
> +
> + rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
> +
> + rte_free(te);
> +}
>
> <snip>
>
>
> +/* search a ring from its name */
> +struct rte_lfring *
> +rte_lfring_lookup(const char *name)
> +{
> + struct rte_tailq_entry *te;
> + struct rte_lfring *r = NULL;
> + struct rte_lfring_list *ring_list;
> +
> + ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
> +
> + rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK);
> +
> + TAILQ_FOREACH(te, ring_list, next) {
> + r = (struct rte_lfring *) te->data;
> + if (strncmp(name, r->name, RTE_LFRING_NAMESIZE) == 0)
>
> Missing a NULL pointer check before dereferencing 'name'
Why shouldn't the program crash if someone passes a NULL pointer parameter?
Callers will be internal, external users should be able to control whether NULL
is passed instead of a valid pointer.
A crash and a core dump is the best way to detect and debug errors.
>
>
> + break;
> + }
> +
> + rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK);
> +
> + if (te == NULL) {
> + rte_errno = ENOENT;
> + return NULL;
> + }
> +
> + return r;
> +}
> diff --git a/lib/librte_lfring/rte_lfring.h b/lib/librte_lfring/rte_lfring.h
> new file
> mode 100644 index 0000000..22d3e1c
> --- /dev/null
> +++ b/lib/librte_lfring/rte_lfring.h
> @@ -0,0 +1,621 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2010-2017 Intel Corporation
> + * Copyright (c) 2019 Arm Ltd
> + * All rights reserved.
> + */
> +
> +#ifndef _RTE_LFRING_H_
> +#define _RTE_LFRING_H_
> +
> +/**
> + * @file
> + * RTE Lfring
> + *
> + * The Lfring Manager is a fixed-size queue, implemented as a table of
> + * (pointers + counters).
> + *
> + * - FIFO (First In First Out)
> + * - Maximum size is fixed; the pointers are stored in a table.
> + * - Lock-free implementation.
> + * - Multi- or single-consumer dequeue.
> + * - Multi- or single-producer enqueue.
> + *
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <stdio.h>
> +#include <stdint.h>
> +#include <sys/queue.h>
> +#include <errno.h>
> +#include <rte_common.h>
> +#include <rte_config.h>
> +#include <rte_memory.h>
> +#include <rte_lcore.h>
> +#include <rte_atomic.h>
> +#include <rte_branch_prediction.h>
> +#include <rte_memzone.h>
> +#include <rte_pause.h>
> +
>
> You should be able to clean up the includes here too, e.g. rte_pause.h doesn't
> appear to be used.
>
>
> +#include "lockfree16.h"
> +
> +#define RTE_TAILQ_LFRING_NAME "RTE_LFRING"
> +
> +#define RTE_LFRING_MZ_PREFIX "RG_"
> +/**< The maximum length of an lfring name. */ #define
>
> Change "/**<" to "/**", otherwise doxygen applies the comment to
> RTE_LFRING_MZ_PREFIX
>
>
> +RTE_LFRING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
> + sizeof(RTE_LFRING_MZ_PREFIX) + 1)
> +
> +struct rte_memzone; /* forward declaration, so as not to require
> +memzone.h */
>
> This forward declaration is no longer necessary
>
> <snip>
>
>
> +/**
> + * Calculate the memory size needed for a lfring
> + *
> + * This function returns the number of bytes needed for a lfring, given
> + * the number of elements in it. This value is the sum of the size of
> + * the structure rte_lfring and the size of the memory needed by the
> + * objects pointers. The value is aligned to a cache line size.
> + *
> + * @param count
> + * The number of elements in the lfring (must be a power of 2).
> + * @return
> + * - The memory size needed for the lfring on success.
> + * - -EINVAL if count is not a power of 2.
>
> Missing error case: -EINVAL if size exceeds size limit
>
>
> + */
> +ssize_t rte_lfring_get_memsize(unsigned int count);
> +
> +/**
> + * Initialize a lfring structure.
> + *
> + * Initialize a lfring structure in memory pointed by "r". The size of
> +the
> + * memory area must be large enough to store the lfring structure and
> +the
> + * object table. It is advised to use rte_lfring_get_memsize() to get
> +the
> + * appropriate size.
> + *
> + * The lfring size is set to *count*, which must be a power of two.
> +Water
> + * marking is disabled by default. The real usable lfring size is
> + * *count-1* instead of *count* to differentiate a free lfring from an
> + * empty lfring.
>
> "free lfring" -> should say "full lfring"? Same applies to
> rte_lfring_create().
>
>
> + *
> + * The lfring is not added in RTE_TAILQ_LFRING global list. Indeed, the
> + * memory given by the caller may not be shareable among dpdk
> + * processes.
>
> Here and in rte_lfring_create(), the documentation mentions whether or not the
> lfring is added to the RTE_TAILQ_LFRING list. I don't think this makes sense
> in the documentation, as it pertains to the implementation and not the
> interface.
>
>
> + *
> + * @param r
> + * The pointer to the lfring structure followed by the objects table.
> + * @param name
> + * The name of the lfring.
> + * @param count
> + * The number of elements in the lfring (must be a power of 2).
> + * @param flags
> + * @return
> + * 0 on success, or a negative value on error.
> + */
> +int rte_lfring_init(struct rte_lfring *r, const char *name, unsigned int
> count,
> + unsigned int flags);
> +
> +/**
> + * Create a new lfring named *name* in memory.
> + *
> + * This function uses ``memzone_reserve()`` to allocate memory. Then it
> + * calls rte_lfring_init() to initialize an empty lfring.
> + *
> + * The new lfring size is set to *count*, which must be a power of
> + * two. Water marking is disabled by default. The real usable lfring
> +size
> + * is *count-1* instead of *count* to differentiate a free lfring from
> +an
> + * empty lfring.
> + *
> + * The lfring is added in RTE_TAILQ_LFRING list.
> + *
> + * @param name
> + * The name of the lfring.
> + * @param count
> + * The size of the lfring (must be a power of 2).
> + * @param socket_id
> + * The *socket_id* argument is the socket identifier in case of
> + * NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
> + * constraint for the reserved zone.
> + * @param flags
> + * An OR of the following:
> + * - LFRING_F_SP_ENQ: If this flag is set, the default behavior when
> + * using ``rte_lfring_enqueue()`` or ``rte_lfring_enqueue_bulk()``
> + * is "single-producer". Otherwise, it is "multi-producers".
> + * - LFRING_F_SC_DEQ: If this flag is set, the default behavior when
> + * using ``rte_lfring_dequeue()`` or ``rte_lfring_dequeue_bulk()``
> + * is "single-consumer". Otherwise, it is "multi-consumers".
> + * @return
> + * On success, the pointer to the new allocated lfring. NULL on error with
> + * rte_errno set appropriately. Possible errno values include:
> + * - E_RTE_NO_CONFIG - function could not get pointer to rte_config
> structure
> + * - E_RTE_SECONDARY - function was called from a secondary process
> instance
>
> E_RTE_SECONDARY and E_RTE_NO_CONFIG are not possible error cases
>
> <snip>
>
>
> +#define ENQ_RETRY_LIMIT 32
>
> Per the coding guidelines (
> https://doc.dpdk.org/guides/contributing/coding_style.html#macros), macros
> should be prefixed with RTE_.
>
> <snip>
>
>
> +/**
> + * Return the number of elements which can be stored in the lfring.
> + *
> + * @param r
> + * A pointer to the lfring structure.
> + * @return
> + * The usable size of the lfring.
> + */
> +static inline unsigned int
> +rte_lfring_get_capacity(const struct rte_lfring *r) {
> + return r->size;
>
> I believe this should return r->mask, to account for the one unusable ring
> entry.
I think this is a mistake, all ring entries should be usable.
>
> <snip>
>
>
> diff --git a/lib/librte_lfring/rte_lfring_version.map
> b/lib/librte_lfring/rte_lfring_version.map
> new file mode 100644
> index 0000000..d935efd
> --- /dev/null
> +++ b/lib/librte_lfring/rte_lfring_version.map
> @@ -0,0 +1,19 @@
> +DPDK_2.0 {
> + global:
> +
> + rte_ring_create;
> + rte_ring_dump;
> + rte_ring_get_memsize;
> + rte_ring_init;
> + rte_ring_list_dump;
> + rte_ring_lookup;
>
> Need to fix function names and DPDK version number
>
> Thanks,
> Gage
>
>
@@ -719,6 +719,11 @@ CONFIG_RTE_LIBRTE_PMD_IFPGA_RAWDEV=y
CONFIG_RTE_LIBRTE_RING=y
#
+# Compile librte_lfring
+#
+CONFIG_RTE_LIBRTE_LFRING=y
+
+#
# Compile librte_mempool
#
CONFIG_RTE_LIBRTE_MEMPOOL=y
@@ -12,6 +12,8 @@ DIRS-$(CONFIG_RTE_LIBRTE_PCI) += librte_pci
DEPDIRS-librte_pci := librte_eal
DIRS-$(CONFIG_RTE_LIBRTE_RING) += librte_ring
DEPDIRS-librte_ring := librte_eal
+DIRS-$(CONFIG_RTE_LIBRTE_RING) += librte_lfring
+DEPDIRS-librte_lfring := librte_eal
DIRS-$(CONFIG_RTE_LIBRTE_MEMPOOL) += librte_mempool
DEPDIRS-librte_mempool := librte_eal librte_ring
DIRS-$(CONFIG_RTE_LIBRTE_MBUF) += librte_mbuf
new file mode 100644
@@ -0,0 +1,22 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2010-2014 Intel Corporation
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# library name
+LIB = librte_lfring.a
+
+CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
+LDLIBS += -lrte_eal
+
+EXPORT_MAP := rte_lfring_version.map
+
+LIBABIVER := 1
+
+# all source are stored in SRCS-y
+SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_lfring.c
+
+# install includes
+SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_lfring.h lockfree16.h
+
+include $(RTE_SDK)/mk/rte.lib.mk
new file mode 100644
@@ -0,0 +1,143 @@
+//Copyright (c) 2018-2019, ARM Limited. All rights reserved.
+//
+//SPDX-License-Identifier: BSD-3-Clause
+
+#ifndef _LOCKFREE16_H
+#define _LOCKFREE16_H
+
+#include <stdint.h>
+#include <stdbool.h>
+
+#define HAS_ACQ(mo) ((mo) != __ATOMIC_RELAXED && (mo) != __ATOMIC_RELEASE)
+#define HAS_RLS(mo) ((mo) == __ATOMIC_RELEASE || \
+ (mo) == __ATOMIC_ACQ_REL || \
+ (mo) == __ATOMIC_SEQ_CST)
+
+#define MO_LOAD(mo) (HAS_ACQ((mo)) ? __ATOMIC_ACQUIRE : __ATOMIC_RELAXED)
+#define MO_STORE(mo) (HAS_RLS((mo)) ? __ATOMIC_RELEASE : __ATOMIC_RELAXED)
+
+#if defined __aarch64__
+
+static inline __int128 ldx128(const __int128 *var, int mm)
+{
+ __int128 old;
+ if (mm == __ATOMIC_ACQUIRE)
+ __asm volatile("ldaxp %0, %H0, [%1]"
+ : "=&r" (old)
+ : "r" (var)
+ : "memory");
+ else if (mm == __ATOMIC_RELAXED)
+ __asm volatile("ldxp %0, %H0, [%1]"
+ : "=&r" (old)
+ : "r" (var)
+ : );
+ else
+ __builtin_trap();
+ return old;
+}
+
+//Return 0 on success, 1 on failure
+static inline uint32_t stx128(__int128 *var, __int128 neu, int mm)
+{
+ uint32_t ret;
+ if (mm == __ATOMIC_RELEASE)
+ __asm volatile("stlxp %w0, %1, %H1, [%2]"
+ : "=&r" (ret)
+ : "r" (neu), "r" (var)
+ : "memory");
+ else if (mm == __ATOMIC_RELAXED)
+ __asm volatile("stxp %w0, %1, %H1, [%2]"
+ : "=&r" (ret)
+ : "r" (neu), "r" (var)
+ : );
+ else
+ __builtin_trap();
+ return ret;
+}
+
+//Weak compare-exchange and non-atomic load on failure
+static inline bool
+lockfree_compare_exchange_16_frail(register __int128 *var,
+ __int128 *exp,
+ register __int128 neu,
+ int mo_success,
+ int mo_failure)
+{
+ (void)mo_failure;//Ignore memory ordering for failure, memory order for
+ //success must be stronger or equal
+ int ldx_mo = MO_LOAD(mo_success);
+ int stx_mo = MO_STORE(mo_success);
+ register __int128 expected = *exp;
+ __asm __volatile("" ::: "memory");
+ //Atomicity of LDXP is not guaranteed
+ register __int128 old = ldx128(var, ldx_mo);
+ if (old == expected && !stx128(var, neu, stx_mo)) {
+ //Right value and STX succeeded
+ __asm __volatile("" ::: "memory");
+ return 1;
+ }
+ __asm __volatile("" ::: "memory");
+ //Wrong value or STX failed
+ *exp = old;//Old possibly torn value (OK for 'frail' flavour)
+ return 0;//Failure, *exp updated
+}
+
+#elif defined __x86_64__
+
+union u128 {
+ struct {
+ uint64_t lo, hi;
+ } s;
+ __int128 ui;
+};
+
+static inline bool
+cmpxchg16b(__int128 *src, union u128 *cmp, union u128 with)
+{
+ bool result;
+ __asm__ __volatile__
+ ("lock cmpxchg16b %1\n\tsetz %0"
+ : "=q" (result), "+m" (*src), "+d" (cmp->s.hi), "+a" (cmp->s.lo)
+ : "c" (with.s.hi), "b" (with.s.lo)
+ : "cc", "memory"
+ );
+ return result;
+}
+
+static inline bool
+lockfree_compare_exchange_16(register __int128 *var,
+ __int128 *exp,
+ register __int128 neu,
+ bool weak,
+ int mo_success,
+ int mo_failure)
+{
+ (void)weak;
+ (void)mo_success;
+ (void)mo_failure;
+ union u128 cmp, with;
+ cmp.ui = *exp;
+ with.ui = neu;
+ if (cmpxchg16b(var, &cmp, with))
+ return true;
+ *exp = cmp.ui;
+ return false;
+}
+
+static inline bool
+lockfree_compare_exchange_16_frail(register __int128 *var,
+ __int128 *exp,
+ register __int128 neu,
+ int mo_s,
+ int mo_f)
+{
+ return lockfree_compare_exchange_16(var, exp, neu, true, mo_s, mo_f);
+}
+
+#else
+
+#error Unsupported architecture
+
+#endif
+
+#endif
new file mode 100644
@@ -0,0 +1,6 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2017 Intel Corporation
+
+version = 1
+sources = files('rte_lfring.c')
+headers = files('rte_lfring.h','lockfree16.h')
new file mode 100644
@@ -0,0 +1,264 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2010-2015 Intel Corporation
+ * Copyright (c) 2019 Arm Ltd
+ * All rights reserved.
+ */
+
+#include <stdio.h>
+#include <stdarg.h>
+#include <string.h>
+#include <stdint.h>
+#include <inttypes.h>
+#include <errno.h>
+#include <sys/queue.h>
+
+#include <rte_common.h>
+#include <rte_log.h>
+#include <rte_memory.h>
+#include <rte_memzone.h>
+#include <rte_malloc.h>
+#include <rte_launch.h>
+#include <rte_eal.h>
+#include <rte_eal_memconfig.h>
+#include <rte_atomic.h>
+#include <rte_per_lcore.h>
+#include <rte_lcore.h>
+#include <rte_branch_prediction.h>
+#include <rte_errno.h>
+#include <rte_string_fns.h>
+#include <rte_spinlock.h>
+
+#include "rte_lfring.h"
+
+TAILQ_HEAD(rte_lfring_list, rte_tailq_entry);
+
+static struct rte_tailq_elem rte_lfring_tailq = {
+ .name = RTE_TAILQ_LFRING_NAME,
+};
+EAL_REGISTER_TAILQ(rte_lfring_tailq)
+
+/* true if x is a power of 2 */
+#define POWEROF2(x) ((((x)-1) & (x)) == 0)
+
+/* return the size of memory occupied by a ring */
+ssize_t
+rte_lfring_get_memsize(unsigned int count)
+{
+ ssize_t sz;
+
+ /* count must be a power of 2 */
+ if ((!POWEROF2(count)) || (count > 0x80000000U)) {
+ RTE_LOG(ERR, RING,
+ "Requested size is invalid, must be power of 2, and "
+ "do not exceed the size limit %u\n", 0x80000000U);
+ return -EINVAL;
+ }
+
+ sz = sizeof(struct rte_lfring) + count * sizeof(struct rte_lfr_element);
+ sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
+ return sz;
+}
+
+int
+rte_lfring_init(struct rte_lfring *r, const char *name, unsigned int count,
+ unsigned int flags)
+{
+ int ret;
+
+ /* compilation-time checks */
+ RTE_BUILD_BUG_ON((sizeof(struct rte_lfring) &
+ RTE_CACHE_LINE_MASK) != 0);
+
+ /* init the ring structure */
+ memset(r, 0, sizeof(*r));
+ ret = snprintf(r->name, sizeof(r->name), "%s", name);
+ if (ret < 0 || ret >= (int)sizeof(r->name))
+ return -ENAMETOOLONG;
+ r->flags = flags;
+
+ r->size = rte_align32pow2(count);
+ r->mask = r->size - 1;
+ r->log2 = rte_log2_u64(r->size);
+ r->head = 0;
+ r->tail = 0;
+ for (uint64_t i = 0; i < r->size; i++) {
+ r->ring[i].ptr = NULL;
+ r->ring[i].lap = (i - r->size) >> r->log2;
+ }
+
+ return 0;
+}
+
+/* create the ring */
+struct rte_lfring *
+rte_lfring_create(const char *name, unsigned int count, int socket_id,
+ unsigned int flags)
+{
+ char mz_name[RTE_MEMZONE_NAMESIZE];
+ struct rte_lfring *r;
+ struct rte_tailq_entry *te;
+ const struct rte_memzone *mz;
+ ssize_t ring_size;
+ int mz_flags = 0;
+ struct rte_lfring_list *ring_list = NULL;
+ const unsigned int requested_count = count;
+ int ret;
+
+ ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
+
+ count = rte_align32pow2(count);
+
+ ring_size = rte_lfring_get_memsize(count);
+ if (ring_size < 0) {
+ rte_errno = ring_size;
+ return NULL;
+ }
+
+ ret = snprintf(mz_name, sizeof(mz_name), "%s%s",
+ RTE_LFRING_MZ_PREFIX, name);
+ if (ret < 0 || ret >= (int)sizeof(mz_name)) {
+ rte_errno = ENAMETOOLONG;
+ return NULL;
+ }
+
+ te = rte_zmalloc("LFRING_TAILQ_ENTRY", sizeof(*te), 0);
+ if (te == NULL) {
+ RTE_LOG(ERR, RING, "Cannot reserve memory for tailq\n");
+ rte_errno = ENOMEM;
+ return NULL;
+ }
+
+ rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
+
+ /* reserve a memory zone for this ring. If we can't get rte_config or
+ * we are secondary process, the memzone_reserve function will set
+ * rte_errno for us appropriately - hence no check in this this
+ * function
+ */
+ mz = rte_memzone_reserve_aligned(mz_name, ring_size, socket_id,
+ mz_flags, __alignof__(*r));
+ if (mz != NULL) {
+ r = mz->addr;
+ /* no need to check return value here, we already checked the
+ * arguments above
+ */
+ rte_lfring_init(r, name, requested_count, flags);
+
+ te->data = (void *) r;
+ r->memzone = mz;
+
+ TAILQ_INSERT_TAIL(ring_list, te, next);
+ } else {
+ r = NULL;
+ RTE_LOG(ERR, RING, "Cannot reserve memory\n");
+ rte_free(te);
+ }
+ rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+ return r;
+}
+
+/* free the ring */
+void
+rte_lfring_free(struct rte_lfring *r)
+{
+ struct rte_lfring_list *ring_list = NULL;
+ struct rte_tailq_entry *te;
+
+ if (r == NULL)
+ return;
+
+ /*
+ * Ring was not created with rte_lfring_create,
+ * therefore, there is no memzone to free.
+ */
+ if (r->memzone == NULL) {
+ RTE_LOG(ERR, RING, "Cannot free ring (not created with rte_lfring_create()");
+ return;
+ }
+
+ if (rte_memzone_free(r->memzone) != 0) {
+ RTE_LOG(ERR, RING, "Cannot free memory\n");
+ return;
+ }
+
+ ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
+ rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
+
+ /* find out tailq entry */
+ TAILQ_FOREACH(te, ring_list, next) {
+ if (te->data == (void *) r)
+ break;
+ }
+
+ if (te == NULL) {
+ rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+ return;
+ }
+
+ TAILQ_REMOVE(ring_list, te, next);
+
+ rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+ rte_free(te);
+}
+
+/* dump the status of the ring on the console */
+void
+rte_lfring_dump(FILE *f, const struct rte_lfring *r)
+{
+ fprintf(f, "ring <%s>@%p\n", r->name, r);
+ fprintf(f, " flags=%x\n", r->flags);
+ fprintf(f, " size=%"PRIu64"\n", r->size);
+ fprintf(f, " tail=%"PRIu64"\n", r->tail);
+ fprintf(f, " head=%"PRIu64"\n", r->head);
+ fprintf(f, " used=%u\n", rte_lfring_count(r));
+ fprintf(f, " avail=%u\n", rte_lfring_free_count(r));
+}
+
+/* dump the status of all rings on the console */
+void
+rte_lfring_list_dump(FILE *f)
+{
+ const struct rte_tailq_entry *te;
+ struct rte_lfring_list *ring_list;
+
+ ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
+
+ rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK);
+
+ TAILQ_FOREACH(te, ring_list, next) {
+ rte_lfring_dump(f, (struct rte_lfring *) te->data);
+ }
+
+ rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK);
+}
+
+/* search a ring from its name */
+struct rte_lfring *
+rte_lfring_lookup(const char *name)
+{
+ struct rte_tailq_entry *te;
+ struct rte_lfring *r = NULL;
+ struct rte_lfring_list *ring_list;
+
+ ring_list = RTE_TAILQ_CAST(rte_lfring_tailq.head, rte_lfring_list);
+
+ rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK);
+
+ TAILQ_FOREACH(te, ring_list, next) {
+ r = (struct rte_lfring *) te->data;
+ if (strncmp(name, r->name, RTE_LFRING_NAMESIZE) == 0)
+ break;
+ }
+
+ rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+ if (te == NULL) {
+ rte_errno = ENOENT;
+ return NULL;
+ }
+
+ return r;
+}
new file mode 100644
@@ -0,0 +1,621 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2010-2017 Intel Corporation
+ * Copyright (c) 2019 Arm Ltd
+ * All rights reserved.
+ */
+
+#ifndef _RTE_LFRING_H_
+#define _RTE_LFRING_H_
+
+/**
+ * @file
+ * RTE Lfring
+ *
+ * The Lfring Manager is a fixed-size queue, implemented as a table of
+ * (pointers + counters).
+ *
+ * - FIFO (First In First Out)
+ * - Maximum size is fixed; the pointers are stored in a table.
+ * - Lock-free implementation.
+ * - Multi- or single-consumer dequeue.
+ * - Multi- or single-producer enqueue.
+ *
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdio.h>
+#include <stdint.h>
+#include <sys/queue.h>
+#include <errno.h>
+#include <rte_common.h>
+#include <rte_config.h>
+#include <rte_memory.h>
+#include <rte_lcore.h>
+#include <rte_atomic.h>
+#include <rte_branch_prediction.h>
+#include <rte_memzone.h>
+#include <rte_pause.h>
+
+#include "lockfree16.h"
+
+#define RTE_TAILQ_LFRING_NAME "RTE_LFRING"
+
+#define RTE_LFRING_MZ_PREFIX "RG_"
+/**< The maximum length of an lfring name. */
+#define RTE_LFRING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
+ sizeof(RTE_LFRING_MZ_PREFIX) + 1)
+
+struct rte_memzone; /* forward declaration, so as not to require memzone.h */
+
+struct rte_lfr_element {
+ void *ptr;
+ uintptr_t lap;
+};
+
+#if __SIZEOF_POINTER__ == 8
+typedef __int128 dintptr_t;
+#elif __SIZEOF_POINTER__ == 4
+typedef uint64_t dintptr_t;
+#else
+#error
+#endif
+
+/**
+ * An RTE lfring structure.
+ *
+ */
+struct rte_lfring {
+ char name[RTE_LFRING_NAMESIZE] __rte_cache_aligned; /**< Name of the lfring. */
+ const struct rte_memzone *memzone;
+ /**< Memzone, if any, containing the rte_lfring */
+
+ char pad0 __rte_cache_aligned; /**< empty cache line */
+
+ /** Modified by producer. */
+ uint64_t tail __rte_cache_aligned;
+ uint64_t size; /**< Size of ring. */
+ uint64_t mask; /**< Mask (size-1) of lfring. */
+ uint32_t log2; /**< log2 size. */
+ uint32_t flags; /**< Flags supplied at creation. */
+ char pad1 __rte_cache_aligned; /**< empty cache line */
+
+ /** Modified by consumer. */
+ uint64_t head __rte_cache_aligned;
+ char pad2 __rte_cache_aligned; /**< empty cache line */
+
+ struct rte_lfr_element ring[] __rte_cache_aligned;
+};
+
+#define LFRING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
+#define LFRING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */
+
+/**
+ * Calculate the memory size needed for a lfring
+ *
+ * This function returns the number of bytes needed for a lfring, given
+ * the number of elements in it. This value is the sum of the size of
+ * the structure rte_lfring and the size of the memory needed by the
+ * objects pointers. The value is aligned to a cache line size.
+ *
+ * @param count
+ * The number of elements in the lfring (must be a power of 2).
+ * @return
+ * - The memory size needed for the lfring on success.
+ * - -EINVAL if count is not a power of 2.
+ */
+ssize_t rte_lfring_get_memsize(unsigned int count);
+
+/**
+ * Initialize a lfring structure.
+ *
+ * Initialize a lfring structure in memory pointed by "r". The size of the
+ * memory area must be large enough to store the lfring structure and the
+ * object table. It is advised to use rte_lfring_get_memsize() to get the
+ * appropriate size.
+ *
+ * The lfring size is set to *count*, which must be a power of two. Water
+ * marking is disabled by default. The real usable lfring size is
+ * *count-1* instead of *count* to differentiate a free lfring from an
+ * empty lfring.
+ *
+ * The lfring is not added in RTE_TAILQ_LFRING global list. Indeed, the
+ * memory given by the caller may not be shareable among dpdk
+ * processes.
+ *
+ * @param r
+ * The pointer to the lfring structure followed by the objects table.
+ * @param name
+ * The name of the lfring.
+ * @param count
+ * The number of elements in the lfring (must be a power of 2).
+ * @param flags
+ * @return
+ * 0 on success, or a negative value on error.
+ */
+int rte_lfring_init(struct rte_lfring *r, const char *name, unsigned int count,
+ unsigned int flags);
+
+/**
+ * Create a new lfring named *name* in memory.
+ *
+ * This function uses ``memzone_reserve()`` to allocate memory. Then it
+ * calls rte_lfring_init() to initialize an empty lfring.
+ *
+ * The new lfring size is set to *count*, which must be a power of
+ * two. Water marking is disabled by default. The real usable lfring size
+ * is *count-1* instead of *count* to differentiate a free lfring from an
+ * empty lfring.
+ *
+ * The lfring is added in RTE_TAILQ_LFRING list.
+ *
+ * @param name
+ * The name of the lfring.
+ * @param count
+ * The size of the lfring (must be a power of 2).
+ * @param socket_id
+ * The *socket_id* argument is the socket identifier in case of
+ * NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
+ * constraint for the reserved zone.
+ * @param flags
+ * An OR of the following:
+ * - LFRING_F_SP_ENQ: If this flag is set, the default behavior when
+ * using ``rte_lfring_enqueue()`` or ``rte_lfring_enqueue_bulk()``
+ * is "single-producer". Otherwise, it is "multi-producers".
+ * - LFRING_F_SC_DEQ: If this flag is set, the default behavior when
+ * using ``rte_lfring_dequeue()`` or ``rte_lfring_dequeue_bulk()``
+ * is "single-consumer". Otherwise, it is "multi-consumers".
+ * @return
+ * On success, the pointer to the new allocated lfring. NULL on error with
+ * rte_errno set appropriately. Possible errno values include:
+ * - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
+ * - E_RTE_SECONDARY - function was called from a secondary process instance
+ * - EINVAL - count provided is not a power of 2
+ * - ENOSPC - the maximum number of memzones has already been allocated
+ * - EEXIST - a memzone with the same name already exists
+ * - ENOMEM - no appropriate memory area found in which to create memzone
+ */
+struct rte_lfring *rte_lfring_create(const char *name, unsigned int count,
+ int socket_id, unsigned int flags);
+/**
+ * De-allocate all memory used by the lfring.
+ *
+ * @param r
+ * Lfring to free
+ */
+void rte_lfring_free(struct rte_lfring *r);
+
+/**
+ * Dump the status of the lfring to a file.
+ *
+ * @param f
+ * A pointer to a file for output
+ * @param r
+ * A pointer to the lfring structure.
+ */
+void rte_lfring_dump(FILE *f, const struct rte_lfring *r);
+
+/* True if 'a' is before 'b' ('a' < 'b') in serial number arithmetic */
+static __rte_always_inline bool
+_rte_lfring_before(uint64_t a, uint64_t b)
+{
+ return (int64_t)(a - b) < 0;
+}
+
+static __rte_always_inline uint64_t
+_rte_lfring_update(uint64_t *loc, uint64_t neu)
+{
+ uint64_t old = __atomic_load_n(loc, __ATOMIC_RELAXED);
+ do {
+ if (_rte_lfring_before(neu, old)) {
+ /* neu < old */
+ return old;
+ }
+ /* Else neu > old, need to update *loc */
+ } while (!__atomic_compare_exchange_n(loc,
+ &old,
+ neu,
+ /*weak=*/true,
+ __ATOMIC_RELEASE,
+ __ATOMIC_RELAXED));
+ return neu;
+}
+
+static __rte_always_inline uint64_t
+_rte_lfring_reload(const uint64_t *loc, uint64_t idx)
+{
+ uint64_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED);
+ if (_rte_lfring_before(idx, fresh)) {
+ /* fresh is after idx, use this instead */
+ idx = fresh;
+ } else {
+ /* Continue with next slot */
+ idx++;
+ }
+ return idx;
+}
+
+static __rte_always_inline void
+_rte_lfring_enq_elems(void *const *elems,
+ struct rte_lfr_element *ring,
+ uint64_t mask,
+ uint32_t log2,
+ uint64_t idx,
+ uint64_t num)
+{
+ uint64_t seg0 = RTE_MIN(mask + 1 - (idx & mask), num);
+ struct rte_lfr_element *ring0 = &ring[idx & mask];
+ for (uint64_t i = 0; i < seg0; i++) {
+ ring0[i].ptr = *elems++;
+ ring0[i].lap = (idx++) >> log2;
+ }
+ if (num > seg0) {
+ uint64_t seg1 = num - seg0;
+ for (uint64_t i = 0; i < seg1; i++) {
+ ring[i].ptr = *elems++;
+ ring[i].lap = (idx++) >> log2;
+ }
+ }
+}
+
+static __rte_always_inline unsigned int
+rte_lfring_sp_enqueue_bulk(struct rte_lfring *r,
+ void * const *elems,
+ unsigned int nelems,
+ unsigned int *free_space)
+{
+ /* Single-producer */
+ uint64_t size = r->mask + 1;
+ uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_RELAXED);
+ uint64_t head = __atomic_load_n(&r->head, __ATOMIC_ACQUIRE);
+ int64_t actual = RTE_MIN((int64_t)(head + size - tail), (int64_t)nelems);
+ if (actual <= 0)
+ return 0;
+ _rte_lfring_enq_elems(elems, r->ring, r->mask, r->log2, tail, actual);
+ __atomic_store_n(&r->tail, tail + actual, __ATOMIC_RELEASE);
+ if (free_space != NULL)
+ *free_space = head + size - (tail + actual);
+ return actual;
+}
+
+static inline bool
+lockfree_compare_exchange_frail(dintptr_t *loc,
+ dintptr_t *old,
+ dintptr_t neu,
+ int mo_success,
+ int mo_failure)
+{
+#if __SIZEOF_POINTER__ == 8
+ return lockfree_compare_exchange_16_frail(loc,
+ old,
+ neu,
+ mo_success,
+ mo_failure);
+#elif __SIZEOF_POINTER__ == 4
+ return lockfree_compare_exchange_8_frail(loc,
+ old,
+ neu,
+ mo_success,
+ mo_failure);
+#else
+#error
+#endif
+}
+
+#define ENQ_RETRY_LIMIT 32
+
+static __rte_always_inline unsigned int
+rte_lfring_mp_enqueue_bulk(struct rte_lfring *r,
+ void * const *elems,
+ unsigned int nelems,
+ unsigned int *free_space)
+{
+ /* Lock-free multi-producer */
+ uint64_t mask = r->mask;
+ uint64_t size = mask + 1;
+ uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_RELAXED);
+ uint64_t actual = 0;
+ uint32_t retries = 0;
+restart:
+ while (actual < nelems &&
+ _rte_lfring_before(tail,
+ __atomic_load_n(&r->head, __ATOMIC_ACQUIRE)) + size) {
+ union {
+ struct rte_lfr_element e;
+ dintptr_t ui;
+ } old, neu;
+ struct rte_lfr_element *slot = &r->ring[tail & mask];
+ old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED);
+ old.e.lap = __atomic_load_n(&slot->lap, __ATOMIC_RELAXED);
+ do {
+ if (old.e.lap != (tail - size) >> r->log2) {
+ if (old.e.lap != tail >> r->log2 ||
+ ++retries == ENQ_RETRY_LIMIT) {
+ /* We are far behind,
+ * restart with fresh index
+ */
+ tail = _rte_lfring_reload(&r->tail,
+ tail);
+ retries = 0;
+ goto restart;
+ } else {
+ /* Slot already enqueued,
+ * try next slot
+ */
+ tail++;
+ goto restart;
+ }
+ }
+ /* Found slot that was used one lap back,
+ * try to enqueue next element
+ */
+ neu.e.ptr = elems[actual];
+ neu.e.lap = tail >> r->log2;/* Set lap on enqueue */
+ } while (!lockfree_compare_exchange_frail((dintptr_t *)slot,
+ &old.ui,
+ neu.ui,
+ __ATOMIC_RELEASE,
+ __ATOMIC_RELAXED));
+ /* Enqueue succeeded */
+ actual++;
+ /* Continue with next slot */
+ tail++;
+ retries = 0;
+ }
+ tail = _rte_lfring_update(&r->tail, tail);
+ if (free_space != NULL)
+ *free_space = __atomic_load_n(&r->head,
+ __ATOMIC_RELAXED) + size - tail;
+ return actual;
+}
+
+/**
+ * Enqueue several objects on an lfring.
+ *
+ * @param r
+ * A pointer to the lfring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to enqueue on the lfring from the obj_table.
+ * @param free_space
+ * if non-NULL, returns the amount of space in the lfring after the
+ * enqueue operation has finished.
+ * @return
+ * The number of objects enqueued, between 0 and n (inclusive)
+ */
+static __rte_always_inline unsigned int
+rte_lfring_enqueue_bulk(struct rte_lfring *r,
+ void * const *elems,
+ unsigned int nelems,
+ unsigned int *free_space)
+{
+ if (r->flags & LFRING_F_SP_ENQ)
+ return rte_lfring_sp_enqueue_bulk(r, elems, nelems, free_space);
+ else
+ return rte_lfring_mp_enqueue_bulk(r, elems, nelems, free_space);
+}
+
+static __rte_always_inline void
+_rte_lfring_deq_elems(void **elems,
+ const struct rte_lfr_element *ring,
+ uint64_t mask,
+ uint64_t idx,
+ uint64_t num)
+{
+ uint64_t seg0 = RTE_MIN(mask + 1 - (idx & mask), num);
+ const struct rte_lfr_element *ring0 = &ring[idx & mask];
+ for (uint64_t i = 0; i < seg0; i++)
+ *elems++ = ring0[i].ptr;
+ if (num > seg0) {
+ uint64_t seg1 = num - seg0;
+ for (uint64_t i = 0; i < seg1; i++)
+ *elems++ = ring[i].ptr;
+ }
+}
+
+static __rte_always_inline unsigned int
+rte_lfring_sc_dequeue_bulk(struct rte_lfring *r,
+ void **elems,
+ unsigned int nelems,
+ unsigned int *available)
+{
+ /* Single-consumer */
+ int64_t actual;
+ uint64_t mask = r->mask;
+ uint64_t head = __atomic_load_n(&r->head, __ATOMIC_RELAXED);
+ uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_ACQUIRE);
+ actual = RTE_MIN((int64_t)(tail - head), (int64_t)nelems);
+ if (unlikely(actual < nelems))
+ return 0;
+ _rte_lfring_deq_elems(elems, r->ring, mask, head, actual);
+ /* Use RMB + STORE-RELAXED instead of STORE-RELEASE as we only
+ * read shared data, no need to order writes.
+ */
+ rte_smp_rmb();
+ __atomic_store_n(&r->head, head + actual, __ATOMIC_RELAXED);
+ if (available != NULL)
+ *available = tail - (head + actual);
+ return (unsigned int)actual;
+}
+
+static __rte_always_inline unsigned int
+rte_lfring_mc_dequeue_bulk(struct rte_lfring *r,
+ void **elems,
+ unsigned int nelems,
+ unsigned int *available)
+{
+ /* Lock-free multi-consumer */
+ int64_t actual;
+ uint64_t mask = r->mask;
+ uint64_t head = __atomic_load_n(&r->head, __ATOMIC_RELAXED);
+ uint64_t tail;
+ do {
+ /* Load tail on every iteration to avoid spurious queue
+ * empty situations
+ */
+ tail = __atomic_load_n(&r->tail, __ATOMIC_ACQUIRE);
+ actual = RTE_MIN((int64_t)(tail - head), (int64_t)nelems);
+ if (unlikely(actual < nelems))
+ return 0;
+ _rte_lfring_deq_elems(elems, r->ring, mask, head, actual);
+ /* Use RMB + CAS-RELAXED instead of CAS-RELEASE as we only
+ * read shared data, no need to order writes.
+ */
+ } while (!__atomic_compare_exchange_n(&r->head,
+ &head,
+ head + actual,
+ /*weak*/false,
+ __ATOMIC_RELAXED,
+ __ATOMIC_RELAXED));
+ if (available != NULL)
+ *available = tail - (head + actual);
+ return (unsigned int)actual;
+}
+
+/**
+ * Dequeue several objects from an lfring.
+ *
+ * @param r
+ * A pointer to the lfring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to dequeue from the lfring to the obj_table.
+ * @param available
+ * Returns the number of remaining ring entries after the dequeue has finished
+ * @return
+ * The number of objects dequeued, from 0 to n (inclusive)
+ */
+static __rte_always_inline unsigned int
+rte_lfring_dequeue_bulk(struct rte_lfring *r,
+ void **elems,
+ unsigned int nelems,
+ unsigned int *available)
+{
+ if (r->flags & LFRING_F_SC_DEQ)
+ return rte_lfring_sc_dequeue_bulk(r, elems, nelems, available);
+ else
+ return rte_lfring_mc_dequeue_bulk(r, elems, nelems, available);
+}
+
+/**
+ * Return the number of entries in a lfring.
+ *
+ * @param r
+ * A pointer to the lfring structure.
+ * @return
+ * The number of entries in the lfring.
+ */
+static inline unsigned
+rte_lfring_count(const struct rte_lfring *r)
+{
+ uint64_t head = __atomic_load_n(&r->head, __ATOMIC_RELAXED);
+ uint64_t tail = __atomic_load_n(&r->tail, __ATOMIC_RELAXED);
+ uint64_t avail = tail - head;
+ return (int64_t)avail > 0 ? avail : 0;
+}
+
+/**
+ * Return the number of free entries in a lfring.
+ *
+ * @param r
+ * A pointer to the lfring structure.
+ * @return
+ * The number of free entries in the lfring.
+ */
+static inline unsigned
+rte_lfring_free_count(const struct rte_lfring *r)
+{
+ return r->size - rte_lfring_count(r);
+}
+
+/**
+ * Test if a lfring is full.
+ *
+ * @param r
+ * A pointer to the lfring structure.
+ * @return
+ * - 1: The lfring is full.
+ * - 0: The lfring is not full.
+ */
+static inline int
+rte_lfring_full(const struct rte_lfring *r)
+{
+ return rte_lfring_free_count(r) == 0;
+}
+
+/**
+ * Test if a lfring is empty.
+ *
+ * @param r
+ * A pointer to the lfring structure.
+ * @return
+ * - 1: The lfring is empty.
+ * - 0: The lfring is not empty.
+ */
+static inline int
+rte_lfring_empty(const struct rte_lfring *r)
+{
+ return rte_lfring_count(r) == 0;
+}
+
+/**
+ * Return the size of the lfring.
+ *
+ * @param r
+ * A pointer to the lfring structure.
+ * @return
+ * The size of the data store used by the lfring.
+ * NOTE: this is not the same as the usable space in the lfring. To query that
+ * use ``rte_lfring_get_capacity()``.
+ */
+static inline unsigned int
+rte_lfring_get_size(const struct rte_lfring *r)
+{
+ return r->size;
+}
+
+/**
+ * Return the number of elements which can be stored in the lfring.
+ *
+ * @param r
+ * A pointer to the lfring structure.
+ * @return
+ * The usable size of the lfring.
+ */
+static inline unsigned int
+rte_lfring_get_capacity(const struct rte_lfring *r)
+{
+ return r->size;
+}
+
+/**
+ * Dump the status of all lfrings on the console
+ *
+ * @param f
+ * A pointer to a file for output
+ */
+void rte_lfring_list_dump(FILE *f);
+
+/**
+ * Search a lfring from its name
+ *
+ * @param name
+ * The name of the lfring.
+ * @return
+ * The pointer to the lfring matching the name, or NULL if not found,
+ * with rte_errno set appropriately. Possible rte_errno values include:
+ * - ENOENT - required entry not available to return.
+ */
+struct rte_lfring *rte_lfring_lookup(const char *name);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_LFRING_H_ */
new file mode 100644
@@ -0,0 +1,19 @@
+DPDK_2.0 {
+ global:
+
+ rte_ring_create;
+ rte_ring_dump;
+ rte_ring_get_memsize;
+ rte_ring_init;
+ rte_ring_list_dump;
+ rte_ring_lookup;
+
+ local: *;
+};
+
+DPDK_2.2 {
+ global:
+
+ rte_ring_free;
+
+} DPDK_2.0;