Files

596 lines
17 KiB
Markdown

# Neues Repo
https://github.com/conduktor/kafka-stack-docker-compose
https://github.com/conduktor/kafka-beginners-course
## Installation im STC
1. Download von Kafka: `curl "https://archive.apache.org/dist/kafka/3.7.0/kafka_2.13-3.7.0.tgz"`
# Dockerfiles Sicherung
```
docker save ...
```
# Dockerfiles Restore
```
docker image load -i <tar>
```
# Notizen STC
apt install docker docker-compose
1. Start läuft auf Fehler:
ERROR: for zoo3 Cannot start service zoo3: OCI runtime create failed: container_linux.go:377: starting container process caused: process_linux.go:495: container init caused: process_linux.go:458: **setting cgroup config for procHooks process caused: can't load program:** function not implemented: unknown
Der NdB-Kernel unterstützt das nicht, daher gepurged und den Standardkernel hochgefahren
```
docker compose -f zk-single-kafka-multiple.yml up
```
## Ablauf
1. lst02
1. Kopieren der Files via cp-Skripte
2. Auslesen und Versenden der Files per Python-Skript an die lstk1
```
python3 transportv0.9.cpython-39.pyc
```
2. lstk1
1. den logstash starten
```
/usr/share/bin/logstash -f /etc/logstash/conf.d/importLogstash.conf
```
Hier werden die Daten entsprechend Forderung gefiltert
2. zookeeper starten
```
/usr/share/kafka<ver>/bin/zookeeper-server-start.sh ../config/zookeeper.properties
```
3. kafka starten
```
/usr/share/kafka<ver>/bin/kafka-server-start.sh ../config/server.properties
```
Damit Daten abgerufen werden können, muss folgender iptables-Eintrag vorgenommen werden
```
iptables -I INPUT -p tcp --dport 9092 -j ACCEPT
```
## Nagflux.gcfg
```
[main]
NagiosSpoolfileFolder = "/var/spool/perfdata"
NagiosSpoolfileWorker = 1
InfluxWorker = 1
MaxInfluxWorker = 1
DumpFile = "nagflux.dump"
NagfluxSpoolfileFolder = "/var/spool/nagflux"
FieldSeparator = "&"
BufferSize = 10000
FileBufferSize = 65536
# If the performancedata does not have a certain target set with NAGFLUX:TARGET.
# The following field will define the target for this data.
# "all" sends the data to all Targets(every Influxdb, Elasticsearch...)
# a certain name will direct the data to this certain target
DefaultTarget = "logstashpipe"
[Log]
LogFile = ""
MinSeverity = "INFO"
[JSONFileExport "logstashpipe"]
Enabled = true
Path = /tmp/logstash_out
AutomaticFileRotation = "1"
```
## Bash-Skript für den Transport
```
#!/bin/bash
# Ship finished perfdata files to the logstash TCP input, one file at a time.
#
# Every regular file in $folder named perfdata_* that has not been modified
# for more than one minute is piped to $target_host:$target_port with netcat.
# A file is deleted after a successful transfer; after $maxtries failed
# attempts it is renamed to timeout_<epoch> so the next file is not blocked.
set -x

folder=/tmp/logstash_out
target_host=mlsebzlstk1
target_port=8080
maxtries=360   # 360 * 10s => give up on a file after 3600s = 1h, continue with the next one
timewait=10    # seconds between attempts and between idle polls

while true; do
    found=0
    # NUL-delimited list so file names containing whitespace are handled correctly.
    while IFS= read -r -d '' file; do
        found=1
        # The file may have been removed since find listed it.
        [ -f "$file" ] || continue

        try=0
        rc=1
        until [ "$try" -ge "$maxtries" ]; do
            # Redirect the file explicitly so netcat never reads the loop's stdin.
            netcat -w1 "$target_host" "$target_port" < "$file"
            rc=$?
            [ "$rc" -eq 0 ] && break
            try=$((try + 1))
            sleep "$timewait"
        done

        # remove the file on success, rename it after a timeout
        if [ "$rc" -eq 0 ]; then
            rm -- "$file"
        else
            mv -- "$file" "$folder/timeout_$(date +%s)"
        fi
    done < <(find "$folder" -type f -name "perfdata_*" -mmin +1 -print0 | sort -z)

    if [ "$found" -eq 0 ]; then
        echo "nothing to do, lets wait ${timewait}s"
        sleep "$timewait"
    fi
done
```
## Derzeitiges Pythonskript
```
import time, os, sys, socket, shutil
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
from watchdog.events import FileCreatedEvent
from queue import Queue
from threading import Thread
from datetime import datetime
"""
Prozessklasse
"""
class MyHandler(PatternMatchingEventHandler):
    """Watchdog event handler that ships newly created ``perfdata*`` files.

    For every "created" event a dedicated daemon thread running
    ``process_queue`` is started with a one-element queue holding that event.
    All other event types are only logged or ignored.
    """

    # Filter settings read by the base class when dispatching events.
    # NOTE(review): they are set as class attributes instead of being passed
    # to PatternMatchingEventHandler.__init__ -- confirm the installed
    # watchdog version honours them.
    ignore_patterns = None
    ignore_directories = False
    case_sensitive = False
    patterns = ['perfdata*']

    def on_modified(self, event):
        """Ignore modifications; files are picked up when they are created."""

    def on_created(self, event):
        """Hand the new file to a background worker thread."""
        print(f'event type: {event.event_type} path : {event.src_path}')
        watchdog_queue = Queue()
        watchdog_queue.put(FileCreatedEvent(event.src_path))
        worker = Thread(
            target=process_queue, args=(watchdog_queue,), daemon=True
        )
        # Format the Thread object instead of concatenating it to a str,
        # which raised TypeError and prevented the worker from starting.
        print(f"Thread gestartet: {worker}")
        worker.start()

    def on_deleted(self, event):
        """Log deletions (also triggered when a sent file is removed)."""
        print(f'event type: {event.event_type} path : {event.src_path}')

    def on_moved(self, event):
        """Log renames."""
        print(f'event type: {event.event_type} path : {event.src_path}')
def sendfile(client, file_):
    """Send the content of *file_* over *client* and delete the file.

    The target address is taken from the module-level ``hostname`` and
    ``port``. The socket is always closed before returning.

    Outcomes:
      * connect fails  -> the error is printed, the file is left untouched.
      * sending fails  -> the file is renamed to ``<file_>zzzz``.
      * success        -> the file is removed.

    Args:
        client: An unconnected stream socket.
        file_: Path of the file to transmit.

    Raises:
        OSError: If the file cannot be read, renamed or removed.
    """
    try:
        # Read raw bytes: the payload is forwarded unchanged, so there is no
        # need for a decode/encode round trip.
        with open(file_, "rb") as f:
            payload = f.read()

        try:
            client.connect((hostname, port))
        except socket.error as e:
            print("error connecting socket: %s" % e)
            return

        try:
            client.sendall(payload)
        except OSError:
            print("Datei wurde nicht versandt: " + file_)
            # os.rename needs source AND destination.
            os.rename(file_, file_ + "zzzz")
            return
    finally:
        client.close()

    os.remove(file_)
def process_queue(q, min_age=10):
    """Drain *q*, sending each queued file once it has stopped changing.

    For every event the file's modification time is polled until the file is
    at least *min_age* seconds old; it is then handed to ``sendfile`` on a
    fresh TCP socket. The function returns as soon as the queue is empty.

    Args:
        q: Queue of objects exposing a ``src_path`` attribute.
        min_age: Minimum age in seconds (time since last modification) a
            file must have before it is sent.
    """
    while not q.empty():
        event = q.get()
        path = event.src_path
        print("New event %s" % event)

        try:
            # Re-read the mtime on every pass so that a file which is still
            # being written keeps pushing the deadline back.
            age = time.time() - os.path.getmtime(path)
            while age < min_age:
                print("Ich warte")
                time.sleep(min_age - age)
                age = time.time() - os.path.getmtime(path)
        except OSError as e:
            # The file vanished or is not accessible: skip it, keep draining.
            print("Datei nicht mehr lesbar, wird übersprungen: %s" % e)
            continue

        print("Warten verlassen")
        print("Datei ist älter als %ssec" % min_age)
        try:
            client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sendfile(client, path)
        except Exception as e:
            # Best effort: one failing file must not stop the worker.
            print("Datei muss gemoved werden: %s" % e)
if __name__ == "__main__":
    import fnmatch

    # Target of the TCP transfer; read as module globals by sendfile().
    hostname = "mlsebzlstk1"
    port = 8080
    watchdog_queue = Queue()
    watchDir = "/tmp/logstash_out"
    # watchDir = "/tmp/test"

    # Queue the files that already exist in the watched directory. Only
    # regular files matching the handler's patterns are taken, so that
    # sub-directories and unrelated files are never sent and deleted.
    wanted = [p.lower() for p in MyHandler.patterns]
    for name in sorted(os.listdir(watchDir)):
        filename = os.path.join(watchDir, name)
        if not os.path.isfile(filename):
            continue
        if not any(fnmatch.fnmatchcase(name.lower(), p) for p in wanted):
            continue
        # Wrap the path in a FileCreatedEvent and put it on the queue.
        watchdog_queue.put(FileCreatedEvent(filename))

    # One non-daemon worker sends the backlog; it ends when the queue is empty.
    worker = Thread(target=process_queue, args=(watchdog_queue,))
    worker.start()

    # Watch the directory (non-recursively) for files created from now on.
    my_event_handler = MyHandler()
    go_recursively = False
    my_observer = Observer()
    my_observer.schedule(my_event_handler, watchDir, recursive=go_recursively)
    my_observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        my_observer.stop()
    my_observer.join()
```
## conf-File des logstashs auf der Empfängerseite
Hier werden auch gleich die Daten an Kafka übergeben
Start: mittels systemctl oder aber zum Testen /usr/share/bin/logstash -f /etc/logstash/conf.d/importLogstash.conf
```
# Input section: accepts events over a plain TCP listener.
input {
# TCP listener on mlsebzlstk1:8080; incoming data is decoded with the json codec.
tcp {
host => "mlsebzlstk1"
port => 8080
codec => json
}
# Disabled alternative: HTTP input on port 12000.
#http {
# port => 12000
# additional_codecs => { "application/json" => "json_lines" }
#}
}
# Filter section: tags events of the monitored host groups with the fields
# "ZGR", "Dienst" and "Art" (plus "Service" for some interfaces) and drops
# everything that does not match.
filter{
    # Parse the JSON text in "message" into the "parsed_json" sub-document.
    json {
        # ecs_compatibility => disabled
        source => "message"
        target => "parsed_json"
    }
    #if [Hostname] =~ "dkr..x(skr|ssk).." or [Hostname] =~ "dpf..xivpn." or [Hostname] =~ "dha..xialg." or [Hostname] =~ "dpf..xgvpn." or [Hostname] =~ "drt..2grt.." or [Hostname] =~ "dkr..xwkr.."
    # Only the first three host groups are active; the remainder of the
    # pattern is commented out at the end of the line.
    if [Hostname] =~ "dkr..x(skr|ssk)..|dpf..xivpn.|dha..xialg." #|dpf..xgvpn.|drt..2grt..|dkr..xwkr..|dls..3wlb0.|drt..2wrt..|dls..xnsg..|dls..3elb..|dpf..xeco..|dpfe.i00015"
    {
        mutate
        {
            add_field => {
                "ZGR" => "1"
            }
        }
        # --- MobZu SecuSuite ---
        if [Hostname] =~ "dkr..x(skr|ssk).."
        {
            mutate
            {
                add_field => {
                    "Dienst" => "MobZu SecuSuite"
                }
            }
            if [PerformanceLabel] =~ "eth3_traffic_(in|out)"
            {
                mutate
                {
                    add_field => ["Art", "Traffik" ]
                }
            }
            # The following branches used the misspelled field name
            # "PerformaceLabel" and therefore never matched.
            else if [PerformanceLabel] =~ "(ike|child)sacount"
            {
                mutate
                {
                    add_field => ["Art", "VPN Connections"]
                }
            }
            else if [PerformanceLabel] =~ "cpu_usage"
            {
                mutate
                {
                    add_field => ["Art", "CPU"]
                }
            }
            # NOTE(review): this label is 'load-5' while the other host
            # groups use 'load5' -- confirm against the real perfdata.
            else if [PerformanceLabel] =~ "'load-5'"
            {
                mutate
                {
                    add_field => ["Art", "Load5"]
                }
            }
            else if [PerformanceLabel] =~ "memory_usage"
            {
                mutate
                {
                    add_field => ["Art", "Memory"]
                }
            }
            else
            {
                drop {}
            }
        }
        # --- MobZu IOS (VPN gateway and ALG) ---
        else if [Hostname] =~ "dpf..xivpn.|dha..xialg."
        {
            mutate
            {
                add_field => {
                    "Dienst" => "MobZu IOS"
                }
            }
            if [Hostname] =~ "dpf..xivpn."
            {
                if [PerformanceLabel] =~ "em0 - .*_traffic_(in|out)"
                {
                    mutate
                    {
                        add_field => {"Art" => "Gateway Traffik"
                            "Service" => "Interface: em0"
                        }
                    }
                }
                else if [PerformanceLabel] =~ "VPN Connections"
                {
                    mutate{
                        add_field => {"Art" => "Gateway VPN Connections"}
                    }
                }
                else if [PerformanceLabel] =~ "cpu.*usage"
                {
                    mutate{
                        add_field => {"Art" => "Gateway CPU"}
                    }
                }
                else if [PerformanceLabel] =~ "'load5'"
                {
                    mutate{
                        add_field => {"Art" => "Gateway Load5"}
                    }
                }
                else
                {
                    drop { }
                }
            }
            else if [Hostname] =~ "dha..xialg."
            {
                if [PerformanceLabel] =~ "_traffic_(in|out)"
                {
                    mutate
                    {
                        add_field => {"Art" => "ALG Traffik"
                            "Service" => "Interface: em[01]"
                        }
                    }
                }
                else if [PerformanceLabel] =~ "cpu.*usage"
                {
                    mutate{
                        add_field => {"Art" => "ALG CPU"}
                    }
                }
                else if [PerformanceLabel] =~ "'load5'"
                {
                    mutate{
                        add_field => {"Art" => "ALG Load5"}
                    }
                }
                else
                {
                    drop { }
                }
            }
        }
        # --- MobZu GenuCard ---
        # NOTE(review): the enclosing condition no longer lists dpf..xgvpn.
        # and drt..2grt.. (that part of the pattern is commented out), so
        # this branch appears to be unreachable at the moment -- confirm
        # whether that is intended.
        else if [Hostname] =~ "dpf..xgvpn.|drt..2grt.."
        {
            mutate
            {
                add_field => {
                    "Dienst" => "MobZu GenuCard"}
            }
            if [Hostname] =~ "dpf..xgvpn."
            {
                if [PerformanceLabel] =~ "em0 - .*_traffic_(in|out)"
                {
                    mutate
                    {
                        add_field => ["Art", "Traffik" ]
                    }
                }
                else if [PerformanceLabel] =~ "VPN Connections"
                {
                    mutate
                    {
                        add_field => ["Art", "VPN Connections" ]
                    }
                }
                else if [PerformanceLabel] =~ "cpu.*usage"
                {
                    mutate
                    {
                        add_field => ["Art", "CPU" ]
                    }
                }
                else if [PerformanceLabel] =~ "load5"
                {
                    mutate
                    {
                        add_field => ["Art", "Load5" ]
                    }
                }
                #else
                #{
                #    drop { }
                #}
            }
            else if [Hostname] =~ "drt..2grt.."
            {
                if [PerformanceLabel] =~ "'Tu.* - ([a-zA-Z]+).*_traffic_(in|out)'"
                {
                    mutate
                    {
                        add_field => ["Art", "GRE Tunnel Use pro Nutzer" ]
                    }
                }
                #else
                #{
                #    drop { }
                #}
            }
        }
        # Disabled host groups (SinaWS, NCP, ECOS):
        # else if [Hostname] =~ "dkr..xwkr..|dls..3wlb0.|drt..2wrt.."
        # {
        #     mutate
        #     {
        #         add_field => {
        #             "Dienst" => "MobZu SinaWS"
        #         }
        #     }
        # }
        # else if [Hostname] =~ "dls..xnsg.."
        # {
        #     mutate
        #     {
        #         add_field => {
        #             "Dienst" => "MobZu NCP"
        #         }
        #     }
        # }
        # else if [Hostname] =~ "dls..3elb..|dpf..xeco..|dpfe.i00015"
        # {
        #     mutate
        #     {
        #         add_field => {
        #             "Dienst" => "MobZu ECOS"
        #         }
        #     }
        # }
    }
    else
    {
        # Hosts outside the monitored groups are discarded.
        drop { }
    }
}
# Output section: only the stdout output is active; the Kafka and HTTP
# outputs below are commented out.
output{
# Print every event in rubydebug format.
stdout
{
codec => rubydebug
}
# Disabled: Kafka output, JSON-encoded, topic "DTBS".
# kafka {
# codec => json
# topic_id => "DTBS"
# }
# Disabled: HTTP POST of JSON batches to mlsebzlstr1:8080.
# http {
# url => 'http://mlsebzlstr1:8080'
# http_method => post
# retry_non_idempotent => true
# format => json_batch
# http_compression => true
#}
}
# vim:ts=4:sw=4
```