MEDIUM: ring: add new srv statement to support octet counting forward
authorEmeric Brun <ebrun@haproxy.com>
Fri, 29 May 2020 23:42:45 +0000 (01:42 +0200)
committerWilly Tarreau <w@1wt.eu>
Sun, 31 May 2020 08:49:43 +0000 (10:49 +0200)
log-proto <logproto>
  The "log-proto" specifies the protocol used to forward event messages to
  a server configured in a ring section. Possible values are "legacy"
  and "octet-count" corresponding respectively to "Non-transparent-framing"
  and "Octet counting" in rfc6587. "legacy" is the default.

Notes: a separated io_handler was created to avoid per messages test
and to prepare code to set different log protocols such as
request- response based ones.

doc/configuration.txt
include/types/server.h
src/server.c
src/sink.c

index 15f9668..0a6086c 100644 (file)
@@ -2606,7 +2606,8 @@ server <name> <address> [param*]
   respond, it will prevent old messages from being purged and may block new
   messages from being inserted into the ring. The proper way to send messages
   to multiple servers is to use one distinct ring per log server, not to
-  attach multiple servers to the same ring.
+  attach multiple servers to the same ring. Note that specific server directive
+  "log-proto" is used to set the protocol used to send messages.
 
 size <size>
   This is the optional size in bytes for the ring-buffer. Default value is
@@ -2639,7 +2640,7 @@ timeout server <timeout>
         size 32764
         timeout connect 5s
         timeout server 10s
-        server mysyslogsrv 127.0.0.1:6514
+        server mysyslogsrv 127.0.0.1:6514 log-proto octet-count
 
 
 4. Proxies
@@ -13080,6 +13081,12 @@ downinter <delay>
   global "spread-checks" keyword. This makes sense for instance when a lot
   of backends use the same servers.
 
+log-proto <logproto>
+  The "log-proto" specifies the protocol used to forward event messages to
+  a server configured in a ring section. Possible values are "legacy"
+  and "octet-count" corresponding respectively to "Non-transparent-framing"
+  and "Octet counting" in rfc6587. "legacy" is the default.
+
 maxconn <maxconn>
   The "maxconn" parameter specifies the maximal number of concurrent
   connections that will be sent to this server. If the number of incoming
