Source: backend/data_center/modules/server_socket.js

/**
 *      StreamRoller Copyright 2023 "SilenusTA https://www.twitch.tv/olddepressedgamer"
 * 
 *      StreamRoller is an all in one streaming solution designed to give a single
 *      'second monitor' control page and allow easy integration for configuring
 *      content (ie. tweets linked to chat, overlays triggered by messages, hue lights
 *      controlled by donations etc)
 * 
 *      This program is free software: you can redistribute it and/or modify
 *      it under the terms of the GNU Affero General Public License as published
 *      by the Free Software Foundation, either version 3 of the License, or
 *      (at your option) any later version.
 * 
 *      This program is distributed in the hope that it will be useful,
 *      but WITHOUT ANY WARRANTY; without even the implied warranty of
 *      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *      GNU Affero General Public License for more details.
 * 
 *      You should have received a copy of the GNU Affero General Public License
 *      along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
/** ######################### SERVER_SOCKET.js #################################
// @file This file handles the server socket. This will consume client
// sockets and handle messages received and sent out
// -------------------------- Creation ----------------------------------------
// @author Silenus aka twitch.tv/OldDepressedGamer
// GitHub: https://github.com/SilenusTA/StreamRoller
// Date: 14-Jan-2021
// --------------------------- functionality ----------------------------------
// On connection the client will receive a client id. After initial connection
// the client should join(register)/create a room it wishes to use. Multiple 
// rooms can be created/joined(registered).
// You can then send/monitor the channel for messages as required
// === Rooms ===
// The API expects clients to join a room by registering.
// ie. if an extension is provided for twitch chat it might create a
// room called 'TWITCH_CHAT' and the any client can register
// for messages to that room. Each module will have to specify
// the rooms they create so that users know what to register for
// ............................. usage ........................................
// ------------------------ Import socket.io
//    import DataCenterIo from "socket.io-client";
// ------------------------ setup handlers to receive messages from the system normally three are used
//    DataCenterSocket.on("connect", (data) => onDataCenterConnect(DataCenterSocket));
//    DataCenterSocket.on("disconnect", (reason) => onDataCenterDisconnect(reason));
//    DataCenterSocket.on("message", (data) => onDataCenterMessage(data));
// ------------------------ Connect to the websocket handle
//    DataCenterSocket = DataCenterIo("http://localhost:3000", { transports: ["websocket"] });
// ------------------------ send a message to a room in the data center
//    DataCenterSocket.emit(messagetype, 
//                          {CLIENT_ID, 
//                           SOCKET_NAME, 
//                           CHANNEL_NAME, 
//                           MESSAGE_TYPE,
//                           data
//                          });
// --------------------------- description -------------------------------------
// 1) After the connection a "connect" message should be received. In the handler
//    for this "onDataCenterDisconnect(reason)" above you should either create or
//    register to a room/channel to send and receive messages from.
// 2) All data packets are stringified JSON objects as shown below
// 3) CLIENT_ID is what you get in the first message to the client when connecting
//    and gets received by the onDataCenterConnect(DataCenterSocket)) function
// ....... A NOTE ON MESSAGE TYPES ......
// There are three places where message types come up.
// 1) initial connection, this will be the websoctet type, normally set to
//            { transports: ["websocket"] }
// 2) In emit/receive funcions
//             messagetype ( lower case in emit meessage above in usage section)
//             First parameter in a received message (the .on handlers above)                
//    this is one of the following
//       createchannel, register, message, disconnect,unregister
//    this tells us what type of websocket service you are requesting/recieving. 
//    Once up and running you will only use the 'message' service with a data 
//    message as shown below
// 5) In the data section of a message. This is the one we will mostly use during
//    normal operation (ie not connecting or disconnecting the socket)
//          MESSAGE_TYPE is one of "data" or "info" or "error"
// ....................... data message format .................................
// ============================================================================
*/

// ============================================================================
//                           IMPORTS/VARIABLES
// ============================================================================
// Description: Import/Variable section
// ----------------------------- notes ----------------------------------------
// none
// ============================================================================
import * as childprocess from "child_process";
import process from 'node:process';
import { Server } from "socket.io";
import * as cm from "./common.js";
import * as logger from "./logger.js";
import * as mh from "./message_handlers.js";
import * as v8 from 'node:v8';
import { Buffer } from 'buffer';

