MEDIUM: ring: remove the struct buffer from the ring
authorWilly Tarreau <w@1wt.eu>
Tue, 27 Feb 2024 08:17:45 +0000 (09:17 +0100)
committerWilly Tarreau <w@1wt.eu>
Mon, 25 Mar 2024 17:34:19 +0000 (17:34 +0000)
The purpose is to store a head and a tail that are independent so that
we can further improve the API to update them independently from each
other.

The struct was arranged like the original one so that as long as a ring
has its head set to zero (i.e. no recycling) it will continue to work.
The new format is already detectable thanks to the "rsvd" field which
indicates the number of reserved bytes at the beginning. It's located
where the buffer's area pointer previously was, so that older versions
of haring can continue to open the ring in repair mode, and newer ones
can use the fact that the upper bits of that variable are zero to guess
that it's working with the new format instead of the old one. Also let's
keep in mind that the layout will further change to place some alignment
constraints.

The haring tool will thus updated based on this and it detects that the
rsvd field is smaller than a page and that the sum of it with the size
equals the mapped size, in which case it uses the new dump_v2() function
instead of dump_v1(). The new function also creates a buffer from the
ring's area, size, head and tail and calls the generic one so that no
other code had to be adapted.

dev/haring/haring.c
include/haproxy/ring-t.h
include/haproxy/ring.h
src/ring.c
src/sink.c

index 724e598..0a66238 100644 (file)
@@ -46,6 +46,15 @@ struct ring_v1 {
        struct buffer buf;   // storage area
 };
 
+// ring v2 format (not aligned)
+struct ring_v2 {
+       size_t size;         // storage size
+       size_t rsvd;         // header length (used for file-backed maps)
+       size_t tail;         // storage tail
+       size_t head;         // storage head
+       char area[0];        // storage area begins immediately here
+};
+
 /* display the message and exit with the code */
 __attribute__((noreturn)) void die(int code, const char *format, ...)
 {
@@ -180,6 +189,32 @@ int dump_ring_v1(struct ring_v1 *ring, size_t ofs, int flags)
        return dump_ring_as_buf(buf, ofs, flags);
 }
 
+/* This function dumps all events from the ring <ring> from offset <ofs> and
+ * with flags <flags>.
+ */
+int dump_ring_v2(struct ring_v2 *ring, size_t ofs, int flags)
+{
+       size_t size, head, tail, data;
+       struct buffer buf;
+
+       /* In ring v2 format, we have in this order:
+        *    - size
+        *    - hdr len (reserved bytes)
+        *    - tail
+        *    - head
+        * We can rebuild an equivalent buffer from these info for the function
+        * to dump.
+        */
+
+       /* Now make our own buffer pointing to that area */
+       size = ring->size;
+       head = ring->head;
+       tail = ring->tail;
+       data = (head <= tail ? 0 : size) + tail - head;
+       buf = b_make((void *)ring + ring->rsvd, size, head, data);
+       return dump_ring_as_buf(buf, ofs, flags);
+}
+
 int main(int argc, char **argv)
 {
        void *ring;
@@ -224,7 +259,11 @@ int main(int argc, char **argv)
                return 1;
        }
 
-       return dump_ring_v1(ring, 0, 0);
+       if (((struct ring_v2 *)ring)->rsvd < 4096 && // not a pointer (v1), must be ringv2's rsvd
+           ((struct ring_v2 *)ring)->rsvd + ((struct ring_v2 *)ring)->size == statbuf.st_size)
+               return dump_ring_v2(ring, 0, 0);
+       else
+               return dump_ring_v1(ring, 0, 0);
 }
 
 
index 60448a8..aa0973b 100644 (file)
 
 /* this is the mmapped part */
 struct ring_storage {
-       struct buffer buf;            // storage layout
-       char area[0];                 // storage area begins immediately here
+       size_t size;         // storage size
+       size_t rsvd;         // header length (used for file-backed maps)
+       size_t tail;         // storage tail
+       size_t head;         // storage head
+       char area[0];        // storage area begins immediately here
 };
 
 /* this is the ring definition, config, waiters etc */
index 057693c..53b0b22 100644 (file)
@@ -25,6 +25,7 @@
 #include <stdlib.h>
 #include <import/ist.h>
 #include <haproxy/ring-t.h>
+#include <haproxy/vecpair.h>
 
 struct appctx;
 
@@ -47,31 +48,32 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
 /* returns the ring storage's area */
 static inline void *ring_area(const struct ring *ring)
 {
-       return b_orig(&ring->storage->buf);
+       return ring->storage->area;
 }
 
 /* returns the number of bytes in the ring */
 static inline size_t ring_data(const struct ring *ring)
 {
-       return b_data(&ring->storage->buf);
+       return ((ring->storage->head <= ring->storage->tail) ?
+               0 : ring->storage->size) + ring->storage->tail - ring->storage->head;
 }
 
 /* returns the allocated size in bytes for the ring */
 static inline size_t ring_size(const struct ring *ring)
 {
-       return b_size(&ring->storage->buf);
+       return ring->storage->size;
 }
 
 /* returns the head offset of the ring */
 static inline size_t ring_head(const struct ring *ring)
 {
-       return b_head_ofs(&ring->storage->buf);
+       return ring->storage->head;
 }
 
 /* returns the tail offset of the ring */
 static inline size_t ring_tail(const struct ring *ring)
 {
-       return b_tail_ofs(&ring->storage->buf);
+       return ring->storage->tail;
 }
 
 /* duplicates ring <src> over ring <dst> for no more than <max> bytes or no
@@ -81,11 +83,16 @@ static inline size_t ring_tail(const struct ring *ring)
  */
 static inline size_t ring_dup(struct ring *dst, const struct ring *src, size_t max)
 {
+       struct ist v1, v2;
+
+       vp_ring_to_data(&v1, &v2, ring_area(src), ring_size(src), ring_head(src), ring_tail(src));
+
        if (max > ring_data(src))
                max = ring_data(src);
 
-       b_reset(&dst->storage->buf);
-       b_ncat(&dst->storage->buf, &src->storage->buf, max);
+       vp_peek_ofs(v1, v2, 0, ring_area(dst), max);
+       dst->storage->head = 0;
+       dst->storage->tail = max;
        return max;
 }
 
