o
    /iD                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZ d dlmZ d dl	m
Z
 d dlZddlmZmZ ddlmZmZ e dZG d	d
 d
eZG dd deZdS )    N)suppress)datetime)quote   )AbstractBufferedFileAbstractFileSystem)infer_storage_optionstokenizewebhdfsc                       sH  e Zd ZdZee ZdZ											dA fdd	Z	e
d	d
 Zdd ZdBddZ					dCddZedd Zedd Zedd Zdd Zdd Zdd ZdDdd Zd!d" Zd#d$ Zd%d& ZdEd'd(Zd)d* Zd+d, Zd-d. ZdFd/d0Zd1d2 Z d3d4 Z!dDd5d6Z"d7d8 Z#dDd9d:Z$d;d< Z%d=d> Z&d?d@ Z'  Z(S )GWebHDFSa  
    Interface to HDFS over HTTP using the WebHDFS API. Supports also HttpFS gateways.

    Four auth mechanisms are supported:

    insecure: no auth is done, and the user is assumed to be whoever they
        say they are (parameter ``user``), or a predefined value such as
        "dr.who" if not given
    spnego: when kerberos authentication is enabled, auth is negotiated by
        requests_kerberos https://github.com/requests/requests-kerberos .
        This establishes a session based on existing kinit login and/or
        specified principal/password; parameters are passed with ``kerb_kwargs``
    token: uses an existing Hadoop delegation token from another secured
        service. Indeed, this client can also generate such tokens when
        not insecure. Note that tokens expire, but can be renewed (by a
        previously specified user) and may allow for proxying.
    basic-auth: used when both parameter ``user`` and parameter ``password``
        are provided.

    )r
   ZwebHDFS  FNTc                    s  | j rdS t jdi | |
rdnd d| d| d| _|| _|p$i | _i | _|	p,i | _|durC|dus:|dur>td|| jd< || _	|| _
|durV|du rUtd	n	|dur_|| jd
< |durh|| jd< |rr|durrtd|| _|| _|   dt|| | _dS )a  
        Parameters
        ----------
        host: str
            Name-node address
        port: int
            Port for webHDFS
        kerberos: bool
            Whether to authenticate with kerberos for this connection
        token: str or None
            If given, use this token on every call to authenticate. A user
            and user-proxy may be encoded in the token and should not be also
            given
        user: str or None
            If given, assert the user name to connect with
        password: str or None
            If given, assert the password to use for basic auth. If password
            is provided, user must be provided also
        proxy_to: str or None
            If given, the user has the authority to proxy, and this value is
            the user in who's name actions are taken
        kerb_kwargs: dict
            Any extra arguments for HTTPKerberosAuth, see
            `<https://github.com/requests/requests-kerberos/blob/master/requests_kerberos/kerberos_.py>`_
        data_proxy: dict, callable or None
            If given, map data-node addresses. This can be necessary if the
            HDFS cluster is behind a proxy, running on Docker or otherwise has
            a mismatch between the host-names given by the name-node and the
            address by which to refer to them from the client. If a dict,
            maps host names ``host->data_proxy[host]``; if a callable, full
            URLs are passed, and function must conform to
            ``url->data_proxy(url)``.
        use_https: bool
            Whether to connect to the Name-node using HTTPS instead of HTTP
        session_cert: str or Tuple[str, str] or None
            Path to a certificate file, or tuple of (cert, key) files to use
            for the requests.Session
        session_verify: str, bool or None
            Path to a certificate file to use for verifying the requests.Session.
        kwargs
        Nhttpshttpz://:z/webhdfs/v1z_If passing a delegation token, must not set user or proxy_to, as these are encoded in the tokenZ
delegationzQIf passing a password, the user must also beset in order to set up the basic-authz	user.nameZdoaszJIf using Kerberos auth, do not specify the user, this is handled by kinit.Zwebhdfs_ )_cachedsuper__init__urlkerbkerb_kwargsparsproxy
ValueErroruserpasswordsession_certsession_verify_connectr	   _fsid)selfhostportZkerberostokenr   r   Zproxy_tor   Z
data_proxyZ	use_httpsr   r   kwargs	__class__r   X/home/kim/smarthome/.venv/lib/python3.10/site-packages/fsspec/implementations/webhdfs.pyr   .   sD   9 




