added the aws and gcp connectors #187

Open
wants to merge 5 commits into
base: main
Changes from 4 commits
11 changes: 11 additions & 0 deletions .env-dev.template
@@ -26,6 +26,11 @@ AZURE_SPEECH_REGION=
AZURE_TRANSLATION_KEY=
AZURE_TRANSLATION_RESOURCE_LOCATION=

# AWS Speech/Translation Keys
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_DEFAULT_REGION=

# Storage
# -------
# Storage Configuation (Default is local)
@@ -38,6 +43,12 @@ PUBLIC_URL_PREFIX= # Set Tunnel URL if using local storage
# AZURE_STORAGE_ACCOUNT_KEY=
# AZURE_STORAGE_CONTAINER=

# Create the S3 bucket
Contributor review comment:

Can you please change this sentence to "Set S3 Bucket name when using AWS speech services" in both the .env.template and .env-dev.template files? You can also move S3_BUCKET_NAME to just below the AWS Speech/Translation Keys, since it is only used there and nowhere else.

S3_BUCKET_NAME=

# Set GCP credentials (json)
GOOGLE_APPLICATION_CREDENTIALS=

# Encryption key for storing credentials
ENCRYPTION_KEY=

11 changes: 11 additions & 0 deletions .env.template
@@ -31,6 +31,11 @@ AZURE_SPEECH_REGION=
AZURE_TRANSLATION_KEY=
AZURE_TRANSLATION_RESOURCE_LOCATION=

# AWS Speech/Translation Keys
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_DEFAULT_REGION=

# Storage
# -------
# Set Azure storage keys if using Azure Blob Storage
@@ -39,6 +44,12 @@ AZURE_STORAGE_ACCOUNT_URL=
AZURE_STORAGE_ACCOUNT_KEY=
AZURE_STORAGE_CONTAINER=

# Create the S3 bucket
S3_BUCKET_NAME=

# Set GCP credentials (json)
GOOGLE_APPLICATION_CREDENTIALS=

# Encryption key for storing credentials
ENCRYPTION_KEY=

5 changes: 5 additions & 0 deletions docker-compose.yml
@@ -78,6 +78,11 @@ services:
- AZURE_STORAGE_ACCOUNT_KEY=${AZURE_STORAGE_ACCOUNT_KEY}
- AZURE_STORAGE_CONTAINER=${AZURE_STORAGE_CONTAINER}
- PUBLIC_URL_PREFIX=${PUBLIC_URL_PREFIX}
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
KaranrajM (Contributor) commented on Sep 13, 2024:

Can you move the three AWS speech/translation credentials, along with S3_BUCKET_NAME, above the STORAGE_TYPE=${STORAGE_TYPE} line?

- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION}
- S3_BUCKET_NAME=${S3_BUCKET_NAME}
- GOOGLE_APPLICATION_CREDENTIALS=${GOOGLE_APPLICATION_CREDENTIALS}
KaranrajM (Contributor) commented on Sep 13, 2024:

We may need to mount the GOOGLE_APPLICATION_CREDENTIALS JSON file at runtime, because a local file is not accessible inside a Docker container unless it is either embedded in the image or mounted at runtime. Since the GCP credentials file is highly sensitive, mounting it at runtime is the safer option. We could try something like this:

environment:
  - GOOGLE_APPLICATION_CREDENTIALS=/app/credentials/google-credentials.json
volumes:
  - ${GOOGLE_APPLICATION_CREDENTIALS}:/app/credentials/google-credentials.json:ro

Let me know what you think; any other approach is also welcome. Once this change is made, please test it by building the language image and running the services through it.
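
A startup check could make a missing or mis-mounted credentials file fail loudly instead of surfacing later as an opaque GCP client error. The sketch below is only an illustration under stated assumptions: load_service_account_info is a hypothetical helper that is not part of this PR, and it only checks that the path named by the environment variable exists and contains valid JSON.

```python
import json
import os
from pathlib import Path


def load_service_account_info(env_var: str = "GOOGLE_APPLICATION_CREDENTIALS") -> dict:
    """Return the parsed credentials JSON, or raise a descriptive error.

    Hypothetical helper: the name and the error wording are assumptions.
    """
    raw = os.getenv(env_var)
    if not raw:
        raise RuntimeError(f"{env_var} is not set")
    path = Path(raw)
    if not path.is_file():
        # Inside a container this usually means the volume mount is missing.
        raise RuntimeError(
            f"{env_var}={raw!r} is not a file inside this environment; "
            "check the volume mount"
        )
    with path.open() as handle:
        return json.load(handle)
```

Calling such a helper once during service startup, before the GCP clients are constructed, would be one way to verify the mount when testing the built image.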

depends_on:
- kafka
- postgres
849 changes: 637 additions & 212 deletions language/poetry.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions language/pyproject.toml
@@ -15,6 +15,10 @@ azure-cognitiveservices-speech = "^1.38.0"
httpx = "^0.27.0"
aiohttp = "^3.10.0"
pydub = "^0.25.1"
boto3 = "^1.35.3"
google-cloud-speech = "^2.27.0"
google-cloud-texttospeech = "^2.17.1"
google-cloud-translate = "^3.16.0"
KaranrajM marked this conversation as resolved.

[tool.poetry.group.dev.dependencies]
lib = {path = "../jb-lib", develop = true}
8 changes: 5 additions & 3 deletions language/src/extension.py
@@ -5,16 +5,18 @@
AzureSpeechProcessor,
CompositeSpeechProcessor,
DhruvaSpeechProcessor,
AWSSpeechProcessor,
GCPSpeechProcessor,
)
from .translator import AzureTranslator, CompositeTranslator, DhruvaTranslator
from .translator import AzureTranslator, CompositeTranslator, DhruvaTranslator, AWSTranslator, GCPTranslator

# ---- Speech Processor ----
speech_processor = CompositeSpeechProcessor(
DhruvaSpeechProcessor(), AzureSpeechProcessor()
DhruvaSpeechProcessor(), AzureSpeechProcessor(), AWSSpeechProcessor(), GCPSpeechProcessor()
)

# ---- Translator ----
translator = CompositeTranslator(DhruvaTranslator(), AzureTranslator())
translator = CompositeTranslator(DhruvaTranslator(), AzureTranslator(), AWSTranslator(), GCPTranslator())

# ---- Storage ----
storage = StorageHandler.get_async_instance()
253 changes: 225 additions & 28 deletions language/src/speech_processor.py
@@ -8,6 +8,13 @@

import azure.cognitiveservices.speech as speechsdk
import httpx
import boto3
from google.cloud import speech_v1p1beta1 as speech
from google.cloud import texttospeech
import requests
from botocore.exceptions import BotoCoreError, ClientError
import asyncio
import requests
Contributor review comment:

Remove one of the two requests imports; the module is imported twice.


from lib.model import InternalServerException, LanguageCodes
from .audio_converter import convert_wav_bytes_to_mp3_bytes
@@ -391,27 +398,198 @@ async def text_to_speech(
# )
return new_audio_content

Contributor review comment:

Add a blank line here; a new class definition should be preceded by two blank lines.

class AWSSpeechProcessor(SpeechProcessor):
    def __init__(self):
        # Set AWS credentials using environment variables
        os.environ['AWS_ACCESS_KEY_ID'] = os.getenv('AWS_ACCESS_KEY_ID')
        os.environ['AWS_SECRET_ACCESS_KEY'] = os.getenv('AWS_SECRET_ACCESS_KEY')
        os.environ['AWS_DEFAULT_REGION'] = os.getenv('AWS_DEFAULT_REGION')

        self.transcribe = boto3.client('transcribe')
        self.s3 = boto3.client('s3')
        self.polly = boto3.client('polly')
        self.bucket_name = os.getenv('S3_BUCKET_NAME')

        self.language_dict = {
            "EN": "en-US",
            "HI": "hi-IN",
            "BN": "bn-IN",
            "GU": "gu-IN",
            "MR": "mr-IN",
            "KN": "kn-IN",
            "LU": "lg-IN",
            "EN-IN": "en-IN",
            "MA": "ml-IN",
            "OD": "or-IN",
            "PA": "pa-IN",
            "TA": "ta-IN",
            "TE": "te-IN",
        }
KaranrajM marked this conversation as resolved.
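
One detail of the constructor above may be worth a note. Assigning os.environ[name] = os.getenv(name) changes nothing when the variable is set, and raises TypeError when it is unset, because os.environ only accepts strings. boto3 already reads AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_DEFAULT_REGION from the environment on its own. A minimal sketch of an explicit check, assuming a hypothetical helper named require_env that is not part of this PR:

```python
import os


def require_env(*names: str) -> dict[str, str]:
    """Return the named environment variables, failing clearly if any is missing.

    Hypothetical helper; the name and the error message are assumptions.
    """
    missing = [name for name in names if not os.getenv(name)]
    if missing:
        raise RuntimeError(
            "Missing required environment variables: " + ", ".join(missing)
        )
    return {name: os.environ[name] for name in names}
```

A constructor could call require_env("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_DEFAULT_REGION", "S3_BUCKET_NAME") once and then create the boto3 clients without touching os.environ.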

    async def speech_to_text(
        self,
        wav_data: bytes,
        input_language: LanguageCodes,
    ) -> str:
        logger.info("Performing speech to text using AWS Transcribe")
        logger.info(f"Input Language: {input_language.name}")

        try:
            # Upload the audio data to S3
            file_name = f"temp_audio_{input_language.name}.wav"
            self.s3.put_object(Bucket=self.bucket_name, Key=file_name, Body=wav_data)

            # Generate the S3 URI
            job_uri = f's3://{self.bucket_name}/{file_name}'

            # Start transcription job
            job_name = f"transcription_job_{input_language.name}"
            self.transcribe.start_transcription_job(
                TranscriptionJobName=job_name,
                Media={'MediaFileUri': job_uri},
                MediaFormat='wav',
                LanguageCode=self.language_dict.get(input_language.name, 'en-US')
            )

            # Wait for the job to complete
            while True:
                status = self.transcribe.get_transcription_job(TranscriptionJobName=job_name)
                if status['TranscriptionJob']['TranscriptionJobStatus'] in ['COMPLETED', 'FAILED']:
                    break
                await asyncio.sleep(5)  # Wait for 5 seconds before checking again

            if status['TranscriptionJob']['TranscriptionJobStatus'] == 'COMPLETED':
                file_url = status['TranscriptionJob']['Transcript']['TranscriptFileUri']
                response = requests.get(file_url)
                data = response.json()
                transcript = data['results']['transcripts'][0]['transcript']

                # Clean up: delete the temporary audio file from S3
                self.s3.delete_object(Bucket=self.bucket_name, Key=file_name)

                return transcript
            else:
                raise Exception("Transcription job failed")

        except (BotoCoreError, ClientError) as error:
            error_message = f"AWS STT Request failed with this error: {error}"
            logger.error(error_message)
            raise InternalServerException(error_message)
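
Two aspects of the method above might deserve a second look. The S3 key and the transcription job name depend only on the language, so a second request in the same language would reuse both, and Transcribe requires job names to be unique within an account. The boto3 calls are also blocking, so they hold the event loop while they run. The sketch below is one possible approach, not the PR's implementation: make_transcribe_names and wait_for_job are hypothetical helpers, and the timeout value is an arbitrary assumption.

```python
import asyncio
import uuid


def make_transcribe_names(language_name: str) -> tuple[str, str]:
    """Return a (job_name, s3_key) pair that is unique per request."""
    token = uuid.uuid4().hex
    job_name = f"transcription_job_{language_name}_{token}"
    s3_key = f"temp_audio_{language_name}_{token}.wav"
    return job_name, s3_key


async def wait_for_job(get_status, job_name, poll_seconds=5.0, timeout_seconds=300.0):
    """Poll a blocking status callable in a worker thread until the job ends.

    get_status is assumed to take a job name and return a status string, for
    example a small wrapper around get_transcription_job from the diff above.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_seconds
    while True:
        # Run the blocking call off the event loop.
        status = await asyncio.to_thread(get_status, job_name)
        if status in ("COMPLETED", "FAILED"):
            return status
        if loop.time() >= deadline:
            raise TimeoutError(f"transcription job {job_name} did not finish in time")
        await asyncio.sleep(poll_seconds)
```

The same asyncio.to_thread pattern could wrap put_object, start_transcription_job and the transcript download if blocking turns out to matter in practice.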

    async def text_to_speech(
        self,
        text: str,
        input_language: LanguageCodes,
    ) -> bytes:
        logger.info("Performing text to speech using AWS Polly")
        logger.info(f"Input Language: {input_language.name}")
        logger.info(f"Input Text: {text}")

        try:
            response = self.polly.synthesize_speech(
                Text=text,
                OutputFormat='mp3',
                VoiceId='Joanna',  # You might want to choose appropriate voices for different languages
                LanguageCode=self.language_dict.get(input_language.name, 'en-US')
            )

            return response['AudioStream'].read()

        except (BotoCoreError, ClientError) as error:
            error_message = f"AWS TTS Request failed with this error: {error}"
            logger.error(error_message)
            raise InternalServerException(error_message)
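
The inline comment on VoiceId hints that a single English voice will not suit every language. A minimal sketch of a per-language lookup follows. Everything in it is an assumption to verify against the current Polly voice list: the mapping is illustrative only, and pick_polly_voice and UnsupportedLanguage are hypothetical names that do not appear in this PR.

```python
# Illustrative mapping from language name to (VoiceId, LanguageCode).
# Assumption: these voices exist for these locales; confirm before relying on it.
POLLY_VOICES = {
    "EN": ("Joanna", "en-US"),
    "EN-IN": ("Raveena", "en-IN"),
    "HI": ("Aditi", "hi-IN"),
}


class UnsupportedLanguage(Exception):
    """Raised when no voice is configured for the requested language."""


def pick_polly_voice(language_name: str) -> tuple[str, str]:
    """Return (voice_id, language_code) or raise instead of defaulting to English."""
    try:
        return POLLY_VOICES[language_name]
    except KeyError:
        raise UnsupportedLanguage(
            f"no Polly voice configured for {language_name}"
        ) from None
```

Raising for an unmapped language, rather than falling back to an English voice, would let the composite processor move on to the next provider.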


class GCPSpeechProcessor(SpeechProcessor):
    def __init__(self):
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
        self.speech_client = speech.SpeechClient()
        self.tts_client = texttospeech.TextToSpeechClient()
        self.language_dict = {
            "EN": "en-US",
            "HI": "hi-IN",
            "BN": "bn-IN",
            "GU": "gu-IN",
            "MR": "mr-IN",
            "OR": "or-IN",
            "PA": "pa-Guru-IN",
            "KN": "kn-IN",
            "ML": "ml-IN",
            "TA": "ta-IN",
            "TE": "te-IN",
            "UR": "ur-IN",
            "EN-IN": "en-IN",
        }

    async def speech_to_text(
        self,
        wav_data: bytes,
        input_language: LanguageCodes,
    ) -> str:
        logger.info("Performing speech to text using Google Cloud Platform")
        logger.info(f"Input Language: {input_language.name}")

        audio = speech.RecognitionAudio(content=wav_data)
        config = speech.RecognitionConfig(
            encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
            language_code=self.language_dict.get(input_language.name, "en-US"),
        )

        try:
            response = self.speech_client.recognize(config=config, audio=audio)
            transcribed_text = response.results[0].alternatives[0].transcript
            return_message = "GCP speech to text is successful"
            logger.info(return_message)
            logger.info(f"Transcribed text: {transcribed_text}")
            return transcribed_text
        except Exception as exception:
            error_message = f"GCP STT Request failed with this error: {exception}"
            logger.error(error_message)
            raise InternalServerException(error_message)

    async def text_to_speech(
        self,
        text: str,
        input_language: LanguageCodes,
    ) -> bytes:
        logger.info("Performing text to speech using Google Cloud Platform")
        logger.info(f"Input Language: {input_language.name}")
        logger.info(f"Input Text: {text}")

        synthesis_input = texttospeech.SynthesisInput(text=text)

        voice = texttospeech.VoiceSelectionParams(
            language_code=self.language_dict.get(input_language.name, "en-US"),
            ssml_gender=texttospeech.SsmlVoiceGender.FEMALE,
        )

        audio_config = texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.MP3
        )

        try:
            response = self.tts_client.synthesize_speech(
                input=synthesis_input, voice=voice, audio_config=audio_config
            )
            return_message = "GCP text to speech is successful"
            logger.info(return_message)
            return response.audio_content
        except Exception as exception:
            error_message = f"GCP TTS Request failed with this error: {exception}"
            logger.error(error_message)
            raise InternalServerException(error_message)
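
The GCP speech_to_text above reads only response.results[0]. The recognize response can contain several results for consecutive portions of the audio, and none at all when no speech is detected, in which case indexing fails with IndexError. A minimal sketch of a more defensive extraction, assuming a hypothetical helper join_transcripts and using stand-in objects rather than real API output:

```python
from types import SimpleNamespace


def join_transcripts(results) -> str:
    """Join the top alternative of every result into one transcript."""
    parts = []
    for result in results:
        if result.alternatives:
            parts.append(result.alternatives[0].transcript.strip())
    if not parts:
        raise ValueError("no speech was recognised in the audio")
    return " ".join(parts)


# Stand-in objects shaped like the response fields used in the diff above.
fake_results = [
    SimpleNamespace(alternatives=[SimpleNamespace(transcript="hello ")]),
    SimpleNamespace(alternatives=[]),
    SimpleNamespace(alternatives=[SimpleNamespace(transcript="world")]),
]
print(join_transcripts(fake_results))  # prints "hello world"
```

In the processor this would replace the single index lookup with join_transcripts(response.results), keeping the existing exception handling around it.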


class CompositeSpeechProcessor(SpeechProcessor):
def __init__(self, *speech_processors: SpeechProcessor):
self.speech_processors = speech_processors
self.european_language_codes = [
"EN",
"AF",
"AR",
"ZH",
"FR",
"DE",
"ID",
"IT",
"JA",
"KO",
"PT",
"RU",
"ES",
"TR",
"EN", "AF", "AR", "ZH", "FR", "DE", "ID", "IT", "JA", "KO", "PT", "RU", "ES", "TR"
]
self.azure_not_supported_language_codes = ["OR", "PA"]
self.gcp_not_supported_language_codes = [] # Add any unsupported languages for GCP
self.aws_not_supported_language_codes = [] # Add any unsupported languages for AWS

async def speech_to_text(
self,
@@ -420,21 +598,30 @@ async def speech_to_text(
) -> str:
excs = []
for speech_processor in self.speech_processors:
# try:
if input_language.name in self.european_language_codes and isinstance(
speech_processor, DhruvaSpeechProcessor
):
pass
continue
elif (
input_language.name in self.azure_not_supported_language_codes
and isinstance(speech_processor, AzureSpeechProcessor)
):
pass
continue
elif (
input_language.name in self.gcp_not_supported_language_codes
and isinstance(speech_processor, GCPSpeechProcessor)
):
continue
elif (
input_language.name in self.aws_not_supported_language_codes
and isinstance(speech_processor, AWSSpeechProcessor)
):
continue
else:
return await speech_processor.speech_to_text(wav_data, input_language)
# except Exception as exc:
# print("EXCEPTION", exc)
# excs.append(exc)
try:
return await speech_processor.speech_to_text(wav_data, input_language)
except Exception as exc:
excs.append(exc)

raise ExceptionGroup("CompositeSpeechProcessor speech to text failed", excs)

@@ -445,19 +632,29 @@ async def text_to_speech(
) -> bytes:
excs = []
for speech_processor in self.speech_processors:
# try:
if input_language.name in self.european_language_codes and isinstance(
speech_processor, DhruvaSpeechProcessor
):
pass
continue
elif (
input_language.name in self.azure_not_supported_language_codes
and isinstance(speech_processor, AzureSpeechProcessor)
):
pass
continue
elif (
input_language.name in self.gcp_not_supported_language_codes
and isinstance(speech_processor, GCPSpeechProcessor)
):
continue
elif (
input_language.name in self.aws_not_supported_language_codes
and isinstance(speech_processor, AWSSpeechProcessor)
):
continue
else:
return await speech_processor.text_to_speech(text, input_language)
# except Exception as exc:
# excs.append(exc)
try:
return await speech_processor.text_to_speech(text, input_language)
except Exception as exc:
excs.append(exc)

# raise ExceptionGroup("CompositeSpeechProcessor text to speech failed", excs)
raise ExceptionGroup("CompositeSpeechProcessor text to speech failed", excs)
Contributor review comment:

Can you please add a new line at the end of the file?
