Changed the BIGMSGS semantics.
[citadel.git] / citadel / server / msgbase.c
index 259a3e5ab2994febe129e78febbdca89f6b31015..199a5efa366b9da07a7ef7dddf87926883a91c1b 100644 (file)
@@ -2293,57 +2293,52 @@ int CtdlSaveMsgPointerInRoom(char *roomname, long msgid, int do_repl_check, stru
 //
 // This is the back end for CtdlSubmitMsg() and should not be directly
 // called by server-side modules.
-long CtdlSaveThisMessage(struct CtdlMessage *msg, long msgid, int Reply) {
-       long retval;
-       int is_bigmsg = 0;
-       char *holdM = NULL;
-       long holdMLen = 0;
-
-       // If the message is big, set its body aside for storage elsewhere and we hide the message body from the serializer
-       if (!CM_IsEmpty(msg, eMessageText) && msg->cm_lengths[eMessageText] > BIGMSG) {
-               is_bigmsg = 1;
-               holdM = msg->cm_fields[eMessageText];
-               msg->cm_fields[eMessageText] = NULL;
-               holdMLen = msg->cm_lengths[eMessageText];
-               msg->cm_lengths[eMessageText] = 0;
-       }
+long CtdlSaveThisMessage(struct CtdlMessage *msg, long msgid) {
+       long error_count = 0;
 
        // Serialize our data structure for storage in the database
        struct ser_ret smr = CtdlSerializeMessage(msg);
 
-       if (is_bigmsg) {
-               // put the message body back into the message
-               msg->cm_fields[eMessageText] = holdM;
-               msg->cm_lengths[eMessageText] = holdMLen;
+       if (smr.len == 0) {
+               syslog(LOG_ERR, "msgbase: CtdlSaveMessage() unable to serialize message");
+               return (-1);
        }
 
-       if (smr.len == 0) {
-               if (Reply) {
-                       cprintf("%d Unable to serialize message\n", ERROR + INTERNAL_ERROR);
-               }
-               else {
-                       syslog(LOG_ERR, "msgbase: CtdlSaveMessage() unable to serialize message");
+       // STORAGE STRATEGY:
+       // * If headers+content fit are <= 4K, store them together.  It's likely to be one
+       //   memory page, one disk sector, etc. so we benefit from a single disk operation.
+       // * If headers+content exceed 4K, store them separately so we don't end up fetching
+       //   many gigamegs of data when we're just scanning the headers.
+       // * We are using a threshold of 4000 bytes so that there's some room for overhead
+       //   if the database or OS adds any metadata to that one disk page.
+
+       if (smr.len <= 4000) {                                  // all together less than one page, store them together
+
+               if (cdb_store(CDB_MSGMAIN, &msgid, (int)sizeof(long), smr.ser, smr.len)) {
+                       ++error_count;
                }
-               return (-1L);
-       }
 
-       // Write our little bundle of joy into the message base
-       retval = cdb_store(CDB_MSGMAIN, &msgid, (int)sizeof(long), smr.ser, smr.len);
-       if (retval < 0) {
-               syslog(LOG_ERR, "msgbase: can't store message %ld: %ld", msgid, retval);
        }
-       else {
-               if (is_bigmsg) {
-                       retval = cdb_store(CDB_BIGMSGS, &msgid, (int)sizeof(long), holdM, (holdMLen + 1));
-                       if (retval < 0) {
-                               syslog(LOG_ERR, "msgbase: failed to store message body for msgid %ld: %ld", msgid, retval);
-                       }
+
+       else {                                                  // exceed one page, store headers in MSGMAIN, body in BIGMSGS
+
+               if (cdb_store(CDB_MSGMAIN, &msgid, (int)sizeof(long), smr.ser, (smr.msgstart - smr.ser))) {
+                       ++error_count;
                }
+
+               if (cdb_store(CDB_BIGMSGS, &msgid, (int)sizeof(long), smr.msgstart+1, (smr.len - (smr.msgstart - smr.ser) - 1) )) {
+                       ++error_count;
+               }
+
+       }
+
+       if (error_count > 0) {
+               syslog(LOG_ERR, "msgbase: encountered %d errors storing message %ld", error_count, msgid);
        }
 
        // Free the memory we used for the serialized message
        free(smr.ser);
-       return(retval);
+       return(error_count);
 }
 
 
@@ -2366,7 +2361,7 @@ long send_message(struct CtdlMessage *msg) {
                CM_SetField(msg, emessageId, msgidbuf);
        }
 
-       retval = CtdlSaveThisMessage(msg, newmsgid, 1);
+       retval = CtdlSaveThisMessage(msg, newmsgid);
 
        if (retval == 0) {
                retval = newmsgid;
@@ -2387,11 +2382,13 @@ struct ser_ret CtdlSerializeMessage(struct CtdlMessage *msg) {
        size_t wlen;
        int i;
 
+       ret.len = 0;
+       ret.ser = NULL;
+       ret.msgstart = NULL;
+
        // Check for valid message format
        if (CM_IsValidMsg(msg) == 0) {
                syslog(LOG_ERR, "msgbase: CtdlSerializeMessage() aborting due to invalid message");
-               ret.len = 0;
-               ret.ser = NULL;
                return(ret);
        }
 
@@ -2408,6 +2405,7 @@ struct ser_ret CtdlSerializeMessage(struct CtdlMessage *msg) {
                syslog(LOG_ERR, "msgbase: CtdlSerializeMessage() malloc(%ld) failed: %m", (long)ret.len);
                ret.len = 0;
                ret.ser = NULL;
+               ret.msgstart = NULL;
                return(ret);
        }
 
@@ -2418,7 +2416,9 @@ struct ser_ret CtdlSerializeMessage(struct CtdlMessage *msg) {
 
        for (i=0; i < NDiskFields; ++i) {
                if (msg->cm_fields[FieldOrder[i]] != NULL) {
-                                                                       // future improvement: do the bigmsg check right here
+                       if (FieldOrder[i] == eMessageText) {
+                               ret.msgstart = &ret.ser[wlen];          // Make a note where the message text begins
+                       }
                        ret.ser[wlen++] = (char)FieldOrder[i];
                        memcpy(&ret.ser[wlen], msg->cm_fields[FieldOrder[i]], msg->cm_lengths[FieldOrder[i]] + 1);
                        wlen = wlen + msg->cm_lengths[FieldOrder[i]] + 1;
@@ -3081,9 +3081,9 @@ int CtdlDeleteMessages(const char *room_name,     // which room
        int need_to_free_re = 0;
 
        if (content_type) if (!IsEmptyStr(content_type)) {
-                       regcomp(&re, content_type, 0);
-                       need_to_free_re = 1;
-               }
+               regcomp(&re, content_type, 0);
+               need_to_free_re = 1;
+       }
        syslog(LOG_DEBUG, "msgbase: CtdlDeleteMessages(%s, %d msgs, %s)", room_name, num_dmsgnums, content_type);
 
        // get room record, obtaining a lock...