MINOR: task: provide 3 task_new_* wrappers to simplify the API
authorWilly Tarreau <w@1wt.eu>
Fri, 1 Oct 2021 16:23:30 +0000 (18:23 +0200)
committerWilly Tarreau <w@1wt.eu>
Fri, 1 Oct 2021 16:36:29 +0000 (18:36 +0200)
We'll need to improve the API to pass other arguments in the future, so
let's start to adapt better to the current use cases. task_new() is used:
  - 18 times as task_new(tid_bit)
  - 18 times as task_new(MAX_THREADS_MASK)
  - 2 times with a single bit (in a loop)
  - 1 in the debug code that uses a mask

This patch provides 3 new functions to achieve this:
  - task_new_here()     to create a task on the calling thread
  - task_new_anywhere() to create a task to be run anywhere
  - task_new_on()       to create a task to run on a specific thread

The change is trivial and will allow us to later concentrate the
required adaptations to these 3 functions only. It's still possible
to call task_new() if needed but a comment was added to encourage the
use of the new ones instead. The debug code was not changed and still
uses it.

23 files changed:
include/haproxy/applet.h
include/haproxy/task.h
src/cfgparse.c
src/check.c
src/connection.c
src/dns.c
src/flt_spoe.c
src/hlua.c
src/listener.c
src/mailers.c
src/mux_fcgi.c
src/mux_h1.c
src/mux_h2.c
src/mux_quic.c
src/peers.c
src/proxy.c
src/resolvers.c
src/server.c
src/session.c
src/sink.c
src/stick_table.c
src/stream.c
src/xprt_quic.c

index 717e017..97b9c34 100644 (file)
@@ -68,7 +68,7 @@ static inline struct appctx *appctx_new(struct applet *applet)
                appctx->obj_type = OBJ_TYPE_APPCTX;
                appctx->applet = applet;
                appctx_init(appctx);
