Browse Source

Call process_input_message() from the shared-inbox input.

This way, some garbage like unrequested Deletes from Mastodon
and other transient errors (like unaccessible authors) can be
short-circuited before propagating the message to the users.
default 1 year ago
parent
commit
888a79e58a
1 changed files with 47 additions and 26 deletions
  1. 47 26
      activitypub.c

+ 47 - 26
activitypub.c

@@ -2052,45 +2052,66 @@ void process_queue_item(xs_dict *q_item)
     }
     else
     if (strcmp(type, "input") == 0) {
-        /* redistribute the input message to all users */
-        char *ntid   = xs_dict_get(q_item, "ntid");
-        xs *tmpfn    = xs_fmt("%s/tmp/%s.json", srv_basedir, ntid);
         xs_dict *msg = xs_dict_get(q_item, "message");
-        FILE *f;
+        xs_dict *req = xs_dict_get(q_item, "req");
+        int retries  = xs_number_get(xs_dict_get(q_item, "retries"));
+
+        /* do some instance-level checks */
+        int r = process_input_message(NULL, msg, req);
 
-        if ((f = fopen(tmpfn, "w")) != NULL) {
-            xs_json_dump(q_item, 4, f);
-            fclose(f);
+        if (r == 0) {
+            /* transient error? retry */
+            int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max"));
+
+            if (retries > queue_retry_max)
+                srv_log(xs_fmt("shared input giving up"));
+            else {
+                /* reenqueue */
+                enqueue_shared_input(msg, req, retries + 1);
+                srv_log(xs_fmt("shared input requeue #%d", retries + 1));
+            }
         }
+        else
+        if (r == 2) {
+            /* redistribute the input message to all users */
+            char *ntid = xs_dict_get(q_item, "ntid");
+            xs *tmpfn  = xs_fmt("%s/tmp/%s.json", srv_basedir, ntid);
+            FILE *f;
+
+            if ((f = fopen(tmpfn, "w")) != NULL) {
+                xs_json_dump(q_item, 4, f);
+                fclose(f);
+            }
 
-        xs *users = user_list();
-        xs_list *p = users;
-        char *v;
-        int cnt = 0;
+            xs *users = user_list();
+            xs_list *p = users;
+            char *v;
+            int cnt = 0;
 
-        while (xs_list_iter(&p, &v)) {
-            snac user;
+            while (xs_list_iter(&p, &v)) {
+                snac user;
 
-            if (user_open(&user, v)) {
-                if (is_msg_for_me(&user, msg)) {
-                    xs *fn = xs_fmt("%s/queue/%s.json", user.basedir, ntid);
+                if (user_open(&user, v)) {
+                    if (is_msg_for_me(&user, msg)) {
+                        xs *fn = xs_fmt("%s/queue/%s.json", user.basedir, ntid);
 
-                    snac_debug(&user, 1, xs_fmt("enqueue_input (from shared inbox) %s", fn));
+                        snac_debug(&user, 1, xs_fmt("enqueue_input (from shared inbox) %s", fn));
 
-                    if (link(tmpfn, fn) < 0)
-                        srv_log(xs_fmt("link(%s, %s) error", tmpfn, fn));
+                        if (link(tmpfn, fn) < 0)
+                            srv_log(xs_fmt("link(%s, %s) error", tmpfn, fn));
 
-                    cnt++;
-                }
+                        cnt++;
+                    }
 
-                user_free(&user);
+                    user_free(&user);
+                }
             }
-        }
 
-        unlink(tmpfn);
+            unlink(tmpfn);
 
-        if (cnt == 0)
-            srv_debug(1, xs_fmt("no valid recipients for %s", tmpfn));
+            if (cnt == 0)
+                srv_debug(1, xs_fmt("no valid recipients for %s", tmpfn));
+        }
     }
     else
         srv_log(xs_fmt("unexpected q_item type '%s'", type));