case eConnect:
case eSendReply:
case eSendMore:
+ case eSendFile:
case eReadMessage:
case eReadMore:
case eReadPayload:
+ case eReadFile:
ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
break;
case eTerminateConnection:
eReadState HandleInbound(AsyncIO *IO)
{
+ const char *Err = NULL;
eReadState Finished = eBufferNotEmpty;
become_session(IO->CitContext);
ev_io_stop(event_base, &IO->recv_event);
IO->NextState = IO->ReadDone(IO);
Finished = StrBufCheckBuffer(&IO->RecvBuf);
+ if ((IO->NextState == eReadFile) &&
+ (Finished == eBufferNotEmpty))
+ {
+ Finished = WriteIOBAlreadyRead(&IO->IOB, &Err);
+ if (Finished == eReadSuccess)
+ {
+ IO->NextState = eSendReply;
+ }
+ }
}
}
switch (IO->NextState) {
+ case eSendFile:
+ ev_io_start(event_base, &IO->send_event);
+ break;
case eSendReply:
case eSendMore:
assert(IO->SendDone);
break;
case eReadPayload:
case eReadMore:
+ case eReadFile:
ev_io_start(event_base, &IO->recv_event);
break;
case eTerminateConnection:
+//////TODOxxxx
+ break;
case eAbort:
ShutDownCLient(IO);
break;
{
int rc;
AsyncIO *IO = watcher->data;
+ const char *errmsg = NULL;
become_session(IO->CitContext);
#ifdef BIGBAD_IODBG
if (!rv) printf("failed to write debug to %s!\n", fn);
fprintf(fd, "]\n");
#endif
- rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf);
+ switch (IO->NextState) {
+ case eSendFile:
+ rc = FileSendChunked(&IO->IOB, &errmsg);
+ if (rc < 0)
+ StrBufPlain(IO->ErrMsg, errmsg, -1);
+ break;
+ default:
+ rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf);
+ }
#ifdef BIGBAD_IODBG
fprintf(fd, "Sent: BufSize: %d bytes.\n", rc);
ev_io_start(event_base, &IO->send_event);
}
break;
+ case eSendFile:
+ if (IO->IOB.ChunkSendRemain > 0) {
+ ev_io_start(event_base, &IO->recv_event);
+ } else {
+ assert(IO->ReadDone);
+ IO->NextState = IO->ReadDone(IO);
+ switch(IO->NextState) {
+ case eSendDNSQuery:
+ case eReadDNSReply:
+ case eDBQuery:
+ case eConnect:
+ break;
+ case eSendReply:
+ case eSendMore:
+ case eSendFile:
+ ev_io_start(event_base, &IO->send_event);
+ break;
+ case eReadMessage:
+ case eReadMore:
+ case eReadPayload:
+ case eReadFile:
+ break;
+ case eTerminateConnection:
+ case eAbort:
+ break;
+ }
+ }
+ break;
case eSendReply:
if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess)
break;
case eReadMore:
case eReadMessage:
case eReadPayload:
+ case eReadFile:
if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) {
HandleInbound(IO);
}
switch(IO->NextState) {
case eReadMore:
case eReadMessage:
+ case eReadFile:
ev_io_start(event_base, &IO->recv_event);
break;
case eSendReply:
case eSendMore:
case eReadPayload:
+ case eSendFile:
become_session(IO->CitContext);
IO_send_callback(loop, &IO->send_event, revents);
break;
static void
IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
{
+ const char *errmsg;
+ long rc;
ssize_t nbytes;
AsyncIO *IO = watcher->data;
- nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf);
+ switch (IO->NextState) {
+ case eReadFile:
+ rc = FileRecvChunked(&IO->IOB, &errmsg);
+ if (rc < 0)
+ StrBufPlain(IO->ErrMsg, errmsg, -1);
+ break;
+ default:
+ nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf);
+ break;
+ }
#ifdef BIGBAD_IODBG
{
int rv = 0;
AsyncIO IO;
DNSQueryParts HostLookup;
eNWCState State;
- int fd;
long n;
- FILE *TmpFile;
- long bytes_received;
StrBuf *SpoolFileName;
StrBuf *tempFileName;
StrBuf *node;
StrBuf *port;
StrBuf *secret;
StrBuf *Url;
-
- long download_len;
- int BlobReadSize;
- long bytes_written;
- long bytes_to_write;
} AsyncNetworker;
typedef eNextState(*NWClientHandler)(AsyncNetworker* NW);
eNextState nwc_connect_ip(AsyncIO *IO);
eNextState NWC_SendQUIT(AsyncNetworker *NW);
-
+eNextState NWC_DispatchWriteDone(AsyncIO *IO);
void DestroyNetworker(AsyncNetworker *NW)
eNextState NWC_ReadNDOPReply(AsyncNetworker *NW)
{
+ int TotalSendSize;
NWC_DBG_READ();
if (ChrPtr(NW->IO.IOBuf)[0] == '2')
{
- NW->download_len = atol (ChrPtr(NW->IO.IOBuf) + 4);
- syslog(LOG_DEBUG, "Expecting to transfer %ld bytes\n", NW->download_len);
- if (NW->download_len <= 0) {
+
+ NW->IO.IOB.TotalSentAlready = 0;
+ TotalSendSize = atol (ChrPtr(NW->IO.IOBuf) + 4);
+ syslog(LOG_DEBUG, "Expecting to transfer %ld bytes\n", NW->IO.IOB.TotalSendSize);
+ if (TotalSendSize <= 0) {
NW->State = eNUOP - 1;
}
else {
- NW->TmpFile = fopen(ChrPtr(NW->SpoolFileName), "w");
- if (NW->TmpFile == NULL)
+ int fd;
+ fd = open(ChrPtr(NW->SpoolFileName),
+ O_EXCL|O_CREAT|O_NONBLOCK|O_WRONLY,
+ S_IRUSR|S_IWUSR);
+ if (fd < 0)
{
syslog(LOG_CRIT,
"cannot open %s: %s\n",
strerror(errno));
NW->State = eQUIT - 1;
+ return eAbort;
}
+ FDIOBufferInit(&NW->IO.IOB, &NW->IO.RecvBuf, fd, TotalSendSize);
}
return eSendReply;
}
eNextState NWC_SendREAD(AsyncNetworker *NW)
{
- if (NW->bytes_received < NW->download_len)
+ if (NW->IO.IOB.TotalSentAlready < NW->IO.IOB.TotalSendSize)
{
/*
* If shutting down we can exit here and unlink the temp file.
*/
if (server_shutting_down)
{
- fclose(NW->TmpFile);
+ close(NW->IO.IOB.OtherFD);
////// unlink(ChrPtr(NW->tempFileName));
return eAbort;
}
StrBufPrintf(NW->IO.SendBuf.Buf, "READ %ld|%ld\n",
- NW->bytes_received,
- ((NW->download_len - NW->bytes_received > IGNET_PACKET_SIZE)
+ NW->IO.IOB.TotalSentAlready,
+ ((NW->IO.IOB.TotalSendSize - NW->IO.IOB.TotalSentAlready > IGNET_PACKET_SIZE)
? IGNET_PACKET_SIZE :
- (NW->download_len - NW->bytes_received))
+ (NW->IO.IOB.TotalSendSize - NW->IO.IOB.TotalSentAlready))
);
return eSendReply;
NWC_DBG_READ();
if (ChrPtr(NW->IO.IOBuf)[0] == '6')
{
- NW->BlobReadSize = atol(ChrPtr(NW->IO.IOBuf)+4);
- NW->bytes_received += NW->BlobReadSize;
+ NW->IO.IOB.ChunkSendRemain =
+ NW->IO.IOB.ChunkSize = atol(ChrPtr(NW->IO.IOBuf)+4);
+/// NW->IO.IOB.TotalSentAlready += NW->IO.IOB.ChunkSize;
/// TODO StrBufReadjustIOBuffer(NW->IO.RecvBuf, NW->BlobReadSize);
- return eReadPayload;
+ return eReadFile;
}
return eAbort;
}
+eNextState NWC_ReadREADBlobDone(AsyncNetworker *NW);
eNextState NWC_ReadREADBlob(AsyncNetworker *NW)
{
/// FlushIOBuffer(NW->IO.RecvBuf); /// TODO
- fwrite(NW->IO.RecvBuf.ReadWritePointer,
- 1,
- NW->BlobReadSize,
- NW->TmpFile);
- if (NW->bytes_received < NW->download_len)
+
+ ///NW->bytes_received += NW->IO.IOB.ChunkSize;
+
+ if (NW->IO.IOB.TotalSentAlready < NW->IO.IOB.TotalSendSize)
{
NW->State = eREAD - 1;
return eSendReply;/* now fetch next chunk*/
}
- else
+ else
+ return NWC_ReadREADBlobDone(NW);
+}
+
+eNextState NWC_ReadREADBlobDone(AsyncNetworker *NW)
+{
+ if (NW->IO.IOB.TotalSendSize == NW->IO.IOB.TotalSentAlready)
{
- fclose(NW->TmpFile);
+ NW->State ++;
+
+ close(NW->IO.IOB.OtherFD);
if (link(ChrPtr(NW->SpoolFileName), ChrPtr(NW->tempFileName)) != 0) {
syslog(LOG_ALERT,
}
///// unlink(ChrPtr(NW->tempFileName));
- return eSendReply; //// TODO: step forward.
+ return NWC_DispatchWriteDone(&NW->IO);
+ }
+ else {
+ NW->State --;
+ return NWC_DispatchWriteDone(&NW->IO);
}
}
-
-
eNextState NWC_SendCLOS(AsyncNetworker *NW)
{
StrBufPlain(NW->IO.SendBuf.Buf, HKEY("CLOS\n"));
eNextState NWC_SendNUOP(AsyncNetworker *NW)
{
+ long TotalSendSize;
struct stat statbuf;
+ int fd;
StrBufPrintf(NW->tempFileName,
"%s/%s",
ctdl_netout_dir,
ChrPtr(NW->node));
- NW->fd = open(ChrPtr(NW->tempFileName), O_RDONLY);
- if (NW->fd < 0) {
+ fd = open(ChrPtr(NW->tempFileName), O_RDONLY);
+ if (fd < 0) {
if (errno != ENOENT) {
syslog(LOG_CRIT,
"cannot open %s: %s\n",
return NWC_SendQUIT(NW);
}
- if (fstat(NW->fd, &statbuf) == -1) {
+ if (fstat(fd, &statbuf) == -1) {
syslog(9, "FSTAT FAILED %s [%s]--\n",
ChrPtr(NW->tempFileName),
strerror(errno));
- if (NW->fd > 0) close(NW->fd);
+ if (fd > 0) close(fd);
return eAbort;
}
-
- NW->download_len = statbuf.st_size;
- if (NW->download_len == 0) {
+ TotalSendSize = statbuf.st_size;
+ if (TotalSendSize == 0) {
syslog(LOG_DEBUG,
"Nothing to send.\n");
NW->State = eQUIT;
return NWC_SendQUIT(NW);
}
+ FDIOBufferInit(&NW->IO.IOB, &NW->IO.SendBuf, fd, TotalSendSize);
- NW->bytes_written = 0;
+//// NW->bytes_written = 0;
StrBufPlain(NW->IO.SendBuf.Buf, HKEY("NUOP\n"));
return eSendReply;
eNextState NWC_SendWRIT(AsyncNetworker *NW)
{
- StrBufPrintf(NW->IO.SendBuf.Buf, "WRIT %ld\n", NW->bytes_to_write);
+ StrBufPrintf(NW->IO.SendBuf.Buf, "WRIT %ld\n",
+ NW->IO.IOB.TotalSendSize - NW->IO.IOB.TotalSentAlready);
return eSendReply;
}
return eAbort;
}
- NW->BlobReadSize = atol(ChrPtr(NW->IO.IOBuf)+4);
- return eSendMore;
+ NW->IO.IOB.ChunkSendRemain =
+ NW->IO.IOB.ChunkSize = atol(ChrPtr(NW->IO.IOBuf)+4);
+/// NW->IO.IOB.TotalSentAlready += NW->IO.IOB.ChunkSize;
+ return eSendFile;
}
-eNextState NWC_SendBlob(AsyncNetworker *NW)
+eNextState NWC_SendBlobDone(AsyncNetworker *NW)
{
+ eNextState rc;
+ if (NW->IO.IOB.TotalSendSize == NW->IO.IOB.TotalSentAlready)
+ {
+ NW->State ++;
- /// bytes_to_write -= thisblock;
- /// bytes_written += thisblock;
-
- return eReadMessage;
+ close(NW->IO.IOB.OtherFD);
+//// TODO: unlink networker file?
+ rc = NWC_DispatchWriteDone(&NW->IO);
+ NW->State --;
+ return rc;
+ }
+ else {
+ NW->State --;
+ return NWC_DispatchWriteDone(&NW->IO);
+ }
}
eNextState NWC_SendUCLS(AsyncNetworker *NW)
{
NWC_DBG_READ();
- syslog(LOG_NOTICE, "Sent %ld octets to <%s>\n", NW->bytes_written, ChrPtr(NW->node));
+ syslog(LOG_NOTICE, "Sent %ld octets to <%s>\n", NW->IO.IOB.ChunkSize, ChrPtr(NW->node));
/// syslog(LOG_DEBUG, "<%s\n", buf);
if (ChrPtr(NW->IO.IOBuf)[0] == '2') {
syslog(LOG_DEBUG, "Removing <%s>\n", ChrPtr(NW->tempFileName));
NWC_ReadCLOSReply,
NWC_ReadNUOPReply,
NWC_ReadWRITReply,
- NULL,
+ NWC_SendBlobDone,
NWC_ReadUCLS,
NWC_ReadQUIT
};
NWC_SendAuth,
NWC_SendNDOP,
NWC_SendREAD,
- NULL,
+ NWC_ReadREADBlobDone,
NWC_SendCLOS,
NWC_SendNUOP,
NWC_SendWRIT,
- NWC_SendBlob,
+ NWC_SendBlobDone,
NWC_SendUCLS,
NWC_SendQUIT
};
case eReadMessage:
Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf);
break;
+ case eReadFile:
+ case eSendFile:
case eReadPayload:
- Finished = IOBufferStrLength(&IO->RecvBuf) >= NW->BlobReadSize;
+////TODO Finished = IOBufferStrLength(&IO->RecvBuf) >= NW->BlobReadSize;
break;
}
return Finished;