Files

17 KiB

Neues Repo

https://github.com/conduktor/kafka-stack-docker-compose

https://github.com/conduktor/kafka-beginners-course

Installation im STC

  1. Download des Kafka curl "https://archive.apache.org/dist/kafka/3.7.0/kafka_2.13-3.7.0.tgz

Dockerfiles Sicherung

docker save ...

Dockerfiles Restore

docker image load -i <tar>

Notizen STC

apt install docker docker-compose

  1. Start läuft auf Fehler:

ERROR: for zoo3 Cannot start service zoo3: OCI runtime create failed: container_linux.go:377: starting container process caused: process_linux.go:495: container init caused: process_linux.go:458: setting cgroup config for procHooks process caused: can't load program: function not implemented: unknown

Der NdB-Kernel unterstüzt das nicht, daher gepurged und den Standardkernel hochgefahren

docker compose -f zk-single-kafka-multiple.yml up

Ablauf

  1. lst02

    1. Kopieren der Files via cp-Skripte

    2. Auslesen und Versenden der Files per Python-Skript an die lstk1

      python3 transportv0.9.cpython-39.pyc
      
  2. lstk1

    1. den logstash starten

      /usr/share/bin/logstash -f /etc/logstash/conf.d/importLogstash.conf
      

      Hier werden die Daten entsprechend Forderung gefiltert

    2. zookeeper starten

      /usr/share/kafka<ver>/bin/zookeeper-server-start.sh ../config/zookeeper.properties
      
    3. kafka starten

      /usr/share/kafka<ver>/bin/kafka-server-start.sh ../config/server.properties
      

      Damit Daten abgerufen werden können, muss folgender iptables-Eintrag vorgenommen werden

      iptables -I INPUT -p tcp --dport 9092 -j ACCEPT
      

Nagflux.gcfg

[main]
    NagiosSpoolfileFolder = "/var/spool/perfdata"
    NagiosSpoolfileWorker = 1
    InfluxWorker = 1
    MaxInfluxWorker = 1
    DumpFile = "nagflux.dump"
    NagfluxSpoolfileFolder = "/var/spool/nagflux"
    FieldSeparator = "&"
    BufferSize = 10000
    FileBufferSize = 65536
    # If the performancedata does not have a certain target set with NAGFLUX:TARGET.
    # The following field will define the target for this data.
    # "all" sends the data to all Targets(every Influxdb, Elasticsearch...)
    # a certain name will direct the data to this certain target
    DefaultTarget = "logstashpipe"


[Log]
	LogFile = ""
	MinSeverity = "INFO"

[JSONFileExport "logstashpipe"]
	Enabled = true
	Path = /tmp/logstash_out
	AutomaticFileRotation = "1"

Bash-Skript für den Transport

#!/bin/bash

set -x

# configure network timeouts
folder=/tmp/logstash_out
maxtries=360	# 360 * 10s => nach 3600s = 1h = upload abbrechen, nächste Datei
timewait=10

while true; do
	files=$(find $folder -type f -name "perfdata_*" -mmin +1 | sort)
	for file in $files ; do
		try=0
		rc=0
		until [ $try -gt $maxtries ]
		do
			cat $file | netcat -w1 mlsebzlstk1 8080
			rc=$?
			[ "$rc" == "0" ] && break
			try=$((try+1))
			sleep $timewait
		done
		
		# remove file or rename for timeout
		if [ "$rc" == "0" ] ; then
			rm $file
		else
			mv $file $folder/timeout_$(date +%s)
		fi
	done
	
	if [ "$files" == "" ] ; then
		echo "nothing to do, lets wait 10s"
		sleep 10
	fi
done

Derzeitiges Pythonskript

import time, os, sys, socket, shutil
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
from watchdog.events import FileCreatedEvent
from queue import Queue
from threading import Thread
from datetime import datetime

"""
    Prozessklasse
"""

class MyHandler(PatternMatchingEventHandler):
    ignore_patterns = None
    ignore_directories = False
    case_sensitive = False 
    patterns=['perfdata*']

    def on_modified(self, event):
        #print(f'event type: {event.event_type}  path : {event.src_path}')
        pass

        #self.queue.put(event.src_path)

    def on_created(self,event):
        print(f'event type: {event.event_type}  path : {event.src_path}')
        watchdog_queue = Queue()
        e = FileCreatedEvent(event.src_path)
        watchdog_queue.put(e)

    
        worker = Thread(target=process_queue, args=(watchdog_queue,))
        print("Thread gestartet: " + worker)
        worker.setDaemon(True)
        worker.start()

    def on_deleted(self,event):
        print(f'event type: {event.event_type}  path : {event.src_path}')

    def on_moved(self,event):
        print(f'event type: {event.event_type}  path : {event.src_path}')




def sendfile(client, file_):
    
    f = open(file_, "r")
    try:
        client.connect((hostname, port))
        try:
        #    f = open(file_, "r")
            data = f.read()
