2 * This module handles shared rooms, inter-Citadel mail, and outbound
3 * mailing list processing.
5 * Copyright (c) 2000-2011 by the citadel.org team
7 * This program is open source software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 3 of the License, or
10 * (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21 * ** NOTE ** A word on the S_NETCONFIGS semaphore:
22 * This is a fairly high-level type of critical section. It ensures that no
23 * two threads work on the netconfigs files at the same time. Since we do
24 * so many things inside these, here are the rules:
25 * 1. begin_critical_section(S_NETCONFIGS) *before* begin_ any others.
26 * 2. Do *not* perform any I/O with the client during these sections.
41 #include <sys/types.h>
43 #if TIME_WITH_SYS_TIME
44 # include <sys/time.h>
48 # include <sys/time.h>
56 # if HAVE_SYS_SYSCALL_H
57 # include <sys/syscall.h>
64 #include <libcitadel.h>
67 #include "citserver.h"
73 #include "internet_addressing.h"
74 #include "serv_network.h"
75 #include "clientsocket.h"
77 #include "citadel_dirs.h"
86 #include "netconfig.h"
87 #include "ctdl_module.h"
// Context for the network client: filled in by CtdlFillSystemContext() in the
// module init code and cloned with CloneContext() in RunNetworker().
89 struct CitContext networker_client_CC;
// States of the network client conversation. NW->State is used as the index
// into NWC_ReadHandlers and NWC_SendHandlers and presumably holds one of these
// values; eREAD, eNUOP and eQUIT are among the enumerators referenced in this
// file. The enumerator list itself is not visible in this excerpt.
91 typedef enum _eNWCState {
// State of one network client run (presumably typedef-ed as AsyncNetworker,
// judging by its use below - TODO confirm). Only some members are visible.
106 typedef struct _async_networker {
// DNS query state; its DNSStatus and VParsedDNSReply members are read in
// nwc_get_one_host_ip_done().
108 DNSQueryParts HostLookup;
// File name buffers; see NWC_SendNDOP(), NWC_SendNUOP() and
// network_poll_other_citadel_nodes() for where they are filled.
111 StrBuf *SpoolFileName;
112 StrBuf *tempFileName;
// Signature shared by the per-state handlers stored in NWC_ReadHandlers and
// NWC_SendHandlers: takes the networker, returns the next I/O state.
120 typedef eNextState(*NWClientHandler)(AsyncNetworker* NW);
// Forward declarations of functions defined further down in this file.
121 eNextState nwc_get_one_host_ip(AsyncIO *IO);
123 eNextState nwc_connect_ip(AsyncIO *IO);
125 eNextState NWC_SendQUIT(AsyncNetworker *NW);
126 eNextState NWC_DispatchWriteDone(AsyncIO *IO);
// Presumably releases a networker and its buffers; the body is not visible in
// this excerpt. Called from network_poll_other_citadel_nodes().
129 void DestroyNetworker(AsyncNetworker *NW)
// Helper macros; all three expect a variable named NW (AsyncNetworker pointer)
// to be in scope at the point of use.
// NWC_DBG_SEND: log the pending send buffer at LOG_DEBUG, tagged with NW->n.
133 #define NWC_DBG_SEND() syslog(LOG_DEBUG, "NW client[%ld]: > %s", NW->n, ChrPtr(NW->IO.SendBuf.Buf))
// NWC_DBG_READ: log the line most recently read into NW->IO.IOBuf.
134 #define NWC_DBG_READ() syslog(LOG_DEBUG, "NW client[%ld]: < %s\n", NW->n, ChrPtr(NW->IO.IOBuf))
// NWC_OK: nonzero when the reply starts with +OK (case-insensitive).
// NOTE(review): the handlers visible in this file test the first reply digit
// instead and do not use this macro - confirm whether it is still needed.
135 #define NWC_OK (strncasecmp(ChrPtr(NW->IO.IOBuf), "+OK", 3) == 0)
// Read handler for the server greeting. Extracts token number 1 (space
// separated, as numbered by extract_token) from the greeting line in
// NW->IO.IOBuf and compares it with the expected node name NW->node.
// On mismatch the error text is stored in NW->IO.ErrMsg, logged at LOG_ERR,
// posted via CtdlAideMessage(), and eAbort is returned. The return value for
// the matching case is not visible in this excerpt.
137 eNextState NWC_ReadGreeting(AsyncNetworker *NW)
139 char connected_to[SIZ];
141 /* Read the server greeting */
142 /* Check that the remote is who we think it is and warn the Aide if not */
143 extract_token (connected_to, ChrPtr(NW->IO.IOBuf), 1, ' ', sizeof connected_to);
// strcmp is case-sensitive, so the node names must match exactly.
144 if (strcmp(connected_to, ChrPtr(NW->node)) != 0)
146 StrBufPrintf(NW->IO.ErrMsg,
147 "Connected to node \"%s\" but I was expecting to connect to node \"%s\".",
148 connected_to, ChrPtr(NW->node));
149 syslog(LOG_ERR, "%s\n", ChrPtr(NW->IO.ErrMsg));
150 CtdlAideMessage(ChrPtr(NW->IO.ErrMsg), "Network error");
151 return eAbort;/// todo: store the aide message in a different queue
// Send handler: formats the NETP command (two fields separated by a pipe) into
// the send buffer. The arguments supplied to the format are not visible here.
156 eNextState NWC_SendAuth(AsyncNetworker *NW)
158 /* We're talking to the correct node. Now identify ourselves. */
159 StrBufPrintf(NW->IO.SendBuf.Buf, "NETP %s|%s\n",
// Read handler for the reply to NETP. A reply whose first character is 2 is
// treated as success (that branch is not visible here). The remaining visible
// lines build an error text in NW->IO.ErrMsg, log it at LOG_ERR and post it
// via CtdlAideMessage(); presumably this is the failure branch - TODO confirm.
166 eNextState NWC_ReadAuthReply(AsyncNetworker *NW)
169 if (ChrPtr(NW->IO.IOBuf)[0] == '2')
175 StrBufPrintf(NW->IO.ErrMsg,
176 "Connected to node \"%s\" but my secret wasn't accurate.",
178 syslog(LOG_ERR, "%s\n", ChrPtr(NW->IO.ErrMsg));
179 CtdlAideMessage(ChrPtr(NW->IO.ErrMsg), "Network error");
// Send handler: allocates fresh tempFileName and SpoolFileName buffers,
// formats both (the format strings are not visible here; each takes the
// current time as one argument) and queues the NDOP command.
185 eNextState NWC_SendNDOP(AsyncNetworker *NW)
187 NW->tempFileName = NewStrBuf();
// NOTE(review): network_poll_other_citadel_nodes() already stores a buffer in
// NW->SpoolFileName; unless it is freed on a line not shown, this assignment
// drops the only reference to it - confirm.
188 NW->SpoolFileName = NewStrBuf();
189 StrBufPrintf(NW->tempFileName,
193 time(NULL),// TODO: get time from libev
195 StrBufPrintf(NW->SpoolFileName,
199 time(NULL),// TODO: get time from libev
// NOTE(review): the comment below repeats the one in NWC_SendAuth; the
// statement it precedes queues the NDOP command, not an identification.
202 /* We're talking to the correct node. Now identify ourselves. */
203 StrBufPlain(NW->IO.SendBuf.Buf, HKEY("NDOP\n"));
// Read handler for the reply to NDOP. On a reply starting with 2 the byte
// count is parsed from offset 4 of the reply line. A count of zero or less
// sets the state to eNUOP - 1; otherwise the spool file is created and the
// IO buffer is bound to it with FDIOBufferInit().
// The X - 1 state assignments presumably rely on the caller advancing
// NW->State afterwards - TODO confirm, the increment is not visible here.
208 eNextState NWC_ReadNDOPReply(AsyncNetworker *NW)
212 if (ChrPtr(NW->IO.IOBuf)[0] == '2')
215 NW->IO.IOB.TotalSentAlready = 0;
216 TotalSendSize = atol (ChrPtr(NW->IO.IOBuf) + 4);
// NOTE(review): this logs NW->IO.IOB.TotalSendSize, while the value just
// parsed is in the local TotalSendSize and is only handed to FDIOBufferInit()
// further down; the logged figure looks stale - confirm.
217 syslog(LOG_DEBUG, "Expecting to transfer %ld bytes\n", NW->IO.IOB.TotalSendSize);
218 if (TotalSendSize <= 0) {
219 NW->State = eNUOP - 1;
// O_EXCL with O_CREAT makes open() fail if the spool file already exists.
223 fd = open(ChrPtr(NW->SpoolFileName),
224 O_EXCL|O_CREAT|O_NONBLOCK|O_WRONLY,
229 "cannot open %s: %s\n",
230 ChrPtr(NW->SpoolFileName),
233 NW->State = eQUIT - 1;
236 FDIOBufferInit(&NW->IO.IOB, &NW->IO.RecvBuf, fd, TotalSendSize);
// Send handler: while fewer bytes than TotalSendSize have been transferred,
// queues a READ command carrying the current offset and the size of the next
// chunk, capped at IGNET_PACKET_SIZE. If the server is shutting down the
// file descriptor in NW->IO.IOB.OtherFD is closed instead.
246 eNextState NWC_SendREAD(AsyncNetworker *NW)
248 if (NW->IO.IOB.TotalSentAlready < NW->IO.IOB.TotalSendSize)
251 * If shutting down we can exit here and unlink the temp file.
252 * this shouldn't loose us any messages.
254 if (server_shutting_down)
256 close(NW->IO.IOB.OtherFD);
257 ////// unlink(ChrPtr(NW->tempFileName));
// First field: offset already transferred. Second field: remaining byte
// count or IGNET_PACKET_SIZE, whichever is smaller.
260 StrBufPrintf(NW->IO.SendBuf.Buf, "READ %ld|%ld\n",
261 NW->IO.IOB.TotalSentAlready,
262 ((NW->IO.IOB.TotalSendSize - NW->IO.IOB.TotalSentAlready > IGNET_PACKET_SIZE)
263 ? IGNET_PACKET_SIZE :
264 (NW->IO.IOB.TotalSendSize - NW->IO.IOB.TotalSentAlready))
271 else {} // continue sending
// Read handler for the reply to READ. On a reply starting with 6 the chunk
// length is parsed from offset 4 of the reply line and stored in both
// ChunkSize and ChunkSendRemain.
275 eNextState NWC_ReadREADState(AsyncNetworker *NW)
278 if (ChrPtr(NW->IO.IOBuf)[0] == '6')
280 NW->IO.IOB.ChunkSendRemain =
281 NW->IO.IOB.ChunkSize = atol(ChrPtr(NW->IO.IOBuf)+4);
282 /// NW->IO.IOB.TotalSentAlready += NW->IO.IOB.ChunkSize;
283 /// TODO StrBufReadjustIOBuffer(NW->IO.RecvBuf, NW->BlobReadSize);
// Forward declaration: NWC_ReadREADBlob() below calls this before it is defined.
288 eNextState NWC_ReadREADBlobDone(AsyncNetworker *NW);
// Read handler invoked after a binary chunk has been received. If more data
// is outstanding the state is set to eREAD - 1 and eSendReply is returned so
// that the next READ is issued; otherwise NWC_ReadREADBlobDone() finishes up.
289 eNextState NWC_ReadREADBlob(AsyncNetworker *NW)
291 /// FlushIOBuffer(NW->IO.RecvBuf); /// TODO
293 ///NW->bytes_received += NW->IO.IOB.ChunkSize;
295 if (NW->IO.IOB.TotalSentAlready < NW->IO.IOB.TotalSendSize)
297 NW->State = eREAD - 1;
298 return eSendReply;/* now fetch next chunk*/
301 return NWC_ReadREADBlobDone(NW);
// Called when the download may be complete. When the transferred byte count
// equals TotalSendSize, the file descriptor is closed and a hard link is
// created between the spool file and the temp file name. Both visible return
// statements hand control to NWC_DispatchWriteDone().
304 eNextState NWC_ReadREADBlobDone(AsyncNetworker *NW)
306 if (NW->IO.IOB.TotalSendSize == NW->IO.IOB.TotalSentAlready)
310 close(NW->IO.IOB.OtherFD);
// NOTE(review): link() is called with SpoolFileName as the existing path and
// tempFileName as the new one, but the log message below prints the two names
// in the opposite order - confirm which order is intended.
312 if (link(ChrPtr(NW->SpoolFileName), ChrPtr(NW->tempFileName)) != 0) {
314 "Could not link %s to %s: %s\n",
315 ChrPtr(NW->tempFileName),
316 ChrPtr(NW->SpoolFileName),
320 ///// unlink(ChrPtr(NW->tempFileName));
321 return NWC_DispatchWriteDone(&NW->IO);
325 return NWC_DispatchWriteDone(&NW->IO);
// Send handler: queues the CLOS command in the send buffer.
328 eNextState NWC_SendCLOS(AsyncNetworker *NW)
330 StrBufPlain(NW->IO.SendBuf.Buf, HKEY("CLOS\n"));
331 //// unlink(ChrPtr(NW->tempFileName));
// Read handler for the reply to CLOS: any reply not starting with 2 ends the
// connection. The return value for the success case is not visible here.
335 eNextState NWC_ReadCLOSReply(AsyncNetworker *NW)
338 if (ChrPtr(NW->IO.IOBuf)[0] != '2')
339 return eTerminateConnection;
// Send handler that prepares the upload. Formats the outbound file name into
// tempFileName (format not visible here), opens it read-only and determines
// its size with fstat(). If the file cannot be opened or is empty the
// conversation continues with NWC_SendQUIT(); otherwise the IO buffer is
// bound to the file with FDIOBufferInit() and the NUOP command is queued.
344 eNextState NWC_SendNUOP(AsyncNetworker *NW)
350 StrBufPrintf(NW->tempFileName,
354 fd = open(ChrPtr(NW->tempFileName), O_RDONLY);
// A missing file (ENOENT) is not logged; it presumably means there is nothing
// queued for this node - TODO confirm.
356 if (errno != ENOENT) {
358 "cannot open %s: %s\n",
359 ChrPtr(NW->tempFileName),
363 return NWC_SendQUIT(NW);
366 if (fstat(fd, &statbuf) == -1) {
// NOTE(review): the priority is the bare number 9 rather than a LOG_ constant
// as used everywhere else in this file - confirm the intended level.
367 syslog(9, "FSTAT FAILED %s [%s]--\n",
368 ChrPtr(NW->tempFileName),
// NOTE(review): descriptor 0 is a valid result of open(); this test would
// leave it unclosed. A comparison against -1 looks intended - confirm.
370 if (fd > 0) close(fd);
373 TotalSendSize = statbuf.st_size;
374 if (TotalSendSize == 0) {
376 "Nothing to send.\n");
// NOTE(review): no close(fd) is visible on this path - confirm the
// descriptor is released on a line not shown.
378 return NWC_SendQUIT(NW);
380 FDIOBufferInit(&NW->IO.IOB, &NW->IO.SendBuf, fd, TotalSendSize);
382 //// NW->bytes_written = 0;
384 StrBufPlain(NW->IO.SendBuf.Buf, HKEY("NUOP\n"));
// Read handler for the reply to NUOP: tests whether the reply starts with
// something other than 2. The action taken in either case is not visible here.
388 eNextState NWC_ReadNUOPReply(AsyncNetworker *NW)
391 if (ChrPtr(NW->IO.IOBuf)[0] != '2')
// Send handler: queues a WRIT command announcing the number of bytes still to
// be uploaded (TotalSendSize minus TotalSentAlready).
396 eNextState NWC_SendWRIT(AsyncNetworker *NW)
398 StrBufPrintf(NW->IO.SendBuf.Buf, "WRIT %ld\n",
399 NW->IO.IOB.TotalSendSize - NW->IO.IOB.TotalSentAlready);
// Read handler for the reply to WRIT. The reply is checked for a leading 7
// (the action on mismatch is not visible here); the chunk length is parsed
// from offset 4 of the reply line and stored in ChunkSize and ChunkSendRemain.
403 eNextState NWC_ReadWRITReply(AsyncNetworker *NW)
406 if (ChrPtr(NW->IO.IOBuf)[0] != '7')
411 NW->IO.IOB.ChunkSendRemain =
412 NW->IO.IOB.ChunkSize = atol(ChrPtr(NW->IO.IOBuf)+4);
413 /// NW->IO.IOB.TotalSentAlready += NW->IO.IOB.ChunkSize;
// Called after a binary chunk has been written. When everything has been
// sent (TotalSendSize equals TotalSentAlready) the file descriptor is closed;
// both visible paths continue through NWC_DispatchWriteDone().
417 eNextState NWC_SendBlobDone(AsyncNetworker *NW)
420 if (NW->IO.IOB.TotalSendSize == NW->IO.IOB.TotalSentAlready)
424 close(NW->IO.IOB.OtherFD);
425 //// TODO: unlink networker file?
426 rc = NWC_DispatchWriteDone(&NW->IO);
432 return NWC_DispatchWriteDone(&NW->IO);
// Send handler: queues the UCLS command with argument 1 in the send buffer.
436 eNextState NWC_SendUCLS(AsyncNetworker *NW)
438 StrBufPlain(NW->IO.SendBuf.Buf, HKEY("UCLS 1\n"));
// Read handler for the reply to UCLS: logs the transfer and, on a reply
// starting with 2, logs that the temp file is being removed. The unlink()
// call itself is commented out, so no file is deleted by the visible lines.
442 eNextState NWC_ReadUCLS(AsyncNetworker *NW)
// NOTE(review): the octet count logged is ChunkSize, i.e. the size of the
// last chunk rather than the total - confirm this is the intended figure.
446 syslog(LOG_NOTICE, "Sent %ld octets to <%s>\n", NW->IO.IOB.ChunkSize, ChrPtr(NW->node));
447 /// syslog(LOG_DEBUG, "<%s\n", buf);
448 if (ChrPtr(NW->IO.IOBuf)[0] == '2') {
449 syslog(LOG_DEBUG, "Removing <%s>\n", ChrPtr(NW->tempFileName));
450 /// unlink(ChrPtr(NW->tempFileName));
// Send handler: queues the QUIT command and removes the node from the list
// maintained by network_talking_to() (NTT_REMOVE).
455 eNextState NWC_SendQUIT(AsyncNetworker *NW)
457 StrBufPlain(NW->IO.SendBuf.Buf, HKEY("QUIT\n"));
459 network_talking_to(ChrPtr(NW->node), NTT_REMOVE);
// Read handler for the reply to QUIT: ends the connection.
463 eNextState NWC_ReadQUIT(AsyncNetworker *NW)
467 return eTerminateConnection;
// Table of read handlers, indexed by NW->State in NWC_DispatchReadDone().
// The entries are not visible in this excerpt.
471 NWClientHandler NWC_ReadHandlers[] = {
// Timeout values for the send side, presumably one per state - TODO confirm.
// The entries are not visible in this excerpt, and no visible code reads the
// table (the NWCSetTimeout calls are commented out).
485 const long NWC_SendTimeouts[] = {
// Error message prefixes. All eight visible entries hold the same text;
// presumably there is one slot per state and the state name was meant to be
// appended - TODO confirm. No visible code references this table.
495 const ConstStr NWC[] = {
496 {HKEY("Connection broken during ")},
497 {HKEY("Connection broken during ")},
498 {HKEY("Connection broken during ")},
499 {HKEY("Connection broken during ")},
500 {HKEY("Connection broken during ")},
501 {HKEY("Connection broken during ")},
502 {HKEY("Connection broken during ")},
503 {HKEY("Connection broken during ")}
// Table of send handlers, indexed by NW->State in NWC_DispatchWriteDone().
// Only one entry is visible in this excerpt.
506 NWClientHandler NWC_SendHandlers[] = {
511 NWC_ReadREADBlobDone,
// Timeout values for the read side, presumably one per state - TODO confirm.
// The entries are not visible in this excerpt.
520 const long NWC_ReadTimeouts[] = {
// DNS completion callback (passed to QueueQuery() in nwc_get_one_host_ip()).
// On a successful lookup with a non-NULL hostent, copies the first address
// into NW->IO.ConnectMe->Addr, sets family and port, and proceeds to
// nwc_connect_ip(). Handling of a failed lookup is not visible here.
536 eNextState nwc_get_one_host_ip_done(AsyncIO *IO)
538 AsyncNetworker *NW = IO->Data;
539 struct hostent *hostent;
543 hostent = NW->HostLookup.VParsedDNSReply;
544 if ((NW->HostLookup.DNSStatus == ARES_SUCCESS) &&
545 (hostent != NULL) ) {
// NOTE(review): Addr has sin6_family, sin6_port and sin6_addr members, so it
// is larger than struct in6_addr; this clears only its leading bytes.
// Clearing sizeof the Addr object looks intended - confirm.
546 memset(&NW->IO.ConnectMe->Addr, 0, sizeof(struct in6_addr));
547 if (NW->IO.ConnectMe->IPv6) {
// NOTE(review): the source here is the address of the pointer slot
// h_addr_list[0], whereas the IPv4 branch below copies from the pointer
// value h_addr_list[0] itself. This branch therefore appears to copy pointer
// bytes instead of the resolved address - confirm.
548 memcpy(&NW->IO.ConnectMe->Addr.sin6_addr.s6_addr,
549 &hostent->h_addr_list[0],
550 sizeof(struct in6_addr));
552 NW->IO.ConnectMe->Addr.sin6_family = hostent->h_addrtype;
553 NW->IO.ConnectMe->Addr.sin6_port = htons(atol(ChrPtr(NW->port)));//// TODO use the one from the URL.
// IPv4 case: the same storage is written through a sockaddr_in view.
556 struct sockaddr_in *addr = (struct sockaddr_in*) &NW->IO.ConnectMe->Addr;
557 /* Bypass the ns lookup result like this: IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); */
558 // addr->sin_addr.s_addr = htonl((uint32_t)&hostent->h_addr_list[0]);
559 memcpy(&addr->sin_addr.s_addr,
560 hostent->h_addr_list[0],
563 addr->sin_family = hostent->h_addrtype;
// NOTE(review): the IPv6 branch takes the port from NW->port, this branch
// always uses 504 and so ignores a configured port - confirm.
564 addr->sin_port = htons(504);/// default citadel port
567 return nwc_connect_ip(IO);
// Starts the DNS lookup for the host in NW->IO.ConnectMe: an AAAA query when
// the IPv6 flag is set, otherwise an A query. The result is delivered to
// nwc_get_one_host_ip_done(). Sets IO->NextState to eReadDNSReply and
// returns that value.
574 eNextState nwc_get_one_host_ip(AsyncIO *IO)
576 AsyncNetworker *NW = IO->Data;
578 * here we start with the lookup of one host. it might be...
579 * - the relay host *sigh*
580 * - the direct hostname if there was no mx record
586 syslog(LOG_DEBUG, "NWC: %s\n", __FUNCTION__);
589 "NWC client[%ld]: looking up %s-Record %s : %d ...\n",
591 (NW->IO.ConnectMe->IPv6)? "aaaa": "a",
592 NW->IO.ConnectMe->Host,
593 NW->IO.ConnectMe->Port);
595 QueueQuery((NW->IO.ConnectMe->IPv6)? ns_t_aaaa : ns_t_a,
596 NW->IO.ConnectMe->Host,
599 nwc_get_one_host_ip_done);
600 IO->NextState = eReadDNSReply;
601 return IO->NextState;
604 * @brief line-read handler; decides when to read more lines from the remote node, and when the reply is a single line.
// Line reader installed as NW->IO.LineReader in RunNetworker(). Dispatches on
// IO->NextState: eTerminateConnection yields eReadFail, and one of the other
// cases (labels not visible here) pulls one line from the receive buffer into
// IO->IOBuf via StrBufChunkSipLine(). The default result is eBufferNotEmpty.
606 eReadState NWC_ReadServerStatus(AsyncIO *IO)
608 AsyncNetworker *NW = IO->Data;
609 eReadState Finished = eBufferNotEmpty;
611 switch (IO->NextState) {
616 case eTerminateConnection:
618 Finished = eReadFail;
624 Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf);
629 ////TODO Finished = IOBufferStrLength(&IO->RecvBuf) >= NW->BlobReadSize;
// Common failure path used by the timeout, connect-failure and DNS-failure
// callbacks: requests termination of the connection.
637 eNextState NWC_FailNetworkConnection(AsyncIO *IO)
639 return eTerminateConnection;
// ReadDone callback (installed in RunNetworker()): runs the read handler that
// belongs to the current state. NW->State indexes the table directly, with
// no bounds check in the visible lines.
643 eNextState NWC_DispatchReadDone(AsyncIO *IO)
645 syslog(LOG_DEBUG, "NWC: %s\n", __FUNCTION__);
646 AsyncNetworker *NW = IO->Data;
649 rc = NWC_ReadHandlers[NW->State](NW);
652 ////NWCSetTimeout(rc, NW);
// SendDone callback (installed in RunNetworker()): runs the send handler that
// belongs to the current state. NW->State indexes the table directly, with
// no bounds check in the visible lines.
655 eNextState NWC_DispatchWriteDone(AsyncIO *IO)
657 syslog(LOG_DEBUG, "NWC: %s\n", __FUNCTION__);
658 AsyncNetworker *NW = IO->Data;
661 rc = NWC_SendHandlers[NW->State](NW);
662 ////NWCSetTimeout(rc, NW);
666 /*****************************************************************************/
667 /* Networker CLIENT ERROR CATCHERS */
668 /*****************************************************************************/
// Terminate callback (installed in RunNetworker()). The visible lines only
// log the call; the cleanup call is still a commented-out TODO.
669 eNextState NWC_Terminate(AsyncIO *IO)
671 syslog(LOG_DEBUG, "Nw: %s\n", __FUNCTION__);
672 /// FinalizeNetworker(IO); TODO
// Timeout callback: logs the event and takes the common failure path. No
// error text is stored yet (see the commented-out todo line).
676 eNextState NWC_Timeout(AsyncIO *IO)
678 // AsyncNetworker *NW = IO->Data;
680 syslog(LOG_DEBUG, "NW: %s\n", __FUNCTION__);
681 // StrBufPlain(IO->ErrMsg, CKEY(POP3C_ReadErrors[pMsg->State])); todo
682 return NWC_FailNetworkConnection(IO);
// Connect-failure callback: logs the event and takes the common failure path.
684 eNextState NWC_ConnFail(AsyncIO *IO)
686 /// AsyncNetworker *NW = IO->Data;
688 syslog(LOG_DEBUG, "NW: %s\n", __FUNCTION__);
689 //// StrBufPlain(IO->ErrMsg, CKEY(POP3C_ReadErrors[pMsg->State])); todo
690 return NWC_FailNetworkConnection(IO);
// DNS-failure callback: logs the event and takes the common failure path.
692 eNextState NWC_DNSFail(AsyncIO *IO)
694 /// AsyncNetworker *NW = IO->Data;
696 syslog(LOG_DEBUG, "NW: %s\n", __FUNCTION__);
697 //// StrBufPlain(IO->ErrMsg, CKEY(POP3C_ReadErrors[pMsg->State])); todo
698 return NWC_FailNetworkConnection(IO);
// ShutdownAbort callback (installed in RunNetworker()). The visible lines
// only log the call; the remaining lines are commented-out leftovers that
// refer to the POP3 aggregator and still need porting (marked todo).
700 eNextState NWC_Shutdown(AsyncIO *IO)
702 syslog(LOG_DEBUG, "NW: %s\n", __FUNCTION__);
703 //// pop3aggr *pMsg = IO->Data;
705 ////pMsg->MyQEntry->Status = 3;
706 ///StrBufPlain(pMsg->MyQEntry->StatusMessage, HKEY("server shutdown during message retrieval."));
707 /// FinalizePOP3AggrRun(IO); todo
// Logs the connection attempt and starts the event-driven connection through
// InitEventIO(). Reached directly from RunNetworker() when the target is an
// IP address, or from nwc_get_one_host_ip_done() after a DNS lookup.
712 eNextState nwc_connect_ip(AsyncIO *IO)
714 AsyncNetworker *NW = IO->Data;
716 syslog(LOG_DEBUG, "NW: %s\n", __FUNCTION__);
717 syslog(LOG_DEBUG, "network: polling <%s>\n", ChrPtr(NW->node));
718 syslog(LOG_NOTICE, "Connecting to <%s> at %s:%s\n",
723 //// IO->ConnectMe = &NW->Pop3Host;
724 /* Bypass the ns lookup result like this: IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); */
726 /////// SetConnectStatus(IO);
// NOTE(review): the meaning of the three numeric arguments is not visible
// here (presumably two timeouts and a flag) - check the InitEventIO prototype.
728 return InitEventIO(IO, NW, 100, 100, 1); /*
// Prepares a networker for its run and queues it on the event loop: parses
// NW->Url into NW->IO.ConnectMe, installs the callbacks, allocates the I/O
// buffers, attaches a cloned context, and queues the first step.
734 void RunNetworker(AsyncNetworker *NW)
// 504 is passed to ParseURL, presumably as the default port; the IPv4 branch
// of nwc_get_one_host_ip_done() labels 504 the default citadel port.
738 ParseURL(&NW->IO.ConnectMe, NW->Url, 504);
// Callbacks invoked by the event I/O layer.
741 NW->IO.SendDone = NWC_DispatchWriteDone;
742 NW->IO.ReadDone = NWC_DispatchReadDone;
743 NW->IO.Terminate = NWC_Terminate;
744 NW->IO.LineReader = NWC_ReadServerStatus;
745 NW->IO.ConnFail = NWC_ConnFail;
746 NW->IO.DNSFail = NWC_DNSFail;
747 NW->IO.Timeout = NWC_Timeout;
748 NW->IO.ShutdownAbort = NWC_Shutdown;
// Send and receive buffers start with 1024 bytes reserved.
750 NW->IO.SendBuf.Buf = NewStrBufPlain(NULL, 1024);
751 NW->IO.RecvBuf.Buf = NewStrBufPlain(NULL, 1024);
752 NW->IO.IOBuf = NewStrBuf();
// The conversation starts by reading (the first handler is the greeting).
754 NW->IO.NextState = eReadMessage;
// Each run gets its own copy of the module context, pointing back at NW.
755 SubC = CloneContext (&networker_client_CC);
756 SubC->session_specific_data = (char*) NW;
757 NW->IO.CitContext = SubC;
// A literal IP address takes the first branch (its callback argument is not
// visible here); a host name goes through the DNS lookup first.
759 if (NW->IO.ConnectMe->IsIP) {
760 QueueEventContext(&NW->IO,
763 else { /* the configured host is a name, so a DNS lookup is needed first */
764 QueueEventContext(&NW->IO,
765 nwc_get_one_host_ip);
770 * Poll other Citadel nodes and transfer inbound/outbound network data.
771 * Set "full_poll" to nonzero to force a poll of every node, or to zero to poll
772 * only nodes to which we have data to send.
// Walks the neighbour-node configuration one line at a time. Each line is
// split at pipe characters into node, secret, host and port. For a line with
// all four fields present a networker is set up with its spool file name and
// connection URL, and the node is registered with network_talking_to().
// Networkers that are not started are released with DestroyNetworker().
// Parameters:
//   full_poll        - see the comment above this function; its use is not
//                      visible in this excerpt.
//   working_ignetcfg - configuration text; NULL or empty means no neighbours.
774 void network_poll_other_citadel_nodes(int full_poll, char *working_ignetcfg)
785 if ((working_ignetcfg == NULL) || (*working_ignetcfg == '\0')) {
786 syslog(LOG_DEBUG, "network: no neighbor nodes are configured - not polling.\n");
789 CfgData = NewStrBufPlain(working_ignetcfg, -1);
// Line is sized to hold the whole configuration, so any single line fits.
790 Line = NewStrBufPlain(NULL, StrLength(CfgData));
795 /* Use the string tokenizer to grab one line at a time */
796 StrBufSipLine(Line, CfgData, &CfgPtr);
// StrBufSipLine sets the cursor to StrBufNOTNULL once the input is used up.
797 Done = CfgPtr == StrBufNOTNULL;
798 if (StrLength(Line) > 0)
// NOTE(review): returning here leaks CfgData and Line, as the existing TODO
// on the next line admits; a cleanup path is needed.
800 if(server_shutting_down)
801 return;/* TODO free stuff*/
// NOTE(review): the malloc result is used by memset without a NULL check.
804 NW = (AsyncNetworker*)malloc(sizeof(AsyncNetworker));
805 memset(NW, 0, sizeof(AsyncNetworker));
// Each field buffer is sized to the full line, an upper bound for any token.
807 NW->node = NewStrBufPlain(NULL, StrLength(Line));
808 NW->host = NewStrBufPlain(NULL, StrLength(Line));
809 NW->port = NewStrBufPlain(NULL, StrLength(Line));
810 NW->secret = NewStrBufPlain(NULL, StrLength(Line));
// Field order on a configuration line: node, secret, host, port.
812 StrBufExtract_NextToken(NW->node, Line, &lptr, '|');
813 StrBufExtract_NextToken(NW->secret, Line, &lptr, '|');
814 StrBufExtract_NextToken(NW->host, Line, &lptr, '|');
815 StrBufExtract_NextToken(NW->port, Line, &lptr, '|');
816 if ( (StrLength(NW->node) != 0) &&
817 (StrLength(NW->secret) != 0) &&
818 (StrLength(NW->host) != 0) &&
819 (StrLength(NW->port) != 0))
// Spool file path: ctdl_netout_dir, a slash, then the node name.
824 NW->SpoolFileName = NewStrBufPlain(ctdl_netout_dir, -1);
825 StrBufAppendBufPlain(NW->SpoolFileName, HKEY("/"), 0);
826 StrBufAppendBuf(NW->SpoolFileName, NW->node, 0);
// A readable spool file presumably means there is outbound data queued for
// this node - TODO confirm; the body of this branch is not visible here.
827 if (access(ChrPtr(NW->SpoolFileName), R_OK) == 0) {
833 NW->Url = NewStrBufPlain(NULL, StrLength(Line));
834 StrBufPrintf(NW->Url, "citadel://:%s@%s:%s",
// Only start a run when no other run is already talking to this node.
838 if (!network_talking_to(ChrPtr(NW->node), NTT_CHECK))
840 network_talking_to(ChrPtr(NW->node), NTT_ADD);
845 DestroyNetworker(NW);
// Timer hook (registered in the module init code): loads the working
// neighbour configuration, polls the other nodes and frees the configuration
// text. Runs are rate-limited with config.c_net_freq and the static last_run.
852 void network_do_clientqueue(void)
854 char *working_ignetcfg;
// NOTE(review): no visible line changes full_processing, so it appears to be
// always 1 - confirm against the lines not shown.
855 int full_processing = 1;
// NOTE(review): no visible line assigns last_run; if it is never updated the
// rate limit below cannot take effect - confirm against the lines not shown.
856 static time_t last_run = 0L;
859 * Run the full set of processing tasks no more frequently
860 * than once every n seconds
862 if ( (time(NULL) - last_run) < config.c_net_freq ) {
864 syslog(LOG_DEBUG, "Network full processing in %ld seconds.\n",
865 config.c_net_freq - (time(NULL)- last_run)
869 working_ignetcfg = load_working_ignetcfg();
871 * Poll other Citadel nodes. Maybe. If "full_processing" is set
872 * then we poll everyone. Otherwise we only poll nodes we have stuff
875 network_poll_other_citadel_nodes(full_processing, working_ignetcfg);
// The configuration text is owned by this function and released here.
876 if (working_ignetcfg)
877 free(working_ignetcfg);
// Module entry point: initialises the shared networker context under the name
// CitNetworker, registers network_do_clientqueue() as an EVT_TIMER session
// hook and returns the module name.
885 CTDL_MODULE_INIT(network_client)
889 CtdlFillSystemContext(&networker_client_CC, "CitNetworker");
891 CtdlRegisterSessionHook(network_do_clientqueue, EVT_TIMER);
893 return "network_client";