Skip to content

Commit

Permalink
Implement proposed k8s-stream-file format
Browse files Browse the repository at this point in the history
Instead of parsing the contents of the data read from the `stdout` and
`stderr` pipes, this commit adds support for a "stream" format, named
`k8s-stream-file`, which just records what is read from a pipe to disk.

It significantly saves on CPU spend processing the buffer read, uses only
2 I/O vectors, and never touches the memory read from the pipe.

This is an updated implementation of
cri-o/cri-o#1605.
  • Loading branch information
portante committed Jun 1, 2021
1 parent 3161452 commit 3257f6a
Showing 1 changed file with 136 additions and 5 deletions.
141 changes: 136 additions & 5 deletions src/ctr_logging.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ static inline int sd_journal_sendv(G_GNUC_UNUSED const struct iovec *iov, G_GNUC
/* Different types of container logging */
static gboolean use_journald_logging = FALSE;
static gboolean use_k8s_logging = FALSE;
static gboolean use_k8s_stream_logging = FALSE;

/* Value the user must input for each log driver */
static const char *const K8S_FILE_STRING = "k8s-file";
static const char *const K8S_STREAM_FILE_STRING = "k8s-stream-file";
static const char *const JOURNALD_FILE_STRING = "journald";

/* Max log size for any log file types */
Expand Down Expand Up @@ -64,6 +66,7 @@ static void parse_log_path(char *log_config);
static const char *stdpipe_name(stdpipe_t pipe);
static int write_journald(int pipe, char *buf, ssize_t num_read);
static int write_k8s_log(stdpipe_t pipe, const char *buf, ssize_t buflen);
static int write_k8s_stream_log(stdpipe_t pipe, const char *buf, ssize_t buflen);
static bool get_line_len(ptrdiff_t *line_len, const char *buf, ssize_t buflen);
static ssize_t writev_buffer_append_segment(int fd, writev_buffer_t *buf, const void *data, ssize_t len);
static ssize_t writev_buffer_flush(int fd, writev_buffer_t *buf);
Expand Down Expand Up @@ -134,9 +137,10 @@ void configure_log_drivers(gchar **log_drivers, int64_t log_size_max_, char *cuu
* parse_log_path branches on log driver type the user inputted.
* log_config will either be a ':' delimited string containing:
* <DRIVER_NAME>:<PATH_NAME> or <PATH_NAME>
* in the case of no colon, the driver will be kubernetes-log-file,
* in the case the log driver is 'k8s-stream-file', the <PATH_NAME> must be present.
* in the case of no colon, the driver will be k8s-file,
* in the case the log driver is 'journald', the <PATH_NAME> is ignored.
* exits with error if <DRIVER_NAME> isn't 'journald' or 'kubernetes-log-file'
* exits with error if <DRIVER_NAME> isn't 'journald', 'k8s-file', or 'k8s-stream-file'.
*/
static void parse_log_path(char *log_config)
{
Expand Down Expand Up @@ -165,6 +169,17 @@ static void parse_log_path(char *log_config)
return;
}

// Driver is k8s-file, k8s-stream-file, or empty
if (!strcmp(driver, K8S_STREAM_FILE_STRING)) {
if (path == NULL) {
nexitf("k8s-stream-file requires a filename");
}
use_k8s_logging = TRUE;
use_k8s_stream_logging = TRUE;
k8s_log_path = path;
return;
}

// Driver is k8s-file or empty
if (!strcmp(driver, K8S_FILE_STRING)) {
if (path == NULL) {
Expand All @@ -188,9 +203,18 @@ static void parse_log_path(char *log_config)
/* write container output to all logs the user defined */
bool write_to_logs(stdpipe_t pipe, char *buf, ssize_t num_read)
{
if (use_k8s_logging && write_k8s_log(pipe, buf, num_read) < 0) {
nwarn("write_k8s_log failed");
return G_SOURCE_CONTINUE;
if (use_k8s_logging) {
if (use_k8s_stream_logging) {
if (write_k8s_log(pipe, buf, num_read) < 0) {
nwarn("write_k8s_log failed");
return G_SOURCE_CONTINUE;
}
} else {
if (write_k8s_stream_log(pipe, buf, num_read) < 0) {
nwarn("write_k8s_stream_log failed");
return G_SOURCE_CONTINUE;
}
}
}
if (use_journald_logging && write_journald(pipe, buf, num_read) < 0) {
nwarn("write_journald failed");
Expand Down Expand Up @@ -540,3 +564,110 @@ void sync_logs(void)
if (fsync(k8s_log_fd) < 0)
pwarn("Failed to sync log file before exit");
}


/* strlen("1997-03-25T13:20:42.999999999+01:00 stdout 9999999999 999999999 ") + 1 */
#define TSSTREAMBUFLEN 128

/*
* PROPOSED: CRI Stream Format, variable length file format
*/
static int set_k8s_stream_timestamp(char *buf, ssize_t bufsiz, ssize_t *tsbuflen, const char *pipename, uint64_t offset, ssize_t buflen,
ssize_t *btbw)
{
char off_sign = '+';
int off, len, err = -1;

struct timespec ts;
if (clock_gettime(CLOCK_REALTIME, &ts) < 0) {
/* If CLOCK_REALTIME is not supported, we set nano seconds to 0 */
if (errno == EINVAL) {
ts.tv_nsec = 0;
} else {
return err;
}
}

struct tm current_tm;
if (localtime_r(&ts.tv_sec, &current_tm) == NULL)
return err;

off = (int)current_tm.tm_gmtoff;
if (current_tm.tm_gmtoff < 0) {
off_sign = '-';
off = -off;
}

len = snprintf(buf, bufsiz, "%d-%02d-%02dT%02d:%02d:%02d.%09ld%c%02d:%02d %s %lud %ld ", current_tm.tm_year + 1900,
current_tm.tm_mon + 1, current_tm.tm_mday, current_tm.tm_hour, current_tm.tm_min, current_tm.tm_sec, ts.tv_nsec,
off_sign, off / 3600, off % 3600, pipename, offset, buflen);

if (len < bufsiz)
err = 0;

*tsbuflen = len;
*btbw = len + buflen;
return err;
}


/*
* PROPOSED: CRI Stream Format, variable length file format
*
* %d-%02d-%02dT%02d:%02d:%02d.%09ld%c%02d:%02d %(stream)s %(offset)lud %(buflen)ld %(buf)s
*
* The CRI stream fromat requires us to write each buffer read with a
* timestamp, stream, length (human readable ascii), and the buffer contents
* read (with a space character separating the buffer length string from the
* buffer.
*/
static int write_k8s_stream_log(stdpipe_t pipe, const char *buf, ssize_t buflen)
{
writev_buffer_t bufv = {0};
char tsbuf[TSSTREAMBUFLEN];
static ssize_t bytes_written = 0;
static uint64_t offset = 0;
ssize_t bytes_to_be_written = 0;
ssize_t tsbuflen = 0;

/*
* Use the same timestamp for every line of the log in this buffer.
* There is no practical difference in the output since write(2) is
* fast.
*/
if (set_k8s_stream_timestamp(tsbuf, sizeof tsbuf, &tsbuflen, stdpipe_name(pipe), buflen, offset, &bytes_to_be_written))
/* TODO: We should handle failures much more cleanly than this. */
return -1;

/*
* We re-open the log file if writing out the bytes will exceed the max
* log size. We also reset the state so that the new file is started with
* a timestamp.
*/
if ((opt_log_size_max > 0) && (bytes_written + bytes_to_be_written) > opt_log_size_max) {
bytes_written = 0;

reopen_k8s_file();
}

/* Output the timestamp, stream, and length */
if (writev_buffer_append_segment(k8s_log_fd, &bufv, tsbuf, tsbuflen) < 0) {
nwarn("failed to write (timestamp, stream) to log");
goto stream_next;
}

/* Output the actual contents. */
if (writev_buffer_append_segment(k8s_log_fd, &bufv, buf, buflen) < 0) {
nwarn("failed to write buffer to log");
}

stream_next:
bytes_written += bytes_to_be_written;
offset += (uint64_t)bytes_to_be_written;

if (writev_buffer_flush(k8s_log_fd, &bufv) < 0) {
nwarn("failed to flush buffer to log");
}

return 0;
}

0 comments on commit 3257f6a

Please sign in to comment.