MINOR: task: add thread safe notification_new and notification_wake variants
authorAurelien DARRAGON <adarragon@haproxy.com>
Tue, 1 Apr 2025 08:07:50 +0000 (10:07 +0200)
committerAurelien DARRAGON <adarragon@haproxy.com>
Tue, 15 Apr 2025 16:37:48 +0000 (18:37 +0200)
notification_new and notification_wake were historically meant to be
called by a single thread doing both the init and the wakeup for other
tasks waiting on the signals.

In this patch, we extend the API so that notification_new and
notification_wake have thread-safe variants that can safely be used with
multiple threads registering on the same list of events and multiple
threads pushing updates on the list.

(cherry picked from commit b77b1a2c3ac70a719a2a06964e56a206ab9cc6ec)
Signed-off-by: Aurelien DARRAGON <adarragon@haproxy.com>

include/haproxy/task-t.h
include/haproxy/task.h

index 72f0d13..a50d54e 100644 (file)
@@ -107,8 +107,13 @@ enum {
 struct notification {
        struct list purge_me; /* Part of the list of signals to be purged in the
                                 case of the LUA execution stack crash. */
-       struct list wake_me; /* Part of list of signals to be targeted if an
-                               event occurs. */
+       union {
+               struct list wake_me; /* Part of list of signals to be targeted if an
+                                       event occurs. */
+               struct mt_list wake_me_mt; /* thread safe signal list */
+       } wake;
+# define wake_me wake.wake_me
+# define wake_me_mt wake.wake_me_mt
        struct task *task; /* The task to be wake if an event occurs. */
        __decl_thread(HA_SPINLOCK_T lock);
 };
index 5749253..c9007bb 100644 (file)
@@ -772,6 +772,17 @@ static inline const char *task_wakeup_type_str(uint t)
        }
 }
 
+static inline struct notification *_notification_new(struct list *purge, struct task *wakeup)
+{
+       struct notification *com = pool_alloc(pool_head_notification);
+       if (!com)
+               return NULL;
+       LIST_APPEND(purge, &com->purge_me);
+       HA_SPIN_INIT(&com->lock);
+       com->task = wakeup;
+       return com;
+}
+
 /* This function register a new signal. "lua" is the current lua
  * execution context. It contains a pointer to the associated task.
  * "link" is a list head attached to an other task that must be wake
@@ -781,13 +792,20 @@ static inline const char *task_wakeup_type_str(uint t)
  */
 static inline struct notification *notification_new(struct list *purge, struct list *event, struct task *wakeup)
 {
-       struct notification *com = pool_alloc(pool_head_notification);
+       struct notification *com = _notification_new(purge, wakeup);
        if (!com)
                return NULL;
-       LIST_APPEND(purge, &com->purge_me);
        LIST_APPEND(event, &com->wake_me);
-       HA_SPIN_INIT(&com->lock);
-       com->task = wakeup;
+       return com;
+}
+
+/* thread safe variant */
+static inline struct notification *notification_new_mt(struct list *purge, struct mt_list *event, struct task *wakeup)
+{
+       struct notification *com = _notification_new(purge, wakeup);
+       if (!com)
+               return NULL;
+       MT_LIST_APPEND(event, &com->wake_me_mt);
        return com;
 }
 
@@ -836,6 +854,19 @@ static inline void notification_gc(struct list *purge)
        }
 }
 
+static inline void _notification_wake(struct notification *com)
+{
+       HA_SPIN_LOCK(NOTIF_LOCK, &com->lock);
+       if (!com->task) {
+               HA_SPIN_UNLOCK(NOTIF_LOCK, &com->lock);
+               pool_free(pool_head_notification, com);
+               return;
+       }
+       task_wakeup(com->task, TASK_WOKEN_MSG);
+       com->task = NULL;
+       HA_SPIN_UNLOCK(NOTIF_LOCK, &com->lock);
+
+}
 /* This function sends signals. It wakes all the tasks attached
  * to a list head, and remove the signal, and free the used
  * memory. The wake list is not locked because it is owned by
@@ -848,16 +879,21 @@ static inline void notification_wake(struct list *wake)
 
        /* Wake task and delete all pending communication signals. */
        list_for_each_entry_safe(com, back, wake, wake_me) {
-               HA_SPIN_LOCK(NOTIF_LOCK, &com->lock);
                LIST_DELETE(&com->wake_me);
-               if (!com->task) {
-                       HA_SPIN_UNLOCK(NOTIF_LOCK, &com->lock);
-                       pool_free(pool_head_notification, com);
-                       continue;
-               }
-               task_wakeup(com->task, TASK_WOKEN_MSG);
-               com->task = NULL;
-               HA_SPIN_UNLOCK(NOTIF_LOCK, &com->lock);
+               _notification_wake(com);
+       }
+}
+
+/* thread safe variant */
+static inline void notification_wake_mt(struct mt_list *wake)
+{
+       struct notification *com;
+       struct mt_list back;
+
+       /* Wake task and delete all pending communication signals. */
+       MT_LIST_FOR_EACH_ENTRY_UNLOCKED(com, wake, wake_me_mt, back) {
+               _notification_wake(com);
+               com = NULL;
        }
 }