index 80119a3..7b1ae5f 100644 (file)
@@ -177,6 +177,12 @@ enum srv_initaddr {
 #define SRV_SSL_O_EARLY_DATA   0x400  /* Allow using early data */
 #endif
 
+/* log servers ring's protocols options */
+enum srv_log_proto {
+        SRV_LOG_PROTO_LEGACY,         // messages on TCP separated by LF
+        SRV_LOG_PROTO_OCTET_COUNTING, // TCP frames: MSGLEN SP MSG
+};
+
 /* The server names dictionary */
 extern struct dict server_name_dict;
 
@@ -291,6 +297,7 @@ struct server {
        char *hostname;                         /* server hostname */
        struct sockaddr_storage init_addr;      /* plain IP address specified on the init-addr line */
        unsigned int init_addr_methods;         /* initial address setting, 3-bit per method, ends at 0, enough to store 10 entries */
+       enum srv_log_proto log_proto;           /* used proto to emmit messages on server lines from ring section */
 
 #ifdef USE_OPENSSL
        char *sni_expr;             /* Temporary variable to store a sample expression for SNI */
index e4044cd..e710b48 100644 (file)
@@ -2275,6 +2275,20 @@ int parse_server(const char *file, int linenum, char **args, struct proxy *curpr
                                newsrv->uweight = newsrv->iweight = w;
                                cur_arg += 2;
                        }
+                       else if (!strcmp(args[cur_arg], "log-proto")) {
+                               if (!strcmp(args[cur_arg + 1], "legacy"))
+                                       newsrv->log_proto = SRV_LOG_PROTO_LEGACY;
+                               else if (!strcmp(args[cur_arg + 1], "octet-count"))
+                                       newsrv->log_proto = SRV_LOG_PROTO_OCTET_COUNTING;
+                               else {
+                                       ha_alert("parsing [%s:%d]: '%s' expects one of 'legacy' or "
+                                               "'octet-count' but got '%s'\n",
+                                               file, linenum, args[cur_arg], args[cur_arg + 1]);
+                                       err_code |= ERR_ALERT | ERR_FATAL;
+                                       goto out;
+                               }
+                               cur_arg += 2;
+                       }
                        else if (!strcmp(args[cur_arg], "minconn")) {
                                newsrv->minconn = atol(args[cur_arg + 1]);
                                cur_arg += 2;
index e7a0c02..9eca481 100644 (file)
@@ -470,6 +470,150 @@ close:
        si_ic(si)->flags |= CF_READ_NULL;
 }
 
+/*
+ * IO Handler to handle message push to syslog tcp server
+ * using octet counting frames
+ */
+static void sink_forward_oc_io_handler(struct appctx *appctx)
+{
+       struct stream_interface *si = appctx->owner;
+       struct stream *s = si_strm(si);
+       struct sink *sink = strm_fe(s)->parent;
+       struct sink_forward_target *sft = appctx->ctx.sft.ptr;
+       struct ring *ring = sink->ctx.ring;
+       struct buffer *buf = &ring->buf;
+       uint64_t msg_len;
+       size_t len, cnt, ofs;
+       int ret = 0;
+       char *p;
+
+       /* if stopping was requested, close immediatly */
+       if (unlikely(stopping))
+               goto close;
+
+       /* for rex because it seems reset to timeout
+        * and we don't want expire on this case
+        * with a syslog server
+        */
+       si_oc(si)->rex = TICK_ETERNITY;
+       /* rto should not change but it seems the case */
+       si_oc(si)->rto = TICK_ETERNITY;
+
+       /* an error was detected */
+       if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
+               goto close;
+
+       /* con closed by server side */
+       if ((si_oc(si)->flags & CF_SHUTW))
+               goto close;
+
+       /* if the connection is not established, inform the stream that we want
+        * to be notified whenever the connection completes.
+        */
+       if (si_opposite(si)->state < SI_ST_EST) {
+               si_cant_get(si);
+               si_rx_conn_blk(si);
+               si_rx_endp_more(si);
+               return;
+       }
+
+       HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
+       if (appctx != sft->appctx) {
+               HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
+               goto close;
+       }
+       ofs = sft->ofs;
+
+       HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
+       LIST_DEL_INIT(&appctx->wait_entry);
+       HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
+
+       HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
+
+       /* explanation for the initialization below: it would be better to do
+        * this in the parsing function but this would occasionally result in
+        * dropped events because we'd take a reference on the oldest message
+        * and keep it while being scheduled. Thus instead let's take it the
+        * first time we enter here so that we have a chance to pass many
+        * existing messages before grabbing a reference to a location. This
+        * value cannot be produced after initialization.
+        */
+       if (unlikely(ofs == ~0)) {
+               ofs = 0;
+
+               HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
+               ofs += ring->ofs;
+       }
+
+       /* we were already there, adjust the offset to be relative to
+        * the buffer's head and remove us from the counter.
+        */
+       ofs -= ring->ofs;
+       BUG_ON(ofs >= buf->size);
+       HA_ATOMIC_SUB(b_peek(buf, ofs), 1);
+
+       /* in this loop, ofs always points to the counter byte that precedes
+        * the message so that we can take our reference there if we have to
+        * stop before the end (ret=0).
+        */
+       if (si_opposite(si)->state == SI_ST_EST) {
+               ret = 1;
+               while (ofs + 1 < b_data(buf)) {
+                       cnt = 1;
+                       len = b_peek_varint(buf, ofs + cnt, &msg_len);
+                       if (!len)
+                               break;
+                       cnt += len;
+                       BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
+
+                       chunk_reset(&trash);
+                       p = ulltoa(msg_len, trash.area, b_size(&trash));
+                       if (p) {
+                               trash.data = (p - trash.area) + 1;
+                               *p = ' ';
+                       }
+
+                       if (!p || (trash.data + msg_len > b_size(&trash))) {
+                               /* too large a message to ever fit, let's skip it */
+                               ofs += cnt + msg_len;
+                               continue;
+                       }
+
+                       trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt);
+
+                       if (ci_putchk(si_ic(si), &trash) == -1) {
+                               si_rx_room_blk(si);
+                               ret = 0;
+                               break;
+                       }
+                       ofs += cnt + msg_len;
+               }
+
+               HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
+               ofs += ring->ofs;
+               sft->ofs = ofs;
+       }
+       HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
+
+       if (ret) {
+               /* let's be woken up once new data arrive */
+               HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
+               LIST_ADDQ(&ring->waiters, &appctx->wait_entry);
+               HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
+               si_rx_endp_done(si);
+       }
+       HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
+
+       /* always drain data from server */
+       co_skip(si_oc(si), si_oc(si)->output);
+       return;
+
+close:
+       si_shutw(si);
+       si_shutr(si);
+       si_ic(si)->flags |= CF_READ_NULL;
+}
+
 void __sink_forward_session_deinit(struct sink_forward_target *sft)
 {
        struct stream_interface *si;
@@ -520,6 +664,13 @@ static struct applet sink_forward_applet = {
        .release = sink_forward_session_release,
 };
 
+static struct applet sink_forward_oc_applet = {
+       .obj_type = OBJ_TYPE_APPLET,
+       .name = "<SINKFWDOC>", /* used for logging */
+       .fct = sink_forward_oc_io_handler,
+       .release = sink_forward_session_release,
+};
+
 /*
  * Create a new peer session in assigned state (connect will start automatically)
  */
@@ -529,8 +680,12 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink
        struct appctx *appctx;
        struct session *sess;
        struct stream *s;
+       struct applet *applet = &sink_forward_applet;
+
+       if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
+               applet = &sink_forward_oc_applet;
 
-       appctx = appctx_new(&sink_forward_applet, tid_bit);
+       appctx = appctx_new(applet, tid_bit);
        if (!appctx)
                goto out_close;