From f5c4671bfbad96bf346bd7e9a21fc4317b4959df Mon Sep 17 00:00:00 2001 From: Indrajith K L Date: Sat, 3 Dec 2022 17:00:20 +0530 Subject: Adds most of the tools --- v_windows/v/vlib/net/websocket/websocket_client.v | 488 ++++++++++++++++++++++ 1 file changed, 488 insertions(+) create mode 100644 v_windows/v/vlib/net/websocket/websocket_client.v (limited to 'v_windows/v/vlib/net/websocket/websocket_client.v') diff --git a/v_windows/v/vlib/net/websocket/websocket_client.v b/v_windows/v/vlib/net/websocket/websocket_client.v new file mode 100644 index 0000000..48f8c5c --- /dev/null +++ b/v_windows/v/vlib/net/websocket/websocket_client.v @@ -0,0 +1,488 @@ +// websocket module implements websocket client and a websocket server +// attribution: @thecoderr the author of original websocket client +[manualfree] +module websocket + +import net +import net.http +import net.openssl +import net.urllib +import time +import log +import rand + +const ( + empty_bytearr = []byte{} // used as empty response to avoid allocation +) + +// Client represents websocket client +pub struct Client { + is_server bool +mut: + ssl_conn &openssl.SSLConn // secure connection used when wss is used + flags []Flag // flags used in handshake + fragments []Fragment // current fragments + message_callbacks []MessageEventHandler // all callbacks on_message + error_callbacks []ErrorEventHandler // all callbacks on_error + open_callbacks []OpenEventHandler // all callbacks on_open + close_callbacks []CloseEventHandler // all callbacks on_close +pub: + is_ssl bool // true if secure socket is used + uri Uri // uri of current connection + id string // unique id of client +pub mut: + header http.Header // headers that will be passed when connecting + conn &net.TcpConn // underlying TCP socket connection + nonce_size int = 16 // size of nounce used for masking + panic_on_callback bool // set to true of callbacks can panic + state State // current state of connection + logger &log.Log // logger used to log messages + resource_name string // name of current resource + last_pong_ut i64 // last time in unix time we got a pong message +} + +// Flag represents different types of headers in websocket handshake +enum Flag { + has_accept // Webs + has_connection + has_upgrade +} + +// State represents the state of the websocket connection. +pub enum State { + connecting = 0 + open + closing + closed +} + +// Message represents a whole message combined from 1 to n frames +pub struct Message { +pub: + opcode OPCode // websocket frame type of this message + payload []byte // payload of the message +} + +// OPCode represents the supported websocket frame types +pub enum OPCode { + continuation = 0x00 + text_frame = 0x01 + binary_frame = 0x02 + close = 0x08 + ping = 0x09 + pong = 0x0A +} + +// new_client instance a new websocket client +pub fn new_client(address string) ?&Client { + uri := parse_uri(address) ? + return &Client{ + conn: 0 + is_server: false + ssl_conn: openssl.new_ssl_conn() + is_ssl: address.starts_with('wss') + logger: &log.Log{ + level: .info + } + uri: uri + state: .closed + id: rand.uuid_v4() + header: http.new_header() + } +} + +// connect connects to remote websocket server +pub fn (mut ws Client) connect() ? { + ws.assert_not_connected() ? + ws.set_state(.connecting) + ws.logger.info('connecting to host $ws.uri') + ws.conn = ws.dial_socket() ? + // Todo: make setting configurable + ws.conn.set_read_timeout(time.second * 30) + ws.conn.set_write_timeout(time.second * 30) + ws.handshake() ? + ws.set_state(.open) + ws.logger.info('successfully connected to host $ws.uri') + ws.send_open_event() +} + +// listen listens and processes incoming messages +pub fn (mut ws Client) listen() ? { + mut log := 'Starting client listener, server($ws.is_server)...' + ws.logger.info(log) + unsafe { log.free() } + defer { + ws.logger.info('Quit client listener, server($ws.is_server)...') + if ws.state == .open { + ws.close(1000, 'closed by client') or {} + } + } + for ws.state == .open { + msg := ws.read_next_message() or { + if ws.state in [.closed, .closing] { + return + } + ws.debug_log('failed to read next message: $err') + ws.send_error_event('failed to read next message: $err') + return err + } + if ws.state in [.closed, .closing] { + return + } + ws.debug_log('got message: $msg.opcode') + match msg.opcode { + .text_frame { + log = 'read: text' + ws.debug_log(log) + unsafe { log.free() } + ws.send_message_event(msg) + unsafe { msg.free() } + } + .binary_frame { + ws.debug_log('read: binary') + ws.send_message_event(msg) + unsafe { msg.free() } + } + .ping { + ws.debug_log('read: ping, sending pong') + ws.send_control_frame(.pong, 'PONG', msg.payload) or { + ws.logger.error('error in message callback sending PONG: $err') + ws.send_error_event('error in message callback sending PONG: $err') + if ws.panic_on_callback { + panic(err) + } + continue + } + if msg.payload.len > 0 { + unsafe { msg.free() } + } + } + .pong { + ws.debug_log('read: pong') + ws.last_pong_ut = time.now().unix + ws.send_message_event(msg) + if msg.payload.len > 0 { + unsafe { msg.free() } + } + } + .close { + log = 'read: close' + ws.debug_log(log) + unsafe { log.free() } + defer { + ws.manage_clean_close() + } + if msg.payload.len > 0 { + if msg.payload.len == 1 { + ws.close(1002, 'close payload cannot be 1 byte') ? + return error('close payload cannot be 1 byte') + } + code := (int(msg.payload[0]) << 8) + int(msg.payload[1]) + if code in invalid_close_codes { + ws.close(1002, 'invalid close code: $code') ? + return error('invalid close code: $code') + } + reason := if msg.payload.len > 2 { msg.payload[2..] } else { []byte{} } + if reason.len > 0 { + ws.validate_utf_8(.close, reason) ? + } + if ws.state !in [.closing, .closed] { + // sending close back according to spec + ws.debug_log('close with reason, code: $code, reason: $reason') + r := reason.bytestr() + ws.close(code, r) ? + } + unsafe { msg.free() } + } else { + if ws.state !in [.closing, .closed] { + ws.debug_log('close with reason, no code') + // sending close back according to spec + ws.close(1000, 'normal') ? + } + unsafe { msg.free() } + } + return + } + .continuation { + ws.logger.error('unexpected opcode continuation, nothing to continue') + ws.send_error_event('unexpected opcode continuation, nothing to continue') + ws.close(1002, 'nothing to continue') ? + return error('unexpected opcode continuation, nothing to continue') + } + } + } +} + +// manage_clean_close closes connection in a clean websocket way +fn (mut ws Client) manage_clean_close() { + ws.send_close_event(1000, 'closed by client') +} + +// ping sends ping message to server +pub fn (mut ws Client) ping() ? { + ws.send_control_frame(.ping, 'PING', []) ? +} + +// pong sends pong message to server, +pub fn (mut ws Client) pong() ? { + ws.send_control_frame(.pong, 'PONG', []) ? +} + +// write_ptr writes len bytes provided a byteptr with a websocket messagetype +pub fn (mut ws Client) write_ptr(bytes &byte, payload_len int, code OPCode) ?int { + // ws.debug_log('write_ptr code: $code') + if ws.state != .open || ws.conn.sock.handle < 1 { + // todo: send error here later + return error('trying to write on a closed socket!') + } + mut header_len := 2 + if payload_len > 125 { 2 } else { 0 } + + if payload_len > 0xffff { 6 } else { 0 } + if !ws.is_server { + header_len += 4 + } + mut header := []byte{len: header_len, init: `0`} // [`0`].repeat(header_len) + header[0] = byte(int(code)) | 0x80 + masking_key := create_masking_key() + if ws.is_server { + if payload_len <= 125 { + header[1] = byte(payload_len) + } else if payload_len > 125 && payload_len <= 0xffff { + len16 := C.htons(payload_len) + header[1] = 126 + unsafe { C.memcpy(&header[2], &len16, 2) } + } else if payload_len > 0xffff && payload_len <= 0x7fffffff { + len_bytes := htonl64(u64(payload_len)) + header[1] = 127 + unsafe { C.memcpy(&header[2], len_bytes.data, 8) } + } + } else { + if payload_len <= 125 { + header[1] = byte(payload_len | 0x80) + header[2] = masking_key[0] + header[3] = masking_key[1] + header[4] = masking_key[2] + header[5] = masking_key[3] + } else if payload_len > 125 && payload_len <= 0xffff { + len16 := C.htons(payload_len) + header[1] = (126 | 0x80) + unsafe { C.memcpy(&header[2], &len16, 2) } + header[4] = masking_key[0] + header[5] = masking_key[1] + header[6] = masking_key[2] + header[7] = masking_key[3] + } else if payload_len > 0xffff && payload_len <= 0x7fffffff { + len64 := htonl64(u64(payload_len)) + header[1] = (127 | 0x80) + unsafe { C.memcpy(&header[2], len64.data, 8) } + header[10] = masking_key[0] + header[11] = masking_key[1] + header[12] = masking_key[2] + header[13] = masking_key[3] + } else { + ws.close(1009, 'frame too large') ? + return error('frame too large') + } + } + len := header.len + payload_len + mut frame_buf := []byte{len: len} + unsafe { + C.memcpy(&frame_buf[0], &byte(header.data), header.len) + if payload_len > 0 { + C.memcpy(&frame_buf[header.len], bytes, payload_len) + } + } + if !ws.is_server { + for i in 0 .. payload_len { + frame_buf[header_len + i] ^= masking_key[i % 4] & 0xff + } + } + written_len := ws.socket_write(frame_buf) ? + unsafe { + frame_buf.free() + masking_key.free() + header.free() + } + return written_len +} + +// write writes a byte array with a websocket messagetype to socket +pub fn (mut ws Client) write(bytes []byte, code OPCode) ?int { + return ws.write_ptr(&byte(bytes.data), bytes.len, code) +} + +// write_str, writes a string with a websocket texttype to socket +pub fn (mut ws Client) write_string(str string) ?int { + return ws.write_ptr(str.str, str.len, .text_frame) +} + +// close closes the websocket connection +pub fn (mut ws Client) close(code int, message string) ? { + ws.debug_log('sending close, $code, $message') + if ws.state in [.closed, .closing] || ws.conn.sock.handle <= 1 { + ws.debug_log('close: Websocket allready closed ($ws.state), $message, $code handle($ws.conn.sock.handle)') + err_msg := 'Socket allready closed: $code' + return error(err_msg) + } + defer { + ws.shutdown_socket() or {} + ws.reset_state() + } + ws.set_state(.closing) + // mut code32 := 0 + if code > 0 { + code_ := C.htons(code) + message_len := message.len + 2 + mut close_frame := []byte{len: message_len} + close_frame[0] = byte(code_ & 0xFF) + close_frame[1] = byte(code_ >> 8) + // code32 = (close_frame[0] << 8) + close_frame[1] + for i in 0 .. message.len { + close_frame[i + 2] = message[i] + } + ws.send_control_frame(.close, 'CLOSE', close_frame) ? + unsafe { close_frame.free() } + } else { + ws.send_control_frame(.close, 'CLOSE', []) ? + } + ws.fragments = [] +} + +// send_control_frame sends a control frame to the server +fn (mut ws Client) send_control_frame(code OPCode, frame_typ string, payload []byte) ? { + ws.debug_log('send control frame $code, frame_type: $frame_typ') + if ws.state !in [.open, .closing] && ws.conn.sock.handle > 1 { + return error('socket is not connected') + } + header_len := if ws.is_server { 2 } else { 6 } + frame_len := header_len + payload.len + mut control_frame := []byte{len: frame_len} + mut masking_key := if !ws.is_server { create_masking_key() } else { websocket.empty_bytearr } + defer { + unsafe { + control_frame.free() + if masking_key.len > 0 { + masking_key.free() + } + } + } + control_frame[0] = byte(int(code) | 0x80) + if !ws.is_server { + control_frame[1] = byte(payload.len | 0x80) + control_frame[2] = masking_key[0] + control_frame[3] = masking_key[1] + control_frame[4] = masking_key[2] + control_frame[5] = masking_key[3] + } else { + control_frame[1] = byte(payload.len) + } + if code == .close { + if payload.len >= 2 { + if !ws.is_server { + mut parsed_payload := []byte{len: payload.len + 1} + unsafe { C.memcpy(parsed_payload.data, &payload[0], payload.len) } + parsed_payload[payload.len] = `\0` + for i in 0 .. payload.len { + control_frame[6 + i] = (parsed_payload[i] ^ masking_key[i % 4]) & 0xff + } + unsafe { parsed_payload.free() } + } else { + unsafe { C.memcpy(&control_frame[2], &payload[0], payload.len) } + } + } + } else { + if !ws.is_server { + if payload.len > 0 { + for i in 0 .. payload.len { + control_frame[header_len + i] = (payload[i] ^ masking_key[i % 4]) & 0xff + } + } + } else { + if payload.len > 0 { + unsafe { C.memcpy(&control_frame[2], &payload[0], payload.len) } + } + } + } + ws.socket_write(control_frame) or { + return error('send_control_frame: error sending $frame_typ control frame.') + } +} + +// parse_uri parses the url to a Uri +fn parse_uri(url string) ?&Uri { + u := urllib.parse(url) ? + request_uri := u.request_uri() + v := request_uri.split('?') + mut port := u.port() + uri := u.str() + if port == '' { + port = if uri.starts_with('ws://') { + '80' + } else if uri.starts_with('wss://') { + '443' + } else { + u.port() + } + } + querystring := if v.len > 1 { '?' + v[1] } else { '' } + return &Uri{ + url: url + hostname: u.hostname() + port: port + resource: v[0] + querystring: querystring + } +} + +// set_state sets current state of the websocket connection +fn (mut ws Client) set_state(state State) { + lock { + ws.state = state + } +} + +// assert_not_connected returns error if the connection is not connected +fn (ws Client) assert_not_connected() ? { + match ws.state { + .connecting { return error('connect: websocket is connecting') } + .open { return error('connect: websocket already open') } + .closing { return error('connect: reconnect on closing websocket not supported, please use new client') } + else {} + } +} + +// reset_state resets the websocket and initialize default settings +fn (mut ws Client) reset_state() { + lock { + ws.state = .closed + ws.ssl_conn = openssl.new_ssl_conn() + ws.flags = [] + ws.fragments = [] + } +} + +// debug_log handles debug logging output for client and server +fn (mut ws Client) debug_log(text string) { + if ws.is_server { + ws.logger.debug('server-> $text') + } else { + ws.logger.debug('client-> $text') + } +} + +// free handles manual free memory of Message struct +pub fn (m &Message) free() { + unsafe { m.payload.free() } +} + +// free handles manual free memory of Client struct +pub fn (c &Client) free() { + unsafe { + c.flags.free() + c.fragments.free() + c.message_callbacks.free() + c.error_callbacks.free() + c.open_callbacks.free() + c.close_callbacks.free() + c.header.free() + } +} -- cgit v1.2.3