MINOR: listener: define callback for accept queue push
Extend API for connection thread rebind API by replacing single callback
set_affinity by three different ones. Each one of them is used at a
different stage of the operation :
* set_affinity1 is used similarly to previous set_affinity
* set_affinity2 is called directly from accept_queue_push_mp() when an
entry has been found in accept ring. This operation cannot fail.
* reset_affinity is called after set_affinity1 in case of failure from
accept_queue_push_mp() due to no space left in accept ring. This is
necessary for protocols which must reconfigure resources before
fallback on the current tid.
This patch does not have any functional changes. However, it will be
required to fix crashes for QUIC connections when accept queue ring is
full. As such, it must be backported with it.
(cherry picked from commit
1a43b9f32c71267e3cb514aa70a13c75adb20742)
[wt: backported for ease of maintenance, as suggested in commit
9fbe8b0334 ("CLEANUP: proto: rename TID affinity callbacks") also
marked for backporting]
Signed-off-by: Willy Tarreau <w@1wt.eu>
void (*ignore_events)(struct connection *conn, int event_type); /* unsubscribe from socket events */
int (*get_src)(struct connection *conn, struct sockaddr *, socklen_t); /* retrieve connection's source address; -1=fail */
int (*get_dst)(struct connection *conn, struct sockaddr *, socklen_t); /* retrieve connection's dest address; -1=fail */
- int (*set_affinity)(struct connection *conn, int new_tid);
+
+ /* functions related to thread affinity update */
+ /* prepare rebind connection on a new thread, may fail */
+ int (*set_affinity1)(struct connection *conn, int new_tid);
+ /* complete connection thread rebinding, no error possible */
+ void (*set_affinity2)(struct connection *conn);
+ /* cancel connection thread rebinding */
+ void (*reset_affinity)(struct connection *conn);
/* functions acting on the receiver */
int (*rx_suspend)(struct receiver *rx); /* temporarily suspend this receiver for a soft restart */
int qc_parse_hd_form(struct quic_rx_packet *pkt,
unsigned char **buf, const unsigned char *end);
-int qc_set_tid_affinity(struct quic_conn *qc, uint new_tid, struct listener *new_li);
+int qc_set_tid_affinity1(struct quic_conn *qc, uint new_tid, struct listener *new_li);
void qc_finalize_affinity_rebind(struct quic_conn *qc);
int qc_handle_conn_migration(struct quic_conn *qc,
const struct sockaddr_storage *peer_addr,
}
-/* tries to push a new accepted connection <conn> into ring <ring>. Returns
- * non-zero if it succeeds, or zero if the ring is full. Supports multiple
- * producers.
+/* Tries to push a new accepted connection <conn> into ring <ring>.
+ * <accept_push_cb> is called if not NULL just prior to the push operation.
+ *
+ * Returns non-zero if it succeeds, or zero if the ring is full. Supports
+ * multiple producers.
*/
-int accept_queue_push_mp(struct accept_queue_ring *ring, struct connection *conn)
+int accept_queue_push_mp(struct accept_queue_ring *ring, struct connection *conn,
+ void (*accept_push_cb)(struct connection *))
{
unsigned int pos, next;
uint32_t idx = _HA_ATOMIC_LOAD(&ring->idx); /* (head << 16) + tail */
next |= (idx & 0xffff0000U);
} while (unlikely(!_HA_ATOMIC_CAS(&ring->idx, &idx, next) && __ha_cpu_relax()));
+ if (accept_push_cb)
+ accept_push_cb(conn);
+
ring->entry[pos] = conn;
__ha_barrier_store();
return 1;
*/
void listener_accept(struct listener *l)
{
+ void (*li_set_affinity2)(struct connection *);
struct connection *cli_conn;
struct proxy *p;
unsigned int max_accept;
int ret;
p = l->bind_conf->frontend;
+ li_set_affinity2 = l->rx.proto ? l->rx.proto->set_affinity2 : NULL;
/* if l->bind_conf->maxaccept is -1, then max_accept is UINT_MAX. It is
* not really illimited, but it is probably enough.
* reservation in the target ring.
*/
- if (l->rx.proto && l->rx.proto->set_affinity) {
- if (l->rx.proto->set_affinity(cli_conn, t)) {
+ if (l->rx.proto && l->rx.proto->set_affinity1) {
+ if (l->rx.proto->set_affinity1(cli_conn, t)) {
/* Failed migration, stay on the same thread. */
goto local_accept;
}
* when processing this loop.
*/
ring = &accept_queue_rings[t];
- if (accept_queue_push_mp(ring, cli_conn)) {
+ if (accept_queue_push_mp(ring, cli_conn, li_set_affinity2)) {
_HA_ATOMIC_INC(&activity[t].accq_pushed);
tasklet_wakeup(ring->tasklet);
+
continue;
}
/* If the ring is full we do a synchronous accept on
* the local thread here.
*/
_HA_ATOMIC_INC(&activity[t].accq_full);
+
+ if (l->rx.proto && l->rx.proto->reset_affinity)
+ l->rx.proto->reset_affinity(cli_conn);
}
#endif // USE_THREAD
static int quic_connect_server(struct connection *conn, int flags);
static void quic_enable_listener(struct listener *listener);
static void quic_disable_listener(struct listener *listener);
-static int quic_set_affinity(struct connection *conn, int new_tid);
+static int quic_set_affinity1(struct connection *conn, int new_tid);
/* Note: must not be declared <const> as its list will be overwritten */
struct protocol proto_quic4 = {
.get_src = quic_sock_get_src,
.get_dst = quic_sock_get_dst,
.connect = quic_connect_server,
- .set_affinity = quic_set_affinity,
+ .set_affinity1 = quic_set_affinity1,
/* binding layer */
.rx_suspend = udp_suspend_receiver,
.get_src = quic_sock_get_src,
.get_dst = quic_sock_get_dst,
.connect = quic_connect_server,
- .set_affinity = quic_set_affinity,
+ .set_affinity1 = quic_set_affinity1,
/* binding layer */
.rx_suspend = udp_suspend_receiver,
* target is a listener, and the caller is responsible for guaranteeing that
* the listener assigned to the connection is bound to the requested thread.
*/
-static int quic_set_affinity(struct connection *conn, int new_tid)
+static int quic_set_affinity1(struct connection *conn, int new_tid)
{
struct quic_conn *qc = conn->handle.qc;
- return qc_set_tid_affinity(qc, new_tid, objt_listener(conn->target));
+ return qc_set_tid_affinity1(qc, new_tid, objt_listener(conn->target));
}
static int quic_alloc_dghdlrs(void)
.unbind = rhttp_unbind_receiver,
.resume = default_resume_listener,
.accept_conn = rhttp_accept_conn,
- .set_affinity = rhttp_set_affinity,
+ .set_affinity1 = rhttp_set_affinity,
/* address family */
.fam = &proto_fam_rhttp,
*
* Returns 0 on success else non-zero.
*/
-int qc_set_tid_affinity(struct quic_conn *qc, uint new_tid, struct listener *new_li)
+int qc_set_tid_affinity1(struct quic_conn *qc, uint new_tid, struct listener *new_li)
{
struct task *t1 = NULL, *t2 = NULL;
struct tasklet *t3 = NULL;