MEDIUM: stconn/channel: Move pipes used for the splicing in the SE descriptors
authorChristopher Faulet <cfaulet@haproxy.com>
Thu, 22 Jun 2023 09:39:29 +0000 (11:39 +0200)
committerChristopher Faulet <cfaulet@haproxy.com>
Tue, 17 Oct 2023 16:51:13 +0000 (18:51 +0200)
The pipes used to put data when the kernel splicing is in used are moved in
the SE descriptors. For now, it is just a simple remplacement but there is a
major difference with the pipes in the channel. The data are pushed in the
consumer's pipe while it was pushed in the producer's pipe. So it means the
request data are now pushed in the pipe of the backend SE descriptor and
response data are pushed in the pipe of the frontend SE descriptor.

The idea is to hide the pipe from the channel/SC side and to be able to
handle fast-forwading in pipe but also in buffer. To do so, the pipe is
inside a new entity, called iobuf. This entity will be extended.

include/haproxy/channel-t.h
include/haproxy/channel.h
include/haproxy/stconn-t.h
src/applet.c
src/stconn.c
src/stream.c

index f876d91..8629f5f 100644 (file)
@@ -202,7 +202,6 @@ struct channel {
        unsigned int flags;             /* CF_* */
        unsigned int analysers;         /* bit field indicating what to do on the channel */
        struct buffer buf;              /* buffer attached to the channel, always present but may move */
-       struct pipe *pipe;              /* non-NULL only when data present */
        size_t output;                  /* part of buffer which is to be forwarded */
        unsigned int to_forward;        /* number of bytes to forward after out without a wake-up */
        unsigned short last_read;       /* 16 lower bits of last read date (max pause=65s) */
index 36199b1..154017b 100644 (file)
@@ -323,7 +323,6 @@ static inline void channel_init(struct channel *chn)
        chn->last_read = now_ms;
        chn->xfer_small = chn->xfer_large = 0;
        chn->total = 0;
-       chn->pipe = NULL;
        chn->analysers = 0;
        chn->flags = 0;
        chn->output = 0;
@@ -404,13 +403,13 @@ static inline void channel_htx_forward_forever(struct channel *chn, struct htx *
 /*********************************************************************/
 
 /* Reports non-zero if the channel is empty, which means both its
- * buffer and pipe are empty. The construct looks strange but is
- * jump-less and much more efficient on both 32 and 64-bit than
- * the boolean test.
+ * buffer and pipe on the opposite SE are empty. The construct looks
+ * strange but is jump-less and much more efficient on both 32 and
+ * 64-bit than the boolean test.
  */
 static inline unsigned int channel_is_empty(const struct channel *c)
 {
-       return !(co_data(c) | (long)c->pipe);
+       return !(co_data(c) | (long)chn_cons(c)->sedesc->iobuf.pipe);
 }
 
 /* Returns non-zero if the channel is rewritable, which means that the buffer
index e507e1e..1d9888d 100644 (file)
 
 #include <haproxy/obj_type-t.h>
 #include <haproxy/connection-t.h>
+#include <haproxy/pipe-t.h>
 #include <haproxy/show_flags-t.h>
 #include <haproxy/xref-t.h>
 
+enum iobuf_flags {
+       IOBUF_FL_NONE             = 0x00000000, /* For initialization purposes */
+};
+
+struct iobuf {
+       struct pipe *pipe;     /* non-NULL only when data present */
+       unsigned int flags;
+};
+
 /* Stream Endpoint Flags.
  * Please also update the se_show_flags() function below in case of changes.
  */
@@ -246,11 +256,13 @@ struct stconn;
 
  * <fsb> should be updated when the first send of a series is blocked and reset
  *       when a successful send is reported.
+ *
  */
 struct sedesc {
        void *se;                  /* the stream endpoint, i.e. the mux stream or the appctx */
        struct connection *conn;   /* the connection for connection-based streams */
        struct stconn *sc;         /* the stream connector we're attached to, or NULL */
+       struct iobuf iobuf;        /* contains data forwarded by the other side and that must be sent by the stream endpoint */
        unsigned int flags;        /* SE_FL_* */
        unsigned int lra;          /* the last read activity */
        unsigned int fsb;          /* the first send blocked */
index cdcbc25..95fc903 100644 (file)
@@ -383,7 +383,7 @@ int appctx_buf_available(void *arg)
        sc_have_buff(sc);
 
        /* was already allocated another way ? if so, don't take this one */
-       if (c_size(sc_ic(sc)) || sc_ic(sc)->pipe)
+       if (c_size(sc_ic(sc)) || sc_opposite(sc)->sedesc->iobuf.pipe)
                return 0;
 
        /* allocation possible now ? */
index fbfebc7..89face2 100644 (file)
@@ -97,6 +97,9 @@ void sedesc_init(struct sedesc *sedesc)
        sedesc->fsb = TICK_ETERNITY;
        sedesc->xref.peer = NULL;
        se_fl_setall(sedesc, SE_FL_NONE);
+
+       sedesc->iobuf.pipe = NULL;
+       sedesc->iobuf.flags = IOBUF_FL_NONE;
 }
 
 /* Tries to alloc an endpoint and initialize it. Returns NULL on failure. */
@@ -117,7 +120,11 @@ struct sedesc *sedesc_new()
  */
 void sedesc_free(struct sedesc *sedesc)
 {
-       pool_free(pool_head_sedesc, sedesc);
+       if (sedesc) {
+               if (sedesc->iobuf.pipe)
+                       put_pipe(sedesc->iobuf.pipe);
+               pool_free(pool_head_sedesc, sedesc);
+       }
 }
 
 /* Tries to allocate a new stconn and initialize its main fields. On
@@ -622,9 +629,7 @@ static void sc_app_shut(struct stconn *sc)
 /* default chk_rcv function for scheduled tasks */
 static void sc_app_chk_rcv(struct stconn *sc)
 {
-       struct channel *ic = sc_ic(sc);
-
-       if (ic->pipe) {
+       if (sc_opposite(sc)->sedesc->iobuf.pipe) {
                /* stop reading */
                sc_need_room(sc, -1);
        }
@@ -795,7 +800,7 @@ static void sc_app_chk_snd_conn(struct stconn *sc)
        if (unlikely(channel_is_empty(oc)))  /* called with nothing to send ! */
                return;
 
-       if (!oc->pipe &&                          /* spliced data wants to be forwarded ASAP */
+       if (!sc->sedesc->iobuf.pipe &&                    /* spliced data wants to be forwarded ASAP */
            !sc_ep_test(sc, SE_FL_WAIT_DATA))       /* not waiting for data */
                return;
 
@@ -939,11 +944,9 @@ static void sc_app_shut_applet(struct stconn *sc)
 /* chk_rcv function for applets */
 static void sc_app_chk_rcv_applet(struct stconn *sc)
 {
-       struct channel *ic = sc_ic(sc);
-
        BUG_ON(!sc_appctx(sc));
 
-       if (!ic->pipe) {
+       if (!sc_opposite(sc)->sedesc->iobuf.pipe) {
                /* (re)start reading */
                appctx_wakeup(__sc_appctx(sc));
        }
@@ -1087,18 +1090,18 @@ static void sc_notify(struct stconn *sc)
         */
        if (!channel_is_empty(ic) &&
            sc_ep_test(sco, SE_FL_WAIT_DATA) &&
-           (!(sc->flags & SC_FL_SND_EXP_MORE) || c_full(ic) || ci_data(ic) == 0 || ic->pipe)) {
+           (!(sc->flags & SC_FL_SND_EXP_MORE) || c_full(ic) || ci_data(ic) == 0 || sco->sedesc->iobuf.pipe)) {
                int new_len, last_len;
 
                last_len = co_data(ic);
-               if (ic->pipe)
-                       last_len += ic->pipe->data;
+               if (sco->sedesc->iobuf.pipe)
+                       last_len += sco->sedesc->iobuf.pipe->data;
 
                sc_chk_snd(sco);
 
                new_len = co_data(ic);
-               if (ic->pipe)
-                       new_len += ic->pipe->data;
+               if (sco->sedesc->iobuf.pipe)
+                       new_len += sco->sedesc->iobuf.pipe->data;
 
                /* check if the consumer has freed some space either in the
                 * buffer or in the pipe.
@@ -1263,7 +1266,7 @@ static int sc_conn_recv(struct stconn *sc)
         * using a buffer.
         */
        if (sc_ep_test(sc, SE_FL_MAY_SPLICE) &&
-           (ic->pipe || ic->to_forward >= MIN_SPLICE_FORWARD) &&
+           (sc_opposite(sc)->sedesc->iobuf.pipe || ic->to_forward >= MIN_SPLICE_FORWARD) &&
            ic->flags & CF_KERN_SPLICING) {
                if (c_data(ic)) {
                        /* We're embarrassed, there are already data pending in
@@ -1275,14 +1278,14 @@ static int sc_conn_recv(struct stconn *sc)
                        goto abort_splice;
                }
 
-               if (unlikely(ic->pipe == NULL)) {
-                       if (pipes_used >= global.maxpipes || !(ic->pipe = get_pipe())) {
+               if (unlikely(sc_opposite(sc)->sedesc->iobuf.pipe == NULL)) {
+                       if (pipes_used >= global.maxpipes || !(sc_opposite(sc)->sedesc->iobuf.pipe = get_pipe())) {
                                ic->flags &= ~CF_KERN_SPLICING;
                                goto abort_splice;
                        }
                }
 
-               ret = conn->mux->rcv_pipe(sc, ic->pipe, ic->to_forward);
+               ret = conn->mux->rcv_pipe(sc, sc_opposite(sc)->sedesc->iobuf.pipe, ic->to_forward);
                if (ret < 0) {
                        /* splice not supported on this end, let's disable it */
                        ic->flags &= ~CF_KERN_SPLICING;
@@ -1312,12 +1315,13 @@ static int sc_conn_recv(struct stconn *sc)
        }
 
  abort_splice:
-       if (ic->pipe && unlikely(!ic->pipe->data)) {
-               put_pipe(ic->pipe);
-               ic->pipe = NULL;
+       if (sc_opposite(sc)->sedesc->iobuf.pipe && unlikely(!sc_opposite(sc)->sedesc->iobuf.pipe->data)) {
+               put_pipe(sc_opposite(sc)->sedesc->iobuf.pipe);
+               sc_opposite(sc)->sedesc->iobuf.pipe = NULL;
        }
 
-       if (ic->pipe && ic->to_forward && !(flags & CO_RFL_BUF_FLUSH) && sc_ep_test(sc, SE_FL_MAY_SPLICE)) {
+       if (sc_opposite(sc)->sedesc->iobuf.pipe && ic->to_forward &&
+           !(flags & CO_RFL_BUF_FLUSH) && sc_ep_test(sc, SE_FL_MAY_SPLICE)) {
                /* don't break splicing by reading, but still call rcv_buf()
                 * to pass the flag.
                 */
@@ -1597,17 +1601,17 @@ static int sc_conn_send(struct stconn *sc)
        if (!conn->mux)
                return 0;
 
-       if (oc->pipe && conn->xprt->snd_pipe && conn->mux->snd_pipe) {
-               ret = conn->mux->snd_pipe(sc, oc->pipe);
+       if (sc->sedesc->iobuf.pipe && conn->xprt->snd_pipe && conn->mux->snd_pipe) {
+               ret = conn->mux->snd_pipe(sc, sc->sedesc->iobuf.pipe);
                if (ret > 0)
                        did_send = 1;
 
-               if (!oc->pipe->data) {
-                       put_pipe(oc->pipe);
-                       oc->pipe = NULL;
+               if (!sc->sedesc->iobuf.pipe->data) {
+                       put_pipe(sc->sedesc->iobuf.pipe);
+                       sc->sedesc->iobuf.pipe = NULL;
                }
 
-               if (oc->pipe)
+               if (sc->sedesc->iobuf.pipe)
                        goto end;
        }
 
index 45a0ad7..4c300a3 100644 (file)
@@ -320,10 +320,10 @@ int stream_buf_available(void *arg)
 {
        struct stream *s = arg;
 
-       if (!s->req.buf.size && !s->req.pipe && s->scf->flags & SC_FL_NEED_BUFF &&
+       if (!s->req.buf.size && !s->scb->sedesc->iobuf.pipe && s->scf->flags & SC_FL_NEED_BUFF &&
            b_alloc(&s->req.buf))
                sc_have_buff(s->scf);
-       else if (!s->res.buf.size && !s->res.pipe && s->scb->flags & SC_FL_NEED_BUFF &&
+       else if (!s->res.buf.size && !s->scf->sedesc->iobuf.pipe && s->scb->flags & SC_FL_NEED_BUFF &&
                 b_alloc(&s->res.buf))
                sc_have_buff(s->scb);
        else
@@ -631,12 +631,6 @@ void stream_free(struct stream *s)
                sess_change_server(s, NULL);
        }
 
-       if (s->req.pipe)
-               put_pipe(s->req.pipe);
-
-       if (s->res.pipe)
-               put_pipe(s->res.pipe);
-
        /* We may still be present in the buffer wait queue */
        if (LIST_INLIST(&s->buffer_wait.list))
                LIST_DEL_INIT(&s->buffer_wait.list);
@@ -3419,7 +3413,7 @@ void strm_dump_to_buffer(struct buffer *buf, const struct stream *strm, const ch
                     pfx,
                     &strm->req,
                     strm->req.flags, strm->req.analysers,
-                    strm->req.pipe ? strm->req.pipe->data : 0,
+                    strm->scb->sedesc->iobuf.pipe ? strm->scb->sedesc->iobuf.pipe->data : 0,
                     strm->req.to_forward, strm->req.total,
                     pfx,
                     strm->req.analyse_exp ?
@@ -3452,7 +3446,7 @@ void strm_dump_to_buffer(struct buffer *buf, const struct stream *strm, const ch
                     pfx,
                     &strm->res,
                     strm->res.flags, strm->res.analysers,
-                    strm->res.pipe ? strm->res.pipe->data : 0,
+                    strm->scf->sedesc->iobuf.pipe ? strm->scf->sedesc->iobuf.pipe->data : 0,
                     strm->res.to_forward, strm->res.total,
                     pfx,
                     strm->res.analyse_exp ?