# Neues Repo

- https://github.com/conduktor/kafka-stack-docker-compose
- https://github.com/conduktor/kafka-beginners-course

## Installation im STC

1. Download von Kafka: `curl "https://archive.apache.org/dist/kafka/3.7.0/kafka_2.13-3.7.0.tgz"`

# Docker-Images sichern

```
docker save ...
```

# Docker-Images wiederherstellen

```
docker image load -i ...
```

# Notizen STC

```
apt install docker docker-compose
```

1. Der erste Start schlägt mit folgendem Fehler fehl:

   ```
   ERROR: for zoo3  Cannot start service zoo3: OCI runtime create failed: container_linux.go:377: starting container process caused: process_linux.go:495: container init caused: process_linux.go:458: setting cgroup config for procHooks process caused: can't load program: function not implemented: unknown
   ```

   Entscheidend ist der Teil **setting cgroup config for procHooks process caused: can't load program**. Der NdB-Kernel unterstützt das nicht. Daher wurde er entfernt (purge) und der Standardkernel gebootet.

   ```
   docker compose -f zk-single-kafka-multiple.yml up
   ```

## Ablauf

1. lst02
   1. Kopieren der Files via cp-Skripte
   2. Auslesen und Versenden der Files per Python-Skript an lstk1
      ```
      python3 transportv0.9.cpython-39.pyc
      ```
2. lstk1
   1. Logstash starten
      ```
      /usr/share/bin/logstash -f /etc/logstash/conf.d/importLogstash.conf
      ```
      Hier werden die Daten entsprechend den Anforderungen gefiltert.
   2. ZooKeeper starten (der relative Pfad `../config/...` setzt voraus, dass der Aufruf aus `/usr/share/kafka/bin` erfolgt)
      ```
      /usr/share/kafka/bin/zookeeper-server-start.sh ../config/zookeeper.properties
      ```
   3. Kafka starten
      ```
      /usr/share/kafka/bin/kafka-server-start.sh ../config/server.properties
      ```
      Damit Daten abgerufen werden können, muss folgender iptables-Eintrag vorgenommen werden:
      ```
      iptables -I INPUT -p tcp --dport 9092 -j ACCEPT
      ```

## Nagflux.gcfg

```
[main]
NagiosSpoolfileFolder = "/var/spool/perfdata"
NagiosSpoolfileWorker = 1
InfluxWorker = 1
MaxInfluxWorker = 1
DumpFile = "nagflux.dump"
NagfluxSpoolfileFolder = "/var/spool/nagflux"
FieldSeparator = "&"
BufferSize = 10000
FileBufferSize = 65536

# If the performancedata does not have a certain target set with NAGFLUX:TARGET.
# The following field will define the target for this data.
# "all" sends the data to all Targets(every Influxdb, Elasticsearch...)
# a certain name will direct the data to this certain target
DefaultTarget = "logstashpipe"

[Log]
LogFile = ""
MinSeverity = "INFO"

[JSONFileExport "logstashpipe"]
Enabled = true
Path = /tmp/logstash_out
AutomaticFileRotation = "1"
```

## Bash-Skript für den Transport

```bash
#!/bin/bash
set -x
# Network retry settings
folder=/tmp/logstash_out
maxtries=360   # 360 tries * 10 s = 3600 s = 1 h, then give up on this file and continue with the next one
timewait=10

while true; do
    # Only files not modified for more than a minute (presumably completely written by nagflux).
    files=$(find "$folder" -type f -name "perfdata_*" -mmin +1 | sort)
    for file in $files; do
        try=0
        rc=1
        while [ "$try" -lt "$maxtries" ]; do
            netcat -w1 mlsebzlstk1 8080 < "$file"
            rc=$?
            [ "$rc" -eq 0 ] && break
            try=$((try + 1))
            sleep "$timewait"
        done
        # Remove the file after a successful upload; otherwise keep it as timeout_*
        # (that name no longer matches perfdata_*, so it is not retried).
        if [ "$rc" -eq 0 ]; then
            rm "$file"
        else
            mv "$file" "$folder/timeout_$(date +%s)_$(basename "$file")"
        fi
    done
    if [ -z "$files" ]; then
        echo "nothing to do, lets wait 10s"
        sleep 10
    fi
done
```

## Derzeitiges Pythonskript

