Elastic SIEM – Anbindung eines HAProxy am Logstash mit Grok

Einleitung

Ich hatte bereits versucht, meine Logs vom HAProxy im ElasticSearch einzubinden. Nativ war das nicht möglich. Also habe ich im letzten Artikel einen Logstash-Service installiert. An diesen kann ich meine Logs senden. Logstash soll diese nun aufbereiten und an ElasticSearch schicken. Dieses Parsing wird mit Grok vorgenommen, einer Art Regular Expression Notation, die zum Filtern und Werte-Extrahieren verwendet wird.

Der Artikel gehört zu meiner Serie „Bereitstellung eines Elastic SIEM„.

Vorbereitung im HAProxy meiner PFSense

Zuerst muss ich meine Log Source anschließen. Dann kann ich mir die verschiedenen Log-Strings ansehen und überlegen, welche ich wie parsen möchte. Ich verbinde mit mit meiner PFsense und navigiere zu den Settings vom HAProxy:

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

Ich suche nach dem Bereich Logging und gebe dort die IP und den Port meines Logstash-Services ein. Zusätzlich trage ich noch den Namen des Services ein. Vergesst dabei den save-Button am Ende der Seite nicht!

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

Ich hätte gerne ausführliche Logs vom HAProxy geliefert bekommen, in denen z.B. auch die Namen der Frontends und Backends mit drin stehen. Dies muss für jedes Frontend einzeln aktiviert werden:

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

Hier finde ich die Option zum Aktivieren des detaillierten Loggings. Auch hier dürft ihr jede Seite mit dem save-Button am Ende bestätigen:

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

Zum Abschluss bestätige ich die Konfigurationsänderung mit „apply changes“

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

Nun muss ich die Logs im Elastic finden. Dazu navigiere ich in Analytics/Discover, wechsle in die Anzeige für logs* und aktiviere den Tag-Filter für „_grokparsefailure“. Das sind die Events, die ElasticSearch nicht verstanden hat und deren Inhalte somit auch nicht extrahieren konnte:

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

Hier finde ich sofort meine neuen Logfiles wieder 🙂

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

Die Vorbereitung ist damit abgeschlossen.

Analyse des ersten Message-Strings

Weiter geht es mit dem Parsing. Im letzten Bild sieht man deutlich, dass nur Basis-Properties für die Events angezeigt werden. Die meisten Eigenschaften wurden aus der „message“ nicht extrahiert – ElasticSearch kennt dieses Logmuster nicht und kann daher den Text nicht durch ein Parsing zerlegen. Hier muss ich im Logstash nachhelfen. Ich sehe mir den ersten Message-String einmal genauer an:

<142>Dec 21 16:42:38 HAPROXY haproxy[56279]: 172.19.130.129:56094 [21/Dec/2024:16:42:37.934] HTTPS-intern-OFF~ PROD-HTTP-Smarthome_ipv4/WS-SH 0/0/0/343/344 200 53194 - - ---- 8/1/0/0/0 0/0 "GET /overview.php?_=1734785789840 HTTP/1.1"

Der String ist eine Aneinanderreihung verschiedener Eigenschaftswerte. Das Muster dabei ist immer gleich. Ich benötige nun die Bedeutung dieser unterschiedlichen Werte. Das werden dann die Property-Names. Einige Werte habe ich aus dieser Seite bezogen: Introduction to HAProxy Logging

Property NameWert
?<142>
Timestamp des EventsDec 21 16:42:38
LOG-HOSTNAME (so habe ich den in der PFSense hinterlegt)HAPROXY
Prozessname und ProzessID in der PFSensehaproxy[56279]
Source IP172.19.130.129
Source Port56094
?[21/Dec/2024:16:42:37.934]
Name des FrontendsHTTPS-intern-OFF
Name des Backends / ZielsystemPROD-HTTP-Smarthome_ipv4/WS-SH
Timers*0/0/0/343/344
HTTP Statuscode200
Bytes count53194
Term Code—-
Connection count8/1/0/0/0
Queue length0/0
Method & Path & Query„GET /overview.php?_=1734785789840 HTTP/1.1“

