valid.py 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. import requests # can't work with async
  2. from tqdm import tqdm
  3. from concurrent.futures import ThreadPoolExecutor, as_completed
  4. #? Some instances may require requests to ActivityPub endpoints to be HTTP-signed; the requests below are sent unsigned
  5. # useful links:
  6. # https://federation.readthedocs.io/en/latest/usage.html
  7. # https://rss-bridge.github.io/rss-bridge/Bridge_Specific/ActivityPub_(Mastodon).html
  8. # https://github.com/RSS-Bridge/rss-bridge/blob/master/bridges/MastodonBridge.php
  9. # https://github.com/BentonEdmondson/servitor
  10. # https://codeberg.org/grunfink/snac2/src/branch/master/activitypub.c#L118
  11. #! migration is not in ActivityPub's spec
  12. # https://docs.gotosocial.org/en/latest/federation/federating_with_gotosocial/#actor-migration-aliasing
  13. def valid_account(account):
  14. try:
  15. response = requests.get(
  16. account["url"], headers={"Accept": "application/activity+json"}, timeout=5
  17. )
  18. # print(response.json())
  19. if response.status_code == 200:
  20. if response.json().get("movedTo"):
  21. return account["id"], 1
  22. return account["id"], 2 # we can't check their last status in one single AP request so just follow our instance
  23. return account["id"], 3
  24. except requests.exceptions.RequestException as e: # FIXME: other errors
  25. return account["id"], 3
  26. def valid_accounts(accounts):
  27. with tqdm(total=len(accounts)) as pbar:
  28. with ThreadPoolExecutor(max_workers=16) as executor:
  29. futures = [executor.submit(valid_account, acc) for acc in accounts]
  30. for future in as_completed(futures):
  31. pbar.update(1)
  32. yield future.result()
  33. if __name__ == "__main__":
  34. import json
  35. with open("accounts.json", "r") as f:
  36. accounts = json.load(f)
  37. for k, v in accounts.items():
  38. print(f"Validating {k}")
  39. if v is None:
  40. continue
  41. if k == "lists_members":
  42. for list_ in v:
  43. print(f"Validating list: {list_['title']}")
  44. for result in valid_accounts(list_["members"]):
  45. print(result)
  46. else:
  47. for result in valid_accounts(v):
  48. print(result)
  49. # state = valid_account("https://example.com/@example")
  50. # print(state)