1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859 |
- import requests # can't work with async
- from tqdm import tqdm
- from concurrent.futures import ThreadPoolExecutor, as_completed
- #? maybe require requests to ActivityPub endpoints to be signed
- # useful links:
- # https://federation.readthedocs.io/en/latest/usage.html
- # https://rss-bridge.github.io/rss-bridge/Bridge_Specific/ActivityPub_(Mastodon).html
- # https://github.com/RSS-Bridge/rss-bridge/blob/master/bridges/MastodonBridge.php
- # https://github.com/BentonEdmondson/servitor
- # https://codeberg.org/grunfink/snac2/src/branch/master/activitypub.c#L118
- #! migration is not in ActivityPub's spec
- # https://docs.gotosocial.org/en/latest/federation/federating_with_gotosocial/#actor-migration-aliasing
- def valid_account(account):
- try:
- response = requests.get(
- account["url"], headers={"Accept": "application/activity+json"}, timeout=5
- )
- # print(response.json())
- if response.status_code == 200:
- if response.json().get("movedTo"):
- return account["id"], 1
- return account["id"], 2 # we can't check their last status in one single AP request so just follow our instance
- return account["id"], 3
- except requests.exceptions.RequestException as e: # FIXME: other errors
- return account["id"], 3
- def valid_accounts(accounts):
- with tqdm(total=len(accounts)) as pbar:
- with ThreadPoolExecutor(max_workers=16) as executor:
- futures = [executor.submit(valid_account, acc) for acc in accounts]
- for future in as_completed(futures):
- pbar.update(1)
- yield future.result()
- if __name__ == "__main__":
- import json
- with open("accounts.json", "r") as f:
- accounts = json.load(f)
- for k, v in accounts.items():
- print(f"Validating {k}")
- if v is None:
- continue
- if k == "lists_members":
- for list_ in v:
- print(f"Validating list: {list_['title']}")
- for result in valid_accounts(list_["members"]):
- print(result)
- else:
- for result in valid_accounts(v):
- print(result)
- # state = valid_account("https://example.com/@example")
- # print(state)
|