@@ -30,7 +30,13 @@ pipeline_queue_worker_single_stage_tx(void *arg)
if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
pipeline_event_tx(dev, port, &ev);
- w->processed_pkts++;
+
+ /* release barrier here ensures the store operation
+ * of the event completes before the number of
+ * processed pkts is visible to the main core
+ */
+ __atomic_fetch_add(&(w->processed_pkts), 1,
+ __ATOMIC_RELEASE);
} else {
ev.queue_id++;
pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
@@ -59,7 +65,13 @@ pipeline_queue_worker_single_stage_fwd(void *arg)
rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0);
pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
pipeline_event_enqueue(dev, port, &ev);
- w->processed_pkts++;
+
+ /* release barrier here ensures the store operation
+ * of the event completes before the number of
+ * processed pkts is visible to the main core
+ */
+ __atomic_fetch_add(&(w->processed_pkts), 1,
+ __ATOMIC_RELEASE);
}
return 0;
@@ -84,7 +96,13 @@ pipeline_queue_worker_single_stage_burst_tx(void *arg)
if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
pipeline_event_tx(dev, port, &ev[i]);
ev[i].op = RTE_EVENT_OP_RELEASE;
- w->processed_pkts++;
+
+ /* release barrier here ensures the store operation
+ * of the event completes before the number of
+ * processed pkts is visible to the main core
+ */
+ __atomic_fetch_add(&(w->processed_pkts), 1,
+ __ATOMIC_RELEASE);
} else {
ev[i].queue_id++;
pipeline_fwd_event(&ev[i],
@@ -121,7 +139,13 @@ pipeline_queue_worker_single_stage_burst_fwd(void *arg)
}
pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
- w->processed_pkts += nb_rx;
+
+ /* release barrier here ensures the store operation
+ * of the event completes before the number of
+ * processed pkts is visible to the main core
+ */
+ __atomic_fetch_add(&(w->processed_pkts), nb_rx,
+ __ATOMIC_RELEASE);
}
return 0;
@@ -146,7 +170,13 @@ pipeline_queue_worker_multi_stage_tx(void *arg)
if (ev.queue_id == tx_queue[ev.mbuf->port]) {
pipeline_event_tx(dev, port, &ev);
- w->processed_pkts++;
+
+ /* release barrier here ensures the store operation
+ * of the event completes before the number of
+ * processed pkts is visible to the main core
+ */
+ __atomic_fetch_add(&(w->processed_pkts), 1,
+ __ATOMIC_RELEASE);
continue;
}
@@ -180,7 +210,13 @@ pipeline_queue_worker_multi_stage_fwd(void *arg)
ev.queue_id = tx_queue[ev.mbuf->port];
rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0);
pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
- w->processed_pkts++;
+
+ /* release barrier here ensures the store operation
+ * of the event completes before the number of
+ * processed pkts is visible to the main core
+ */
+ __atomic_fetch_add(&(w->processed_pkts), 1,
+ __ATOMIC_RELEASE);
} else {
ev.queue_id++;
pipeline_fwd_event(&ev, sched_type_list[cq_id]);
@@ -214,7 +250,13 @@ pipeline_queue_worker_multi_stage_burst_tx(void *arg)
if (ev[i].queue_id == tx_queue[ev[i].mbuf->port]) {
pipeline_event_tx(dev, port, &ev[i]);
ev[i].op = RTE_EVENT_OP_RELEASE;
- w->processed_pkts++;
+
+ /* release barrier here ensures the store operation
+ * of the event completes before the number of
+ * processed pkts is visible to the main core
+ */
+ __atomic_fetch_add(&(w->processed_pkts), 1,
+ __ATOMIC_RELEASE);
continue;
}
@@ -254,7 +296,13 @@ pipeline_queue_worker_multi_stage_burst_fwd(void *arg)
rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
pipeline_fwd_event(&ev[i],
RTE_SCHED_TYPE_ATOMIC);
- w->processed_pkts++;
+
+ /* release barrier here ensures the store operation
+ * of the event completes before the number of
+ * processed pkts is visible to the main core
+ */
+ __atomic_fetch_add(&(w->processed_pkts), 1,
+ __ATOMIC_RELEASE);
} else {
ev[i].queue_id++;
pipeline_fwd_event(&ev[i],