Began implementation of Journaling and Envelope Journaling.
authorArt Cancro <ajc@citadel.org>
Wed, 4 Jan 2006 23:14:52 +0000 (23:14 +0000)
committerArt Cancro <ajc@citadel.org>
Wed, 4 Jan 2006 23:14:52 +0000 (23:14 +0000)
citadel/ChangeLog
citadel/Makefile.in
citadel/housekeeping.c
citadel/journaling.c [new file with mode: 0644]
citadel/journaling.h [new file with mode: 0644]
citadel/msgbase.c
citadel/server.h
citadel/techdoc/hack.txt

index 9b99736f6c0ea13234b95b1884e8b162ca3f8c39..745d3489a6775010ff014f0840aa44a66f755ca1 100644 (file)
@@ -1,5 +1,8 @@
 $Id$
 
+Wed Jan  4 18:14:13 EST 2006 ajc
+* Began implementation of Journaling and Envelope Journaling.
+
 Thu Dec 15 23:12:05 EST 2005 ajc
 * Include a pre-fixed parsedate.c in the source tree in order to eliminate
   the requirement for yacc or bison to build Citadel.  Also, 'make clean'
index 3a99502dfcac56c6cba42d82697dda11fb041447..06d1469ac279927f3716a21854efb13193e61b9d 100644 (file)
@@ -102,7 +102,7 @@ SOURCES=aidepost.c auth.c base64.c chkpwd.c citadel.c citadel_ipc.c \
        serv_vandelay.c serv_vcard.c server_main.c setup.c snprintf.c \
        stress.c support.c sysdep.c tools.c user_ops.c userlist.c \
        whobbs.c vcard.c serv_notes.c serv_fulltext.c ft_wordbreaker.c \
-       crc16.c
+       crc16.c journaling.c
 
 DEP_FILES=$(SOURCES:.c=.d)
 
@@ -135,7 +135,7 @@ SERV_OBJS = server_main.o \
        control.o policy.o config.o support.o room_ops.o \
        file_ops.o msgbase.o euidindex.o \
        locate_host.o housekeeping.o mime_parser.o html.o \
-       internet_addressing.o \
+       internet_addressing.o journaling.o \
        serv_crypto.o parsedate.o genstamp.o \
        clientsocket.o $(AUTH) $(SERV_MODULES)
 
index 7eee008ae05911837a9d1f092f69392e5172d498..1c833f0401af04687ddbfb05dd98e2a4021867f6 100644 (file)
@@ -40,7 +40,8 @@
 #include "sysdep_decls.h"
 #include "room_ops.h"
 #include "database.h"
-
+#include "msgbase.h"
+#include "journaling.h"
 
 
 
