_datetime_parse.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. """
  2. This file contains code from https://github.com/pydantic/pydantic/blob/main/pydantic/v1/datetime_parse.py
  3. without the Pydantic v1 specific errors.
  4. """
  5. from __future__ import annotations
  6. import re
  7. from typing import Dict, Union, Optional
  8. from datetime import date, datetime, timezone, timedelta
  9. from .._types import StrBytesIntFloat
  10. date_expr = r"(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})"
  11. time_expr = (
  12. r"(?P<hour>\d{1,2}):(?P<minute>\d{1,2})"
  13. r"(?::(?P<second>\d{1,2})(?:\.(?P<microsecond>\d{1,6})\d{0,6})?)?"
  14. r"(?P<tzinfo>Z|[+-]\d{2}(?::?\d{2})?)?$"
  15. )
  16. date_re = re.compile(f"{date_expr}$")
  17. datetime_re = re.compile(f"{date_expr}[T ]{time_expr}")
  18. EPOCH = datetime(1970, 1, 1)
  19. # if greater than this, the number is in ms, if less than or equal it's in seconds
  20. # (in seconds this is 11th October 2603, in ms it's 20th August 1970)
  21. MS_WATERSHED = int(2e10)
  22. # slightly more than datetime.max in ns - (datetime.max - EPOCH).total_seconds() * 1e9
  23. MAX_NUMBER = int(3e20)
  24. def _get_numeric(value: StrBytesIntFloat, native_expected_type: str) -> Union[None, int, float]:
  25. if isinstance(value, (int, float)):
  26. return value
  27. try:
  28. return float(value)
  29. except ValueError:
  30. return None
  31. except TypeError:
  32. raise TypeError(f"invalid type; expected {native_expected_type}, string, bytes, int or float") from None
  33. def _from_unix_seconds(seconds: Union[int, float]) -> datetime:
  34. if seconds > MAX_NUMBER:
  35. return datetime.max
  36. elif seconds < -MAX_NUMBER:
  37. return datetime.min
  38. while abs(seconds) > MS_WATERSHED:
  39. seconds /= 1000
  40. dt = EPOCH + timedelta(seconds=seconds)
  41. return dt.replace(tzinfo=timezone.utc)
  42. def _parse_timezone(value: Optional[str]) -> Union[None, int, timezone]:
  43. if value == "Z":
  44. return timezone.utc
  45. elif value is not None:
  46. offset_mins = int(value[-2:]) if len(value) > 3 else 0
  47. offset = 60 * int(value[1:3]) + offset_mins
  48. if value[0] == "-":
  49. offset = -offset
  50. return timezone(timedelta(minutes=offset))
  51. else:
  52. return None
  53. def parse_datetime(value: Union[datetime, StrBytesIntFloat]) -> datetime:
  54. """
  55. Parse a datetime/int/float/string and return a datetime.datetime.
  56. This function supports time zone offsets. When the input contains one,
  57. the output uses a timezone with a fixed offset from UTC.
  58. Raise ValueError if the input is well formatted but not a valid datetime.
  59. Raise ValueError if the input isn't well formatted.
  60. """
  61. if isinstance(value, datetime):
  62. return value
  63. number = _get_numeric(value, "datetime")
  64. if number is not None:
  65. return _from_unix_seconds(number)
  66. if isinstance(value, bytes):
  67. value = value.decode()
  68. assert not isinstance(value, (float, int))
  69. match = datetime_re.match(value)
  70. if match is None:
  71. raise ValueError("invalid datetime format")
  72. kw = match.groupdict()
  73. if kw["microsecond"]:
  74. kw["microsecond"] = kw["microsecond"].ljust(6, "0")
  75. tzinfo = _parse_timezone(kw.pop("tzinfo"))
  76. kw_: Dict[str, Union[None, int, timezone]] = {k: int(v) for k, v in kw.items() if v is not None}
  77. kw_["tzinfo"] = tzinfo
  78. return datetime(**kw_) # type: ignore
  79. def parse_date(value: Union[date, StrBytesIntFloat]) -> date:
  80. """
  81. Parse a date/int/float/string and return a datetime.date.
  82. Raise ValueError if the input is well formatted but not a valid date.
  83. Raise ValueError if the input isn't well formatted.
  84. """
  85. if isinstance(value, date):
  86. if isinstance(value, datetime):
  87. return value.date()
  88. else:
  89. return value
  90. number = _get_numeric(value, "date")
  91. if number is not None:
  92. return _from_unix_seconds(number).date()
  93. if isinstance(value, bytes):
  94. value = value.decode()
  95. assert not isinstance(value, (float, int))
  96. match = date_re.match(value)
  97. if match is None:
  98. raise ValueError("invalid date format")
  99. kw = {k: int(v) for k, v in match.groupdict().items()}
  100. try:
  101. return date(**kw)
  102. except ValueError:
  103. raise ValueError("invalid date format") from None