-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
19dd94c
commit a7c295f
Showing
2 changed files
with
5,870 additions
and
54 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,73 +1,125 @@ | ||
# !pip install --upgrade google-cloud-speech | ||
# !pip install pydub | ||
from google.cloud import speech | ||
from pydub import AudioSegment | ||
from pathlib import Path as path | ||
from pathlib import Path | ||
from google.cloud import storage | ||
from loguru import logger | ||
from llm_experiments.utils import here | ||
|
||
from llm_experiments.utils import run_command, here | ||
|
||
proj = run_command("!gcloud config set project {name}", name="motorway-genai") | ||
def split_and_convert_audio(file_path, output_format="wav", sample_width=2):
    """
    Splits an audio file into mono channels and exports the first channel
    in the specified format and bit depth.

    Args:
        file_path (Path | str): Path to the input audio file.
        output_format (str): The desired output format (default is 'wav').
        sample_width (int): The desired sample width in bytes (default is 2 for 16-bit).

    Returns:
        Path: Path to the converted audio file. It is named
        ``<stem>_isolated_channel.<output_format>``, placed in the input
        file's directory and joined onto ``here()``.
    """
    # Accept plain strings too: .parent / .stem below require a Path.
    file_path = Path(file_path)
    audio = AudioSegment.from_file(file_path)
    logger.info(f"Splitting audio to mono from {file_path=}")
    channels = audio.split_to_mono()
    output_path = (
        here() / file_path.parent / f"{file_path.stem}_isolated_channel.{output_format}"
    )
    # Export only the first channel, converted to the requested sample width.
    channels[0].set_sample_width(sample_width).export(output_path, format=output_format)
    return output_path
|
||
|
||
def transcribe_gcs(gcs_uri: str) -> str: | ||
"""Asynchronously transcribes the audio file specified by the gcs_uri. | ||
def upload_to_gcs(local_file_path, gcs_uri):
    """
    Uploads a file to Google Cloud Storage.

    Args:
        local_file_path (str | Path): The path to the local file to be uploaded.
        gcs_uri (str): The GCS URI where the file will be uploaded, in the format 'gs://bucket_name/path/to/object'.

    Raises:
        ValueError: If ``gcs_uri`` does not contain both a bucket name and an
            object path.
    """
    # Strip only a *leading* scheme; str.replace would also remove any
    # "gs://" that happens to occur later inside the object name.
    scheme = "gs://"
    location = gcs_uri[len(scheme):] if gcs_uri.startswith(scheme) else gcs_uri
    bucket_name, _, object_name = location.partition("/")
    # Validate before creating a client so a malformed URI fails fast with a
    # clear message instead of an unpacking error or a remote API failure.
    if not bucket_name or not object_name:
        raise ValueError(
            f"Invalid GCS URI {gcs_uri!r}: expected 'gs://bucket_name/path/to/object'"
        )

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(object_name)
    blob.upload_from_filename(str(local_file_path))
    logger.info(f"File {local_file_path} uploaded to {gcs_uri}.")
|
||
client = speech.SpeechClient() | ||
|
||
def transcribe_gcs(
    gcs_uri: str, language_code: str = "en-GB", timeout: float = 6000
) -> "tuple[str, str]":
    """
    Asynchronously transcribes the audio file specified by the gcs_uri.

    Args:
        gcs_uri: The Google Cloud Storage path to an audio file.
        language_code: BCP-47 language tag passed to the recognizer.
            Defaults to British English.
        timeout: Maximum number of seconds to wait for the long-running
            recognition operation to complete.

    Returns:
        A ``(complete_transcript, confidence_transcript)`` tuple. The first
        item is the top transcript of every result joined with spaces; the
        second lists each top transcript with its confidence, one
        "Transcript/Confidence" pair per result.
    """
    client = speech.SpeechClient()
    audio = speech.RecognitionAudio(uri=gcs_uri)
    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
        # sample_rate_hertz=44100,
        # NOTE: the BCP-47 tag for UK English is "en-GB"; "en-UK" is not a
        # registered language tag.
        language_code=language_code,
    )

    operation = client.long_running_recognize(config=config, audio=audio)

    logger.info("Waiting for operation to complete...")
    response = operation.result(timeout=timeout)

    # Each result covers a consecutive portion of the audio; its first
    # alternative is the most likely one. Skip results with no alternatives
    # rather than failing with an IndexError.
    best_alternatives = [
        result.alternatives[0] for result in response.results if result.alternatives
    ]

    confidence_transcript = "\n".join(
        f"Transcript: {alt.transcript}\nConfidence: {alt.confidence}"
        for alt in best_alternatives
    )
    logger.trace(confidence_transcript)
    # Joining all transcripts into a single string
    complete_transcript = " ".join(alt.transcript for alt in best_alternatives)
    return complete_transcript, confidence_transcript
|
||
|
||
# Main script
if __name__ == "__main__":
    # For every .wav file in data/audio/files_to_read: isolate the first
    # channel as a 16-bit wav, upload it to GCS, transcribe it, and write the
    # transcript to the sibling "outs" directory.
    for audio_path in here().glob("data/audio/files_to_read/*.wav"):
        logger.info(f"Starting {audio_path=}")
        converted_audio_path = split_and_convert_audio(audio_path)
        logger.info(f"Converted to mono at {converted_audio_path=}")

        gcs_out_uri = f"gs://gen-ai-test-playground/audio-files-marketing/{converted_audio_path.name}"
        logger.info(f"Beginning upload to {gcs_out_uri=}")
        upload_to_gcs(converted_audio_path, gcs_out_uri)
        logger.info(f"Uploaded to {gcs_out_uri=}")

        logger.info(f"Beginning transcription of {gcs_out_uri}")
        transcript, _ = transcribe_gcs(gcs_out_uri)
        logger.info(f"Transcription complete {transcript[:500]=} {transcript[500:]=}")
        logger.info(f"Words: {len(transcript.split())=}")

        out_path = (
            here()
            / audio_path.parent.parent
            / "outs"
            / (audio_path.stem + "_transcript.txt")
        )
        logger.info(f"Writing to outpath: {out_path=}")
        # The "outs" directory may not exist yet on a fresh checkout.
        out_path.parent.mkdir(parents=True, exist_ok=True)
        # Explicit UTF-8 so non-ASCII characters in a transcript are written
        # consistently regardless of the platform's default encoding. The
        # context manager closes the file; no explicit close() is needed.
        with open(out_path, "w", encoding="utf-8") as f:
            f.write(transcript)
# | ||
# audio_path = here() / "data/audio/S6T01.wav" | ||
# converted_audio_path = split_and_convert_audio(audio_path) | ||
# | ||
# gcs_out_uri = ( | ||
# f"gs://gen-ai-test-playground/audio-files-marketing/{converted_audio_path.name}" | ||
# ) | ||
# upload_to_gcs(converted_audio_path, gcs_out_uri) | ||
# | ||
# transcript, _ = transcribe_gcs(gcs_out_uri) | ||
# logger.info(transcript) | ||
# | ||
# # write the transcript to a file | ||
# with open( | ||
# here() / audio_path.parent / (audio_path.stem + "_transcript.txt"), "w" | ||
# ) as f: | ||
# f.write(transcript) | ||
# f.close() |
Oops, something went wrong.