From baa76dc30501a088c025fa10408ac50b4f452cba Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Tue, 5 Feb 2013 23:23:06 +0100 Subject: [PATCH] RNCFG: reduce the work needed to be done during the active rncfg lock --- citadel/citserver.h | 1 + citadel/modules/network/netspool.h | 19 +++- citadel/modules/network/serv_netmail.c | 93 ++++++++--------- citadel/modules/network/serv_netspool.c | 133 +++++++++++++++++------- citadel/modules/network/serv_network.c | 43 +++++++- 5 files changed, 196 insertions(+), 93 deletions(-) diff --git a/citadel/citserver.h b/citadel/citserver.h index 59660496c..94c7e8d90 100644 --- a/citadel/citserver.h +++ b/citadel/citserver.h @@ -22,6 +22,7 @@ struct RoomProcList { char name[ROOMNAMELEN]; char lcname[ROOMNAMELEN]; long namelen; + long lastsent; long key; long QRNum; }; diff --git a/citadel/modules/network/netspool.h b/citadel/modules/network/netspool.h index 0fcb1808b..e62e8cfb3 100644 --- a/citadel/modules/network/netspool.h +++ b/citadel/modules/network/netspool.h @@ -31,20 +31,33 @@ typedef struct SpoolControl SpoolControl; struct SpoolControl { OneRoomNetCfg *RNCfg; + struct ctdlroom room; + StrBuf *Users[maxRoomNetCfg]; FILE *digestfp; int num_msgs_spooled; long lastsent; HashList *working_ignetcfg; HashList *the_netmap; + + SpoolControl *next; }; -void network_spoolout_room(RoomProcList *room_to_spool, - HashList *working_ignetcfg, - HashList *the_netmap); +void network_spoolout_room(SpoolControl *sc); + +void InspectQueuedRoom(SpoolControl **pSC, + RoomProcList *room_to_spool, + HashList *working_ignetcfg, + HashList *the_netmap); + +int HaveSpoolConfig(OneRoomNetCfg* RNCfg); + + void network_do_spoolin(HashList *working_ignetcfg, HashList *the_netmap, int *netmap_changed); void network_consolidate_spoolout(HashList *working_ignetcfg, HashList *the_netmap); void free_spoolcontrol_struct(SpoolControl **scc); int writenfree_spoolcontrol_file(SpoolControl **scc, char *filename); int read_spoolcontrol_file(SpoolControl **scc, char *filename); + +void aggregate_recipients(StrBuf **recps, RoomNetCfg Which, OneRoomNetCfg *OneRNCfg); diff --git a/citadel/modules/network/serv_netmail.c b/citadel/modules/network/serv_netmail.c index d9cd51093..d18f04540 100644 --- a/citadel/modules/network/serv_netmail.c +++ b/citadel/modules/network/serv_netmail.c @@ -128,10 +128,12 @@ void network_deliver_digest(SpoolControl *sc) struct CtdlMessage *msg = NULL; long msglen; StrBuf *recps = NULL; - char *precps; struct recptypes *valid; char bounce_to[256]; + if (sc->Users[listrecp] == NULL) + return; + if (sc->num_msgs_spooled < 1) { fclose(sc->digestfp); sc->digestfp = NULL; @@ -181,6 +183,8 @@ void network_deliver_digest(SpoolControl *sc) sc->digestfp = NULL; /* Now generate the delivery instructions */ + if (sc->Users[listrecp] == NULL) + return; aggregate_recipients(&recps, digestrecp, sc->RNCfg); /* Where do we want bounces and other noise to be heard? @@ -188,9 +192,7 @@ void network_deliver_digest(SpoolControl *sc) snprintf(bounce_to, sizeof bounce_to, "room_aide@%s", config.c_fqdn); /* Now submit the message */ - precps = SmashStrBuf(&recps); - valid = validate_recipients(precps, NULL, 0); - free(precps); + valid = validate_recipients(ChrPtr(sc->Users[listrecp]), NULL, 0); if (valid != NULL) { valid->bounce_to = strdup(bounce_to); valid->envelope_from = strdup(bounce_to); @@ -209,7 +211,7 @@ void network_process_digest(SpoolControl *sc, struct CtdlMessage *omsg, long *de /* * Process digest recipients */ - if ((sc->RNCfg->NetConfigs[digestrecp] == NULL) || + if ((sc->Users[digestrecp] == NULL)|| (sc->digestfp == NULL)) return; @@ -268,6 +270,7 @@ void network_process_digest(SpoolControl *sc, struct CtdlMessage *omsg, long *de void network_process_list(SpoolControl *sc, struct CtdlMessage *omsg, long *delete_after_send) { + struct CitContext *CCC = CC; int rlen; char *pCh; StrBuf *Subject, *FlatSubject; @@ -277,7 +280,7 @@ void network_process_list(SpoolControl *sc, struct CtdlMessage *omsg, long *dele /* * Process mailing list recipients */ - if (sc->RNCfg->NetConfigs[listrecp] == NULL) + if (sc->Users[listrecp] == NULL) return; /* create our own copy of the message. @@ -312,8 +315,8 @@ void network_process_list(SpoolControl *sc, struct CtdlMessage *omsg, long *dele msg->cm_fields['L'] = malloc(1024); snprintf(msg->cm_fields['L'], 1024, "%s <%ld.list-id.%s>", - CC->room.QRname, - CC->room.QRnumber, + CCC->room.QRname, + CCC->room.QRnumber, config.c_fqdn ); @@ -328,8 +331,8 @@ void network_process_list(SpoolControl *sc, struct CtdlMessage *omsg, long *dele FlatSubject = NewStrBufPlain(NULL, StrLength(Subject)); StrBuf_RFC822_to_Utf8(FlatSubject, Subject, NULL, NULL); - rlen = strlen(CC->room.QRname); - pCh = strstr(ChrPtr(FlatSubject), CC->room.QRname); + rlen = strlen(CCC->room.QRname); + pCh = strstr(ChrPtr(FlatSubject), CCC->room.QRname); if ((pCh == NULL) || (*(pCh + rlen) != ']') || (pCh == ChrPtr(FlatSubject)) || @@ -339,7 +342,7 @@ void network_process_list(SpoolControl *sc, struct CtdlMessage *omsg, long *dele StrBuf *tmp; StrBufPlain(Subject, HKEY("[")); StrBufAppendBufPlain(Subject, - CC->room.QRname, + CCC->room.QRname, rlen, 0); StrBufAppendBufPlain(Subject, HKEY("] "), 0); StrBufAppendBuf(Subject, FlatSubject, 0); @@ -372,7 +375,7 @@ void network_process_list(SpoolControl *sc, struct CtdlMessage *omsg, long *dele msg->cm_fields['R'] = malloc(256); snprintf(msg->cm_fields['R'], 256, - "room_%s@%s", CC->room.QRname, + "room_%s@%s", CCC->room.QRname, config.c_fqdn); for (i=0; msg->cm_fields['R'][i]; ++i) { if (isspace(msg->cm_fields['R'][i])) { @@ -382,7 +385,7 @@ void network_process_list(SpoolControl *sc, struct CtdlMessage *omsg, long *dele } /* Handle delivery */ - network_deliver_list(msg, sc, CC->room.QRname); + network_deliver_list(msg, sc, CCC->room.QRname); CtdlFreeMessage(msg); } @@ -391,29 +394,21 @@ void network_process_list(SpoolControl *sc, struct CtdlMessage *omsg, long *dele */ void network_deliver_list(struct CtdlMessage *msg, SpoolControl *sc, const char *RoomName) { - StrBuf *recps = NULL; - char *precps = NULL; struct recptypes *valid; char bounce_to[256]; /* Don't do this if there were no recipients! */ - if (sc->RNCfg->NetConfigs[listrecp] == NULL) return; + if (sc->Users[listrecp] == NULL) + return; /* Now generate the delivery instructions */ - /* - * Figure out how big a buffer we need to allocate - */ - aggregate_recipients(&recps, listrecp, sc->RNCfg); - /* Where do we want bounces and other noise to be heard? * Surely not the list members! */ snprintf(bounce_to, sizeof bounce_to, "room_aide@%s", config.c_fqdn); /* Now submit the message */ - precps = SmashStrBuf(&recps); - valid = validate_recipients(precps, NULL, 0); - free(precps); + valid = validate_recipients(ChrPtr(sc->Users[listrecp]), NULL, 0); if (valid != NULL) { valid->bounce_to = strdup(bounce_to); valid->envelope_from = strdup(bounce_to); @@ -436,7 +431,7 @@ void network_process_participate(SpoolControl *sc, struct CtdlMessage *omsg, lon /* * Process client-side list participations for this room */ - if (sc->RNCfg->NetConfigs[participate] == NULL) + if (sc->Users[participate] == NULL) return; msg = CtdlDuplicateMessage(omsg); @@ -465,9 +460,6 @@ void network_process_participate(SpoolControl *sc, struct CtdlMessage *omsg, lon } if (ok_to_participate) { - StrBuf *recps = NULL; - char *precps; - if (msg->cm_fields['F'] != NULL) { free(msg->cm_fields['F']); } @@ -476,20 +468,22 @@ void network_process_participate(SpoolControl *sc, struct CtdlMessage *omsg, lon * actual author with the email address of the * room itself, so the remote listserv doesn't * reject us. - * FIXME I want to be able to pick any address */ - snprintf(msg->cm_fields['F'], SIZ, - "room_%s@%s", CC->room.QRname, - config.c_fqdn); + if (sc->Users[roommailalias] != NULL) + snprintf(msg->cm_fields['F'], SIZ, + "%s", ChrPtr(sc->Users[roommailalias])); + else + snprintf(msg->cm_fields['F'], SIZ, + "room_%s@%s", CC->room.QRname, + config.c_fqdn); + for (i=0; msg->cm_fields['F'][i]; ++i) { if (isspace(msg->cm_fields['F'][i])) { msg->cm_fields['F'][i] = '_'; } } - aggregate_recipients(&recps, participate, sc->RNCfg); - precps = SmashStrBuf(&recps); - valid = validate_recipients(precps, NULL, 0); + valid = validate_recipients(ChrPtr(sc->Users[participate]) , NULL, 0); if (msg->cm_fields['R'] != NULL) { free(msg->cm_fields['R']); @@ -504,6 +498,8 @@ void network_process_participate(SpoolControl *sc, struct CtdlMessage *omsg, lon void network_process_ignetpush(SpoolControl *sc, struct CtdlMessage *omsg, long *delete_after_send) { + StrBuf *Recipient; + const char *Pos = NULL; struct CtdlMessage *msg = NULL; struct CitContext *CCC = CC; struct ser_ret sermsg; @@ -512,14 +508,14 @@ void network_process_ignetpush(SpoolControl *sc, struct CtdlMessage *omsg, long FILE *fp; size_t newpath_len; char *newpath = NULL; - RoomNetCfgLine* mptr; StrBuf *Buf = NULL; int i; int bang = 0; int send = 1; - if (sc->RNCfg->NetConfigs[ignet_push_share] == NULL) + if (sc->Users[ignet_push_share] == NULL) return; + /* * Process IGnet push shares */ @@ -551,12 +547,12 @@ void network_process_ignetpush(SpoolControl *sc, struct CtdlMessage *omsg, long } /* Now send it to every node */ - for (mptr = sc->RNCfg->NetConfigs[ignet_push_share]; - mptr != NULL; - mptr = mptr->next) + Recipient = NewStrBufPlain(NULL, StrLength(sc->Users[ignet_push_share])); + while ((Pos != StrBufNOTNULL) && + StrBufExtract_NextToken(Recipient, sc->Users[ignet_push_share], &Pos, '|')) { send = 1; - NewStrBufDupAppendFlush(&Buf, mptr->Value[0], NULL, 1); + NewStrBufDupAppendFlush(&Buf, Recipient, NULL, 1); /* Check for valid node name */ if (CtdlIsValidNode(NULL, @@ -567,7 +563,7 @@ void network_process_ignetpush(SpoolControl *sc, struct CtdlMessage *omsg, long { QN_syslog(LOG_ERR, "Invalid node <%s>\n", - ChrPtr(mptr->Value[0])); + ChrPtr(Recipient)); send = 0; } @@ -583,8 +579,8 @@ void network_process_ignetpush(SpoolControl *sc, struct CtdlMessage *omsg, long sizeof buf); QN_syslog(LOG_DEBUG, "Compare <%s> to <%s>\n", - buf, ChrPtr(mptr->Value[0])) ; - if (!strcasecmp(buf, ChrPtr(mptr->Value[0]))) { + buf, ChrPtr(Recipient)) ; + if (!strcasecmp(buf, ChrPtr(Recipient))) { send = 0; break; } @@ -593,7 +589,7 @@ void network_process_ignetpush(SpoolControl *sc, struct CtdlMessage *omsg, long QN_syslog(LOG_INFO, "%sSending to %s\n", (send)?"":"Not ", - ChrPtr(mptr->Value[0])); + ChrPtr(Recipient)); } /* Send the message */ @@ -607,9 +603,9 @@ void network_process_ignetpush(SpoolControl *sc, struct CtdlMessage *omsg, long if (msg->cm_fields['C'] != NULL) { free(msg->cm_fields['C']); } - if (StrLength(mptr->Value[0]) > 0) { + if (StrLength(Recipient) > 0) { msg->cm_fields['C'] = - strdup(ChrPtr(mptr->Value[0])); + strdup(ChrPtr(Recipient)); } else { msg->cm_fields['C'] = @@ -625,7 +621,7 @@ void network_process_ignetpush(SpoolControl *sc, struct CtdlMessage *omsg, long sizeof(filename), "%s/%s@%lx%x", ctdl_netout_dir, - ChrPtr(mptr->Value[0]), + ChrPtr(Recipient), time(NULL), rand() ); @@ -653,6 +649,7 @@ void network_process_ignetpush(SpoolControl *sc, struct CtdlMessage *omsg, long } } FreeStrBuf(&Buf); + FreeStrBuf(&Recipient); CtdlFreeMessage(msg); } diff --git a/citadel/modules/network/serv_netspool.c b/citadel/modules/network/serv_netspool.c index c24f31bc5..bc7db52b3 100644 --- a/citadel/modules/network/serv_netspool.c +++ b/citadel/modules/network/serv_netspool.c @@ -142,67 +142,122 @@ void DeleteLastSent(const CfgLineType *ThisOne, RoomNetCfgLine **data) *data = NULL; } +RoomNetCfg SpoolCfgs [4] = { + listrecp, + digestrecp, + participate, + ignet_push_share +}; - -/* - * Batch up and send all outbound traffic from the current room - */ -void network_spoolout_room(RoomProcList *room_to_spool, - HashList *working_ignetcfg, - HashList *the_netmap) +int HaveSpoolConfig(OneRoomNetCfg* RNCfg) { - struct CitContext *CCC = CC; - char buf[SIZ]; - SpoolControl sc; int i; + int interested = 0; + for (i=0; i < 4; i++) if (RNCfg->NetConfigs[SpoolCfgs[i]] == NULL) interested = 1; + return interested; +} - memset(&sc, 0, sizeof(SpoolControl)); - sc.RNCfg = room_to_spool->OneRNCfg; - sc.lastsent = room_to_spool->OneRNCfg->lastsent; - sc.working_ignetcfg = working_ignetcfg; - sc.the_netmap = the_netmap; - if ((sc.RNCfg->NetConfigs[listrecp] == NULL) && - (sc.RNCfg->NetConfigs[digestrecp] == NULL) && - (sc.RNCfg->NetConfigs[participate] == NULL) && - (sc.RNCfg->NetConfigs[ignet_push_share] == NULL)) - { - /* nothing to do for this room... */ - return; - } + +void InspectQueuedRoom(SpoolControl **pSC, + RoomProcList *room_to_spool, + HashList *working_ignetcfg, + HashList *the_netmap) +{ + SpoolControl *sc; + int i = 0; + + sc = (SpoolControl*)malloc(sizeof(SpoolControl)); + memset(sc, 0, sizeof(SpoolControl)); + sc->RNCfg = room_to_spool->OneRNCfg; + sc->lastsent = room_to_spool->lastsent; + sc->working_ignetcfg = working_ignetcfg; + sc->the_netmap = the_netmap; /* * If the room doesn't exist, don't try to perform its networking tasks. * Normally this should never happen, but once in a while maybe a room gets * queued for networking and then deleted before it can happen. */ - if (CtdlGetRoom(&CCC->room, room_to_spool->name) != 0) { + if (CtdlGetRoom(&sc->room, room_to_spool->name) != 0) { syslog(LOG_CRIT, "ERROR: cannot load <%s>\n", room_to_spool->name); + free(sc); + return; + } + if (sc->room.QRhighest <= sc->lastsent) + { + syslog(LOG_DEBUG, "nothing to do for <%s>\n", room_to_spool->name); + free(sc); + return; + } + + if (sc->RNCfg == NULL) + sc->RNCfg = CtdlGetNetCfgForRoom(sc->room.QRnumber); + + if (!HaveSpoolConfig(sc->RNCfg)) + { + free(sc); + /* nothing to do for this room... */ return; } + /* Now lets remember whats needed for the actual work... */ + + for (i=0; i < 4; i++) + { + aggregate_recipients(&sc->Users[SpoolCfgs[i]], SpoolCfgs[i], sc->RNCfg); + } + if (StrLength(sc->RNCfg->Sender) > 0) + sc->Users[roommailalias] = NewStrBufDup(sc->RNCfg->Sender); + + sc->next = *pSC; + *pSC = sc; + +} + + +/* + * Batch up and send all outbound traffic from the current room + */ +void network_spoolout_room(SpoolControl *sc) +{ + struct CitContext *CCC = CC; + char buf[SIZ]; + int i; + long lastsent; + + /* + * If the room doesn't exist, don't try to perform its networking tasks. + * Normally this should never happen, but once in a while maybe a room gets + * queued for networking and then deleted before it can happen. + */ + memcpy (&CCC->room, &sc->room, sizeof(ctdlroom)); + syslog(LOG_INFO, "Networking started for <%s>\n", CCC->room.QRname); /* If there are digest recipients, we have to build a digest */ - if (sc.RNCfg->NetConfigs[digestrecp] != NULL) { - sc.digestfp = tmpfile(); - fprintf(sc.digestfp, "Content-type: text/plain\n\n"); + if (sc->Users[digestrecp] != NULL) { + sc->digestfp = tmpfile(); + fprintf(sc->digestfp, "Content-type: text/plain\n\n"); } + /* remember where we started... */ + lastsent = sc->lastsent; + /* Do something useful */ - CtdlForEachMessage(MSGS_GT, sc.lastsent, NULL, NULL, NULL, - network_spool_msg, &sc); + CtdlForEachMessage(MSGS_GT, sc->lastsent, NULL, NULL, NULL, + network_spool_msg, sc); /* If we wrote a digest, deliver it and then close it */ - if (StrLength(sc.RNCfg->Sender) > 0) + if (StrLength(sc->Users[roommailalias]) > 0) { long len; - len = StrLength(sc.RNCfg->Sender); + len = StrLength(sc->Users[roommailalias]); if (len + 1 > sizeof(buf)) len = sizeof(buf) - 1; - memcpy(buf, ChrPtr(sc.RNCfg->Sender), len); + memcpy(buf, ChrPtr(sc->Users[roommailalias]), len); buf[len] = '\0'; } else @@ -215,8 +270,8 @@ void network_spoolout_room(RoomProcList *room_to_spool, buf[i] = tolower(buf[i]); if (isspace(buf[i])) buf[i] = '_'; } - if (sc.digestfp != NULL) { - fprintf(sc.digestfp, + if (sc->digestfp != NULL) { + fprintf(sc->digestfp, " -----------------------------------" "------------------------------------" "-------\n" @@ -225,14 +280,16 @@ void network_spoolout_room(RoomProcList *room_to_spool, "To post to the list: %s\n", CCC->room.QRname, buf ); - network_deliver_digest(&sc); /* deliver and close */ + network_deliver_digest(sc); /* deliver and close */ } /* Now rewrite the config file */ - if (sc.lastsent != room_to_spool->OneRNCfg->lastsent) + if (sc->lastsent != lastsent) { - room_to_spool->OneRNCfg->lastsent = sc.lastsent; - room_to_spool->OneRNCfg->changed = 1; + sc->RNCfg = CtdlGetNetCfgForRoom(sc->room.QRnumber); + + sc->RNCfg->lastsent = sc->lastsent; + sc->RNCfg->changed = 1; } end_critical_section(S_NETCONFIGS); } diff --git a/citadel/modules/network/serv_network.c b/citadel/modules/network/serv_network.c index ebc00e04c..66bf69c6b 100644 --- a/citadel/modules/network/serv_network.c +++ b/citadel/modules/network/serv_network.c @@ -254,6 +254,7 @@ RoomProcList *CreateRoomProcListEntry(struct ctdlroom *qrbuf, OneRoomNetCfg *One ptr->lcname[ptr->namelen] = '\0'; ptr->key = hashlittle(ptr->lcname, ptr->namelen, 9872345); + ptr->lastsent = OneRNCFG->lastsent; ptr->OneRNCfg = OneRNCFG; return ptr; } @@ -266,6 +267,9 @@ void network_queue_interesting_rooms(struct ctdlroom *qrbuf, void *data, OneRoom struct RoomProcList *ptr; roomlists *RP = (roomlists*) data; + if (!HaveSpoolConfig(OneRNCfg)) + return; + ptr = CreateRoomProcListEntry(qrbuf, OneRNCfg); ptr->next = RP->rplist; @@ -287,10 +291,14 @@ int network_room_handler (struct ctdlroom *qrbuf) if (RNCfg == NULL) return 1; + if (!HaveSpoolConfig(RNCfg)) + return 1; + ptr = CreateRoomProcListEntry(qrbuf, RNCfg); if (ptr == NULL) return 1; + ptr->OneRNCfg = NULL; begin_critical_section(S_RPLIST); ptr->next = rplist; rplist = ptr; @@ -428,7 +436,16 @@ void network_bounce(struct CtdlMessage *msg, char *reason) } - +void free_network_spoolout_room(SpoolControl *sc) +{ + if (sc != NULL) + { + int i; + for (i = subpending; i < maxRoomNetCfg; i++) + FreeStrBuf(&sc->Users[i]); + free(sc); + } +} @@ -447,6 +464,8 @@ void network_do_queue(void) HashList *the_netmap = NULL; int netmap_changed = 0; roomlists RL; + SpoolControl *sc = NULL; + SpoolControl *pSC; /* * Run the full set of processing tasks no more frequently @@ -508,14 +527,30 @@ void network_do_queue(void) } if (ptr->namelen > 0) { - network_spoolout_room(ptr, - working_ignetcfg, - the_netmap); + InspectQueuedRoom(&sc, + ptr, + working_ignetcfg, + the_netmap); } ptr = ptr->next; } } + + pSC = sc; + while (pSC != NULL) + { + network_spoolout_room(pSC); + pSC = pSC->next; + } + + pSC = sc; + while (pSC != NULL) + { + sc = pSC->next; + free_network_spoolout_room(pSC); + pSC = sc; + } /* If there is anything in the inbound queue, process it */ if (!server_shutting_down) { network_do_spoolin(working_ignetcfg, -- 2.30.2