-               appctx->t = task_new(tid_bit);
+               appctx->t = task_new_here();
                if (unlikely(appctx->t == NULL)) {
                        pool_free(pool_head_appctx, appctx);
                        return NULL;
index aa9e3b2..7b9b4e6 100644 (file)
@@ -462,8 +462,9 @@ static inline struct tasklet *tasklet_new(void)
 
 /*
  * Allocate and initialise a new task. The new task is returned, or NULL in
- * case of lack of memory. The task count is incremented. Tasks should only
- * be allocated this way, and must be freed using task_free().
+ * case of lack of memory. The task count is incremented. This API might change
+ * in the near future, so prefer one of the task_new_*() wrappers below which
+ * are usually more suitable. Tasks must be freed using task_free().
  */
 static inline struct task *task_new(unsigned long thread_mask)
 {
@@ -475,6 +476,33 @@ static inline struct task *task_new(unsigned long thread_mask)
        return t;
 }
 
+/* Allocate and initialize a new task, to run on global thread <thr>. The new
+ * task is returned, or NULL in case of lack of memory. It's up to the caller
+ * to pass a valid thread number (in tid space, 0 to nbthread-1). The task
+ * count is incremented.
+ */
+static inline struct task *task_new_on(uint thr)
+{
+       return task_new(1UL << thr);
+}
+
+/* Allocate and initialize a new task, to run on the calling thread. The new
+ * task is returned, or NULL in case of lack of memory. The task count is
+ * incremented.
+ */
+static inline struct task *task_new_here()
+{
+       return task_new(tid_bit);
+}
+
+/* Allocate and initialize a new task, to run on any thread. The new task is
+ * returned, or NULL in case of lack of memory. The task count is incremented.
+ */
+static inline struct task *task_new_anywhere()
+{
+       return task_new(MAX_THREADS_MASK);
+}
+
 /*
  * Free a task. Its context must have been freed since it will be lost. The
  * task count is decremented. It it is the current task, this one is reset.
index 250e4ed..2ef62af 100644 (file)
@@ -3680,7 +3680,7 @@ out_uri_auth_compat:
                }
        }
 
-       idle_conn_task = task_new(MAX_THREADS_MASK);
+       idle_conn_task = task_new_anywhere();
        if (!idle_conn_task) {
                ha_alert("parsing : failed to allocate global idle connection task.\n");
                cfgerr++;
@@ -3690,7 +3690,7 @@ out_uri_auth_compat:
                idle_conn_task->context = NULL;
 
                for (i = 0; i < global.nbthread; i++) {
-                       idle_conns[i].cleanup_task = task_new(1UL << i);
+                       idle_conns[i].cleanup_task = task_new_on(i);
                        if (!idle_conns[i].cleanup_task) {
                                ha_alert("parsing : failed to allocate idle connection tasks for thread '%d'.\n", i);
                                cfgerr++;
@@ -3769,7 +3769,7 @@ out_uri_auth_compat:
                }
 
                /* create the task associated with the proxy */
-               curproxy->task = task_new(MAX_THREADS_MASK);
+               curproxy->task = task_new_anywhere();
                if (curproxy->task) {
                        curproxy->task->context = curproxy;
                        curproxy->task->process = manage_proxy;
index aedbed1..9ac66a5 100644 (file)
@@ -1388,13 +1388,14 @@ int start_check_task(struct check *check, int mininter,
                            int nbcheck, int srvpos)
 {
        struct task *t;
-       unsigned long thread_mask = MAX_THREADS_MASK;
 
+       /* task for the check. Process-based checks exclusively run on thread 1. */
        if (check->type == PR_O2_EXT_CHK)
-               thread_mask = 1;
+               t = task_new_on(1);
+       else
+               t = task_new_anywhere();
 
-       /* task for the check */
-       if ((t = task_new(thread_mask)) == NULL) {
+       if (!t) {
                ha_alert("Starting [%s:%s] check: out of memory.\n",
                         check->server->proxy->id, check->server->id);
                return 0;
index bf8c60e..a4a8a8b 100644 (file)
@@ -1686,7 +1686,7 @@ static struct task *mux_stopping_process(struct task *t, void *ctx, unsigned int
 static int allocate_mux_cleanup(void)
 {
        /* allocates the thread bound mux_stopping_data task */
-       mux_stopping_data[tid].task = task_new(tid_bit);
+       mux_stopping_data[tid].task = task_new_here();
        if (!mux_stopping_data[tid].task) {
                ha_alert("Failed to allocate the task for connection cleanup on thread %d.\n", tid);
                return 0;
index bc3310a..fa6f2b9 100644 (file)
--- a/src/dns.c
+++ b/src/dns.c
@@ -1027,7 +1027,7 @@ struct dns_session *dns_session_new(struct dns_stream_server *dss)
        /* never fail because it is the first watcher attached to the ring */
        DISGUISE(ring_attach(&ds->ring));
 
-       if ((ds->task_exp = task_new(tid_bit)) == NULL)
+       if ((ds->task_exp = task_new_here()) == NULL)
                goto error;
 
        ds->task_exp->process = dns_process_query_exp;
@@ -1223,7 +1223,7 @@ int dns_stream_init(struct dns_nameserver *ns, struct server *srv)
                goto out;
        }
        /* Create the task associated to the resolver target handling conns */
-       if ((dss->task_req = task_new(MAX_THREADS_MASK)) == NULL) {
+       if ((dss->task_req = task_new_anywhere()) == NULL) {
                ha_alert("memory allocation error initializing the ring for dns tcp server '%s'.\n", srv->id);
                goto out;
        }
@@ -1240,7 +1240,7 @@ int dns_stream_init(struct dns_nameserver *ns, struct server *srv)
        }
 
        /* Create the task associated to the resolver target handling conns */
-       if ((dss->task_rsp = task_new(MAX_THREADS_MASK)) == NULL) {
+       if ((dss->task_rsp = task_new_anywhere()) == NULL) {
                ha_alert("memory allocation error initializing the ring for dns tcp server '%s'.\n", srv->id);
                goto out;
        }
@@ -1250,7 +1250,7 @@ int dns_stream_init(struct dns_nameserver *ns, struct server *srv)
        dss->task_rsp->context = ns;
 
        /* Create the task associated to the resolver target handling conns */
-       if ((dss->task_idle = task_new(MAX_THREADS_MASK)) == NULL) {
+       if ((dss->task_idle = task_new_anywhere()) == NULL) {
                ha_alert("memory allocation error initializing the ring for dns tcp server '%s'.\n", srv->id);
                goto out;
        }
index 70aa869..3262fd0 100644 (file)
@@ -1998,7 +1998,7 @@ spoe_create_appctx(struct spoe_config *conf)
                goto out_free_appctx;
 
        appctx->st0 = SPOE_APPCTX_ST_CONNECT;
-       if ((SPOE_APPCTX(appctx)->task = task_new(tid_bit)) == NULL)
+       if ((SPOE_APPCTX(appctx)->task = task_new_here()) == NULL)
                goto out_free_spoe_appctx;
 
        SPOE_APPCTX(appctx)->owner           = appctx;
index df46349..baf503b 100644 (file)
@@ -8251,9 +8251,9 @@ static int hlua_register_task(lua_State *L)
         * otherwise, inherit the current thread identifier
         */
        if (state_id == 0)
-               task = task_new(MAX_THREADS_MASK);
+               task = task_new_anywhere();
        else
-               task = task_new(tid_bit);
+               task = task_new_here();
        if (!task)
                goto alloc_error;
 
@@ -8941,7 +8941,7 @@ static int hlua_applet_tcp_init(struct appctx *ctx, struct proxy *px, struct str
        ctx->ctx.hlua_apptcp.flags = 0;
 
        /* Create task used by signal to wakeup applets. */
-       task = task_new(tid_bit);
+       task = task_new_here();
        if (!task) {
                SEND_ERR(px, "Lua applet tcp '%s': out of memory.\n",
                         ctx->rule->arg.hlua_rule->fcn->name);
@@ -9134,7 +9134,7 @@ static int hlua_applet_http_init(struct appctx *ctx, struct proxy *px, struct st
                ctx->ctx.hlua_apphttp.flags |= APPLET_HTTP11;
 
        /* Create task used by signal to wakeup applets. */
-       task = task_new(tid_bit);
+       task = task_new_here();
        if (!task) {
                SEND_ERR(px, "Lua applet http '%s': out of memory.\n",
                         ctx->rule->arg.hlua_rule->fcn->name);
@@ -9753,7 +9753,7 @@ static int hlua_cli_parse_fct(char **args, char *payload, struct appctx *appctx,
         * We use the same wakeup function than the Lua applet_tcp and
         * applet_http. It is absolutely compatible.
         */
-       appctx->ctx.hlua_cli.task = task_new(tid_bit);
+       appctx->ctx.hlua_cli.task = task_new_here();
        if (!appctx->ctx.hlua_cli.task) {
                SEND_ERR(NULL, "Lua cli '%s': out of memory.\n", fcn->name);
                goto error;
index 9f408a6..bfe3216 100644 (file)
@@ -1134,7 +1134,7 @@ void listener_release(struct listener *l)
 /* Initializes the listener queues. Returns 0 on success, otherwise ERR_* flags */
 static int listener_queue_init()
 {
-       global_listener_queue_task = task_new(MAX_THREADS_MASK);
+       global_listener_queue_task = task_new_anywhere();
        if (!global_listener_queue_task) {
                ha_alert("Out of memory when initializing global listener queue\n");
                return ERR_FATAL|ERR_ABORT;
index 3df02f0..3d01d75 100644 (file)
@@ -133,7 +133,7 @@ int init_email_alert(struct mailers *mls, struct proxy *p, char **err)
                check->addr = mailer->addr;
                check->port = get_host_port(&mailer->addr);
 
-               if ((t = task_new(MAX_THREADS_MASK)) == NULL) {
+               if ((t = task_new_anywhere()) == NULL) {
                        memprintf(err, "out of memory while allocating mailer alerts task");
                        goto error;
                }
index 3f127c7..5fb1c5e 100644 (file)
@@ -734,7 +734,7 @@ static int fcgi_init(struct connection *conn, struct proxy *px, struct session *
        fconn->app = app;
        fconn->task = NULL;
        if (tick_isset(fconn->timeout)) {
-               t = task_new(tid_bit);
+               t = task_new_here();
                if (!t) {
                        TRACE_ERROR("fconn task allocation failure", FCGI_EV_FCONN_NEW|FCGI_EV_FCONN_END|FCGI_EV_FCONN_ERR);
                        goto fail;
@@ -4247,7 +4247,7 @@ static int fcgi_takeover(struct connection *conn, int orig_tid)
                __ha_barrier_store();
                task_kill(task);
 
-               fcgi->task = task_new(tid_bit);
+               fcgi->task = task_new_here();
                if (!fcgi->task) {
                        fcgi_release(fcgi);
                        return -1;
index dcfa3ee..5dfd26c 100644 (file)
@@ -808,7 +808,7 @@ static int h1_init(struct connection *conn, struct proxy *proxy, struct session
                            &h1c->conn->stopping_list);
        }
        if (tick_isset(h1c->timeout)) {
-               t = task_new(tid_bit);
+               t = task_new_here();
                if (!t) {
                        TRACE_ERROR("H1C task allocation failure", H1_EV_H1C_NEW|H1_EV_H1C_END|H1_EV_H1C_ERR);
                        goto fail;
@@ -3738,7 +3738,7 @@ static int h1_takeover(struct connection *conn, int orig_tid)
                __ha_barrier_store();
                task_kill(task);
 
-               h1c->task = task_new(tid_bit);
+               h1c->task = task_new_here();
                if (!h1c->task) {
                        h1_release(h1c);
                        return -1;
index dfe0b37..ffdafc8 100644 (file)
@@ -945,7 +945,7 @@ static int h2_init(struct connection *conn, struct proxy *prx, struct session *s
        h2c->proxy = prx;
        h2c->task = NULL;
        if (tick_isset(h2c->timeout)) {
-               t = task_new(tid_bit);
+               t = task_new_here();
                if (!t)
                        goto fail;
 
@@ -6636,7 +6636,7 @@ static int h2_takeover(struct connection *conn, int orig_tid)
                __ha_barrier_store();
                task_kill(task);
 
-               h2c->task = task_new(tid_bit);
+               h2c->task = task_new_here();
                if (!h2c->task) {
                        h2_release(h2c);
                        return -1;
index 320f612..c4673ee 100644 (file)
@@ -602,7 +602,7 @@ static int qc_init(struct connection *conn, struct proxy *prx,
        qcc->proxy = prx;
        qcc->task = NULL;
        if (tick_isset(qcc->timeout)) {
-               t = task_new(tid_bit);
+               t = task_new_here();
                if (!t)
                        goto fail;
 
@@ -2107,7 +2107,7 @@ static int qc_takeover(struct connection *conn, int orig_tid)
                __ha_barrier_store();
                task_kill(task);
 
-               qcc->task = task_new(tid_bit);
+               qcc->task = task_new_here();
                if (!qcc->task) {
                        qc_release(qcc);
                        return -1;
index 8af7475..ab4d412 100644 (file)
@@ -3503,7 +3503,7 @@ int peers_init_sync(struct peers *peers)
                peers->peers_fe->maxconn += 3;
        }
 
-       peers->sync_task = task_new(MAX_THREADS_MASK);
+       peers->sync_task = task_new_anywhere();
        if (!peers->sync_task)
                return 0;
 
index ad5120a..db876e6 100644 (file)
@@ -2039,7 +2039,7 @@ static void do_soft_stop_now()
 
        /* schedule a hard-stop after a delay if needed */
        if (tick_isset(global.hard_stop_after)) {
-               task = task_new(MAX_THREADS_MASK);
+               task = task_new_anywhere();
                if (task) {
                        task->process = hard_stop;
                        task_schedule(task, tick_add(now_ms, global.hard_stop_after));
@@ -2077,7 +2077,7 @@ void soft_stop(void)
        stopping = 1;
 
        if (tick_isset(global.grace_delay)) {
-               task = task_new(MAX_THREADS_MASK);
+               task = task_new_anywhere();
                if (task) {
                        ha_notice("Scheduling a soft-stop in %u ms.\n", global.grace_delay);
                        send_log(NULL, LOG_WARNING, "Scheduling a soft-stop in %u ms.\n", global.grace_delay);
index 3b9a246..fe7b6a8 100644 (file)
@@ -2412,7 +2412,7 @@ static int resolvers_finalize_config(void)
                }
 
                /* Create the task associated to the resolvers section */
-               if ((t = task_new(MAX_THREADS_MASK)) == NULL) {
+               if ((t = task_new_anywhere()) == NULL) {
                        ha_alert("resolvers '%s' : out of memory.\n", resolvers->id);
                        err_code |= (ERR_ALERT|ERR_ABORT);
                        goto err;
@@ -2453,7 +2453,7 @@ static int resolvers_finalize_config(void)
                                        }
                                }
 
-                               srv->srvrq_check = task_new(MAX_THREADS_MASK);
+                               srv->srvrq_check = task_new_anywhere();
                                if (!srv->srvrq_check) {
                                        ha_alert("%s '%s' : unable to create SRVRQ task for server '%s'.\n",
                                                 proxy_type_str(px), px->id, srv->id);
index 213d106..b44b164 100644 (file)
@@ -4480,7 +4480,7 @@ static int init_srv_slowstart(struct server *srv)
        struct task *t;
 
        if (srv->slowstart) {
-               if ((t = task_new(MAX_THREADS_MASK)) == NULL) {
+               if ((t = task_new_anywhere()) == NULL) {
                        ha_alert("Cannot activate slowstart for server %s/%s: out of memory.\n", srv->proxy->id, srv->id);
                        return ERR_ALERT | ERR_FATAL;
                }
index e3601cb..d913d56 100644 (file)
@@ -248,7 +248,7 @@ int session_accept_fd(struct connection *cli_conn)
         *          conn -- owner ---> task <-----+
         */
        if (cli_conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS)) {
-               if (unlikely((sess->task = task_new(tid_bit)) == NULL))
+               if (unlikely((sess->task = task_new_here()) == NULL))
                        goto out_free_sess;
 
                sess->task->context = sess;
index b869d2e..d694e58 100644 (file)
@@ -731,7 +731,7 @@ static struct task *process_sink_forward(struct task * task, void *context, unsi
  */
 int sink_init_forward(struct sink *sink)
 {
-       sink->forward_task = task_new(MAX_THREADS_MASK);
+       sink->forward_task = task_new_anywhere();
        if (!sink->forward_task)
                return 0;
 
index 6f07080..f5d7632 100644 (file)
@@ -648,7 +648,7 @@ int stktable_init(struct stktable *t)
 
                t->exp_next = TICK_ETERNITY;
                if ( t->expire ) {
-                       t->exp_task = task_new(MAX_THREADS_MASK);
+                       t->exp_task = task_new_anywhere();
                        if (!t->exp_task)
                                return 0;
                        t->exp_task->process = process_table_expire;
index 89e85d8..e4d5ac9 100644 (file)
@@ -429,7 +429,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin, struct bu
        s->pcli_flags = 0;
        s->unique_id = IST_NULL;
 
-       if ((t = task_new(tid_bit)) == NULL)
+       if ((t = task_new_here()) == NULL)
                goto out_fail_alloc;
 
        s->task = t;
index d1bfea8..32b8fce 100644 (file)
@@ -3046,7 +3046,7 @@ static struct quic_conn *qc_new_conn(unsigned int version, int ipv4,
  */
 static int quic_conn_init_timer(struct quic_conn *qc)
 {
-       qc->timer_task = task_new(MAX_THREADS_MASK);
+       qc->timer_task = task_new_anywhere();
        if (!qc->timer_task)
                return 0;