aboutsummaryrefslogtreecommitdiffstats
path: root/meta-oe/recipes-support/syslog-ng/files/afsql-afsql_dd_insert_db-refactor.patch
diff options
context:
space:
mode:
Diffstat (limited to 'meta-oe/recipes-support/syslog-ng/files/afsql-afsql_dd_insert_db-refactor.patch')
-rw-r--r--meta-oe/recipes-support/syslog-ng/files/afsql-afsql_dd_insert_db-refactor.patch494
1 files changed, 0 insertions, 494 deletions
diff --git a/meta-oe/recipes-support/syslog-ng/files/afsql-afsql_dd_insert_db-refactor.patch b/meta-oe/recipes-support/syslog-ng/files/afsql-afsql_dd_insert_db-refactor.patch
deleted file mode 100644
index 42e181bb1f..0000000000
--- a/meta-oe/recipes-support/syslog-ng/files/afsql-afsql_dd_insert_db-refactor.patch
+++ /dev/null
@@ -1,494 +0,0 @@
-From 23e80b75508187baaa823a68ea019b72e0b2305c Mon Sep 17 00:00:00 2001
-From: Budai Laszlo <lbudai@balabit.hu>
-Date: Tue, 12 Nov 2013 13:19:04 +0100
-Subject: [PATCH] afsql: afsql_dd_insert_db() refactor
-
-Upstream-Status: Backport
-
-A lot of the code that was previously in afsql_dd_insert_db() have been
-extracted to smaller functions, and afsql_dd_insert_db() was rebuilt on
-top of these. At the same time, memory leaks were plugged, and in case
-of a transaction error, backlog rewinding has been fixed too, to not
-loose messages since the last BEGIN command.
-
-Signed-off-by: Juhasz Viktor <jviktor@balabit.hu>
-Signed-off-by: Laszlo Budai <lbudai@balabit.hu>
----
- modules/afsql/afsql.c | 301 ++++++++++++++++++++++++++++++++------------------
- 1 file changed, 192 insertions(+), 109 deletions(-)
-
-diff --git a/modules/afsql/afsql.c b/modules/afsql/afsql.c
-index 12f6aab..a6a8190 100644
---- a/modules/afsql/afsql.c
-+++ b/modules/afsql/afsql.c
-@@ -456,24 +456,21 @@ afsql_dd_create_index(AFSqlDestDriver *s
- *
- * NOTE: This function can only be called from the database thread.
- **/
--static GString *
--afsql_dd_validate_table(AFSqlDestDriver *self, LogMessage *msg)
-+static gboolean
-+afsql_dd_validate_table(AFSqlDestDriver *self, GString *table)
- {
-- GString *query_string, *table;
-+ GString *query_string;
- dbi_result db_res;
- gboolean success = FALSE;
- gint i;
-
-- table = g_string_sized_new(32);
-- log_template_format(self->table, msg, &self->template_options, LTZ_LOCAL, 0, NULL, table);
--
- if (self->flags & AFSQL_DDF_DONT_CREATE_TABLES)
-- return table;
-+ return TRUE;
-
- afsql_dd_check_sql_identifier(table->str, TRUE);
-
- if (g_hash_table_lookup(self->validated_tables, table->str))
-- return table;
-+ return TRUE;
-
- query_string = g_string_sized_new(32);
- g_string_printf(query_string, "SELECT * FROM %s WHERE 0=1", table->str);
-@@ -544,14 +541,9 @@ afsql_dd_validate_table(AFSqlDestDriver
- /* we have successfully created/altered the destination table, record this information */
- g_hash_table_insert(self->validated_tables, g_strdup(table->str), GUINT_TO_POINTER(TRUE));
- }
-- else
-- {
-- g_string_free(table, TRUE);
-- table = NULL;
-- }
- g_string_free(query_string, TRUE);
-
-- return table;
-+ return success;
- }
-
- /**
-@@ -581,6 +573,20 @@ afsql_dd_begin_txn(AFSqlDestDriver *self
- }
-
- /**
-+ * afsql_dd_handle_transaction_error:
-+ *
-+ * Handle errors inside during a SQL transaction (e.g. INSERT or COMMIT failures).
-+ *
-+ * NOTE: This function can only be called from the database thread.
-+ **/
-+static void
-+afsql_dd_handle_transaction_error(AFSqlDestDriver *self)
-+{
-+ log_queue_rewind_backlog(self->queue);
-+ self->flush_lines_queued = 0;
-+}
-+
-+/**
- * afsql_dd_begin_txn:
- *
- * Commit SQL transaction.
-@@ -596,14 +602,14 @@ afsql_dd_commit_txn(AFSqlDestDriver *sel
- if (success)
- {
- log_queue_ack_backlog(self->queue, self->flush_lines_queued);
-+ self->flush_lines_queued = 0;
- }
- else
- {
-- msg_notice("SQL transaction commit failed, rewinding backlog and starting again",
-- NULL);
-- log_queue_rewind_backlog(self->queue);
-+ msg_error("SQL transaction commit failed, rewinding backlog and starting again",
-+ NULL);
-+ afsql_dd_handle_transaction_error(self);
- }
-- self->flush_lines_queued = 0;
- return success;
- }
-
-@@ -644,12 +650,13 @@ afsql_dd_set_dbd_opt_numeric(gpointer ke
- }
-
- static gboolean
--afsql_dd_connect(AFSqlDestDriver *self)
-+afsql_dd_ensure_initialized_connection(AFSqlDestDriver *self)
- {
- if (self->dbi_ctx)
- return TRUE;
-
- self->dbi_ctx = dbi_conn_new(self->type);
-+
- if (!self->dbi_ctx)
- {
- msg_error("No such DBI driver",
-@@ -659,10 +666,12 @@ afsql_dd_connect(AFSqlDestDriver *self)
- }
-
- dbi_conn_set_option(self->dbi_ctx, "host", self->host);
-+
- if (strcmp(self->type, "mysql"))
- dbi_conn_set_option(self->dbi_ctx, "port", self->port);
- else
- dbi_conn_set_option_numeric(self->dbi_ctx, "port", atoi(self->port));
-+
- dbi_conn_set_option(self->dbi_ctx, "username", self->user);
- dbi_conn_set_option(self->dbi_ctx, "password", self->password);
- dbi_conn_set_option(self->dbi_ctx, "dbname", self->database);
-@@ -691,6 +700,7 @@ afsql_dd_connect(AFSqlDestDriver *self)
- evt_tag_str("database", self->database),
- evt_tag_str("error", dbi_error),
- NULL);
-+
- return FALSE;
- }
-
-@@ -713,104 +723,145 @@ afsql_dd_connect(AFSqlDestDriver *self)
- return TRUE;
- }
-
--static gboolean
--afsql_dd_insert_fail_handler(AFSqlDestDriver *self, LogMessage *msg,
-- LogPathOptions *path_options)
-+static GString *
-+afsql_dd_ensure_accessible_database_table(AFSqlDestDriver *self, LogMessage *msg)
- {
-- if (self->failed_message_counter < self->num_retries - 1)
-- {
-- log_queue_push_head(self->queue, msg, path_options);
--
-- /* database connection status sanity check after failed query */
-- if (dbi_conn_ping(self->dbi_ctx) != 1)
-- {
-- const gchar *dbi_error;
--
-- dbi_conn_error(self->dbi_ctx, &dbi_error);
-- msg_error("Error, no SQL connection after failed query attempt",
-- evt_tag_str("type", self->type),
-- evt_tag_str("host", self->host),
-- evt_tag_str("port", self->port),
-- evt_tag_str("username", self->user),
-- evt_tag_str("database", self->database),
-- evt_tag_str("error", dbi_error),
-- NULL);
-- return FALSE;
-- }
-+ GString *table = g_string_sized_new(32);
-+ log_template_format(self->table, msg, &self->template_options, LTZ_LOCAL, 0, NULL, table);
-
-- self->failed_message_counter++;
-- return FALSE;
-+ if (!afsql_dd_validate_table(self, table))
-+ {
-+ /* If validate table is FALSE then close the connection and wait time_reopen time (next call) */
-+ msg_error("Error checking table, disconnecting from database, trying again shortly",
-+ evt_tag_int("time_reopen", self->time_reopen),
-+ NULL);
-+ g_string_free(table, TRUE);
-+ return NULL;
- }
-
-- msg_error("Multiple failures while inserting this record into the database, message dropped",
-- evt_tag_int("attempts", self->num_retries),
-- NULL);
-- stats_counter_inc(self->dropped_messages);
-- log_msg_drop(msg, path_options);
-- self->failed_message_counter = 0;
-- return TRUE;
-+ return table;
- }
-
- static GString *
--afsql_dd_construct_query(AFSqlDestDriver *self, GString *table,
-- LogMessage *msg)
-+afsql_dd_build_insert_command(AFSqlDestDriver *self, LogMessage *msg, GString *table)
- {
-- GString *value;
-- GString *query_string;
-- gint i;
-+ GString *insert_command = g_string_sized_new(256);
-+ GString *value = g_string_sized_new(512);
-+ gint i, j;
-
-- value = g_string_sized_new(256);
-- query_string = g_string_sized_new(512);
-+ g_string_printf(insert_command, "INSERT INTO %s (", table->str);
-
-- g_string_printf(query_string, "INSERT INTO %s (", table->str);
- for (i = 0; i < self->fields_len; i++)
- {
-- g_string_append(query_string, self->fields[i].name);
-- if (i != self->fields_len - 1)
-- g_string_append(query_string, ", ");
-+ if ((self->fields[i].flags & AFSQL_FF_DEFAULT) == 0 && self->fields[i].value != NULL)
-+ {
-+ g_string_append(insert_command, self->fields[i].name);
-+
-+ j = i + 1;
-+ while (j < self->fields_len && (self->fields[j].flags & AFSQL_FF_DEFAULT) == AFSQL_FF_DEFAULT)
-+ j++;
-+
-+ if (j < self->fields_len)
-+ g_string_append(insert_command, ", ");
-+ }
- }
-- g_string_append(query_string, ") VALUES (");
-+
-+ g_string_append(insert_command, ") VALUES (");
-
- for (i = 0; i < self->fields_len; i++)
- {
- gchar *quoted;
-
-- if (self->fields[i].value == NULL)
-- {
-- /* the config used the 'default' value for this column -> the fields[i].value is NULL, use SQL default */
-- g_string_append(query_string, "DEFAULT");
-- }
-- else
-+ if ((self->fields[i].flags & AFSQL_FF_DEFAULT) == 0 && self->fields[i].value != NULL)
- {
- log_template_format(self->fields[i].value, msg, &self->template_options, LTZ_SEND, self->seq_num, NULL, value);
--
- if (self->null_value && strcmp(self->null_value, value->str) == 0)
- {
-- g_string_append(query_string, "NULL");
-+ g_string_append(insert_command, "NULL");
- }
- else
- {
- dbi_conn_quote_string_copy(self->dbi_ctx, value->str, &quoted);
- if (quoted)
- {
-- g_string_append(query_string, quoted);
-+ g_string_append(insert_command, quoted);
- free(quoted);
- }
- else
- {
-- g_string_append(query_string, "''");
-+ g_string_append(insert_command, "''");
- }
- }
-- }
-
-- if (i != self->fields_len - 1)
-- g_string_append(query_string, ", ");
-+ j = i + 1;
-+ while (j < self->fields_len && (self->fields[j].flags & AFSQL_FF_DEFAULT) == AFSQL_FF_DEFAULT)
-+ j++;
-+ if (j < self->fields_len)
-+ g_string_append(insert_command, ", ");
-+ }
- }
-- g_string_append(query_string, ")");
-+
-+ g_string_append(insert_command, ")");
-
- g_string_free(value, TRUE);
-
-- return query_string;
-+ return insert_command;
-+}
-+
-+static inline gboolean
-+afsql_dd_is_transaction_handling_enabled(const AFSqlDestDriver *self)
-+{
-+ return self->flush_lines_queued != -1;
-+}
-+
-+static inline gboolean
-+afsql_dd_should_start_new_transaction(const AFSqlDestDriver *self)
-+{
-+ return self->flush_lines_queued == 0;
-+}
-+
-+static inline gboolean
-+afsql_dd_should_commit_transaction(const AFSqlDestDriver *self)
-+{
-+ return afsql_dd_is_transaction_handling_enabled(self) && self->flush_lines_queued == self->flush_lines;
-+}
-+
-+static inline gboolean
-+afsql_dd_handle_insert_row_error_depending_on_connection_availability(AFSqlDestDriver *self,
-+ LogMessage *msg,
-+ LogPathOptions *path_options)
-+{
-+ const gchar *dbi_error, *error_message;
-+
-+ if (dbi_conn_ping(self->dbi_ctx) == 1)
-+ {
-+ log_queue_push_head(self->queue, msg, path_options);
-+ return TRUE;
-+ }
-+
-+ if (afsql_dd_is_transaction_handling_enabled(self))
-+ {
-+ error_message = "SQL connection lost in the middle of a transaction,"
-+ " rewinding backlog and starting again";
-+ afsql_dd_handle_transaction_error(self);
-+ }
-+ else
-+ {
-+ error_message = "Error, no SQL connection after failed query attempt";
-+ log_queue_push_head(self->queue, msg, path_options);
-+ }
-+
-+ dbi_conn_error(self->dbi_ctx, &dbi_error);
-+ msg_error(error_message,
-+ evt_tag_str("type", self->type),
-+ evt_tag_str("host", self->host),
-+ evt_tag_str("port", self->port),
-+ evt_tag_str("username", self->user),
-+ evt_tag_str("database", self->database),
-+ evt_tag_str("error", dbi_error),
-+ NULL);
-+
-+ return FALSE;
- }
-
- /**
-@@ -824,61 +875,93 @@ afsql_dd_construct_query(AFSqlDestDriver
- static gboolean
- afsql_dd_insert_db(AFSqlDestDriver *self)
- {
-- GString *table, *query_string;
-+ GString *table = NULL;
-+ GString *insert_command = NULL;
- LogMessage *msg;
- gboolean success;
- LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
-
-- afsql_dd_connect(self);
-+ if (!afsql_dd_ensure_initialized_connection(self))
-+ return FALSE;
-
-- success = log_queue_pop_head(self->queue, &msg, &path_options, (self->flags & AFSQL_DDF_EXPLICIT_COMMITS), FALSE);
-+ /* connection established, try to insert a message */
-+ success = log_queue_pop_head(self->queue, &msg, &path_options, FALSE, self->flags & AFSQL_DDF_EXPLICIT_COMMITS);
- if (!success)
- return TRUE;
-
- msg_set_context(msg);
-
-- table = afsql_dd_validate_table(self, msg);
-+ table = afsql_dd_ensure_accessible_database_table(self, msg);
-+
- if (!table)
- {
-- /* If validate table is FALSE then close the connection and wait time_reopen time (next call) */
-- msg_error("Error checking table, disconnecting from database, trying again shortly",
-- evt_tag_int("time_reopen", self->time_reopen),
-- NULL);
-- msg_set_context(NULL);
-- g_string_free(table, TRUE);
-- return afsql_dd_insert_fail_handler(self, msg, &path_options);
-+ success = FALSE;
-+ goto out;
- }
-
-- query_string = afsql_dd_construct_query(self, table, msg);
-+ if (afsql_dd_should_start_new_transaction(self) && !afsql_dd_begin_txn(self))
-+ {
-+ success = FALSE;
-+ goto out;
-+ }
-
-- if (self->flush_lines_queued == 0 && !afsql_dd_begin_txn(self))
-- return FALSE;
-+ insert_command = afsql_dd_build_insert_command(self, msg, table);
-+ success = afsql_dd_run_query(self, insert_command->str, FALSE, NULL);
-
-- success = afsql_dd_run_query(self, query_string->str, FALSE, NULL);
- if (success && self->flush_lines_queued != -1)
- {
- self->flush_lines_queued++;
-
-- if (self->flush_lines && self->flush_lines_queued == self->flush_lines && !afsql_dd_commit_txn(self))
-- return FALSE;
-+ if (afsql_dd_should_commit_transaction(self) && !afsql_dd_commit_txn(self))
-+ {
-+ /* Assuming that in case of error, the queue is rewound by afsql_dd_commit_txn() */
-+
-+ g_string_free(insert_command, TRUE);
-+ msg_set_context(NULL);
-+
-+ return FALSE;
-+ }
- }
-
-- g_string_free(table, TRUE);
-- g_string_free(query_string, TRUE);
-+ out:
-+
-+ if (table != NULL)
-+ g_string_free(table, TRUE);
-+
-+ if (insert_command != NULL)
-+ g_string_free(insert_command, TRUE);
-
- msg_set_context(NULL);
-
-- if (!success)
-- return afsql_dd_insert_fail_handler(self, msg, &path_options);
-+ if (success)
-+ {
-+ log_msg_ack(msg, &path_options);
-+ log_msg_unref(msg);
-+ step_sequence_number(&self->seq_num);
-+ self->failed_message_counter = 0;
-+ }
-+ else
-+ {
-+ if (self->failed_message_counter < self->num_retries - 1)
-+ {
-+ if (!afsql_dd_handle_insert_row_error_depending_on_connection_availability(self, msg, &path_options))
-+ return FALSE;
-
-- /* we only ACK if each INSERT is a separate transaction */
-- if ((self->flags & AFSQL_DDF_EXPLICIT_COMMITS) == 0)
-- log_msg_ack(msg, &path_options);
-- log_msg_unref(msg);
-- step_sequence_number(&self->seq_num);
-- self->failed_message_counter = 0;
-+ self->failed_message_counter++;
-+ }
-+ else
-+ {
-+ msg_error("Multiple failures while inserting this record into the database, message dropped",
-+ evt_tag_int("attempts", self->num_retries),
-+ NULL);
-+ stats_counter_inc(self->dropped_messages);
-+ log_msg_drop(msg, &path_options);
-+ self->failed_message_counter = 0;
-+ success = TRUE;
-+ }
-+ }
-
-- return TRUE;
-+ return success;
- }
-
- static void
-@@ -895,7 +978,7 @@ afsql_dd_message_became_available_in_the
- static void
- afsql_dd_wait_for_suspension_wakeup(AFSqlDestDriver *self)
- {
-- /* we got suspended, probably because of a connection error,
-+ /* we got suspended, probably because of a connection error,
- * during this time we only get wakeups if we need to be
- * terminated. */
- if (!self->db_thread_terminate)
-@@ -974,8 +1057,7 @@ afsql_dd_database_thread(gpointer arg)
-
- afsql_dd_commit_txn(self);
- }
--
-- exit:
-+exit:
- afsql_dd_disconnect(self);
-
- msg_verbose("Database thread finished",
---
-1.8.4.1
-