zWebHDFS.__init__c                 C   s   | j S N)r   r    r   r   r'   fsid   s   zWebHDFS.fsidc                 C   s   t  | _| jr| j| j_| j| j_| jr%ddlm	} |di | j
| j_| jd ur@| jd urBddlm} || j| j| j_d S d S d S )Nr   )HTTPKerberosAuth)HTTPBasicAuthr   )requestsSessionsessionr   certr   verifyr   Zrequests_kerberosr+   r   authr   r   Zrequests.authr,   )r    r+   r,   r   r   r'   r      s   


zWebHDFS._connectgetc              	   K   s   |d ur	|  |nd}| | jt|dd }| }|| j | |d< t	d|| | j
j| ||||d}	|	jdv rzz|	 }
|
d d	 }|
d d
 }W n ttfy]   Y nw |dv rft||dv rnt||dv rvt|t||	  |	S )N z/=)safeopzsending %s with %s)methodr   paramsdataallow_redirects)i  i  i  i  i  ZRemoteExceptionmessage	exception)ZIllegalArgumentExceptionZUnsupportedOperationException)ZSecurityExceptionZAccessControlException)ZFileNotFoundException)_strip_protocol_apply_proxyr   r   copyupdater   upperloggerdebugr/   requeststatus_codejsonr   KeyErrorPermissionErrorFileNotFoundErrorRuntimeErrorraise_for_status)r    r6   r7   pathr9   redirectr$   r   argsouterrmsgexpr   r   r'   _call   s<   
zWebHDFS._callrbc              
   K   s$   |p| j }t| |||| j|||dS )a^  

        Parameters
        ----------
        path: str
            File location
        mode: str
            'rb', 'wb', etc.
        block_size: int
            Client buffer size for read-ahead or write buffer
        autocommit: bool
            If False, writes to temporary file that only gets put in final
            location upon commit
        replication: int
            Number of copies of file on the cluster, write mode only
        permissions: str or int
            posix permissions, write mode only
        kwargs

        Returns
        -------
        WebHDFile instance
        )mode
block_sizetempdir
autocommitreplicationpermissions)	blocksize	WebHDFilerW   )r    rL   rU   rV   rX   rY   rZ   r$   r   r   r'   _open   s   
!zWebHDFS._openc                 C   s    | d   | d< | d | d< | S )Ntypelengthsize)lower)infor   r   r'   _process_info   s   zWebHDFS._process_infoc                 C   s   t |d S )NrL   )r   )clsrL   r   r   r'   r=      s   zWebHDFS._strip_protocolc                 C   s:   t | }|dd  |dd  d|v r|d|d< |S )NrL   protocolusernamer   )r   pop)ZurlpathrO   r   r   r'   _get_kwargs_from_urls  s   zWebHDFS._get_kwargs_from_urlsc                 C   s,   | j d|d}| d }||d< | |S )NZGETFILESTATUSrL   
FileStatusname)rS   rF   rc   )r    rL   rO   rb   r   r   r'   rb   
  s   
zWebHDFS.infoc                 C   4   |  |}|dd}|durt|d S td)z=Return the created timestamp of a file as a datetime.datetimemodificationTimeN  z5Could not retrieve creation time (modification time).rb   r3   r   fromtimestamprJ   r    rL   rb   mtimer   r   r'   created  s
   
zWebHDFS.createdc                 C   rl   )z>Return the modified timestamp of a file as a datetime.datetimerm   Nrn   z%Could not retrieve modification time.ro   rq   r   r   r'   modified  s
   
