o
    ȳg`                     @  sR   d dl mZ d dlmZmZmZmZmZ er d dlm	Z	m
Z
mZ G dd dZdS )    )annotations)TYPE_CHECKINGAnyIterableListOptional)	DataFrameRowSparkSessionc                   @  s   e Zd ZdZ						d9d:ddZe	d;d<ddZd=ddZd=ddZd>dd Z	d;d?d"d#Z
d>d$d%Zd@d)d*ZdAd.d/ZdBdCd3d4Zd;d?d5d6ZdBdCd7d8ZdS )DSparkSQLz;SparkSQL is a utility class for interacting with Spark SQL.N   spark_sessionOptional[SparkSession]catalogOptional[str]schemaignore_tablesOptional[List[str]]include_tablessample_rows_in_table_infointc           
      C  s"  zddl m} W n ty   tdw |r|n|j | _|dur)| jj| |dur4| jj| t	| 
 | _|rAt	|nt	 | _| jrX| j| j }|rXtd| d|r^t	|nt	 | _| jru| j| j }|rutd| d|  }	|	rt	|	n| j| _t|tstd|| _dS )	a  Initialize a SparkSQL object.

        Args:
            spark_session: A SparkSession object.
              If not provided, one will be created.
            catalog: The catalog to use.
              If not provided, the default catalog will be used.
            schema: The schema to use.
              If not provided, the default schema will be used.
            ignore_tables: A list of tables to ignore.
              If not provided, all tables will be used.
            include_tables: A list of tables to include.
              If not provided, all tables will be used.
            sample_rows_in_table_info: The number of rows to include in the table info.
              Defaults to 3.
        r   r
   Fpyspark is not installed. Please install it with `pip install pyspark`Nzinclude_tables  not found in databasezignore_tables z,sample_rows_in_table_info must be an integer)pyspark.sqlr
   ImportErrorbuildergetOrCreate_sparkr   setCurrentCatalogsetCurrentDatabaseset_get_all_table_names_all_tables_include_tables
ValueError_ignore_tablesget_usable_table_names_usable_tables
isinstancer   	TypeError_sample_rows_in_table_info)
selfr   r   r   r   r   r   r
   missing_tablesusable_tables r/   c/var/www/html/chatdoc2/venv/lib/python3.10/site-packages/langchain_community/utilities/spark_sql.py__init__   sB   



zSparkSQL.__init__database_uristrengine_argsOptional[dict]kwargsr   returnc                 K  sH   zddl m} W n ty   tdw |j| }| |fi |S )zzCreating a remote Spark Session via Spark connect.
        For example: SparkSQL.from_uri("sc://localhost:15002")
        r   r   r   )r   r
   r   r   remoter   )clsr2   r4   r6   r
   sparkr/   r/   r0   from_uriK   s   zSparkSQL.from_uriIterable[str]c                 C  s   | j r| j S t| j| j S )zGet names of tables available.)r$   sortedr#   r&   )r,   r/   r/   r0   r'   \   s   zSparkSQL.get_usable_table_namesc                 C  s(   | j dd }ttdd |S )NzSHOW TABLES	tableNamec                 S     | j S N)r>   )rowr/   r/   r0   <lambda>e       z/SparkSQL._get_all_table_names.<locals>.<lambda>)r   sqlselectcollectlistmap)r,   rowsr/   r/   r0   r"   c   s   zSparkSQL._get_all_table_namestablec                 C  s6   | j d|  d j}|d}|d | d S )NzSHOW CREATE TABLE r   USING;)r   rD   rF   createtab_stmtfind)r,   rJ   	statementusing_clause_indexr/   r/   r0   _get_create_table_stmtg   s   
zSparkSQL._get_create_table_stmttable_namesc                 C  s   |   }|d urt||}|rtd| d|}g }|D ]"}| |}| jr<|d7 }|d| | d7 }|d7 }|| qd|}|S )Nztable_names r   z

/*
z*/z

)	r'   r!   
differencer%   rQ   r+   _get_sample_spark_rowsappendjoin)r,   rR   all_table_namesr-   tables
table_name
table_info	final_strr/   r/   r0   get_table_infoo   s    

zSparkSQL.get_table_infoc                 C  s   d| d| j  }| j|}dttdd |jj}z| |}ddd |D }W n t	y9   d	}Y nw | j  d
| d| d| S )NzSELECT * FROM z LIMIT 	c                 S  r?   r@   )name)fr/   r/   r0   rB      rC   z1SparkSQL._get_sample_spark_rows.<locals>.<lambda>rS   c                 S  s   g | ]}d  |qS )r^   )rW   ).0rA   r/   r/   r0   
<listcomp>   s    z3SparkSQL._get_sample_spark_rows.<locals>.<listcomp> z rows from z table:
)
r+   r   rD   rW   rG   rH   r   fields_get_dataframe_results	Exception)r,   rJ   querydfcolumns_strsample_rowssample_rows_strr/   r/   r0   rU      s   
zSparkSQL._get_sample_spark_rowsrA   r	   tuplec                 C  s   t tt|  S r@   )rl   rH   r3   asDictvalues)r,   rA   r/   r/   r0   _convert_row_as_tuple   s   zSparkSQL._convert_row_as_tuplerh   r   rG   c                 C  s   t t| j| S r@   )rG   rH   ro   rF   )r,   rh   r/   r/   r0   re      s   zSparkSQL._get_dataframe_resultsallcommandfetchc                 C  s,   | j |}|dkr|d}t| |S )None   )r   rD   limitr3   re   )r,   rq   rr   rh   r/   r/   r0   run   s   
zSparkSQL.runc              
   C  s>   z|  |W S  ty } z	 d| W  Y d}~S d}~ww )af  Get information about specified tables.

        Follows best practices as specified in: Rajkumar et al, 2022
        (https://arxiv.org/abs/2204.00498)

        If `sample_rows_in_table_info`, the specified number of sample rows will be
        appended to each table description. This can increase performance as
        demonstrated in the paper.
        Error: N)r]   r%   )r,   rR   er/   r/   r0   get_table_info_no_throw   s   
z SparkSQL.get_table_info_no_throwc              
   C  s@   z|  ||W S  ty } z	 d| W  Y d}~S d}~ww )a*  Execute a SQL command and return a string representing the results.

        If the statement returns rows, a string of the results is returned.
        If the statement returns no rows, an empty string is returned.

        If the statement throws an error, the error message is returned.
        rw   N)rv   rf   )r,   rq   rr   rx   r/   r/   r0   run_no_throw   s   zSparkSQL.run_no_throw)NNNNNr   )r   r   r   r   r   r   r   r   r   r   r   r   r@   )r2   r3   r4   r5   r6   r   r7   r   )r7   r<   )rJ   r3   r7   r3   )rR   r   r7   r3   )rA   r	   r7   rl   )rh   r   r7   rG   )rp   )rq   r3   rr   r3   r7   r3   )__name__
__module____qualname____doc__r1   classmethodr;   r'   r"   rQ   r]   rU   ro   re   rv   ry   rz   r/   r/   r/   r0   r   	   s,    ?





r   N)
__future__r   typingr   r   r   r   r   r   r   r	   r
   r   r/   r/   r/   r0   <module>   s
    