o
    ȳgT                  	   @   sH  d Z ddlZddlZddlZddlZddlZddlZddlZddlm	Z	 ddl
mZmZmZmZmZmZmZmZmZmZmZmZ ddlmZmZ ddlmZ ddlmZmZ ddlm Z m!Z!m"Z"m#Z#m$Z$m%Z% erxddl&Z'ddl(m)  m*  m+Z, e-e.Z/d	Z0d
Z1dZ2d+ddZ3G dd dZ4e4 Z5e4 Z6G dd de7e	Z8G dd de!Z9G dd de!Z:G dd dZ;dee7 de<deee7 eej= f fddZ>ddde7fd d!Z?d"ed dee7ddf fd#d$Z@eeAe4f ZBe7ZCG d%d& d&ZDeDZEe7ZFG d'd( d(e9e:eeEeFf ZGee7eef ZHeAZIG d)d* d*e9e:eeHeIf ZJeGZKeJZLeDZMdS ),z*A common module for NVIDIA Riva Runnables.    N)Enum)TYPE_CHECKINGAnyAsyncGeneratorAsyncIteratorDict	GeneratorIteratorListOptionalTupleUnioncast)
AnyMessageBaseMessage)PromptValue)RunnableConfigRunnableSerializable)
AnyHttpUrl	BaseModelFieldparse_obj_asroot_validator	validator      ?i  )
.!?   ¡   ¿returnriva.clientc               
   C   s4   zddl } W | jS  ty } ztd|d}~ww )z5Import the riva client and raise an error on failure.r   NziCould not import the NVIDIA Riva client library. Please install it with `pip install nvidia-riva-client`.)riva.clientImportErrorclient)rivaerr r(   e/var/www/html/chatdoc2/venv/lib/python3.10/site-packages/langchain_community/utilities/nvidia_riva.py_import_riva_client1   s   
r*   c                   @   s   e Zd ZdZdS )	SentinelTzAn empty Sentinel type.N)__name__
__module____qualname____doc__r(   r(   r(   r)   r+   >   s    r+   c                   @   sL   e Zd ZdZdZdZdZdZdZdZ	e
ded	d fd
dZedddZdS )RivaAudioEncodinga  An enum of the possible choices for Riva audio encoding.

    The list of types exposed by the Riva GRPC Protobuf files can be found
    with the following commands:
    ```python
    import riva.client
    print(riva.client.AudioEncoding.keys())  # noqa: T201
    ```
    ALAWENCODING_UNSPECIFIEDFLAC
LINEAR_PCMMULAWOGGOPUSformat_coder!   c              
   C   sB   z| j | j| jd| W S  ty  } ztd| |d}~ww )zReturn the audio encoding specified by the format code in the wave file.

        ref: https://mmsp.ece.mcgill.ca/Documents/AudioFormats/WAVE/WAVE.html
        )         z>The following wave file format code is not supported by Riva: N)r4   r1   r5   KeyErrorNotImplementedError)clsr7   r'   r(   r(   r)   from_wave_format_codeX   s   z'RivaAudioEncoding.from_wave_format_coderiva.client.AudioEncodingc                 C   s   t  }t|j| S )z-Returns the Riva API object for the encoding.)r*   getattrAudioEncodingselfriva_clientr(   r(   r)   riva_pb2f   s   zRivaAudioEncoding.riva_pb2N)r!   r?   )r,   r-   r.   r/   r1   r2   r3   r4   r5   r6   classmethodintr>   propertyrE   r(   r(   r(   r)   r0   F   s    
r0   c                   @   s   e Zd ZU dZeeddddgdZeeef e	d< eddd	Z
ee e	d
< edddZeddddededefddZdS )RivaAuthMixinzBConfiguration for the authentication to a Riva service connection.zhttp://localhost:50051z1The full URL where the Riva service can be found.z"https://user@pass:riva.example.com)descriptionexamplesurlNz@A full path to the file where Riva's public ssl key can be read.rJ   ssl_certr!   riva.client.Authc                 C   sB   t  }tt| j}|jdk}t| jdd }|j| j||dS )z!Return a riva client auth object.https/   )rN   use_ssluri)	r*   r   r   rL   schemestrsplitAuthrN   )rC   rD   rL   rS   url_no_schemer(   r(   r)   authz   s   
zRivaAuthMixin.authT)preallow_reusevalc                 C   s$   t |trtttt|S tt|S )z:Do some initial conversations for the URL before checking.)
isinstancerV   r   r   r   )r=   r]   r(   r(   r)   _validate_url   s   

zRivaAuthMixin._validate_url)r!   rO   )r,   r-   r.   r/   r   r   rL   r   rV   __annotations__rN   r   rH   rZ   r   rF   r   r_   r(   r(   r(   r)   rI   m   s    
 
rI   c                   @   sP   e Zd ZU dZeejddZeed< edddZ	e
ed< edd	dZeed
< dS )RivaCommonConfigMixinz%A collection of common Riva settings.z!The encoding on the audio stream.)defaultrJ   encodingi@  z*The sample rate frequency of audio stream.sample_rate_hertzzen-USzaThe [BCP-47 language code](https://www.rfc-editor.org/rfc/bcp/bcp47.txt) for the target language.language_codeN)r,   r-   r.   r/   r   r0   r4   rc   r`   rd   rG   re   rV   r(   r(   r(   r)   ra      s   
 ra   c                   @   sf   e Zd ZU dZejed< ejed< dddZddd	Z	dd
dZ
defddZdddZdddZdS )_Eventz3A combined event that is threadsafe and async safe._event_aeventr!   Nc                 C   s   t  | _t | _dS )zInitialize the event.N)	threadingEventrg   asynciorh   rC   r(   r(   r)   __init__      
z_Event.__init__c                 C      | j   | j  dS zSet the event.N)rg   setrh   rl   r(   r(   r)   rq      rn   z
_Event.setc                 C   ro   rp   )rg   clearrh   rl   r(   r(   r)   rr      rn   z_Event.clearc                 C   
   | j  S )zIndicate if the event is set.)rg   is_setrl   r(   r(   r)   rt      s   
z_Event.is_setc                 C   s   | j   dS )zWait for the event to be set.N)rg   waitrl   r(   r(   r)   ru      s   z_Event.waitc                    s   | j  I dH  dS )z#Async wait for the event to be set.N)rh   ru   rl   r(   r(   r)   
async_wait   s   z_Event.async_waitr!   N)r,   r-   r.   r/   ri   rj   r`   rk   rm   rq   rr   boolrt   ru   rv   r(   r(   r(   r)   rf      s   
 





rf   output_directorysample_ratec                 C   sr   | r7t jddd| d}|j}W d   n1 sw   Y  t|d}|d |d || ||fS d	S )
zECreate a new wave file and return the wave write object and filename.bxz.wavF)modesuffixdeletedirNwbr8   rR   )NN)tempfileNamedTemporaryFilenamewaveopensetnchannelssetsampwidthsetframerate)ry   rz   fwav_file_namewav_filer(   r(   r)   _mk_wave_file   s   


r   r]   TTSInputTypec                 C   s.   t | tr	|  S t | trt| jS t| S )zAttempt to coerce the input value to a string.

    This is particularly useful for converting LangChain message to strings.
    )r^   r   	to_stringr   rV   contentr]   r(   r(   r)   _coerce_string   s
   


r   inputsc                 c   s    d}| D ]D}t |}tD ]}||v r(||d\}}|| | V  d}||v sq||7 }t|tkrItdt|tD ]}|||d  V  q;d}q|rQ|V  dS dS )z9Filter the input chunks are return strings ready for TTS. r8   r      N)r   _SENTENCE_TERMINATORSrW   len_MAX_TEXT_LENGTHrange)r   bufferchunk
terminatorlast_sentenceidxr(   r(   r)   _process_chunks   s(   
r   c                   @   sZ  e Zd ZU dZejed< ejed< ejed< e	ed< e	ed< e	ed< e
ej ed< d)d
eddfddZdeeddf fddZdee fddZedefddZedefddZedefddZedefddZd*dede
e ddfddZd*dede
e ddfdd Zd*de
e ddfd!d"Zd*de
e ddfd#d$Zd%ed& ddfd'd(ZdS )+AudioStreamz%A message containing streaming audio.	_put_lock_queueoutputhangupuser_talking
user_quiet_workerr   maxsizer!   Nc                 C   sD   t  | _tj|d| _t | _t | _t | _	t | _
d| _dS )zInitialize the queue.)r   N)ri   Lockr   queueQueuer   r   rf   r   r   r   r   )rC   r   r(   r(   r)   rm     s   


zAudioStream.__init__c                 c   sJ    	 z	| j dt}W n
 tjy   Y qw |tkrdS |V  | j   q)zReturn an error.TN)r   get_QUEUE_GET_TIMEOUTr   EmptyHANGUP	task_donerC   next_valr(   r(   r)   __iter__  s   
zAudioStream.__iter__c                 C  sZ   	 zt  d| jjdtI dH }W n
 tjy   Y qw |tkr$dS |V  | j	  q)z4Iterate through all items in the queue until HANGUP.TN)
rk   get_event_looprun_in_executorr   r   r   r   r   r   r   r   r(   r(   r)   	__aiter__&  s   
zAudioStream.__aiter__c                 C   rs   )z(Indicate if the audio stream has hungup.)r   rt   rl   r(   r(   r)   hungup9     
zAudioStream.hungupc                 C   rs   )z-Indicate in the input stream buffer is empty.)r   emptyrl   r(   r(   r)   r   >  r   zAudioStream.emptyc                 C   s4   | j o| j}| jduo| j  o| j }|o|S )z;Indicate if the audio stream has hungup and been processed.N)r   r   r   is_aliver   )rC   
input_doneoutput_doner(   r(   r)   completeC  s   

zAudioStream.completec                 C   s   | j r| j  S dS )z&Indicate if the ASR stream is running.F)r   r   rl   r(   r(   r)   runningN  s   
zAudioStream.runningitemtimeoutc                 C   s\   | j ! | jrtd|tu r| j  | jj||d W d   dS 1 s'w   Y  dS )zPut a new item into the queue.z?The audio stream has already been hungup. Cannot put more data.r   N)r   r   RuntimeErrorr   r   rq   r   put)rC   r   r   r(   r(   r)   r   U  s   
"zAudioStream.putc                    s,   t  }t |d| j||I dH  dS )z$Async put a new item into the queue.N)rk   r   wait_forr   r   )rC   r   r   loopr(   r(   r)   aput`  s   "zAudioStream.aputc                 C   s   |  t| dS )zSend the hangup signal.N)r   r   rC   r   r(   r(   r)   closee  s   zAudioStream.closec                    s   |  t|I dH  dS )zAsync send the hangup signal.N)r   r   r   r(   r(   r)   aclosei     zAudioStream.aclose	responseszrasr.StreamingRecognizeResponsec                    sZ   j rtdtjddd d fdd}tj|d	_d
j_j     dS )zIDrain the responses from the provided iterator and put them into a queue.z,An ASR instance has already been registered.rR   r   r   r!   Nc                     s       D ]<} | jsq| jD ]2}|jsq|jr2j  j  tt	|jd j
}j| qj sAj  j  qqdS )zConsume the ASR Generator.r   N)ru   resultsalternativesis_finalr   rr   r   rq   r   rV   
transcriptr   r   rt   )responseresultr   has_startedr   rC   r(   r)   workert  s$   





