@@ -113,12 +113,12 @@ enum rte_distributor_match_function {
* There is a separate cacheline for returns in the burst API.
*/
struct rte_distributor_buffer {
- volatile int64_t bufptr64[RTE_DIST_BURST_SIZE]
+ volatile RTE_ATOMIC(int64_t) bufptr64[RTE_DIST_BURST_SIZE]
__rte_cache_aligned; /* <= outgoing to worker */
int64_t pad1 __rte_cache_aligned; /* <= one cache line */
- volatile int64_t retptr64[RTE_DIST_BURST_SIZE]
+ volatile RTE_ATOMIC(int64_t) retptr64[RTE_DIST_BURST_SIZE]
__rte_cache_aligned; /* <= incoming from worker */
int64_t pad2 __rte_cache_aligned; /* <= one cache line */
@@ -38,7 +38,7 @@
struct rte_distributor_buffer *buf = &(d->bufs[worker_id]);
unsigned int i;
- volatile int64_t *retptr64;
+ volatile RTE_ATOMIC(int64_t) *retptr64;
if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
rte_distributor_request_pkt_single(d->d_single,
@@ -50,7 +50,7 @@
/* Spin while handshake bits are set (scheduler clears it).
* Sync with worker on GET_BUF flag.
*/
- while (unlikely(__atomic_load_n(retptr64, __ATOMIC_ACQUIRE)
+ while (unlikely(rte_atomic_load_explicit(retptr64, rte_memory_order_acquire)
& (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) {
rte_pause();
uint64_t t = rte_rdtsc()+100;
@@ -78,8 +78,8 @@
* line is ready for processing
* Sync with distributor to release retptrs
*/
- __atomic_store_n(retptr64, *retptr64 | RTE_DISTRIB_GET_BUF,
- __ATOMIC_RELEASE);
+ rte_atomic_store_explicit(retptr64, *retptr64 | RTE_DISTRIB_GET_BUF,
+ rte_memory_order_release);
}
int
@@ -102,7 +102,7 @@
* RETURN_BUF is set when distributor must retrieve in-flight packets
* Sync with distributor to acquire bufptrs
*/
- if (__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
+ if (rte_atomic_load_explicit(&(buf->bufptr64[0]), rte_memory_order_acquire)
& (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))
return -1;
@@ -120,8 +120,8 @@
* on the next cacheline while we're working.
* Sync with distributor on GET_BUF flag. Release bufptrs.
*/
- __atomic_store_n(&(buf->bufptr64[0]),
- buf->bufptr64[0] | RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
+ rte_atomic_store_explicit(&(buf->bufptr64[0]),
+ buf->bufptr64[0] | RTE_DISTRIB_GET_BUF, rte_memory_order_release);
return count;
}
@@ -177,7 +177,7 @@
/* Spin while handshake bits are set (scheduler clears it).
* Sync with worker on GET_BUF flag.
*/
- while (unlikely(__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_RELAXED)
+ while (unlikely(rte_atomic_load_explicit(&(buf->retptr64[0]), rte_memory_order_relaxed)
& (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) {
rte_pause();
uint64_t t = rte_rdtsc()+100;
@@ -187,7 +187,7 @@
}
/* Sync with distributor to acquire retptrs */
- __atomic_thread_fence(__ATOMIC_ACQUIRE);
+ __atomic_thread_fence(rte_memory_order_acquire);
for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
/* Switch off the return bit first */
buf->retptr64[i] = 0;
@@ -200,15 +200,15 @@
* we won't read any mbufs from there even if GET_BUF is set.
* This allows distributor to retrieve in-flight already sent packets.
*/
- __atomic_fetch_or(&(buf->bufptr64[0]), RTE_DISTRIB_RETURN_BUF,
- __ATOMIC_ACQ_REL);
+ rte_atomic_fetch_or_explicit(&(buf->bufptr64[0]), RTE_DISTRIB_RETURN_BUF,
+ rte_memory_order_acq_rel);
/* set the RETURN_BUF on retptr64 even if we got no returns.
* Sync with distributor on RETURN_BUF flag. Release retptrs.
* Notify distributor that we don't request more packets any more.
*/
- __atomic_store_n(&(buf->retptr64[0]),
- buf->retptr64[0] | RTE_DISTRIB_RETURN_BUF, __ATOMIC_RELEASE);
+ rte_atomic_store_explicit(&(buf->retptr64[0]),
+ buf->retptr64[0] | RTE_DISTRIB_RETURN_BUF, rte_memory_order_release);
return 0;
}
@@ -297,7 +297,7 @@
* to worker which does not require new packets.
* They must be retrieved and assigned to another worker.
*/
- if (!(__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
+ if (!(rte_atomic_load_explicit(&(buf->bufptr64[0]), rte_memory_order_acquire)
& RTE_DISTRIB_GET_BUF))
for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
if (buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)
@@ -310,8 +310,8 @@
* with new packets if worker will make a new request.
* - clear RETURN_BUF to unlock reads on worker side.
*/
- __atomic_store_n(&(buf->bufptr64[0]), RTE_DISTRIB_GET_BUF,
- __ATOMIC_RELEASE);
+ rte_atomic_store_explicit(&(buf->bufptr64[0]), RTE_DISTRIB_GET_BUF,
+ rte_memory_order_release);
/* Collect backlog packets from worker */
for (i = 0; i < d->backlog[wkr].count; i++)
@@ -348,7 +348,7 @@
unsigned int i;
/* Sync on GET_BUF flag. Acquire retptrs. */
- if (__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_ACQUIRE)
+ if (rte_atomic_load_explicit(&(buf->retptr64[0]), rte_memory_order_acquire)
& (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF)) {
for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
if (buf->retptr64[i] & RTE_DISTRIB_VALID_BUF) {
@@ -379,7 +379,7 @@
/* Clear for the worker to populate with more returns.
* Sync with distributor on GET_BUF flag. Release retptrs.
*/
- __atomic_store_n(&(buf->retptr64[0]), 0, __ATOMIC_RELEASE);
+ rte_atomic_store_explicit(&(buf->retptr64[0]), 0, rte_memory_order_release);
}
return count;
}
@@ -404,7 +404,7 @@
return 0;
/* Sync with worker on GET_BUF flag */
- while (!(__atomic_load_n(&(d->bufs[wkr].bufptr64[0]), __ATOMIC_ACQUIRE)
+ while (!(rte_atomic_load_explicit(&(d->bufs[wkr].bufptr64[0]), rte_memory_order_acquire)
& RTE_DISTRIB_GET_BUF)) {
handle_returns(d, wkr);
if (unlikely(!d->active[wkr]))
@@ -430,8 +430,8 @@
/* Clear the GET bit.
* Sync with worker on GET_BUF flag. Release bufptrs.
*/
- __atomic_store_n(&(buf->bufptr64[0]),
- buf->bufptr64[0] & ~RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
+ rte_atomic_store_explicit(&(buf->bufptr64[0]),
+ buf->bufptr64[0] & ~RTE_DISTRIB_GET_BUF, rte_memory_order_release);
return buf->count;
}
@@ -463,8 +463,8 @@
/* Flush out all non-full cache-lines to workers. */
for (wid = 0 ; wid < d->num_workers; wid++) {
/* Sync with worker on GET_BUF flag. */
- if (__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
- __ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF) {
+ if (rte_atomic_load_explicit(&(d->bufs[wid].bufptr64[0]),
+ rte_memory_order_acquire) & RTE_DISTRIB_GET_BUF) {
d->bufs[wid].count = 0;
release(d, wid);
handle_returns(d, wid);
@@ -598,8 +598,8 @@
/* Flush out all non-full cache-lines to workers. */
for (wid = 0 ; wid < d->num_workers; wid++)
/* Sync with worker on GET_BUF flag. */
- if ((__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
- __ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF)) {
+ if ((rte_atomic_load_explicit(&(d->bufs[wid].bufptr64[0]),
+ rte_memory_order_acquire) & RTE_DISTRIB_GET_BUF)) {
d->bufs[wid].count = 0;
release(d, wid);
}
@@ -700,8 +700,8 @@
/* throw away returns, so workers can exit */
for (wkr = 0; wkr < d->num_workers; wkr++)
/* Sync with worker. Release retptrs. */
- __atomic_store_n(&(d->bufs[wkr].retptr64[0]), 0,
- __ATOMIC_RELEASE);
+ rte_atomic_store_explicit(&(d->bufs[wkr].retptr64[0]), 0,
+ rte_memory_order_release);
d->returns.start = d->returns.count = 0;
}