Browse Source

The pool of threads now process q_items.

Also, the purge is commanded as a q_item.
default 2 years ago
parent
commit
b2d186cd0f
3 changed files with 74 additions and 76 deletions
  1. 8 0
      activitypub.c
  2. 65 75
      httpd.c
  3. 1 1
      snac.h

+ 8 - 0
activitypub.c

@@ -1188,6 +1188,14 @@ void process_queue_item(xs_dict *q_item)
             }
         }
     }
+    else
+    if (strcmp(type, "purge") == 0) {
+        srv_log(xs_dup("purge start"));
+
+        purge_all();
+
+        srv_log(xs_dup("purge end"));
+    }
 }
 
 

+ 65 - 75
httpd.c

@@ -237,81 +237,6 @@ void term_handler(int s)
 }
 
 
-static void *purge_thread(void *arg)
-/* spawned purge */
-{
-    srv_log(xs_dup("purge start"));
-
-    purge_all();
-
-    srv_log(xs_dup("purge end"));
-
-    return NULL;
-}
-
-
-static void *background_thread(void *arg)
-/* background thread (queue management and other things) */
-{
-    time_t purge_time;
-
-    /* first purge time */
-    purge_time = time(NULL) + 10 * 60;
-
-    srv_log(xs_fmt("background thread started"));
-
-    while (srv_running) {
-        time_t t;
-
-        {
-            xs *list = user_list();
-            char *p, *uid;
-
-            /* process queues for all users */
-            p = list;
-            while (xs_list_iter(&p, &uid)) {
-                snac snac;
-
-                if (user_open(&snac, uid)) {
-                    process_user_queue(&snac);
-                    user_free(&snac);
-                }
-            }
-        }
-
-        /* global queue */
-        process_queue();
-
-        /* time to purge? */
-        if ((t = time(NULL)) > purge_time) {
-            pthread_t pth;
-
-            pthread_create(&pth, NULL, purge_thread, NULL);
-            pthread_detach(pth);
-
-            /* next purge time is tomorrow */
-            purge_time = t + 24 * 60 * 60;
-        }
-
-        /* sleep 3 seconds */
-        pthread_mutex_t dummy_mutex = PTHREAD_MUTEX_INITIALIZER;
-        pthread_cond_t  dummy_cond  = PTHREAD_COND_INITIALIZER;
-        struct timespec ts;
-
-        clock_gettime(CLOCK_REALTIME, &ts);
-        ts.tv_sec += 3;
-
-        pthread_mutex_lock(&dummy_mutex);
-        while (pthread_cond_timedwait(&dummy_cond, &dummy_mutex, &ts) == 0);
-        pthread_mutex_unlock(&dummy_mutex);
-    }
-
-    srv_log(xs_fmt("background thread stopped"));
-
-    return NULL;
-}
-
-
 /** job control **/
 
 /* mutex to access the lists of jobs */
@@ -391,6 +316,10 @@ static void *job_thread(void *arg)
             if (f != NULL)
                 httpd_connection(f);
         }
+        else {
+            /* it's a q_item */
+            process_queue_item(job);
+        }
     }
 
     srv_debug(0, xs_fmt("job thread %ld stopped", pid));
@@ -399,6 +328,67 @@ static void *job_thread(void *arg)
 }
 
 
+static void *background_thread(void *arg)
+/* background thread (queue management and other things) */
+{
+    time_t purge_time;
+
+    /* first purge time */
+    purge_time = time(NULL) + 10 * 60;
+
+    srv_log(xs_fmt("background thread started"));
+
+    while (srv_running) {
+        time_t t;
+
+        {
+            xs *list = user_list();
+            char *p, *uid;
+
+            /* process queues for all users */
+            p = list;
+            while (xs_list_iter(&p, &uid)) {
+                snac snac;
+
+                if (user_open(&snac, uid)) {
+                    process_user_queue(&snac);
+                    user_free(&snac);
+                }
+            }
+        }
+
+        /* global queue */
+        process_queue();
+
+        /* time to purge? */
+        if ((t = time(NULL)) > purge_time) {
+            /* next purge time is tomorrow */
+            purge_time = t + 24 * 60 * 60;
+
+            xs *q_item = xs_dict_new();
+            q_item = xs_dict_append(q_item, "type", "purge");
+            job_post(q_item);
+        }
+
+        /* sleep 3 seconds */
+        pthread_mutex_t dummy_mutex = PTHREAD_MUTEX_INITIALIZER;
+        pthread_cond_t  dummy_cond  = PTHREAD_COND_INITIALIZER;
+        struct timespec ts;
+
+        clock_gettime(CLOCK_REALTIME, &ts);
+        ts.tv_sec += 3;
+
+        pthread_mutex_lock(&dummy_mutex);
+        while (pthread_cond_timedwait(&dummy_cond, &dummy_mutex, &ts) == 0);
+        pthread_mutex_unlock(&dummy_mutex);
+    }
+
+    srv_log(xs_fmt("background thread stopped"));
+
+    return NULL;
+}
+
+
 void httpd(void)
 /* starts the server */
 {

+ 1 - 1
snac.h

@@ -169,7 +169,7 @@ int send_to_actor(snac *snac, char *actor, char *msg, d_char **payload, int *p_s
 int is_msg_public(snac *snac, char *msg);
 
 void process_user_queue(snac *snac);
-
+void process_queue_item(xs_dict *q_item);
 void process_queue(void);
 
 void post(snac *snac, char *msg);