MEDIUM: threads/queue: Make queues thread-safe
authorChristopher Faulet <cfaulet@haproxy.com>
Tue, 27 Jun 2017 13:43:53 +0000 (15:43 +0200)
committerWilly Tarreau <w@1wt.eu>
Tue, 31 Oct 2017 12:58:32 +0000 (13:58 +0100)
The list of pending connections are now protected using the proxy or server
lock, depending on the context.

src/queue.c

index 95b8eda..93d3e94 100644 (file)
@@ -13,6 +13,7 @@
 #include <common/config.h>
 #include <common/memory.h>
 #include <common/time.h>
+#include <common/hathreads.h>
 
 #include <proto/queue.h>
 #include <proto/server.h>
@@ -23,6 +24,8 @@
 
 struct pool_head *pool2_pendconn;
 
+static void __pendconn_free(struct pendconn *p);
+
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */
 int init_pendconn()
 {
@@ -116,7 +119,7 @@ static struct stream *pendconn_get_next_strm(struct server *srv, struct proxy *p
                        ps = pp;
        }
        strm = ps->strm;
-       pendconn_free(ps);
+       __pendconn_free(ps);
 
        /* we want to note that the stream has now been assigned a server */
        strm->flags |= SF_ASSIGNED;
@@ -139,10 +142,12 @@ void process_srv_queue(struct server *s)
        struct proxy  *p = s->proxy;
        int maxconn;
 
+       SPIN_LOCK(PROXY_LOCK,  &p->lock);
+       SPIN_LOCK(SERVER_LOCK, &s->lock);
+
        /* First, check if we can handle some connections queued at the proxy. We
         * will take as many as we can handle.
         */
-
        maxconn = srv_dynamic_maxconn(s);
        while (s->served < maxconn) {
                struct stream *strm = pendconn_get_next_strm(s, p);
@@ -151,6 +156,8 @@ void process_srv_queue(struct server *s)
                        break;
                task_wakeup(strm->task, TASK_WOKEN_RES);
        }
+       SPIN_UNLOCK(SERVER_LOCK, &s->lock);
+       SPIN_UNLOCK(PROXY_LOCK,  &p->lock);
 }
 
 /* Adds the stream <strm> to the pending connection list of server <strm>->srv
@@ -163,6 +170,7 @@ struct pendconn *pendconn_add(struct stream *strm)
 {
        struct pendconn *p;
        struct server *srv;
+       int count;
 
        p = pool_alloc2(pool2_pendconn);
        if (!p)
@@ -170,21 +178,26 @@ struct pendconn *pendconn_add(struct stream *strm)
 
        strm->pend_pos = p;
        p->strm = strm;
-       p->srv = srv = objt_server(strm->target);
+       srv = objt_server(strm->target);
 
-       if (strm->flags & SF_ASSIGNED && srv) {
+       if ((strm->flags & SF_ASSIGNED) && srv) {
+               p->srv = srv;
+               SPIN_LOCK(SERVER_LOCK, &srv->lock);
                LIST_ADDQ(&srv->pendconns, &p->list);
-               srv->nbpend++;
-               strm->logs.srv_queue_size += srv->nbpend;
-               if (srv->nbpend > srv->counters.nbpend_max)
-                       srv->counters.nbpend_max = srv->nbpend;
+               SPIN_UNLOCK(SERVER_LOCK, &srv->lock);
+               count = HA_ATOMIC_ADD(&srv->nbpend, 1);
+               strm->logs.srv_queue_size += count;
+               HA_ATOMIC_UPDATE_MAX(&srv->counters.nbpend_max, count);
        } else {
+               p->srv = NULL;
+               SPIN_LOCK(PROXY_LOCK, &strm->be->lock);
                LIST_ADDQ(&strm->be->pendconns, &p->list);
-               strm->be->nbpend++;
-               strm->logs.prx_queue_size += strm->be->nbpend;
-               HA_ATOMIC_UPDATE_MAX(&strm->be->be_counters.nbpend_max, strm->be->nbpend);
+               SPIN_UNLOCK(PROXY_LOCK, &strm->be->lock);
+               count = HA_ATOMIC_ADD(&strm->be->nbpend, 1);
+               strm->logs.prx_queue_size += count;
+               HA_ATOMIC_UPDATE_MAX(&strm->be->be_counters.nbpend_max, count);
        }
-       strm->be->totpend++;
+       HA_ATOMIC_ADD(&strm->be->totpend, 1);
        return p;
 }
 
@@ -196,6 +209,7 @@ int pendconn_redistribute(struct server *s)
        struct pendconn *pc, *pc_bck;
        int xferred = 0;
 
+       SPIN_LOCK(SERVER_LOCK, &s->lock);
        list_for_each_entry_safe(pc, pc_bck, &s->pendconns, list) {
                struct stream *strm = pc->strm;
 
@@ -208,11 +222,12 @@ int pendconn_redistribute(struct server *s)
                        /* it's left to the dispatcher to choose a server */
                        strm->flags &= ~(SF_DIRECT | SF_ASSIGNED | SF_ADDR_SET);
 