Grok ist nun eine Art Beschreibung, welcher Bestandteil in dem String in welche Property geschrieben werden soll. Dabei kennt Grok verschiedene Musterbeschreibungen, die das Parsen vereinfachen sollen. Den Parsing- und Filterstring kann man so einfach zusammensetzen. Nur wenn die empfangene Message dem Filter entspricht, wird sie auch in ihre Bestandteile zerlegt. Auf diese Weise können auch mehrere unterschiedliche Message-Strings richtig erkannt und verarbeitet werden.

Parsing mit Grok

Aufbau des Grok Patterns für mein HAProxy Message-String

Für die Fehlerminimierung gibt es im Kibana einen Grok Debugger. Diesen findet man unter Management/Dev-Tools. Hier kann man einen Message-String eintragen und den dafür erforderlichen Grok Pattern Stück für Stück zusammensetzen:

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

Der Grok Pattern muss von links nach rechts den Message-String verarbeiten. Ich beginne mit der Source IP des Clients. Hier benötige ich den Pattern für eine IP-Adresse und den Namen der Property im ElasticSearch: %{IP:source.ip} Mit dem Prozentzeichen und den geschweiften Klammern wird die Mustersuche eingeleitet. In den Klammern steht zuerst der Mustername IP und durch einen Doppelpunkt abgetrennt der Name der Ziel-Property.

Dieses Pattern trage ich in dem zweiten Feld ein. Mit „simulate“ kann ich das Verhalten des Filters und des Parsers prüfen. Das funktioniert schon mal. Die erste IP(v4) im Message-String wird als source.ip extrahiert. Der Text davor wird ignoriert:

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

Weiter geht es mit dem Source-Port. Der steht hinter der ersten IPv4. Dazwischen steht aber noch ein Doppelpunkt. Ich muss also den Doppelpunkt als Textzeichen und ein weiteres Pattern hinzufügen. Da der Port immer eine Zahl ist, kann ich hier %{NUMBER:source.port} verwenden. Im Ergebnis schaut das dann so aus:

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

Der Doppelpunkt wird also nicht mit extrahiert, muss aber zur Mustererkennung mit im Grok Pattern enthalten sein! Weiter geht es mit dem Datums-String in den eckigen Klammern. Wenn ich den brauchen würde, dann könnte ich folgende Patterns verwenden:

\[%{NUMBER:logtime.day}/%{MONTH:logtime.month}/%{YEAR:logtime.year}:%{TIME:logtime.time}]

Die öffnende Klammer hat in Regular Expression eine besondere Bedeutung. Wenn ich sie aber als Texterkennungszeichen notieren will, dann muss ich sie mit einem Backslash davor „escapen“. Die anderen Patterns kann man im Bild gut nachvollziehen. Beachtet bitte die Trennzeichen zwischen den Patterns:

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

Ich benötige das Datum aber eigentlich nicht, da ElasticSearch das Logdatum ja eh schon kennt. Daher würde ich lieber ein Pattern notieren, um den Teil zwischen den eckigen Klammern zu überspringen. Das sieht dann so aus: \[.*] Ohne den Term mit %{} wird hier keine Eigenschaft entnommen. Die öffnende, eckige Klammer muss ebenfalls escaped werden. Und zwischen beiden Klammern nehme ich den gesamten Text mit .* auf bis zur schließenden, eckigen Klammer:

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

Die nächste Property ist der Name des Frontends. Dieser ist ein Text. Und Text extrahiert man mit @{DATA:frontend.name}. Wenn man das so eingibt, dann wird aber nur ein leerer Text in der Property ausgegeben. Lasst euch davon nicht beirren: DATA braucht einen Abschluss! Und den habe ich nicht mit angegeben. Beachtet bitte auch das zusätzliche Leerzeichen vor dem Prozentzeichen. Das ist im Message-String auch enthalten!

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

Nehme ich das Tildezeichen danach mit dazu, dann wird der Text damit abgeschlossen und kann extrahiert werden:

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

