summaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/~ip/_internal/network/xmlrpc.py
diff options
context:
space:
mode:
authorblackhao <13851610112@163.com>2025-08-22 02:51:50 -0500
committerblackhao <13851610112@163.com>2025-08-22 02:51:50 -0500
commit4aab4087dc97906d0b9890035401175cdaab32d4 (patch)
tree4e2e9d88a711ec5b1cfa02e8ac72a55183b99123 /.venv/lib/python3.12/site-packages/~ip/_internal/network/xmlrpc.py
parentafa8f50d1d21c721dabcb31ad244610946ab65a3 (diff)
2.0
Diffstat (limited to '.venv/lib/python3.12/site-packages/~ip/_internal/network/xmlrpc.py')
-rw-r--r--.venv/lib/python3.12/site-packages/~ip/_internal/network/xmlrpc.py62
1 files changed, 62 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/~ip/_internal/network/xmlrpc.py b/.venv/lib/python3.12/site-packages/~ip/_internal/network/xmlrpc.py
new file mode 100644
index 0000000..22ec8d2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/~ip/_internal/network/xmlrpc.py
@@ -0,0 +1,62 @@
+"""xmlrpclib.Transport implementation
+"""
+
+import logging
+import urllib.parse
+import xmlrpc.client
+from typing import TYPE_CHECKING, Tuple
+
+from pip._internal.exceptions import NetworkConnectionError
+from pip._internal.network.session import PipSession
+from pip._internal.network.utils import raise_for_status
+
+if TYPE_CHECKING:
+ from xmlrpc.client import _HostType, _Marshallable
+
+ from _typeshed import SizedBuffer
+
+logger = logging.getLogger(__name__)
+
+
+class PipXmlrpcTransport(xmlrpc.client.Transport):
+ """Provide a `xmlrpclib.Transport` implementation via a `PipSession`
+ object.
+ """
+
+ def __init__(
+ self, index_url: str, session: PipSession, use_datetime: bool = False
+ ) -> None:
+ super().__init__(use_datetime)
+ index_parts = urllib.parse.urlparse(index_url)
+ self._scheme = index_parts.scheme
+ self._session = session
+
+ def request(
+ self,
+ host: "_HostType",
+ handler: str,
+ request_body: "SizedBuffer",
+ verbose: bool = False,
+ ) -> Tuple["_Marshallable", ...]:
+ assert isinstance(host, str)
+ parts = (self._scheme, host, handler, None, None, None)
+ url = urllib.parse.urlunparse(parts)
+ try:
+ headers = {"Content-Type": "text/xml"}
+ response = self._session.post(
+ url,
+ data=request_body,
+ headers=headers,
+ stream=True,
+ )
+ raise_for_status(response)
+ self.verbose = verbose
+ return self.parse_response(response.raw)
+ except NetworkConnectionError as exc:
+ assert exc.response
+ logger.critical(
+ "HTTP error %s while getting %s",
+ exc.response.status_code,
+ url,
+ )
+ raise