z$AudioStream.register.<locals>.worker)targetTrw   )	r   r   ri   BarrierThreadr   daemonstartru   )rC   r   r   r(   r   r)   registerm  s   
zAudioStream.register)r   N) r,   r-   r.   r/   ri   r   r`   r   r   rf   r   r   rG   rm   r   bytesr   r   StreamInputTyper   rH   rx   r   r   r   r   r   r   r   r   r	   r   r(   r(   r(   r)   r      s2   
 




r   c                	   @   s   e Zd ZU dZdZeed< dZeed< edddZ	e
ed	< ed
ddZeed< ed
ddZeed< ed
dedeeef deeef fddZed ddZd!ddZ	d"dedee dedefddZdS )#RivaASRzNA runnable that performs Automatic Speech Recognition (ASR) using NVIDIA Riva.nvidia_riva_asrr   zA Runnable for converting audio bytes to a string.This is useful for feeding an audio stream into a chain andpreprocessing that audio to create an LLM prompt.rJ   r8   z7The number of audio channels in the input audio stream.rM   audio_channel_countTz\Controls whether or not Riva should attempt to filter profanity out of the transcribed text.profanity_filterz]Controls whether Riva should attempt to correct senetence puncuation in the transcribed text.enable_automatic_punctuationr[   valuesr!   c                 C   
   t  }|S z4Validate the Python environment and input arguments.r*   r=   r   _r(   r(   r)   _validate_environment     zRivaASR._validate_environment&riva.client.StreamingRecognitionConfigc                 C   s4   t  }|jd|j| j| j| jd| j| j| jddS )z)Create and return the riva config object.Tr8   )rc   rd   r   max_alternativesr   r   re   )interim_resultsconfig)	r*   StreamingRecognitionConfigRecognitionConfigrc   rd   r   r   r   re   rB   r(   r(   r)   r     s   zRivaASR.configriva.client.ASRServicec              
   C   8   t  }z|| jW S  ty } ztd|d}~ww );Connect to the riva service and return the a client object.z5Error raised while connecting to the Riva ASR server.N)r*   
ASRServicerZ   	Exception
ValueErrorrC   rD   r'   r(   r(   r)   _get_service     zRivaASR._get_serviceNinputr   kwargsc                 K   s   |j s|  }|j|| jd}|| g }|jsl|jj |jjd}W d   n1 s/w   Y  |ri|j	 sZz
||j
 g7 }W n
 tjyO   Y q6w |j  |j	 r;tdt| d| S |jrdS )z3Transcribe the audio bytes into a string with Riva.)audio_chunksstreaming_configg?NzRiva ASR returning: %s r   )r   r   streaming_response_generatorr   r   r   r   	not_emptyru   r   
get_nowaitr   r   r   _LOGGERdebugreprjoinstrip)rC   r   r   r   servicer   full_responsereadyr(   r(   r)   invoke  s2   




zRivaASR.invoke)r!   r   )r!   r   r   )r,   r-   r.   r/   r   rV   r`   rJ   r   r   rG   r   rx   r   r   rF   r   r   r   rH   r   r   ASRInputTyper   r   ASROutputTyper
  r(   r(   r(   r)   r     s@   
 
&
r   c                   @   s0  e Zd ZU dZdZeed< dZeed< edddZ	eed	< ed
ddZ
ee ed< eddedeeef deeef fddZedededefddZd"ddZ	
d#dedee dedefddZ	
d#dee dee dee dee fddZ	
d#dee dee dee deed
f fd d!Zd
S )$RivaTTSz?A runnable that performs Text-to-Speech (TTS) with NVIDIA Riva.nvidia_riva_ttsr   z_A tool for converting text to speech.This is useful for converting LLM output into audio bytes.rJ   zEnglish-US.Female-1zThe voice model in Riva to use for speech. Pre-trained models are documented in [the Riva documentation](https://docs.nvidia.com/deeplearning/riva/user-guide/docs/tts/tts-overview.html).rM   
voice_nameNzThe directory where all audio files should be saved. A null value indicates that wave files should not be saved. This is useful for debugging purposes.ry   Tr   r   r!   c                 C   r   r   r   r   r(   r(   r)   r      r   zRivaTTS._validate_environmentvc                 C   s,   |rt |}|jddd t| S |S )NT)parentsexist_ok)pathlibPathmkdirrV   absolute)r=   r  dirpathr(   r(   r)   _output_directory_validator'  s
   
z#RivaTTS._output_directory_validator"riva.client.SpeechSynthesisServicec              
   C   r   )r   z5Error raised while connecting to the Riva TTS server.N)r*   SpeechSynthesisServicerZ   r   r   r   r(   r(   r)   r   0  r   zRivaTTS._get_servicer   r   r   c                 K   s   d | t|gS )zDPerform TTS by taking a string and outputting the entire audio file.    )r  	transformiter)rC   r   r   r   r(   r(   r)   r
  :  s   zRivaTTS.invokec                 k   s    |   }t| j| j\}}t|D ],}td| |j|| j| j	| j
