596 lines
17 KiB
Markdown
596 lines
17 KiB
Markdown
# Neues Repo
|
|
|
|
https://github.com/conduktor/kafka-stack-docker-compose
|
|
|
|
https://github.com/conduktor/kafka-beginners-course
|
|
|
|
## Installation im STC
|
|
|
|
1. Download des Kafka `curl "https://archive.apache.org/dist/kafka/3.7.0/kafka_2.13-3.7.0.tgz`
|
|
|
|
# Dockerfiles Sicherung
|
|
|
|
```
|
|
docker save ...
|
|
```
|
|
|
|
# Dockerfiles Restore
|
|
|
|
```
|
|
docker image load -i <tar>
|
|
```
|
|
|
|
|
|
|
|
# Notizen STC
|
|
|
|
apt install docker docker-compose
|
|
|
|
1. Start läuft auf Fehler:
|
|
|
|
ERROR: for zoo3 Cannot start service zoo3: OCI runtime create failed: container_linux.go:377: starting container process caused: process_linux.go:495: container init caused: process_linux.go:458: **setting cgroup config for procHooks process caused: can't load program:** function not implemented: unknown
|
|
|
|
Der NdB-Kernel unterstüzt das nicht, daher gepurged und den Standardkernel hochgefahren
|
|
|
|
```
|
|
docker compose -f zk-single-kafka-multiple.yml up
|
|
```
|
|
|
|
## Ablauf
|
|
|
|
1. lst02
|
|
|
|
1. Kopieren der Files via cp-Skripte
|
|
|
|
2. Auslesen und Versenden der Files per Python-Skript an die lstk1
|
|
|
|
```
|
|
python3 transportv0.9.cpython-39.pyc
|
|
```
|
|
|
|
2. lstk1
|
|
|
|
1. den logstash starten
|
|
|
|
```
|
|
/usr/share/bin/logstash -f /etc/logstash/conf.d/importLogstash.conf
|
|
```
|
|
|
|
Hier werden die Daten entsprechend Forderung gefiltert
|
|
|
|
2. zookeeper starten
|
|
|
|
```
|
|
/usr/share/kafka<ver>/bin/zookeeper-server-start.sh ../config/zookeeper.properties
|
|
```
|
|
|
|
3. kafka starten
|
|
|
|
```
|
|
/usr/share/kafka<ver>/bin/kafka-server-start.sh ../config/server.properties
|
|
```
|
|
|
|
Damit Daten abgerufen werden können, muss folgender iptables-Eintrag vorgenommen werden
|
|
|
|
```
|
|
iptables -I INPUT -p tcp --dport 9092 -j ACCEPT
|
|
```
|
|
|
|
## Nagflux.gcfg
|
|
|
|
```
|
|
[main]
|
|
NagiosSpoolfileFolder = "/var/spool/perfdata"
|
|
NagiosSpoolfileWorker = 1
|
|
InfluxWorker = 1
|
|
MaxInfluxWorker = 1
|
|
DumpFile = "nagflux.dump"
|
|
NagfluxSpoolfileFolder = "/var/spool/nagflux"
|
|
FieldSeparator = "&"
|
|
BufferSize = 10000
|
|
FileBufferSize = 65536
|
|
# If the performancedata does not have a certain target set with NAGFLUX:TARGET.
|
|
# The following field will define the target for this data.
|
|
# "all" sends the data to all Targets(every Influxdb, Elasticsearch...)
|
|
# a certain name will direct the data to this certain target
|
|
DefaultTarget = "logstashpipe"
|
|
|
|
|
|
[Log]
|
|
LogFile = ""
|
|
MinSeverity = "INFO"
|
|
|
|
[JSONFileExport "logstashpipe"]
|
|
Enabled = true
|
|
Path = /tmp/logstash_out
|
|
AutomaticFileRotation = "1"
|
|
```
|
|
|
|
## Bash-Skript für den Transport
|
|
|
|
```
|
|
#!/bin/bash
|
|
|
|
set -x
|
|
|
|
# configure network timeouts
|
|
folder=/tmp/logstash_out
|
|
maxtries=360 # 360 * 10s => nach 3600s = 1h = upload abbrechen, nächste Datei
|
|
timewait=10
|
|
|
|
while true; do
|
|
files=$(find $folder -type f -name "perfdata_*" -mmin +1 | sort)
|
|
for file in $files ; do
|
|
try=0
|
|
rc=0
|
|
until [ $try -gt $maxtries ]
|
|
do
|
|
cat $file | netcat -w1 mlsebzlstk1 8080
|
|
rc=$?
|
|
[ "$rc" == "0" ] && break
|
|
try=$((try+1))
|
|
sleep $timewait
|
|
done
|
|
|
|
# remove file or rename for timeout
|
|
if [ "$rc" == "0" ] ; then
|
|
rm $file
|
|
else
|
|
mv $file $folder/timeout_$(date +%s)
|
|
fi
|
|
done
|
|
|
|
if [ "$files" == "" ] ; then
|
|
echo "nothing to do, lets wait 10s"
|
|
sleep 10
|
|
fi
|
|
done
|
|
```
|
|
|
|
## Derzeitiges Pythonskript
|
|
|
|
```
|
|
import time, os, sys, socket, shutil
|
|
from watchdog.observers import Observer
|
|
from watchdog.events import PatternMatchingEventHandler
|
|
from watchdog.events import FileCreatedEvent
|
|
from queue import Queue
|
|
from threading import Thread
|
|
from datetime import datetime
|
|
|
|
"""
|
|
Prozessklasse
|
|
"""
|
|
|
|
class MyHandler(PatternMatchingEventHandler):
|
|
ignore_patterns = None
|
|
ignore_directories = False
|
|
case_sensitive = False
|
|
patterns=['perfdata*']
|
|
|
|
def on_modified(self, event):
|
|
#print(f'event type: {event.event_type} path : {event.src_path}')
|
|
pass
|
|
|
|
#self.queue.put(event.src_path)
|
|
|
|
def on_created(self,event):
|
|
print(f'event type: {event.event_type} path : {event.src_path}')
|
|
watchdog_queue = Queue()
|
|
e = FileCreatedEvent(event.src_path)
|
|
watchdog_queue.put(e)
|
|
|
|
|
|
worker = Thread(target=process_queue, args=(watchdog_queue,))
|
|
print("Thread gestartet: " + worker)
|
|
worker.setDaemon(True)
|
|
worker.start()
|
|
|
|
def on_deleted(self,event):
|
|
print(f'event type: {event.event_type} path : {event.src_path}')
|
|
|
|
def on_moved(self,event):
|
|
print(f'event type: {event.event_type} path : {event.src_path}')
|
|
|
|
|
|
|
|
|
|
def sendfile(client, file_):
|
|
|
|
f = open(file_, "r")
|
|
try:
|
|
client.connect((hostname, port))
|
|
try:
|
|
# f = open(file_, "r")
|
|
data = f.read()
|
|
# print(data)
|
|
client.sendall(data.encode("utf-8"))
|
|
f.close()
|
|
os.remove(file_)
|
|
client.close()
|
|
except:
|
|
client.close()
|
|
print("Datei wurde nicht versandt: " +file_)
|
|
os.rename(file_ + "zzzz")
|
|
f.close()
|
|
|
|
except socket.error as e:
|
|
print ("error connecting socket: %s" %e)
|
|
client.close()
|
|
#os.rename(file_ +str(int(datetime.now().timestamp())))
|
|
|
|
def process_queue(q):
|
|
while True:
|
|
if not q.empty():
|
|
event = q.get()
|
|
print("New event %s" % event)
|
|
modification_time = int(os.path.getmtime(event.src_path))
|
|
print(modification_time)
|
|
now = int(datetime.now().timestamp())
|
|
print(now-modification_time)
|
|
##### hier mit while arbeiten
|
|
while (int(datetime.now().timestamp()) - modification_time < 10):
|
|
print("Ich warte")
|
|
time.sleep(11) # warten von 10sec
|
|
# nach verlassen sollte File älter als 10sec sein
|
|
print ("Warten verlassen")
|
|
|
|
if (int(datetime.now().timestamp())- modification_time > 10):
|
|
print("Datei ist älter als 10sec")
|
|
## senden
|
|
try:
|
|
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
sendfile(client, event.src_path)
|
|
except:
|
|
print("Datei muss gemoved werden")
|
|
|
|
|
|
## gesendet --> ja
|
|
|
|
## fehlerhaftes senden
|
|
## mv file namentlich
|
|
else:
|
|
return
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
"""
|
|
init-Daten
|
|
"""
|
|
hostname = "mlsebzlstk1"
|
|
port = 8080
|
|
watchdog_queue = Queue()
|
|
patterns = ['perfdata']
|
|
watchDir = "/tmp/logstash_out"
|
|
|
|
# watchDir = "/tmp/test"
|
|
|
|
"""
|
|
Einlesen des Verzeichnisses und Versenden, als eigene Threads
|
|
"""
|
|
for file in os.listdir(watchDir):
|
|
filename = os.path.join(watchDir, file)
|
|
|
|
# Erzeugen eines FileCreate Events
|
|
event = FileCreatedEvent(filename)
|
|
|
|
# Einfügen in die watchdog-Queue
|
|
watchdog_queue.put(event)
|
|
|
|
|
|
"""
|
|
Starten des Threads
|
|
"""
|
|
|
|
worker = Thread(target=process_queue, args=(watchdog_queue,))
|
|
# worker.setDaemon(True)
|
|
worker.start()
|
|
|
|
#event_handler = FileWatchdog(watchdog_queue, patterns="perfdata*")
|
|
|
|
my_event_handler = MyHandler()
|
|
|
|
|
|
go_recursively = False
|
|
my_observer = Observer()
|
|
my_observer.schedule(my_event_handler, watchDir, recursive=go_recursively)
|
|
|
|
my_observer.start()
|
|
try:
|
|
while True:
|
|
time.sleep(1)
|
|
except KeyboardInterrupt:
|
|
my_observer.stop()
|
|
my_observer.join()
|
|
```
|
|
|
|
## conf-File des logstashs auf der Empfängerseite
|
|
|
|
Hier werden auf gleich die Daten an den Kafka übergeben
|
|
|
|
Start: mittels systemctl oder aber zum Testen /usr/share/bin/logstash -f /etc/logstash/conf.d/importLogstash.conf
|
|
|
|
```
|
|
input {
|
|
tcp {
|
|
host => "mlsebzlstk1"
|
|
port => 8080
|
|
codec => json
|
|
}
|
|
#http {
|
|
# port => 12000
|
|
# additional_codecs => { "application/json" => "json_lines" }
|
|
#}
|
|
}
|
|
|
|
filter{
|
|
json {
|
|
# ecs_compatibility => disabled
|
|
source => "message"
|
|
target => "parsed_json"
|
|
}
|
|
|
|
#if [Hostname] =~ "dkr..x(skr|ssk).." or [Hostname] =~ "dpf..xivpn." or [Hostname] =~ "dha..xialg." or [Hostname] =~ "dpf..xgvpn." or [Hostname] =~ "drt..2grt.." or [Hostname] =~ "dkr..xwkr.."
|
|
if [Hostname] =~ "dkr..x(skr|ssk)..|dpf..xivpn.|dha..xialg." #|dpf..xgvpn.|drt..2grt..|dkr..xwkr..|dls..3wlb0.|drt..2wrt..|dls..xnsg..|dls..3elb..|dpf..xeco..|dpfe.i00015"
|
|
{
|
|
mutate
|
|
{
|
|
add_field => {
|
|
"ZGR" => "1"
|
|
}
|
|
}
|
|
|
|
if [Hostname] =~ "dkr..x(skr|ssk).."
|
|
{
|
|
mutate
|
|
{
|
|
add_field => {
|
|
"Dienst" => "MobZu SecuSuite"
|
|
}
|
|
}
|
|
|
|
if [PerformanceLabel] =~ "eth3_traffic_(in|out)"
|
|
{
|
|
mutate
|
|
{
|
|
add_field => ["Art", "Traffik" ]
|
|
}
|
|
}
|
|
|
|
|
|
else if [PerformaceLabel] =~ "(ike|child)sacount"
|
|
{
|
|
mutate
|
|
{
|
|
add_field => ["Art", "VPN Connections"]
|
|
}
|
|
}
|
|
|
|
|
|
else if [PerformaceLabel] =~ "cpu_usage"
|
|
{
|
|
mutate
|
|
{
|
|
add_field => ["Art", "CPU"]
|
|
}
|
|
}
|
|
mkdir ~/kafka && cd ~/kafka
|
|
else if [PerformaceLabel] =~ "'load-5'"
|
|
{
|
|
mutatemkdir ~/kafka && cd ~/kafka
|
|
{
|
|
add_field => ["Art", "Load5"]
|
|
}
|
|
}
|
|
|
|
else if [PerformaceLabel] =~ "memory_usage"
|
|
{
|
|
mutate
|
|
{
|
|
add_field => ["Art", "Memory"]
|
|
}
|
|
}
|
|
else
|
|
{
|
|
drop {}
|
|
}
|
|
}
|
|
|
|
else if [Hostname] =~ "dpf..xivpn.|dha..xialg."
|
|
{
|
|
mutate
|
|
{
|
|
add_field => {
|
|
"Dienst" => "MobZu IOS"
|
|
}
|
|
}
|
|
|
|
if [Hostname] =~ "dpf..xivpn."
|
|
{
|
|
if [PerformanceLabel] =~ "em0 - .*_traffic_(in|out)"
|
|
{
|
|
mutate
|
|
{
|
|
add_field => {"Art" => "Gateway Traffik"
|
|
"Service" => "Interface: em0"
|
|
}
|
|
}
|
|
}
|
|
else if [PerformanceLabel] =~ "VPN Connections"
|
|
{
|
|
mutate{
|
|
add_field => {"Art" => "Gateway VPN Connections"}
|
|
}
|
|
}
|
|
else if [PerformanceLabel] =~ "cpu.*usage"
|
|
{
|
|
mutate{
|
|
add_field => {"Art" => "Gateway CPU"}
|
|
}
|
|
}
|
|
else if [PerformanceLabel] =~ "'load5'"
|
|
{
|
|
mutate{
|
|
add_field => {"Art" => "Gateway Load5"}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
drop { }
|
|
}
|
|
|
|
}
|
|
else if [Hostname] =~ "dha..xialg."
|
|
{
|
|
if [PerformanceLabel] =~ "_traffic_(in|out)"
|
|
{
|
|
mutate
|
|
{
|
|
add_field => {"Art" => "ALG Traffik"
|
|
"Service" => "Interface: em[01]"
|
|
}
|
|
}
|
|
}
|
|
|
|
else if [PerformanceLabel] =~ "cpu.*usage"
|
|
{
|
|
mutate{
|
|
add_field => {"Art" => "ALG CPU"}
|
|
}
|
|
|
|
}
|
|
else if [PerformanceLabel] =~ "'load5'"
|
|
{
|
|
mutate{
|
|
add_field => {"Art" => "ALG Load5"}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
drop { }
|
|
}
|
|
}
|
|
}
|
|
else if [Hostname] =~ "dpf..xgvpn.|drt..2grt.."
|
|
{
|
|
mutate
|
|
{
|
|
add_field => {
|
|
"Dienst" => "MobZu GenuCard"}
|
|
}
|
|
if [Hostname] =~ "dpf..xgvpn."
|
|
{
|
|
if [PerformanceLabel] =~ "em0 - .*_traffic_(in|out)"
|
|
{
|
|
mutate
|
|
{
|
|
add_field => ["Art", "Traffik" ]
|
|
}
|
|
}
|
|
else if [PerformanceLabel] =~ "VPN Connections"
|
|
{
|
|
mutate
|
|
{
|
|
add_field => ["Art", "VPN Connections" ]
|
|
}
|
|
}
|
|
else if [PerformanceLabel] =~ "cpu.*usage"
|
|
{
|
|
mutate
|
|
{
|
|
add_field => ["Art", "CPU" ]
|
|
}
|
|
}
|
|
else if [PerformanceLabel] =~ "load5"
|
|
{
|
|
mutate
|
|
{
|
|
add_field => ["Art", "Load5" ]
|
|
}
|
|
}
|
|
#else
|
|
#{
|
|
# drop { }
|
|
#}
|
|
}
|
|
else if [Hostname] =~ "drt..2grt.."
|
|
{
|
|
if [PerformanceLabel] =~ "'Tu.* - ([a-zA-Z]+).*_traffic_(in|out)'"
|
|
{
|
|
mutate
|
|
{
|
|
add_field => ["Art", "GRE Tunnel Use pro Nutzer" ]
|
|
}
|
|
}
|
|
#else
|
|
#{
|
|
# drop { }
|
|
#}
|
|
}
|
|
}
|
|
# else if [Hostname] =~ "dkr..xwkr..|dls..3wlb0.|drt..2wrt.."
|
|
# {
|
|
# mutate
|
|
# {
|
|
# add_field => {
|
|
# "Dienst" => "MobZu SinaWS"
|
|
# }
|
|
# }
|
|
# }
|
|
# else if [Hostname] =~ "dls..xnsg.."
|
|
# {
|
|
# mutate
|
|
# {
|
|
# add_field => {
|
|
# "Dienst" => "MobZu NCP"
|
|
# }
|
|
# }
|
|
# }
|
|
# else if [Hostname] =~ "dls..3elb..|dpf..xeco..|dpfe.i00015
|
|
#"
|
|
# {
|
|
# mutate
|
|
# {
|
|
# add_field => {
|
|
# "Dienst" => "MobZu ECOS"
|
|
# }
|
|
# }
|
|
# }
|
|
|
|
}
|
|
else
|
|
{
|
|
drop { }
|
|
}
|
|
}
|
|
|
|
output{
|
|
stdout
|
|
{
|
|
codec => rubydebug
|
|
}
|
|
# kafka {
|
|
|
|
# codec => json
|
|
# topic_id => "DTBS"
|
|
# }
|
|
|
|
|
|
|
|
|
|
# http {
|
|
# url => 'http://mlsebzlstr1:8080'
|
|
# http_method => post
|
|
# retry_non_idempotent => true
|
|
# format => json_batch
|
|
# http_compression => true
|
|
#}
|
|
}
|
|
|
|
# vim:ts=4:sw=4
|
|
```
|
|
|