Files
txt2apkg/GoetheVerlag/mp3/Trennen3.py
2025-05-31 19:33:05 +02:00

249 lines
10 KiB
Python

import whisper
from pyannote.audio import Pipeline
import torch
print(torch.__version__)
from fuzzywuzzy import fuzz
#print(torch.cuda.is_available()) # True, wenn GPU verfügbar und erkannt
# Im Skript, oder besser als Umgebungsvariable
import os
import certifi
os.environ['SSL_CERT_FILE'] = certifi.where()
#os.environ["HF_AUTH_TOKEN"] = "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"
os.environ["HF_AUTH_TOKEN"] = "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"
from pydub import AudioSegment
import os
def load_audio(file_path):
"""Lädt eine Audiodatei und konvertiert sie bei Bedarf nach WAV."""
audio = AudioSegment.from_file(file_path)
# Optional: Sicherstellen, dass es Mono ist und eine Standard-Samplerate hat
# für bessere Kompatibilität mit ASR/Diarisierung
if audio.channels > 1:
audio = audio.set_channels(1)
if audio.frame_rate != 16000: # Viele Modelle bevorzugen 16kHz
audio = audio.set_frame_rate(16000)
# Temporäre WAV-Datei speichern, da manche Bibliotheken Dateipfade bevorzugen
temp_wav_path = "temp_audio.wav"
audio.export(temp_wav_path, format="wav")
return audio, temp_wav_path
# Authentifizierung mit Hugging Face Token
#pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization", use_auth_token="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")
# Oder wenn als Umgebungsvariable gesetzt:
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization",use_auth_token="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")
def diarize_audio(audio_path):
print("Starte Diarisierung...")
try:
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization") # token wird aus env-var geholt
diarization_result = pipeline(audio_path)
speakers = []
for turn, _, speaker in diarization_result.itertracks(yield_label=True):
speakers.append({
"start": turn.start,
"end": turn.end,
"speaker": speaker
})
print("Diarisierung abgeschlossen.")
return speakers
except Exception as e:
print(f"Fehler bei der Diarisierung: {e}")
print("Stelle sicher, dass dein Hugging Face Token gültig ist und du die Modellbedingungen akzeptiert hast.")
return []
model = whisper.load_model("medium") # Nur einmal laden
def transcribe_audio(audio_path, language="sv"):
print(f"Starte Transkription in Sprache '{language}'...")
try:
# Hier ist die Zeile, die zuvor den Fehler verursacht hat, jetzt mit try-except
model.eval() # Dies sollte jetzt funktionieren, wenn das Modell geladen wurde
result = model.transcribe(audio_path, language=language)
print("Transkription abgeschlossen.")
return result["segments"]
except Exception as e:
print(f"Fehler bei der Transkription: {e}")
return []
def identify_phrases(diarization_segments, transcription_segments, audio_data, output_dir="output_phrases"):
os.makedirs(output_dir, exist_ok=True)
# Automatische Zuordnung der Sprecher-IDs (verbessert)
male_speaker_id = None
female_speaker_id = None
# Einfache Heuristik: Der erste Sprecher, der am längsten spricht, ist der erste Typ
# und der nächste ist der zweite Typ. Annahme: Es gibt genau zwei Sprecher.
speaker_durations = {}
for seg in diarization_segments:
speaker_durations[seg['speaker']] = speaker_durations.get(seg['speaker'], 0) + (seg['end'] - seg['start'])
if len(speaker_durations) >= 2:
# Sortiere nach der Dauer, um die Hauptsprecher zu finden, aber wir brauchen ihre IDs
# Eine einfachere Annahme ist, dass die ersten beiden unterschiedlichen Speaker_IDs
# Männlich und Weiblich sind, basierend auf der Konsistenz deines Audios.
# Finde die ersten beiden unterschiedlichen Sprecher-IDs in der Reihenfolge ihres Auftretens
seen_speakers = []
for seg in diarization_segments:
if seg['speaker'] not in seen_speakers:
seen_speakers.append(seg['speaker'])
if len(seen_speakers) == 2:
break
if len(seen_speakers) == 2:
male_speaker_id = seen_speakers[0] # Annahme: erster ist männlich
female_speaker_id = seen_speakers[1] # Annahme: zweiter ist weiblich
else:
print("Konnte nicht genügend unterschiedliche Sprecher-IDs für die automatische Zuordnung finden.")
return []
else:
print("Nicht genügend Sprecher für die Diarisierung gefunden. Benötige mindestens 2.")
return []
print(f"Zugewiesene Sprecher-IDs: Männlich='{male_speaker_id}', Weiblich='{female_speaker_id}'")
phrases = []
current_male_phrase = None
# Kombiniere Diarisierung und Transkription für eine bessere Analyse
# Erstelle eine Liste von kombinierten Segmenten
combined_segments = []
for d_seg in diarization_segments:
d_start_ms = int(d_seg['start'] * 1000)
d_end_ms = int(d_seg['end'] * 1000)
# Transkriptionen für dieses Diarisierungssegment sammeln
segment_transcription = ""
for t_seg in transcription_segments:
t_start_ms = int(t_seg['start'] * 1000)
t_end_ms = int(t_seg['end'] * 1000)
# Überlappung prüfen (oder t_seg komplett innerhalb von d_seg)
if max(d_start_ms, t_start_ms) < min(d_end_ms, t_end_ms):
segment_transcription += t_seg['text'].strip() + " "
combined_segments.append({
"start": d_start_ms,
"end": d_end_ms,
"speaker": d_seg['speaker'],
"text": segment_transcription.strip()
})
for i, seg in enumerate(combined_segments):
speaker = seg['speaker']
current_text = seg['text']
start_ms = seg['start']
end_ms = seg['end']
if speaker == male_speaker_id:
# Wir haben einen männlichen Part. Speichere ihn als potenziellen Start einer Phrase.
current_male_phrase = {
"start": start_ms,
"end": end_ms,
"text": current_text
}
elif speaker == female_speaker_id and current_male_phrase:
# Jetzt haben wir einen weiblichen Part nach einem männlichen Part.
# Prüfe, ob die weibliche Äußerung die männliche wiederholt und innerhalb einer Zeit toleranz liegt.
# Maximale Lücke zwischen männlicher Äußerung und weiblicher Wiederholung
max_gap_ms = 1500 # 1.5 Sekunden Toleranz für die Lücke
if start_ms - current_male_phrase['end'] <= max_gap_ms:
# Prüfe die Textähnlichkeit
male_text_normalized = current_male_phrase['text'].lower().strip()
female_text_normalized = current_text.lower().strip()
# Verbesserter Textvergleich mit fuzzywuzzy
# fuzz.ratio: Basierend auf der Levenshtein-Distanz
# fuzz.token_set_ratio: Besser für unterschiedliche Reihenfolge oder zusätzliche Wörter
similarity_score = fuzz.token_set_ratio(male_text_normalized, female_text_normalized)
# Schwellenwert für die Ähnlichkeit (anpassbar)
similarity_threshold = 80 # 80% Ähnlichkeit
if similarity_score >= similarity_threshold and len(female_text_normalized) > 0:
# Erfolgreich eine Phrase gefunden!
phrase_start_ms = current_male_phrase['start']
phrase_end_ms = end_ms # Ende der weiblichen Äußerung
phrase_audio_segment = audio_data[phrase_start_ms:phrase_end_ms]
# Dateiname bereinigen
clean_filename = "".join(c for c in female_text_normalized if c.isalnum() or c in " ").strip()
clean_filename = clean_filename.replace(" ", "_").replace("__", "_")[:60] # Max 60 Zeichen
# Optional: Wenn der Dateiname leer ist (z.B. bei Transkriptionsfehler), einen Fallback verwenden
if not clean_filename:
clean_filename = f"unnamed_phrase_{phrase_start_ms}"
output_filename = os.path.join(output_dir, f"{clean_filename}.wav") # Keine Zeitstempel im Namen
# Vermeide doppelte Dateinamen bei sehr ähnlichen Phrasen
counter = 1
base_filename = output_filename
while os.path.exists(output_filename):
output_filename = os.path.join(output_dir, f"{clean_filename}_{counter}.wav")
counter += 1
phrase_audio_segment.export(output_filename, format="wav")
print(
f"Gespeichert: {output_filename} (Männlich: '{current_male_phrase['text']}', Weiblich: '{current_text}', Ähnlichkeit: {similarity_score}%)")
phrases.append({
"male_text": current_male_phrase['text'],
"female_text": current_text,
"start": phrase_start_ms,
"end": phrase_end_ms,
"filename": output_filename
})
current_male_phrase = None # Reset für die nächste Phrase
else:
# Weibliche Äußerung war keine ausreichende Wiederholung oder zu unähnlich
current_male_phrase = None # Reset
else:
# Lücke zwischen männlich und weiblich zu groß
current_male_phrase = None # Reset
else:
# Wenn ein anderer Sprecher (z.B. weiblich) spricht, ohne dass ein männlicher Part vorherging,
# oder der aktuelle Sprecher ist nicht der weibliche Part nach dem männlichen Part,
# dann setzen wir den männlichen Part zurück, da die Sequenz unterbrochen ist.
current_male_phrase = None
return phrases
def main(input_audio_file):
# 1. Audio laden und vorbereiten
audio, temp_wav_path = load_audio(input_audio_file)
# 2. Sprecher-Diarisierung
diarization_segments = diarize_audio(temp_wav_path)
# 3. Spracherkennung
transcription_segments = transcribe_audio(temp_wav_path)
# 4. Phrasen identifizieren und speichern
identified_phrases = identify_phrases(diarization_segments, transcription_segments, audio, "31-50/output_phrases")
# 5. Temporäre Datei aufräumen
os.remove(temp_wav_path)
print(f"Zerlegt in {len(identified_phrases)} Phrasen.")
if __name__ == "__main__":
audio_file = "31-50/31-50.wav" # Ersetze dies durch deinen Dateipfad
main(audio_file)