j| jd}|D ]}	tt|	j}
|r:||
 |
V  q+q|rM|  td| dS dS )zHPerform TTS by taking a stream of characters and streaming output bytes.zRiva TTS chunk: %s)textr  re   rc   sample_rate_hzzRiva TTS wrote file: %sN)r   r   ry   rd   r   r  r  synthesize_onliner  re   rc   rE   r   r   audiowriteframesrawr   )rC   r   r   r   r  r   r   r   r   respr!  r(   r(   r)   r  C  s0   	
zRivaTTS.transformc           	        s   t  t t  dfdd}dtt ffdddfdd d fd	d
}| }| }	 zt  dI dH }W n t j	j
yV   Y q=w   |tu r`n|V  q>|I dH  |I dH  dS )zGIntercept async transforms and route them to the synchronous transform.r!   Nc                     s.    2 z3 dH W }  |  q6  t dS )z#Produce input into the input queue.N)
put_nowait_TRANSFORM_ENDr   )r   input_queuer(   r)   	_produceru  s
   z%RivaTTS.atransform.<locals>._producerc                  3   s>    	 z j dd} W n
 tjy   Y qw | tkrdS | V  q)zIterate over the input_queue.Tr   r   N)r   r   r   r%  r   )r&  r(   r)   _input_iterator{  s   z+RivaTTS.atransform.<locals>._input_iteratorc                     s*      D ]} |  qt dS )z!Consume the input with transform.N)r  r$  r%  r   )r(  	out_queuerC   r(   r)   	_consumer  s   z%RivaTTS.atransform.<locals>._consumerc                      s    d I dH  dS )z"Coroutine that wraps the consumer.N)r   r(   )r*  r   r(   r)   _consumer_coro  r   z*RivaTTS.atransform.<locals>._consumer_coroTr   rw   )rk   get_running_loopr   r   r	   r   create_taskr   r   
exceptionsTimeoutErrorr   r%  )	rC   r   r   r   r'  r+  producerconsumerr]   r(   )r*  r(  r   r&  r   r)  rC   r)   
atransformj  s.   
zRivaTTS.atransform)r!   r  r   )r,   r-   r.   r/   r   rV   r`   rJ   r   r  ry   r   r   rF   r   r   r   r   r  r   r   r   TTSOutputTyper
  r	   r  r   r   r2  r(   r(   r(   r)   r     sd   
 
		&


*
r  )r!   r"   )Nr/   rk   loggingr  r   r   ri   r   enumr   typingr   r   r   r   r   r   r	   r
   r   r   r   r   langchain_core.messagesr   r   langchain_core.prompt_valuesr   langchain_core.runnablesr   r   pydanticr   r   r   r   r   r   r#   r&   riva.client.proto.riva_asr_pb2r%   protoriva_asr_pb2rasr	getLoggerr,   r  r   r   r   r*   r+   r   r%  rV   r0   rI   ra   rf   float
Wave_writer   r   r   r   r   StreamOutputTyper   r  r  r   r   r3  r  NVIDIARivaASRNVIDIARivaTTSNVIDIARivaStreamr(   r(   r(   r)   <module>   sx    8 	

'!"
  

h

 $