Implement asynchroneous networking.
[citadel.git] / citadel / event_client.c
index f1abcbef7ff9396550809c03be7b3af22edec87e..9a7566f7d4d84f777eac6c1a2e8085b631467695 100644 (file)
@@ -145,9 +145,11 @@ DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents)
        case eConnect:
        case eSendReply: 
        case eSendMore:
+       case eSendFile:
        case eReadMessage: 
        case eReadMore:
        case eReadPayload:
+       case eReadFile:
                ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
                break;
        case eTerminateConnection:
@@ -260,6 +262,7 @@ void ShutDownCLient(AsyncIO *IO)
 
 eReadState HandleInbound(AsyncIO *IO)
 {
+       const char *Err = NULL;
        eReadState Finished = eBufferNotEmpty;
        
        become_session(IO->CitContext);
@@ -297,10 +300,22 @@ eReadState HandleInbound(AsyncIO *IO)
                        ev_io_stop(event_base, &IO->recv_event);
                        IO->NextState = IO->ReadDone(IO);
                        Finished = StrBufCheckBuffer(&IO->RecvBuf);
+                       if ((IO->NextState == eReadFile) && 
+                           (Finished == eBufferNotEmpty))
+                       {
+                               Finished = WriteIOBAlreadyRead(&IO->IOB, &Err);
+                               if (Finished == eReadSuccess)
+                               {
+                                       IO->NextState = eSendReply;                             
+                               }
+                       }
                }
        }
 
        switch (IO->NextState) {
+       case eSendFile:
+               ev_io_start(event_base, &IO->send_event);
+               break;
        case eSendReply:
        case eSendMore:
                assert(IO->SendDone);
@@ -309,9 +324,12 @@ eReadState HandleInbound(AsyncIO *IO)
                break;
        case eReadPayload:
        case eReadMore:
+       case eReadFile:
                ev_io_start(event_base, &IO->recv_event);
                break;
        case eTerminateConnection:
+//////TODOxxxx
+               break;
        case eAbort:
                ShutDownCLient(IO);
                break;
@@ -331,6 +349,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
 {
        int rc;
        AsyncIO *IO = watcher->data;
+       const char *errmsg = NULL;
 
        become_session(IO->CitContext);
 #ifdef BIGBAD_IODBG
@@ -357,7 +376,15 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
                if (!rv) printf("failed to write debug to %s!\n", fn);
                fprintf(fd, "]\n");
 #endif
-               rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf);
+               switch (IO->NextState) {
+               case eSendFile:
+                       rc = FileSendChunked(&IO->IOB, &errmsg);
+                       if (rc < 0)
+                               StrBufPlain(IO->ErrMsg, errmsg, -1);
+                       break;
+               default:
+                       rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf);
+               }
 
 #ifdef BIGBAD_IODBG
                fprintf(fd, "Sent: BufSize: %d bytes.\n", rc);
@@ -379,6 +406,34 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
                                ev_io_start(event_base, &IO->send_event);
                        }
                        break;
+               case eSendFile:
+                       if (IO->IOB.ChunkSendRemain > 0) {
+                               ev_io_start(event_base, &IO->recv_event);
+                       } else {
+                               assert(IO->ReadDone);
+                               IO->NextState = IO->ReadDone(IO);
+                               switch(IO->NextState) {
+                               case eSendDNSQuery:
+                               case eReadDNSReply:
+                               case eDBQuery:
+                               case eConnect:
+                                       break;
+                               case eSendReply: 
+                               case eSendMore:
+                               case eSendFile:
+                                       ev_io_start(event_base, &IO->send_event);
+                                       break;
+                               case eReadMessage: 
+                               case eReadMore:
+                               case eReadPayload:
+                               case eReadFile:
+                                       break;
+                               case eTerminateConnection:
+                               case eAbort:
+                                       break;
+                               }
+                       }
+                       break;
                case eSendReply:
                    if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess) 
                        break;
@@ -386,6 +441,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
                case eReadMore:
                case eReadMessage:
                case eReadPayload:
+               case eReadFile:
                        if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) {
                                HandleInbound(IO);
                        }
@@ -419,11 +475,13 @@ set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents)
        switch(IO->NextState) {
        case eReadMore:
        case eReadMessage:
+       case eReadFile:
                ev_io_start(event_base, &IO->recv_event);
                break;
        case eSendReply:
        case eSendMore:
        case eReadPayload:
+       case eSendFile:
                become_session(IO->CitContext);
                IO_send_callback(loop, &IO->send_event, revents);
                break;
@@ -531,10 +589,21 @@ IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents)
 static void
 IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
 {
+       const char *errmsg;
+       long rc;
        ssize_t nbytes;
        AsyncIO *IO = watcher->data;
 
-       nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf);
+       switch (IO->NextState) {
+       case eReadFile:
+               rc = FileRecvChunked(&IO->IOB, &errmsg);
+               if (rc < 0)
+                       StrBufPlain(IO->ErrMsg, errmsg, -1);
+               break;
+       default:
+               nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf);
+               break;
+       }
 #ifdef BIGBAD_IODBG
        {
                int rv = 0;