249 lines
10 KiB
Python
249 lines
10 KiB
Python
import whisper
|
|
from pyannote.audio import Pipeline
|
|
import torch
|
|
print(torch.__version__)
|
|
from fuzzywuzzy import fuzz
|
|
#print(torch.cuda.is_available()) # True, wenn GPU verfügbar und erkannt
|
|
|
|
# Im Skript, oder besser als Umgebungsvariable
|
|
import os
|
|
|
|
import certifi
|
|
os.environ['SSL_CERT_FILE'] = certifi.where()
|
|
|
|
#os.environ["HF_AUTH_TOKEN"] = "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"
|
|
os.environ["HF_AUTH_TOKEN"] = "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"
|
|
|
|
|
|
from pydub import AudioSegment
|
|
import os
|
|
|
|
|
|
def load_audio(file_path):
|
|
"""Lädt eine Audiodatei und konvertiert sie bei Bedarf nach WAV."""
|
|
audio = AudioSegment.from_file(file_path)
|
|
# Optional: Sicherstellen, dass es Mono ist und eine Standard-Samplerate hat
|
|
# für bessere Kompatibilität mit ASR/Diarisierung
|
|
if audio.channels > 1:
|
|
audio = audio.set_channels(1)
|
|
if audio.frame_rate != 16000: # Viele Modelle bevorzugen 16kHz
|
|
audio = audio.set_frame_rate(16000)
|
|
|
|
# Temporäre WAV-Datei speichern, da manche Bibliotheken Dateipfade bevorzugen
|
|
temp_wav_path = "temp_audio.wav"
|
|
audio.export(temp_wav_path, format="wav")
|
|
return audio, temp_wav_path
|
|
|
|
|
|
|
|
# Authentifizierung mit Hugging Face Token
|
|
#pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization", use_auth_token="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")
|
|
# Oder wenn als Umgebungsvariable gesetzt:
|
|
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization",use_auth_token="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")
|
|
|
|
|
|
def diarize_audio(audio_path):
|
|
print("Starte Diarisierung...")
|
|
try:
|
|
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization") # token wird aus env-var geholt
|
|
diarization_result = pipeline(audio_path)
|
|
speakers = []
|
|
for turn, _, speaker in diarization_result.itertracks(yield_label=True):
|
|
speakers.append({
|
|
"start": turn.start,
|
|
"end": turn.end,
|
|
"speaker": speaker
|
|
})
|
|
print("Diarisierung abgeschlossen.")
|
|
return speakers
|
|
except Exception as e:
|
|
print(f"Fehler bei der Diarisierung: {e}")
|
|
print("Stelle sicher, dass dein Hugging Face Token gültig ist und du die Modellbedingungen akzeptiert hast.")
|
|
return []
|
|
|
|
|
|
model = whisper.load_model("medium") # Nur einmal laden
|
|
|
|
|
|
def transcribe_audio(audio_path, language="sv"):
|
|
print(f"Starte Transkription in Sprache '{language}'...")
|
|
try:
|
|
# Hier ist die Zeile, die zuvor den Fehler verursacht hat, jetzt mit try-except
|
|
model.eval() # Dies sollte jetzt funktionieren, wenn das Modell geladen wurde
|
|
result = model.transcribe(audio_path, language=language)
|
|
print("Transkription abgeschlossen.")
|
|
return result["segments"]
|
|
except Exception as e:
|
|
print(f"Fehler bei der Transkription: {e}")
|
|
return []
|
|
|
|
|
|
def identify_phrases(diarization_segments, transcription_segments, audio_data, output_dir="output_phrases"):
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
|
|
# Automatische Zuordnung der Sprecher-IDs (verbessert)
|
|
male_speaker_id = None
|
|
female_speaker_id = None
|
|
|
|
# Einfache Heuristik: Der erste Sprecher, der am längsten spricht, ist der erste Typ
|
|
# und der nächste ist der zweite Typ. Annahme: Es gibt genau zwei Sprecher.
|
|
speaker_durations = {}
|
|
for seg in diarization_segments:
|
|
speaker_durations[seg['speaker']] = speaker_durations.get(seg['speaker'], 0) + (seg['end'] - seg['start'])
|
|
|
|
if len(speaker_durations) >= 2:
|
|
# Sortiere nach der Dauer, um die Hauptsprecher zu finden, aber wir brauchen ihre IDs
|
|
# Eine einfachere Annahme ist, dass die ersten beiden unterschiedlichen Speaker_IDs
|
|
# Männlich und Weiblich sind, basierend auf der Konsistenz deines Audios.
|
|
|
|
# Finde die ersten beiden unterschiedlichen Sprecher-IDs in der Reihenfolge ihres Auftretens
|
|
seen_speakers = []
|
|
for seg in diarization_segments:
|
|
if seg['speaker'] not in seen_speakers:
|
|
seen_speakers.append(seg['speaker'])
|
|
if len(seen_speakers) == 2:
|
|
break
|
|
|
|
if len(seen_speakers) == 2:
|
|
male_speaker_id = seen_speakers[0] # Annahme: erster ist männlich
|
|
female_speaker_id = seen_speakers[1] # Annahme: zweiter ist weiblich
|
|
else:
|
|
print("Konnte nicht genügend unterschiedliche Sprecher-IDs für die automatische Zuordnung finden.")
|
|
return []
|
|
else:
|
|
print("Nicht genügend Sprecher für die Diarisierung gefunden. Benötige mindestens 2.")
|
|
return []
|
|
|
|
print(f"Zugewiesene Sprecher-IDs: Männlich='{male_speaker_id}', Weiblich='{female_speaker_id}'")
|
|
|
|
phrases = []
|
|
current_male_phrase = None
|
|
|
|
# Kombiniere Diarisierung und Transkription für eine bessere Analyse
|
|
# Erstelle eine Liste von kombinierten Segmenten
|
|
combined_segments = []
|
|
for d_seg in diarization_segments:
|
|
d_start_ms = int(d_seg['start'] * 1000)
|
|
d_end_ms = int(d_seg['end'] * 1000)
|
|
|
|
# Transkriptionen für dieses Diarisierungssegment sammeln
|
|
segment_transcription = ""
|
|
for t_seg in transcription_segments:
|
|
t_start_ms = int(t_seg['start'] * 1000)
|
|
t_end_ms = int(t_seg['end'] * 1000)
|
|
|
|
# Überlappung prüfen (oder t_seg komplett innerhalb von d_seg)
|
|
if max(d_start_ms, t_start_ms) < min(d_end_ms, t_end_ms):
|
|
segment_transcription += t_seg['text'].strip() + " "
|
|
|
|
combined_segments.append({
|
|
"start": d_start_ms,
|
|
"end": d_end_ms,
|
|
"speaker": d_seg['speaker'],
|
|
"text": segment_transcription.strip()
|
|
})
|
|
|
|
for i, seg in enumerate(combined_segments):
|
|
speaker = seg['speaker']
|
|
current_text = seg['text']
|
|
start_ms = seg['start']
|
|
end_ms = seg['end']
|
|
|
|
if speaker == male_speaker_id:
|
|
# Wir haben einen männlichen Part. Speichere ihn als potenziellen Start einer Phrase.
|
|
current_male_phrase = {
|
|
"start": start_ms,
|
|
"end": end_ms,
|
|
"text": current_text
|
|
}
|
|
elif speaker == female_speaker_id and current_male_phrase:
|
|
# Jetzt haben wir einen weiblichen Part nach einem männlichen Part.
|
|
# Prüfe, ob die weibliche Äußerung die männliche wiederholt und innerhalb einer Zeit toleranz liegt.
|
|
|
|
# Maximale Lücke zwischen männlicher Äußerung und weiblicher Wiederholung
|
|
max_gap_ms = 1500 # 1.5 Sekunden Toleranz für die Lücke
|
|
|
|
if start_ms - current_male_phrase['end'] <= max_gap_ms:
|
|
# Prüfe die Textähnlichkeit
|
|
male_text_normalized = current_male_phrase['text'].lower().strip()
|
|
female_text_normalized = current_text.lower().strip()
|
|
|
|
# Verbesserter Textvergleich mit fuzzywuzzy
|
|
# fuzz.ratio: Basierend auf der Levenshtein-Distanz
|
|
# fuzz.token_set_ratio: Besser für unterschiedliche Reihenfolge oder zusätzliche Wörter
|
|
similarity_score = fuzz.token_set_ratio(male_text_normalized, female_text_normalized)
|
|
|
|
# Schwellenwert für die Ähnlichkeit (anpassbar)
|
|
similarity_threshold = 80 # 80% Ähnlichkeit
|
|
|
|
if similarity_score >= similarity_threshold and len(female_text_normalized) > 0:
|
|
# Erfolgreich eine Phrase gefunden!
|
|
phrase_start_ms = current_male_phrase['start']
|
|
phrase_end_ms = end_ms # Ende der weiblichen Äußerung
|
|
|
|
phrase_audio_segment = audio_data[phrase_start_ms:phrase_end_ms]
|
|
|
|
# Dateiname bereinigen
|
|
clean_filename = "".join(c for c in female_text_normalized if c.isalnum() or c in " ").strip()
|
|
clean_filename = clean_filename.replace(" ", "_").replace("__", "_")[:60] # Max 60 Zeichen
|
|
|
|
# Optional: Wenn der Dateiname leer ist (z.B. bei Transkriptionsfehler), einen Fallback verwenden
|
|
if not clean_filename:
|
|
clean_filename = f"unnamed_phrase_{phrase_start_ms}"
|
|
|
|
output_filename = os.path.join(output_dir, f"{clean_filename}.wav") # Keine Zeitstempel im Namen
|
|
|
|
# Vermeide doppelte Dateinamen bei sehr ähnlichen Phrasen
|
|
counter = 1
|
|
base_filename = output_filename
|
|
while os.path.exists(output_filename):
|
|
output_filename = os.path.join(output_dir, f"{clean_filename}_{counter}.wav")
|
|
counter += 1
|
|
|
|
phrase_audio_segment.export(output_filename, format="wav")
|
|
print(
|
|
f"Gespeichert: {output_filename} (Männlich: '{current_male_phrase['text']}', Weiblich: '{current_text}', Ähnlichkeit: {similarity_score}%)")
|
|
|
|
phrases.append({
|
|
"male_text": current_male_phrase['text'],
|
|
"female_text": current_text,
|
|
"start": phrase_start_ms,
|
|
"end": phrase_end_ms,
|
|
"filename": output_filename
|
|
})
|
|
current_male_phrase = None # Reset für die nächste Phrase
|
|
else:
|
|
# Weibliche Äußerung war keine ausreichende Wiederholung oder zu unähnlich
|
|
current_male_phrase = None # Reset
|
|
else:
|
|
# Lücke zwischen männlich und weiblich zu groß
|
|
current_male_phrase = None # Reset
|
|
else:
|
|
# Wenn ein anderer Sprecher (z.B. weiblich) spricht, ohne dass ein männlicher Part vorherging,
|
|
# oder der aktuelle Sprecher ist nicht der weibliche Part nach dem männlichen Part,
|
|
# dann setzen wir den männlichen Part zurück, da die Sequenz unterbrochen ist.
|
|
current_male_phrase = None
|
|
|
|
return phrases
|
|
|
|
|
|
def main(input_audio_file):
|
|
# 1. Audio laden und vorbereiten
|
|
audio, temp_wav_path = load_audio(input_audio_file)
|
|
|
|
# 2. Sprecher-Diarisierung
|
|
diarization_segments = diarize_audio(temp_wav_path)
|
|
|
|
# 3. Spracherkennung
|
|
transcription_segments = transcribe_audio(temp_wav_path)
|
|
|
|
# 4. Phrasen identifizieren und speichern
|
|
identified_phrases = identify_phrases(diarization_segments, transcription_segments, audio, "31-50/output_phrases")
|
|
|
|
# 5. Temporäre Datei aufräumen
|
|
os.remove(temp_wav_path)
|
|
print(f"Zerlegt in {len(identified_phrases)} Phrasen.")
|
|
|
|
if __name__ == "__main__":
|
|
audio_file = "31-50/31-50.wav" # Ersetze dies durch deinen Dateipfad
|
|
main(audio_file) |