const channels = [];
let extensions = {};// all extensions
let connected_extensionlist = [];// extensions with a socket connection
let backend_server = null;
let server_socket = null;
let config = {};
let monitorHeapStats = false;
//monitor data over websocket
let monitorSocketReceivedData = false;
let socketReceivedSize = 0;
let monitorDataHandle = 0;
// these maintain a list of extensions that have requested an extension list
// it only handles 'connected' extensions and will ignore extensions without 
// a valid socket connection
let extensionlist_requesters = []
let extensionlist_requesters_handles = []
let all_extensionlist_requesters = []
let all_extensionlist_requesters_handles = []

// ============================================================================
//                           FUNCTION: start
// ============================================================================
/**
 * Starts the server
 * @param {Express} app 
 * @param {Object} server
 * @param {Array} exts 
 */
function start (app, server, exts)
{
    // create our extension array
    exts.forEach((elem, i) =>
    {
        extensions[elem] = {};
    });
    backend_server = server;
    cm.initcrypto();
    config = cm.loadConfig('datacenter');
    //setup our server socket on the http server
    try
    {
        // note. can't use the api functions here as we are creating a server socket
        server_socket = new Server(server, {
            transports: ["websocket"],
            maxHttpBufferSize: 1e8,// 100MB had to increase for flight sim
        });
        logger.log("[" + config.SYSTEM_LOGGING_TAG + "]server_socket.start", "Server is running and waiting for clients");
        if (monitorSocketReceivedData)
            dataMonitorScheduler();
        // wait for messages
        server_socket.on("connection", (socket) =>
        {
            // call onConnect to store the connection details
            if (monitorSocketReceivedData)
            {
                onConnect(socket);
                socket.on("disconnect", (reason) => onDisconnect(socket, reason));
                socket.on("message", (data) => socketReceiveDebug(socket, data));
            }
            else
            {
                onConnect(socket);
                socket.on("disconnect", (reason) => onDisconnect(socket, reason));
                socket.on("message", (data) => onMessage(socket, data));
            }
        });
    } catch (err)
    {
        logger.err("[" + config.SYSTEM_LOGGING_TAG + "]server_socket.start", "server startup failed:", err);
    }
}
// ============================================================================
//                           FUNCTION: onConnect
// ============================================================================
/**
 * receives connect messages
 * @param {Socket} socket socket received on
 */
function onConnect (socket)
{
    //logger.log("[" + config.SYSTEM_LOGGING_TAG + "]server_socket.onConnect", socket.id);
    socket.emit("connected", socket.id);
}
// ============================================================================
//                           FUNCTION: onDisconnect
// ============================================================================
/**
 * receive disconnect messages
 * @param {Socket} socket 
 * @param {string} reason 
 */
function onDisconnect (socket, reason)
{
    // set the extension as disconnected
    let ext = Object.keys(extensions).find((entry) => (extensions[entry].socket != undefined && extensions[entry].socket.id == socket.id))
    connected_extensionlist[ext] = socket.connected
    // the disconnected extension previously was on our requesters list so we need to remove it
    if (extensionlist_requesters.includes(ext))
    {
        clearTimeout(extensionlist_requesters_handles[ext])
        extensionlist_requesters.splice(extensionlist_requesters.indexOf(ext), 1)
        // update anyone who has previously reqested an extension list if we have changed it
        updateExtensionsListRequesters()
    }
    logger.info("[" + config.SYSTEM_LOGGING_TAG + "]server_socket.onDisconnect", reason, socket);
}
// ============================================================================
//                           FUNCTION: onMessage
// ============================================================================
/**
 * receives messages
 * @param {Socket} socket 
 * @param {Object} server_packet 
 */
