MINOR: mux-spop: Report EOI on the SE when a ACK is received for a stream
authorChristopher Faulet <cfaulet@haproxy.com>
Tue, 4 Feb 2025 09:53:20 +0000 (10:53 +0100)
committerWilly Tarreau <w@1wt.eu>
Wed, 19 Feb 2025 10:43:06 +0000 (11:43 +0100)
The spop stream now reports the end of input when the ACK is transferred to
the SPOE applet. To do so, the flag SPOP_SF_ACK_RCVD was added. It is set on
the SPOP stream when its ACK is received by the SPOP connection.

In addition when SPOP stream flags are propagated to the SE, the error is
now reported if end of input was not reached instead of testing the
connection error code. It is more accurate.

This patch should be backported to 3.1.

(cherry picked from commit d16c534511ca9e75437839bf654d45faf2992e0e)
Signed-off-by: Willy Tarreau <w@1wt.eu>

include/haproxy/mux_spop-t.h
src/mux_spop.c

index 181ef51..2e6c3c5 100644 (file)
@@ -59,7 +59,7 @@ static forceinline char *spop_conn_show_flags(char *buf, size_t len, const char
 
 /**** SPOP stream flags (32 bit), in spop_strm->flags ****/
 #define SPOP_SF_NONE           0x00000000
-// #define SPOP_SF_ACK_RCVD       0x00000001 /* ACK freme received */
+#define SPOP_SF_ACK_RCVD       0x00000001 /* ACK freme received */
 //#define SPOP_SF_ES_SENT        0x00000002 /* end-of-stream sent */
 //#define SPOP_SF_EP_SENT        0x00000004 /* end-of-param sent */
 //#define SPOP_SF_DISCON_SENT      0x00000008 /* disconnect sent */
@@ -83,7 +83,7 @@ static forceinline char *spop_strm_show_flags(char *buf, size_t len, const char
        /* prologue */
        _(0);
        /* flags */
-       _(SPOP_SF_BLK_MBUSY, _(SPOP_SF_BLK_MROOM, _(SPOP_SF_NOTIFIED)));
+       _(SPOP_SF_ACK_RCVD, _(SPOP_SF_BLK_MBUSY, _(SPOP_SF_BLK_MROOM, _(SPOP_SF_NOTIFIED))));
        /* epilogue */
        _(~0U);
        return buf;
index 5766889..04e6384 100644 (file)
@@ -1114,9 +1114,12 @@ static inline void spop_strm_close(struct spop_strm *spop_strm)
  */
 static inline void spop_strm_propagate_term_flags(struct spop_conn *spop_conn, struct spop_strm *spop_strm)
 {
+       if (spop_strm->flags & SPOP_SF_ACK_RCVD) {
+               se_fl_set(spop_strm->sd, SE_FL_EOI);
+       }
        if (spop_conn_read0_pending(spop_conn) || spop_strm->state == SPOP_SS_CLOSED) {
                se_fl_set(spop_strm->sd, SE_FL_EOS);
-               if (spop_conn->errcode)
+               if (!se_fl_test(spop_strm->sd, SE_FL_EOI))
                        se_fl_set(spop_strm->sd, SE_FL_ERROR);
        }
        if (se_fl_test(spop_strm->sd, SE_FL_ERR_PENDING))
@@ -1270,9 +1273,8 @@ static void spop_strm_wake_one_stream(struct spop_strm *spop_strm)
        }
 
        if (spop_conn->state == SPOP_CS_CLOSED || (spop_conn->flags & (SPOP_CF_ERR_PENDING|SPOP_CF_ERROR))) {
-               if (spop_conn->state == SPOP_CS_CLOSED || (spop_conn->flags & SPOP_CF_ERROR))
-                       se_fl_set(spop_strm->sd, SE_FL_EOS);
                se_fl_set_error(spop_strm->sd);
+               spop_strm_propagate_term_flags(spop_conn, spop_strm);
                if (!spop_strm->sd->abort_info.info) {
                        spop_strm->sd->abort_info.info = (SE_ABRT_SRC_MUX_SPOP << SE_ABRT_SRC_SHIFT);
                        spop_strm->sd->abort_info.code = spop_conn->errcode;
@@ -1920,6 +1922,7 @@ static int spop_conn_handle_ack(struct spop_conn *spop_conn, struct spop_strm *s
                spop_strm_close(spop_strm);
 
   end:
+       spop_strm->flags |= SPOP_SF_ACK_RCVD;
        TRACE_PROTO("SPOP AGENT ACK frame rcvd", SPOP_EV_RX_FRAME|SPOP_EV_RX_ACK, spop_conn->conn, spop_strm, 0, (size_t[]){sent});
        spop_conn->state = SPOP_CS_FRAME_H;
        TRACE_LEAVE(SPOP_EV_RX_FRAME|SPOP_EV_RX_ACK, spop_conn->conn, spop_strm);
@@ -2101,6 +2104,7 @@ static void spop_process_demux(struct spop_conn *spop_conn)
                    (b_data(&spop_strm->rxbuf) ||
                     spop_conn_read0_pending(spop_conn) ||
                     spop_strm->state == SPOP_SS_CLOSED ||
+                    (spop_strm->flags & SPOP_SF_ACK_RCVD) ||
                     se_fl_test(spop_strm->sd, SE_FL_ERROR | SE_FL_ERR_PENDING | SE_FL_EOS))) {
                        /* we may have to signal the upper layers */
                        TRACE_DEVEL("notifying stream before switching SID", SPOP_EV_RX_FRAME|SPOP_EV_STRM_WAKE, spop_conn->conn, spop_strm);
@@ -2205,6 +2209,7 @@ static void spop_process_demux(struct spop_conn *spop_conn)
            (b_data(&spop_strm->rxbuf) ||
             spop_conn_read0_pending(spop_conn) ||
             spop_strm->state == SPOP_SS_CLOSED ||
+            (spop_strm->flags & SPOP_SF_ACK_RCVD) ||
             se_fl_test(spop_strm->sd, SE_FL_ERROR | SE_FL_ERR_PENDING | SE_FL_EOS))) {
                /* we may have to signal the upper layers */
                TRACE_DEVEL("notifying stream before switching SID", SPOP_EV_RX_FRAME|SPOP_EV_STRM_WAKE, spop_conn->conn, spop_strm);