
Fluentd Custom Parser for Proprietary Multi-Line Logs with Stack Traces

Published by The adllm Team. Tags: Fluentd, Custom Parser, Multi-line Logs, Stack Trace, Log Parsing, Ruby, Proprietary Logs, Data Collection

Fluentd is a powerful open-source data collector that allows for a unified logging layer across diverse systems. While it comes with a variety of built-in parsers for common log formats, dealing with proprietary log structures, especially those containing multi-line stack traces, often necessitates a custom solution. Standard parsers might incorrectly split stack traces into separate events, thereby losing crucial context essential for effective debugging and comprehensive analysis.

This article provides a detailed guide to creating a custom Fluentd parser plugin in Ruby. We’ll design and implement a MyErrorLogParser class, registered as my_error_log, that understands a specific proprietary log format: it identifies log entry boundaries, extracts meaningful fields, and, most importantly, concatenates multi-line stack traces into a single, coherent log event. This ensures that your logging backend receives well-structured and complete error information.

The Challenge: Proprietary Logs and Multi-Line Stack Traces

Proprietary log formats, by their nature, do not adhere to common standards like JSON, Apache, or Syslog logs. When these unique formats also include multi-line stack traces (a common occurrence in languages such as Java, Python, C#, or Ruby when exceptions are logged), the parsing complexity increases significantly. Each line of a stack trace, if not handled correctly, can be misinterpreted by a generic parser as a new, independent log entry, severing its connection to the originating error.

Our primary goal is to build a Fluentd parser that robustly:

  1. Recognizes the beginning of a new log entry based on the distinct pattern of our specific proprietary format.
  2. Extracts key information from the primary log line, such as the timestamp, log level, thread identifier, source class/module, and the main error message.
  3. Identifies subsequent lines that form part of a stack trace associated with the primary error line.
  4. Bundles the main log information and the complete, concatenated stack trace into a single structured Fluentd event.

Defining Our Proprietary Log Format

For the purpose of this guide, let’s assume our proprietary application logs errors and other messages in the following general format:

LEVEL TIMESTAMP [THREAD_NAME] SOURCE_CLASS: MESSAGE
  optional stack trace line 1
  optional stack trace line 2
  ...

Here’s a concrete example illustrating this format:

ERROR 2024-07-15T10:30:01.123Z [thread-app-1] com.example.Processor: Critical error occurred during widget processing!
  at com.example.Processor.processWidget(Processor.java:152)
  at com.example.MainService.run(MainService.java:88)
INFO 2024-07-15T10:30:02.456Z [thread-io-2] com.example.NetworkUtil: Data packet sent successfully.
ERROR 2024-07-15T10:31:00.789Z [main] com.example.DatabaseConnector: Failed to connect to database.
  at com.example.DatabaseConnector.connectInternal(DatabaseConnector.java:95)
  Caused by: java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)

The key characteristics to note for parsing are:

  • A primary log line always starts with a recognizable pattern (LEVEL, TIMESTAMP, etc.).
  • Stack trace lines are typically indented with whitespace or may start with specific keywords like “Caused by:”.
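
These two characteristics can be tried out in plain Ruby before any plugin code exists. The sketch below is only an illustration: ENTRY_START, CONTINUATION, and classify are hypothetical names, and the patterns are deliberately loose stand-ins for the regexes developed later in this article.

```ruby
# Illustrative patterns only; tune them to the real log format.
ENTRY_START  = /\A[A-Z]+\s+\d{4}-\d{2}-\d{2}T/  # LEVEL followed by a timestamp
CONTINUATION = /\A\s+(?:at\s|Caused by:)/       # indented frame or cause line

# Returns a symbol describing the role a single line plays.
def classify(line)
  return :entry_start if ENTRY_START.match?(line)
  return :continuation if CONTINUATION.match?(line)

  :other
end

sample = [
  'ERROR 2024-07-15T10:31:00.789Z [main] com.example.DatabaseConnector: ' \
    'Failed to connect to database.',
  '  at com.example.DatabaseConnector.connectInternal(DatabaseConnector.java:95)',
  '  Caused by: java.net.ConnectException: Connection refused',
  'free-form text that fits neither pattern'
]

sample.each { |line| puts "#{classify(line)}: #{line}" }
```

Lines classified as :other are the ones a parser has to make a policy decision about, which the parsing strategy later in the article addresses.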

Designing the Custom Fluentd Parser

A Fluentd parser plugin is a Ruby class. It must inherit from Fluent::Plugin::Parser and implement a few key methods as defined in the Fluentd Plugin Development Guide.

1. Plugin Structure

Our custom parser, which we’ll name my_error_log, will be defined in a Ruby file (e.g., parser_my_error_log.rb). This file should be placed in Fluentd’s plugin directory.

# parser_my_error_log.rb
# This file should be placed in a directory like /etc/fluent/plugin
# or be part of a gem.

require 'fluent/plugin/parser'
require 'fluent/time' # Required for Fluent::TimeParser

module Fluent
  module Plugin
    class MyErrorLogParser < Parser
      # Register this parser with Fluentd under the name 'my_error_log'.
      # This name is used in the <parse @type ...> directive.
      Fluent::Plugin.register_parser('my_error_log', self)

      # Configuration parameters for the parser will be defined next.

      # The configure method is called by Fluentd to initialize the plugin.
      def configure(conf)
        super
        # Initialize parser based on configuration, e.g.,
        # compile regex patterns (though :regexp type does this),
        # set up TimeParser instance.
      end

      # The parse method contains the core logic for parsing log text.
      # It must yield time (Unix timestamp) and record (Hash) pairs.
      def parse(text)
        # Implementation details will follow.
      end
    end
  end
end

This basic structure allows Fluentd to discover and load our custom parser.

2. Configuration Parameters

To make our parser flexible and reusable, we’ll define several configuration parameters using config_param (detailed in the Plugin Development Guide).

  • log_start_regex: A regular expression to identify and capture fields from the first line of a log entry.
  • stack_trace_regex: A regular expression to identify lines that are part of a stack trace.
  • time_format: The format string for parsing the timestamp from the log, compatible with strptime.

We add these inside the MyErrorLogParser class:

      # parser_my_error_log.rb (inside MyErrorLogParser class)

      # Regex to identify the start of a log entry and capture initial fields.
      # The :regexp type automatically compiles the string into a Regexp object.
      config_param :log_start_regex, :regexp
      
      # Regex to identify a line belonging to a stack trace.
      config_param :stack_trace_regex, :regexp
      
      # Time format for parsing the timestamp string from log lines.
      # Defaults to a common ISO8601-like format with milliseconds and Z.
      config_param :time_format, :string, default: '%Y-%m-%dT%H:%M:%S.%LZ'

3. The configure Method

The configure(conf) method is called once when Fluentd starts up. It’s the ideal place to initialize components like the Fluent::TimeParser.

      # parser_my_error_log.rb (inside MyErrorLogParser class)

      def configure(conf)
        super # Calls the parent class's configure method

        # Initialize Fluent::TimeParser with the specified time_format.
        # This object will be used to convert time strings from logs
        # into Unix timestamps for Fluentd events.
        @time_parser = Fluent::TimeParser.new(resolve_time_format)
        
        # The regexes defined with :regexp type are already compiled.
        # We can access them directly as @log_start_regex and
        # @stack_trace_regex.
      end

      private

      # Helper method to resolve time format, allowing for future complexity
      # if needed (e.g., dynamic time format determination).
      def resolve_time_format
        @time_format
      end
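
To see what a strptime-compatible time_format does with the sample timestamp, the format can be tried with Ruby's standard library alone. This is an analogy rather than Fluentd's own code path: Fluent::TimeParser applies its own timezone settings, so treat the snippet as a sketch of how the directives behave, in particular the difference between a literal Z and the %z zone directive.

```ruby
require 'time'

raw = '2024-07-15T10:30:01.123Z'

# %z parses the zone designator, so the trailing "Z" is read as UTC.
with_zone = Time.strptime(raw, '%Y-%m-%dT%H:%M:%S.%L%z')

# A literal "Z" only has to match the character; plain Ruby then builds
# the time in the local zone of the running process.
literal_z = Time.strptime(raw, '%Y-%m-%dT%H:%M:%S.%LZ')

puts with_zone.utc_offset # offset in seconds parsed from the string
puts with_zone.usec       # fractional part captured by %L, in microseconds
puts literal_z.utc_offset # depends on the machine's TZ setting
```

If the two offsets differ on your machine, it is worth checking how your Fluentd timezone settings interpret the default format used in this article.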

4. The parse Method: Core Logic

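The parse(text) method is where the actual parsing occurs. It receives a chunk of text and must yield a time, record pair for each complete log message it identifies and processes. time is a Fluent::EventTime (an integer Unix timestamp is also accepted), and record is a Ruby Hash. The logic below assumes that one call can contain several newline-separated lines. How much text arrives per call depends on the input plugin: in_tail, for example, normally hands a parser that is not in its multiline mode one line at a time, so confirm that your input delivers whole entries before relying on this behavior.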

Our parsing strategy:

  1. Iterate through each line of the input text.
  2. If a line matches log_start_regex:
    • If a previous multi-line message (an “active record”) was being built, yield it first.
    • Start a new active record, parse core fields (timestamp, level, message, etc.) from this line using named captures in the regex.
    • Parse the timestamp string into a Unix timestamp using @time_parser.
    • Initialize an empty buffer for its potential stack trace.
  3. If a line matches stack_trace_regex and there is an active record:
    • Append this line (typically stripped of leading/trailing whitespace) to the current active record’s stack trace buffer.
  4. If a line matches neither log_start_regex nor stack_trace_regex, and there is an active record:
    • This signifies the end of the current multi-line entry. Yield the active record.
    • The current line might be an unparseable line or a simple log line not matching our main error format. For this example, we’ll simply reset the active record state. Advanced handling could involve emitting it as a raw message.
  5. After iterating through all lines, if there’s any pending active record, yield it.
      # parser_my_error_log.rb (inside MyErrorLogParser class)

      # Take the caller's block as &block so it can be forwarded to the
      # helper. A bare `yield` inside finalize_and_yield_record looks for a
      # block passed to the helper itself and raises LocalJumpError otherwise.
      def parse(text, &block)
        current_record = nil
        current_time = Fluent::EventTime.now # Default if time parsing fails
        stack_trace_buffer = []

        text.each_line do |line|
          line = line.chomp # Drop the trailing newline character

          match_start = @log_start_regex.match(line)

          if match_start
            # If a previous record was being built, finalize and yield it.
            # (The helper is a no-op when there is no active record.)
            finalize_and_yield_record(current_time, current_record,
                                      stack_trace_buffer, &block)

            # Start a new record based on the matched log_start_regex.
            begin
              # Parse timestamp using the configured @time_parser.
              current_time = @time_parser.parse(match_start['time'])
              current_record = {}
              # Populate record from named captures in the regex.
              match_start.names.each do |name|
                # Store captured value if it's not nil.
                current_record[name] = match_start[name] if match_start[name]
              end
              # Fluentd uses the yielded 'time'. Drop the original 'time'
              # field unless 'keep_time_key true' is set in <parse>.
              current_record.delete('time') unless @keep_time_key
            rescue Fluent::TimeParser::TimeParseError => e
              # Log time parsing errors using the plugin's logger.
              log.warn "Failed to parse time: #{match_start['time']}", error: e
              current_record = nil # Invalidate record on parse failure.
              current_time = Fluent::EventTime.now # Fallback to current time.
            end
            stack_trace_buffer = [] # Reset buffer for the new record.
          elsif current_record && @stack_trace_regex.match(line)
            # Line matches stack_trace_regex and we have an active record.
            # Append it to the stack trace buffer.
            stack_trace_buffer << line.strip # Clean whitespace.
          elsif current_record
            # Line is not a new log start, nor a stack trace for current.
            # Assume current_record is complete. Finalize and yield.
            finalize_and_yield_record(current_time, current_record,
                                      stack_trace_buffer, &block)
            current_record = nil # Reset state.
            stack_trace_buffer = []
            # Optionally, handle this 'unmatched' line (e.g., emit as raw).
            # log.debug "Unmatched line after active record: #{line}"
          else
            # Line doesn't match start and no current_record context.
            # This could be a non-error log or an unparseable line.
            # For this example, we skip it. Advanced: emit as raw message.
            # log.trace "Skipping unhandled line: #{line}"
          end
        end

        # After all lines are processed, yield any remaining active record.
        finalize_and_yield_record(current_time, current_record,
                                  stack_trace_buffer, &block)
      end

      private

      # Helper method to finalize a record (add stack trace) and yield it.
      def finalize_and_yield_record(time, record, buffer)
        # Ensure there's a valid record and time to yield.
        return unless record && time 
        
        # Add the concatenated stack trace to the record if buffer not empty.
        unless buffer.empty?
          record['stack_trace'] = buffer.join("\n") # Newlines between lines.
        end
        yield time, record
      end
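
One Ruby detail matters whenever yielding is delegated to a helper method: yield always targets the block passed to the method it appears in, not the block given to an outer caller. The minimal sketch below, which uses hypothetical names (emit, broken_each, working_each) unrelated to Fluentd, shows why the caller's block has to be forwarded explicitly with &block.

```ruby
# Hypothetical names, used only to illustrate block forwarding.
def emit(value)
  yield value # targets the block passed to emit itself
end

def broken_each(values)
  values.each { |v| emit(v) } # the caller's block is never handed to emit
end

def working_each(values, &block)
  values.each { |v| emit(v, &block) } # the caller's block is forwarded
end

collected = []
working_each([1, 2]) { |v| collected << v }
p collected

begin
  broken_each([1]) { |v| collected << v }
rescue LocalJumpError => e
  puts "broken_each raised #{e.class}"
end
```

The same rule applies to a parse method that hands off to a helper such as finalize_and_yield_record: the helper can only yield if parse passes its block along.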


Complete Custom Parser Plugin Code

Here is the consolidated parser_my_error_log.rb file content:

# fluent-plugin-my-error-log/lib/fluent/plugin/parser_my_error_log.rb
# Or, for simpler setups, /etc/fluent/plugin/parser_my_error_log.rb
require 'fluent/plugin/parser'
require 'fluent/time' # For Fluent::TimeParser and Fluent::EventTime

module Fluent
  module Plugin
    class MyErrorLogParser < Parser
      # Register this parser with Fluentd as 'my_error_log'.
      Fluent::Plugin.register_parser('my_error_log', self)

      # Regex to identify the start of a log entry and capture fields.
      config_param :log_start_regex, :regexp
      # Regex to identify lines that are part of a stack trace.
      config_param :stack_trace_regex, :regexp
      # Time format for parsing timestamps from logs.
      config_param :time_format, :string, default: '%Y-%m-%dT%H:%M:%S.%LZ'

      def configure(conf)
        super
        @time_parser = Fluent::TimeParser.new(resolve_time_format)
      end

      # The caller's block is taken as &block and forwarded to the helper;
      # a bare `yield` in the helper would otherwise raise LocalJumpError.
      def parse(text, &block)
        current_record = nil
        # Default to current time if log's time is unparseable.
        current_time = Fluent::EventTime.now
        stack_trace_buffer = []

        text.each_line do |line|
          line = line.chomp # Drop the trailing newline

          match_start = @log_start_regex.match(line)

          if match_start # Line indicates a new log entry
            # No-op when there is no active record.
            finalize_and_yield_record(current_time, current_record,
                                      stack_trace_buffer, &block)

            begin
              current_time = @time_parser.parse(match_start['time'])
              current_record = {}
              match_start.names.each do |name|
                current_record[name] = match_start[name] if match_start[name]
              end
              # Fluentd uses the yielded 'time'; honor keep_time_key.
              current_record.delete('time') unless @keep_time_key
            rescue Fluent::TimeParser::TimeParseError => e
              log.warn "Time parse error: #{match_start['time']}", error: e
              current_record = nil # Invalidate on time parse failure
              current_time = Fluent::EventTime.now # Fallback time
            end
            stack_trace_buffer = [] # Reset for the new record
          elsif current_record && @stack_trace_regex.match(line)
            # Line is part of a stack trace for the active record
            stack_trace_buffer << line.strip
          elsif current_record
            # Line is not a new log, nor stack trace; current record ends.
            finalize_and_yield_record(current_time, current_record,
                                      stack_trace_buffer, &block)
            current_record = nil # Reset state
            stack_trace_buffer = []
            # log.debug "Unmatched line ended active record: #{line}"
          else
            # Line is not a start, not stack trace, no active record. Skip.
            # log.trace "Skipping unhandled line: #{line}"
          end
        end

        # Yield any remaining record after all lines are processed
        finalize_and_yield_record(current_time, current_record,
                                  stack_trace_buffer, &block)
      end

      private

      def resolve_time_format
        @time_format # Placeholder for potentially more complex logic
      end

      def finalize_and_yield_record(time, record, buffer)
        return unless record && time # Ensure valid data to yield
        
        unless buffer.empty?
          record['stack_trace'] = buffer.join("\n")
        end
        yield time, record
      end
    end
  end
end

To use this plugin, ensure Fluentd can load it. This usually means placing the .rb file in a directory specified in Fluentd’s --plugin command-line option or its plugin_dir system configuration, or by packaging it as a proper Ruby gem.

Configuring Fluentd

Now, let’s configure Fluentd’s in_tail input plugin to use our my_error_log parser.

Edit your Fluentd configuration file (e.g., fluent.conf or td-agent.conf):

# fluent.conf Example

<source>
  @type tail
  path /path/to/your/proprietary.log # Specify actual log file path
  tag app.proprietary_error          # Tag for routing these logs
  # pos_file is crucial for `in_tail` to remember its last read position,
  # preventing data loss or duplication on Fluentd restarts.
  pos_file /var/log/td-agent/proprietary.log.pos 

  <parse>
    @type my_error_log # Must match the name registered by your plugin

    # Regex to capture named groups from the primary log line.
    # IMPORTANT: Adjust this regex precisely for your log format.
    # Using (?<name>...) for named captures is highly recommended.
    # Keep the whole pattern on one line: an unquoted value ends at the
    # end of its line, so a pattern split across lines is not rejoined.
    log_start_regex /^(?<level>\w+)\s+(?<time>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)\s+\[(?<thread>[^\]]+)\]\s+(?<source>[^:]+):\s*(?<message>.*)/
    
    # Regex to identify stack trace continuation lines.
    # This example matches lines starting with 2+ whitespace characters
    # followed by "at", OR optional whitespace followed by "Caused by:".
    # Adjust this to the specific indentation or patterns of your stack traces.
    stack_trace_regex /^(?:\s{2,}at|\s*Caused by:).*/

    # Time format matching the timestamp captured by 'time' group in
    # log_start_regex.
    time_format %Y-%m-%dT%H:%M:%S.%LZ

    # Optional: If you want to keep the original 'time' field from the log
    # in the parsed record, uncomment the following line.
    # keep_time_key true 
  </parse>
</source>

# For debugging purposes: output parsed logs to standard output.
# This helps verify that your parser is working as expected.
<match app.proprietary_error>
  @type stdout
</match>

# Example: Forward parsed logs to Elasticsearch for analysis and storage.
# <match app.proprietary_error>
#   @type elasticsearch
#   host localhost
#   port 9200
#   index_name my_app_errors_%Y%m%d # Daily indices
#   # Other Elasticsearch specific options like user, password, scheme.
# </match>

Key Regex Details:

  • log_start_regex:
    • ^(?<level>\w+): Matches log level (e.g., ERROR, INFO) at the line start.
    • (?<time>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z): Captures an ISO8601-like timestamp with milliseconds.
    • \[(?<thread>[^\]]+)\]: Captures thread name enclosed in brackets.
    • (?<source>[^:]+):: Captures class/source name ending with a colon.
    • (?<message>.*): Captures the rest of the line as the main message.
  • stack_trace_regex:
    • ^(?:\s{2,}at|\s*Caused by:).*: Matches lines starting with at least two whitespace characters followed by “at”, or lines starting with optional whitespace followed by “Caused by:”. The (?:...) group is non-capturing: it groups the two alternatives without creating a capture, which is common for stack trace patterns.

Remember to tailor these regular expressions meticulously to your specific proprietary log format. Online tools like Rubular are invaluable for crafting and testing Ruby regular expressions.
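
The named captures can also be checked directly in Ruby. The sketch below rewrites the same log_start_regex with the x (extended) flag purely for readability. The constant name LOG_START is an assumption made for this example, and the sample lines come from the log excerpt earlier in the article.

```ruby
# Same pattern as log_start_regex, split across lines via the x flag.
LOG_START = /
  ^(?<level>\w+)\s+
  (?<time>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)\s+
  \[(?<thread>[^\]]+)\]\s+
  (?<source>[^:]+):\s*
  (?<message>.*)
/x

primary = 'INFO 2024-07-15T10:30:02.456Z [thread-io-2] ' \
          'com.example.NetworkUtil: Data packet sent successfully.'
frame = '  at com.example.MainService.run(MainService.java:88)'

# named_captures returns a Hash keyed by the group names.
fields = LOG_START.match(primary).named_captures
fields.each { |name, value| puts "#{name}: #{value}" }

# A stack frame must not be mistaken for the start of a new entry.
puts "frame starts an entry? #{LOG_START.match?(frame)}"
```

Note that the x flag is a Ruby source convenience. In the Fluentd configuration the pattern still has to be written on a single line.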

Testing and Debugging Your Parser

Thorough testing is critical for custom parsers:

  1. Fluentd Logs: Increase Fluentd’s own log verbosity (e.g., using the -vv command-line flag or setting log_level debug in Fluentd’s <system> configuration block). This will show plugin loading messages, configuration processing, and potential errors from your parser’s log.warn or log.error calls.
  2. @type stdout Output: As demonstrated in the configuration, initially route the output of your parser to stdout. This allows you to directly observe the structured records your parser generates from sample log lines.
  3. Unit Tests (Highly Recommended): For robust and maintainable plugins, write unit tests using a framework such as test-unit, which Fluentd’s own test helpers are built around, or RSpec. Fluentd provides test drivers and helpers to facilitate testing parser plugins in isolation. You can feed sample log strings to your parser instance and assert that the yielded time, record pairs are correct.
  4. Simplify and Iterate: Begin testing with the simplest log lines that match your log_start_regex. Gradually introduce complexity: logs with multi-line stack traces, logs without stack traces, edge cases like empty messages, unusual characters, or malformed lines.
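
One way to follow the “simplify and iterate” advice is to exercise the regexes and the grouping logic in plain Ruby before loading anything into Fluentd. The sketch below is a stand-in, not the plugin itself: group_entries is a hypothetical helper that mirrors the parsing strategy described earlier, and LOG_START and STACK_LINE are assumed constant names holding the two patterns from the configuration.

```ruby
LOG_START = /
  ^(?<level>\w+)\s+
  (?<time>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)\s+
  \[(?<thread>[^\]]+)\]\s+
  (?<source>[^:]+):\s*
  (?<message>.*)
/x
STACK_LINE = /^(?:\s{2,}at|\s*Caused by:).*/

# Hypothetical helper: groups raw text into one Hash per log entry.
def group_entries(text)
  entries = []
  current = nil
  text.each_line(chomp: true) do |line|
    if (m = LOG_START.match(line))
      entries << current if current      # previous entry is complete
      current = m.named_captures
    elsif current && STACK_LINE.match?(line)
      (current['stack_trace'] ||= []) << line.strip
    elsif current
      entries << current                 # unmatched line ends the entry
      current = nil
    end
  end
  entries << current if current
  entries.each do |e|
    e['stack_trace'] = e['stack_trace'].join("\n") if e['stack_trace']
  end
end

sample = <<~'LOG'
  ERROR 2024-07-15T10:31:00.789Z [main] com.example.DatabaseConnector: Failed to connect to database.
    at com.example.DatabaseConnector.connectInternal(DatabaseConnector.java:95)
    Caused by: java.net.ConnectException: Connection refused
      at java.net.PlainSocketImpl.socketConnect(Native Method)
  INFO 2024-07-15T10:30:02.456Z [thread-io-2] com.example.NetworkUtil: Data packet sent successfully.
LOG

entries = group_entries(sample)
entries.each { |entry| p entry }
```

Starting from a harness like this, edge cases such as malformed lines or empty messages can be added to the sample one at a time.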

Alternatives and Considerations

While a custom Ruby parser offers maximum control, consider these alternatives:

  • Fluentd’s Built-in multiline Parser: For simpler multi-line scenarios where continuation is determined solely by consistent line prefixes (e.g., all stack trace lines start with a specific number of spaces), Fluentd’s built-in @type multiline parser might be sufficient. It uses format_firstline to detect new entries and formatN for subsequent line patterns. However, it offers less flexibility in extracting diverse fields from the first line or handling more complex conditional logic compared to a custom Ruby parser.
  • fluent-plugin-concat: This filter plugin can concatenate multiple related events based on start/end markers or other conditions. It operates at the filter stage, after initial parsing, which might be less efficient for primary multi-line handling compared to performing this logic at the input/parser stage.
  • Structured Logging at Source: The most robust and often preferred long-term solution is to modify applications to output logs in a structured format like JSON directly. When applications log in JSON, stack traces can be included as a single string field (with internal newlines escaped). This often eliminates the need for complex custom parsing for new applications, reserving custom parsers for legacy systems or third-party applications where modifying log output is not feasible.
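
To make the structured-logging alternative concrete, here is a minimal sketch of an application emitting an error as one JSON line. The field names and the json_error_line helper are assumptions chosen for this example, not a format that Fluentd requires.

```ruby
require 'json'
require 'time'

# Hypothetical helper: renders one exception as a single JSON log line.
def json_error_line(error, now: Time.now.utc)
  JSON.generate(
    'time' => now.iso8601(3),
    'level' => 'ERROR',
    'message' => error.message,
    # The whole trace lives in one field; JSON escapes the newlines.
    'stack_trace' => Array(error.backtrace).join("\n")
  )
end

begin
  raise ArgumentError, 'widget id missing'
rescue ArgumentError => e
  line = json_error_line(e)
  puts line
end
```

Because the serialized event never contains a raw newline, a line-oriented input combined with a JSON parser can treat every line as a complete event.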

Conclusion

Developing a custom Fluentd parser in Ruby gives you fine-grained control over proprietary, multi-line log formats, especially those containing stack traces. By combining Ruby’s string handling and regular expressions with Fluentd’s plugin framework, you can turn raw, irregular logs into structured events in which each error carries its complete stack trace. That structure makes the data easier to store, query, and visualize in logging backends such as Elasticsearch or Splunk, which in turn supports faster debugging and better system monitoring.

Always remember to meticulously design your regular expressions and thoroughly test your custom parser with a wide variety of representative log samples to ensure its accuracy, performance, and robustness in a production environment.