From c73060687919fcadc5f1d767842c1762153854c1 Mon Sep 17 00:00:00 2001 From: Emeric Brun Date: Mon, 26 Jun 2017 16:36:53 +0200 Subject: [PATCH] MAJOR: applet: applet scheduler rework. In order to authorize call of appctx_wakeup on running task: - from within the task handler itself. - in futur, from another thread. The appctx is considered paused as default after running the handler. The handler should explicitly call appctx_wakeup to be re-called. When the appctx_free is called on a running handler. The real free is postponed at the end of the handler process. --- include/proto/applet.h | 40 +++++++++++++++++++++++++++++----------- include/types/applet.h | 7 +++++++ src/applet.c | 37 +++++++++++++++++++++++++------------ src/stream.c | 1 - src/stream_interface.c | 12 ------------ 5 files changed, 61 insertions(+), 36 deletions(-) diff --git a/include/proto/applet.h b/include/proto/applet.h index 653be31..3cf8578 100644 --- a/include/proto/applet.h +++ b/include/proto/applet.h @@ -48,6 +48,7 @@ static inline void appctx_init(struct appctx *appctx) { appctx->st0 = appctx->st1 = appctx->st2 = 0; appctx->io_release = NULL; + appctx->state = APPLET_SLEEPING; } /* Tries to allocate a new appctx and initialize its main fields. The appctx @@ -76,7 +77,7 @@ static inline struct appctx *appctx_new(struct applet *applet) /* Releases an appctx previously allocated by appctx_new(). Note that * we share the connection pool. */ -static inline void appctx_free(struct appctx *appctx) +static inline void __appctx_free(struct appctx *appctx) { if (!LIST_ISEMPTY(&appctx->runq)) { LIST_DEL(&appctx->runq); @@ -89,9 +90,17 @@ static inline void appctx_free(struct appctx *appctx) pool_free2(pool2_connection, appctx); nb_applets--; } +static inline void appctx_free(struct appctx *appctx) +{ + if (appctx->state & APPLET_RUNNING) { + appctx->state |= APPLET_WANT_DIE; + return; + } + __appctx_free(appctx); +} /* wakes up an applet when conditions have changed */ -static inline void appctx_wakeup(struct appctx *appctx) +static inline void __appctx_wakeup(struct appctx *appctx) { if (LIST_ISEMPTY(&appctx->runq)) { LIST_ADDQ(&applet_active_queue, &appctx->runq); @@ -99,25 +108,34 @@ static inline void appctx_wakeup(struct appctx *appctx) } } -/* removes an applet from the list of active applets */ -static inline void appctx_pause(struct appctx *appctx) +static inline void appctx_wakeup(struct appctx *appctx) { - if (!LIST_ISEMPTY(&appctx->runq)) { - LIST_DEL(&appctx->runq); - LIST_INIT(&appctx->runq); - applets_active_queue--; + if (appctx->state & APPLET_RUNNING) { + appctx->state |= APPLET_WOKEN_UP; + return; } + __appctx_wakeup(appctx); } /* Callback used to wake up an applet when a buffer is available. The applet * is woken up is if it is not already in the list of "active" * applets. This functions returns 1 is the stream is woken up, otherwise it - * returns 0. */ + * returns 0. If task is running we request we check if woken was already + * requested */ static inline int appctx_res_wakeup(struct appctx *appctx) { - if (!LIST_ISEMPTY(&appctx->runq)) + if (appctx->state & APPLET_RUNNING) { + if (appctx->state & APPLET_WOKEN_UP) { + return 0; + } + appctx->state |= APPLET_WOKEN_UP; + return 1; + } + + if (!LIST_ISEMPTY(&appctx->runq)) { return 0; - appctx_wakeup(appctx); + } + __appctx_wakeup(appctx); return 1; } diff --git a/include/types/applet.h b/include/types/applet.h index 90484f6..71976d8 100644 --- a/include/types/applet.h +++ b/include/types/applet.h @@ -44,11 +44,17 @@ struct applet { unsigned int timeout; /* execution timeout. */ }; +#define APPLET_SLEEPING 0x00 /* applet is currently sleeping or pending in active queue */ +#define APPLET_RUNNING 0x01 /* applet is currently running */ +#define APPLET_WOKEN_UP 0x02 /* applet was running and requested to woken up again */ +#define APPLET_WANT_DIE 0x04 /* applet was running and requested to die */ + /* Context of a running applet. */ struct appctx { struct list runq; /* chaining in the applet run queue */ enum obj_type obj_type; /* OBJ_TYPE_APPCTX */ /* 3 unused bytes here */ + unsigned short state; /* Internal appctx state */ unsigned int st0; /* CLI state for stats, session state for peers */ unsigned int st1; /* prompt for stats, session error for peers */ unsigned int st2; /* output state for stats, unused by peers */ @@ -59,6 +65,7 @@ struct appctx { void (*io_release)(struct appctx *appctx); /* used within the cli_io_handler when st0 = CLI_ST_CALLBACK, if the command is terminated or the session released */ struct buffer_wait buffer_wait; /* position in the list of objects waiting for a buffer */ + unsigned long process_mask; /* mask of thread IDs authorized to process the applet */ union { struct { diff --git a/src/applet.c b/src/applet.c index f5bc79d..324dfd3 100644 --- a/src/applet.c +++ b/src/applet.c @@ -24,22 +24,22 @@ unsigned int nb_applets = 0; unsigned int applets_active_queue = 0; struct list applet_active_queue = LIST_HEAD_INIT(applet_active_queue); -struct list applet_cur_queue = LIST_HEAD_INIT(applet_cur_queue); void applet_run_active() { - struct appctx *curr; + struct appctx *curr, *next; struct stream_interface *si; + struct list applet_cur_queue = LIST_HEAD_INIT(applet_cur_queue); - if (LIST_ISEMPTY(&applet_active_queue)) - return; - - /* move active queue to run queue */ - applet_active_queue.n->p = &applet_cur_queue; - applet_active_queue.p->n = &applet_cur_queue; - - applet_cur_queue = applet_active_queue; - LIST_INIT(&applet_active_queue); + curr = LIST_NEXT(&applet_active_queue, typeof(curr), runq); + while (&curr->runq != &applet_active_queue) { + next = LIST_NEXT(&curr->runq, typeof(next), runq); + LIST_DEL(&curr->runq); + curr->state = APPLET_RUNNING; + LIST_ADDQ(&applet_cur_queue, &curr->runq); + applets_active_queue--; + curr = next; + } /* The list is only scanned from the head. This guarantees that if any * applet removes another one, there is no side effect while walking @@ -70,7 +70,20 @@ void applet_run_active() if (applet_cur_queue.n == &curr->runq) { /* curr was left in the list, move it back to the active list */ LIST_DEL(&curr->runq); - LIST_ADDQ(&applet_active_queue, &curr->runq); + LIST_INIT(&curr->runq); + if (curr->state & APPLET_WANT_DIE) { + curr->state = APPLET_SLEEPING; + __appctx_free(curr); + } + else { + if (curr->state & APPLET_WOKEN_UP) { + curr->state = APPLET_SLEEPING; + __appctx_wakeup(curr); + } + else { + curr->state = APPLET_SLEEPING; + } + } } } } diff --git a/src/stream.c b/src/stream.c index 3781ac7..4e34f38 100644 --- a/src/stream.c +++ b/src/stream.c @@ -1061,7 +1061,6 @@ enum act_return process_use_service(struct act_rule *rule, struct proxy *px, /* Stops the applet sheduling, in case of the init function miss * some data. */ - appctx_pause(appctx); si_applet_stop_get(&s->si[1]); /* Call initialisation. */ diff --git a/src/stream_interface.c b/src/stream_interface.c index 47ba8c1..52e2df4 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -1369,16 +1369,6 @@ void si_applet_wake_cb(struct stream_interface *si) /* update the stream-int, channels, and possibly wake the stream up */ stream_int_notify(si); - - /* Get away from the active list if we can't work anymore. - * We also do that if the main task has already scheduled, because it - * saves a useless wakeup/pause/wakeup cycle causing one useless call - * per session on average. - */ - if (task_in_rq(si_task(si)) || - (((si->flags & (SI_FL_WANT_PUT|SI_FL_WAIT_ROOM)) != SI_FL_WANT_PUT) && - ((si->flags & (SI_FL_WANT_GET|SI_FL_WAIT_DATA)) != SI_FL_WANT_GET))) - appctx_pause(si_appctx(si)); } @@ -1393,8 +1383,6 @@ void stream_int_update_applet(struct stream_interface *si) if (((si->flags & (SI_FL_WANT_PUT|SI_FL_WAIT_ROOM)) == SI_FL_WANT_PUT) || ((si->flags & (SI_FL_WANT_GET|SI_FL_WAIT_DATA)) == SI_FL_WANT_GET)) appctx_wakeup(si_appctx(si)); - else - appctx_pause(si_appctx(si)); } /* -- 1.7.10.4