aboutsummaryrefslogtreecommitdiffstats
path: root/meta-oe/recipes-support/syslog-ng/files/afsql-afsql_dd_insert_db-refactor.patch
diff options
context:
space:
mode:
Diffstat (limited to 'meta-oe/recipes-support/syslog-ng/files/afsql-afsql_dd_insert_db-refactor.patch')
-rw-r--r--meta-oe/recipes-support/syslog-ng/files/afsql-afsql_dd_insert_db-refactor.patch494
1 files changed, 494 insertions, 0 deletions
diff --git a/meta-oe/recipes-support/syslog-ng/files/afsql-afsql_dd_insert_db-refactor.patch b/meta-oe/recipes-support/syslog-ng/files/afsql-afsql_dd_insert_db-refactor.patch
new file mode 100644
index 0000000000..42e181bb1f
--- /dev/null
+++ b/meta-oe/recipes-support/syslog-ng/files/afsql-afsql_dd_insert_db-refactor.patch
@@ -0,0 +1,494 @@
+From 23e80b75508187baaa823a68ea019b72e0b2305c Mon Sep 17 00:00:00 2001
+From: Budai Laszlo <lbudai@balabit.hu>
+Date: Tue, 12 Nov 2013 13:19:04 +0100
+Subject: [PATCH] afsql: afsql_dd_insert_db() refactor
+
+Upstream-Status: Backport
+
+A lot of the code that was previously in afsql_dd_insert_db() have been
+extracted to smaller functions, and afsql_dd_insert_db() was rebuilt on
+top of these. At the same time, memory leaks were plugged, and in case
+of a transaction error, backlog rewinding has been fixed too, to not
+loose messages since the last BEGIN command.
+
+Signed-off-by: Juhasz Viktor <jviktor@balabit.hu>
+Signed-off-by: Laszlo Budai <lbudai@balabit.hu>
+---
+ modules/afsql/afsql.c | 301 ++++++++++++++++++++++++++++++++------------------
+ 1 file changed, 192 insertions(+), 109 deletions(-)
+
+diff --git a/modules/afsql/afsql.c b/modules/afsql/afsql.c
+index 12f6aab..a6a8190 100644
+--- a/modules/afsql/afsql.c
++++ b/modules/afsql/afsql.c
+@@ -456,24 +456,21 @@ afsql_dd_create_index(AFSqlDestDriver *s
+ *
+ * NOTE: This function can only be called from the database thread.
+ **/
+-static GString *
+-afsql_dd_validate_table(AFSqlDestDriver *self, LogMessage *msg)
++static gboolean
++afsql_dd_validate_table(AFSqlDestDriver *self, GString *table)
+ {
+- GString *query_string, *table;
++ GString *query_string;
+ dbi_result db_res;
+ gboolean success = FALSE;
+ gint i;
+
+- table = g_string_sized_new(32);
+- log_template_format(self->table, msg, &self->template_options, LTZ_LOCAL, 0, NULL, table);
+-
+ if (self->flags & AFSQL_DDF_DONT_CREATE_TABLES)
+- return table;
++ return TRUE;
+
+ afsql_dd_check_sql_identifier(table->str, TRUE);
+
+ if (g_hash_table_lookup(self->validated_tables, table->str))
+- return table;
++ return TRUE;
+
+ query_string = g_string_sized_new(32);
+ g_string_printf(query_string, "SELECT * FROM %s WHERE 0=1", table->str);
+@@ -544,14 +541,9 @@ afsql_dd_validate_table(AFSqlDestDriver
+ /* we have successfully created/altered the destination table, record this information */
+ g_hash_table_insert(self->validated_tables, g_strdup(table->str), GUINT_TO_POINTER(TRUE));
+ }
+- else
+- {
+- g_string_free(table, TRUE);
+- table = NULL;
+- }
+ g_string_free(query_string, TRUE);
+
+- return table;
++ return success;
+ }
+
+ /**
+@@ -581,6 +573,20 @@ afsql_dd_begin_txn(AFSqlDestDriver *self
+ }
+
+ /**
++ * afsql_dd_handle_transaction_error:
++ *
++ * Handle errors inside during a SQL transaction (e.g. INSERT or COMMIT failures).
++ *
++ * NOTE: This function can only be called from the database thread.
++ **/
++static void
++afsql_dd_handle_transaction_error(AFSqlDestDriver *self)
++{
++ log_queue_rewind_backlog(self->queue);
++ self->flush_lines_queued = 0;
++}
++
++/**
+ * afsql_dd_begin_txn:
+ *
+ * Commit SQL transaction.
+@@ -596,14 +602,14 @@ afsql_dd_commit_txn(AFSqlDestDriver *sel
+ if (success)
+ {
+ log_queue_ack_backlog(self->queue, self->flush_lines_queued);
++ self->flush_lines_queued = 0;
+ }
+ else
+ {
+- msg_notice("SQL transaction commit failed, rewinding backlog and starting again",
+- NULL);
+- log_queue_rewind_backlog(self->queue);
++ msg_error("SQL transaction commit failed, rewinding backlog and starting again",
++ NULL);
++ afsql_dd_handle_transaction_error(self);
+ }
+- self->flush_lines_queued = 0;
+ return success;
+ }
+
+@@ -644,12 +650,13 @@ afsql_dd_set_dbd_opt_numeric(gpointer ke
+ }
+
+ static gboolean
+-afsql_dd_connect(AFSqlDestDriver *self)
++afsql_dd_ensure_initialized_connection(AFSqlDestDriver *self)
+ {
+ if (self->dbi_ctx)
+ return TRUE;
+
+ self->dbi_ctx = dbi_conn_new(self->type);
++
+ if (!self->dbi_ctx)
+ {
+ msg_error("No such DBI driver",
+@@ -659,10 +666,12 @@ afsql_dd_connect(AFSqlDestDriver *self)
+ }
+
+ dbi_conn_set_option(self->dbi_ctx, "host", self->host);
++
+ if (strcmp(self->type, "mysql"))
+ dbi_conn_set_option(self->dbi_ctx, "port", self->port);
+ else
+ dbi_conn_set_option_numeric(self->dbi_ctx, "port", atoi(self->port));
++
+ dbi_conn_set_option(self->dbi_ctx, "username", self->user);
+ dbi_conn_set_option(self->dbi_ctx, "password", self->password);
+ dbi_conn_set_option(self->dbi_ctx, "dbname", self->database);
+@@ -691,6 +700,7 @@ afsql_dd_connect(AFSqlDestDriver *self)
+ evt_tag_str("database", self->database),
+ evt_tag_str("error", dbi_error),
+ NULL);
++
+ return FALSE;
+ }
+
+@@ -713,104 +723,145 @@ afsql_dd_connect(AFSqlDestDriver *self)
+ return TRUE;
+ }
+
+-static gboolean
+-afsql_dd_insert_fail_handler(AFSqlDestDriver *self, LogMessage *msg,
+- LogPathOptions *path_options)
++static GString *
++afsql_dd_ensure_accessible_database_table(AFSqlDestDriver *self, LogMessage *msg)
+ {
+- if (self->failed_message_counter < self->num_retries - 1)
+- {
+- log_queue_push_head(self->queue, msg, path_options);
+-
+- /* database connection status sanity check after failed query */
+- if (dbi_conn_ping(self->dbi_ctx) != 1)
+- {
+- const gchar *dbi_error;
+-
+- dbi_conn_error(self->dbi_ctx, &dbi_error);
+- msg_error("Error, no SQL connection after failed query attempt",
+- evt_tag_str("type", self->type),
+- evt_tag_str("host", self->host),
+- evt_tag_str("port", self->port),
+- evt_tag_str("username", self->user),
+- evt_tag_str("database", self->database),
+- evt_tag_str("error", dbi_error),
+- NULL);
+- return FALSE;
+- }
++ GString *table = g_string_sized_new(32);
++ log_template_format(self->table, msg, &self->template_options, LTZ_LOCAL, 0, NULL, table);
+
+- self->failed_message_counter++;
+- return FALSE;
++ if (!afsql_dd_validate_table(self, table))
++ {
++ /* If validate table is FALSE then close the connection and wait time_reopen time (next call) */
++ msg_error("Error checking table, disconnecting from database, trying again shortly",
++ evt_tag_int("time_reopen", self->time_reopen),
++ NULL);
++ g_string_free(table, TRUE);
++ return NULL;
+ }
+
+- msg_error("Multiple failures while inserting this record into the database, message dropped",
+- evt_tag_int("attempts", self->num_retries),
+- NULL);
+- stats_counter_inc(self->dropped_messages);
+- log_msg_drop(msg, path_options);
+- self->failed_message_counter = 0;
+- return TRUE;
++ return table;
+ }
+
+ static GString *
+-afsql_dd_construct_query(AFSqlDestDriver *self, GString *table,
+- LogMessage *msg)
++afsql_dd_build_insert_command(AFSqlDestDriver *self, LogMessage *msg, GString *table)
+ {
+- GString *value;
+- GString *query_string;
+- gint i;
++ GString *insert_command = g_string_sized_new(256);
++ GString *value = g_string_sized_new(512);
++ gint i, j;
+
+- value = g_string_sized_new(256);
+- query_string = g_string_sized_new(512);
++ g_string_printf(insert_command, "INSERT INTO %s (", table->str);
+
+- g_string_printf(query_string, "INSERT INTO %s (", table->str);
+ for (i = 0; i < self->fields_len; i++)
+ {
+- g_string_append(query_string, self->fields[i].name);
+- if (i != self->fields_len - 1)
+- g_string_append(query_string, ", ");
++ if ((self->fields[i].flags & AFSQL_FF_DEFAULT) == 0 && self->fields[i].value != NULL)
++ {
++ g_string_append(insert_command, self->fields[i].name);
++
++ j = i + 1;
++ while (j < self->fields_len && (self->fields[j].flags & AFSQL_FF_DEFAULT) == AFSQL_FF_DEFAULT)
++ j++;
++
++ if (j < self->fields_len)
++ g_string_append(insert_command, ", ");
++ }
+ }
+- g_string_append(query_string, ") VALUES (");
++
++ g_string_append(insert_command, ") VALUES (");
+
+ for (i = 0; i < self->fields_len; i++)
+ {
+ gchar *quoted;
+
+- if (self->fields[i].value == NULL)
+- {
+- /* the config used the 'default' value for this column -> the fields[i].value is NULL, use SQL default */
+- g_string_append(query_string, "DEFAULT");
+- }
+- else
++ if ((self->fields[i].flags & AFSQL_FF_DEFAULT) == 0 && self->fields[i].value != NULL)
+ {
+ log_template_format(self->fields[i].value, msg, &self->template_options, LTZ_SEND, self->seq_num, NULL, value);
+-
+ if (self->null_value && strcmp(self->null_value, value->str) == 0)
+ {
+- g_string_append(query_string, "NULL");
++ g_string_append(insert_command, "NULL");
+ }
+ else
+ {
+ dbi_conn_quote_string_copy(self->dbi_ctx, value->str, &quoted);
+ if (quoted)
+ {
+- g_string_append(query_string, quoted);
++ g_string_append(insert_command, quoted);
+ free(quoted);
+ }
+ else
+ {
+- g_string_append(query_string, "''");
++ g_string_append(insert_command, "''");
+ }
+ }
+- }
+
+- if (i != self->fields_len - 1)
+- g_string_append(query_string, ", ");
++ j = i + 1;
++ while (j < self->fields_len && (self->fields[j].flags & AFSQL_FF_DEFAULT) == AFSQL_FF_DEFAULT)
++ j++;
++ if (j < self->fields_len)
++ g_string_append(insert_command, ", ");
++ }
+ }
+- g_string_append(query_string, ")");
++
++ g_string_append(insert_command, ")");
+
+ g_string_free(value, TRUE);
+
+- return query_string;
++ return insert_command;
++}
++
++static inline gboolean
++afsql_dd_is_transaction_handling_enabled(const AFSqlDestDriver *self)
++{
++ return self->flush_lines_queued != -1;
++}
++
++static inline gboolean
++afsql_dd_should_start_new_transaction(const AFSqlDestDriver *self)
++{
++ return self->flush_lines_queued == 0;
++}
++
++static inline gboolean
++afsql_dd_should_commit_transaction(const AFSqlDestDriver *self)
++{
++ return afsql_dd_is_transaction_handling_enabled(self) && self->flush_lines_queued == self->flush_lines;
++}
++
++static inline gboolean
++afsql_dd_handle_insert_row_error_depending_on_connection_availability(AFSqlDestDriver *self,
++ LogMessage *msg,
++ LogPathOptions *path_options)
++{
++ const gchar *dbi_error, *error_message;
++
++ if (dbi_conn_ping(self->dbi_ctx) == 1)
++ {
++ log_queue_push_head(self->queue, msg, path_options);
++ return TRUE;
++ }
++
++ if (afsql_dd_is_transaction_handling_enabled(self))
++ {
++ error_message = "SQL connection lost in the middle of a transaction,"
++ " rewinding backlog and starting again";
++ afsql_dd_handle_transaction_error(self);
++ }
++ else
++ {
++ error_message = "Error, no SQL connection after failed query attempt";
++ log_queue_push_head(self->queue, msg, path_options);
++ }
++
++ dbi_conn_error(self->dbi_ctx, &dbi_error);
++ msg_error(error_message,
++ evt_tag_str("type", self->type),
++ evt_tag_str("host", self->host),
++ evt_tag_str("port", self->port),
++ evt_tag_str("username", self->user),
++ evt_tag_str("database", self->database),
++ evt_tag_str("error", dbi_error),
++ NULL);
++
++ return FALSE;
+ }
+
+ /**
+@@ -824,61 +875,93 @@ afsql_dd_construct_query(AFSqlDestDriver
+ static gboolean
+ afsql_dd_insert_db(AFSqlDestDriver *self)
+ {
+- GString *table, *query_string;
++ GString *table = NULL;
++ GString *insert_command = NULL;
+ LogMessage *msg;
+ gboolean success;
+ LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
+
+- afsql_dd_connect(self);
++ if (!afsql_dd_ensure_initialized_connection(self))
++ return FALSE;
+
+- success = log_queue_pop_head(self->queue, &msg, &path_options, (self->flags & AFSQL_DDF_EXPLICIT_COMMITS), FALSE);
++ /* connection established, try to insert a message */
++ success = log_queue_pop_head(self->queue, &msg, &path_options, FALSE, self->flags & AFSQL_DDF_EXPLICIT_COMMITS);
+ if (!success)
+ return TRUE;
+
+ msg_set_context(msg);
+
+- table = afsql_dd_validate_table(self, msg);
++ table = afsql_dd_ensure_accessible_database_table(self, msg);
++
+ if (!table)
+ {
+- /* If validate table is FALSE then close the connection and wait time_reopen time (next call) */
+- msg_error("Error checking table, disconnecting from database, trying again shortly",
+- evt_tag_int("time_reopen", self->time_reopen),
+- NULL);
+- msg_set_context(NULL);
+- g_string_free(table, TRUE);
+- return afsql_dd_insert_fail_handler(self, msg, &path_options);
++ success = FALSE;
++ goto out;
+ }
+
+- query_string = afsql_dd_construct_query(self, table, msg);
++ if (afsql_dd_should_start_new_transaction(self) && !afsql_dd_begin_txn(self))
++ {
++ success = FALSE;
++ goto out;
++ }
+
+- if (self->flush_lines_queued == 0 && !afsql_dd_begin_txn(self))
+- return FALSE;
++ insert_command = afsql_dd_build_insert_command(self, msg, table);
++ success = afsql_dd_run_query(self, insert_command->str, FALSE, NULL);
+
+- success = afsql_dd_run_query(self, query_string->str, FALSE, NULL);
+ if (success && self->flush_lines_queued != -1)
+ {
+ self->flush_lines_queued++;
+
+- if (self->flush_lines && self->flush_lines_queued == self->flush_lines && !afsql_dd_commit_txn(self))
+- return FALSE;
++ if (afsql_dd_should_commit_transaction(self) && !afsql_dd_commit_txn(self))
++ {
++ /* Assuming that in case of error, the queue is rewound by afsql_dd_commit_txn() */
++
++ g_string_free(insert_command, TRUE);
++ msg_set_context(NULL);
++
++ return FALSE;
++ }
+ }
+
+- g_string_free(table, TRUE);
+- g_string_free(query_string, TRUE);
++ out:
++
++ if (table != NULL)
++ g_string_free(table, TRUE);
++
++ if (insert_command != NULL)
++ g_string_free(insert_command, TRUE);
+
+ msg_set_context(NULL);
+
+- if (!success)
+- return afsql_dd_insert_fail_handler(self, msg, &path_options);
++ if (success)
++ {
++ log_msg_ack(msg, &path_options);
++ log_msg_unref(msg);
++ step_sequence_number(&self->seq_num);
++ self->failed_message_counter = 0;
++ }
++ else
++ {
++ if (self->failed_message_counter < self->num_retries - 1)
++ {
++ if (!afsql_dd_handle_insert_row_error_depending_on_connection_availability(self, msg, &path_options))
++ return FALSE;
+
+- /* we only ACK if each INSERT is a separate transaction */
+- if ((self->flags & AFSQL_DDF_EXPLICIT_COMMITS) == 0)
+- log_msg_ack(msg, &path_options);
+- log_msg_unref(msg);
+- step_sequence_number(&self->seq_num);
+- self->failed_message_counter = 0;
++ self->failed_message_counter++;
++ }
++ else
++ {
++ msg_error("Multiple failures while inserting this record into the database, message dropped",
++ evt_tag_int("attempts", self->num_retries),
++ NULL);
++ stats_counter_inc(self->dropped_messages);
++ log_msg_drop(msg, &path_options);
++ self->failed_message_counter = 0;
++ success = TRUE;
++ }
++ }
+
+- return TRUE;
++ return success;
+ }
+
+ static void
+@@ -895,7 +978,7 @@ afsql_dd_message_became_available_in_the
+ static void
+ afsql_dd_wait_for_suspension_wakeup(AFSqlDestDriver *self)
+ {
+- /* we got suspended, probably because of a connection error,
++ /* we got suspended, probably because of a connection error,
+ * during this time we only get wakeups if we need to be
+ * terminated. */
+ if (!self->db_thread_terminate)
+@@ -974,8 +1057,7 @@ afsql_dd_database_thread(gpointer arg)
+
+ afsql_dd_commit_txn(self);
+ }
+-
+- exit:
++exit:
+ afsql_dd_disconnect(self);
+
+ msg_verbose("Database thread finished",
+--
+1.8.4.1
+