tpws: rewrite connection closing

This commit is contained in:
bol-van 2024-08-26 15:15:40 +03:00
parent 15751976e0
commit e69fdac9ab
13 changed files with 89 additions and 11 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -260,6 +260,12 @@ static bool conn_has_unsent_pair(tproxy_conn_t *conn)
return conn_has_unsent(conn) || (conn_partner_alive(conn) && conn_has_unsent(conn->partner));
}
static void conn_shutdown(tproxy_conn_t *conn)
{
if (shutdown(conn->fd,SHUT_WR)<0)
DLOG_PERROR("shutdown");
conn->bShutdown = true;
}
static ssize_t send_or_buffer(send_buffer_t *sb, int fd, const void *buf, size_t len, int flags, int ttl)
{
@ -518,10 +524,10 @@ static bool epoll_update_flow(tproxy_conn_t *conn)
{
if (conn->bFlowInPrev==conn->bFlowIn && conn->bFlowOutPrev==conn->bFlowOut && conn->bPrevRdhup==(conn->state==CONN_RDHUP))
return true; // unchanged, no need to syscall
DBGPRINT("SET FLOW fd=%d to in=%d out=%d state_rdhup=%d\n",conn->fd,conn->bFlowIn,conn->bFlowOut,conn->state==CONN_RDHUP);
uint32_t evtmask = (conn->state==CONN_RDHUP ? 0 : EPOLLRDHUP)|(conn->bFlowIn?EPOLLIN:0)|(conn->bFlowOut?EPOLLOUT:0);
if (!epoll_set(conn, evtmask))
return false;
DBGPRINT("SET FLOW fd=%d to in=%d out=%d state_rdhup=%d\n",conn->fd,conn->bFlowIn,conn->bFlowOut,conn->state==CONN_RDHUP);
conn->bFlowInPrev = conn->bFlowIn;
conn->bFlowOutPrev = conn->bFlowOut;
conn->bPrevRdhup = (conn->state==CONN_RDHUP);
@ -688,11 +694,11 @@ static bool epoll_set_flow_pair(tproxy_conn_t *conn)
DBGPRINT("epoll_set_flow_pair fd=%d remote=%d partner_fd=%d bHasUnsent=%d bHasUnsentPartner=%d state_rdhup=%d\n",
conn->fd , conn->remote, conn_partner_alive(conn) ? conn->partner->fd : 0, bHasUnsent, bHasUnsentPartner, conn->state==CONN_RDHUP);
if (!epoll_set_flow(conn, !bHasUnsentPartner && (conn->state!=CONN_RDHUP), bHasUnsent || conn->state==CONN_RDHUP))
if (!epoll_set_flow(conn, !bHasUnsentPartner && (conn->state != CONN_RDHUP), bHasUnsent))
return false;
if (conn_partner_alive(conn))
{
if (!epoll_set_flow(conn->partner, !bHasUnsent && (conn->partner->state!=CONN_RDHUP), bHasUnsentPartner || conn->partner->state==CONN_RDHUP))
if (!epoll_set_flow(conn->partner, !bHasUnsent && (conn->partner->state != CONN_RDHUP), bHasUnsentPartner))
return false;
}
return true;
@ -724,6 +730,20 @@ static bool handle_unsent(tproxy_conn_t *conn)
DBGPRINT("conn_buffers_send wr=%zd\n",wr);
if (wr<0) return false;
}
if (!conn_has_unsent(conn) && conn_partner_alive(conn) && conn->partner->state==CONN_RDHUP)
{
if (!conn->bShutdown)
{
DBGPRINT("fd=%d no more has unsent. partner in RDHUP state. executing delayed shutdown.\n", conn->fd);
conn_shutdown(conn);
}
if (conn->state==CONN_RDHUP && !conn_has_unsent(conn->partner))
{
DBGPRINT("both partners are in RDHUP state and have no unsent. closing.\n");
return false;
}
}
return epoll_set_flow_pair(conn);
}
@ -1198,7 +1218,6 @@ static bool remove_closed_connections(int efd, struct tailhead *close_list)
{
TAILQ_REMOVE(close_list, conn, conn_ptrs);
shutdown(conn->fd,SHUT_RDWR);
epoll_del(conn);
VPRINT("Socket fd=%d (partner_fd=%d, remote=%d) closed, connection removed. total_read=%" PRIu64 " total_write=%" PRIu64 " event_count=%u\n",
conn->fd, conn->partner ? conn->partner->fd : 0, conn->remote, conn->trd, conn->twr, conn->event_count);
@ -1303,6 +1322,7 @@ static void conn_close_with_partner_check(struct tailhead *conn_list, struct tai
}
}
static bool handle_resolve_pipe(tproxy_conn_t **conn, struct tailhead *conn_list, int fd)
{
ssize_t rd;
@ -1475,7 +1495,20 @@ int event_loop(const int *listen_fd, size_t listen_fd_ct)
else
{
print_legs();
VPRINT("Socket fd=%d (local) connected\n", conn->fd);
if (params.debug>=1)
{
struct sockaddr_storage sa;
socklen_t salen=sizeof(sa);
char ip_port[48];
if (getpeername(conn->fd,(struct sockaddr *)&sa,&salen))
*ip_port=0;
else
ntop46_port((struct sockaddr*)&sa,ip_port,sizeof(ip_port));
VPRINT("Socket fd=%d (local) connected from %s\n", conn->fd, ip_port);
}
}
}
else
@ -1498,8 +1531,17 @@ int event_loop(const int *listen_fd, size_t listen_fd_ct)
VPRINT("Socket fd=%d (partner_fd=%d, remote=%d) %s so_error=%d (%s)\n",conn->fd,conn->partner ? conn->partner->fd : 0,conn->remote,se,errn,strerror(errn));
proxy_remote_conn_ack(conn,errn);
read_all_and_buffer(conn,3);
if (errn==ECONNRESET && conn->remote && params.tamper && conn_partner_alive(conn)) rst_in(&conn->partner->track);
if (errn==ECONNRESET && conn_partner_alive(conn))
{
if (conn->remote && params.tamper) rst_in(&conn->partner->track);
struct linger lin;
lin.l_onoff=1;
lin.l_linger=0;
DBGPRINT("setting LINGER=0 to partner to force mirrored RST close\n");
if (setsockopt(conn->partner->fd,SOL_SOCKET,SO_LINGER,&lin,sizeof(lin))<0)
DLOG_PERROR("setsockopt (SO_LINGER)");
}
conn_close_with_partner_check(&conn_list,&close_list,conn);
continue;
}
@ -1518,22 +1560,52 @@ int event_loop(const int *listen_fd, size_t listen_fd_ct)
read_all_and_buffer(conn,2);
if (!conn->remote && params.tamper) hup_out(&conn->track);
conn->state = CONN_RDHUP; // only writes. do not receive RDHUP anymore
if (conn_has_unsent(conn))
{
DBGPRINT("conn fd=%d has unsent, not closing\n", conn->fd);
conn->state = CONN_RDHUP; // only writes
DBGPRINT("conn fd=%d has unsent\n", conn->fd);
epoll_set_flow(conn,false,true);
}
else
{
DBGPRINT("conn fd=%d has no unsent, closing\n", conn->fd);
conn_close_with_partner_check(&conn_list,&close_list,conn);
DBGPRINT("conn fd=%d has no unsent\n", conn->fd);
conn->bFlowIn = false;
epoll_update_flow(conn);
if (conn_partner_alive(conn))
{
if (conn_has_unsent(conn->partner))
DBGPRINT("partner has unset. partner shutdown delayed.\n");
else
{
DBGPRINT("partner has no unsent. shutting down partner.\n");
conn_shutdown(conn->partner);
if (conn->partner->state==CONN_RDHUP)
{
DBGPRINT("both partners are in RDHUP state and have no unsent. closing.\n");
conn_close_with_partner_check(&conn_list,&close_list,conn);
}
}
}
else
{
DBGPRINT("partner is absent or not alive. closing.\n");
close_tcp_conn(&conn_list,&close_list,conn);
}
}
continue;
}
if (events[i].events & (EPOLLIN|EPOLLOUT))
{
const char *se;
switch (events[i].events & (EPOLLIN|EPOLLOUT))
{
case EPOLLIN: se="EPOLLIN"; break;
case EPOLLOUT: se="EPOLLOUT"; break;
case EPOLLIN|EPOLLOUT: se="EPOLLIN EPOLLOUT"; break;
default: se=NULL;
}
if (se) DBGPRINT("%s\n",se);
// will not receive this until successful check_connection_attempt()
if (!handle_epoll(conn, &conn_list, events[i].events))
{
@ -1541,6 +1613,12 @@ int event_loop(const int *listen_fd, size_t listen_fd_ct)
conn_close_with_partner_check(&conn_list,&close_list,conn);
continue;
}
if ((conn->state == CONN_RDHUP) && conn_partner_alive(conn) && !conn->partner->bShutdown && !conn_has_unsent(conn))
{
DBGPRINT("conn fd=%d has no unsent. shutting down partner.\n", conn->fd);
conn_shutdown(conn->partner);
}
}
}

View File

@ -69,7 +69,7 @@ struct tproxy_conn
// these value are used in flow control. we do not use ET (edge triggered) polling
// if we dont disable notifications they will come endlessly until condition becomes false and will eat all cpu time
bool bFlowIn,bFlowOut, bFlowInPrev,bFlowOutPrev, bPrevRdhup;
bool bFlowIn,bFlowOut, bShutdown, bFlowInPrev,bFlowOutPrev, bPrevRdhup;
// total read,write
uint64_t trd,twr, tnrd;