diff options
Diffstat (limited to 'v_windows/v/vlib/net/websocket/websocket_server.v')
-rw-r--r-- | v_windows/v/vlib/net/websocket/websocket_server.v | 189 |
1 files changed, 189 insertions, 0 deletions
diff --git a/v_windows/v/vlib/net/websocket/websocket_server.v b/v_windows/v/vlib/net/websocket/websocket_server.v new file mode 100644 index 0000000..99af3e0 --- /dev/null +++ b/v_windows/v/vlib/net/websocket/websocket_server.v @@ -0,0 +1,189 @@ +module websocket + +import net +import net.openssl +import log +import time +import rand + +// Server represents a websocket server connection +pub struct Server { +mut: + logger &log.Log // logger used to log + ls &net.TcpListener // listener used to get incoming connection to socket + accept_client_callbacks []AcceptClientFn // accept client callback functions + message_callbacks []MessageEventHandler // new message callback functions + close_callbacks []CloseEventHandler // close message callback functions +pub: + family net.AddrFamily = .ip + port int // port used as listen to incoming connections + is_ssl bool // true if secure connection (not supported yet on server) +pub mut: + clients map[string]&ServerClient // clients connected to this server + ping_interval int = 30 // interval for sending ping to clients (seconds) + state State // current state of connection +} + +// ServerClient represents a connected client +struct ServerClient { +pub: + resource_name string // resource that the client access + client_key string // unique key of client +pub mut: + server &Server + client &Client +} + +// new_server instance a new websocket server on provided port and route +pub fn new_server(family net.AddrFamily, port int, route string) &Server { + return &Server{ + ls: 0 + family: family + port: port + logger: &log.Log{ + level: .info + } + state: .closed + } +} + +// set_ping_interval sets the interval that the server will send ping messages to clients +pub fn (mut s Server) set_ping_interval(seconds int) { + s.ping_interval = seconds +} + +// listen start listen and process to incoming connections from websocket clients +pub fn (mut s Server) listen() ? { + s.logger.info('websocket server: start listen on port $s.port') + s.ls = net.listen_tcp(s.family, ':$s.port') ? + s.set_state(.open) + go s.handle_ping() + for { + mut c := s.accept_new_client() or { continue } + go s.serve_client(mut c) + } + s.logger.info('websocket server: end listen on port $s.port') +} + +// Close closes server (not implemented yet) +fn (mut s Server) close() { + // TODO: implement close when moving to net from x.net +} + +// handle_ping sends ping to all clients every set interval +fn (mut s Server) handle_ping() { + mut clients_to_remove := []string{} + for s.state == .open { + time.sleep(s.ping_interval * time.second) + for i, _ in s.clients { + mut c := s.clients[i] + if c.client.state == .open { + c.client.ping() or { + s.logger.debug('server-> error sending ping to client') + c.client.close(1002, 'Closing connection: ping send error') or { + // we want to continue even if error + continue + } + clients_to_remove << c.client.id + } + if (time.now().unix - c.client.last_pong_ut) > s.ping_interval * 2 { + clients_to_remove << c.client.id + c.client.close(1000, 'no pong received') or { continue } + } + } + } + // TODO: replace for with s.clients.delete_all(clients_to_remove) if (https://github.com/vlang/v/pull/6020) merges + for client in clients_to_remove { + lock { + s.clients.delete(client) + } + } + clients_to_remove.clear() + } +} + +// serve_client accepts incoming connection and sets up the callbacks +fn (mut s Server) serve_client(mut c Client) ? { + c.logger.debug('server-> Start serve client ($c.id)') + defer { + c.logger.debug('server-> End serve client ($c.id)') + } + mut handshake_response, mut server_client := s.handle_server_handshake(mut c) ? + accept := s.send_connect_event(mut server_client) ? + if !accept { + s.logger.debug('server-> client not accepted') + c.shutdown_socket() ? + return + } + // the client is accepted + c.socket_write(handshake_response.bytes()) ? + lock { + s.clients[server_client.client.id] = server_client + } + s.setup_callbacks(mut server_client) + c.listen() or { + s.logger.error(err.msg) + return err + } +} + +// setup_callbacks initialize all callback functions +fn (mut s Server) setup_callbacks(mut sc ServerClient) { + if s.message_callbacks.len > 0 { + for cb in s.message_callbacks { + if cb.is_ref { + sc.client.on_message_ref(cb.handler2, cb.ref) + } else { + sc.client.on_message(cb.handler) + } + } + } + if s.close_callbacks.len > 0 { + for cb in s.close_callbacks { + if cb.is_ref { + sc.client.on_close_ref(cb.handler2, cb.ref) + } else { + sc.client.on_close(cb.handler) + } + } + } + // set standard close so we can remove client if closed + sc.client.on_close_ref(fn (mut c Client, code int, reason string, mut sc ServerClient) ? { + c.logger.debug('server-> Delete client') + lock { + sc.server.clients.delete(sc.client.id) + } + }, sc) +} + +// accept_new_client creates a new client instance for client that connects to the socket +fn (mut s Server) accept_new_client() ?&Client { + mut new_conn := s.ls.accept() ? + c := &Client{ + is_server: true + conn: new_conn + ssl_conn: openssl.new_ssl_conn() + logger: s.logger + state: .open + last_pong_ut: time.now().unix + id: rand.uuid_v4() + } + return c +} + +// set_state sets current state in a thread safe way +fn (mut s Server) set_state(state State) { + lock { + s.state = state + } +} + +// free manages manual free of memory for Server instance +pub fn (mut s Server) free() { + unsafe { + s.clients.free() + s.accept_client_callbacks.free() + s.message_callbacks.free() + s.close_callbacks.free() + } +} |