pyropmCaptcha.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. import asyncio
  4. import base64
  5. import hashlib
  6. import hmac
  7. import platform
  8. import sys
  9. import time
  10. from uuid import uuid4 as uuidgen
  11. import redis
  12. import uvloop
  13. from pyrogram import Client, filters, types
  14. import logging
  15. logging.basicConfig(format='[%(asctime)s] - [%(levelname)s] : %(message)s',level=logging.INFO)
  16. import pyroSecrets
  17. uvloop.install()
  18. app = Client(name="pmcaptcha_myoungram",
  19. api_id=pyroSecrets.PYRO_API_ID,
  20. api_hash=pyroSecrets.PYRO_API_SECRET,
  21. app_version=pyroSecrets.PYRO_MY_TITLE + " v1.0",
  22. device_model=platform.node())
  23. VERIF_TMPL = """
  24. 请在 90s 内点击 [此处]({veriurl}) 完成验证。如到期未完成,请联系 {botuser} 解封。
  25. Please click [here]({veriurl}) within 90s to finish verification if you are a human. If not, you will be blocked, contact {botuser} to unblock.
  26. Current Time: {tsstr}
  27. Powered By MYounGram v1.0
  28. """
  29. VERIF_FAIL = """
  30. 人机验证失败,验证不通过,请联系 {botuser} 解封。
  31. Human Verification Failed, Contact {botuser} to unblock. ERRCODE: {errcode}
  32. Powered By MYounGram v1.0
  33. """
  34. VERIF_PASS = """
  35. 人机验证通过,感谢您的使用。
  36. Human Verification Pass! Congrats!
  37. Powered By MYounGram v1.0
  38. """
  39. VERIF_500 = """
  40. 内部异常,验证结果待定,请联系 {botuser} 报障。
  41. We've encountered internal error, please contact {botuser} to report issue. ERRCODE: {errcode}
  42. Powered By MYounGram v1.0
  43. """
  44. redis_cli = redis.Redis(host=pyroSecrets.DB_REDIS_IP, port=pyroSecrets.DB_REDIS_PORT,
  45. db=pyroSecrets.DB_REDIS_DB, password=pyroSecrets.DB_REDIS_PASS,
  46. username=pyroSecrets.DB_REDIS_USER)
  47. @app.on_message(filters=filters.private)
  48. async def captcha_pm(client: Client, message: types.Message):
  49. # if msg is a bot, ignore
  50. if message.from_user.is_bot:
  51. return
  52. # if msg is from self or contact, ignore
  53. from_user = message.from_user
  54. if from_user.is_contact and not from_user.is_self:
  55. return
  56. # others, means strangers
  57. msg_chat_id = message.chat.id
  58. # if already whitelisted, ignore
  59. uStatus = redis_cli.get("ulist_" + str(msg_chat_id))
  60. if uStatus == b"1":
  61. return
  62. # If message is outgoing, means already known, add to k-v for bypass
  63. # if message is from original user and not send to saved message, it should be auto unban
  64. if message.outgoing and message.chat.id != message.from_user.id:
  65. if redis_cli.set("ulist_" + str(msg_chat_id), 1):
  66. logging.info("User " + str(msg_chat_id) + " added to whitelist due to outgoing first.")
  67. await client.unblock_user(msg_chat_id)
  68. return
  69. # bypass self message or message from verified user like telegram official
  70. if message.from_user.is_self or message.from_user.is_verified or message.chat.is_support:
  71. return
  72. # If already blocked, return
  73. if redis_cli.get("ulist_" + str(msg_chat_id)) == b"2":
  74. logging.info("User " + str(msg_chat_id) + " is already blocked.")
  75. await message.reply(VERIF_FAIL.format(errcode=9001,botuser=pyroSecrets.PYRO_MY_BOTNAME))
  76. await client.block_user(msg_chat_id)
  77. return
  78. # automatically send captcha and delete all message before captcha finished
  79. # if captcha is correct, add to k-v for bypass
  80. # if incorrect, block directly
  81. if not message.outgoing:
  82. # check pmstat_ in redis
  83. pmstat = redis_cli.get("pmstat_" + str(msg_chat_id))
  84. # pmstat_ not found, means new pm, send captcha
  85. if pmstat is None:
  86. # send captcha
  87. sessionUUID = str(uuidgen())
  88. curTs = int(time.time()) + 95
  89. veriurl = pyroSecrets.WEB_HostName + "/show" + pyroSecrets.WEB_UrlPrefix + "/" \
  90. + sessionUUID + "/" + str(msg_chat_id) + "/" + str(curTs)
  91. if message.from_user.is_verified:
  92. await message.reply("Premium User need to verify twice! (Just a joke)")
  93. await message.reply(VERIF_TMPL.format(veriurl=veriurl, botuser=pyroSecrets.PYRO_MY_BOTNAME, tsstr=time.strftime("%Y-%m-%d %H:%M:%S",
  94. time.localtime(int(time.time())))))
  95. logging.info("Captcha sent to " + str(msg_chat_id))
  96. # set pmstat_ and uinverify_ in redis
  97. ret = redis_cli.set("pmstat_" + str(msg_chat_id), curTs, ex=600)
  98. if ret is None:
  99. logging.info("[ERROR] pmstat_" + str(msg_chat_id) + " set stat failed")
  100. ret = redis_cli.set("uinverify_" + str(msg_chat_id), sessionUUID + "," + str(curTs), ex=95)
  101. if ret is None:
  102. logging.info("[ERROR] uinverify_" + str(msg_chat_id) + " set stat failed")
  103. return
  104. # pmstat_ found, means captcha already sent, check if sent sig is correct
  105. # if correct, add to k-v for bypass
  106. # first, check uinverify_ exists or not, if not, already expired
  107. # block user and return
  108. else:
  109. # check if value expired
  110. if int(pmstat.decode()) < int(time.time()):
  111. await message.reply(VERIF_FAIL.format(errcode=9002, botuser=pyroSecrets.PYRO_MY_BOTNAME))
  112. logging.info("Captcha expired, block user " + str(msg_chat_id))
  113. await client.block_user(msg_chat_id)
  114. # set ulist_ in redis
  115. ret = redis_cli.set("ulist_" + str(msg_chat_id), 2)
  116. if ret is None:
  117. logging.info("[ERROR] ulist_" + str(msg_chat_id) + " set stat failed")
  118. return
  119. # check uinverify_ in redis
  120. uinverify = redis_cli.get("uinverify_" + str(msg_chat_id))
  121. if uinverify is None:
  122. # uinverify_ not found, already expired, block user and return
  123. await message.reply(VERIF_FAIL.format(errcode=9004, botuser=pyroSecrets.PYRO_MY_BOTNAME))
  124. await client.block_user(msg_chat_id)
  125. logging.info("Captcha expired, block user " + str(msg_chat_id))
  126. ret = redis_cli.set("ulist_" + str(msg_chat_id), 2)
  127. if ret is None:
  128. logging.info("[ERROR] ulist_" + str(msg_chat_id) + " set block failed")
  129. return
  130. # uinverify_ found, check if sig is correct
  131. # if correct, add to k-v for bypass
  132. else:
  133. # retrieve text from message
  134. textSig = message.text
  135. userid = str(msg_chat_id)
  136. # retrieve ts from uinverify_
  137. # ts = [1], sessionUUID = [0]
  138. tsAndUUID = uinverify.decode().split(",")
  139. try:
  140. oriSignTxt = tsAndUUID[0] + "/" + userid + "/" + tsAndUUID[1]
  141. except KeyError:
  142. await message.reply(VERIF_500.format(errcode=9098, botuser=pyroSecrets.PYRO_MY_BOTNAME))
  143. logging.info("[ERROR] KeyError in uinverify_" + str(msg_chat_id))
  144. return
  145. # generate sig
  146. secretKeyB64 = pyroSecrets.HMAC_KEY_B64_URLSAFE_NOPAD
  147. while len(secretKeyB64) % 4 != 0:
  148. secretKeyB64 += "="
  149. secretKey = secretKeyB64.encode("utf-8")
  150. secretKeyBytes = base64.urlsafe_b64decode(secretKey)
  151. sigBytes = hmac.new(secretKeyBytes,
  152. oriSignTxt.encode(),
  153. hashlib.sha256)
  154. sigB64 = base64.b64encode(sigBytes.digest()).decode("utf-8")
  155. logging.info("Debug Sig: " + sigB64)
  156. # compare sig
  157. if sigB64 == textSig:
  158. logging.info("Captcha correct, add to whitelist " + str(msg_chat_id))
  159. # sig correct, add to k-v for bypass
  160. ret = redis_cli.set("ulist_" + str(msg_chat_id), 1)
  161. if ret is None:
  162. logging.info("[ERROR] ulist_" + str(msg_chat_id) + " set ok")
  163. await message.reply(VERIF_500.format(errcode=9099, botuser=pyroSecrets.PYRO_MY_BOTNAME))
  164. return
  165. else:
  166. logging.info("Verification Passed for: " + str(msg_chat_id))
  167. await message.reply(VERIF_PASS)
  168. return
  169. else:
  170. # sig incorrect, block user and return
  171. logging.info("Captcha SIG incorrect, block user " + str(msg_chat_id))
  172. await message.reply(VERIF_FAIL.format(errcode=9003, botuser=pyroSecrets.PYRO_MY_BOTNAME))
  173. await client.block_user(msg_chat_id)
  174. ret = redis_cli.set("ulist_" + str(msg_chat_id), 2)
  175. if ret is None:
  176. logging.info("[ERROR] ulist_" + str(msg_chat_id) + " set block failed")
  177. return
  178. def main():
  179. # Connect to redis
  180. try:
  181. redis_cli.ping()
  182. logging.info("Redis Connected.")
  183. except:
  184. logging.info("Redis connection failed")
  185. sys.exit(1)
  186. logging.info("PyroPM Captcha Bot is starting...")
  187. main()
  188. app.run()