index 442e98a..13d75df 100644 (file)
@@ -50,15 +50,16 @@ void ring_init(struct ring *ring, void *area, size_t size, int reset)
        ring->readers_count = 0;
        ring->flags = 0;
        ring->storage = area;
-       ring->storage->buf.area = ring->storage->area;
 
        if (reset) {
-               ring->storage->buf = b_make(ring->storage->area,
-                                           size - sizeof(*ring->storage),
-                                           0, 0);
+               ring->storage->size = size - sizeof(*ring->storage);
+               ring->storage->rsvd = sizeof(*ring->storage);
+               ring->storage->head = 0;
+               ring->storage->tail = 0;
 
                /* write the initial RC byte */
-               b_putchr(&ring->storage->buf, 0);
+               *ring->storage->area = 0;
+               ring->storage->tail = 1;
        }
 }
 
@@ -114,11 +115,12 @@ struct ring *ring_new(size_t size)
  */
 struct ring *ring_resize(struct ring *ring, size_t size)
 {
-       struct ring_storage *new;
+       struct ring_storage *old, *new;
 
        if (size <= ring_data(ring) + sizeof(*ring->storage))
                return ring;
 
+       old = ring->storage;
        new = malloc(size);
        if (!new)
                return NULL;
@@ -128,11 +130,17 @@ struct ring *ring_resize(struct ring *ring, size_t size)
        /* recheck the ring's size, it may have changed during the malloc */
        if (size > ring_data(ring) + sizeof(*ring->storage)) {
                /* copy old contents */
-               new->buf = b_make(new->area, size - sizeof(*ring->storage), 0, 0);
-               b_getblk(&ring->storage->buf, new->area, ring_data(ring), 0);
-               new->buf.data = ring_data(ring);
+               struct ist v1, v2;
+               size_t len;
+
+               vp_ring_to_data(&v1, &v2, old->area, old->size, old->head, old->tail);
+               len = vp_size(v1, v2);
+               vp_peek_ofs(v1, v2, 0, new->area, len);
+               new->size = size - sizeof(*ring->storage);
+               new->rsvd = sizeof(*ring->storage);
+               new->head = 0;
+               new->tail = len;
                new = HA_ATOMIC_XCHG(&ring->storage, new);
-               /* new is now the old one */
        }
 
        thread_release();
@@ -164,7 +172,6 @@ void ring_free(struct ring *ring)
  */
 ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg)
 {
-       struct buffer *buf = &ring->storage->buf;
        size_t head_ofs, tail_ofs;
        size_t ring_size;
        char *ring_area;
@@ -210,15 +217,15 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
        /* these ones do not change under us (only resize affects them and it
         * must be done under thread isolation).
         */
-       ring_area = b_orig(buf);
-       ring_size = b_size(buf);
+       ring_area = ring->storage->area;
+       ring_size = ring->storage->size;
 
        HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
        if (needed + 1 > ring_size)
                goto leave;
 
-       head_ofs = b_head_ofs(buf);
-       tail_ofs = b_tail_ofs(buf);
+       head_ofs = ring_head(ring);
+       tail_ofs = ring_tail(ring);
 
        vp_ring_to_data(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs);
 
@@ -281,8 +288,8 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
 
  done_update_buf:
        /* update the new space in the buffer */
-       buf->head = head_ofs;
-       buf->data = ((tail_ofs >= head_ofs) ? 0 : ring_size) + tail_ofs - head_ofs;
+       ring->storage->head = head_ofs;
+       ring->storage->tail = tail_ofs;
 
        /* notify potential readers */
        if (sent) {
@@ -373,7 +380,6 @@ int ring_attach_cli(struct ring *ring, struct appctx *appctx, uint flags)
 int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags,
                           ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len))
 {
-       struct buffer *buf = &ring->storage->buf;
        size_t head_ofs, tail_ofs;
        size_t ring_size;
        char *ring_area;
@@ -383,13 +389,13 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
        ssize_t copied;
        int ret;
 
-       ring_area = b_orig(buf);
-       ring_size = b_size(buf);
+       ring_area = ring->storage->area;
+       ring_size = ring->storage->size;
 
        HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);
 
-       head_ofs = b_head_ofs(buf);
-       tail_ofs = b_tail_ofs(buf);
+       head_ofs = ring->storage->head;
+       tail_ofs = ring->storage->tail;
 
        /* explanation for the initialization below: it would be better to do
         * this in the parsing function but this would occasionally result in
index 7225ddd..d43f7fb 100644 (file)
@@ -666,10 +666,14 @@ void sink_rotate_file_backed_ring(const char *name)
        if (ret != sizeof(storage))
                goto rotate;
 
+       /* check that it's the expected format before touching it */
+       if (storage.rsvd != sizeof(storage))
+               return;
+
        /* contents are present, we want to keep them => rotate. Note that
         * an empty ring buffer has one byte (the marker).
         */
-       if (storage.buf.data > 1)
+       if (storage.head != 0 || storage.tail != 1)
                goto rotate;
 
        /* nothing to keep, let's scratch the file and preserve the backup */