int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags,
ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len))
{
- size_t head_ofs, tail_ofs;
+ size_t head_ofs, tail_ofs, prev_ofs;
size_t ring_size;
uint8_t *ring_area;
struct ist v1, v2;
head_ofs = *ofs_ptr;
BUG_ON(head_ofs >= ring_size);
- /* dec readers count */
- do {
- readers = _HA_ATOMIC_LOAD(ring_area + head_ofs);
- } while ((readers > RING_MAX_READERS ||
- !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers - 1)) && __ha_cpu_relax());
+ /* we keep track of where we were and we don't release it before
+ * we've protected the next place.
+ */
+ prev_ofs = head_ofs;
/* in this loop, head_ofs always points to the counter byte that precedes
* the message so that we can take our reference there if we have to
vp_data_to_ring(v1, v2, (char *)ring_area, ring_size, &head_ofs, &tail_ofs);
- /* inc readers count */
+ /* inc readers count on new place */
do {
readers = _HA_ATOMIC_LOAD(ring_area + head_ofs);
} while ((readers > RING_MAX_READERS ||
!_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax());
+ /* dec readers count on old place */
+ do {
+ readers = _HA_ATOMIC_LOAD(ring_area + prev_ofs);
+ } while ((readers > RING_MAX_READERS ||
+ !_HA_ATOMIC_CAS(ring_area + prev_ofs, &readers, readers - 1)) && __ha_cpu_relax());
+
if (last_ofs_ptr)
*last_ofs_ptr = tail_ofs;
*ofs_ptr = head_ofs;