Und so geht es immer weiter, bis ich alle Properties entnommen habe. Die Daten gruppiere ich mir durch die Punktnotation im Property-Name sinnvoll. So stehen sie später auch schön zusammen. Im Bild zeigen die Pfeile die Trennzeichen zwischen den Properties an:

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

Das ist also mein erster Grok Pattern:

%{IP:source.ip}:%{NUMBER:source.port} [.*] %{DATA:frontend.name}~ %{DATA:backend.name}/%{DATA:backend.servername} %{DATA:connection.timers} %{NUMBER:Http.Status} %{NUMBER:connection.bytes} – – %{DATA:connection.TermCode} %{DATA:connection.count} %{DATA:connection.queuelength} „%{DATA:Http.method} %{DATA:Http.query} %{DATA:Http.version}“

Und das ist das vorläufige Ergebnis im Grok Developer:

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

Einbau des Grok Patterns im Logstash

Nun muss ich den neuen Pattern in meinem Logstash einbauen. Für die eingehende Verbindung auf UDP:10001 habe ich dort eine Konfigurationsdatei angelegt und deren Filter leer gelassen. Diese Datei muss ich nun bearbeiten. Also verbinde ich mit via SSH mit meinem ELK-Server. Die Datei habe ich zwischenzeitlich umbenannt, damit ich deren Verwendung besser zuordnen kann:

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

Ich editiere die Datei mit nano. Das ist der aktuelle Inhalt. Es werden alle Messages via UDP:10001 angenommen und als Message-Strings an ElasticSearch weitergeleitet:

input {
  udp {
    host => "192.168.100.32"
    port => 10001
  }
}

filter {}

output {
  elasticsearch {
    hosts => ["https://ws-siem.ws.its:9200"]
    api_key => "####################:################-#####"
    data_stream => true
    ssl => true
    cacert => "/etc/ssl/cert/siem9200.crt"
  }
}

In die Filtersektion gehört nun der Grok Pattern mit rein. Hier nutze ich ein match, um den Filter und das dazugehörige Parsing anzugeben. Sollte der Filter passen, dann werden mit add_field noch 2 weitere Properties erzeugt. Und mit add_tag tagge ich jedes Event, damit ich sie später leichter wiederfinden und filtern kann. Vereinfacht schaut das so aus:

input {
  udp {
    host => "192.168.100.32"
    port => 10001
  }
}

filter {
  grok {
    match => { "message" => '<Grok Pattern>' }
  }
}

output {
  elasticsearch {
    hosts => ["https://ws-siem.ws.its:9200"]
    api_key => "####################:################-#####"
    data_stream => true
    ssl => true
    cacert => "/etc/ssl/cert/siem9200.crt"
  }
}

Und ausgefüllt ergibt sich dieser Text:

input {
  udp {
    host => "192.168.100.32"
    port => 10001
  }
}