```python
import fnmatch
import os
import socket
import time
from queue import Empty, Queue
from threading import Thread

from watchdog.events import FileCreatedEvent, PatternMatchingEventHandler
from watchdog.observers import Observer

# Receiving side: Logstash tcp input on lstk1 (see importLogstash.conf below).
hostname = "mlsebzlstk1"
port = 8080
# Directory nagflux writes to (JSONFileExport "logstashpipe", Path).
watchDir = "/tmp/logstash_out"
# Only files matching this pattern are sent (start-up scan and watchdog events).
FILE_PATTERN = "perfdata*"
# A file counts as complete once it has not been modified for this many seconds.
MIN_AGE = 10
# Socket timeout in seconds, so a hanging receiver cannot block a worker forever.
SOCKET_TIMEOUT = 30
# Suffix appended to a file whose transfer failed after the connection was established.
FAILED_SUFFIX = "zzzz"


class MyHandler(PatternMatchingEventHandler):
    """Watchdog handler: every newly created perfdata file gets its own sender thread."""

    def on_modified(self, event):
        # Intentionally ignored: a file is sent once, after on_created and the MIN_AGE wait.
        pass

    def on_created(self, event):
        print(f'event type: {event.event_type} path : {event.src_path}')
        watchdog_queue = Queue()
        watchdog_queue.put(FileCreatedEvent(event.src_path))
        # One thread per file, so waiting for MIN_AGE does not block the observer.
        worker = Thread(target=process_queue, args=(watchdog_queue,), daemon=True)
        worker.start()
        print(f"Thread gestartet: {worker.name}")

    def on_deleted(self, event):
        print(f'event type: {event.event_type} path : {event.src_path}')

    def on_moved(self, event):
        print(f'event type: {event.event_type} path : {event.src_path}')


def sendfile(client, file_):
    """Send the content of *file_* to (hostname, port) over the unconnected socket *client*.

    The socket is always closed. Returns True if the data was sent and the file deleted.

    - The connection cannot be established: the file is left untouched. It is picked
      up again by the start-up scan the next time the script starts.
    - Sending fails: the file is renamed to ``file_ + FAILED_SUFFIX``.

    Raises OSError if the rename or the final delete fails.
    """
    with client:
        try:
            client.connect((hostname, port))
        except OSError as e:  # socket.error is an alias of OSError
            print("error connecting socket: %s" % e)
            return False
        try:
            # Binary read: the bytes are forwarded unchanged, no decode/encode round trip.
            with open(file_, "rb") as f:
                client.sendall(f.read())
        except OSError:
            print("Datei wurde nicht versandt: " + file_)
            os.rename(file_, file_ + FAILED_SUFFIX)
            return False
    os.remove(file_)
    return True


def _wait_until_stable(path, min_age=MIN_AGE):
    """Block until *path* has not been modified for at least *min_age* seconds.

    The mtime is re-read after every sleep, so a file that is still being written
    keeps the worker waiting. Raises FileNotFoundError if the file disappears.
    """
    while True:
        age = time.time() - os.path.getmtime(path)
        if age >= min_age:
            print(f"Datei ist älter als {min_age}sec")
            return
        print("Ich warte")
        time.sleep(min_age - age)


def process_queue(q):
    """Send the file of every event in *q*; return as soon as *q* is empty."""
    while True:
        try:
            event = q.get_nowait()
        except Empty:
            return
        print("New event %s" % event)
        path = event.src_path
        try:
            _wait_until_stable(path)
            client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            client.settimeout(SOCKET_TIMEOUT)
            sendfile(client, path)
        except OSError as e:
            # Report and continue with the remaining events instead of killing the worker.
            print("Datei muss gemoved werden: %s (%s)" % (path, e))


if __name__ == "__main__":
    watchdog_queue = Queue()
    # Files that already exist (left over from an earlier run or a failed send) are
    # queued once at start-up and sent by a single worker thread.
    for name in sorted(os.listdir(watchDir)):
        filename = os.path.join(watchDir, name)
        if os.path.isfile(filename) and fnmatch.fnmatch(name.lower(), FILE_PATTERN):
            watchdog_queue.put(FileCreatedEvent(filename))
    worker = Thread(target=process_queue, args=(watchdog_queue,))
    worker.start()

    # New files are reported by watchdog (non-recursive, only FILE_PATTERN, no directories).
    my_event_handler = MyHandler(patterns=[FILE_PATTERN], ignore_directories=True,
                                 case_sensitive=False)
    my_observer = Observer()
    my_observer.schedule(my_event_handler, watchDir, recursive=False)
    my_observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        my_observer.stop()
    my_observer.join()
```

## Konfigurationsdatei des Logstash auf der Empfängerseite

Hier werden die Daten gleich an Kafka übergeben.
Start: mittels systemctl oder, zum Testen, direkt:

```
/usr/share/bin/logstash -f /etc/logstash/conf.d/importLogstash.conf
```

Hinweis: Der `kafka`-Output ist derzeit auskommentiert. Die Daten werden nur per `stdout` (rubydebug) ausgegeben.

