#define CF_WRITE_ERROR 0x00000800 /* unrecoverable error on consumer side */
#define CF_WRITE_ACTIVITY (CF_WRITE_NULL|CF_WRITE_PARTIAL|CF_WRITE_ERROR)
-/* unused: 0x00001000 */
+#define CF_WAKE_WRITE 0x00001000 /* wake the task up when there's write activity */
#define CF_SHUTW 0x00002000 /* consumer has already shut down */
#define CF_SHUTW_NOW 0x00004000 /* the consumer must shut down for writes ASAP */
#define CF_AUTO_CLOSE 0x00008000 /* producer can forward shutdown to other side */
#define CF_WAKE_ONCE 0x10000000 /* pretend there is activity on this channel (one-shot) */
/* unused: 0x20000000, 0x40000000, 0x80000000 */
-/* Use these masks to clear the flags before going back to lower layers */
-#define CF_CLEAR_READ (~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ERROR|CF_READ_ATTACHED))
-#define CF_CLEAR_WRITE (~(CF_WRITE_NULL|CF_WRITE_PARTIAL|CF_WRITE_ERROR))
-#define CF_CLEAR_TIMEOUT (~(CF_READ_TIMEOUT|CF_WRITE_TIMEOUT|CF_ANA_TIMEOUT))
-
/* Masks which define input events for stream analysers */
#define CF_MASK_ANALYSER (CF_READ_ATTACHED|CF_READ_ACTIVITY|CF_READ_TIMEOUT|CF_ANA_TIMEOUT|CF_WRITE_ACTIVITY|CF_WAKE_ONCE)
* input is closed, -2 is returned. If there is not enough room left in the
* buffer, -1 is returned. Otherwise the number of bytes copied is returned
* (1). Channel flag CF_READ_PARTIAL is updated if some data can be transferred.
+ * Channel flag CF_WAKE_WRITE is set if the write fails because the buffer is
+ * full.
*/
int bi_putchr(struct channel *chn, char c)
{
if (unlikely(channel_input_closed(chn)))
return -2;
- if (channel_full(chn))
+ if (channel_full(chn)) {
+ chn->flags |= CF_WAKE_WRITE;
return -1;
+ }
*bi_end(chn->buf) = c;
* -3 is returned. If there is not enough room left in the buffer, -1 is
* returned. Otherwise the number of bytes copied is returned (0 being a valid
* number). Channel flag CF_READ_PARTIAL is updated if some data can be
- * transferred.
+ * transferred. Channel flag CF_WAKE_WRITE is set if the write fails because
+ * the buffer is full.
*/
int bi_putblk(struct channel *chn, const char *blk, int len)
{
if (len > max)
return -3;
+ chn->flags |= CF_WAKE_WRITE;
return -1;
}
/* ensure we have some output room left in the event we
* would want to return some info right after parsing.
*/
- if (buffer_almost_full(si->ib->buf))
+ if (buffer_almost_full(si->ib->buf)) {
+ si->ib->flags |= CF_WAKE_WRITE;
break;
+ }
reql = bo_getline(si->ob, trash.str, trash.size);
if (reql <= 0) { /* closed or EOL not found */
case STAT_PX_ST_LI:
/* stats.l has been initialized above */
for (; appctx->ctx.stats.l != &px->conf.listeners; appctx->ctx.stats.l = l->by_fe.n) {
- if (buffer_almost_full(rep->buf))
+ if (buffer_almost_full(rep->buf)) {
+ rep->flags |= CF_WAKE_WRITE;
return 0;
+ }
l = LIST_ELEM(appctx->ctx.stats.l, struct listener *, by_fe);
if (!l->counters)
for (; appctx->ctx.stats.sv != NULL; appctx->ctx.stats.sv = sv->next) {
int sv_state; /* 0=DOWN, 1=going up, 2=going down, 3=UP, 4,5=NOLB, 6=unchecked */
- if (buffer_almost_full(rep->buf))
+ if (buffer_almost_full(rep->buf)) {
+ rep->flags |= CF_WAKE_WRITE;
return 0;
+ }
sv = appctx->ctx.stats.sv;
case STAT_ST_LIST:
/* dump proxies */
while (appctx->ctx.stats.px) {
- if (buffer_almost_full(rep->buf))
+ if (buffer_almost_full(rep->buf)) {
+ rep->flags |= CF_WAKE_WRITE;
return 0;
+ }
px = appctx->ctx.stats.px;
/* skip the disabled proxies, global frontend and non-networked ones */
/* some data has still not left the buffer, wake us once that's done */
channel_dont_connect(req);
req->flags |= CF_READ_DONTWAIT; /* try to get back here ASAP */
+ req->flags |= CF_WAKE_WRITE;
return 0;
}
if (unlikely(bi_end(req->buf) < b_ptr(req->buf, msg->next) ||
/* don't let a connection request be initiated */
channel_dont_connect(req);
s->rep->flags &= ~CF_EXPECT_MORE; /* speed up sending a previous response */
+ s->rep->flags |= CF_WAKE_WRITE;
s->rep->analysers |= an_bit; /* wake us up once it changes */
return 0;
}
goto abort_response;
channel_dont_close(rep);
rep->flags |= CF_READ_DONTWAIT; /* try to get back here ASAP */
+ rep->flags |= CF_WAKE_WRITE;
return 0;
}
memset(&s->txn.auth, 0, sizeof(s->txn.auth));
/* This flag must explicitly be set every time */
- s->req->flags &= ~CF_READ_NOEXP;
+ s->req->flags &= ~(CF_READ_NOEXP|CF_WAKE_WRITE);
+ s->rep->flags &= ~(CF_READ_NOEXP|CF_WAKE_WRITE);
/* Keep a copy of req/rep flags so that we can detect shutdowns */
rqf_last = s->req->flags & ~CF_MASK_ANALYSER;
/* changes on the consumption side */
(si->ob->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
((si->ob->flags & CF_WRITE_ACTIVITY) &&
- ((si->ob->flags & CF_SHUTW) ||
- si->ob->prod->state != SI_ST_EST ||
- (channel_is_empty(si->ob) && !si->ob->to_forward)))) {
+ (si->ob->flags & (CF_SHUTW|CF_WAKE_WRITE)))) {
if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
task_wakeup(si->owner, TASK_WOKEN_IO);
}
/* changes on the consumption side */
(si->ob->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
((si->ob->flags & CF_WRITE_ACTIVITY) &&
- ((si->ob->flags & CF_SHUTW) ||
- si->ob->prod->state != SI_ST_EST ||
- (channel_is_empty(si->ob) && !si->ob->to_forward)))) {
+ (si->ob->flags & (CF_SHUTW|CF_WAKE_WRITE)))) {
task_wakeup(si->owner, TASK_WOKEN_IO);
}
if (si->ib->flags & CF_READ_ACTIVITY)
/* in case of special condition (error, shutdown, end of write...), we
* have to notify the task.
*/
- if (likely((ob->flags & (CF_WRITE_NULL|CF_WRITE_ERROR|CF_SHUTW)) ||
- (channel_is_empty(ob) && !ob->to_forward) ||
- si->state != SI_ST_EST)) {
+ if (likely(ob->flags & (CF_WRITE_NULL|CF_WRITE_ERROR|CF_SHUTW|CF_WAKE_WRITE))) {
out_wakeup:
if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
task_wakeup(si->owner, TASK_WOKEN_IO);