File size: 2,795 Bytes
83607bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#include "include/tcp_inbound.h"
#include "include/conn_map.h"
#include "include/accumulator.h"
#include "hv/hsocket.h"
#include "hv/hthread.h"
#include "hv/TcpClient.h"
#include <string>

class TcpClientShell {
    public:
        bool init(hio_t* _io) {
            io = _io;
            int connfd = cli.createsocket(hio_peeraddr(io));
            if (connfd < 0) {
                return -1;
            }
            cli.onConnection = [this](const hv::SocketChannelPtr& channel) {
                std::string peeraddr = channel->peeraddr();
                if (channel->isConnected()) {
                    printf("connected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());
                    if (wait_send_buf.getDataSize() > 0)
                    {
                        cli.send(wait_send_buf.getData(), wait_send_buf.getDataSize());
                    }
                } else {
                    printf("disconnected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());
                    hio_close(io);
                }
            };

            cli.onMessage = [this](const hv::SocketChannelPtr& channel, hv::Buffer* buf) {
                hio_write(io, buf->data(), buf->size());
                printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
            };

            cli.onWriteComplete = [this](const hv::SocketChannelPtr& channel, hv::Buffer* buf) {

            };
            cli.start();
        }

        int send(const char* data, int size) {
            if (cli.isConnected())
            {
                return cli.send(data, size);
            } else {
                wait_send_buf.addData(data, size);
                return size;
            }
        }

        void close() {
            cli.closesocket();
        }

    private:
        hv::TcpClient cli;
        hio_t* io;
        Accumulator wait_send_buf;
};

static void tcp_on_close(hio_t* io) {
    printf("on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
    ConnMap<hio_t*, TcpClientShell>::getInstance().remove(io);
}

static void tcp_on_recv(hio_t* io, void* buf, int readbytes) {
    // echo
    hio_write(io, buf, readbytes);
    auto cli = ConnMap<hio_t*, TcpClientShell>::getInstance().get(io);
    if(cli) {
        cli->send((const char*) buf, readbytes);
    }
}

void tcp_on_accept(hio_t* io, hevent_t* ev) {
    hloop_t* loop = ev->loop;

    char localaddrstr[SOCKADDR_STRLEN] = {0};
    char peeraddrstr[SOCKADDR_STRLEN] = {0};
    printf("tid=%ld connfd=%d [%s] <= [%s]\n",
            (long)hv_gettid(),
            (int)hio_fd(io),
            SOCKADDR_STR(hio_localaddr(io), localaddrstr),
            SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));

    hio_setcb_close(io, tcp_on_close);
    hio_setcb_read(io, tcp_on_recv);
    hio_read(io);
}