zWebHDFS.modifiedc                 K   sr   | j d|d}| d d }|D ]}| | |dd |d  |d< q|r0t|dd	 d
S tdd |D S )NZ
LISTSTATUSri   ZFileStatusesrj   /Z
pathSuffixrk   c                 S   s   | d S )Nrk   r   )ir   r   r'   <lambda>(  s    zWebHDFS.ls.<locals>.<lambda>)keyc                 s   s    | ]}|d  V  qdS )rk   Nr   ).0rb   r   r   r'   	<genexpr>*  s    zWebHDFS.ls.<locals>.<genexpr>)rS   rF   rc   rstripsorted)r    rL   detailr$   rO   infosrb   r   r   r'   ls!  s   
z
WebHDFS.lsc                 C   s   | j d|d}| d S )z8Total numbers of files, directories and bytes under pathZGETCONTENTSUMMARYri   ZContentSummaryrS   rF   )r    rL   rO   r   r   r'   content_summary,  s   zWebHDFS.content_summaryc                 C   s^   | j d|dd}d|jv r%| |jd }| j|}|  | d S |  | d S )z/Checksum info of file, giving method and resultZGETFILECHECKSUMF)rL   rM   LocationZFileChecksum)rS   headersr>   r/   r3   rK   rF   )r    rL   rO   locationout2r   r   r'   ukey1  s   
zWebHDFS.ukeyc                 C   s   |  d}| d S )zGet user's home directoryZGETHOMEDIRECTORYPathr   )r    rO   r   r   r'   home_directory=  s   
zWebHDFS.home_directoryc                 C   sB   |r
| j d|d}n|  d}| d }|du rtd|d S )zRetrieve token which can give the same authority to other uses

        Parameters
        ----------
        renewer: str or None
            User who may use this token; if None, will be current user
        ZGETDELEGATIONTOKEN)renewerTokenNz1No token available for this user/security contextZ	urlString)rS   rF   r   )r    r   rO   tr   r   r'   get_delegation_tokenB  s   
zWebHDFS.get_delegation_tokenc                 C   s   | j dd|d}| d S )z/Make token live longer. Returns new expiry timeZRENEWDELEGATIONTOKENputr7   r#   longr   )r    r#   rO   r   r   r'   renew_delegation_tokenS  s   zWebHDFS.renew_delegation_tokenc                 C   s   | j dd|d dS )z Stop the token from being usefulZCANCELDELEGATIONTOKENr   r   NrS   )r    r#   r   r   r'   cancel_delegation_tokenX  s   zWebHDFS.cancel_delegation_tokenc                 C   s   | j dd||d dS )a  Set the permission at path

        Parameters
        ----------
        path: str
            location to set (file or directory)
        mod: str or int
            posix epresentation or permission, give as oct string, e.g, '777'
            or 0o777
        ZSETPERMISSIONr   )r7   rL   Z
permissionNr   )r    rL   modr   r   r'   chmod\  s   zWebHDFS.chmodc                 C   s>   i }|dur
||d< |dur||d< | j dd|d| dS )zChange owning user and/or groupNownergroupSETOWNERr   r7   rL   )r   r   )r    rL   r   r   r$   r   r   r'   chowni  s   zWebHDFS.chownc                 C   s   | j d|d|d dS )a9  
        Set file replication factor

        Parameters
        ----------
        path: str
            File location (not for directories)
        replication: int
            Number of copies of file on the cluster. Should be smaller than
            number of data nodes; normally 3 on most systems.
        ZSETREPLICATIONr   )rL   r7   rY   Nr   )r    rL   rY   r   r   r'   set_replicationr  s   zWebHDFS.set_replicationc                 K   s   | j dd|d d S )NZMKDIRSr   r   r   r    rL   r$   r   r   r'   mkdir  s   zWebHDFS.mkdirc                 C   s(   |du r|  |rt|| | d S )NF)existsFileExistsErrorr   )r    rL   exist_okr   r   r'   makedirs  s   zWebHDFS.makedirsc                 K   s   | j dd||d d S )NZRENAMEr   )r7   rL   Zdestinationr   )r    Zpath1Zpath2r$   r   r   r'   mv     z
