aboutsummaryrefslogtreecommitdiff
path: root/v_windows/v/old/vlib/net/websocket/websocket_server.v
blob: 99af3e0ebf8cd812ae5fcc9bc40d4c890bc38f6e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
module websocket

import net
import net.openssl
import log
import time
import rand

// Server represents a websocket server connection
pub struct Server {
mut:
	logger                  &log.Log // logger used to log
	ls                      &net.TcpListener      // listener used to get incoming connection to socket
	accept_client_callbacks []AcceptClientFn      // accept client callback functions
	message_callbacks       []MessageEventHandler // new message callback functions
	close_callbacks         []CloseEventHandler   // close message callback functions
pub:
	family net.AddrFamily = .ip
	port   int  // port used as listen to incoming connections
	is_ssl bool // true if secure connection (not supported yet on server)
pub mut:
	clients       map[string]&ServerClient // clients connected to this server
	ping_interval int = 30 // interval for sending ping to clients (seconds)
	state         State // current state of connection
}

// ServerClient represents a connected client
struct ServerClient {
pub:
	resource_name string // resource that the client access
	client_key    string // unique key of client
pub mut:
	server &Server
	client &Client
}

// new_server instance a new websocket server on provided port and route
pub fn new_server(family net.AddrFamily, port int, route string) &Server {
	return &Server{
		ls: 0
		family: family
		port: port
		logger: &log.Log{
			level: .info
		}
		state: .closed
	}
}

// set_ping_interval sets the interval that the server will send ping messages to clients
pub fn (mut s Server) set_ping_interval(seconds int) {
	s.ping_interval = seconds
}

// listen start listen and process to incoming connections from websocket clients
pub fn (mut s Server) listen() ? {
	s.logger.info('websocket server: start listen on port $s.port')
	s.ls = net.listen_tcp(s.family, ':$s.port') ?
	s.set_state(.open)
	go s.handle_ping()
	for {
		mut c := s.accept_new_client() or { continue }
		go s.serve_client(mut c)
	}
	s.logger.info('websocket server: end listen on port $s.port')
}

// Close closes server (not implemented yet)
fn (mut s Server) close() {
	// TODO: implement close when moving to net from x.net
}

// handle_ping sends ping to all clients every set interval
fn (mut s Server) handle_ping() {
	mut clients_to_remove := []string{}
	for s.state == .open {
		time.sleep(s.ping_interval * time.second)
		for i, _ in s.clients {
			mut c := s.clients[i]
			if c.client.state == .open {
				c.client.ping() or {
					s.logger.debug('server-> error sending ping to client')
					c.client.close(1002, 'Closing connection: ping send error') or {
						// we want to continue even if error
						continue
					}
					clients_to_remove << c.client.id
				}
				if (time.now().unix - c.client.last_pong_ut) > s.ping_interval * 2 {
					clients_to_remove << c.client.id
					c.client.close(1000, 'no pong received') or { continue }
				}
			}
		}
		// TODO: replace for with s.clients.delete_all(clients_to_remove) if (https://github.com/vlang/v/pull/6020) merges
		for client in clients_to_remove {
			lock  {
				s.clients.delete(client)
			}
		}
		clients_to_remove.clear()
	}
}

// serve_client accepts incoming connection and sets up the callbacks
fn (mut s Server) serve_client(mut c Client) ? {
	c.logger.debug('server-> Start serve client ($c.id)')
	defer {
		c.logger.debug('server-> End serve client ($c.id)')
	}
	mut handshake_response, mut server_client := s.handle_server_handshake(mut c) ?
	accept := s.send_connect_event(mut server_client) ?
	if !accept {
		s.logger.debug('server-> client not accepted')
		c.shutdown_socket() ?
		return
	}
	// the client is accepted
	c.socket_write(handshake_response.bytes()) ?
	lock  {
		s.clients[server_client.client.id] = server_client
	}
	s.setup_callbacks(mut server_client)
	c.listen() or {
		s.logger.error(err.msg)
		return err
	}
}

// setup_callbacks initialize all callback functions
fn (mut s Server) setup_callbacks(mut sc ServerClient) {
	if s.message_callbacks.len > 0 {
		for cb in s.message_callbacks {
			if cb.is_ref {
				sc.client.on_message_ref(cb.handler2, cb.ref)
			} else {
				sc.client.on_message(cb.handler)
			}
		}
	}
	if s.close_callbacks.len > 0 {
		for cb in s.close_callbacks {
			if cb.is_ref {
				sc.client.on_close_ref(cb.handler2, cb.ref)
			} else {
				sc.client.on_close(cb.handler)
			}
		}
	}
	// set standard close so we can remove client if closed
	sc.client.on_close_ref(fn (mut c Client, code int, reason string, mut sc ServerClient) ? {
		c.logger.debug('server-> Delete client')
		lock  {
			sc.server.clients.delete(sc.client.id)
		}
	}, sc)
}

// accept_new_client creates a new client instance for client that connects to the socket
fn (mut s Server) accept_new_client() ?&Client {
	mut new_conn := s.ls.accept() ?
	c := &Client{
		is_server: true
		conn: new_conn
		ssl_conn: openssl.new_ssl_conn()
		logger: s.logger
		state: .open
		last_pong_ut: time.now().unix
		id: rand.uuid_v4()
	}
	return c
}

// set_state sets current state in a thread safe way
fn (mut s Server) set_state(state State) {
	lock  {
		s.state = state
	}
}

// free manages manual free of memory for Server instance
pub fn (mut s Server) free() {
	unsafe {
		s.clients.free()
		s.accept_client_callbacks.free()
		s.message_callbacks.free()
		s.close_callbacks.free()
	}
}