# utils.py (3.6 KB)
  1. from datetime import datetime, timedelta
  2. from kddit.settings import HEADERS, TIMESHIFT, YDL_OPTS, CLIENT_ID, CLIENT_SECRET, URL, UA
  3. from kddit.settings import SUBREDDIT_OPTIONS, SAFE_SUBS, USER_OPTIONS
  4. import timeago
  5. import re
  6. import requests
  7. import youtube_dl
  8. from glom import glom as g
  9. from glom import Coalesce
  10. from bs4 import BeautifulSoup
  11. from html import unescape
  12. from bottle import request, abort
  13. import requests.auth
  # Shared youtube-dl client built from YDL_OPTS; get_thumbnail() uses it to
  # look up media metadata.
  ydl = youtube_dl.YoutubeDL(YDL_OPTS)
  def get_query():
      """Return the query parameters of the current bottle request as a dict."""
      return dict(request.query)
  def human_format(num):
      """Abbreviate a number to three significant figures with a K/M/B/T suffix.

      Args:
          num: The number to format.

      Returns:
          A string such as ``"999"``, ``"1.5K"`` or ``"1.23M"``. Trailing zeros
          and a dangling decimal point are stripped. Values beyond the
          trillions keep the ``T`` suffix (e.g. ``"1000T"``) rather than
          running off the end of the suffix table.
      """
      suffixes = ('', 'K', 'M', 'B', 'T')
      # Round to 3 significant figures first so 999999 becomes 1M, not 1000K.
      value = float('{:.3g}'.format(num))
      magnitude = 0
      # Stop at the largest known suffix instead of indexing past the table.
      while abs(value) >= 1000 and magnitude < len(suffixes) - 1:
          magnitude += 1
          value /= 1000.0
      digits = '{:f}'.format(value).rstrip('0').rstrip('.')
      return '{}{}'.format(digits, suffixes[magnitude])
  def get_time(timestamp):
      """Return a relative-time string (via ``timeago``) for a POSIX timestamp.

      The timestamp is converted with ``datetime.fromtimestamp`` and moved
      back by TIMESHIFT hours before being compared with the current time.
      """
      shifted = datetime.fromtimestamp(timestamp) - timedelta(hours=TIMESHIFT)
      return timeago.format(shifted, datetime.now())
  def get_thumbnail(url):
      """Return the thumbnail that youtube-dl reports for *url*.

      Args:
          url: Address handed to ``ydl.extract_info`` (metadata only, nothing
              is downloaded).

      Returns:
          The ``"thumbnail"`` entry of the extracted info, or ``""`` when
          extraction fails or the entry is missing.
      """
      try:
          with ydl:
              info = ydl.extract_info(url, download=False)
              return info["thumbnail"]
      except Exception:
          # Best-effort lookup: any extraction error just means "no thumbnail".
          # KeyboardInterrupt/SystemExit are deliberately not swallowed.
          return ""
  def replace_tag(bs_tag, html_tag):
      """Swap a BeautifulSoup node for the parsed output of a renderable tag.

      Args:
          bs_tag: Node to replace; its ``replace_with`` method is called.
          html_tag: Object exposing ``render()``, or a tuple whose first
              element is such an object.
      """
      renderable = html_tag[0] if isinstance(html_tag, tuple) else html_tag
      replacement = BeautifulSoup(renderable.render(), "html.parser")
      bs_tag.replace_with(replacement)
  def get_metadata(data, name):
      """Look up the media URL stored under ``media_metadata.<name>.s``.

      Args:
          data: Nested mapping searched with glom.
          name: Key of the entry inside ``media_metadata``.

      Returns:
          The HTML-unescaped value of ``s.u`` (or ``s.gif`` when ``s.u`` is
          unavailable), or None when neither path yields a truthy value.
      """
      base = f"media_metadata.{name}.s"
      spec = Coalesce(f"{base}.u", f"{base}.gif")
      found = g(data, spec, default=None)
      if not found:
          return None
      return unescape(found)
  # Compiled URL/text patterns. Literal dots are escaped so that "." cannot
  # match an arbitrary character.
  external_preview_re = re.compile(r"https://external-preview\.redd\.it/")
  preview_re = re.compile(r"https://preview\.redd\.it/")
  image_re = re.compile(r"https://i\.redd\.it/")
  # Group 1 is the text between "Processing img " and the trailing "...".
  processing_re = re.compile(r"Processing img (.*)\.\.\.")
  # Group 1 is the path segment between "/video/" and "/player".
  video_re = re.compile(r"https://reddit\.com/link/.*/video/(.*)/player")
  def get_token():
      """Request an OAuth access token using the client-credentials grant.

      Posts to Reddit's ``/api/v1/access_token`` endpoint with HTTP basic
      auth built from CLIENT_ID and CLIENT_SECRET.

      Returns:
          The ``access_token`` field of the JSON reply when the server
          answers 200 (None if the field is absent); None for any other
          status code.
      """
      # NOTE(review): no timeout is passed, so a stalled connection blocks here.
      response = requests.post(
          "https://www.reddit.com/api/v1/access_token",
          auth=requests.auth.HTTPBasicAuth(CLIENT_ID, CLIENT_SECRET),
          data={"grant_type": "client_credentials"},
      )
      if response.status_code != 200:
          return None
      return response.json().get("access_token")
  def req_url(url, params=None):
      """GET *url* with the configured User-Agent and return the response.

      Args:
          url: Full address to fetch.
          params: Optional query parameters forwarded to ``requests.get``.
      """
      return requests.get(url, params=params, headers={"User-Agent": UA})
  def req(path, params=None):
      """GET ``URL + path`` with HEADERS, retrying once with a new token.

      When the first reply is 401 or 403 and both CLIENT_ID and CLIENT_SECRET
      are configured, a token is requested via ``get_token()``. If one is
      obtained, it is stored in HEADERS and the request is sent again.

      Args:
          path: Path appended to the base URL.
          params: Optional query parameters forwarded to ``requests.get``.

      Returns:
          The response of the last request that was made.
      """
      target = URL + path
      response = requests.get(target, params=params, headers=HEADERS)
      rejected = response.status_code in (401, 403)
      if rejected and CLIENT_SECRET and CLIENT_ID:
          if token := get_token():
              # HEADERS is shared module state, so the token is reused by
              # subsequent calls as well.
              HEADERS.update({"Authorization": "bearer " + token})
              response = requests.get(target, params=params, headers=HEADERS)
      return response
  def success(r):
      """Return True when response *r* has HTTP status code 200."""
      return r.status_code == 200
  def builder(*args):
      """Nest a value inside a chain of callables, working right to left.

      The last argument is the innermost value; every argument before it is
      a callable. The callable nearest the value receives the raw value; each
      one further left receives the one-element tuple produced by the
      previous step.

      Args:
          *args: Zero or more callables followed by the innermost value.

      Returns:
          A one-element tuple holding the outermost call's result, or ``()``
          when fewer than two arguments are given.
      """
      if not args:
          # Nothing to build; avoids popping from an empty argument list.
          return ()
      *wrappers, innermost = args
      obj = ()
      for wrap in reversed(wrappers):
          # After the first step obj is always a non-empty 1-tuple.
          obj = (wrap(obj if obj else innermost),)
      return obj
  def tuplefy(func):
      """Decorate *func* so that its result is wrapped in a one-element tuple.

      Args:
          func: Callable to wrap.

      Returns:
          A wrapper that forwards all arguments to *func* and returns
          ``(result,)``. The wrapper keeps *func*'s name and docstring.
      """
      from functools import wraps

      @wraps(func)
      def inner(*args, **kwargs):
          return (func(*args, **kwargs),)

      return inner
  def get_subreddit_url():
      """Return ``/r/<subreddit>`` for the current route, or ``""`` if unset.

      The subreddit name is read from the ``"subreddit"`` URL argument of the
      current bottle request.
      """
      sub = request.url_args.get("subreddit")
      if not sub:
          return ""
      return f"/r/{sub}"
  def get_subreddit():
      """Return ``r/<subreddit>`` for the current route, or ``""`` if unset.

      The subreddit name is read from the ``"subreddit"`` URL argument of the
      current bottle request.
      """
      sub = request.url_args.get("subreddit")
      if not sub:
          return ""
      return f"r/{sub}"
  def get_option():
      """Return the ``"option"`` URL argument of the current request, or None."""
      return request.url_args.get("option")
  def verify_subreddit_option():
      """Abort with 404 when the route's option is not in SUBREDDIT_OPTIONS.

      A missing or empty option is accepted and the function returns None.
      """
      option = get_option()
      if not option or option in SUBREDDIT_OPTIONS:
          return None
      return abort(404)
  def verify_user_option():
      """Abort with 404 when the route's option is not in USER_OPTIONS.

      A missing or empty option is accepted and the function returns None.
      """
      option = get_option()
      if not option or option in USER_OPTIONS:
          return None
      return abort(404)
  def nsfw_mode(subreddit):
      """Return True when no subreddit is given or it is listed in SAFE_SUBS.

      Args:
          subreddit: Subreddit name; may be empty or None.
      """
      if not subreddit:
          return True
      return subreddit in SAFE_SUBS