aus wiki übernommen und sql erweitert
This commit is contained in:
@@ -0,0 +1,595 @@
|
||||
# Neues Repo
|
||||
|
||||
https://github.com/conduktor/kafka-stack-docker-compose
|
||||
|
||||
https://github.com/conduktor/kafka-beginners-course
|
||||
|
||||
## Installation im STC
|
||||
|
||||
1. Download des Kafka `curl "https://archive.apache.org/dist/kafka/3.7.0/kafka_2.13-3.7.0.tgz`
|
||||
|
||||
# Dockerfiles Sicherung
|
||||
|
||||
```
|
||||
docker save ...
|
||||
```
|
||||
|
||||
# Dockerfiles Restore
|
||||
|
||||
```
|
||||
docker image load -i <tar>
|
||||
```
|
||||
|
||||
|
||||
|
||||
# Notizen STC
|
||||
|
||||
apt install docker docker-compose
|
||||
|
||||
1. Start läuft auf Fehler:
|
||||
|
||||
ERROR: for zoo3 Cannot start service zoo3: OCI runtime create failed: container_linux.go:377: starting container process caused: process_linux.go:495: container init caused: process_linux.go:458: **setting cgroup config for procHooks process caused: can't load program:** function not implemented: unknown
|
||||
|
||||
Der NdB-Kernel unterstüzt das nicht, daher gepurged und den Standardkernel hochgefahren
|
||||
|
||||
```
|
||||
docker compose -f zk-single-kafka-multiple.yml up
|
||||
```
|
||||
|
||||
## Ablauf
|
||||
|
||||
1. lst02
|
||||
|
||||
1. Kopieren der Files via cp-Skripte
|
||||
|
||||
2. Auslesen und Versenden der Files per Python-Skript an die lstk1
|
||||
|
||||
```
|
||||
python3 transportv0.9.cpython-39.pyc
|
||||
```
|
||||
|
||||
2. lstk1
|
||||
|
||||
1. den logstash starten
|
||||
|
||||
```
|
||||
/usr/share/bin/logstash -f /etc/logstash/conf.d/importLogstash.conf
|
||||
```
|
||||
|
||||
Hier werden die Daten entsprechend Forderung gefiltert
|
||||
|
||||
2. zookeeper starten
|
||||
|
||||
```
|
||||
/usr/share/kafka<ver>/bin/zookeeper-server-start.sh ../config/zookeeper.properties
|
||||
```
|
||||
|
||||
3. kafka starten
|
||||
|
||||
```
|
||||
/usr/share/kafka<ver>/bin/kafka-server-start.sh ../config/server.properties
|
||||
```
|
||||
|
||||
Damit Daten abgerufen werden können, muss folgender iptables-Eintrag vorgenommen werden
|
||||
|
||||
```
|
||||
iptables -I INPUT -p tcp --dport 9092 -j ACCEPT
|
||||
```
|
||||
|
||||
## Nagflux.gcfg
|
||||
|
||||
```
|
||||
[main]
|
||||
NagiosSpoolfileFolder = "/var/spool/perfdata"
|
||||
NagiosSpoolfileWorker = 1
|
||||
InfluxWorker = 1
|
||||
MaxInfluxWorker = 1
|
||||
DumpFile = "nagflux.dump"
|
||||
NagfluxSpoolfileFolder = "/var/spool/nagflux"
|
||||
FieldSeparator = "&"
|
||||
BufferSize = 10000
|
||||
FileBufferSize = 65536
|
||||
# If the performancedata does not have a certain target set with NAGFLUX:TARGET.
|
||||
# The following field will define the target for this data.
|
||||
# "all" sends the data to all Targets(every Influxdb, Elasticsearch...)
|
||||
# a certain name will direct the data to this certain target
|
||||
DefaultTarget = "logstashpipe"
|
||||
|
||||
|
||||
[Log]
|
||||
LogFile = ""
|
||||
MinSeverity = "INFO"
|
||||
|
||||
[JSONFileExport "logstashpipe"]
|
||||
Enabled = true
|
||||
Path = /tmp/logstash_out
|
||||
AutomaticFileRotation = "1"
|
||||
```
|
||||
|
||||
## Bash-Skript für den Transport
|
||||
|
||||
```
|
||||
#!/bin/bash
|
||||
|
||||
set -x
|
||||
|
||||
# configure network timeouts
|
||||
folder=/tmp/logstash_out
|
||||
maxtries=360 # 360 * 10s => nach 3600s = 1h = upload abbrechen, nächste Datei
|
||||
timewait=10
|
||||
|
||||
while true; do
|
||||
files=$(find $folder -type f -name "perfdata_*" -mmin +1 | sort)
|
||||
for file in $files ; do
|
||||
try=0
|
||||
rc=0
|
||||
until [ $try -gt $maxtries ]
|
||||
do
|
||||
cat $file | netcat -w1 mlsebzlstk1 8080
|
||||
rc=$?
|
||||
[ "$rc" == "0" ] && break
|
||||
try=$((try+1))
|
||||
sleep $timewait
|
||||
done
|
||||
|
||||
# remove file or rename for timeout
|
||||
if [ "$rc" == "0" ] ; then
|
||||
rm $file
|
||||
else
|
||||
mv $file $folder/timeout_$(date +%s)
|
||||
fi
|
||||
done
|
||||
|
||||
if [ "$files" == "" ] ; then
|
||||
echo "nothing to do, lets wait 10s"
|
||||
sleep 10
|
||||
fi
|
||||
done
|
||||
```
|
||||
|
||||
## Derzeitiges Pythonskript
|
||||
|
||||
```
|
||||
import time, os, sys, socket, shutil
|
||||
from watchdog.observers import Observer
|
||||
from watchdog.events import PatternMatchingEventHandler
|
||||
from watchdog.events import FileCreatedEvent
|
||||
from queue import Queue
|
||||
from threading import Thread
|
||||
from datetime import datetime
|
||||
|
||||
"""
|
||||
Prozessklasse
|
||||
"""
|
||||
|
||||
class MyHandler(PatternMatchingEventHandler):
|
||||
ignore_patterns = None
|
||||
ignore_directories = False
|
||||
case_sensitive = False
|
||||
patterns=['perfdata*']
|
||||
|
||||
def on_modified(self, event):
|
||||
#print(f'event type: {event.event_type} path : {event.src_path}')
|
||||
pass
|
||||
|
||||
#self.queue.put(event.src_path)
|
||||
|
||||
def on_created(self,event):
|
||||
print(f'event type: {event.event_type} path : {event.src_path}')
|
||||
watchdog_queue = Queue()
|
||||
e = FileCreatedEvent(event.src_path)
|
||||
watchdog_queue.put(e)
|
||||
|
||||
|
||||
worker = Thread(target=process_queue, args=(watchdog_queue,))
|
||||
print("Thread gestartet: " + worker)
|
||||
worker.setDaemon(True)
|
||||
worker.start()
|
||||
|
||||
def on_deleted(self,event):
|
||||
print(f'event type: {event.event_type} path : {event.src_path}')
|
||||
|
||||
def on_moved(self,event):
|
||||
print(f'event type: {event.event_type} path : {event.src_path}')
|
||||
|
||||
|
||||
|
||||
|
||||
def sendfile(client, file_):
|
||||
|
||||
f = open(file_, "r")
|
||||
try:
|
||||
client.connect((hostname, port))
|
||||
try:
|
||||
# f = open(file_, "r")
|
||||
data = f.read()
|
||||
# print(data)
|
||||
client.sendall(data.encode("utf-8"))
|
||||
f.close()
|
||||
os.remove(file_)
|
||||
client.close()
|
||||
except:
|
||||
client.close()
|
||||
print("Datei wurde nicht versandt: " +file_)
|
||||
os.rename(file_ + "zzzz")
|
||||
f.close()
|
||||
|
||||
except socket.error as e:
|
||||
print ("error connecting socket: %s" %e)
|
||||
client.close()
|
||||
#os.rename(file_ +str(int(datetime.now().timestamp())))
|
||||
|
||||
def process_queue(q):
|
||||
while True:
|
||||
if not q.empty():
|
||||
event = q.get()
|
||||
print("New event %s" % event)
|
||||
modification_time = int(os.path.getmtime(event.src_path))
|
||||
print(modification_time)
|
||||
now = int(datetime.now().timestamp())
|
||||
print(now-modification_time)
|
||||
##### hier mit while arbeiten
|
||||
while (int(datetime.now().timestamp()) - modification_time < 10):
|
||||
print("Ich warte")
|
||||
time.sleep(11) # warten von 10sec
|
||||
# nach verlassen sollte File älter als 10sec sein
|
||||
print ("Warten verlassen")
|
||||
|
||||
if (int(datetime.now().timestamp())- modification_time > 10):
|
||||
print("Datei ist älter als 10sec")
|
||||
## senden
|
||||
try:
|
||||
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sendfile(client, event.src_path)
|
||||
except:
|
||||
print("Datei muss gemoved werden")
|
||||
|
||||
|
||||
## gesendet --> ja
|
||||
|
||||
## fehlerhaftes senden
|
||||
## mv file namentlich
|
||||
else:
|
||||
return
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
"""
|
||||
init-Daten
|
||||
"""
|
||||
hostname = "mlsebzlstk1"
|
||||
port = 8080
|
||||
watchdog_queue = Queue()
|
||||
patterns = ['perfdata']
|
||||
watchDir = "/tmp/logstash_out"
|
||||
|
||||
# watchDir = "/tmp/test"
|
||||
|
||||
"""
|
||||
Einlesen des Verzeichnisses und Versenden, als eigene Threads
|
||||
"""
|
||||
for file in os.listdir(watchDir):
|
||||
filename = os.path.join(watchDir, file)
|
||||
|
||||
# Erzeugen eines FileCreate Events
|
||||
event = FileCreatedEvent(filename)
|
||||
|
||||
# Einfügen in die watchdog-Queue
|
||||
watchdog_queue.put(event)
|
||||
|
||||
|
||||
"""
|
||||
Starten des Threads
|
||||
"""
|
||||
|
||||
worker = Thread(target=process_queue, args=(watchdog_queue,))
|
||||
# worker.setDaemon(True)
|
||||
worker.start()
|
||||
|
||||
#event_handler = FileWatchdog(watchdog_queue, patterns="perfdata*")
|
||||
|
||||
my_event_handler = MyHandler()
|
||||
|
||||
|
||||
go_recursively = False
|
||||
my_observer = Observer()
|
||||
my_observer.schedule(my_event_handler, watchDir, recursive=go_recursively)
|
||||
|
||||
my_observer.start()
|
||||
try:
|
||||
while True:
|
||||
time.sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
my_observer.stop()
|
||||
my_observer.join()
|
||||
```
|
||||
|
||||
## conf-File des logstashs auf der Empfängerseite
|
||||
|
||||
Hier werden auf gleich die Daten an den Kafka übergeben
|
||||
|
||||
Start: mittels systemctl oder aber zum Testen /usr/share/bin/logstash -f /etc/logstash/conf.d/importLogstash.conf
|
||||
|
||||
```
|
||||
input {
|
||||
tcp {
|
||||
host => "mlsebzlstk1"
|
||||
port => 8080
|
||||
codec => json
|
||||
}
|
||||
#http {
|
||||
# port => 12000
|
||||
# additional_codecs => { "application/json" => "json_lines" }
|
||||
#}
|
||||
}
|
||||
|
||||
filter{
|
||||
json {
|
||||
# ecs_compatibility => disabled
|
||||
source => "message"
|
||||
target => "parsed_json"
|
||||
}
|
||||
|
||||
#if [Hostname] =~ "dkr..x(skr|ssk).." or [Hostname] =~ "dpf..xivpn." or [Hostname] =~ "dha..xialg." or [Hostname] =~ "dpf..xgvpn." or [Hostname] =~ "drt..2grt.." or [Hostname] =~ "dkr..xwkr.."
|
||||
if [Hostname] =~ "dkr..x(skr|ssk)..|dpf..xivpn.|dha..xialg." #|dpf..xgvpn.|drt..2grt..|dkr..xwkr..|dls..3wlb0.|drt..2wrt..|dls..xnsg..|dls..3elb..|dpf..xeco..|dpfe.i00015"
|
||||
{
|
||||
mutate
|
||||
{
|
||||
add_field => {
|
||||
"ZGR" => "1"
|
||||
}
|
||||
}
|
||||
|
||||
if [Hostname] =~ "dkr..x(skr|ssk).."
|
||||
{
|
||||
mutate
|
||||
{
|
||||
add_field => {
|
||||
"Dienst" => "MobZu SecuSuite"
|
||||
}
|
||||
}
|
||||
|
||||
if [PerformanceLabel] =~ "eth3_traffic_(in|out)"
|
||||
{
|
||||
mutate
|
||||
{
|
||||
add_field => ["Art", "Traffik" ]
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
else if [PerformaceLabel] =~ "(ike|child)sacount"
|
||||
{
|
||||
mutate
|
||||
{
|
||||
add_field => ["Art", "VPN Connections"]
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
else if [PerformaceLabel] =~ "cpu_usage"
|
||||
{
|
||||
mutate
|
||||
{
|
||||
add_field => ["Art", "CPU"]
|
||||
}
|
||||
}
|
||||
mkdir ~/kafka && cd ~/kafka
|
||||
else if [PerformaceLabel] =~ "'load-5'"
|
||||
{
|
||||
mutatemkdir ~/kafka && cd ~/kafka
|
||||
{
|
||||
add_field => ["Art", "Load5"]
|
||||
}
|
||||
}
|
||||
|
||||
else if [PerformaceLabel] =~ "memory_usage"
|
||||
{
|
||||
mutate
|
||||
{
|
||||
add_field => ["Art", "Memory"]
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
drop {}
|
||||
}
|
||||
}
|
||||
|
||||
else if [Hostname] =~ "dpf..xivpn.|dha..xialg."
|
||||
{
|
||||
mutate
|
||||
{
|
||||
add_field => {
|
||||
"Dienst" => "MobZu IOS"
|
||||
}
|
||||
}
|
||||
|
||||
if [Hostname] =~ "dpf..xivpn."
|
||||
{
|
||||
if [PerformanceLabel] =~ "em0 - .*_traffic_(in|out)"
|
||||
{
|
||||
mutate
|
||||
{
|
||||
add_field => {"Art" => "Gateway Traffik"
|
||||
"Service" => "Interface: em0"
|
||||
}
|
||||
}
|
||||
}
|
||||
else if [PerformanceLabel] =~ "VPN Connections"
|
||||
{
|
||||
mutate{
|
||||
add_field => {"Art" => "Gateway VPN Connections"}
|
||||
}
|
||||
}
|
||||
else if [PerformanceLabel] =~ "cpu.*usage"
|
||||
{
|
||||
mutate{
|
||||
add_field => {"Art" => "Gateway CPU"}
|
||||
}
|
||||
}
|
||||
else if [PerformanceLabel] =~ "'load5'"
|
||||
{
|
||||
mutate{
|
||||
add_field => {"Art" => "Gateway Load5"}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
drop { }
|
||||
}
|
||||
|
||||
}
|
||||
else if [Hostname] =~ "dha..xialg."
|
||||
{
|
||||
if [PerformanceLabel] =~ "_traffic_(in|out)"
|
||||
{
|
||||
mutate
|
||||
{
|
||||
add_field => {"Art" => "ALG Traffik"
|
||||
"Service" => "Interface: em[01]"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
else if [PerformanceLabel] =~ "cpu.*usage"
|
||||
{
|
||||
mutate{
|
||||
add_field => {"Art" => "ALG CPU"}
|
||||
}
|
||||
|
||||
}
|
||||
else if [PerformanceLabel] =~ "'load5'"
|
||||
{
|
||||
mutate{
|
||||
add_field => {"Art" => "ALG Load5"}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
drop { }
|
||||
}
|
||||
}
|
||||
}
|
||||
else if [Hostname] =~ "dpf..xgvpn.|drt..2grt.."
|
||||
{
|
||||
mutate
|
||||
{
|
||||
add_field => {
|
||||
"Dienst" => "MobZu GenuCard"}
|
||||
}
|
||||
if [Hostname] =~ "dpf..xgvpn."
|
||||
{
|
||||
if [PerformanceLabel] =~ "em0 - .*_traffic_(in|out)"
|
||||
{
|
||||
mutate
|
||||
{
|
||||
add_field => ["Art", "Traffik" ]
|
||||
}
|
||||
}
|
||||
else if [PerformanceLabel] =~ "VPN Connections"
|
||||
{
|
||||
mutate
|
||||
{
|
||||
add_field => ["Art", "VPN Connections" ]
|
||||
}
|
||||
}
|
||||
else if [PerformanceLabel] =~ "cpu.*usage"
|
||||
{
|
||||
mutate
|
||||
{
|
||||
add_field => ["Art", "CPU" ]
|
||||
}
|
||||
}
|
||||
else if [PerformanceLabel] =~ "load5"
|
||||
{
|
||||
mutate
|
||||
{
|
||||
add_field => ["Art", "Load5" ]
|
||||
}
|
||||
}
|
||||
#else
|
||||
#{
|
||||
# drop { }
|
||||
#}
|
||||
}
|
||||
else if [Hostname] =~ "drt..2grt.."
|
||||
{
|
||||
if [PerformanceLabel] =~ "'Tu.* - ([a-zA-Z]+).*_traffic_(in|out)'"
|
||||
{
|
||||
mutate
|
||||
{
|
||||
add_field => ["Art", "GRE Tunnel Use pro Nutzer" ]
|
||||
}
|
||||
}
|
||||
#else
|
||||
#{
|
||||
# drop { }
|
||||
#}
|
||||
}
|
||||
}
|
||||
# else if [Hostname] =~ "dkr..xwkr..|dls..3wlb0.|drt..2wrt.."
|
||||
# {
|
||||
# mutate
|
||||
# {
|
||||
# add_field => {
|
||||
# "Dienst" => "MobZu SinaWS"
|
||||
# }
|
||||
# }
|
||||
# }
|
||||
# else if [Hostname] =~ "dls..xnsg.."
|
||||
# {
|
||||
# mutate
|
||||
# {
|
||||
# add_field => {
|
||||
# "Dienst" => "MobZu NCP"
|
||||
# }
|
||||
# }
|
||||
# }
|
||||
# else if [Hostname] =~ "dls..3elb..|dpf..xeco..|dpfe.i00015
|
||||
#"
|
||||
# {
|
||||
# mutate
|
||||
# {
|
||||
# add_field => {
|
||||
# "Dienst" => "MobZu ECOS"
|
||||
# }
|
||||
# }
|
||||
# }
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
drop { }
|
||||
}
|
||||
}
|
||||
|
||||
output{
|
||||
stdout
|
||||
{
|
||||
codec => rubydebug
|
||||
}
|
||||
# kafka {
|
||||
|
||||
# codec => json
|
||||
# topic_id => "DTBS"
|
||||
# }
|
||||
|
||||
|
||||
|
||||
|
||||
# http {
|
||||
# url => 'http://mlsebzlstr1:8080'
|
||||
# http_method => post
|
||||
# retry_non_idempotent => true
|
||||
# format => json_batch
|
||||
# http_compression => true
|
||||
#}
|
||||
}
|
||||
|
||||
# vim:ts=4:sw=4
|
||||
```
|
||||
|
||||
Reference in New Issue
Block a user