function onMessage (socket, server_packet)
{
    //logger.err("[" + config.SYSTEM_LOGGING_TAG + "]server_socket.onMessage", server_packet);
    // make sure we are using the same api version
    if (server_packet.version != config.apiVersion)
    {
        logger.err("[" + config.SYSTEM_LOGGING_TAG + "]server_socket.onMessage",
            "Version mismatch:", server_packet.version, "!=", config.apiVersion);

        logger.info("[" + config.SYSTEM_LOGGING_TAG + "]server_socket.onMessage",
            "!!!!!!! Message System API version doesn't match: ", server_packet);
        mh.errorMessage(socket, "Incorrect api version", server_packet);
        return;
    }

    // check that the sender has sent a name and id
    if (!server_packet.type || !server_packet.from || !server_packet.type === "" || !server_packet.from === "")
    {

        console.log(JSON.stringify(server_packet, null, 2))
        logger.err("[" + config.SYSTEM_LOGGING_TAG + "]server_socket.onMessage",
            "!!!!!!! Invalid data: ", server_packet);
        mh.errorMessage(socket, "Missing type/from field", server_packet);
        return;
    }
    // add this socket to the extension if it doesn't already exist
    if (typeof (extensions[server_packet.from]) === "undefined" || !extensions[server_packet.from])
        extensions[server_packet.from] = {};
    // check we have a valid socket for this extension, don't need to check if the extension exists as it will have been added above
    if (typeof (extensions[server_packet.from].socket) === "undefined" || !extensions[server_packet.from].socket)
    {
        logger.log("[" + config.SYSTEM_LOGGING_TAG + "]server_socket.onMessage", "registering new socket for ", server_packet.from);
        extensions[server_packet.from].socket = socket;
        connected_extensionlist[server_packet.from] = extensions[server_packet.from].socket.connected
        // if we add a new extension send the list out to anyone who has requested it so far to update them
        updateExtensionsListRequesters()
    }
    else
    {
        // note that we currently only have one slot per connection. this works for extensions that are only loaded once
        // but for webpage stuff we will need to allow more than one sockect for that extension name
        // we need to append the socket id to the extension name to fix this!!!
        if (extensions[server_packet.from].socket.id != socket.id)
        {
            logger.warn("[" + config.SYSTEM_LOGGING_TAG + "]server_socket.onMessage", "Extension socket id changed for " + server_packet.from);
            logger.warn("[" + config.SYSTEM_LOGGING_TAG + "]server_socket.onMessage", "Data ", server_packet);
            logger.warn("[" + config.SYSTEM_LOGGING_TAG + "]server_socket.onMessage", "Previous id " + extensions[server_packet.from].socket.id);
            logger.warn("[" + config.SYSTEM_LOGGING_TAG + "]server_socket.onMessage", "New id " + socket.id);
            //update the extensions socket as it has changed
            extensions[server_packet.from].socket = socket;

        }
        connected_extensionlist[server_packet.from] = extensions[server_packet.from].socket.connected
    }
    // process the clients request
    if (server_packet.type === "RequestSoftwareVersion")
        mh.sendSoftwareVersion(socket, server_packet.from);
    else if (server_packet.type === "RequestConfig")
        mh.sendConfig(socket, server_packet.from);
    else if (server_packet.type === "SaveConfig")
        mh.saveConfig(server_packet.from, server_packet.data);
    else if (server_packet.type === "RequestData")
        mh.sendData(socket, server_packet.from);
    else if (server_packet.type === "SaveData")
        mh.saveData(server_packet.from, server_packet.data);
    else if (server_packet.type === "StopServer")
        process.exit(0);
    else if (server_packet.type == "RestartServer")
    {
        console.log("Restarting...");

        //const child = childprocess.exec('start "Streamroller" node backend\\data_center\\server.js 5',
        const child = childprocess.exec('start "Streamroller" node_modules\\.bin\\node.exe backend\\data_center\\server.js 5',
            {
                detached: true,
                stdio: 'ignore'
            })
        child.unref();
        setTimeout(() =>
        {
            backend_server.close();
            process.exit();
        }, 1000);
    }
    else if (server_packet.type === "UpdateCredentials")
    {
        mh.UpdateCredentials(server_packet.data);
        //resend new credentials to extension
        mh.RetrieveCredentials(server_packet.data.ExtensionName, extensions);
    }
    else if (server_packet.type === "RequestCredentials")
        mh.RetrieveCredentials(server_packet.from, extensions);
    else if (server_packet.type === "DeleteCredentials")
        mh.DeleteCredentials(server_packet.from);
    else if (server_packet.type === "RequestExtensionsList")
    {
        if (!extensionlist_requesters.includes(server_packet.from))
            extensionlist_requesters.push(server_packet.from)
        mh.sendExtensionList(socket, server_packet.from, connected_extensionlist);
    }
    else if (server_packet.type === "RequestAllExtensionsList")
    {
        if (!all_extensionlist_requesters.includes(server_packet.from))
            all_extensionlist_requesters.push(server_packet.from)
        mh.sendExtensionList(socket, server_packet.from, extensions);
    }
    else if (server_packet.type === "RequestChannelsList")
        mh.sendChannelList(socket, server_packet.from, channels);
    else if (server_packet.type === "CreateChannel")
        mh.createChannel(server_socket, socket, server_packet.data, channels, server_packet.from);
    else if (server_packet.type === "JoinChannel")
        mh.joinChannel(server_socket, socket, server_packet.data, channels, server_packet.from);
    else if (server_packet.type === "LeaveChannel")
        mh.leaveChannel(socket, server_packet.from, server_packet.data);
    else if (server_packet.type === "ExtensionMessage")
    {
        if (server_packet.to === undefined)
            mh.errorMessage(socket, "No extension name specified for ExtensionMessage", server_packet);
        else
            mh.forwardMessage(socket, server_packet, channels, extensions);
    }
    else if (server_packet.type === "ChannelData")
    {
        if (server_packet.dest_channel === undefined)
            mh.errorMessage(socket, "No dest_channel specified for ChannelData", server_packet);
        else
            mh.forwardMessage(socket, server_packet, channels, extensions);
    }
    else if (server_packet.type === "SetLoggingLevel")
    {
        logger.log("[" + config.SYSTEM_LOGGING_TAG + "]server_socket.onMessage", "logging set to level ", server_packet.data);
        mh.setLoggingLevel(server_socket, server_packet.data);
        config.logginglevel = server_packet.data;
        cm.saveConfig(config.extensionname, config);
    }
    else if (server_packet.type === "RequestLoggingLevel")
        mh.sendLoggingLevel(socket);
    else if (server_packet.type === "MonitorHeap")
    {
        // if we are turning it on then start the monitoring process
        if (server_packet.data == 1 && !monitorHeapStats)
        {
            monitorHeapStats = true;
            heapStatsScheduler()
        }
        else
            monitorHeapStats = false;
    }
    else
    {
        logger.err("[" + config.SYSTEM_LOGGING_TAG + "]server_socket.onMessage", "Unhandled message");
        console.log(JSON.stringify(server_packet, null, 2))
    }

}
// ============================================================================
//                      updateExtensionsListRequesters()
// ===========================================================================
function updateExtensionsListRequesters ()
{
    // update users who requested extensions with sockets
    for (let i = 0; i < extensionlist_requesters.length; i++)
    {
        // buffer the extensionlist messages using a timer
        clearTimeout(extensionlist_requesters_handles[extensionlist_requesters[i]])
        extensionlist_requesters_handles[extensionlist_requesters[i]] = setTimeout(() =>
        {
            // send the connected extension list
            mh.sendExtensionList(extensions[extensionlist_requesters[i]].socket, extensionlist_requesters[i], connected_extensionlist);
        }, 2000);
    }
    // update users who have requested the full list (even extensions without sockets)
    for (let i = 0; i < all_extensionlist_requesters.length; i++)
    {
        // buffer the extensionlist messages using a timer
        clearTimeout(all_extensionlist_requesters_handles[all_extensionlist_requesters[i]])
        all_extensionlist_requesters_handles[all_extensionlist_requesters[i]] = setTimeout(() =>
        {
            // send the connected extension list
            mh.sendExtensionList(extensions[all_extensionlist_requesters[i]].socket, all_extensionlist_requesters[i], extensions);
        }, 2000);
    }
}