```
input {
    tcp {
        host => "mlsebzlstk1"
        port => 8080
        codec => json
    }
    #http {
    #    port => 12000
    #    additional_codecs => { "application/json" => "json_lines" }
    #}
}

filter {
    json {
        # ecs_compatibility => disabled
        source => "message"
        target => "parsed_json"
    }
    #if [Hostname] =~ "dkr..x(skr|ssk).." or [Hostname] =~ "dpf..xivpn." or [Hostname] =~ "dha..xialg." or [Hostname] =~ "dpf..xgvpn." or [Hostname] =~ "drt..2grt.." or [Hostname] =~ "dkr..xwkr.."
    if [Hostname] =~ "dkr..x(skr|ssk)..|dpf..xivpn.|dha..xialg."
    #|dpf..xgvpn.|drt..2grt..|dkr..xwkr..|dls..3wlb0.|drt..2wrt..|dls..xnsg..|dls..3elb..|dpf..xeco..|dpfe.i00015"
    {
        mutate {
            add_field => { "ZGR" => "1" }
        }
        if [Hostname] =~ "dkr..x(skr|ssk).." {
            mutate {
                add_field => { "Dienst" => "MobZu SecuSuite" }
            }
            if [PerformanceLabel] =~ "eth3_traffic_(in|out)" {
                mutate { add_field => ["Art", "Traffik" ] }
            }
            else if [PerformanceLabel] =~ "(ike|child)sacount" {
                mutate { add_field => ["Art", "VPN Connections"] }
            }
            else if [PerformanceLabel] =~ "cpu_usage" {
                mutate { add_field => ["Art", "CPU"] }
            }
            else if [PerformanceLabel] =~ "'load-5'" {
                mutate { add_field => ["Art", "Load5"] }
            }
            else if [PerformanceLabel] =~ "memory_usage" {
                mutate { add_field => ["Art", "Memory"] }
            }
            else {
                drop {}
            }
        }
        else if [Hostname] =~ "dpf..xivpn.|dha..xialg." {
            mutate {
                add_field => { "Dienst" => "MobZu IOS" }
            }
            if [Hostname] =~ "dpf..xivpn." {
                if [PerformanceLabel] =~ "em0 - .*_traffic_(in|out)" {
                    mutate { add_field => {"Art" => "Gateway Traffik" "Service" => "Interface: em0" } }
                }
                else if [PerformanceLabel] =~ "VPN Connections" {
                    mutate { add_field => {"Art" => "Gateway VPN Connections"} }
                }
                else if [PerformanceLabel] =~ "cpu.*usage" {
                    mutate { add_field => {"Art" => "Gateway CPU"} }
                }
                else if [PerformanceLabel] =~ "'load5'" {
                    mutate { add_field => {"Art" => "Gateway Load5"} }
                }
                else {
                    drop { }
                }
            }
            else if [Hostname] =~ "dha..xialg." {
                if [PerformanceLabel] =~ "_traffic_(in|out)" {
                    mutate { add_field => {"Art" => "ALG Traffik" "Service" => "Interface: em[01]" } }
                }
                else if [PerformanceLabel] =~ "cpu.*usage" {
                    mutate { add_field => {"Art" => "ALG CPU"} }
                }
                else if [PerformanceLabel] =~ "'load5'" {
                    mutate { add_field => {"Art" => "ALG Load5"} }
                }
                else {
                    drop { }
                }
            }
        }
        # NOTE: the outer condition above currently only matches
        # dkr..x(skr|ssk)..|dpf..xivpn.|dha..xialg. - this branch stays unreachable
        # until dpf..xgvpn.|drt..2grt.. is added back there.
        else if [Hostname] =~ "dpf..xgvpn.|drt..2grt.." {
            mutate {
                add_field => { "Dienst" => "MobZu GenuCard" }
            }
            if [Hostname] =~ "dpf..xgvpn." {
                if [PerformanceLabel] =~ "em0 - .*_traffic_(in|out)" {
                    mutate { add_field => ["Art", "Traffik" ] }
                }
                else if [PerformanceLabel] =~ "VPN Connections" {
                    mutate { add_field => ["Art", "VPN Connections" ] }
                }
                else if [PerformanceLabel] =~ "cpu.*usage" {
                    mutate { add_field => ["Art", "CPU" ] }
                }
                else if [PerformanceLabel] =~ "load5" {
                    mutate { add_field => ["Art", "Load5" ] }
                }
                #else
                #{
                #    drop { }
                #}
            }
            else if [Hostname] =~ "drt..2grt.." {
                if [PerformanceLabel] =~ "'Tu.* - ([a-zA-Z]+).*_traffic_(in|out)'" {
                    mutate { add_field => ["Art", "GRE Tunnel Use pro Nutzer" ] }
                }
                #else
                #{
                #    drop { }
                #}
            }
        }
#        else if [Hostname] =~ "dkr..xwkr..|dls..3wlb0.|drt..2wrt.."
#        {
#            mutate
#            {
#                add_field => {
#                    "Dienst" => "MobZu SinaWS"
#                }
#            }
#        }
#        else if [Hostname] =~ "dls..xnsg.."
#        {
#            mutate
#            {
#                add_field => {
#                    "Dienst" => "MobZu NCP"
#                }
#            }
#        }
#        else if [Hostname] =~ "dls..3elb..|dpf..xeco..|dpfe.i00015"
#        {
#            mutate
#            {
#                add_field => {
#                    "Dienst" => "MobZu ECOS"
#                }
#            }
#        }
    }
    else {
        drop { }
    }
}

output {
    stdout { codec => rubydebug }
#    kafka {
#        codec => json
#        topic_id => "DTBS"
#    }
#    http {
#        url => 'http://mlsebzlstr1:8080'
#        http_method => post
#        retry_non_idempotent => true
#        format => json_batch
#        http_compression => true
#    }
}
# vim:ts=4:sw=4
```