filter {
  grok {
    match => { "message" => '%{IP:source.ip}:%{NUMBER:source.port} \[.*] %{DATA:frontend.name}~ %{DATA:backend.name}/%{DATA:backend.servername} %{DATA:connection.timers} %{NUMBER:Http.Status} %{NUMBER:connection.bytes} - - %{DATA:connection.TermCode} %{DATA:connection.count} %{DATA:connection.queuelength} "%{DATA:Http.method} %{DATA:Http.query} %{DATA:Http.version}"' }
    add_field => [ "received_at", "%{@timestamp}" ]
    add_field => [ "received_from", "%{host}" ]
    add_tag => [ "haproxy” ]
  }
}

output {
  elasticsearch {
    hosts => ["https://ws-siem.ws.its:9200"]
    api_key => "####################:################-#####"
    data_stream => true
    ssl => true
    cacert => "/etc/ssl/cert/siem9200.crt"
  }
}

Ich speichere die Datei und starten den Service logstash neu. Das dauert einen Moment, wie das Log zeigt:

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

Nun suche ich wieder nach Events im Kibana. Leider werden die Logs nicht korrekt geparst!

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

Zum einen gibt es hier noch andere Message-Strings, für die mein Patternfilter nicht matcht. Und zum anderen scheint das Tildezeichen meines Frontends nicht immer mit dabei zu sein – wahrscheinlich nur, wenn der String zu lang wird. Also matcht der Filter auch hier nicht. Daher passe ich meinen Filterstring erneut an und entferne das Tildezeichen nach %{DATA:frontend.name}:

input {
  udp {
    host => "192.168.100.32"
    port => 10001
  }
}

filter {
  grok {
    match => { "message" => '%{IP:source.ip}:%{NUMBER:source.port} \[.*] %{DATA:frontend.name} %{DATA:backend.name}/%{DATA:backend.servername} %{DATA:connection.timers} %{NUMBER:Http.Status} %{NUMBER:connection.bytes} - - %{DATA:connection.TermCode} %{DATA:connection.count} %{DATA:connection.queuelength} "%{DATA:Http.method} %{DATA:Http.query} %{DATA:Http.version}"' }
    add_field => [ "received_at", "%{@timestamp}" ]
    add_field => [ "received_from", "%{host}" ]
    add_tag => [ "haproxy” ]
  }
}

output {
  elasticsearch {
    hosts => ["https://ws-siem.ws.its:9200"]
    api_key => "####################:################-#####"
    data_stream => true
    ssl => true
    cacert => "/etc/ssl/cert/siem9200.crt"
  }
}

Dann starte ich meinen Logstash durch und warte auf neue Logs im Kibana. Jetzt schaut es gut aus:

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

Aufbau weiterer Grok-Patterns

Nun extrahiere ich den 2. Message-String-Typ und baue dafür im Grok Developer einen Pattern. Hier nutze ich für den restlichen Text ein %{GREEDYDATA:connection.message}

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

Dann muss der neue Pattern in das Logstash-File eingetragen werden. also gehts zurück zur SSH-Session in den nano-Editor. Wie man erkennen kann, dürfen mehrere Grok-Filter untereinander stehen:

input {
  udp {
    host => "192.168.100.32"
    port => 10001
  }
}

filter {
  grok {
    match => { "message" => '%{IP:source.ip}:%{NUMBER:source.port} \[.*] %{DATA:frontend.name} %{DATA:backend.name}/%{DATA:backend.servername} %{DATA:connection.timers} %{NUMBER:Http.Status} %{NUMBER:connection.bytes} - - %{DATA:connection.TermCode} %{DATA:connection.count} %{DATA:connection.queuelength} "%{DATA:Http.method} %{DATA:Http.query} %{DATA:Http.version}"' }
    add_field => [ "received_at", "%{@timestamp}" ]
    add_field => [ "received_from", "%{host}" ]
    add_tag => [ "haproxy” ]
  }
  grok {
    match => { "message" => '%{IP:source.ip}:%{NUMBER:source.port} \[.*] %{DATA:frontend.name}/%{IP:frontend.ip}:%{NUMBER:frontend.port}: %{GREEDYDATA:connection.message}' }
    add_tag => [ "haproxy” ]
  }
}

output {
  elasticsearch {
    hosts => ["https://ws-siem.ws.its:9200"]
    api_key => "####################:################-#####"
    data_stream => true
    ssl => true
    cacert => "/etc/ssl/cert/siem9200.crt"
  }
}

Ich starte Logstash neu. Dieses mal wirft das Log aber einen Fehler aus:

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

Ich kontrolliere also noch einmal meine Konfigurationsdatei. Oft passiert so etwas wohl, wenn man die Strings in eine SSH-Session aus Browsern oder aus notepad++ über die Zwischenablage kopiert. Dann werden ggf. Sonderzeichen oder versteckte Steuerzeichen mit kopiert. Ich setze den String also im nano noch einmal direkt zusammen und teste erneut. Jetzt werden die Properties erkannt. Aber zusätzlich finde ich das Tag _grokparsefailure…

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

Nach einer Recherche im Netz habe ich herausgefunden, dass meine Vorgehensweise mit den multiplen Grok-Patterns das Problem verursacht. Offenbar werden so alle Filter in einer Sequenz nacheinander angewendet. Ich muss eine andere Logik in meiner config-File finden. Nach einigen Versuchen habe ich diese hier gefunden. Die unterschiedlichen Grok-Patterns werden kommagetrennt in einem Array (eckige Klammer) der Message zugewiesen. Das zusätzliche break_on_match weist Logstash an, beim ersten Treffer alle weiteren Patterns zu ignorieren:

input {
  udp {
    host => "192.168.100.32"
    port => 10001
  }
}

filter {
  grok {
    match => {
      "message" => [
        '%{IP:source.ip}:%{NUMBER:source.port} \[.*] %{DATA:frontend.name} %{DATA:backend.name}/%{DATA:backend.servername} %{DATA:timers} %{NUMBER:Http.Status} %{NUMBER:connection.bytes} - - %{DATA:connection.TermCode} %{DATA:connection.count} %{DATA:connection.queuelength} "%{DATA:Http.method} %{DATA:Http.query} %{DATA:Http.version}"',
        '%{IP:source.ip}:%{NUMBER:source.port} \[.*] %{DATA:frontend.name}/%{IP:frontend.ip}:%{NUMBER:frontend.port}: %{GREEDYDATA:connection.message}'
      ]
    }
    add_tag => [ "haproxy" ]
    break_on_match => true
  }
}

output {
  elasticsearch {
    hosts => ["https://ws-siem.ws.its:9200"]
    api_key => "####################:################-#####"
    data_stream => true
    ssl => true
    cacert => "/etc/ssl/cert/siem9200.crt"
  }
}

Damit sind die grok-Fehler verschwunden 🙂

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

Uns so suche ich weiter nach Events, bei denen das Parsing noch nicht passt und erweitere meinen Grok-Pattern:

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

So komme ich nun zu dieser finalen Logstash-config:

input {
  udp {
    host => "192.168.100.32"
    port => 10001
  }
}

filter {
  grok {
    match => {
      "message" => [
        '%{IP:source.ip}:%{NUMBER:source.port} \[.*] %{DATA:frontend.name} %{DATA:backend.name}/%{DATA:backend.servername} %{DATA:timers} %{NUMBER:Http.Status} %{NUMBER:connection.bytes} - - %{DATA:connection.TermCode} %{DATA:connection.count} %{DATA:connection.queuelength} "%{DATA:Http.method} %{DATA:Http.query} %{DATA:Http.version}"',
        '%{IP:source.ip}:%{NUMBER:source.port} \[.*] %{DATA:frontend.name}/%{IP:frontend.ip}:%{NUMBER:frontend.port}: %{GREEDYDATA:connection.message}',
        '%{IP:source.ip}:%{NUMBER:source.port} \[.*] %{DATA:frontend.name} %{DATA:backend.name}/%{DATA:backend.servername} %{DATA:connection.timers} %{NUMBER:connection.bytes} %{DATA:connection.terminationstate} %{DATA:connection.count} %{GREEDYDATA:connection.queuelength}'
      ]
    }
    add_tag => [ "haproxy" ]
    break_on_match => true
  }
}

output {
  elasticsearch {
    hosts => ["https://ws-siem.ws.its:9200"]
    api_key => "####################:################-#####"
    data_stream => true
    ssl => true
    cacert => "/etc/ssl/cert/siem9200.crt"
  }
}

Aufbau einer Search

Final erstelle ich nun noch eine Search, mit der ich schnell auf die Daten zugreifen kann und die relevanten Properties tabellarisch gleich mit angezeigt werden:

Elastic SIEM Anbindung eines HAProxy am Logstash mit Grok

Zusammenfassung

Das war wieder ein komplexeres Thema. bei Grok gibt es vieles zu beachten und Versuche dauern durch den Neustart vom Logstash etwas. Aber insgesamt bin ich nun froh, dass ich 2 weitere Ziele erreicht habe:

  • Ich habe meine detaillierten HAProxy-Logfiles endlich im ELK
  • Mit dem Logstash und Grok kann ich nun auch Events selber parsen

Also kann es mit dem nächsten Thema weiter gehen 🙂 Weitere Artikel findet ihr in meiner Serie „Bereitstellung eines Elastic SIEM„.

Stay tuned

Kommentar hinterlassen