Initial commit
This commit is contained in:
249
mp3/Trennen3.py
Normal file
249
mp3/Trennen3.py
Normal file
@@ -0,0 +1,249 @@
|
||||
import whisper
|
||||
from pyannote.audio import Pipeline
|
||||
import torch
|
||||
print(torch.__version__)
|
||||
from fuzzywuzzy import fuzz
|
||||
#print(torch.cuda.is_available()) # True, wenn GPU verfügbar und erkannt
|
||||
|
||||
# Im Skript, oder besser als Umgebungsvariable
|
||||
import os
|
||||
|
||||
import certifi
|
||||
os.environ['SSL_CERT_FILE'] = certifi.where()
|
||||
|
||||
#os.environ["HF_AUTH_TOKEN"] = "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"
|
||||
os.environ["HF_AUTH_TOKEN"] = "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"
|
||||
|
||||
|
||||
from pydub import AudioSegment
|
||||
import os
|
||||
|
||||
|
||||
def load_audio(file_path):
|
||||
"""Lädt eine Audiodatei und konvertiert sie bei Bedarf nach WAV."""
|
||||
audio = AudioSegment.from_file(file_path)
|
||||
# Optional: Sicherstellen, dass es Mono ist und eine Standard-Samplerate hat
|
||||
# für bessere Kompatibilität mit ASR/Diarisierung
|
||||
if audio.channels > 1:
|
||||
audio = audio.set_channels(1)
|
||||
if audio.frame_rate != 16000: # Viele Modelle bevorzugen 16kHz
|
||||
audio = audio.set_frame_rate(16000)
|
||||
|
||||
# Temporäre WAV-Datei speichern, da manche Bibliotheken Dateipfade bevorzugen
|
||||
temp_wav_path = "temp_audio.wav"
|
||||
audio.export(temp_wav_path, format="wav")
|
||||
return audio, temp_wav_path
|
||||
|
||||
|
||||
|
||||
# Authentifizierung mit Hugging Face Token
|
||||
#pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization", use_auth_token="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")
|
||||
# Oder wenn als Umgebungsvariable gesetzt:
|
||||
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization",use_auth_token="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")
|
||||
|
||||
|
||||
def diarize_audio(audio_path):
|
||||
print("Starte Diarisierung...")
|
||||
try:
|
||||
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization") # token wird aus env-var geholt
|
||||
diarization_result = pipeline(audio_path)
|
||||
speakers = []
|
||||
for turn, _, speaker in diarization_result.itertracks(yield_label=True):
|
||||
speakers.append({
|
||||
"start": turn.start,
|
||||
"end": turn.end,
|
||||
"speaker": speaker
|
||||
})
|
||||
print("Diarisierung abgeschlossen.")
|
||||
return speakers
|
||||
except Exception as e:
|
||||
print(f"Fehler bei der Diarisierung: {e}")
|
||||
print("Stelle sicher, dass dein Hugging Face Token gültig ist und du die Modellbedingungen akzeptiert hast.")
|
||||
return []
|
||||
|
||||
|
||||
model = whisper.load_model("medium") # Nur einmal laden
|
||||
|
||||
|
||||
def transcribe_audio(audio_path, language="sv"):
|
||||
print(f"Starte Transkription in Sprache '{language}'...")
|
||||
try:
|
||||
# Hier ist die Zeile, die zuvor den Fehler verursacht hat, jetzt mit try-except
|
||||
model.eval() # Dies sollte jetzt funktionieren, wenn das Modell geladen wurde
|
||||
result = model.transcribe(audio_path, language=language)
|
||||
print("Transkription abgeschlossen.")
|
||||
return result["segments"]
|
||||
except Exception as e:
|
||||
print(f"Fehler bei der Transkription: {e}")
|
||||
return []
|
||||
|
||||
|
||||
def identify_phrases(diarization_segments, transcription_segments, audio_data, output_dir="output_phrases"):
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
# Automatische Zuordnung der Sprecher-IDs (verbessert)
|
||||
male_speaker_id = None
|
||||
female_speaker_id = None
|
||||
|
||||
# Einfache Heuristik: Der erste Sprecher, der am längsten spricht, ist der erste Typ
|
||||
# und der nächste ist der zweite Typ. Annahme: Es gibt genau zwei Sprecher.
|
||||
speaker_durations = {}
|
||||
for seg in diarization_segments:
|
||||
speaker_durations[seg['speaker']] = speaker_durations.get(seg['speaker'], 0) + (seg['end'] - seg['start'])
|
||||
|
||||
if len(speaker_durations) >= 2:
|
||||
# Sortiere nach der Dauer, um die Hauptsprecher zu finden, aber wir brauchen ihre IDs
|
||||
# Eine einfachere Annahme ist, dass die ersten beiden unterschiedlichen Speaker_IDs
|
||||
# Männlich und Weiblich sind, basierend auf der Konsistenz deines Audios.
|
||||
|
||||
# Finde die ersten beiden unterschiedlichen Sprecher-IDs in der Reihenfolge ihres Auftretens
|
||||
seen_speakers = []
|
||||
for seg in diarization_segments:
|
||||
if seg['speaker'] not in seen_speakers:
|
||||
seen_speakers.append(seg['speaker'])
|
||||
if len(seen_speakers) == 2:
|
||||
break
|
||||
|
||||
if len(seen_speakers) == 2:
|
||||
male_speaker_id = seen_speakers[0] # Annahme: erster ist männlich
|
||||
female_speaker_id = seen_speakers[1] # Annahme: zweiter ist weiblich
|
||||
else:
|
||||
print("Konnte nicht genügend unterschiedliche Sprecher-IDs für die automatische Zuordnung finden.")
|
||||
return []
|
||||
else:
|
||||
print("Nicht genügend Sprecher für die Diarisierung gefunden. Benötige mindestens 2.")
|
||||
return []
|
||||
|
||||
print(f"Zugewiesene Sprecher-IDs: Männlich='{male_speaker_id}', Weiblich='{female_speaker_id}'")
|
||||
|
||||
phrases = []
|
||||
current_male_phrase = None
|
||||
|
||||
# Kombiniere Diarisierung und Transkription für eine bessere Analyse
|
||||
# Erstelle eine Liste von kombinierten Segmenten
|
||||
combined_segments = []
|
||||
for d_seg in diarization_segments:
|
||||
d_start_ms = int(d_seg['start'] * 1000)
|
||||
d_end_ms = int(d_seg['end'] * 1000)
|
||||
|
||||
# Transkriptionen für dieses Diarisierungssegment sammeln
|
||||
segment_transcription = ""
|
||||
for t_seg in transcription_segments:
|
||||
t_start_ms = int(t_seg['start'] * 1000)
|
||||
t_end_ms = int(t_seg['end'] * 1000)
|
||||
|
||||
# Überlappung prüfen (oder t_seg komplett innerhalb von d_seg)
|
||||
if max(d_start_ms, t_start_ms) < min(d_end_ms, t_end_ms):
|
||||
segment_transcription += t_seg['text'].strip() + " "
|
||||
|
||||
combined_segments.append({
|
||||
"start": d_start_ms,
|
||||
"end": d_end_ms,
|
||||
"speaker": d_seg['speaker'],
|
||||
"text": segment_transcription.strip()
|
||||
})
|
||||
|
||||
for i, seg in enumerate(combined_segments):
|
||||
speaker = seg['speaker']
|
||||
current_text = seg['text']
|
||||
start_ms = seg['start']
|
||||
end_ms = seg['end']
|
||||
|
||||
if speaker == male_speaker_id:
|
||||
# Wir haben einen männlichen Part. Speichere ihn als potenziellen Start einer Phrase.
|
||||
current_male_phrase = {
|
||||
"start": start_ms,
|
||||
"end": end_ms,
|
||||
"text": current_text
|
||||
}
|
||||
elif speaker == female_speaker_id and current_male_phrase:
|
||||
# Jetzt haben wir einen weiblichen Part nach einem männlichen Part.
|
||||
# Prüfe, ob die weibliche Äußerung die männliche wiederholt und innerhalb einer Zeit toleranz liegt.
|
||||
|
||||
# Maximale Lücke zwischen männlicher Äußerung und weiblicher Wiederholung
|
||||
max_gap_ms = 1500 # 1.5 Sekunden Toleranz für die Lücke
|
||||
|
||||
if start_ms - current_male_phrase['end'] <= max_gap_ms:
|
||||
# Prüfe die Textähnlichkeit
|
||||
male_text_normalized = current_male_phrase['text'].lower().strip()
|
||||
female_text_normalized = current_text.lower().strip()
|
||||
|
||||
# Verbesserter Textvergleich mit fuzzywuzzy
|
||||
# fuzz.ratio: Basierend auf der Levenshtein-Distanz
|
||||
# fuzz.token_set_ratio: Besser für unterschiedliche Reihenfolge oder zusätzliche Wörter
|
||||
similarity_score = fuzz.token_set_ratio(male_text_normalized, female_text_normalized)
|
||||
|
||||
# Schwellenwert für die Ähnlichkeit (anpassbar)
|
||||
similarity_threshold = 80 # 80% Ähnlichkeit
|
||||
|
||||
if similarity_score >= similarity_threshold and len(female_text_normalized) > 0:
|
||||
# Erfolgreich eine Phrase gefunden!
|
||||
phrase_start_ms = current_male_phrase['start']
|
||||
phrase_end_ms = end_ms # Ende der weiblichen Äußerung
|
||||
|
||||
phrase_audio_segment = audio_data[phrase_start_ms:phrase_end_ms]
|
||||
|
||||
# Dateiname bereinigen
|
||||
clean_filename = "".join(c for c in female_text_normalized if c.isalnum() or c in " ").strip()
|
||||
clean_filename = clean_filename.replace(" ", "_").replace("__", "_")[:60] # Max 60 Zeichen
|
||||
|
||||
# Optional: Wenn der Dateiname leer ist (z.B. bei Transkriptionsfehler), einen Fallback verwenden
|
||||
if not clean_filename:
|
||||
clean_filename = f"unnamed_phrase_{phrase_start_ms}"
|
||||
|
||||
output_filename = os.path.join(output_dir, f"{clean_filename}.wav") # Keine Zeitstempel im Namen
|
||||
|
||||
# Vermeide doppelte Dateinamen bei sehr ähnlichen Phrasen
|
||||
counter = 1
|
||||
base_filename = output_filename
|
||||
while os.path.exists(output_filename):
|
||||
output_filename = os.path.join(output_dir, f"{clean_filename}_{counter}.wav")
|
||||
counter += 1
|
||||
|
||||
phrase_audio_segment.export(output_filename, format="wav")
|
||||
print(
|
||||
f"Gespeichert: {output_filename} (Männlich: '{current_male_phrase['text']}', Weiblich: '{current_text}', Ähnlichkeit: {similarity_score}%)")
|
||||
|
||||
phrases.append({
|
||||
"male_text": current_male_phrase['text'],
|
||||
"female_text": current_text,
|
||||
"start": phrase_start_ms,
|
||||
"end": phrase_end_ms,
|
||||
"filename": output_filename
|
||||
})
|
||||
current_male_phrase = None # Reset für die nächste Phrase
|
||||
else:
|
||||
# Weibliche Äußerung war keine ausreichende Wiederholung oder zu unähnlich
|
||||
current_male_phrase = None # Reset
|
||||
else:
|
||||
# Lücke zwischen männlich und weiblich zu groß
|
||||
current_male_phrase = None # Reset
|
||||
else:
|
||||
# Wenn ein anderer Sprecher (z.B. weiblich) spricht, ohne dass ein männlicher Part vorherging,
|
||||
# oder der aktuelle Sprecher ist nicht der weibliche Part nach dem männlichen Part,
|
||||
# dann setzen wir den männlichen Part zurück, da die Sequenz unterbrochen ist.
|
||||
current_male_phrase = None
|
||||
|
||||
return phrases
|
||||
|
||||
|
||||
def main(input_audio_file):
|
||||
# 1. Audio laden und vorbereiten
|
||||
audio, temp_wav_path = load_audio(input_audio_file)
|
||||
|
||||
# 2. Sprecher-Diarisierung
|
||||
diarization_segments = diarize_audio(temp_wav_path)
|
||||
|
||||
# 3. Spracherkennung
|
||||
transcription_segments = transcribe_audio(temp_wav_path)
|
||||
|
||||
# 4. Phrasen identifizieren und speichern
|
||||
identified_phrases = identify_phrases(diarization_segments, transcription_segments, audio, "075/")
|
||||
|
||||
# 5. Temporäre Datei aufräumen
|
||||
os.remove(temp_wav_path)
|
||||
print(f"Zerlegt in {len(identified_phrases)} Phrasen.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
audio_file = "075/075.wav" # Ersetze dies durch deinen Dateipfad
|
||||
main(audio_file)
|
||||
Reference in New Issue
Block a user