MEDIUM: ring: protect the reader's positions against writers
authorWilly Tarreau <w@1wt.eu>
Wed, 28 Feb 2024 16:18:34 +0000 (17:18 +0100)
committerWilly Tarreau <w@1wt.eu>
Mon, 25 Mar 2024 17:34:19 +0000 (17:34 +0000)
The reader now needs to protect the positions it's reading. This is
already done via the readers counter at the beginning of messages,
but as long as the lock is present, this counter is decremented
before starting to parse messages, and incremented at the end.

We must now do that in reverse, first protect the end of the messages,
and only then remove ourselves from the already processed messages, so
that at no point could a writer pass over and possibly overwrite data
we're currently watching.

src/ring.c

index d798f27..f56c8f1 100644 (file)
@@ -402,7 +402,7 @@ int ring_attach_cli(struct ring *ring, struct appctx *appctx, uint flags)
 int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags,
                           ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len))
 {
-       size_t head_ofs, tail_ofs;
+       size_t head_ofs, tail_ofs, prev_ofs;
        size_t ring_size;
        uint8_t *ring_area;
        struct ist v1, v2;
@@ -450,11 +450,10 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
        head_ofs = *ofs_ptr;
        BUG_ON(head_ofs >= ring_size);
 
-       /* dec readers count */
-       do {
-               readers = _HA_ATOMIC_LOAD(ring_area + head_ofs);
-       } while ((readers > RING_MAX_READERS ||
-                 !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers - 1)) && __ha_cpu_relax());
+       /* we keep track of where we were and we don't release it before
+        * we've protected the next place.
+        */
+       prev_ofs = head_ofs;
 
        /* in this loop, head_ofs always points to the counter byte that precedes
         * the message so that we can take our reference there if we have to
@@ -494,12 +493,18 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
 
        vp_data_to_ring(v1, v2, (char *)ring_area, ring_size, &head_ofs, &tail_ofs);
 
-       /* inc readers count */
+       /* inc readers count on new place */
        do {
                readers = _HA_ATOMIC_LOAD(ring_area + head_ofs);
        } while ((readers > RING_MAX_READERS ||
                  !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax());
 
+       /* dec readers count on old place */
+       do {
+               readers = _HA_ATOMIC_LOAD(ring_area + prev_ofs);
+       } while ((readers > RING_MAX_READERS ||
+                 !_HA_ATOMIC_CAS(ring_area + prev_ofs, &readers, readers - 1)) && __ha_cpu_relax());
+
        if (last_ofs_ptr)
                *last_ofs_ptr = tail_ofs;
        *ofs_ptr = head_ofs;