// ===========================================================================
//                      heapStatsScheduler()
// ===========================================================================


// DEBUG monitoring heap status
function heapStatsScheduler ()
{
    setTimeout(() =>
    {
        dumpHeapStats()
    }
        , 1000);
}
function dumpHeapStats ()
{
    let maxHeap = v8.getHeapStatistics().heap_size_limit;
    let heap = v8.getHeapStatistics();
    let used_heap_size = heap.used_heap_size
    console.log("server Heap usage \t" + ((used_heap_size / maxHeap) * 100).toFixed(2) + "%", heap.number_of_native_contexts, heap.number_of_detached_contexts)
    if (monitorHeapStats)
        heapStatsScheduler()
}
//end Debugheap

// ===========================================================================
//                      DataMonitor
// ===========================================================================
function socketReceiveDebug (socket, server_packet)
{
    socketReceivedSize = socketReceivedSize + Buffer.byteLength(JSON.stringify(server_packet))
    onMessage(socket, server_packet)
}
function dataMonitorScheduler ()
{
    mh.sendDataLoad(server_socket, socketReceivedSize);
    socketReceivedSize = 0;
    if (monitorSocketReceivedData)
    {

        clearTimeout(monitorDataHandle);
        monitorDataHandle = setTimeout(() =>
        {
            dataMonitorScheduler();
        }, "1000");
    }
}
// ============================================================================
//                           EXPORTS:
// ============================================================================
export { start };