From 36469ce09a5e93c628d509ced8c8a58c455ee015 Mon Sep 17 00:00:00 2001 From: Lars Erik Wik Date: Mon, 11 Nov 2024 18:01:40 +0100 Subject: [PATCH] Added file stream API Added API for streaming a file over network contents using the RSYNC algorithm from librsync. Ticket: ENT-12414 Changelog: None Signed-off-by: Lars Erik Wik --- configure.ac | 23 +- libcfnet/Makefile.am | 3 +- libcfnet/file_stream.c | 884 ++++++++++++++++++++++++++++++++++++++++ libcfnet/file_stream.h | 97 +++++ libpromises/Makefile.am | 8 +- 5 files changed, 1006 insertions(+), 9 deletions(-) create mode 100644 libcfnet/file_stream.c create mode 100644 libcfnet/file_stream.h diff --git a/configure.ac b/configure.ac index 96609095da..6029ae0841 100644 --- a/configure.ac +++ b/configure.ac @@ -521,6 +521,21 @@ CF3_WITH_LIBRARY(pcre2, [ )] ) +dnl librsync + +AC_ARG_WITH([librsync], [AS_HELP_STRING([--with-librsync[[=PATH]]], [Spefify librsync path])], [], [with_librsync=yes]) + +if test "x$with_librsync" = "xno"; then + AC_MSG_ERROR([librsync is required]) +fi + +CF3_WITH_LIBRARY(librsync, [ + AC_CHECK_HEADERS([librsync.h], [], AC_MSG_ERROR(Cannot find librsync)) + AC_CHECK_HEADERS([librsync_export.h], [], AC_MSG_ERROR(Cannot find librsync)) + AC_CHECK_LIB(rsync, rs_file_size, [], [AC_MSG_ERROR(Cannot find librsync)]) + ] +) + dnl defined for libntech AC_DEFINE(WITH_PCRE2, 1, [Define if PCRE2 is being used]) @@ -1317,10 +1332,10 @@ dnl ###################################################################### dnl Collect all the options dnl ###################################################################### -CORE_CPPFLAGS="$LMDB_CPPFLAGS $TOKYOCABINET_CPPFLAGS $QDBM_CPPFLAGS $PCRE2_CPPFLAGS $OPENSSL_CPPFLAGS $SQLITE3_CPPFLAGS $LIBACL_CPPFLAGS $LIBCURL_CPPFLAGS $LIBYAML_CPPFLAGS $POSTGRESQL_CPPFLAGS $MYSQL_CPPFLAGS $LIBXML2_CPPFLAGS $CPPFLAGS $CFECOMPAT_CPPFLAGS" -CORE_CFLAGS="$LMDB_CFLAGS $TOKYOCABINET_CFLAGS $QDBM_CFLAGS $PCRE2_CFLAGS $OPENSSL_CFLAGS $SQLITE3_CFLAGS $LIBACL_CFLAGS $LIBCURL_CFLAGS $LIBYAML_CFLAGS $POSTGRESQL_CFLAGS $MYSQL_CFLAGS $LIBXML2_CFLAGS $CFLAGS" -CORE_LDFLAGS="$LMDB_LDFLAGS $TOKYOCABINET_LDFLAGS $QDBM_LDFLAGS $PCRE2_LDFLAGS $OPENSSL_LDFLAGS $SQLITE3_LDFLAGS $LIBACL_LDFLAGS $LIBCURL_LDFLAGS $LIBYAML_LDFLAGS $POSTGRESQL_LDFLAGS $MYSQL_LDFLAGS $LIBXML2_LDFLAGS $LDFLAGS" -CORE_LIBS="$LMDB_LIBS $TOKYOCABINET_LIBS $QDBM_LIBS $PCRE2_LIBS $OPENSSL_LIBS $SQLITE3_LIBS $LIBACL_LIBS $LIBCURL_LIBS $LIBYAML_LIBS $POSTGRESQL_LIBS $MYSQL_LIBS $LIBXML2_LIBS $LIBS" +CORE_CPPFLAGS="$LMDB_CPPFLAGS $TOKYOCABINET_CPPFLAGS $QDBM_CPPFLAGS $PCRE2_CPPFLAGS $OPENSSL_CPPFLAGS $SQLITE3_CPPFLAGS $LIBACL_CPPFLAGS $LIBCURL_CPPFLAGS $LIBRSYNC_CPPFLAGS $LIBYAML_CPPFLAGS $POSTGRESQL_CPPFLAGS $MYSQL_CPPFLAGS $LIBXML2_CPPFLAGS $CPPFLAGS $CFECOMPAT_CPPFLAGS" +CORE_CFLAGS="$LMDB_CFLAGS $TOKYOCABINET_CFLAGS $QDBM_CFLAGS $PCRE2_CFLAGS $OPENSSL_CFLAGS $SQLITE3_CFLAGS $LIBACL_CFLAGS $LIBCURL_CFLAGS $LIBRSYNC_CFLAGS $LIBYAML_CFLAGS $POSTGRESQL_CFLAGS $MYSQL_CFLAGS $LIBXML2_CFLAGS $CFLAGS" +CORE_LDFLAGS="$LMDB_LDFLAGS $TOKYOCABINET_LDFLAGS $QDBM_LDFLAGS $PCRE2_LDFLAGS $OPENSSL_LDFLAGS $SQLITE3_LDFLAGS $LIBACL_LDFLAGS $LIBCURL_LDFLAGS $LIBRSYNC_LDFLAGS $LIBYAML_LDFLAGS $POSTGRESQL_LDFLAGS $MYSQL_LDFLAGS $LIBXML2_LDFLAGS $LDFLAGS" +CORE_LIBS="$LMDB_LIBS $TOKYOCABINET_LIBS $QDBM_LIBS $PCRE2_LIBS $OPENSSL_LIBS $SQLITE3_LIBS $LIBACL_LIBS $LIBCURL_LIBS $LIBRSYNC_LIBS $LIBYAML_LIBS $POSTGRESQL_LIBS $MYSQL_LIBS $LIBXML2_LIBS $LIBS" dnl ###################################################################### dnl Make them available to subprojects. diff --git a/libcfnet/Makefile.am b/libcfnet/Makefile.am index 3e5cff8619..ddb5e50f1d 100644 --- a/libcfnet/Makefile.am +++ b/libcfnet/Makefile.am @@ -46,4 +46,5 @@ libcfnet_la_SOURCES = \ server_code.c server_code.h \ stat_cache.c stat_cache.h \ tls_client.c tls_client.h \ - tls_generic.c tls_generic.h + tls_generic.c tls_generic.h \ + file_stream.c file_stream.h diff --git a/libcfnet/file_stream.c b/libcfnet/file_stream.c new file mode 100644 index 0000000000..4b0702229a --- /dev/null +++ b/libcfnet/file_stream.c @@ -0,0 +1,884 @@ +/* + Copyright 2024 Northern.tech AS + + This file is part of CFEngine 3 - written and maintained by Northern.tech AS. + + This program is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by the + Free Software Foundation; version 3. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA + + To the extent this program is licensed as part of the Enterprise + versions of CFEngine, the applicable Commercial Open Source License + (COSL) may apply to this file if you as a licensee so wish it. See + included file COSL.txt. +*/ + +#include + +#include "file_stream.h" + +#include +#include +#include +#include +#include +#include + +/*********************************************************/ +/* Network protocol */ +/*********************************************************/ + +/** + * @brief Simple network protocol on top of SSL/TCP. Used for client-server + * communication during RSYNC stream. + * + * @details Header format: + * +----------+----------+----------+----------+ + * | SDU Len. | Reserved | EOF Flag | ERR Flag | + * +----------+----------+----------+----------+ + * | 12 bits | 2 bits | 1 bit | 1 bit | + * +----------+----------+----------+----------+ + * + * The header fields are defined as follows: + * SDU Length Length of the SDU (i.e. payload) encapsulated within + * this datagram. + * Reserved 2 bits reserved for future use. + * End-of-File flag Signals whether or not the receiver should expect to + * receive more datagrams. + * Error flag Signals that the transmission must be canceled due to + * unexpected error. + * + * @note If the End-of-File flag is set, there may still be data to process in + * in the payload. If the Error flag is set, there may be an error + * message in the payload. + */ +#define HEADER_SIZE ((int) sizeof(uint16_t)) + +/** + * @note The TLS Generic API requires that the message length is less than + * CF_BUFSIZE. Furthermore, the protocol can only handle up to 4095 + * Bytes, because it's the largest unsigned integer you can represent + * with 12 bits (2^12 - 1 = 4095). + */ +#define MESSAGE_SIZE MIN(CF_BUFSIZE - 1, 4095) + +/** + * @brief Send a message using the file stream protocol + * @warning You probably want to use SendMessage() or SendError() instead + * + * @param conn The SSL connection object + * @param msg The message to send + * @param len The length of the message to send (must be less or equal to + * MESSAGE_SIZE Bytes) + * @param eof Set to true if this is the last message in a transaction, + * otherwise false + * @param err Set to true if transaction must be canceled (e.g., due to an + * unexpected error), otherwise false + * @return true on success, otherwise false + */ +static bool __SendMessage( + SSL *conn, const char *msg, size_t len, bool eof, bool err) +{ + assert(conn != NULL); + assert(msg != NULL); + assert(len <= MESSAGE_SIZE); + + /* Set message length */ + uint16_t header = len << 4; + + /* Set Error flag */ + if (err) + { + header |= (1 << 0); + } + + /* Set End-of-File flag */ + if (eof) + { + header |= (1 << 1); + } + + /* Send header */ + header = htons(header); + int ret = TLSSend(conn, (char *) &header, HEADER_SIZE); + if (ret != HEADER_SIZE) + { + Log(LOG_LEVEL_ERR, + "Failed to send message header during file stream (%d != %d)", + ret, + HEADER_SIZE); + return false; + } + + if (len > 0) + { + /* Send payload */ + ret = TLSSend(conn, msg, len); + if (ret != (int) len) + { + Log(LOG_LEVEL_ERR, + "Failed to send message payload during file stream (%d != %d)", + ret, + HEADER_SIZE); + return false; + } + } + + return true; +} + +/** + * @brief Send a message using the file stream protocol + * + * @param conn The SSL connection object + * @param msg The message to send + * @param len The length of the message to send (must be less or equal to + * MESSAGE_SIZE Bytes) + * @param eof Set to true if this is the last message in a transaction, + * otherwise false + * @return true on success, otherwise false + */ +static inline bool SendMessage( + SSL *conn, const char *msg, size_t len, bool eof) +{ + assert(conn != NULL); + assert(msg != NULL); + + return __SendMessage(conn, msg, len, eof, false); +} + +/** + * @brief Receive a message using the file stream protocol + * + * @param conn The SSL connection object + * @param msg The message receive buffer (must be MESSAGE_SIZE Bytes large) + * @param len The length of the reveived message + * @param eof Is set to true if this was the last message in the transaction + * @return true on success, otherwise false + * + * @note RecvMessage fails if the communication is broken or if we received an + * error from the remote host. In both cases, we should not try to flush + * the stream. + */ +static bool RecvMessage(SSL *conn, char *msg, size_t *len, bool *eof) +{ + assert(conn != NULL); + assert(msg != NULL); + assert(len != NULL); + assert(eof != NULL); + + /* TLSRecv() expects a buffer this size */ + char recv_buffer[CF_BUFSIZE]; + + /* Receive header */ + uint16_t header; + int ret = TLSRecv(conn, recv_buffer, HEADER_SIZE); + if (ret != HEADER_SIZE) + { + Log(LOG_LEVEL_ERR, + "Failed to receive message header during file stream (%d != %d)", + ret, + HEADER_SIZE); + return false; + } + memcpy(&header, recv_buffer, HEADER_SIZE); + header = ntohs(header); + + /* Extract Error flag */ + bool err = header & (1 << 0); + + /* Extract End-of-File flag */ + *eof = header & (1 << 1); + + /* Extract message length */ + *len = header >> 4; + + /* Read payload */ + if (*len > 0) + { + ret = TLSRecv(conn, recv_buffer, *len); + if (ret != *len) + { + Log(LOG_LEVEL_ERR, + "Failed to receive message payload during file stream (%d != %d)", + ret, + HEADER_SIZE); + return false; + } + memcpy(msg, recv_buffer, *len); + + if (err) + { + /* If the error flag is set, then a payload contains an error + * message in the form of a NUL-byte terminated string. */ + Log(LOG_LEVEL_ERR, "Remote file stream error: %s", msg); + } + } + + return !err; +} + +/** + * @brief Flush the file stream + * + * It's used to prevent the remote host from blocking while sending the + * remaining data after we have experienced an unexpected error and need to + * abort the file stream. Once the stream has been successfully flushed, the + * remote host will be ready to receive our error message. + * + * @param conn The SSL connection object + * @return true on success, otherwise false + */ +static bool FlushStream(SSL *conn) +{ + assert(conn != NULL); + + char msg[MESSAGE_SIZE]; + size_t len; + bool eof; + while (RecvMessage(conn, msg, &len, &eof)) + { + if (eof) + { + return true; + } + } + + Log(LOG_LEVEL_ERR, "Remote file stream error: %s", msg); + return false; +} + +/** + * @brief Send an error message using the file stream protocol + * + * @param conn The SSL connection object + * @param flush Whether or not to flush the stream (see FlushStream()) + * @param fmt The format string + * @param ... The format string arguments + * @return true on success, otherwise false + */ +static bool SendError(SSL *conn, bool flush, const char *fmt, ...) +{ + assert(conn != NULL); + assert(fmt != NULL); + + va_list ap; + char msg[MESSAGE_SIZE]; + + va_start(ap, fmt); + int len = vsnprintf(msg, MESSAGE_SIZE, fmt, ap); + va_end(ap); + + assert(len >= 0); /* Let's make sure we detect this in debug builds */ + if (len < 0) + { + Log(LOG_LEVEL_ERR, + "Failed to format error message during file stream"); + len = 0; /* We still want to send the header */ + } + else if (len >= MESSAGE_SIZE) + { + Log(LOG_LEVEL_WARNING, + "Error message truncated during file stream (%d >= %d)", + len, + MESSAGE_SIZE); + /* Add ... to indicate message truncation */ + msg[MESSAGE_SIZE - 1] = '.'; + msg[MESSAGE_SIZE - 2] = '.'; + msg[MESSAGE_SIZE - 3] = '.'; + len = MESSAGE_SIZE; + } + + if (flush && FlushStream(conn)) + { + return __SendMessage(conn, msg, (size_t) len, false, true); + } + + return true; +} + +/*********************************************************/ +/* Server specific */ +/*********************************************************/ + +#define ERROR_MSG_UNSPECIFIED_SERVER_REFUSAL "Unspecified server refusal" +#define ERROR_MSG_INTERNAL_SERVER_ERROR "Internal server error" + +bool FileStreamRefuse(SSL *conn) +{ + return SendError(conn, false, ERROR_MSG_UNSPECIFIED_SERVER_REFUSAL); +} + +/** + * @brief Receive and load signature into memory + * + * @param conn The SSL connection object + * @param sig The signature of the outdated file + * @return true on success, otherwise false + */ +static bool RecvSignature(SSL *conn, rs_signature_t **sig) +{ + assert(conn != NULL); + assert(sig != NULL); + + /* The input buffer has to be twice the message size, so that if can fit a + * new message, as well as some tail data from the last job iteration */ + char in_buf[MESSAGE_SIZE * 2]; + + /* Start a job for loading a signature into memory */ + rs_job_t *job = rs_loadsig_begin(sig); + if (job == NULL) + { + SendError(conn, true, ERROR_MSG_INTERNAL_SERVER_ERROR); + return false; + } + + /* Setup buffers for the job */ + rs_buffers_t bufs = {0}; + + rs_result res; + do + { + /* Fill input buffers */ + if (bufs.eof_in == 0) + { + if (bufs.avail_in > MESSAGE_SIZE) + { + /* The job requires more data, but we cannot fit another + * message into the input buffer */ + Log(LOG_LEVEL_ERR, + "Insufficient buffer capacity to receive file stream signature"); + SendError(conn, true, ERROR_MSG_INTERNAL_SERVER_ERROR); + + rs_job_free(job); + return false; + } + + if (bufs.avail_in > 0) + { + /* Move leftover tail data to the front of the buffer */ + memmove(in_buf, bufs.next_in, bufs.avail_in); + } + + size_t n_bytes; + bool eof; + if (!RecvMessage(conn, in_buf + bufs.avail_in, &n_bytes, &eof)) + { + /* Error is already logged */ + rs_job_free(job); + return false; + } + + bufs.eof_in = eof ? 1 : 0; + bufs.next_in = in_buf; + bufs.avail_in += n_bytes; + } + + /* Iterate job */ + res = rs_job_iter(job, &bufs); + if (res != RS_DONE && res != RS_BLOCKED) + { + SendError(conn, bufs.eof_in == 0, ERROR_MSG_INTERNAL_SERVER_ERROR); + rs_job_free(job); + return false; + } + } while (res != RS_DONE); + + rs_job_free(job); + + return true; +} + +/** + * @brief Compute and send delta based on the source file and the signature of + * the basis file + * + * @param conn The SSL connection object + * @param sig The signature of the basis file + * @param filename The name of the source file + * @return true on success, otherwise false + */ +static bool SendDelta(SSL *conn, rs_signature_t *sig, const char *filename) +{ + assert(conn != NULL); + assert(sig != NULL); + assert(filename != NULL); + + /* In this case, the input buffer does not need to be twice the message + * size, because we can control how much we read into it */ + char in_buf[MESSAGE_SIZE], out_buf[MESSAGE_SIZE]; + + /* Open source file */ + FILE *file = safe_fopen(filename, "rb"); + if (file == NULL) + { + Log(LOG_LEVEL_ERR, + "Failed to open the source file '%s' for computing delta during file stream: %s", + filename, + GetErrorStr()); + SendError(conn, false, ERROR_MSG_UNSPECIFIED_SERVER_REFUSAL); + return false; + } + + /* Build hash table */ + rs_result res = rs_build_hash_table(sig); + if (res != RS_DONE) + { + SendError(conn, false, ERROR_MSG_INTERNAL_SERVER_ERROR); + fclose(file); + return false; + } + + /* Start generating delta */ + rs_job_t *job = rs_delta_begin(sig); + if (job == NULL) + { + SendError(conn, false, ERROR_MSG_INTERNAL_SERVER_ERROR); + fclose(file); + return false; + } + + /* Setup buffers for the job */ + rs_buffers_t bufs = {0}; + bufs.next_out = out_buf; + bufs.avail_out = MESSAGE_SIZE; /* We cannot send more using the protocol */ + + do + { + /* Fill input buffers */ + if (bufs.eof_in == 0) + { + if (bufs.avail_in >= MESSAGE_SIZE) + { + /* The job requires more data, but the input buffer is full */ + Log(LOG_LEVEL_ERR, + "Insufficient buffer capacity to compute delta"); + SendError(conn, false, ERROR_MSG_INTERNAL_SERVER_ERROR); + + fclose(file); + rs_job_free(job); + return false; + } + + if (bufs.avail_in > 0) + { + /* Move leftover tail data to the front of the buffer */ + memmove(in_buf, bufs.next_in, bufs.avail_in); + } + + size_t n_bytes = fread( + in_buf + bufs.avail_in, + 1 /* Byte */, + sizeof(in_buf) - bufs.avail_in, + file); + if (n_bytes == 0) + { + if (ferror(file)) + { + Log(LOG_LEVEL_ERR, + "Failed to read the source file '%s' during file stream: %s", + filename, + GetErrorStr()); + SendError(conn, false, ERROR_MSG_INTERNAL_SERVER_ERROR); + + fclose(file); + rs_job_free(job); + return false; + } + + /* End-of-File reached */ + bufs.eof_in = feof(file); + assert(bufs.eof_in != 0); + } + + bufs.next_in = in_buf; + bufs.avail_in += n_bytes; + } + + /* Iterate job */ + res = rs_job_iter(job, &bufs); + if (res != RS_DONE && res != RS_BLOCKED) + { + SendError(conn, false, ERROR_MSG_INTERNAL_SERVER_ERROR); + + fclose(file); + rs_job_free(job); + return false; + } + + /* Drain output buffer, if there is data */ + size_t present = bufs.next_out - out_buf; + if (present > 0) + { + assert(present <= MESSAGE_SIZE); + if (!SendMessage(conn, out_buf, present, res == RS_DONE)) + { + fclose(file); + rs_job_free(job); + return false; + } + + bufs.next_out = out_buf; + bufs.avail_out = MESSAGE_SIZE; + } + } while (res != RS_DONE); + + fclose(file); + rs_job_free(job); + + return true; +} + +bool FileStreamServe(SSL *conn, const char *filename) +{ + assert(conn != NULL); + assert(filename != NULL); + + Log(LOG_LEVEL_VERBOSE, + "Receiving- & loading signature into memory for file '%s'...", + filename); + rs_signature_t *sig; + if (!RecvSignature(conn, &sig)) + { + /* Error is already logged */ + return false; + } + + Log(LOG_LEVEL_VERBOSE, + "Computing- & sending delta for file '%s'...", + filename); + if (!SendDelta(conn, sig, filename)) + { + /* Error is already logged */ + rs_free_sumset(sig); + return false; + } + + rs_free_sumset(sig); + return true; +} + +/*********************************************************/ +/* Client specific */ +/*********************************************************/ + +#define ERROR_MSG_INTERNAL_CLIENT_ERROR "Internal client error" + +/** + * @brief Compute and send a signature of the basis file + * + * @param conn The SSL connection object + * @param filename The name of the basis file + * @return true on success, otherwise false + */ +static bool SendSignature(SSL *conn, const char *filename) +{ + assert(conn != NULL); + assert(filename != NULL); + + /* In this case, the input buffer does not need to be twice the message + * size, because we can control how much we read into it */ + char in_buf[MESSAGE_SIZE], out_buf[MESSAGE_SIZE]; + + /* Open basis file */ + FILE *file = safe_fopen(filename, "rb"); + if (file == NULL) + { + Log(LOG_LEVEL_ERR, + "Failed to open the basis file '%s' for computing delta during file stream: %s", + filename, + GetErrorStr()); + SendError(conn, false, ERROR_MSG_INTERNAL_CLIENT_ERROR); + return false; + } + + /* Get file size */ + rs_long_t fsize = rs_file_size(file); + + /* Get recommended arguments */ + rs_magic_number sig_magic = 0; + size_t block_len = 0, strong_len = 0; + rs_result res = rs_sig_args(fsize, &sig_magic, &block_len, &strong_len); + if (res != RS_DONE) + { + SendError(conn, false, ERROR_MSG_INTERNAL_CLIENT_ERROR); + fclose(file); + return false; + } + + /* Start generating signature */ + rs_job_t *job = rs_sig_begin(block_len, strong_len, sig_magic); + if (job == NULL) + { + SendError(conn, false, ERROR_MSG_INTERNAL_CLIENT_ERROR); + fclose(file); + return false; + } + + /* Setup buffers */ + rs_buffers_t bufs = {0}; + bufs.next_out = out_buf; + bufs.avail_out = MESSAGE_SIZE; /* We cannot send more using the protocol */ + + do + { + if (bufs.eof_in == 0) + { + if (bufs.avail_in >= MESSAGE_SIZE) + { + /* The job requires more data, but the input buffer is full */ + Log(LOG_LEVEL_ERR, + "Insufficient buffer capacity to compute delta"); + SendError(conn, false, ERROR_MSG_INTERNAL_CLIENT_ERROR); + + fclose(file); + rs_job_free(job); + return false; + } + + if (bufs.avail_in > 0) + { + /* Move leftover tail data to the front of the buffer */ + memmove(in_buf, bufs.next_in, bufs.avail_in); + } + + /* Fill input buffer */ + size_t n_bytes = fread( + in_buf + bufs.avail_in, + 1 /* Byte */, + sizeof(in_buf) - bufs.avail_in, + file); + if (n_bytes == 0) + { + if (ferror(file)) + { + Log(LOG_LEVEL_ERR, + "Failed to read the basis file '%s' during file stream: %s", + filename, + GetErrorStr()); + SendError(conn, false, ERROR_MSG_INTERNAL_CLIENT_ERROR); + + fclose(file); + rs_job_free(job); + return false; + } + + /* End-of-File reached */ + bufs.eof_in = feof(file); + assert(bufs.eof_in != 0); + } + + bufs.next_in = in_buf; + bufs.avail_in += n_bytes; + } + + /* Iterate job */ + res = rs_job_iter(job, &bufs); + if (res != RS_DONE && res != RS_BLOCKED) + { + SendError(conn, false, ERROR_MSG_INTERNAL_CLIENT_ERROR); + + rs_file_close(file); + rs_job_free(job); + return false; + } + + /* Drain output buffer, if there is data */ + size_t present = bufs.next_out - out_buf; + if (present > 0) + { + assert(present <= MESSAGE_SIZE); + if (!SendMessage(conn, out_buf, present, res == RS_DONE)) + { + fclose(file); + rs_job_free(job); + return false; + } + + bufs.next_out = out_buf; + bufs.avail_out = MESSAGE_SIZE; + } + } while (res != RS_DONE); + + fclose(file); + rs_job_free(job); + + return true; +} + +/** + * @brief Receive delta and apply patch to the outdated copy of the file + * + * @param conn The SSL connection object + * @param basis The name of basis file + * @param dest The name of destination file + * @param perms The desired file permissions of the destination file + * @return true on success, otherwise false + */ +static bool RecvDelta(SSL *conn, const char *basis, const char *dest, mode_t perms) +{ + assert(conn != NULL); + assert(basis != NULL); + assert(dest != NULL); + + /* The input buffer has to be twice the message size, so that if can fit a + * new message, as well as some tail data from the last job iteration */ + char in_buf[MESSAGE_SIZE * 2], out_buf[MESSAGE_SIZE]; + + /* Open/create the destination file */ + FILE *new = safe_fopen_create_perms(dest, "wb", perms); + if (new == NULL) + { + Log(LOG_LEVEL_ERR, + "Failed to open/create destination file '%s': %s", + dest, + GetErrorStr()); + SendError(conn, true, ERROR_MSG_INTERNAL_CLIENT_ERROR); + return false; + } + + /* Open the basis file */ + FILE *old = old = safe_fopen(basis, "rb"); + if (old == NULL) + { + Log(LOG_LEVEL_ERR, + "Failed to open basis file '%s': %s", + basis, + GetErrorStr()); + SendError(conn, true, ERROR_MSG_INTERNAL_CLIENT_ERROR); + return false; + } + + /* Start a job for patching destination file */ + rs_job_t *job = rs_patch_begin(rs_file_copy_cb, old); + if (job == NULL) + { + SendError(conn, true, ERROR_MSG_INTERNAL_CLIENT_ERROR); + return false; + } + + /* Setup buffers for the job */ + rs_buffers_t bufs = {0}; + bufs.next_out = out_buf; + bufs.avail_out = sizeof(out_buf); + + rs_result res; + do + { + /* Fill input buffers */ + if (bufs.eof_in == 0) + { + if (bufs.avail_in > MESSAGE_SIZE) + { + /* The job requires more data, but we cannot fit another + * message into the input buffer */ + Log(LOG_LEVEL_ERR, + "Insufficient buffer capacity to receive file stream delta"); + SendError(conn, true, ERROR_MSG_INTERNAL_CLIENT_ERROR); + + fclose(new); + fclose(old); + rs_job_free(job); + return false; + } + + if (bufs.avail_in > 0) + { + /* Move leftover tail data to the front of the buffer */ + memmove(in_buf, bufs.next_in, bufs.avail_in); + } + + size_t n_bytes; + bool eof; + if (!RecvMessage(conn, in_buf + bufs.avail_in, &n_bytes, &eof)) + { + /* Error is already logged */ + fclose(new); + fclose(old); + rs_job_free(job); + return false; + } + + bufs.eof_in = eof ? 1 : 0; + bufs.next_in = in_buf; + bufs.avail_in += n_bytes; + } + + res = rs_job_iter(job, &bufs); + if (res != RS_DONE && res != RS_BLOCKED) + { + SendError(conn, bufs.eof_in == 0, ERROR_MSG_INTERNAL_CLIENT_ERROR); + + fclose(new); + fclose(old); + rs_job_free(job); + return false; + } + + /* Drain output buffer, if there is data */ + size_t present = bufs.next_out - out_buf; + if (present > 0) + { + size_t n_bytes = fwrite(out_buf, 1 /* Byte */, present, new); + if (n_bytes == 0) + { + Log(LOG_LEVEL_ERR, + "Failed to write to destination file '%s' during file stream: %s", + dest, + GetErrorStr()); + SendError( + conn, bufs.eof_in == 0, ERROR_MSG_INTERNAL_CLIENT_ERROR); + + fclose(new); + fclose(old); + rs_job_free(job); + return false; + } + + bufs.next_out = out_buf; + bufs.avail_out = sizeof(out_buf); + } + } while (res != RS_DONE); + + fclose(new); + fclose(old); + rs_job_free(job); + + return true; +} + +bool FileStreamFetch(SSL *conn, const char *basis, const char *dest, mode_t perms) +{ + assert(conn != NULL); + assert(basis != NULL); + assert(dest != NULL); + + Log(LOG_LEVEL_VERBOSE, + "Computing- & sending signature of file '%s'...", + basis); + if (!SendSignature(conn, basis)) + { + /* Error is already logged */ + return false; + } + + Log(LOG_LEVEL_VERBOSE, + "Receiving delta & applying patch to file '%s'...", + dest); + if (!RecvDelta(conn, basis, dest, perms)) + { + /* Error is already logged */ + return false; + } + + return true; +} diff --git a/libcfnet/file_stream.h b/libcfnet/file_stream.h new file mode 100644 index 0000000000..ca88da6a1f --- /dev/null +++ b/libcfnet/file_stream.h @@ -0,0 +1,97 @@ +/* + Copyright 2024 Northern.tech AS + + This file is part of CFEngine 3 - written and maintained by Northern.tech AS. + + This program is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by the + Free Software Foundation; version 3. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA + + To the extent this program is licensed as part of the Enterprise + versions of CFEngine, the applicable Commercial Open Source License + (COSL) may apply to this file if you as a licensee so wish it. See + included file COSL.txt. +*/ + +#ifndef FILE_STREAM_H +#define FILE_STREAM_H + +/** + * @file file_stream.h + * + * +---------+ +---------+ + * | client | | server | + * +----+----+ +----+----+ + * | | + * | sig = Compute(basis) | + * | | + * | Send(sig) ---------------->| sig = Recv() + * | | + * | | sig = BuildHashTable(sig) + * | | + * | | delta = Compute(sig, src) + * | | + * | delta = Recv() <-----------| Send(delta) + * | | + * | dest = Patch(delta, basis) | + * | | + * v v + * + * 1. Client generates a signature of the basis file (i.e., the "outdated" + * file) + * 2. Client sends signature to server + * 4. Server builds a hash table from the signature + * 5. Server generates delta from signature and the source file (i.e., the + * "up-to-date" file) + * 6. Server sends delta to client + * 7. Client applies delta on contents of the basis file in order to create + * the destination file + * + */ + +#include +#include +#include /* mode_t */ + + +/** + * @brief Reply with unspecified server refusal + * + * E.g., use this function to when the resource does not exist or access is + * denied. We don't disinguish between these two for security reasons. + * + * @param conn The SSL connection object + * @return true on success, otherwise false + */ +bool FileStreamRefuse(SSL *conn); + +/** + * @brief Serve a file using the stream API + * + * @param conn The SSL connection object + * @param filename The name of the source file + * @return true on success, otherwise false + */ +bool FileStreamServe(SSL *conn, const char *filename); + +/** + * @brief Fetch a file using the stream API + * + * @param conn The SSL connection object + * @param basis The name of the basis file + * @param dest The name of the destination file + * @param perms The desired permissions of the destination file + * @return true on success, otherwise false + */ +bool FileStreamFetch(SSL *conn, const char *basis, const char *dest, mode_t perms); + +#endif // FILE_STREAM_H diff --git a/libpromises/Makefile.am b/libpromises/Makefile.am index 4162972d18..b2123440de 100644 --- a/libpromises/Makefile.am +++ b/libpromises/Makefile.am @@ -30,7 +30,7 @@ AM_LDFLAGS = endif AM_LDFLAGS += $(CORE_LDFLAGS) $(LMDB_LDFLAGS) $(TOKYOCABINET_LDFLAGS) $(QDBM_LDFLAGS) \ - $(PCRE2_LDFLAGS) $(OPENSSL_LDFLAGS) $(SQLITE3_LDFLAGS) $(LIBACL_LDFLAGS) $(LIBYAML_LDFLAGS) $(LIBCURL_LDFLAGS) + $(PCRE2_LDFLAGS) $(OPENSSL_LDFLAGS) $(SQLITE3_LDFLAGS) $(LIBACL_LDFLAGS) $(LIBYAML_LDFLAGS) $(LIBCURL_LDFLAGS) $(LIBRSYNC_LDFLAGS) AM_CPPFLAGS = \ -I$(srcdir)/../libntech/libutils -I$(srcdir)/../libcfecompat \ @@ -39,16 +39,16 @@ AM_CPPFLAGS = \ -I$(srcdir)/../cf-check \ $(CORE_CPPFLAGS) $(ENTERPRISE_CPPFLAGS) \ $(LMDB_CPPFLAGS) $(TOKYOCABINET_CPPFLAGS) $(QDBM_CPPFLAGS) \ - $(PCRE2_CPPFLAGS) $(OPENSSL_CPPFLAGS) $(SQLITE3_CPPFLAGS) $(LIBACL_CPPFLAGS) $(LIBYAML_CPPFLAGS) $(LIBCURL_CPPFLAGS) + $(PCRE2_CPPFLAGS) $(OPENSSL_CPPFLAGS) $(SQLITE3_CPPFLAGS) $(LIBACL_CPPFLAGS) $(LIBYAML_CPPFLAGS) $(LIBCURL_CPPFLAGS) $(LIBRSYNC_CPPFLAGS) AM_CFLAGS = $(CORE_CFLAGS) $(ENTERPRISE_CFLAGS) \ $(LMDB_CFLAGS) $(TOKYOCABINET_CFLAGS) $(QDBM_CFLAGS) \ - $(PCRE2_CFLAGS) $(OPENSSL_CFLAGS) $(SQLITE3_CFLAGS) $(LIBACL_CFLAGS) $(LIBYAML_CFLAGS) $(LIBCURL_CFLAGS) + $(PCRE2_CFLAGS) $(OPENSSL_CFLAGS) $(SQLITE3_CFLAGS) $(LIBACL_CFLAGS) $(LIBYAML_CFLAGS) $(LIBCURL_CFLAGS) $(LIBRSYNC_CFLAGS) AM_YFLAGS = -d LIBS = $(LMDB_LIBS) $(TOKYOCABINET_LIBS) $(QDBM_LIBS) \ - $(PCRE2_LIBS) $(OPENSSL_LIBS) $(SQLITE3_LIBS) $(LIBACL_LIBS) $(LIBYAML_LIBS) $(LIBCURL_LIBS) + $(PCRE2_LIBS) $(OPENSSL_LIBS) $(SQLITE3_LIBS) $(LIBACL_LIBS) $(LIBYAML_LIBS) $(LIBCURL_LIBS) $(LIBRSYNC_LIBS) # The lib providing sd_listen_fds() is not needed in libpromises, it's actually # needed in libcfnet. But adding it here is an easy way to make sure it's