#            print(data)
            client.sendall(data.encode("utf-8"))
            f.close()
            os.remove(file_)
            client.close()
        except:
            client.close()
            print("Datei wurde nicht versandt: " +file_)
            os.rename(file_  + "zzzz")
            f.close()

    except socket.error as e:
        print ("error connecting socket: %s" %e)
        client.close()
        #os.rename(file_  +str(int(datetime.now().timestamp())))

def process_queue(q):
    while True:
        if not q.empty():
            event = q.get()
            print("New event %s" % event)
            modification_time = int(os.path.getmtime(event.src_path))
            print(modification_time)
            now = int(datetime.now().timestamp())
            print(now-modification_time)
            ##### hier mit while arbeiten
            while (int(datetime.now().timestamp()) - modification_time < 10):
                print("Ich warte")
                time.sleep(11) # warten von 10sec
                # nach verlassen sollte File älter als 10sec sein
            print ("Warten verlassen")

            if (int(datetime.now().timestamp())- modification_time > 10):
                print("Datei ist älter als 10sec")
                ## senden
                try:
                    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    sendfile(client, event.src_path)
                except:
                    print("Datei muss gemoved werden")
                

                ## gesendet --> ja

                ## fehlerhaftes senden
                ## mv file namentlich
        else:
            return
    



if __name__ == "__main__":

    """
    init-Daten
    """
    hostname = "mlsebzlstk1"
    port = 8080
    watchdog_queue = Queue()
    patterns = ['perfdata']
    watchDir = "/tmp/logstash_out"
    
#    watchDir = "/tmp/test"

    """
    Einlesen des Verzeichnisses und Versenden, als eigene Threads
    """ 
    for file in os.listdir(watchDir):
        filename = os.path.join(watchDir, file)
        
        # Erzeugen eines FileCreate Events
        event = FileCreatedEvent(filename)
        
        # Einfügen in die watchdog-Queue
        watchdog_queue.put(event)


    """
    Starten des Threads
    """

    worker = Thread(target=process_queue, args=(watchdog_queue,))
#    worker.setDaemon(True)
    worker.start()

    #event_handler = FileWatchdog(watchdog_queue, patterns="perfdata*")

    my_event_handler = MyHandler()


    go_recursively = False
    my_observer = Observer()
    my_observer.schedule(my_event_handler, watchDir, recursive=go_recursively)
 
    my_observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        my_observer.stop()
        my_observer.join()

conf-File des logstashs auf der Empfängerseite

Hier werden auf gleich die Daten an den Kafka übergeben

Start: mittels systemctl oder aber zum Testen /usr/share/bin/logstash -f /etc/logstash/conf.d/importLogstash.conf

input {
	tcp {
		host => "mlsebzlstk1"
		port => 8080
		codec => json
		}
    #http {
    #    port => 12000
    #    additional_codecs => { "application/json" => "json_lines" }
    #}
}