@@ -136,6 +137,7 @@ void do_housekeeping(void) {
        static int housekeeping_in_progress = 0;
        static time_t last_timer = 0L;
        int do_housekeeping_now = 0;
+       int do_perminute_housekeeping_now = 0;
        time_t now;
 
        /*
@@ -144,11 +146,12 @@ void do_housekeeping(void) {
         * potentially have multiple concurrent mutexes in progress.
         */
        begin_critical_section(S_HOUSEKEEPING);
-       now = time(NULL);
-       if ( (now - last_timer) > (time_t)60 ) {
-               if (housekeeping_in_progress == 0) {
-                       do_housekeeping_now = 1;
-                       housekeeping_in_progress = 1;
+       if (housekeeping_in_progress == 0) {
+               do_housekeeping_now = 1;
+               housekeeping_in_progress = 1;
+               now = time(NULL);
+               if ( (now - last_timer) > (time_t)60 ) {
+                       do_perminute_housekeeping_now = 1;
                        last_timer = time(NULL);
                }
        }
@@ -163,8 +166,14 @@ void do_housekeeping(void) {
         * loop.  Everything below this point is real work.
         */
 
-       cdb_check_handles();                    /* suggested by Justin Case */
-       PerformSessionHooks(EVT_TIMER);         /* Run any timer hooks */
+       /* First, do the "as often as needed" stuff... */
+       JournalRunQueue();
+
+       /* Then, do the "once per minute" stuff... */
+       if (do_perminute_housekeeping_now) {
+               cdb_check_handles();                    /* suggested by Justin Case */
+               PerformSessionHooks(EVT_TIMER);         /* Run any timer hooks */
+       }
 
        /*
         * All done.
diff --git a/citadel/journaling.c b/citadel/journaling.c
new file mode 100644 (file)
index 0000000..cd1f5d4
--- /dev/null
@@ -0,0 +1,184 @@
+/*
+ * $Id:  $
+ *
+ * Message journaling functions.
+ *
+ */
+
+#include "sysdep.h"
+#include <stdlib.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <fcntl.h>
+
+#if TIME_WITH_SYS_TIME
+# include <sys/time.h>
+# include <time.h>
+#else
+# if HAVE_SYS_TIME_H
+#  include <sys/time.h>
+# else
+#  include <time.h>
+# endif
+#endif
+
+
+#include <ctype.h>
+#include <string.h>
+#include <limits.h>
+#include <errno.h>
+#include <stdarg.h>
+#include <sys/stat.h>
+#include "citadel.h"
+#include "server.h"
+#include "serv_extensions.h"
+#include "database.h"
+#include "msgbase.h"
+#include "support.h"
+#include "sysdep_decls.h"
+#include "citserver.h"
+#include "room_ops.h"
+#include "user_ops.h"
+#include "file_ops.h"
+#include "config.h"
+#include "control.h"
+#include "tools.h"
+#include "mime_parser.h"
+#include "html.h"
+#include "genstamp.h"
+#include "internet_addressing.h"
+#include "journaling.h"
+
+struct jnlq *jnlq = NULL;      /* journal queue */
+
+/*
+ * Hand off a copy of a message to be journalized.
+ */
+void JournalBackgroundSubmit(struct CtdlMessage *msg,
+                       char *saved_rfc822_version,
+                       struct recptypes *recps) {
+
+       struct jnlq *jptr = NULL;
+
+       /* Avoid double journaling! */
+       if (msg->cm_fields['J'] != NULL) {
+               free(saved_rfc822_version);
+               return;
+       }
+
+       jptr = (struct jnlq *)malloc(sizeof(struct jnlq));
+       if (jptr == NULL) {
+               free(saved_rfc822_version);
+               return;
+       }
+       memset(jptr, 0, sizeof(struct jnlq));
+       if (recps != NULL) memcpy(&jptr->recps, recps, sizeof(struct recptypes));
+       if (msg->cm_fields['A'] != NULL) jptr->from = strdup(msg->cm_fields['A']);
+       if (msg->cm_fields['N'] != NULL) jptr->node = strdup(msg->cm_fields['N']);
+       if (msg->cm_fields['F'] != NULL) jptr->rfca = strdup(msg->cm_fields['F']);
+       if (msg->cm_fields['U'] != NULL) jptr->subj = strdup(msg->cm_fields['U']);
+       if (msg->cm_fields['I'] != NULL) jptr->msgn = strdup(msg->cm_fields['I']);
+       jptr->rfc822 = saved_rfc822_version;
+
+       /* Add to the queue */
+       begin_critical_section(S_JOURNAL_QUEUE);
+       jptr->next = jnlq;
+       jnlq = jptr;
+       end_critical_section(S_JOURNAL_QUEUE);
+}
+
+
+/*
+ * Called by JournalRunQueue() to send an individual message.
+ */
+void JournalRunQueueMsg(struct jnlq *jmsg) {
+
+       struct CtdlMessage *journal_msg = NULL;
+       struct recptypes *journal_recps = NULL;
+       char *message_text = NULL;
+       char mime_boundary[256];
+       static int seq = 0;
+
+       journal_recps = validate_recipients("FIXME@FIXME.com"); /* FIXME */
+       if (journal_recps != NULL) {
+
+               if (  (journal_recps->num_local > 0)
+                  || (journal_recps->num_internet > 0)
+                  || (journal_recps->num_ignet > 0)
+                  || (journal_recps->num_room > 0)
+               ) {
+
+                       /*
+                        * Construct journal message.
+                        * Note that we are transferring ownership of some of the memory here.
+                        */
+                       journal_msg = malloc(sizeof(struct CtdlMessage));
+                       memset(journal_msg, 0, sizeof(struct CtdlMessage));
+                       journal_msg->cm_magic = CTDLMESSAGE_MAGIC;
+                       journal_msg->cm_anon_type = MES_NORMAL;
+                       journal_msg->cm_format_type = FMT_RFC822;
+                       journal_msg->cm_fields['J'] = strdup("is journal");
+                       journal_msg->cm_fields['A'] = jmsg->from;
+                       journal_msg->cm_fields['N'] = jmsg->node;
+                       journal_msg->cm_fields['F'] = jmsg->rfca;
+                       journal_msg->cm_fields['U'] = jmsg->subj;
+
+                       sprintf(mime_boundary, "---Citadel-Message-Journal-%08lx-%04x---", time(NULL), ++seq);
+                       message_text = malloc(strlen(jmsg->rfc822) + sizeof(struct recptypes) + 1024);
+
+                       sprintf(message_text,
+                               "Content-type: multipart/mixed; boundary=\"%s\"\r\n"
+                               "MIME-Version: 1.0\r\n"
+                               "\n"
+                               "--%s\r\n"
+                               "Content-type: text/plain\r\n"
+                               "\r\n"
+                               "FIXME PUT MEMO HERE\r\n"
+                               "--%s\r\n"
+                               "Content-type: message/rfc822\r\n"
+                               "\r\n"
+                               "%s"
+                               "--%s--\r\n"
+                       ,
+                               mime_boundary,
+                               mime_boundary,
+                               mime_boundary,
+                               jmsg->rfc822,
+                               mime_boundary
+                       );
+
+                       journal_msg->cm_fields['M'] = message_text;
+                       free(jmsg->rfc822);
+                       free(jmsg->msgn);
+                       
+                       /* Submit journal message */
+                       CtdlSubmitMsg(journal_msg, journal_recps, "");
+                       CtdlFreeMessage(journal_msg);
+               }
+
+               free(journal_recps);
+       }
+
+       /* We are responsible for freeing this memory. */
+       free(jmsg);
+}
+
+
+/*
+ * Run the queue.
+ */
+void JournalRunQueue(void) {
+       struct jnlq *jptr;
+
+       while (jnlq != NULL) {
+               begin_critical_section(S_JOURNAL_QUEUE);
+               if (jnlq != NULL) {
+                       jptr = jnlq;
+                       jnlq = jnlq->next;
+               }
+               end_critical_section(S_JOURNAL_QUEUE);
+               JournalRunQueueMsg(jptr);
+       }
+}
+
+
diff --git a/citadel/journaling.h b/citadel/journaling.h
new file mode 100644 (file)
index 0000000..2432dde
--- /dev/null
@@ -0,0 +1,18 @@
+/* $Id: $ */
+
+struct jnlq {
+       struct jnlq *next;
+       struct recptypes recps;
+       char *from;
+       char *node;
+       char *rfca;
+       char *subj;
+       char *msgn;
+       char *rfc822;
+};
+
+void JournalBackgroundSubmit(struct CtdlMessage *msg,
+                        char *saved_rfc822_version,
+                        struct recptypes *recps);
+void JournalRunQueueMsg(struct jnlq *jmsg);
+void JournalRunQueue(void);
index c0e5eaa2ce1d110d48b89d15322bfd9d053a545a..5c832a28d0d6c15859b272f77bbbd4129e9bea02 100644 (file)
@@ -50,6 +50,7 @@
 #include "serv_fulltext.h"
 #include "vcard.h"
 #include "euidindex.h"
+#include "journaling.h"
 
 long config_msgnum;
 struct addresses_to_be_filed *atbf = NULL;
@@ -82,7 +83,8 @@ char *msgkeys[] = {
        NULL, 
        "hnod",
        "msgn",
-       NULL, NULL, NULL,
+       "jrnl",
+       NULL, NULL,
        "text",
        "node",
        "room",
@@ -2137,6 +2139,8 @@ long CtdlSubmitMsg(struct CtdlMessage *msg,       /* message to save */
        char *hold_R, *hold_D;
        char *collected_addresses = NULL;
        struct addresses_to_be_filed *aptr = NULL;
+       char *saved_rfc822_version = NULL;
+       int qualified_for_journaling = 0;
 
        lprintf(CTDL_DEBUG, "CtdlSubmitMsg() called\n");
        if (is_valid_message(msg) == 0) return(-1);     /* self check */
@@ -2265,13 +2269,16 @@ long CtdlSubmitMsg(struct CtdlMessage *msg,     /* message to save */
        safestrncpy(smi.meta_content_type, content_type,
                        sizeof smi.meta_content_type);
 
-       /* As part of the new metadata record, measure how
-        * big this message will be when displayed as RFC822.
-        * Both POP and IMAP use this, and it's best to just take the hit now
-        * instead of having to potentially measure thousands of messages when
-        * a mailbox is opened later.
+       /*
+        * Measure how big this message will be when rendered as RFC822.
+        * We do this for two reasons:
+        * 1. We need the RFC822 length for the new metadata record, so the
+        *    POP and IMAP services don't have to calculate message lengths
+        *    while the user is waiting (multiplied by potentially hundreds
+        *    or thousands of messages).
+        * 2. If journaling is enabled, we will need an RFC822 version of the
+        *    message to attach to the journalized copy.
         */
-
        if (CC->redirect_buffer != NULL) {
                lprintf(CTDL_ALERT, "CC->redirect_buffer is not NULL during message submission!\n");
                abort();
@@ -2281,7 +2288,7 @@ long CtdlSubmitMsg(struct CtdlMessage *msg,       /* message to save */
        CC->redirect_alloc = SIZ;
        CtdlOutputPreLoadedMsg(msg, MT_RFC822, HEADERS_ALL, 0, 1);
        smi.meta_rfc822_length = CC->redirect_len;
-       free(CC->redirect_buffer);
+       saved_rfc822_version = CC->redirect_buffer;
        CC->redirect_buffer = NULL;
        CC->redirect_len = 0;
        CC->redirect_alloc = 0;
@@ -2432,6 +2439,7 @@ long CtdlSubmitMsg(struct CtdlMessage *msg,       /* message to save */
                imsg->cm_anon_type = MES_NORMAL;
                imsg->cm_format_type = FMT_RFC822;
                imsg->cm_fields['A'] = strdup("Citadel");
+               imsg->cm_fields['J'] = strdup("do not journal");
                imsg->cm_fields['M'] = instr;
                CtdlSubmitMsg(imsg, NULL, SMTP_SPOOLOUT_ROOM);
                CtdlFreeMessage(imsg);
@@ -2456,11 +2464,34 @@ long CtdlSubmitMsg(struct CtdlMessage *msg,     /* message to save */
                atbf = aptr;
                end_critical_section(S_ATBF);
        }
-       return(newmsgid);
-}
 
+       /*
+        * Determine whether this message qualifies for journaling.
+        */
+       if (msg->cm_fields['J'] != NULL) {
+               qualified_for_journaling = 0;
+       }
+       else {
+               qualified_for_journaling = 1;   /* FIXME */
+       }
 
+       /*
+        * Do we have to perform journaling?  If so, hand off the saved
+        * RFC822 version will be handed off to the journaler for background
+        * submit.  Otherwise, we have to free the memory ourselves.
+        */
+       if (saved_rfc822_version != NULL) {
+               if (qualified_for_journaling) {
+                       JournalBackgroundSubmit(msg, saved_rfc822_version, recps);
+               }
+               else {
+                       free(saved_rfc822_version);
+               }
+       }
 
+       /* Done. */
+       return(newmsgid);
+}
 
 
 
index 610de480d7cefa5e825dae646533e44476f50ecd..2b60be19372c40d96a2253db6f05a85c65d11771 100644 (file)
@@ -219,6 +219,7 @@ enum {
        S_FLOORCACHE,
        S_DEBUGMEMLEAKS,
        S_ATBF,
+       S_JOURNAL_QUEUE,
        MAX_SEMAPHORES
 };
 
@@ -466,6 +467,6 @@ struct ser_ret {
 /*               **********                    Important fields */
 /*                         ***************     Semi-important fields */
 /*                                        *    Message text (MUST be last) */
-#define FORDER "IPTAFONHRDBCEGJKLQSVWXZYUM"
+#define FORDER "IPTAFONHRDBCEJGKLQSVWXZYUM"
 
 #endif /* SERVER_H */
index e1616b3d791c11b4eb5575181b111695bc520d79..aea90918f98b664049ea7a4658b993b82f51d5e5 100644 (file)
@@ -257,6 +257,10 @@ F  rFc822 address  For Internet mail, this is the delivery address of the
 H      Human node name Human-readable name of system message originated on.
 I      Original ID     A 32-bit integer containing the message ID on the
                        system the message *originated* on.
+J      Journal         The presence of this field indicates that the message
+                       is disqualified from being journaled, perhaps because
+                       it is itself a journalized message and we wish to
+                       avoid double journaling.
 M      Message Text    Normal ASCII, newlines seperated by CR's or LF's,
                         null terminated as always.
 N      Nodename        Contains node name of system message originated on.