WebHDFS.mvc                 K   s   | j dd||r	dndd d S )NDELETEdeletetruefalse)r7   rL   	recursiver   )r    rL   r   r$   r   r   r'   rm  s   

z
WebHDFS.rmc                 K   s   |  | d S r(   )r   r   r   r   r'   rm_file  s   zWebHDFS.rm_filec                 K   s   |  |_}d| |dtd g}z$|  |d}t|| W d    n1 s.w   Y  | || W n! ty[   t	t
 | | W d     1 sUw   Y   w W d    d S 1 sgw   Y  d S )Nru   z.tmp.   wb)openjoinZ_parentsecretsZ	token_hexshutilcopyfileobjr   BaseExceptionr   rI   r   )r    ZlpathZrpathr$   ZlstreamZ	tmp_fnameZrstreamr   r   r'   cp_file  s"    

"zWebHDFS.cp_filec                 C   sJ   | j rt| j r|  |}|S | j r#| j  D ]\}}|||d}q|S )N   )r   callableitemsreplace)r    r   kvr   r   r'   r>     s   
zWebHDFS._apply_proxy)r   FNNNNNNFNT)r3   NNT)rT   NTNNFr(   )NN))__name__
__module____qualname____doc__strtempfile
gettempdirrW   re   r   propertyr*   r   rS   r]   staticmethodrc   classmethodr=   rh   rb   rs   rt   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r>   __classcell__r   r   r%   r'   r      sh    e

$
-


	


	

r   c                       sJ   e Zd ZdZ fddZdddZdd Zd	d
 Zdd Zdd Z	  Z
S )r\   z"A file living in HDFS over webHDFSc                    s   t  j||fi | | }|dd d u r|dd  |dd d u r+|dd  |dd| _|d}|dddu rQ| j| _tj	|t
t | _d S d S )NrZ   rY   i  rW   rX   F)r   r   r?   r3   rg   rZ   rL   targetosr   r   uuiduuid4)r    fsrL   r$   rW   r%   r   r'   r     s   
zWebHDFile.__init__Fc                 C   s,   | j jj| j| j ddid}|  dS )zWrite one part of a multi-block file upload

        Parameters
        ==========
        final: bool
            This is the last block, so should complete file, if
            self.autocommit is True.
        content-typeapplication/octet-stream)r9   r   T)r   r/   postr   buffergetvaluerK   )r    finalrO   r   r   r'   _upload_chunk  s   	zWebHDFile._upload_chunkc                 C   s   | j  }d| jv rd\}}nd\}}d|d< | jj||| jfddi|}| j|jd }d	| jv r^| jjj	|d
did}|
  | jjdd| jfddi|}| j|jd | _dS dS )zCreate remote file/uploada)APPENDPOST)ZCREATEPUTr   	overwriterM   Fr   wr   r   )r   r   r   N)r$   r?   rU   r   rS   rL   r>   r   r/   r   rK   r   )r    r$   r6   r7   rO   r   r   r   r   r'   _initiate_upload  s   



zWebHDFile._initiate_uploadc                 C   s   t |d}t| j|}||ks|| jkrdS | jjd| j||| dd}|  d|jv r@|jd }| jj	| j
|}|jS |jS )Nr       ZOPENF)rL   offsetr_   rM   r   )maxminr`   r   rS   rL   rK   r   r/   r3   r>   content)r    startendrO   r   r   r   r   r'   _fetch_range  s   


zWebHDFile._fetch_rangec                 C   s   | j | j| j d S r(   )r   r   rL   r   r)   r   r   r'   commit  r   zWebHDFile.commitc                 C   s   | j | j d S r(   )r   r   rL   r)   r   r   r'   discard  s   zWebHDFile.discardr   )r   r   r   r   r   r   r   r   r   r   r   r   r   r%   r'   r\     s    
r\   )loggingr   r   r   r   r   
contextlibr   r   urllib.parser   r-   specr   r   utilsr   r	   	getLoggerrB   r   r\   r   r   r   r'   <module>   s$   
   