filter{
        json {
#				ecs_compatibility => disabled
                source => "message"
                target => "parsed_json"
        }

        #if [Hostname] =~ "dkr..x(skr|ssk).." or [Hostname] =~ "dpf..xivpn." or [Hostname] =~ "dha..xialg." or [Hostname] =~ "dpf..xgvpn." or [Hostname] =~ "drt..2grt.." or [Hostname] =~ "dkr..xwkr.."
        if [Hostname] =~ "dkr..x(skr|ssk)..|dpf..xivpn.|dha..xialg." #|dpf..xgvpn.|drt..2grt..|dkr..xwkr..|dls..3wlb0.|drt..2wrt..|dls..xnsg..|dls..3elb..|dpf..xeco..|dpfe.i00015"
        {
                mutate
                {
                        add_field => {
                                "ZGR" =>  "1"
                        }
                }

                if [Hostname] =~ "dkr..x(skr|ssk).."
                {
                        mutate
                        {
                                add_field => {
                                        "Dienst" => "MobZu SecuSuite"
                                }
                        }

                        if [PerformanceLabel] =~ "eth3_traffic_(in|out)"
                        {
                                mutate
                                {
                                        add_field => ["Art", "Traffik" ]
                      	        }
                        }

                        
                        else if [PerformaceLabel] =~ "(ike|child)sacount"
                        {
                                mutate
                                {
                                        add_field => ["Art", "VPN Connections"]
                                }
                        }

                        
                        else if [PerformaceLabel] =~ "cpu_usage"
                        {
                                mutate
                                {
                                        add_field => ["Art", "CPU"]
                                }
                        }
                        mkdir ~/kafka && cd ~/kafka
                        else if [PerformaceLabel] =~ "'load-5'"
                        {
                                mutatemkdir ~/kafka && cd ~/kafka
                                {
                                        add_field => ["Art", "Load5"]
                                }
                        }
                        
                        else if [PerformaceLabel] =~ "memory_usage"
                        {
                                mutate
                                {
                                        add_field => ["Art", "Memory"]
                                }
                        }
                        else
                        {
                                drop {}
                        }
                }

                else if [Hostname] =~ "dpf..xivpn.|dha..xialg."
                {
                        mutate
                        {
                                add_field => {
                                "Dienst" => "MobZu IOS"
                                }
                        }
                        
                        if [Hostname] =~ "dpf..xivpn."
                        {
                        	if [PerformanceLabel] =~ "em0 - .*_traffic_(in|out)"
                        	{
                        		mutate
                                {
                                        add_field => {"Art" => "Gateway Traffik" 
                                        "Service" => "Interface: em0"
                                        }
                      	        }
                        	}
                        	else if [PerformanceLabel] =~ "VPN Connections"
                        	{
                        		 mutate{
                                        add_field => {"Art" => "Gateway VPN Connections"}
                                }
                        	}
                        	else if [PerformanceLabel] =~ "cpu.*usage" 
                        	{
                        		 mutate{
                                        add_field => {"Art" => "Gateway CPU"}
                                }
                        	}
                        	else if [PerformanceLabel] =~ "'load5'" 
                        	{
                                mutate{
                                        add_field => {"Art" => "Gateway Load5"}
                                }                        	
                        	}
                        	else
                        	{
                        		drop { }
                        	}
                        	
                        }
                        else if [Hostname] =~ "dha..xialg."
                        {
                        	if [PerformanceLabel] =~ "_traffic_(in|out)"
                        	{
                                mutate
                                {
                                        add_field => {"Art" => "ALG Traffik" 
                                        "Service" => "Interface: em[01]"
                                        }
                      	        }
                        	}
                        	
                        	else if [PerformanceLabel] =~ "cpu.*usage"
                        	{
                                mutate{
                                        add_field => {"Art" => "ALG CPU"}
                                }
                        	
                        	}
                        	else if [PerformanceLabel] =~ "'load5'"
                        	{
                                mutate{
                                        add_field => {"Art" => "ALG Load5"}
                                }
                        	}
                        	else
                        	{
                        		drop { }
                        	}
                        }
                }
                else if [Hostname] =~ "dpf..xgvpn.|drt..2grt.."
                {		
                	mutate
			{
				add_field => {
				"Dienst" => "MobZu GenuCard"}
	               	}
	                    if [Hostname] =~ "dpf..xgvpn."
	                    {
	                    	if [PerformanceLabel] =~ "em0 - .*_traffic_(in|out)"
	                    	{
                                mutate
                                {
                                        add_field => ["Art", "Traffik" ]
                      	        }	                    	
	                    	}
	                    	else if [PerformanceLabel] =~ "VPN Connections"
	                    	{
                                mutate
                                {
                                        add_field => ["Art", "VPN Connections" ]
                      	        }	  	                    	
	                    	}
	                    	else if [PerformanceLabel] =~ "cpu.*usage"
	                    	{
                                mutate
                                {
                                        add_field => ["Art", "CPU" ]
                      	        }	  	                    	
	                    	}
	                    	else if [PerformanceLabel] =~ "load5"
	                    	{
                                mutate
                                {
                                        add_field => ["Art", "Load5" ]
                      	        }		                    	
	                    	}
	                    	#else
	                    	#{
	                    	#	drop { }
	                    	#}
	                    }
	                    else if [Hostname] =~ "drt..2grt.."
	                    {
	                    	if [PerformanceLabel] =~ "'Tu.* - ([a-zA-Z]+).*_traffic_(in|out)'"
	                    	{
                                mutate
                                {
                                        add_field => ["Art", "GRE Tunnel Use pro Nutzer" ]
                      	        }		                    		                    	
	                    	}
	                    	#else
	                    	#{
	                    	#	drop { }
	                    	#}
	                    }
                 }
#                else if  [Hostname] =~ "dkr..xwkr..|dls..3wlb0.|drt..2wrt.."
#                {
#                	mutate
#                	{
#                		add_field => {
#                                "Dienst" => "MobZu SinaWS"
#                        }
#                   }
#                 }
#                else if  [Hostname] =~ "dls..xnsg.."
#                {
#                	mutate
#                	{
#                		add_field => {
#                                "Dienst" => "MobZu NCP"
#                        }
#                   }
#                 }  
#                else if  [Hostname] =~ "dls..3elb..|dpf..xeco..|dpfe.i00015
#"
#                {
#                	mutate
#                	{
#                		add_field => {
#                                "Dienst" => "MobZu ECOS"
#                        }
#                   }
#                 }  
                
        }
        else
        {
                drop  { }
        }
}
	
output{
		stdout
		{
			codec => rubydebug
		}		
#		 kafka {

#        codec => json
#        topic_id => "DTBS"
#      }

    


	#    http {
    #    url => 'http://mlsebzlstr1:8080'
    #    http_method => post
    #    retry_non_idempotent => true
     #   format => json_batch
     #   http_compression => true
    #}
}

# vim:ts=4:sw=4