X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fevent_client.c;h=57a6440102f12ca86d58b83aba96c0f1d1ad76f7;hb=192056a6112602350c1f8a73eae2e134a31c7ba2;hp=9a7566f7d4d84f777eac6c1a2e8085b631467695;hpb=c06372f95e9313f29a331c7d76ef812e480f3c84;p=citadel.git diff --git a/citadel/event_client.c b/citadel/event_client.c index 9a7566f7d..57a644010 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -203,6 +203,28 @@ eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) return eSendReply; } +extern eNextState evcurl_handle_start(AsyncIO *IO); + +eNextState QueueCurlContext(AsyncIO *IO) +{ + IOAddHandler *h; + int i; + + h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); + h->IO = IO; + h->EvAttch = evcurl_handle_start; + + pthread_mutex_lock(&EventQueueMutex); + syslog(LOG_DEBUG, "EVENT Q\n"); + i = ++evbase_count; + Put(InboundEventQueue, IKEY(i), h, NULL); + pthread_mutex_unlock(&EventQueueMutex); + + ev_async_send (event_base, &AddJob); + syslog(LOG_DEBUG, "EVENT Q Done.\n"); + return eSendReply; +} + int ShutDownEventQueue(void) { pthread_mutex_lock(&DBEventQueueMutex); @@ -270,6 +292,7 @@ eReadState HandleInbound(AsyncIO *IO) while ((Finished == eBufferNotEmpty) && ((IO->NextState == eReadMessage)|| (IO->NextState == eReadMore)|| + (IO->NextState == eReadFile)|| (IO->NextState == eReadPayload))) { if (IO->RecvBuf.nBlobBytesWanted != 0) { @@ -277,7 +300,16 @@ eReadState HandleInbound(AsyncIO *IO) } else { /* Reading lines... */ //// lex line reply in callback, or do it ourselves. as nnn-blabla means continue reading in SMTP - if (IO->LineReader) + if ((IO->NextState == eReadFile) && + (Finished == eBufferNotEmpty)) + { + Finished = WriteIOBAlreadyRead(&IO->IOB, &Err); + if (Finished == eReadSuccess) + { + IO->NextState = eSendReply; + } + } + else if (IO->LineReader) Finished = IO->LineReader(IO); else Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf); @@ -300,15 +332,6 @@ eReadState HandleInbound(AsyncIO *IO) ev_io_stop(event_base, &IO->recv_event); IO->NextState = IO->ReadDone(IO); Finished = StrBufCheckBuffer(&IO->RecvBuf); - if ((IO->NextState == eReadFile) && - (Finished == eBufferNotEmpty)) - { - Finished = WriteIOBAlreadyRead(&IO->IOB, &Err); - if (Finished == eReadSuccess) - { - IO->NextState = eSendReply; - } - } } } @@ -590,20 +613,29 @@ static void IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) { const char *errmsg; - long rc; ssize_t nbytes; AsyncIO *IO = watcher->data; switch (IO->NextState) { case eReadFile: - rc = FileRecvChunked(&IO->IOB, &errmsg); - if (rc < 0) + nbytes = FileRecvChunked(&IO->IOB, &errmsg); + if (nbytes < 0) StrBufPlain(IO->ErrMsg, errmsg, -1); + else + { + if (IO->IOB.ChunkSendRemain == 0) + { + IO->NextState = eSendReply; + } + else + return; + } break; default: nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf); break; } + #ifdef BIGBAD_IODBG { int rv = 0; @@ -650,6 +682,7 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) syslog(LOG_DEBUG, "EVENT: Socket Invalid! %s \n", strerror(errno)); + ShutDownCLient(IO); return; } } @@ -661,6 +694,7 @@ IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents) syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); become_session(IO->CitContext); assert(IO->DNSFail); + assert(IO->DNSQuery->PostDNS); switch (IO->DNSQuery->PostDNS(IO)) { case eAbort: