"""ASGI-Tools includes a `asgi_tools.Request` class that gives you a nicer interface onto the
incoming request.
"""
from __future__ import annotations
from http import cookies
from typing import TYPE_CHECKING, Any, AsyncGenerator, Callable, Iterator, Optional, Union
from yarl import URL
from ._compat import json_loads
from .constants import DEFAULT_CHARSET
from .errors import ASGIDecodeError
from .forms import read_formdata
from .types import TJSON, TASGIReceive, TASGIScope, TASGISend
from .utils import CIMultiDict, parse_headers, parse_options_header
if TYPE_CHECKING:
from multidict import MultiDict
[docs]
class Request(TASGIScope):
"""Represent a HTTP Request.
:param scope: HTTP ASGI Scope
:param receive: an asynchronous callable which lets the application
receive event messages from the client
:param send: an asynchronous callable which lets the application
send event messages to the client
"""
__slots__ = (
"scope",
"receive",
"send",
"_is_read",
"_url",
"_body",
"_form",
"_headers",
"_media",
"_cookies",
)
def __init__(self, scope: TASGIScope, receive: TASGIReceive, send: TASGISend):
"""Create a request based on the given scope."""
self.scope = scope
self.receive = receive
self.send = send
self._is_read: bool = False
self._url: Optional[URL] = None
self._body: Optional[bytes] = None
self._form: Optional[MultiDict] = None
self._headers: Optional[CIMultiDict] = None
self._media: Optional[dict[str, str]] = None
self._cookies: Optional[dict[str, str]] = None
def __str__(self) -> str:
"""Return the request's params."""
scope_type = self.scope["type"]
if scope_type == "websocket":
return f"{scope_type} {self.path}"
return f"{scope_type} {self.method} {self.url.path}"
def __repr__(self):
"""Represent the request."""
return f"<Request {self}>"
def __getitem__(self, key: str) -> Any:
"""Proxy the method to the scope."""
return self.scope[key]
def __setitem__(self, key: str, value: Any) -> None:
"""Proxy the method to the scope."""
self.scope[key] = value
def __delitem__(self, key: str) -> None:
"""Proxy the method to the scope."""
del self.scope[key]
def __iter__(self) -> Iterator[str]:
"""Proxy the method to the scope."""
return iter(self.scope)
def __len__(self) -> int:
"""Proxy the method to the scope."""
return len(self.scope)
def __getattr__(self, name: str) -> Any:
"""Proxy the request's unknown attributes to scope."""
return self.scope[name]
def __copy__(self, **mutations) -> Request:
"""Copy the request to a new one."""
return Request(dict(self.scope, **mutations), self.receive, self.send)
@property
def url(self) -> URL:
"""A lazy property that parses the current URL and returns :class:`yarl.URL` object.
.. code-block:: python
request = Request(scope)
assert str(request.url) == '... the full http URL ..'
assert request.url.scheme
assert request.url.host
assert request.url.query is not None
assert request.url.query_string is not None
See :py:mod:`yarl` documentation for further reference.
"""
if self._url is None:
scope = self.scope
host = self.headers.get("host")
if host is None:
if "server" in scope:
host, port = scope["server"]
if port:
host = f"{host}:{port}"
else:
host = "localhost"
self._url = URL.build(
host=host,
scheme=scope.get("scheme", "http"),
encoded=True,
path=f"{ scope.get('root_path', '') }{ scope['path'] }",
query_string=scope["query_string"].decode(encoding="ascii"),
)
return self._url
@property
def headers(self) -> CIMultiDict:
"""A lazy property that parses the current scope's headers, decodes them as strings and
returns case-insensitive multi-dict :py:class:`multidict.CIMultiDict`.
.. code-block:: python
request = Request(scope)
assert request.headers['content-type']
assert request.headers['authorization']
See :py:mod:`multidict` documentation for futher reference.
"""
if self._headers is None:
self._headers = parse_headers(self.scope["headers"])
return self._headers
@property
def cookies(self) -> dict[str, str]:
"""A lazy property that parses the current scope's cookies and returns a dictionary.
.. code-block:: python
request = Request(scope)
ses = request.cookies.get('session')
"""
if self._cookies is None:
self._cookies = {}
cookie = self.headers.get("cookie")
if cookie:
for chunk in cookie.split(";"):
key, _, val = chunk.partition("=")
self._cookies[key.strip()] = cookies._unquote(val.strip())
return self._cookies
@property
def media(self) -> dict[str, str]:
"""Prepare a media data for the request."""
if self._media is None:
conten_type_header = self.headers.get("content-type", "")
content_type, opts = parse_options_header(conten_type_header)
self._media = dict(opts, content_type=content_type)
return self._media
@property
def charset(self) -> str:
"""Get an encoding charset for the current scope."""
return self.media.get("charset", DEFAULT_CHARSET)
@property
def query(self) -> MultiDict:
"""A lazy property that parse the current query string and returns it as a
:py:class:`multidict.MultiDict`.
"""
return self.url.query
@property
def content_type(self) -> str:
"""Get a content type for the current scope."""
return self.media["content_type"]
[docs]
async def stream(self) -> AsyncGenerator:
"""Stream the request's body.
The method provides byte chunks without storing the entire body to memory.
Any subsequent calls to :py:meth:`body`, :py:meth:`form`, :py:meth:`json`
or :py:meth:`data` will raise an error.
.. warning::
You can only read stream once. Second call raises an error. Save a readed stream into a
variable if you need.
"""
if self._is_read:
if self._body is None:
raise RuntimeError("Stream has been read") # noqa: TRY003
yield self._body
else:
self._is_read = True
message = await self.receive()
yield message.get("body", b"")
while message.get("more_body"):
message = await self.receive()
yield message.get("body", b"")
[docs]
async def body(self) -> bytes:
"""Read and return the request's body as bytes.
`body = await request.body()`
"""
if self._body is None:
self._body = b"".join([chunk async for chunk in self.stream()])
return self._body
[docs]
async def text(self) -> str:
"""Read and return the request's body as a string.
`text = await request.text()`
"""
body = await self.body()
try:
return body.decode(self.charset or DEFAULT_CHARSET)
except (LookupError, ValueError) as exc:
raise ASGIDecodeError from exc
[docs]
async def json(self) -> TJSON:
"""Read and return the request's body as a JSON.
`json = await request.json()`
"""
try:
return json_loads(await self.body())
except (LookupError, ValueError) as exc:
raise ASGIDecodeError from exc
[docs]
async def data(self, *, raise_errors: bool = False) -> Union[str, bytes, MultiDict, TJSON]:
"""The method checks Content-Type Header and parse the request's data automatically.
:param raise_errors: Raise an error if the given data is invalid.
`data = await request.data()`
If `raise_errors` is false (by default) and the given data is invalid (ex. invalid json)
the request's body would be returned.
Returns data from :py:meth:`json` for `application/json`, :py:meth:`form` for
`application/x-www-form-urlencoded`, `multipart/form-data` and :py:meth:`text` otherwise.
"""
try:
if self.content_type in {
"application/x-www-form-urlencoded",
"multipart/form-data",
}:
return await self.form()
if self.content_type == "application/json":
return await self.json()
except ASGIDecodeError:
if raise_errors:
raise
return await self.body()
else:
return await self.text()