#include "netconfig.h"
#include "ctdl_module.h"
-/*
- * receive network spool from the remote system
- */
-void receive_spool(int *sock, char *remote_nodename) {
- int download_len = 0L;
- int bytes_received = 0L;
- char buf[SIZ];
- char tempfilename[PATH_MAX];
- char permfilename[PATH_MAX];
- int plen;
- FILE *fp;
-
- snprintf(tempfilename,
- sizeof tempfilename,
- "%s/%s.%lx%x",
- ctdl_nettmp_dir,
- remote_nodename,
- time(NULL),
- rand()
- );
-
- snprintf(permfilename,
- sizeof permfilename,
- "%s/%s.%lx%x",
- ctdl_netin_dir,
- remote_nodename,
- time(NULL),
- rand()
- );
-
- if (sock_puts(sock, "NDOP") < 0) return;
- if (sock_getln(sock, buf, sizeof buf) < 0) return;
- syslog(LOG_DEBUG, "<%s\n", buf);
- if (buf[0] != '2') {
- return;
+struct CitContext networker_client_CC;
+
+#define NODE ChrPtr(((AsyncNetworker*)IO->Data)->node)
+#define N ((AsyncNetworker*)IO->Data)->n
+
+#define EVN_syslog(LEVEL, FORMAT, ...) \
+ syslog(LEVEL, \
+ "IO[%ld]CC[%d]NW[%s][%ld]" FORMAT, \
+ IO->ID, CCID, NODE, N, __VA_ARGS__)
+
+#define EVNM_syslog(LEVEL, FORMAT) \
+ syslog(LEVEL, \
+ "IO[%ld]CC[%d]NW[%s][%ld]" FORMAT, \
+ IO->ID, CCID, NODE, N)
+
+#define EVNCS_syslog(LEVEL, FORMAT, ...) \
+ syslog(LEVEL, "IO[%ld]NW[%s][%ld]" FORMAT, \
+ IO->ID, NODE, N, __VA_ARGS__)
+
+#define EVNCSM_syslog(LEVEL, FORMAT) \
+ syslog(LEVEL, "IO[%ld]NW[%s][%ld]" FORMAT, \
+ IO->ID, NODE, N)
+
+
+typedef enum _eNWCState {
+ eeGreating,
+ eAuth,
+ eNDOP,
+ eREAD,
+ eReadBLOB,
+ eCLOS,
+ eNUOP,
+ eWRIT,
+ eWriteBLOB,
+ eUCLS,
+ eQUIT
+}eNWCState;
+
+
+typedef struct _async_networker {
+ AsyncIO IO;
+ DNSQueryParts HostLookup;
+ eNWCState State;
+ long n;
+ StrBuf *SpoolFileName;
+ StrBuf *tempFileName;
+ StrBuf *node;
+ StrBuf *host;
+ StrBuf *port;
+ StrBuf *secret;
+ StrBuf *Url;
+} AsyncNetworker;
+
+typedef eNextState(*NWClientHandler)(AsyncNetworker* NW);
+eNextState nwc_get_one_host_ip(AsyncIO *IO);
+
+eNextState nwc_connect_ip(AsyncIO *IO);
+
+eNextState NWC_SendQUIT(AsyncNetworker *NW);
+eNextState NWC_DispatchWriteDone(AsyncIO *IO);
+
+void DeleteNetworker(void *vptr)
+{
+ AsyncNetworker *NW = (AsyncNetworker *)vptr;
+ FreeStrBuf(&NW->SpoolFileName);
+ FreeStrBuf(&NW->tempFileName);
+ FreeStrBuf(&NW->node);
+ FreeStrBuf(&NW->host);
+ FreeStrBuf(&NW->port);
+ FreeStrBuf(&NW->secret);
+ FreeStrBuf(&NW->Url);
+ FreeAsyncIOContents(&NW->IO);
+ free(NW);
+}
+
+#define NWC_DBG_SEND() EVN_syslog(LOG_DEBUG, ": > %s", ChrPtr(NW->IO.SendBuf.Buf))
+#define NWC_DBG_READ() EVN_syslog(LOG_DEBUG, ": < %s\n", ChrPtr(NW->IO.IOBuf))
+#define NWC_OK (strncasecmp(ChrPtr(NW->IO.IOBuf), "+OK", 3) == 0)
+
+eNextState FinalizeNetworker(AsyncIO *IO)
+{
+ AsyncNetworker *NW = (AsyncNetworker *)IO->Data;
+
+ network_talking_to(SKEY(NW->node), NTT_REMOVE);
+
+ DeleteNetworker(IO->Data);
+ return eAbort;
+}
+
+eNextState NWC_ReadGreeting(AsyncNetworker *NW)
+{
+ char connected_to[SIZ];
+ AsyncIO *IO = &NW->IO;
+ NWC_DBG_READ();
+ /* Read the server greeting */
+ /* Check that the remote is who we think it is and warn the Aide if not */
+ extract_token (connected_to, ChrPtr(NW->IO.IOBuf), 1, ' ', sizeof connected_to);
+ if (strcmp(connected_to, ChrPtr(NW->node)) != 0)
+ {
+ StrBufPrintf(NW->IO.ErrMsg,
+ "Connected to node \"%s\" but I was expecting to connect to node \"%s\".",
+ connected_to, ChrPtr(NW->node));
+ EVN_syslog(LOG_ERR, "%s\n", ChrPtr(NW->IO.ErrMsg));
+ CtdlAideMessage(ChrPtr(NW->IO.ErrMsg), "Network error");
+ return eAbort;/// todo: aide message in anderer queue speichern
}
+ return eSendReply;
+}
- download_len = extract_long(&buf[4], 0);
- if (download_len <= 0) {
- return;
+eNextState NWC_SendAuth(AsyncNetworker *NW)
+{
+ AsyncIO *IO = &NW->IO;
+ /* We're talking to the correct node. Now identify ourselves. */
+ StrBufPrintf(NW->IO.SendBuf.Buf, "NETP %s|%s\n",
+ config.c_nodename,
+ ChrPtr(NW->secret));
+ NWC_DBG_SEND();
+ return eSendReply;
+}
+
+eNextState NWC_ReadAuthReply(AsyncNetworker *NW)
+{
+ AsyncIO *IO = &NW->IO;
+ NWC_DBG_READ();
+ if (ChrPtr(NW->IO.IOBuf)[0] == '2')
+ {
+ return eSendReply;
+ }
+ else
+ {
+ StrBufPrintf(NW->IO.ErrMsg,
+ "Connected to node \"%s\" but my secret wasn't accurate.",
+ ChrPtr(NW->node));
+ EVN_syslog(LOG_ERR, "%s\n", ChrPtr(NW->IO.ErrMsg));
+ CtdlAideMessage(ChrPtr(NW->IO.ErrMsg), "Network error");
+
+ return eAbort;
}
+}
- bytes_received = 0L;
- fp = fopen(tempfilename, "w");
- if (fp == NULL) {
- syslog(LOG_CRIT, "Cannot create %s: %s\n", tempfilename, strerror(errno));
- return;
+eNextState NWC_SendNDOP(AsyncNetworker *NW)
+{
+ AsyncIO *IO = &NW->IO;
+ NW->tempFileName = NewStrBuf();
+ NW->SpoolFileName = NewStrBuf();
+ StrBufPrintf(NW->tempFileName,
+ "%s/%s.%lx%x",
+ ctdl_netin_dir,
+ ChrPtr(NW->node),
+ time(NULL),// TODO: get time from libev
+ rand());
+ StrBufPrintf(NW->SpoolFileName,
+ "%s/%s.%lx%x",
+ ctdl_nettmp_dir,
+ ChrPtr(NW->node),
+ time(NULL),// TODO: get time from libev
+ rand());
+
+ /* We're talking to the correct node. Now identify ourselves. */
+ StrBufPlain(NW->IO.SendBuf.Buf, HKEY("NDOP\n"));
+ NWC_DBG_SEND();
+ return eSendReply;
+}
+
+eNextState NWC_ReadNDOPReply(AsyncNetworker *NW)
+{
+ AsyncIO *IO = &NW->IO;
+ int TotalSendSize;
+ NWC_DBG_READ();
+ if (ChrPtr(NW->IO.IOBuf)[0] == '2')
+ {
+
+ NW->IO.IOB.TotalSentAlready = 0;
+ TotalSendSize = atol (ChrPtr(NW->IO.IOBuf) + 4);
+ EVN_syslog(LOG_DEBUG, "Expecting to transfer %ld bytes\n", NW->IO.IOB.TotalSendSize);
+ if (TotalSendSize <= 0) {
+ NW->State = eNUOP - 1;
+ }
+ else {
+ int fd;
+ fd = open(ChrPtr(NW->SpoolFileName),
+ O_EXCL|O_CREAT|O_NONBLOCK|O_WRONLY,
+ S_IRUSR|S_IWUSR);
+ if (fd < 0)
+ {
+ EVN_syslog(LOG_CRIT,
+ "cannot open %s: %s\n",
+ ChrPtr(NW->SpoolFileName),
+ strerror(errno));
+
+ NW->State = eQUIT - 1;
+ return eAbort;
+ }
+ FDIOBufferInit(&NW->IO.IOB, &NW->IO.RecvBuf, fd, TotalSendSize);
+ }
+ return eSendReply;
}
+ else
+ {
+ return eAbort;
+ }
+}
- syslog(LOG_DEBUG, "Expecting to transfer %d bytes\n", download_len);
- while (bytes_received < download_len) {
+eNextState NWC_SendREAD(AsyncNetworker *NW)
+{
+ AsyncIO *IO = &NW->IO;
+ eNextState rc;
+
+ if (NW->IO.IOB.TotalSentAlready < NW->IO.IOB.TotalSendSize)
+ {
/*
* If shutting down we can exit here and unlink the temp file.
* this shouldn't loose us any messages.
*/
if (server_shutting_down)
{
- fclose(fp);
- unlink(tempfilename);
- return;
+ FDIOBufferDelete(&NW->IO.IOB);
+ unlink(ChrPtr(NW->tempFileName));
+ return eAbort;
}
- snprintf(buf, sizeof buf, "READ %d|%d",
- bytes_received,
- ((download_len - bytes_received > IGNET_PACKET_SIZE)
- ? IGNET_PACKET_SIZE : (download_len - bytes_received))
- );
+ StrBufPrintf(NW->IO.SendBuf.Buf, "READ %ld|%ld\n",
+ NW->IO.IOB.TotalSentAlready,
+ NW->IO.IOB.TotalSendSize);
+/*
+ ((NW->IO.IOB.TotalSendSize - NW->IO.IOB.TotalSentAlready > IGNET_PACKET_SIZE)
+ ? IGNET_PACKET_SIZE :
+ (NW->IO.IOB.TotalSendSize - NW->IO.IOB.TotalSentAlready))
+ );
+*/
+ NWC_DBG_SEND();
+ return eSendReply;
+ }
+ else
+ {
+ NW->State = eCLOS;
+ rc = NWC_DispatchWriteDone(&NW->IO);
+ NWC_DBG_SEND();
+
+ return rc;
+ }
+}
+
+eNextState NWC_ReadREADState(AsyncNetworker *NW)
+{
+ AsyncIO *IO = &NW->IO;
+ NWC_DBG_READ();
+ if (ChrPtr(NW->IO.IOBuf)[0] == '6')
+ {
+ NW->IO.IOB.ChunkSendRemain =
+ NW->IO.IOB.ChunkSize = atol(ChrPtr(NW->IO.IOBuf)+4);
+ return eReadFile;
+ }
+ return eAbort;
+}
+eNextState NWC_ReadREADBlobDone(AsyncNetworker *NW);
+eNextState NWC_ReadREADBlob(AsyncNetworker *NW)
+{
+ AsyncIO *IO = &NW->IO;
+ NWC_DBG_READ();
+ if (NW->IO.IOB.TotalSendSize == NW->IO.IOB.TotalSentAlready)
+ {
+ NW->State ++;
+
+ FDIOBufferDelete(&NW->IO.IOB);
- if (sock_puts(sock, buf) < 0) {
- fclose(fp);
- unlink(tempfilename);
- return;
- }
- if (sock_getln(sock, buf, sizeof buf) < 0) {
- fclose(fp);
- unlink(tempfilename);
- return;
+ if (link(ChrPtr(NW->SpoolFileName), ChrPtr(NW->tempFileName)) != 0) {
+ EVN_syslog(LOG_ALERT,
+ "Could not link %s to %s: %s\n",
+ ChrPtr(NW->tempFileName),
+ ChrPtr(NW->SpoolFileName),
+ strerror(errno));
}
+
+ unlink(ChrPtr(NW->tempFileName));
+ return NWC_DispatchWriteDone(&NW->IO);
+ }
+ else {
+ NW->State --;
+ NW->IO.IOB.ChunkSendRemain = NW->IO.IOB.ChunkSize;
+ return NWC_DispatchWriteDone(&NW->IO);
+ }
+}
+
+eNextState NWC_ReadREADBlobDone(AsyncNetworker *NW)
+{
+ AsyncIO *IO = &NW->IO;
+ NWC_DBG_READ();
+ if (NW->IO.IOB.TotalSendSize == NW->IO.IOB.TotalSentAlready)
+ {
+ NW->State ++;
+
+ FDIOBufferDelete(&NW->IO.IOB);
- if (buf[0] == '6') {
- plen = extract_int(&buf[4], 0);
- StrBuf *pbuf = NewStrBuf();
- if (socket_read_blob(sock, pbuf, plen, CLIENT_TIMEOUT) != plen) {
- syslog(LOG_INFO, "Short read from peer; aborting.\n");
- fclose(fp);
- unlink(tempfilename);
- FreeStrBuf(&pbuf);
- return;
- }
- fwrite(ChrPtr(pbuf), plen, 1, fp);
- bytes_received += plen;
- FreeStrBuf(&pbuf);
+ if (link(ChrPtr(NW->SpoolFileName), ChrPtr(NW->tempFileName)) != 0) {
+ EVN_syslog(LOG_ALERT,
+ "Could not link %s to %s: %s\n",
+ ChrPtr(NW->tempFileName),
+ ChrPtr(NW->SpoolFileName),
+ strerror(errno));
}
+
+ unlink(ChrPtr(NW->tempFileName));
+ return NWC_DispatchWriteDone(&NW->IO);
}
+ else {
+ NW->State --;
+ NW->IO.IOB.ChunkSendRemain = NW->IO.IOB.ChunkSize;
+ return NWC_DispatchWriteDone(&NW->IO);
+ }
+}
+eNextState NWC_SendCLOS(AsyncNetworker *NW)
+{
+ AsyncIO *IO = &NW->IO;
+ StrBufPlain(NW->IO.SendBuf.Buf, HKEY("CLOS\n"));
+ NWC_DBG_SEND();
+ return eSendReply;
+}
- fclose(fp);
+eNextState NWC_ReadCLOSReply(AsyncNetworker *NW)
+{
+ AsyncIO *IO = &NW->IO;
+ NWC_DBG_READ();
+ if (ChrPtr(NW->IO.IOBuf)[0] != '2')
+ return eTerminateConnection;
+ return eSendReply;
+}
- /* Last chance for shutdown exit */
- if (server_shutting_down)
- {
- unlink(tempfilename);
- return;
- }
- if (sock_puts(sock, "CLOS") < 0) {
- unlink(tempfilename);
- return;
+eNextState NWC_SendNUOP(AsyncNetworker *NW)
+{
+ AsyncIO *IO = &NW->IO;
+ eNextState rc;
+ long TotalSendSize;
+ struct stat statbuf;
+ int fd;
+
+ StrBufPrintf(NW->tempFileName,
+ "%s/%s",
+ ctdl_netout_dir,
+ ChrPtr(NW->node));
+ fd = open(ChrPtr(NW->tempFileName), O_RDONLY);
+ if (fd < 0) {
+ if (errno != ENOENT) {
+ EVN_syslog(LOG_CRIT,
+ "cannot open %s: %s\n",
+ ChrPtr(NW->tempFileName),
+ strerror(errno));
+ }
+ NW->State = eQUIT;
+ rc = NWC_SendQUIT(NW);
+ NWC_DBG_SEND();
+ return rc;
}
- /*
- * From here on we must complete or messages will get lost
- */
- if (sock_getln(sock, buf, sizeof buf) < 0) {
- unlink(tempfilename);
- return;
+ if (fstat(fd, &statbuf) == -1) {
+ EVN_syslog(LOG_CRIT, "FSTAT FAILED %s [%s]--\n",
+ ChrPtr(NW->tempFileName),
+ strerror(errno));
+ if (fd > 0) close(fd);
+ return eAbort;
}
+ TotalSendSize = statbuf.st_size;
+ if (TotalSendSize == 0) {
+ EVNM_syslog(LOG_DEBUG,
+ "Nothing to send.\n");
+ NW->State = eQUIT;
+ rc = NWC_SendQUIT(NW);
+ NWC_DBG_SEND();
+ return rc;
+ }
+ FDIOBufferInit(&NW->IO.IOB, &NW->IO.SendBuf, fd, TotalSendSize);
- syslog(LOG_DEBUG, "%s\n", buf);
+ StrBufPlain(NW->IO.SendBuf.Buf, HKEY("NUOP\n"));
+ NWC_DBG_SEND();
+ return eSendReply;
- /*
- * Now move the temp file to its permanent location.
- */
- if (link(tempfilename, permfilename) != 0) {
- syslog(LOG_ALERT, "Could not link %s to %s: %s\n",
- tempfilename, permfilename, strerror(errno)
- );
+}
+eNextState NWC_ReadNUOPReply(AsyncNetworker *NW)
+{
+ AsyncIO *IO = &NW->IO;
+ NWC_DBG_READ();
+ if (ChrPtr(NW->IO.IOBuf)[0] != '2')
+ return eAbort;
+ return eSendReply;
+}
+
+eNextState NWC_SendWRIT(AsyncNetworker *NW)
+{
+ AsyncIO *IO = &NW->IO;
+ StrBufPrintf(NW->IO.SendBuf.Buf, "WRIT %ld\n",
+ NW->IO.IOB.TotalSendSize - NW->IO.IOB.TotalSentAlready);
+ NWC_DBG_SEND();
+ return eSendReply;
+}
+eNextState NWC_ReadWRITReply(AsyncNetworker *NW)
+{
+ AsyncIO *IO = &NW->IO;
+ NWC_DBG_READ();
+ if (ChrPtr(NW->IO.IOBuf)[0] != '7')
+ {
+ return eAbort;
}
-
- unlink(tempfilename);
+
+ NW->IO.IOB.ChunkSendRemain =
+ NW->IO.IOB.ChunkSize = atol(ChrPtr(NW->IO.IOBuf)+4);
+ return eSendFile;
}
+eNextState NWC_SendBlobDone(AsyncNetworker *NW)
+{
+ AsyncIO *IO = &NW->IO;
+ eNextState rc;
+ if (IO->IOB.TotalSendSize == NW->IO.IOB.TotalSentAlready)
+ {
+ NW->State ++;
+
+ FDIOBufferDelete(&IO->IOB);
+ rc = NWC_DispatchWriteDone(IO);
+ NW->State --;
+ return rc;
+ }
+ else {
+ NW->State --;
+ IO->IOB.ChunkSendRemain = IO->IOB.ChunkSize;
+ return NWC_DispatchWriteDone(IO);
+ }
+}
+eNextState NWC_SendUCLS(AsyncNetworker *NW)
+{
+ AsyncIO *IO = &NW->IO;
+ StrBufPlain(NW->IO.SendBuf.Buf, HKEY("UCLS 1\n"));
+ NWC_DBG_SEND();
+ return eSendReply;
-/*
- * transmit network spool to the remote system
- */
-void transmit_spool(int *sock, char *remote_nodename)
+}
+eNextState NWC_ReadUCLS(AsyncNetworker *NW)
{
- char buf[SIZ];
- char pbuf[4096];
- long plen;
- long bytes_to_write, thisblock, bytes_written;
- int fd;
- char sfname[128];
+ AsyncIO *IO = &NW->IO;
+ NWC_DBG_READ();
- if (sock_puts(sock, "NUOP") < 0) return;
- if (sock_getln(sock, buf, sizeof buf) < 0) return;
- syslog(LOG_DEBUG, "<%s\n", buf);
- if (buf[0] != '2') {
- return;
+ EVN_syslog(LOG_NOTICE, "Sent %ld octets to <%s>\n", NW->IO.IOB.ChunkSize, ChrPtr(NW->node));
+ if (ChrPtr(NW->IO.IOBuf)[0] == '2') {
+ EVN_syslog(LOG_DEBUG, "Removing <%s>\n", ChrPtr(NW->tempFileName));
+ unlink(ChrPtr(NW->tempFileName));
}
+ return eSendReply;
+}
- snprintf(sfname, sizeof sfname,
- "%s/%s",
- ctdl_netout_dir,
- remote_nodename
- );
- fd = open(sfname, O_RDONLY);
- if (fd < 0) {
- if (errno != ENOENT) {
- syslog(LOG_CRIT, "cannot open %s: %s\n", sfname, strerror(errno));
+eNextState NWC_SendQUIT(AsyncNetworker *NW)
+{
+ AsyncIO *IO = &NW->IO;
+ StrBufPlain(NW->IO.SendBuf.Buf, HKEY("QUIT\n"));
+
+ NWC_DBG_SEND();
+ return eSendReply;
+}
+
+eNextState NWC_ReadQUIT(AsyncNetworker *NW)
+{
+ AsyncIO *IO = &NW->IO;
+ NWC_DBG_READ();
+
+ return eAbort;
+}
+
+
+NWClientHandler NWC_ReadHandlers[] = {
+ NWC_ReadGreeting,
+ NWC_ReadAuthReply,
+ NWC_ReadNDOPReply,
+ NWC_ReadREADState,
+ NWC_ReadREADBlob,
+ NWC_ReadCLOSReply,
+ NWC_ReadNUOPReply,
+ NWC_ReadWRITReply,
+ NWC_SendBlobDone,
+ NWC_ReadUCLS,
+ NWC_ReadQUIT};
+
+long NWC_ConnTimeout = 100;
+
+const long NWC_SendTimeouts[] = {
+ 100,
+ 100,
+ 100,
+ 100,
+ 100,
+ 100,
+ 100,
+ 100
+};
+const ConstStr NWC[] = {
+ {HKEY("Connection broken during ")},
+ {HKEY("Connection broken during ")},
+ {HKEY("Connection broken during ")},
+ {HKEY("Connection broken during ")},
+ {HKEY("Connection broken during ")},
+ {HKEY("Connection broken during ")},
+ {HKEY("Connection broken during ")},
+ {HKEY("Connection broken during ")}
+};
+
+NWClientHandler NWC_SendHandlers[] = {
+ NULL,
+ NWC_SendAuth,
+ NWC_SendNDOP,
+ NWC_SendREAD,
+ NWC_ReadREADBlobDone,
+ NWC_SendCLOS,
+ NWC_SendNUOP,
+ NWC_SendWRIT,
+ NWC_SendBlobDone,
+ NWC_SendUCLS,
+ NWC_SendQUIT
+};
+
+const long NWC_ReadTimeouts[] = {
+ 100,
+ 100,
+ 100,
+ 100,
+ 100,
+ 100,
+ 100,
+ 100,
+ 100,
+ 100
+};
+
+
+
+
+eNextState nwc_get_one_host_ip_done(AsyncIO *IO)
+{
+ AsyncNetworker *NW = IO->Data;
+ struct hostent *hostent;
+
+ QueryCbDone(IO);
+
+ hostent = NW->HostLookup.VParsedDNSReply;
+ if ((NW->HostLookup.DNSStatus == ARES_SUCCESS) &&
+ (hostent != NULL) ) {
+ memset(&NW->IO.ConnectMe->Addr, 0, sizeof(struct in6_addr));
+ if (NW->IO.ConnectMe->IPv6) {
+ memcpy(&NW->IO.ConnectMe->Addr.sin6_addr.s6_addr,
+ &hostent->h_addr_list[0],
+ sizeof(struct in6_addr));
+
+ NW->IO.ConnectMe->Addr.sin6_family = hostent->h_addrtype;
+ NW->IO.ConnectMe->Addr.sin6_port = htons(atol(ChrPtr(NW->port)));//// TODO use the one from the URL.
}
- return;
- }
- bytes_written = 0;
- while (plen = (long) read(fd, pbuf, IGNET_PACKET_SIZE), plen > 0L) {
- bytes_to_write = plen;
- while (bytes_to_write > 0L) {
- /* Exit if shutting down */
- if (server_shutting_down)
- {
- close(fd);
- return;
- }
+ else {
+ struct sockaddr_in *addr = (struct sockaddr_in*) &NW->IO.ConnectMe->Addr;
+ /* Bypass the ns lookup result like this: IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); */
+// addr->sin_addr.s_addr = htonl((uint32_t)&hostent->h_addr_list[0]);
+ memcpy(&addr->sin_addr.s_addr,
+ hostent->h_addr_list[0],
+ sizeof(uint32_t));
+
+ addr->sin_family = hostent->h_addrtype;
+ addr->sin_port = htons(504);/// default citadel port
- snprintf(buf, sizeof buf, "WRIT %ld", bytes_to_write);
- if (sock_puts(sock, buf) < 0) {
- close(fd);
- return;
- }
- if (sock_getln(sock, buf, sizeof buf) < 0) {
- close(fd);
- return;
- }
- thisblock = atol(&buf[4]);
- if (buf[0] == '7') {
- if (sock_write(sock, pbuf, (int) thisblock) < 0) {
- close(fd);
- return;
- }
- bytes_to_write -= thisblock;
- bytes_written += thisblock;
- } else {
- goto ABORTUPL;
- }
}
+ return nwc_connect_ip(IO);
}
+ else
+ return eAbort;
+}
-ABORTUPL:
- close(fd);
-
- /* Last chance for shutdown exit */
- if(server_shutting_down)
- return;
-
- if (sock_puts(sock, "UCLS 1") < 0) return;
- /*
- * From here on we must complete or messages will get lost
- */
- if (sock_getln(sock, buf, sizeof buf) < 0) return;
- syslog(LOG_NOTICE, "Sent %ld octets to <%s>\n", bytes_written, remote_nodename);
- syslog(LOG_DEBUG, "<%s\n", buf);
- if (buf[0] == '2') {
- syslog(LOG_DEBUG, "Removing <%s>\n", sfname);
- unlink(sfname);
+eNextState nwc_get_one_host_ip(AsyncIO *IO)
+{
+ AsyncNetworker *NW = IO->Data;
+ /*
+ * here we start with the lookup of one host.
+ */
+
+ EVN_syslog(LOG_DEBUG, "NWC: %s\n", __FUNCTION__);
+
+ EVN_syslog(LOG_DEBUG,
+ "NWC client[%ld]: looking up %s-Record %s : %d ...\n",
+ NW->n,
+ (NW->IO.ConnectMe->IPv6)? "aaaa": "a",
+ NW->IO.ConnectMe->Host,
+ NW->IO.ConnectMe->Port);
+
+ QueueQuery((NW->IO.ConnectMe->IPv6)? ns_t_aaaa : ns_t_a,
+ NW->IO.ConnectMe->Host,
+ &NW->IO,
+ &NW->HostLookup,
+ nwc_get_one_host_ip_done);
+ IO->NextState = eReadDNSReply;
+ return IO->NextState;
+}
+/**
+ * @brief lineread Handler; understands when to read more POP3 lines, and when this is a one-lined reply.
+ */
+eReadState NWC_ReadServerStatus(AsyncIO *IO)
+{
+// AsyncNetworker *NW = IO->Data;
+ eReadState Finished = eBufferNotEmpty;
+
+ switch (IO->NextState) {
+ case eSendDNSQuery:
+ case eReadDNSReply:
+ case eDBQuery:
+ case eConnect:
+ case eTerminateConnection:
+ case eAbort:
+ Finished = eReadFail;
+ break;
+ case eSendReply:
+ case eSendMore:
+ case eReadMore:
+ case eReadMessage:
+ Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf);
+ break;
+ case eReadFile:
+ case eSendFile:
+ case eReadPayload:
+ break;
}
+ return Finished;
}
-/*
- * Poll one Citadel node (called by network_poll_other_citadel_nodes() below)
- */
-void network_poll_node(char *node, char *secret, char *host, char *port) {
- int sock;
- char buf[SIZ];
- char err_buf[SIZ];
- char connected_to[SIZ];
- CitContext *CCC=CC;
- if (network_talking_to(node, NTT_CHECK)) return;
- network_talking_to(node, NTT_ADD);
- syslog(LOG_DEBUG, "network: polling <%s>\n", node);
- syslog(LOG_NOTICE, "Connecting to <%s> at %s:%s\n", node, host, port);
+eNextState NWC_FailNetworkConnection(AsyncIO *IO)
+{
+ return eAbort;
+}
- sock = sock_connect(host, port);
- if (sock < 0) {
- syslog(LOG_ERR, "Could not connect: %s\n", strerror(errno));
- network_talking_to(node, NTT_REMOVE);
+void NWC_SetTimeout(eNextState NextTCPState, AsyncNetworker *NW)
+{
+ AsyncIO *IO = &NW->IO;
+ double Timeout = 0.0;
+
+ EVN_syslog(LOG_DEBUG, "%s\n", __FUNCTION__);
+
+ switch (NextTCPState) {
+ case eSendReply:
+ case eSendMore:
+ break;
+ case eReadFile:
+ case eReadMessage:
+ Timeout = NWC_ReadTimeouts[NW->State];
+ break;
+ case eReadPayload:
+ Timeout = 100000;
+ /* TODO!!! */
+ break;
+ case eSendDNSQuery:
+ case eReadDNSReply:
+ case eConnect:
+ case eSendFile:
+//TODO
+ case eTerminateConnection:
+ case eDBQuery:
+ case eAbort:
+ case eReadMore://// TODO
return;
}
-
- syslog(LOG_DEBUG, "Connected!\n");
- CCC->SBuf.Buf = NewStrBuf();
- CCC->sMigrateBuf = NewStrBuf();
- CCC->SBuf.ReadWritePointer = NULL;
+ SetNextTimeout(&NW->IO, Timeout);
+}
- /* Read the server greeting */
- if (sock_getln(&sock, buf, sizeof buf) < 0) goto bail;
- syslog(LOG_DEBUG, ">%s\n", buf);
- /* Check that the remote is who we think it is and warn the Aide if not */
- extract_token (connected_to, buf, 1, ' ', sizeof connected_to);
- if (strcmp(connected_to, node))
- {
- snprintf(err_buf, sizeof(err_buf),
- "Connected to node \"%s\" but I was expecting to connect to node \"%s\".",
- connected_to, node
- );
- syslog(LOG_ERR, "%s\n", err_buf);
- CtdlAideMessage(err_buf, "Network error");
- }
- else {
- /* We're talking to the correct node. Now identify ourselves. */
- snprintf(buf, sizeof buf, "NETP %s|%s", config.c_nodename, secret);
- syslog(LOG_DEBUG, "<%s\n", buf);
- if (sock_puts(&sock, buf) <0) goto bail;
- if (sock_getln(&sock, buf, sizeof buf) < 0) goto bail;
- syslog(LOG_DEBUG, ">%s\n", buf);
- if (buf[0] != '2') {
- goto bail;
- }
-
- /* At this point we are authenticated. */
- if (!server_shutting_down)
- receive_spool(&sock, node);
- if (!server_shutting_down)
- transmit_spool(&sock, node);
- }
+eNextState NWC_DispatchReadDone(AsyncIO *IO)
+{
+ EVN_syslog(LOG_DEBUG, "%s\n", __FUNCTION__);
+ AsyncNetworker *NW = IO->Data;
+ eNextState rc;
+
+ rc = NWC_ReadHandlers[NW->State](NW);
+ if (rc != eReadMore)
+ NW->State++;
+ NWC_SetTimeout(rc, NW);
+ return rc;
+}
+eNextState NWC_DispatchWriteDone(AsyncIO *IO)
+{
+ EVN_syslog(LOG_DEBUG, "%s\n", __FUNCTION__);
+ AsyncNetworker *NW = IO->Data;
+ eNextState rc;
+
+ rc = NWC_SendHandlers[NW->State](NW);
+ NWC_SetTimeout(rc, NW);
+ return rc;
+}
+
+/*****************************************************************************/
+/* Networker CLIENT ERROR CATCHERS */
+/*****************************************************************************/
+eNextState NWC_Terminate(AsyncIO *IO)
+{
+ EVN_syslog(LOG_DEBUG, "%s\n", __FUNCTION__);
+ FinalizeNetworker(IO);
+ return eAbort;
+}
+
+eNextState NWC_Timeout(AsyncIO *IO)
+{
+ EVN_syslog(LOG_DEBUG, "%s\n", __FUNCTION__);
+
+ return NWC_FailNetworkConnection(IO);
+}
+eNextState NWC_ConnFail(AsyncIO *IO)
+{
+/// AsyncNetworker *NW = IO->Data;
+
+ EVN_syslog(LOG_DEBUG, "%s\n", __FUNCTION__);
+//// StrBufPlain(IO->ErrMsg, CKEY(POP3C_ReadErrors[pMsg->State])); todo
+ return NWC_FailNetworkConnection(IO);
+}
+eNextState NWC_DNSFail(AsyncIO *IO)
+{
+/// AsyncNetworker *NW = IO->Data;
+
+ EVN_syslog(LOG_DEBUG, "%s\n", __FUNCTION__);
+//// StrBufPlain(IO->ErrMsg, CKEY(POP3C_ReadErrors[pMsg->State])); todo
+ return NWC_FailNetworkConnection(IO);
+}
+eNextState NWC_Shutdown(AsyncIO *IO)
+{
+ EVN_syslog(LOG_DEBUG, "%s\n", __FUNCTION__);
+//// pop3aggr *pMsg = IO->Data;
- sock_puts(&sock, "QUIT");
-bail:
- FreeStrBuf(&CCC->SBuf.Buf);
- FreeStrBuf(&CCC->sMigrateBuf);
- if (sock != -1)
- sock_close(sock);
- network_talking_to(node, NTT_REMOVE);
+ FinalizeNetworker(IO);
+ return eAbort;
}
+eNextState nwc_connect_ip(AsyncIO *IO)
+{
+ AsyncNetworker *NW = IO->Data;
+
+ EVN_syslog(LOG_DEBUG, "%s\n", __FUNCTION__);
+ EVN_syslog(LOG_NOTICE, "Connecting to <%s> at %s:%s\n",
+ ChrPtr(NW->node),
+ ChrPtr(NW->host),
+ ChrPtr(NW->port));
+
+ return EvConnectSock(IO,
+ NWC_ConnTimeout,
+ NWC_ReadTimeouts[0],
+ 1);
+}
+
+static int NetworkerCount = 0;
+void RunNetworker(AsyncNetworker *NW)
+{
+ AsyncIO *IO = &NW->IO;
+
+ NW->n = NetworkerCount++;
+ network_talking_to(SKEY(NW->node), NTT_ADD);
+ syslog(LOG_DEBUG, "NW[%s][%ld]: polling\n", ChrPtr(NW->node), NW->n);
+ ParseURL(&NW->IO.ConnectMe, NW->Url, 504);
+
+ InitIOStruct(&NW->IO,
+ NW,
+ eReadMessage,
+ NWC_ReadServerStatus,
+ NWC_DNSFail,
+ NWC_DispatchWriteDone,
+ NWC_DispatchReadDone,
+ NWC_Terminate,
+ NWC_ConnFail,
+ NWC_Timeout,
+ NWC_Shutdown);
+
+ safestrncpy(((CitContext *)NW->IO.CitContext)->cs_host,
+ ChrPtr(NW->host),
+ sizeof(((CitContext *)NW->IO.CitContext)->cs_host));
+
+ if (NW->IO.ConnectMe->IsIP) {
+ QueueEventContext(&NW->IO,
+ nwc_connect_ip);
+ }
+ else { /* uneducated admin has chosen to add DNS to the equation... */
+ QueueEventContext(&NW->IO,
+ nwc_get_one_host_ip);
+ }
+}
/*
* Poll other Citadel nodes and transfer inbound/outbound network data.
* Set "full" to nonzero to force a poll of every node, or to zero to poll
* only nodes to which we have data to send.
*/
-void network_poll_other_citadel_nodes(int full_poll) {
- int i;
- char linebuf[256];
- char node[SIZ];
- char host[256];
- char port[256];
- char secret[256];
+void network_poll_other_citadel_nodes(int full_poll, char *working_ignetcfg)
+{
+ AsyncNetworker *NW;
+ StrBuf *CfgData;
+ StrBuf *Line;
+ const char *lptr;
+ const char *CfgPtr;
+ int Done;
+
int poll = 0;
- char spoolfile[256];
-
- if (working_ignetcfg == NULL) {
+
+ if ((working_ignetcfg == NULL) || (*working_ignetcfg == '\0')) {
syslog(LOG_DEBUG, "network: no neighbor nodes are configured - not polling.\n");
return;
}
+ become_session(&networker_client_CC);
- /* Use the string tokenizer to grab one line at a time */
- for (i=0; i<num_tokens(working_ignetcfg, '\n'); ++i) {
- if(server_shutting_down)
- return;
- extract_token(linebuf, working_ignetcfg, i, '\n', sizeof linebuf);
- extract_token(node, linebuf, 0, '|', sizeof node);
- extract_token(secret, linebuf, 1, '|', sizeof secret);
- extract_token(host, linebuf, 2, '|', sizeof host);
- extract_token(port, linebuf, 3, '|', sizeof port);
- if ( !IsEmptyStr(node) && !IsEmptyStr(secret)
- && !IsEmptyStr(host) && !IsEmptyStr(port)) {
- poll = full_poll;
- if (poll == 0) {
- snprintf(spoolfile,
- sizeof spoolfile,
- "%s/%s",
- ctdl_netout_dir,
- node
- );
- if (access(spoolfile, R_OK) == 0) {
- poll = 1;
+ CfgData = NewStrBufPlain(working_ignetcfg, -1);
+ Line = NewStrBufPlain(NULL, StrLength(CfgData));
+ Done = 0;
+ CfgPtr = NULL;
+ while (!Done)
+ {
+ /* Use the string tokenizer to grab one line at a time */
+ StrBufSipLine(Line, CfgData, &CfgPtr);
+ Done = CfgPtr == StrBufNOTNULL;
+ if (StrLength(Line) > 0)
+ {
+ if(server_shutting_down)
+ return;/* TODO free stuff*/
+ lptr = NULL;
+ poll = 0;
+ NW = (AsyncNetworker*)malloc(sizeof(AsyncNetworker));
+ memset(NW, 0, sizeof(AsyncNetworker));
+
+ NW->node = NewStrBufPlain(NULL, StrLength(Line));
+ NW->host = NewStrBufPlain(NULL, StrLength(Line));
+ NW->port = NewStrBufPlain(NULL, StrLength(Line));
+ NW->secret = NewStrBufPlain(NULL, StrLength(Line));
+
+ StrBufExtract_NextToken(NW->node, Line, &lptr, '|');
+ StrBufExtract_NextToken(NW->secret, Line, &lptr, '|');
+ StrBufExtract_NextToken(NW->host, Line, &lptr, '|');
+ StrBufExtract_NextToken(NW->port, Line, &lptr, '|');
+ if ( (StrLength(NW->node) != 0) &&
+ (StrLength(NW->secret) != 0) &&
+ (StrLength(NW->host) != 0) &&
+ (StrLength(NW->port) != 0))
+ {
+ poll = full_poll;
+ if (poll == 0)
+ {
+ NW->SpoolFileName = NewStrBufPlain(ctdl_netout_dir, -1);
+ StrBufAppendBufPlain(NW->SpoolFileName, HKEY("/"), 0);
+ StrBufAppendBuf(NW->SpoolFileName, NW->node, 0);
+ if (access(ChrPtr(NW->SpoolFileName), R_OK) == 0) {
+ poll = 1;
+ }
}
}
- if (poll) {
- network_poll_node(node, secret, host, port);
+ if (poll &&
+ (StrLength(NW->host) > 0) &&
+ strcmp("0.0.0.0", ChrPtr(NW->host)))
+ {
+ NW->Url = NewStrBufPlain(NULL, StrLength(Line));
+ StrBufPrintf(NW->Url, "citadel://:%s@%s:%s",
+ ChrPtr(NW->secret),
+ ChrPtr(NW->host),
+ ChrPtr(NW->port));
+ if (!network_talking_to(SKEY(NW->node), NTT_CHECK))
+ {
+ RunNetworker(NW);
+ continue;
+ }
}
+ DeleteNetworker(NW);
}
}
+ FreeStrBuf(&CfgData);
+ FreeStrBuf(&Line);
}
void network_do_clientqueue(void)
{
+ char *working_ignetcfg;
int full_processing = 1;
static time_t last_run = 0L;
);
}
-
+ working_ignetcfg = load_working_ignetcfg();
/*
* Poll other Citadel nodes. Maybe. If "full_processing" is set
* then we poll everyone. Otherwise we only poll nodes we have stuff
* to send to.
*/
- network_poll_other_citadel_nodes(full_processing);
+ network_poll_other_citadel_nodes(full_processing, working_ignetcfg);
+ if (working_ignetcfg)
+ free(working_ignetcfg);
}
{
if (!threading)
{
+ CtdlFillSystemContext(&networker_client_CC, "CitNetworker");
+
CtdlRegisterSessionHook(network_do_clientqueue, EVT_TIMER);
}
return "network_client";