import whisper from pyannote.audio import Pipeline import torch print(torch.__version__) from fuzzywuzzy import fuzz #print(torch.cuda.is_available()) # True, wenn GPU verfügbar und erkannt # Im Skript, oder besser als Umgebungsvariable import os import certifi os.environ['SSL_CERT_FILE'] = certifi.where() #os.environ["HF_AUTH_TOKEN"] = "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" os.environ["HF_AUTH_TOKEN"] = "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" from pydub import AudioSegment import os def load_audio(file_path): """Lädt eine Audiodatei und konvertiert sie bei Bedarf nach WAV.""" audio = AudioSegment.from_file(file_path) # Optional: Sicherstellen, dass es Mono ist und eine Standard-Samplerate hat # für bessere Kompatibilität mit ASR/Diarisierung if audio.channels > 1: audio = audio.set_channels(1) if audio.frame_rate != 16000: # Viele Modelle bevorzugen 16kHz audio = audio.set_frame_rate(16000) # Temporäre WAV-Datei speichern, da manche Bibliotheken Dateipfade bevorzugen temp_wav_path = "temp_audio.wav" audio.export(temp_wav_path, format="wav") return audio, temp_wav_path # Authentifizierung mit Hugging Face Token #pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization", use_auth_token="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") # Oder wenn als Umgebungsvariable gesetzt: pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization",use_auth_token="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") def diarize_audio(audio_path): print("Starte Diarisierung...") try: pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization") # token wird aus env-var geholt diarization_result = pipeline(audio_path) speakers = [] for turn, _, speaker in diarization_result.itertracks(yield_label=True): speakers.append({ "start": turn.start, "end": turn.end, "speaker": speaker }) print("Diarisierung abgeschlossen.") return speakers except Exception as e: print(f"Fehler bei der Diarisierung: {e}") print("Stelle sicher, dass dein Hugging Face Token gültig ist und du die Modellbedingungen akzeptiert hast.") return [] model = whisper.load_model("medium") # Nur einmal laden def transcribe_audio(audio_path, language="sv"): print(f"Starte Transkription in Sprache '{language}'...") try: # Hier ist die Zeile, die zuvor den Fehler verursacht hat, jetzt mit try-except model.eval() # Dies sollte jetzt funktionieren, wenn das Modell geladen wurde result = model.transcribe(audio_path, language=language) print("Transkription abgeschlossen.") return result["segments"] except Exception as e: print(f"Fehler bei der Transkription: {e}") return [] def identify_phrases(diarization_segments, transcription_segments, audio_data, output_dir="output_phrases"): os.makedirs(output_dir, exist_ok=True) # Automatische Zuordnung der Sprecher-IDs (verbessert) male_speaker_id = None female_speaker_id = None # Einfache Heuristik: Der erste Sprecher, der am längsten spricht, ist der erste Typ # und der nächste ist der zweite Typ. Annahme: Es gibt genau zwei Sprecher. speaker_durations = {} for seg in diarization_segments: speaker_durations[seg['speaker']] = speaker_durations.get(seg['speaker'], 0) + (seg['end'] - seg['start']) if len(speaker_durations) >= 2: # Sortiere nach der Dauer, um die Hauptsprecher zu finden, aber wir brauchen ihre IDs # Eine einfachere Annahme ist, dass die ersten beiden unterschiedlichen Speaker_IDs # Männlich und Weiblich sind, basierend auf der Konsistenz deines Audios. # Finde die ersten beiden unterschiedlichen Sprecher-IDs in der Reihenfolge ihres Auftretens seen_speakers = [] for seg in diarization_segments: if seg['speaker'] not in seen_speakers: seen_speakers.append(seg['speaker']) if len(seen_speakers) == 2: break if len(seen_speakers) == 2: male_speaker_id = seen_speakers[0] # Annahme: erster ist männlich female_speaker_id = seen_speakers[1] # Annahme: zweiter ist weiblich else: print("Konnte nicht genügend unterschiedliche Sprecher-IDs für die automatische Zuordnung finden.") return [] else: print("Nicht genügend Sprecher für die Diarisierung gefunden. Benötige mindestens 2.") return [] print(f"Zugewiesene Sprecher-IDs: Männlich='{male_speaker_id}', Weiblich='{female_speaker_id}'") phrases = [] current_male_phrase = None # Kombiniere Diarisierung und Transkription für eine bessere Analyse # Erstelle eine Liste von kombinierten Segmenten combined_segments = [] for d_seg in diarization_segments: d_start_ms = int(d_seg['start'] * 1000) d_end_ms = int(d_seg['end'] * 1000) # Transkriptionen für dieses Diarisierungssegment sammeln segment_transcription = "" for t_seg in transcription_segments: t_start_ms = int(t_seg['start'] * 1000) t_end_ms = int(t_seg['end'] * 1000) # Überlappung prüfen (oder t_seg komplett innerhalb von d_seg) if max(d_start_ms, t_start_ms) < min(d_end_ms, t_end_ms): segment_transcription += t_seg['text'].strip() + " " combined_segments.append({ "start": d_start_ms, "end": d_end_ms, "speaker": d_seg['speaker'], "text": segment_transcription.strip() }) for i, seg in enumerate(combined_segments): speaker = seg['speaker'] current_text = seg['text'] start_ms = seg['start'] end_ms = seg['end'] if speaker == male_speaker_id: # Wir haben einen männlichen Part. Speichere ihn als potenziellen Start einer Phrase. current_male_phrase = { "start": start_ms, "end": end_ms, "text": current_text } elif speaker == female_speaker_id and current_male_phrase: # Jetzt haben wir einen weiblichen Part nach einem männlichen Part. # Prüfe, ob die weibliche Äußerung die männliche wiederholt und innerhalb einer Zeit toleranz liegt. # Maximale Lücke zwischen männlicher Äußerung und weiblicher Wiederholung max_gap_ms = 1500 # 1.5 Sekunden Toleranz für die Lücke if start_ms - current_male_phrase['end'] <= max_gap_ms: # Prüfe die Textähnlichkeit male_text_normalized = current_male_phrase['text'].lower().strip() female_text_normalized = current_text.lower().strip() # Verbesserter Textvergleich mit fuzzywuzzy # fuzz.ratio: Basierend auf der Levenshtein-Distanz # fuzz.token_set_ratio: Besser für unterschiedliche Reihenfolge oder zusätzliche Wörter similarity_score = fuzz.token_set_ratio(male_text_normalized, female_text_normalized) # Schwellenwert für die Ähnlichkeit (anpassbar) similarity_threshold = 80 # 80% Ähnlichkeit if similarity_score >= similarity_threshold and len(female_text_normalized) > 0: # Erfolgreich eine Phrase gefunden! phrase_start_ms = current_male_phrase['start'] phrase_end_ms = end_ms # Ende der weiblichen Äußerung phrase_audio_segment = audio_data[phrase_start_ms:phrase_end_ms] # Dateiname bereinigen clean_filename = "".join(c for c in female_text_normalized if c.isalnum() or c in " ").strip() clean_filename = clean_filename.replace(" ", "_").replace("__", "_")[:60] # Max 60 Zeichen # Optional: Wenn der Dateiname leer ist (z.B. bei Transkriptionsfehler), einen Fallback verwenden if not clean_filename: clean_filename = f"unnamed_phrase_{phrase_start_ms}" output_filename = os.path.join(output_dir, f"{clean_filename}.wav") # Keine Zeitstempel im Namen # Vermeide doppelte Dateinamen bei sehr ähnlichen Phrasen counter = 1 base_filename = output_filename while os.path.exists(output_filename): output_filename = os.path.join(output_dir, f"{clean_filename}_{counter}.wav") counter += 1 phrase_audio_segment.export(output_filename, format="wav") print( f"Gespeichert: {output_filename} (Männlich: '{current_male_phrase['text']}', Weiblich: '{current_text}', Ähnlichkeit: {similarity_score}%)") phrases.append({ "male_text": current_male_phrase['text'], "female_text": current_text, "start": phrase_start_ms, "end": phrase_end_ms, "filename": output_filename }) current_male_phrase = None # Reset für die nächste Phrase else: # Weibliche Äußerung war keine ausreichende Wiederholung oder zu unähnlich current_male_phrase = None # Reset else: # Lücke zwischen männlich und weiblich zu groß current_male_phrase = None # Reset else: # Wenn ein anderer Sprecher (z.B. weiblich) spricht, ohne dass ein männlicher Part vorherging, # oder der aktuelle Sprecher ist nicht der weibliche Part nach dem männlichen Part, # dann setzen wir den männlichen Part zurück, da die Sequenz unterbrochen ist. current_male_phrase = None return phrases def main(input_audio_file): # 1. Audio laden und vorbereiten audio, temp_wav_path = load_audio(input_audio_file) # 2. Sprecher-Diarisierung diarization_segments = diarize_audio(temp_wav_path) # 3. Spracherkennung transcription_segments = transcribe_audio(temp_wav_path) # 4. Phrasen identifizieren und speichern identified_phrases = identify_phrases(diarization_segments, transcription_segments, audio, "075/") # 5. Temporäre Datei aufräumen os.remove(temp_wav_path) print(f"Zerlegt in {len(identified_phrases)} Phrasen.") if __name__ == "__main__": audio_file = "075/075.wav" # Ersetze dies durch deinen Dateipfad main(audio_file)