import json
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests  # can't work with async
from tqdm import tqdm

#? maybe require requests to ActivityPub endpoints to be signed

# useful links:
# https://federation.readthedocs.io/en/latest/usage.html
# https://rss-bridge.github.io/rss-bridge/Bridge_Specific/ActivityPub_(Mastodon).html
# https://github.com/RSS-Bridge/rss-bridge/blob/master/bridges/MastodonBridge.php
# https://github.com/BentonEdmondson/servitor
# https://codeberg.org/grunfink/snac2/src/branch/master/activitypub.c#L118

#! migration is not in ActivityPub's spec
# https://docs.gotosocial.org/en/latest/federation/federating_with_gotosocial/#actor-migration-aliasing

# Result codes returned by valid_account() as the second tuple element.
STATUS_MOVED = 1  # HTTP 200 and the actor document has a truthy "movedTo"
STATUS_OK = 2  # HTTP 200 and no "movedTo"
STATUS_FAILED = 3  # non-200 response, request error, or unusable body


def valid_account(account: dict) -> tuple:
    """Fetch an account's ActivityPub document and classify the result.

    Args:
        account: Mapping with at least an ``"id"`` key (echoed back in the
            result) and a ``"url"`` key (the address that is requested).

    Returns:
        A ``(account["id"], code)`` tuple where ``code`` is one of
        ``STATUS_MOVED`` (1), ``STATUS_OK`` (2) or ``STATUS_FAILED`` (3).

    Raises:
        KeyError: If ``account`` lacks ``"id"`` or ``"url"``.
    """
    account_id = account["id"]
    try:
        response = requests.get(
            account["url"],
            headers={"Accept": "application/activity+json"},
            timeout=5,
        )
        if response.status_code != 200:
            return account_id, STATUS_FAILED
        actor = response.json()
    except (requests.exceptions.RequestException, ValueError):
        # ValueError covers a 200 response whose body is not JSON; older
        # `requests` releases raise it outside the RequestException tree.
        return account_id, STATUS_FAILED

    # A JSON body that is not an object cannot carry "movedTo".
    if not isinstance(actor, dict):
        return account_id, STATUS_FAILED

    if actor.get("movedTo"):
        return account_id, STATUS_MOVED
    # Original author's note: we can't check their last status in one single
    # AP request so just follow our instance.
    return account_id, STATUS_OK


def valid_accounts(accounts, max_workers=16):
    """Validate many accounts concurrently, yielding results as they finish.

    Args:
        accounts: Sized collection of account mappings accepted by
            ``valid_account``.
        max_workers: Size of the thread pool (defaults to 16).

    Yields:
        The ``(id, code)`` tuple of each account, in completion order (not
        input order). A tqdm progress bar is advanced once per result.
    """
    with tqdm(total=len(accounts)) as pbar, ThreadPoolExecutor(
        max_workers=max_workers
    ) as executor:
        futures = [executor.submit(valid_account, acc) for acc in accounts]
        for future in as_completed(futures):
            pbar.update(1)
            yield future.result()


def main(path="accounts.json"):
    """Validate every account group found in the JSON file at ``path``.

    The file is read as a mapping of group name to a list of accounts (or
    ``None``, which is skipped). The ``"lists_members"`` group is instead a
    list of objects with ``"title"`` and ``"members"`` keys; each member list
    is validated separately. Every result tuple is printed.
    """
    with open(path, "r", encoding="utf-8") as f:
        accounts = json.load(f)

    for k, v in accounts.items():
        print(f"Validating {k}")
        if v is None:
            continue
        if k == "lists_members":
            for list_ in v:
                print(f"Validating list: {list_['title']}")
                for result in valid_accounts(list_["members"]):
                    print(result)
        else:
            for result in valid_accounts(v):
                print(result)


if __name__ == "__main__":
    main()