-                       pendconn_free(pc);
+                       __pendconn_free(pc);
                        task_wakeup(strm->task, TASK_WOKEN_RES);
                        xferred++;
                }
        }
+       SPIN_UNLOCK(SERVER_LOCK, &s->lock);
        return xferred;
 }
 
@@ -228,6 +243,7 @@ int pendconn_grab_from_px(struct server *s)
        if (!srv_currently_usable(s))
                return 0;
 
+       SPIN_LOCK(PROXY_LOCK, &s->proxy->lock);
        for (xferred = 0; !s->maxconn || xferred < srv_dynamic_maxconn(s); xferred++) {
                struct stream *strm;
                struct pendconn *p;
@@ -237,9 +253,10 @@ int pendconn_grab_from_px(struct server *s)
                        break;
                p->strm->target = &s->obj_type;
                strm = p->strm;
-               pendconn_free(p);
+               __pendconn_free(p);
                task_wakeup(strm->task, TASK_WOKEN_RES);
        }
+       SPIN_UNLOCK(PROXY_LOCK, &s->proxy->lock);
        return xferred;
 }
 
@@ -250,16 +267,38 @@ int pendconn_grab_from_px(struct server *s)
  */
 void pendconn_free(struct pendconn *p)
 {
-       LIST_DEL(&p->list);
+       if (p->srv) {
+               SPIN_LOCK(SERVER_LOCK, &p->srv->lock);
+               LIST_DEL(&p->list);
+               SPIN_UNLOCK(SERVER_LOCK, &p->srv->lock);
+               HA_ATOMIC_SUB(&p->srv->nbpend, 1);
+       }
+       else {
+               SPIN_LOCK(SERVER_LOCK, &p->strm->be->lock);
+               LIST_DEL(&p->list);
+               SPIN_UNLOCK(SERVER_LOCK, &p->strm->be->lock);
+               HA_ATOMIC_SUB(&p->strm->be->nbpend, 1);
+       }
        p->strm->pend_pos = NULL;
-       if (p->srv)
-               p->srv->nbpend--;
-       else
-               p->strm->be->nbpend--;
-       p->strm->be->totpend--;
+       HA_ATOMIC_SUB(&p->strm->be->totpend, 1);
        pool_free2(pool2_pendconn, p);
 }
 
+/* Lock-free version of pendconn_free. */
+static void __pendconn_free(struct pendconn *p)
+{
+       if (p->srv) {
+               LIST_DEL(&p->list);
+               HA_ATOMIC_SUB(&p->srv->nbpend, 1);
+       }
+       else {
+               LIST_DEL(&p->list);
+               HA_ATOMIC_SUB(&p->strm->be->nbpend, 1);
+       }
+       p->strm->pend_pos = NULL;
+       HA_ATOMIC_SUB(&p->strm->be->totpend, 1);
+       pool_free2(pool2_pendconn, p);
+}
 
 /*
  * Local variables: