from os.path import splitext, basename
from typing import List, BinaryIO, Optional, Set, Union

try:
    from os import PathLike
except ImportError:
    PathLike = Union[str, 'os.PathLike[str]']  # type: ignore

from charset_normalizer.constant import TOO_SMALL_SEQUENCE, TOO_BIG_SEQUENCE, IANA_SUPPORTED
from charset_normalizer.md import mess_ratio
from charset_normalizer.models import CharsetMatches, CharsetMatch
from warnings import warn
import logging
from charset_normalizer.utils import any_specified_encoding, is_multi_byte_encoding, identify_sig_or_bom, \
    should_strip_sig_or_bom, is_cp_similar, iana_name
from charset_normalizer.cd import coherence_ratio, encoding_languages, mb_encoding_languages, merge_coherence_ratios

logger = logging.getLogger("charset_normalizer")
logger.setLevel(logging.DEBUG)

handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s | %(levelname)s | %(message)s'))
logger.addHandler(handler)


def from_bytes(
        sequences: bytes,
        steps: int = 5,
        chunk_size: int = 512,
        threshold: float = 0.2,
        cp_isolation: Optional[List[str]] = None,
        cp_exclusion: Optional[List[str]] = None,
        preemptive_behaviour: bool = True,
        explain: bool = False
) -> CharsetMatches:
  30. """
  31. Given a raw bytes sequence, return the best possibles charset usable to render str objects.
  32. If there is no results, it is a strong indicator that the source is binary/not text.
  33. By default, the process will extract 5 blocs of 512o each to assess the mess and coherence of a given sequence.
  34. And will give up a particular code page after 20% of measured mess. Those criteria are customizable at will.
  35. The preemptive behavior DOES NOT replace the traditional detection workflow, it prioritize a particular code page
  36. but never take it for granted. Can improve the performance.
  37. You may want to focus your attention to some code page or/and not others, use cp_isolation and cp_exclusion for that
  38. purpose.
  39. This function will strip the SIG in the payload/sequence every time except on UTF-16, UTF-32.
  40. """
    if not explain:
        logger.setLevel(logging.CRITICAL)
    else:
        logger.setLevel(logging.INFO)

    length = len(sequences)  # type: int

    if length == 0:
        logger.warning("Given content is empty, stopping the process very early, returning an empty utf_8 str match")
        return CharsetMatches(
            [
                CharsetMatch(
                    sequences,
                    "utf_8",
                    0.,
                    False,
                    [],
                    ""
                )
            ]
        )

    if cp_isolation is not None:
        logger.warning('cp_isolation is set. Use this flag for debugging purposes. '
                       'Limited list of encodings allowed: %s.',
                       ', '.join(cp_isolation))
        cp_isolation = [iana_name(cp, False) for cp in cp_isolation]
    else:
        cp_isolation = []

    if cp_exclusion is not None:
        logger.warning(
            'cp_exclusion is set. Use this flag for debugging purposes. '
            'Limited list of encodings excluded: %s.',
            ', '.join(cp_exclusion))
        cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion]
    else:
        cp_exclusion = []

    if length <= (chunk_size * steps):
        logger.warning(
            'Overriding steps (%i) and chunk_size (%i) as content does not fit (%i byte(s) given) parameters.',
            steps, chunk_size, length)
        steps = 1
        chunk_size = length

    if steps > 1 and length / steps < chunk_size:
        chunk_size = int(length / steps)

    is_too_small_sequence = len(sequences) < TOO_SMALL_SEQUENCE  # type: bool
    is_too_large_sequence = len(sequences) >= TOO_BIG_SEQUENCE  # type: bool

    if is_too_small_sequence:
        warn('Trying to detect encoding from a tiny portion of ({}) byte(s).'.format(length))

    prioritized_encodings = []  # type: List[str]

    specified_encoding = any_specified_encoding(sequences) if preemptive_behaviour is True else None  # type: Optional[str]

    if specified_encoding is not None:
        prioritized_encodings.append(specified_encoding)
        logger.info('Detected declarative mark in sequence. Priority +1 given for %s.', specified_encoding)

    tested = set()  # type: Set[str]
    tested_but_hard_failure = []  # type: List[str]
    tested_but_soft_failure = []  # type: List[str]

    fallback_ascii = None  # type: Optional[CharsetMatch]
    fallback_u8 = None  # type: Optional[CharsetMatch]
    fallback_specified = None  # type: Optional[CharsetMatch]

    single_byte_hard_failure_count = 0  # type: int
    single_byte_soft_failure_count = 0  # type: int

    results = CharsetMatches()  # type: CharsetMatches

    sig_encoding, sig_payload = identify_sig_or_bom(sequences)

    if sig_encoding is not None:
        prioritized_encodings.append(sig_encoding)
        logger.info('Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.', len(sig_payload), sig_encoding)

    prioritized_encodings.append("ascii")

    if "utf_8" not in prioritized_encodings:
        prioritized_encodings.append("utf_8")
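
    # Probe the prioritized encodings first (declared mark, BOM/SIG, ascii,
    # utf_8), then every remaining IANA-supported code page.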
    for encoding_iana in prioritized_encodings + IANA_SUPPORTED:

        if cp_isolation and encoding_iana not in cp_isolation:
            continue

        if cp_exclusion and encoding_iana in cp_exclusion:
            continue

        if encoding_iana in tested:
            continue

        tested.add(encoding_iana)

        decoded_payload = None  # type: Optional[str]
        bom_or_sig_available = sig_encoding == encoding_iana  # type: bool
        strip_sig_or_bom = bom_or_sig_available and should_strip_sig_or_bom(encoding_iana)  # type: bool

        if encoding_iana in {"utf_16", "utf_32"} and bom_or_sig_available is False:
            logger.info("Encoding %s won't be tested as-is because it requires a BOM. Will try some LE/BE sub-encoders.", encoding_iana)
            continue

        try:
            is_multi_byte_decoder = is_multi_byte_encoding(encoding_iana)  # type: bool
        except (ModuleNotFoundError, ImportError):
            logger.debug("Encoding %s does not provide an IncrementalDecoder", encoding_iana)
            continue

        try:
            if is_too_large_sequence and is_multi_byte_decoder is False:
                str(
                    sequences[:int(50e4)] if strip_sig_or_bom is False else sequences[len(sig_payload):int(50e4)],
                    encoding=encoding_iana
                )
            else:
                decoded_payload = str(
                    sequences if strip_sig_or_bom is False else sequences[len(sig_payload):],
                    encoding=encoding_iana
                )
        except UnicodeDecodeError as e:
            logger.warning('Code page %s does not fit given bytes sequence at ALL. %s', encoding_iana, str(e))
            tested_but_hard_failure.append(encoding_iana)
            if not is_multi_byte_decoder:
                single_byte_hard_failure_count += 1
            continue
        except LookupError:
            tested_but_hard_failure.append(encoding_iana)
            if not is_multi_byte_decoder:
                single_byte_hard_failure_count += 1
            continue
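
        # The payload decoded without a hard failure; skip this code page if a
        # near-identical one already failed the chaos (soft) check.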
        similar_soft_failure_test = False  # type: bool

        for encoding_soft_failed in tested_but_soft_failure:
            if is_cp_similar(encoding_iana, encoding_soft_failed):
                similar_soft_failure_test = True
                break

        if similar_soft_failure_test:
            logger.warning("%s is deemed too similar to code page %s and was already considered unsuited. Continuing!", encoding_iana, encoding_soft_failed)
            continue
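
        # Sample up to `steps` chunks evenly spread across the payload,
        # starting after the BOM/SIG bytes when one was detected.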
        r_ = range(
            0 if bom_or_sig_available is False else len(sig_payload),
            length,
            int(length / steps)
        )

        multi_byte_bonus = is_multi_byte_decoder and decoded_payload is not None and len(decoded_payload) < length  # type: bool

        if multi_byte_bonus:
            logger.info('Code page %s is a multi-byte encoding table and it appears that at least one character was encoded using n bytes.', encoding_iana)

        max_chunk_gave_up = int(len(r_) / 4)  # type: int
        if max_chunk_gave_up < 2:
            max_chunk_gave_up = 2

        early_stop_count = 0  # type: int

        md_chunks = []  # type: List[str]
        md_ratios = []  # type: List[float]

        for i in r_:
            cut_sequence = sequences[i:i + chunk_size]

            if bom_or_sig_available and strip_sig_or_bom is False:
                cut_sequence = sig_payload + cut_sequence

            chunk = cut_sequence.decode(encoding_iana, errors="ignore")  # type: str

            md_chunks.append(chunk)

            md_ratios.append(
                mess_ratio(
                    chunk,
                    threshold
                )
            )

            if md_ratios[-1] >= threshold:
                early_stop_count += 1

            if (early_stop_count >= max_chunk_gave_up) or (bom_or_sig_available and strip_sig_or_bom is False):
                break
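
        # Average the measured mess over the sampled chunks; too much chaos or
        # too many early give-ups disqualifies this code page.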
        if md_ratios:
            mean_mess_ratio = sum(md_ratios) / len(md_ratios)  # type: float
        else:
            mean_mess_ratio = 0.

        if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
            tested_but_soft_failure.append(encoding_iana)
            if not is_multi_byte_decoder:
                single_byte_soft_failure_count += 1
            logger.warning('%s was excluded because of initial chaos probing. Gave up %i time(s). '
                           'Computed mean chaos is %f %%.',
                           encoding_iana,
                           early_stop_count,
                           round(mean_mess_ratio * 100, ndigits=3))
            # Preparing those fallbacks in case we got nothing.
            if encoding_iana in ["ascii", "utf_8", specified_encoding]:
                fallback_entry = CharsetMatch(
                    sequences,
                    encoding_iana,
                    threshold,
                    False,
                    [],
                    decoded_payload
                )
                if encoding_iana == specified_encoding:
                    fallback_specified = fallback_entry
                elif encoding_iana == "ascii":
                    fallback_ascii = fallback_entry
                else:
                    fallback_u8 = fallback_entry
            continue

        logger.info(
            '%s passed initial chaos probing. Mean measured chaos is %f %%',
            encoding_iana,
            round(mean_mess_ratio * 100, ndigits=3)
        )
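
        # Chaos probing passed: measure language coherence per chunk against
        # the languages this code page is expected to cover.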
        if not is_multi_byte_decoder:
            target_languages = encoding_languages(encoding_iana)  # type: List[str]
        else:
            target_languages = mb_encoding_languages(encoding_iana)

        if target_languages:
            logger.info("{} should target any language(s) of {}".format(encoding_iana, str(target_languages)))

        cd_ratios = []

        for chunk in md_chunks:
            chunk_languages = coherence_ratio(chunk, 0.1, ",".join(target_languages) if target_languages else None)

            cd_ratios.append(
                chunk_languages
            )

        cd_ratios_merged = merge_coherence_ratios(cd_ratios)

        if cd_ratios_merged:
            logger.info("We detected language {} using {}".format(cd_ratios_merged, encoding_iana))

        results.append(
            CharsetMatch(
                sequences,
                encoding_iana,
                mean_mess_ratio,
                bom_or_sig_available,
                cd_ratios_merged,
                decoded_payload
            )
        )
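
        # Shortcuts: a trusted encoding (declared, ascii or utf_8) with very
        # low mess, or one matching the detected BOM/SIG, ends the search.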
        if encoding_iana in [specified_encoding, "ascii", "utf_8"] and mean_mess_ratio < 0.1:
            logger.info("%s is most likely the one. Stopping the process.", encoding_iana)
            return CharsetMatches(
                [results[encoding_iana]]
            )

        if encoding_iana == sig_encoding:
            logger.info(
                "%s is most likely the one as we detected a BOM or SIG within the beginning of the sequence.",
                encoding_iana
            )
            return CharsetMatches(
                [results[encoding_iana]]
            )

        if results[-1].languages:
            logger.info(
                "Using %s code page we detected the following languages: %s",
                encoding_iana,
                results[-1].languages
            )

    if len(results) == 0:
        if fallback_u8 or fallback_ascii or fallback_specified:
            logger.warning("Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback.")

        if fallback_specified:
            logger.warning("%s will be used as a fallback match", fallback_specified.encoding)
            results.append(fallback_specified)
        elif (fallback_u8 and fallback_ascii is None) or (fallback_u8 and fallback_u8.fingerprint != fallback_ascii.fingerprint):
            logger.warning("utf_8 will be used as a fallback match")
            results.append(fallback_u8)
        elif fallback_ascii:
            logger.warning("ascii will be used as a fallback match")
            results.append(fallback_ascii)

    return results


def from_fp(
        fp: BinaryIO,
        steps: int = 5,
        chunk_size: int = 512,
        threshold: float = 0.20,
        cp_isolation: Optional[List[str]] = None,
        cp_exclusion: Optional[List[str]] = None,
        preemptive_behaviour: bool = True,
        explain: bool = False
) -> CharsetMatches:
    """
    Same as from_bytes but using a file pointer that is already ready.
    Will not close the file pointer.
    """
    return from_bytes(
        fp.read(),
        steps,
        chunk_size,
        threshold,
        cp_isolation,
        cp_exclusion,
        preemptive_behaviour,
        explain
    )
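
# Illustrative usage sketch (not from the original source; the file path is a
# placeholder):
#   with open('./config.xml', 'rb') as fp:
#       best_guess = from_fp(fp).best()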


def from_path(
        path: PathLike,
        steps: int = 5,
        chunk_size: int = 512,
        threshold: float = 0.20,
        cp_isolation: Optional[List[str]] = None,
        cp_exclusion: Optional[List[str]] = None,
        preemptive_behaviour: bool = True,
        explain: bool = False
) -> CharsetMatches:
    """
    Same as from_bytes but with one extra step: opening and reading the given file path in binary mode.
    Can raise IOError.
    """
    with open(path, 'rb') as fp:
        return from_fp(fp, steps, chunk_size, threshold, cp_isolation, cp_exclusion, preemptive_behaviour, explain)
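
# Illustrative usage sketch (hypothetical path):
#   best_guess = from_path('./legacy-latin-1.txt').best()
#   print(best_guess.encoding)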


def normalize(
        path: PathLike,
        steps: int = 5,
        chunk_size: int = 512,
        threshold: float = 0.20,
        cp_isolation: Optional[List[str]] = None,
        cp_exclusion: Optional[List[str]] = None,
        preemptive_behaviour: bool = True
) -> CharsetMatch:
    """
    Take a (text-based) file path and try to create another file next to it, this time using UTF-8.
    """
    results = from_path(
        path,
        steps,
        chunk_size,
        threshold,
        cp_isolation,
        cp_exclusion,
        preemptive_behaviour
    )

    filename = basename(path)
    target_extensions = list(splitext(filename))

    if len(results) == 0:
        raise IOError('Unable to normalize "{}", no encoding charset seems to fit.'.format(filename))

    result = results.best()

    target_extensions[0] += '-' + result.encoding  # type: ignore

    with open(str(path).replace(filename, ''.join(target_extensions)), 'wb') as fp:
        fp.write(
            result.output()  # type: ignore
        )

    return result  # type: ignore
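

# Illustrative sketch (not part of the original module): invoking normalize
# from a script. The input path is a hypothetical placeholder.
if __name__ == '__main__':
    best_match = normalize('./subtitle-latin-1.srt')
    print('Normalized copy written using {} as the source encoding.'.format(best_match.encoding))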