Historically this function would try to wake the most accurate number of
process_stream() waiters. But since the introduction of filters which could
also require buffers (e.g. for compression), things started to become less
accurate. Nowadays muxes and transport layers also use buffers, so the
runqueue size no longer has anything to do with the number of expected
upcoming users.
In addition to this, the threshold was compared to the number of free buffers,
calculated as allocated minus used, but this no longer works with local
pools since these counts are not updated upon alloc/free!
Let's clean this up and pass the number of released buffers instead, and
consider that each successfully woken waiter consumes one buffer. This
is not rocket science and will not suddenly fix everything, but at least
it cannot be as wrong as it is today.
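
To illustrate the new accounting, here is a minimal standalone sketch (not
code from this patch; offer_buffers_model, take_buffer and the waiter list
below are purely illustrative): the caller passes the number of buffers it
just released, and each waiter that accepts the wakeup consumes exactly one
of these credits.

    #include <stdio.h>

    struct waiter {
        const char *name;
        int (*wakeup_cb)(struct waiter *w); /* non-zero if the waiter takes the buffer */
        struct waiter *next;
    };

    static int take_buffer(struct waiter *w)
    {
        printf("%s woken up\n", w->name);
        return 1;
    }

    /* Wake at most <count> waiters; each accepted wakeup consumes one
     * released-buffer credit, mirroring the new offer_buffers() contract.
     */
    static void offer_buffers_model(struct waiter *head, unsigned int count)
    {
        for (struct waiter *w = head; w && count; w = w->next) {
            if (!w->wakeup_cb(w))
                continue;   /* waiter declined, credit not consumed */
            count--;        /* one released buffer handed over */
        }
    }

    int main(void)
    {
        struct waiter w3 = { "w3", take_buffer, NULL };
        struct waiter w2 = { "w2", take_buffer, &w3 };
        struct waiter w1 = { "w1", take_buffer, &w2 };

        offer_buffers_model(&w1, 2); /* two buffers released -> only w1 and w2 wake */
        return 0;
    }

This matches what the callers below do: sites releasing a single buffer now
pass 1, and sites releasing several buffers in a loop count the releases and
pass that count once at the end.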
This could have been marked as a bug given that the current situation is
totally broken in this regard, but this probably doesn't completely fix
it; it only goes in a better direction. It is possible, however, that it
will make sense in the future to backport this as part of a larger series
if the situation significantly improves.
(cherry picked from commit 4d77bbf8560b5d5b32409be131e4975811bfec28)
Signed-off-by: Willy Tarreau <w@1wt.eu>
{
if (c_size(chn) && c_empty(chn)) {
b_free(&chn->buf);
- offer_buffers(wait->target, tasks_run_queue);
+ offer_buffers(wait->target, 1);
}
}
}
-/* Offer a buffer currently belonging to target <from> to whoever needs one.
- * Any pointer is valid for <from>, including NULL. Its purpose is to avoid
- * passing a buffer to oneself in case of failed allocations (e.g. need two
- * buffers, get one, fail, release it and wake up self again). In case of
- * normal buffer release where it is expected that the caller is not waiting
+/* Offer one or multiple buffers currently belonging to target <from> to whoever
+ * needs one. Any pointer is valid for <from>, including NULL. Its purpose is
+ * to avoid passing a buffer to oneself in case of failed allocations (e.g.
+ * need two buffers, get one, fail, release it and wake up self again). In case
+ * of normal buffer release where it is expected that the caller is not waiting
* for a buffer, NULL is fine. It will wake waiters on the current thread only.
*/
-void __offer_buffer(void *from, unsigned int threshold);
+void __offer_buffers(void *from, unsigned int count);
-static inline void offer_buffers(void *from, unsigned int threshold)
+static inline void offer_buffers(void *from, unsigned int count)
{
if (!LIST_ISEMPTY(&ti->buffer_wq))
- __offer_buffer(from, threshold);
+ __offer_buffers(from, count);
}
{
if (bptr->size) {
b_free(bptr);
- offer_buffers(check->buf_wait.target, tasks_run_queue);
+ offer_buffers(check->buf_wait.target, 1);
}
}
fflush(o);
}
-/* see offer_buffer() for details */
-void __offer_buffer(void *from, unsigned int threshold)
+/* see offer_buffers() for details */
+void __offer_buffers(void *from, unsigned int count)
{
struct buffer_wait *wait, *wait_back;
- int avail;
/* For now, we consider that all objects need 1 buffer, so we can stop
* waking up them once we have enough of them to eat all the available
* buffers. Note that we don't really know if they are streams or just
* other tasks, but that's a rough estimate. Similarly, for each cached
- * event we'll need 1 buffer. If no buffer is currently used, always
- * wake up the number of tasks we can offer a buffer based on what is
- * allocated, and in any case at least one task per two reserved
- * buffers.
+ * event we'll need 1 buffer.
*/
- avail = pool_head_buffer->allocated - pool_head_buffer->used - global.tune.reserved_bufs / 2;
-
list_for_each_entry_safe(wait, wait_back, &ti->buffer_wq, list) {
- if (avail <= threshold)
+ if (!count)
break;
if (wait->target == from || !wait->wakeup_cb(wait->target))
continue;
LIST_DEL_INIT(&wait->list);
- avail--;
+ count--;
}
}
/* Release the buffer if needed */
if (buf->size) {
b_free(buf);
- offer_buffers(buffer_wait->target, tasks_run_queue);
+ offer_buffers(buffer_wait->target, 1);
}
}
{
if (bptr->size) {
b_free(bptr);
- offer_buffers(NULL, tasks_run_queue);
+ offer_buffers(NULL, 1);
}
}
count++;
}
if (count)
- offer_buffers(NULL, tasks_run_queue);
+ offer_buffers(NULL, count);
}
/* Returns the number of allocatable outgoing streams for the connection taking
eb32_delete(&fstrm->by_id);
if (b_size(&fstrm->rxbuf)) {
b_free(&fstrm->rxbuf);
- offer_buffers(NULL, tasks_run_queue);
+ offer_buffers(NULL, 1);
}
if (fstrm->subs)
fstrm->subs->events = 0;
}
if (released)
- offer_buffers(NULL, tasks_run_queue);
+ offer_buffers(NULL, released);
/* wrote at least one byte, the buffer is not full anymore */
if (fconn->flags & (FCGI_CF_MUX_MFULL | FCGI_CF_DEM_MROOM))
}
if (released)
- offer_buffers(NULL, tasks_run_queue);
+ offer_buffers(NULL, released);
}
end:
{
if (bptr->size) {
b_free(bptr);
- offer_buffers(h1c->buf_wait.target, tasks_run_queue);
+ offer_buffers(h1c->buf_wait.target, 1);
}
}
{
if (bptr->size) {
b_free(bptr);
- offer_buffers(NULL, tasks_run_queue);
+ offer_buffers(NULL, 1);
}
}
count++;
}
if (count)
- offer_buffers(NULL, tasks_run_queue);
+ offer_buffers(NULL, count);
}
/* returns the number of allocatable outgoing streams for the connection taking
eb32_delete(&h2s->by_id);
if (b_size(&h2s->rxbuf)) {
b_free(&h2s->rxbuf);
- offer_buffers(NULL, tasks_run_queue);
+ offer_buffers(NULL, 1);
}
if (h2s->subs)
}
if (released)
- offer_buffers(NULL, tasks_run_queue);
+ offer_buffers(NULL, released);
/* wrote at least one byte, the buffer is not full anymore */
if (sent)
}
if (released)
- offer_buffers(NULL, tasks_run_queue);
+ offer_buffers(NULL, released);
}
/* in any case this connection must not be considered idle anymore */
cs->flags |= CS_FL_ERROR;
if (b_size(&h2s->rxbuf)) {
b_free(&h2s->rxbuf);
- offer_buffers(NULL, tasks_run_queue);
+ offer_buffers(NULL, 1);
}
}
LIST_DEL_INIT(&s->buffer_wait.list);
if (s->req.buf.size || s->res.buf.size) {
+ int count = !!s->req.buf.size + !!s->res.buf.size;
+
b_free(&s->req.buf);
b_free(&s->res.buf);
- offer_buffers(NULL, tasks_run_queue);
+ offer_buffers(NULL, count);
}
pool_free(pool_head_uniqueid, s->unique_id.ptr);
int offer = 0;
if (c_size(&s->req) && c_empty(&s->req)) {
- offer = 1;
+ offer++;
b_free(&s->req.buf);
}
if (c_size(&s->res) && c_empty(&s->res)) {
- offer = 1;
+ offer++;
b_free(&s->res.buf);
}
* someone waiting, we can wake up a waiter and offer them.
*/
if (offer)
- offer_buffers(s, tasks_run_queue);
+ offer_buffers(s, offer);
}
void stream_process_counters(struct stream *s)