don't call curl_multi_add_handle() from outside of the event queue
[citadel.git] / citadel / event_client.c
index f1abcbef7ff9396550809c03be7b3af22edec87e..57a6440102f12ca86d58b83aba96c0f1d1ad76f7 100644 (file)
@@ -145,9 +145,11 @@ DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents)
        case eConnect:
        case eSendReply: 
        case eSendMore:
+       case eSendFile:
        case eReadMessage: 
        case eReadMore:
        case eReadPayload:
+       case eReadFile:
                ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
                break;
        case eTerminateConnection:
@@ -201,6 +203,28 @@ eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB)
        return eSendReply;
 }
 
+extern eNextState evcurl_handle_start(AsyncIO *IO);
+
+eNextState QueueCurlContext(AsyncIO *IO)
+{
+       IOAddHandler *h;
+       int i;
+
+       h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
+       h->IO = IO;
+       h->EvAttch = evcurl_handle_start;
+
+       pthread_mutex_lock(&EventQueueMutex);
+       syslog(LOG_DEBUG, "EVENT Q\n");
+       i = ++evbase_count;
+       Put(InboundEventQueue, IKEY(i), h, NULL);
+       pthread_mutex_unlock(&EventQueueMutex);
+
+       ev_async_send (event_base, &AddJob);
+       syslog(LOG_DEBUG, "EVENT Q Done.\n");
+       return eSendReply;
+}
+
 int ShutDownEventQueue(void)
 {
        pthread_mutex_lock(&DBEventQueueMutex);
@@ -260,6 +284,7 @@ void ShutDownCLient(AsyncIO *IO)
 
 eReadState HandleInbound(AsyncIO *IO)
 {
+       const char *Err = NULL;
        eReadState Finished = eBufferNotEmpty;
        
        become_session(IO->CitContext);
@@ -267,6 +292,7 @@ eReadState HandleInbound(AsyncIO *IO)
        while ((Finished == eBufferNotEmpty) && 
               ((IO->NextState == eReadMessage)||
                (IO->NextState == eReadMore)||
+               (IO->NextState == eReadFile)||
                (IO->NextState == eReadPayload)))
        {
                if (IO->RecvBuf.nBlobBytesWanted != 0) { 
@@ -274,7 +300,16 @@ eReadState HandleInbound(AsyncIO *IO)
                }
                else { /* Reading lines... */
 //// lex line reply in callback, or do it ourselves. as nnn-blabla means continue reading in SMTP
-                       if (IO->LineReader)
+                       if ((IO->NextState == eReadFile) && 
+                           (Finished == eBufferNotEmpty))
+                       {
+                               Finished = WriteIOBAlreadyRead(&IO->IOB, &Err);
+                               if (Finished == eReadSuccess)
+                               {
+                                       IO->NextState = eSendReply;                             
+                               }
+                       }
+                       else if (IO->LineReader)
                                Finished = IO->LineReader(IO);
                        else 
                                Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf);
@@ -301,6 +336,9 @@ eReadState HandleInbound(AsyncIO *IO)
        }
 
        switch (IO->NextState) {
+       case eSendFile:
+               ev_io_start(event_base, &IO->send_event);
+               break;
        case eSendReply:
        case eSendMore:
                assert(IO->SendDone);
@@ -309,9 +347,12 @@ eReadState HandleInbound(AsyncIO *IO)
                break;
        case eReadPayload:
        case eReadMore:
+       case eReadFile:
                ev_io_start(event_base, &IO->recv_event);
                break;
        case eTerminateConnection:
+//////TODOxxxx
+               break;
        case eAbort:
                ShutDownCLient(IO);
                break;
@@ -331,6 +372,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
 {
        int rc;
        AsyncIO *IO = watcher->data;
+       const char *errmsg = NULL;
 
        become_session(IO->CitContext);
 #ifdef BIGBAD_IODBG
@@ -357,7 +399,15 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
                if (!rv) printf("failed to write debug to %s!\n", fn);
                fprintf(fd, "]\n");
 #endif
-               rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf);
+               switch (IO->NextState) {
+               case eSendFile:
+                       rc = FileSendChunked(&IO->IOB, &errmsg);
+                       if (rc < 0)
+                               StrBufPlain(IO->ErrMsg, errmsg, -1);
+                       break;
+               default:
+                       rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf);
+               }
 
 #ifdef BIGBAD_IODBG
                fprintf(fd, "Sent: BufSize: %d bytes.\n", rc);
@@ -379,6 +429,34 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
                                ev_io_start(event_base, &IO->send_event);
                        }
                        break;
+               case eSendFile:
+                       if (IO->IOB.ChunkSendRemain > 0) {
+                               ev_io_start(event_base, &IO->recv_event);
+                       } else {
+                               assert(IO->ReadDone);
+                               IO->NextState = IO->ReadDone(IO);
+                               switch(IO->NextState) {
+                               case eSendDNSQuery:
+                               case eReadDNSReply:
+                               case eDBQuery:
+                               case eConnect:
+                                       break;
+                               case eSendReply: 
+                               case eSendMore:
+                               case eSendFile:
+                                       ev_io_start(event_base, &IO->send_event);
+                                       break;
+                               case eReadMessage: 
+                               case eReadMore:
+                               case eReadPayload:
+                               case eReadFile:
+                                       break;
+                               case eTerminateConnection:
+                               case eAbort:
+                                       break;
+                               }
+                       }
+                       break;
                case eSendReply:
                    if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess) 
                        break;
@@ -386,6 +464,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
                case eReadMore:
                case eReadMessage:
                case eReadPayload:
+               case eReadFile:
                        if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) {
                                HandleInbound(IO);
                        }
@@ -419,11 +498,13 @@ set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents)
        switch(IO->NextState) {
        case eReadMore:
        case eReadMessage:
+       case eReadFile:
                ev_io_start(event_base, &IO->recv_event);
                break;
        case eSendReply:
        case eSendMore:
        case eReadPayload:
+       case eSendFile:
                become_session(IO->CitContext);
                IO_send_callback(loop, &IO->send_event, revents);
                break;
@@ -531,10 +612,30 @@ IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents)
 static void
 IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
 {
+       const char *errmsg;
        ssize_t nbytes;
        AsyncIO *IO = watcher->data;
 
-       nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf);
+       switch (IO->NextState) {
+       case eReadFile:
+               nbytes = FileRecvChunked(&IO->IOB, &errmsg);
+               if (nbytes < 0)
+                       StrBufPlain(IO->ErrMsg, errmsg, -1);
+               else 
+               {
+                       if (IO->IOB.ChunkSendRemain == 0)
+                       {
+                               IO->NextState = eSendReply;
+                       }
+                       else
+                               return;
+               }
+               break;
+       default:
+               nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf);
+               break;
+       }
+
 #ifdef BIGBAD_IODBG
        {
                int rv = 0;
@@ -581,6 +682,7 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
                syslog(LOG_DEBUG, 
                       "EVENT: Socket Invalid! %s \n",
                       strerror(errno));
+               ShutDownCLient(IO);
                return;
        }
 }
@@ -592,6 +694,7 @@ IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents)
        syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
        become_session(IO->CitContext);
        assert(IO->DNSFail);
+       assert(IO->DNSQuery->PostDNS);
        switch (IO->DNSQuery->PostDNS(IO))
        {
        case eAbort: