diff options
Diffstat (limited to 'meta-oe/recipes-support/syslog-ng/files/afsql-afsql_dd_insert_db-refactor.patch')
-rw-r--r-- | meta-oe/recipes-support/syslog-ng/files/afsql-afsql_dd_insert_db-refactor.patch | 494 |
1 files changed, 494 insertions, 0 deletions
diff --git a/meta-oe/recipes-support/syslog-ng/files/afsql-afsql_dd_insert_db-refactor.patch b/meta-oe/recipes-support/syslog-ng/files/afsql-afsql_dd_insert_db-refactor.patch new file mode 100644 index 0000000000..42e181bb1f --- /dev/null +++ b/meta-oe/recipes-support/syslog-ng/files/afsql-afsql_dd_insert_db-refactor.patch @@ -0,0 +1,494 @@ +From 23e80b75508187baaa823a68ea019b72e0b2305c Mon Sep 17 00:00:00 2001 +From: Budai Laszlo <lbudai@balabit.hu> +Date: Tue, 12 Nov 2013 13:19:04 +0100 +Subject: [PATCH] afsql: afsql_dd_insert_db() refactor + +Upstream-Status: Backport + +A lot of the code that was previously in afsql_dd_insert_db() have been +extracted to smaller functions, and afsql_dd_insert_db() was rebuilt on +top of these. At the same time, memory leaks were plugged, and in case +of a transaction error, backlog rewinding has been fixed too, to not +loose messages since the last BEGIN command. + +Signed-off-by: Juhasz Viktor <jviktor@balabit.hu> +Signed-off-by: Laszlo Budai <lbudai@balabit.hu> +--- + modules/afsql/afsql.c | 301 ++++++++++++++++++++++++++++++++------------------ + 1 file changed, 192 insertions(+), 109 deletions(-) + +diff --git a/modules/afsql/afsql.c b/modules/afsql/afsql.c +index 12f6aab..a6a8190 100644 +--- a/modules/afsql/afsql.c ++++ b/modules/afsql/afsql.c +@@ -456,24 +456,21 @@ afsql_dd_create_index(AFSqlDestDriver *s + * + * NOTE: This function can only be called from the database thread. + **/ +-static GString * +-afsql_dd_validate_table(AFSqlDestDriver *self, LogMessage *msg) ++static gboolean ++afsql_dd_validate_table(AFSqlDestDriver *self, GString *table) + { +- GString *query_string, *table; ++ GString *query_string; + dbi_result db_res; + gboolean success = FALSE; + gint i; + +- table = g_string_sized_new(32); +- log_template_format(self->table, msg, &self->template_options, LTZ_LOCAL, 0, NULL, table); +- + if (self->flags & AFSQL_DDF_DONT_CREATE_TABLES) +- return table; ++ return TRUE; + + afsql_dd_check_sql_identifier(table->str, TRUE); + + if (g_hash_table_lookup(self->validated_tables, table->str)) +- return table; ++ return TRUE; + + query_string = g_string_sized_new(32); + g_string_printf(query_string, "SELECT * FROM %s WHERE 0=1", table->str); +@@ -544,14 +541,9 @@ afsql_dd_validate_table(AFSqlDestDriver + /* we have successfully created/altered the destination table, record this information */ + g_hash_table_insert(self->validated_tables, g_strdup(table->str), GUINT_TO_POINTER(TRUE)); + } +- else +- { +- g_string_free(table, TRUE); +- table = NULL; +- } + g_string_free(query_string, TRUE); + +- return table; ++ return success; + } + + /** +@@ -581,6 +573,20 @@ afsql_dd_begin_txn(AFSqlDestDriver *self + } + + /** ++ * afsql_dd_handle_transaction_error: ++ * ++ * Handle errors inside during a SQL transaction (e.g. INSERT or COMMIT failures). ++ * ++ * NOTE: This function can only be called from the database thread. ++ **/ ++static void ++afsql_dd_handle_transaction_error(AFSqlDestDriver *self) ++{ ++ log_queue_rewind_backlog(self->queue); ++ self->flush_lines_queued = 0; ++} ++ ++/** + * afsql_dd_begin_txn: + * + * Commit SQL transaction. +@@ -596,14 +602,14 @@ afsql_dd_commit_txn(AFSqlDestDriver *sel + if (success) + { + log_queue_ack_backlog(self->queue, self->flush_lines_queued); ++ self->flush_lines_queued = 0; + } + else + { +- msg_notice("SQL transaction commit failed, rewinding backlog and starting again", +- NULL); +- log_queue_rewind_backlog(self->queue); ++ msg_error("SQL transaction commit failed, rewinding backlog and starting again", ++ NULL); ++ afsql_dd_handle_transaction_error(self); + } +- self->flush_lines_queued = 0; + return success; + } + +@@ -644,12 +650,13 @@ afsql_dd_set_dbd_opt_numeric(gpointer ke + } + + static gboolean +-afsql_dd_connect(AFSqlDestDriver *self) ++afsql_dd_ensure_initialized_connection(AFSqlDestDriver *self) + { + if (self->dbi_ctx) + return TRUE; + + self->dbi_ctx = dbi_conn_new(self->type); ++ + if (!self->dbi_ctx) + { + msg_error("No such DBI driver", +@@ -659,10 +666,12 @@ afsql_dd_connect(AFSqlDestDriver *self) + } + + dbi_conn_set_option(self->dbi_ctx, "host", self->host); ++ + if (strcmp(self->type, "mysql")) + dbi_conn_set_option(self->dbi_ctx, "port", self->port); + else + dbi_conn_set_option_numeric(self->dbi_ctx, "port", atoi(self->port)); ++ + dbi_conn_set_option(self->dbi_ctx, "username", self->user); + dbi_conn_set_option(self->dbi_ctx, "password", self->password); + dbi_conn_set_option(self->dbi_ctx, "dbname", self->database); +@@ -691,6 +700,7 @@ afsql_dd_connect(AFSqlDestDriver *self) + evt_tag_str("database", self->database), + evt_tag_str("error", dbi_error), + NULL); ++ + return FALSE; + } + +@@ -713,104 +723,145 @@ afsql_dd_connect(AFSqlDestDriver *self) + return TRUE; + } + +-static gboolean +-afsql_dd_insert_fail_handler(AFSqlDestDriver *self, LogMessage *msg, +- LogPathOptions *path_options) ++static GString * ++afsql_dd_ensure_accessible_database_table(AFSqlDestDriver *self, LogMessage *msg) + { +- if (self->failed_message_counter < self->num_retries - 1) +- { +- log_queue_push_head(self->queue, msg, path_options); +- +- /* database connection status sanity check after failed query */ +- if (dbi_conn_ping(self->dbi_ctx) != 1) +- { +- const gchar *dbi_error; +- +- dbi_conn_error(self->dbi_ctx, &dbi_error); +- msg_error("Error, no SQL connection after failed query attempt", +- evt_tag_str("type", self->type), +- evt_tag_str("host", self->host), +- evt_tag_str("port", self->port), +- evt_tag_str("username", self->user), +- evt_tag_str("database", self->database), +- evt_tag_str("error", dbi_error), +- NULL); +- return FALSE; +- } ++ GString *table = g_string_sized_new(32); ++ log_template_format(self->table, msg, &self->template_options, LTZ_LOCAL, 0, NULL, table); + +- self->failed_message_counter++; +- return FALSE; ++ if (!afsql_dd_validate_table(self, table)) ++ { ++ /* If validate table is FALSE then close the connection and wait time_reopen time (next call) */ ++ msg_error("Error checking table, disconnecting from database, trying again shortly", ++ evt_tag_int("time_reopen", self->time_reopen), ++ NULL); ++ g_string_free(table, TRUE); ++ return NULL; + } + +- msg_error("Multiple failures while inserting this record into the database, message dropped", +- evt_tag_int("attempts", self->num_retries), +- NULL); +- stats_counter_inc(self->dropped_messages); +- log_msg_drop(msg, path_options); +- self->failed_message_counter = 0; +- return TRUE; ++ return table; + } + + static GString * +-afsql_dd_construct_query(AFSqlDestDriver *self, GString *table, +- LogMessage *msg) ++afsql_dd_build_insert_command(AFSqlDestDriver *self, LogMessage *msg, GString *table) + { +- GString *value; +- GString *query_string; +- gint i; ++ GString *insert_command = g_string_sized_new(256); ++ GString *value = g_string_sized_new(512); ++ gint i, j; + +- value = g_string_sized_new(256); +- query_string = g_string_sized_new(512); ++ g_string_printf(insert_command, "INSERT INTO %s (", table->str); + +- g_string_printf(query_string, "INSERT INTO %s (", table->str); + for (i = 0; i < self->fields_len; i++) + { +- g_string_append(query_string, self->fields[i].name); +- if (i != self->fields_len - 1) +- g_string_append(query_string, ", "); ++ if ((self->fields[i].flags & AFSQL_FF_DEFAULT) == 0 && self->fields[i].value != NULL) ++ { ++ g_string_append(insert_command, self->fields[i].name); ++ ++ j = i + 1; ++ while (j < self->fields_len && (self->fields[j].flags & AFSQL_FF_DEFAULT) == AFSQL_FF_DEFAULT) ++ j++; ++ ++ if (j < self->fields_len) ++ g_string_append(insert_command, ", "); ++ } + } +- g_string_append(query_string, ") VALUES ("); ++ ++ g_string_append(insert_command, ") VALUES ("); + + for (i = 0; i < self->fields_len; i++) + { + gchar *quoted; + +- if (self->fields[i].value == NULL) +- { +- /* the config used the 'default' value for this column -> the fields[i].value is NULL, use SQL default */ +- g_string_append(query_string, "DEFAULT"); +- } +- else ++ if ((self->fields[i].flags & AFSQL_FF_DEFAULT) == 0 && self->fields[i].value != NULL) + { + log_template_format(self->fields[i].value, msg, &self->template_options, LTZ_SEND, self->seq_num, NULL, value); +- + if (self->null_value && strcmp(self->null_value, value->str) == 0) + { +- g_string_append(query_string, "NULL"); ++ g_string_append(insert_command, "NULL"); + } + else + { + dbi_conn_quote_string_copy(self->dbi_ctx, value->str, "ed); + if (quoted) + { +- g_string_append(query_string, quoted); ++ g_string_append(insert_command, quoted); + free(quoted); + } + else + { +- g_string_append(query_string, "''"); ++ g_string_append(insert_command, "''"); + } + } +- } + +- if (i != self->fields_len - 1) +- g_string_append(query_string, ", "); ++ j = i + 1; ++ while (j < self->fields_len && (self->fields[j].flags & AFSQL_FF_DEFAULT) == AFSQL_FF_DEFAULT) ++ j++; ++ if (j < self->fields_len) ++ g_string_append(insert_command, ", "); ++ } + } +- g_string_append(query_string, ")"); ++ ++ g_string_append(insert_command, ")"); + + g_string_free(value, TRUE); + +- return query_string; ++ return insert_command; ++} ++ ++static inline gboolean ++afsql_dd_is_transaction_handling_enabled(const AFSqlDestDriver *self) ++{ ++ return self->flush_lines_queued != -1; ++} ++ ++static inline gboolean ++afsql_dd_should_start_new_transaction(const AFSqlDestDriver *self) ++{ ++ return self->flush_lines_queued == 0; ++} ++ ++static inline gboolean ++afsql_dd_should_commit_transaction(const AFSqlDestDriver *self) ++{ ++ return afsql_dd_is_transaction_handling_enabled(self) && self->flush_lines_queued == self->flush_lines; ++} ++ ++static inline gboolean ++afsql_dd_handle_insert_row_error_depending_on_connection_availability(AFSqlDestDriver *self, ++ LogMessage *msg, ++ LogPathOptions *path_options) ++{ ++ const gchar *dbi_error, *error_message; ++ ++ if (dbi_conn_ping(self->dbi_ctx) == 1) ++ { ++ log_queue_push_head(self->queue, msg, path_options); ++ return TRUE; ++ } ++ ++ if (afsql_dd_is_transaction_handling_enabled(self)) ++ { ++ error_message = "SQL connection lost in the middle of a transaction," ++ " rewinding backlog and starting again"; ++ afsql_dd_handle_transaction_error(self); ++ } ++ else ++ { ++ error_message = "Error, no SQL connection after failed query attempt"; ++ log_queue_push_head(self->queue, msg, path_options); ++ } ++ ++ dbi_conn_error(self->dbi_ctx, &dbi_error); ++ msg_error(error_message, ++ evt_tag_str("type", self->type), ++ evt_tag_str("host", self->host), ++ evt_tag_str("port", self->port), ++ evt_tag_str("username", self->user), ++ evt_tag_str("database", self->database), ++ evt_tag_str("error", dbi_error), ++ NULL); ++ ++ return FALSE; + } + + /** +@@ -824,61 +875,93 @@ afsql_dd_construct_query(AFSqlDestDriver + static gboolean + afsql_dd_insert_db(AFSqlDestDriver *self) + { +- GString *table, *query_string; ++ GString *table = NULL; ++ GString *insert_command = NULL; + LogMessage *msg; + gboolean success; + LogPathOptions path_options = LOG_PATH_OPTIONS_INIT; + +- afsql_dd_connect(self); ++ if (!afsql_dd_ensure_initialized_connection(self)) ++ return FALSE; + +- success = log_queue_pop_head(self->queue, &msg, &path_options, (self->flags & AFSQL_DDF_EXPLICIT_COMMITS), FALSE); ++ /* connection established, try to insert a message */ ++ success = log_queue_pop_head(self->queue, &msg, &path_options, FALSE, self->flags & AFSQL_DDF_EXPLICIT_COMMITS); + if (!success) + return TRUE; + + msg_set_context(msg); + +- table = afsql_dd_validate_table(self, msg); ++ table = afsql_dd_ensure_accessible_database_table(self, msg); ++ + if (!table) + { +- /* If validate table is FALSE then close the connection and wait time_reopen time (next call) */ +- msg_error("Error checking table, disconnecting from database, trying again shortly", +- evt_tag_int("time_reopen", self->time_reopen), +- NULL); +- msg_set_context(NULL); +- g_string_free(table, TRUE); +- return afsql_dd_insert_fail_handler(self, msg, &path_options); ++ success = FALSE; ++ goto out; + } + +- query_string = afsql_dd_construct_query(self, table, msg); ++ if (afsql_dd_should_start_new_transaction(self) && !afsql_dd_begin_txn(self)) ++ { ++ success = FALSE; ++ goto out; ++ } + +- if (self->flush_lines_queued == 0 && !afsql_dd_begin_txn(self)) +- return FALSE; ++ insert_command = afsql_dd_build_insert_command(self, msg, table); ++ success = afsql_dd_run_query(self, insert_command->str, FALSE, NULL); + +- success = afsql_dd_run_query(self, query_string->str, FALSE, NULL); + if (success && self->flush_lines_queued != -1) + { + self->flush_lines_queued++; + +- if (self->flush_lines && self->flush_lines_queued == self->flush_lines && !afsql_dd_commit_txn(self)) +- return FALSE; ++ if (afsql_dd_should_commit_transaction(self) && !afsql_dd_commit_txn(self)) ++ { ++ /* Assuming that in case of error, the queue is rewound by afsql_dd_commit_txn() */ ++ ++ g_string_free(insert_command, TRUE); ++ msg_set_context(NULL); ++ ++ return FALSE; ++ } + } + +- g_string_free(table, TRUE); +- g_string_free(query_string, TRUE); ++ out: ++ ++ if (table != NULL) ++ g_string_free(table, TRUE); ++ ++ if (insert_command != NULL) ++ g_string_free(insert_command, TRUE); + + msg_set_context(NULL); + +- if (!success) +- return afsql_dd_insert_fail_handler(self, msg, &path_options); ++ if (success) ++ { ++ log_msg_ack(msg, &path_options); ++ log_msg_unref(msg); ++ step_sequence_number(&self->seq_num); ++ self->failed_message_counter = 0; ++ } ++ else ++ { ++ if (self->failed_message_counter < self->num_retries - 1) ++ { ++ if (!afsql_dd_handle_insert_row_error_depending_on_connection_availability(self, msg, &path_options)) ++ return FALSE; + +- /* we only ACK if each INSERT is a separate transaction */ +- if ((self->flags & AFSQL_DDF_EXPLICIT_COMMITS) == 0) +- log_msg_ack(msg, &path_options); +- log_msg_unref(msg); +- step_sequence_number(&self->seq_num); +- self->failed_message_counter = 0; ++ self->failed_message_counter++; ++ } ++ else ++ { ++ msg_error("Multiple failures while inserting this record into the database, message dropped", ++ evt_tag_int("attempts", self->num_retries), ++ NULL); ++ stats_counter_inc(self->dropped_messages); ++ log_msg_drop(msg, &path_options); ++ self->failed_message_counter = 0; ++ success = TRUE; ++ } ++ } + +- return TRUE; ++ return success; + } + + static void +@@ -895,7 +978,7 @@ afsql_dd_message_became_available_in_the + static void + afsql_dd_wait_for_suspension_wakeup(AFSqlDestDriver *self) + { +- /* we got suspended, probably because of a connection error, ++ /* we got suspended, probably because of a connection error, + * during this time we only get wakeups if we need to be + * terminated. */ + if (!self->db_thread_terminate) +@@ -974,8 +1057,7 @@ afsql_dd_database_thread(gpointer arg) + + afsql_dd_commit_txn(self); + } +- +- exit: ++exit: + afsql_dd_disconnect(self); + + msg_verbose("Database thread finished", +-- +1.8.4.1 + |