ai-content-maker/.venv/Lib/site-packages/httpx/_transports/wsgi.py

147 lines
4.7 KiB
Python

from __future__ import annotations
import io
import itertools
import sys
import typing
from .._models import Request, Response
from .._types import SyncByteStream
from .base import BaseTransport
if typing.TYPE_CHECKING:
from _typeshed import OptExcInfo # pragma: no cover
from _typeshed.wsgi import WSGIApplication # pragma: no cover
_T = typing.TypeVar("_T")
def _skip_leading_empty_chunks(body: typing.Iterable[_T]) -> typing.Iterable[_T]:
body = iter(body)
for chunk in body:
if chunk:
return itertools.chain([chunk], body)
return []
class WSGIByteStream(SyncByteStream):
def __init__(self, result: typing.Iterable[bytes]) -> None:
self._close = getattr(result, "close", None)
self._result = _skip_leading_empty_chunks(result)
def __iter__(self) -> typing.Iterator[bytes]:
for part in self._result:
yield part
def close(self) -> None:
if self._close is not None:
self._close()
class WSGITransport(BaseTransport):
"""
A custom transport that handles sending requests directly to an WSGI app.
The simplest way to use this functionality is to use the `app` argument.
```
client = httpx.Client(app=app)
```
Alternatively, you can setup the transport instance explicitly.
This allows you to include any additional configuration arguments specific
to the WSGITransport class:
```
transport = httpx.WSGITransport(
app=app,
script_name="/submount",
remote_addr="1.2.3.4"
)
client = httpx.Client(transport=transport)
```
Arguments:
* `app` - The WSGI application.
* `raise_app_exceptions` - Boolean indicating if exceptions in the application
should be raised. Default to `True`. Can be set to `False` for use cases
such as testing the content of a client 500 response.
* `script_name` - The root path on which the WSGI application should be mounted.
* `remote_addr` - A string indicating the client IP of incoming requests.
```
"""
def __init__(
self,
app: WSGIApplication,
raise_app_exceptions: bool = True,
script_name: str = "",
remote_addr: str = "127.0.0.1",
wsgi_errors: typing.TextIO | None = None,
) -> None:
self.app = app
self.raise_app_exceptions = raise_app_exceptions
self.script_name = script_name
self.remote_addr = remote_addr
self.wsgi_errors = wsgi_errors
def handle_request(self, request: Request) -> Response:
request.read()
wsgi_input = io.BytesIO(request.content)
port = request.url.port or {"http": 80, "https": 443}[request.url.scheme]
environ = {
"wsgi.version": (1, 0),
"wsgi.url_scheme": request.url.scheme,
"wsgi.input": wsgi_input,
"wsgi.errors": self.wsgi_errors or sys.stderr,
"wsgi.multithread": True,
"wsgi.multiprocess": False,
"wsgi.run_once": False,
"REQUEST_METHOD": request.method,
"SCRIPT_NAME": self.script_name,
"PATH_INFO": request.url.path,
"QUERY_STRING": request.url.query.decode("ascii"),
"SERVER_NAME": request.url.host,
"SERVER_PORT": str(port),
"SERVER_PROTOCOL": "HTTP/1.1",
"REMOTE_ADDR": self.remote_addr,
}
for header_key, header_value in request.headers.raw:
key = header_key.decode("ascii").upper().replace("-", "_")
if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"):
key = "HTTP_" + key
environ[key] = header_value.decode("ascii")
seen_status = None
seen_response_headers = None
seen_exc_info = None
def start_response(
status: str,
response_headers: list[tuple[str, str]],
exc_info: OptExcInfo | None = None,
) -> typing.Callable[[bytes], typing.Any]:
nonlocal seen_status, seen_response_headers, seen_exc_info
seen_status = status
seen_response_headers = response_headers
seen_exc_info = exc_info
return lambda _: None
result = self.app(environ, start_response)
stream = WSGIByteStream(result)
assert seen_status is not None
assert seen_response_headers is not None
if seen_exc_info and seen_exc_info[0] and self.raise_app_exceptions:
raise seen_exc_info[1]
status_code = int(seen_status.split()[0])
headers = [
(key.encode("ascii"), value.encode("ascii"))
for key, value in seen_response_headers
]
return Response(status_code, headers=headers, stream=stream)