Source: extensions/users/users.js

/**
 *      StreamRoller Copyright 2023 "SilenusTA https://www.twitch.tv/olddepressedgamer"
 * 
 *      StreamRoller is an all in one streaming solution designed to give a single
 *      'second monitor' control page and allow easy integration for configuring
 *      content (ie. tweets linked to chat, overlays triggered by messages, hue lights
 *      controlled by donations etc)
 * 
 *      This program is free software: you can redistribute it and/or modify
 *      it under the terms of the GNU Affero General Public License as published
 *      by the Free Software Foundation, either version 3 of the License, or
 *      (at your option) any later version.
 * 
 *      This program is distributed in the hope that it will be useful,
 *      but WITHOUT ANY WARRANTY; without even the implied warranty of
 *      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *      GNU Affero General Public License for more details.
 * 
 *      You should have received a copy of the GNU Affero General Public License
 *      along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
/**
 * @extension Users
 * Collects data on users, e.g. when they were last seen, a buffer of their messages, etc.
 * TBD: needs more work due to the size of the data and the need to find a useful purpose (from a streamer's perspective).
 * Awaiting requests for what it should do before advancing the extension.
 */
// ############################# users.js ##############################
// This extension handles users/viewers data. ie a place to store channel points etc
// ---------------------------- creation --------------------------------------
// Author: Silenus aka twitch.tv/OldDepressedGamer
// GitHub: https://github.com/SilenusTA/StreamRoller
// Date: 09-Mar-2023
// ============================================================================

// ============================================================================
//                           IMPORTS/VARIABLES
// ============================================================================
import * as fs from "fs";
import * as logger from "../../backend/data_center/modules/logger.js";
import sr_api from "../../backend/data_center/public/streamroller-message-api.cjs";
// these lines are a fix so that ES6 has access to dirname etc
import https from "https";
import { dirname } from "path";
import { fileURLToPath } from "url";
const __dirname = dirname(fileURLToPath(import.meta.url));

// how many times we have attempted to connect on failure
let ServerConnectionAttempts = 0;
let millisecondsInDay = 86400000;
const localConfig = {
    SYSTEM_LOGGING_TAG: "[EXTENSION]",
    DataCenterSocket: null,
    MaxServerConnectionAttempts: 20,
    heartBeatTimeout: 5000,
    heartBeatHandle: null,
};
const default_serverConfig = {
    __version__: 0.1,
    extensionname: "users",
    channel: "USERS_CHANNEL",
    enableusersextension: "off",
    cleardatausersextension: "off",
    maxuserstokeep: "50",
    maxusersmessagestokeep: "100",
    // how often to update the profile data for a user (default 30 days)
    profiletimeout: "30"
};
let serverConfig = structuredClone(default_serverConfig)
const serverData = { userData: { twitch: {}, youtube: {} } }

const triggersandactions =
{
    extensionname: serverConfig.extensionname,
    description: "User interactions",
    version: "0.2",
    channel: serverConfig.channel,
    triggers:
        [
            {
                name: "UsersNewChatter",
                displaytitle: "First Time Poster",
                description: "Someone has posted in the chat for the first time",
                messagetype: "trigger_NewChatter",
                parameters: {
                    name: "",
                    message: "",
                    platform: ""
                }
            }
        ],
}

// ============================================================================
//                           FUNCTION: initialise
// ============================================================================
/**
 * Starts the extension using the given data.
 * @param {object:Express} app 
 * @param {string} host 
 * @param {number} port 
 * @param {number} heartbeat 
 */
function initialise (app, host, port, heartbeat)
{
    if (typeof (heartbeat) != "undefined")
        localConfig.heartBeatTimeout = heartbeat;
    else
        logger.err(localConfig.SYSTEM_LOGGING_TAG + serverConfig.extensionname + ".initialise", "DataCenterSocket no heartbeat passed:", heartbeat);
    try
    {
        localConfig.DataCenterSocket = sr_api.setupConnection(onDataCenterMessage, onDataCenterConnect,
            onDataCenterDisconnect, host, port);
    } catch (err)
    {
        logger.err(localConfig.SYSTEM_LOGGING_TAG + serverConfig.extensionname + ".initialise", "config.DataCenterSocket connection failed:", err);
    }
}

// ============================================================================
//                           FUNCTION: onDataCenterDisconnect
// ============================================================================
/**
 * Disconnection message sent from the server
 * @param {String} reason 
 */
function onDataCenterDisconnect (reason)
{
    logger.log(localConfig.SYSTEM_LOGGING_TAG + serverConfig.extensionname + ".onDataCenterDisconnect", reason);
}
// ============================================================================
//                           FUNCTION: onDataCenterConnect
// ============================================================================
/**
 * Connection message handler
 * @param {*} socket 
 */
function onDataCenterConnect (socket)
{
    logger.log(localConfig.SYSTEM_LOGGING_TAG + serverConfig.extensionname + ".onDataCenterConnect", "Creating our channel");
    sr_api.sendMessage(localConfig.DataCenterSocket,
        sr_api.ServerPacket("RequestConfig", serverConfig.extensionname));
    sr_api.sendMessage(localConfig.DataCenterSocket,
        sr_api.ServerPacket("RequestData", serverConfig.extensionname));
    sr_api.sendMessage(localConfig.DataCenterSocket,
        sr_api.ServerPacket("CreateChannel", serverConfig.extensionname, serverConfig.channel));
    sr_api.sendMessage(localConfig.DataCenterSocket,
        sr_api.ServerPacket("JoinChannel", serverConfig.extensionname, "TWITCH_CHAT"));
    sr_api.sendMessage(localConfig.DataCenterSocket,
        sr_api.ServerPacket("JoinChannel", serverConfig.extensionname, "TWITCH"));
    clearTimeout(localConfig.heartBeatHandle);
    localConfig.heartBeatHandle = setTimeout(heartBeatCallback, localConfig.heartBeatTimeout)

    // get a main list of twitch users who are bots
    getTwitchBotStatusList()
}
// ============================================================================
//                           FUNCTION: onDataCenterMessage
// ============================================================================
/**
 * receives message from the socket
 * @param {data} server_packet 
 */
function onDataCenterMessage (server_packet)
{
    //logger.log(localConfig.SYSTEM_LOGGING_TAG + config.EXTENSION_NAME + ".onDataCenterMessage", "message received ", server_packet);

    if (server_packet.type === "ConfigFile")
    {
        if (server_packet.data != "" && server_packet.to === serverConfig.extensionname)
        {
            if (server_packet.data.__version__ != default_serverConfig.__version__)
            {
                serverConfig = structuredClone(default_serverConfig);
                console.log("\x1b[31m" + serverConfig.extensionname + " ConfigFile Updated", "The config file has been Updated to the latest version v" + default_serverConfig.__version__ + ". Your settings may have changed" + "\x1b[0m");
            }
            else
                serverConfig = structuredClone(server_packet.data);
        }
    }
    else if (server_packet.type === "DataFile" && server_packet.data != "")
    {
        // check it is our data
        if (server_packet.to === serverConfig.extensionname)
            if (server_packet.data.userData)
                serverData.userData = JSON.parse(JSON.stringify(server_packet.data.userData));
    }
    else if (server_packet.type === "ExtensionMessage")
    {
        let extension_packet = server_packet.data;
        if (extension_packet.type === "RequestSettingsWidgetSmallCode")
        {
            SendSettingsWidgetSmall(extension_packet.from);
        }
        else if (extension_packet.type === "SettingsWidgetSmallData")
        {
            if (extension_packet.data.extensionname === serverConfig.extensionname)
            {

                if (serverConfig.cleardatausersextension == "on")
                    serverData.userData = { twitch: {}, youtube: {} };
                SaveDataToServer();

                serverConfig.enableusersextension = "off";
                for (const [key, value] of Object.entries(extension_packet.data))
                    serverConfig[key] = value;
                serverConfig.cleardatausersextension = "off";
                SaveConfigToServer();
                SendSettingsWidgetSmall("");
            }
        }
        else if (extension_packet.type === "SendTriggerAndActions")
        {
            sr_api.sendMessage(localConfig.DataCenterSocket,
                sr_api.ServerPacket("ExtensionMessage",
                    serverConfig.extensionname,
                    sr_api.ExtensionPacket(
                        "TriggerAndActions",
                        serverConfig.extensionname,
                        triggersandactions,
                        "",
                        server_packet.from
                    ),
                    "",
                    server_packet.from
                )
            )
        }
        // add this in to see all messages that are not handled
        /*
        else
            logger.log(localConfig.SYSTEM_LOGGING_TAG + serverConfig.extensionname + ".onDataCenterMessage", "received unhandled ExtensionMessage ", server_packet);
        */

    }
    else if (server_packet.type === "ChannelData")
    {
        let extension_packet = server_packet.data;
        //serverData.userData = {};
        if (serverConfig.enableusersextension == "on")
        {
            if (extension_packet.type === "trigger_ChatJoin" || extension_packet.type === "trigger_ChatMessageReceived")
            {
                let platform = extension_packet.data.parameters.platform
                let username = ""
                if (extension_packet.type === "trigger_ChatJoin")
                    username = extension_packet.data.parameters.username.toLowerCase()
                else
                    username = extension_packet.data.parameters.sender.toLowerCase()
                let timeStamp = Date.now()
                // new platform
                if (serverData.userData[platform] == undefined)
                    serverData.userData[platform] = {};
                // new user
                if (serverData.userData[platform][username] == undefined)
                {
                    getUserIDData(username)
                    serverData.userData[platform][username] = {};
                    serverData.userData[platform][username].firstseen = timeStamp;
                    serverData.userData[platform][username]
                    setTwitchBotStatus(username)
                    sendNewUserTrigger(server_packet.data)
                }
                // check if we need to update the users profile data from twitch
                // ignore users with invalid names (chinese chars etc as twitch fails on that call)
                else if (
                    !serverData.userData[platform][username].userNameInvalid
                    && (serverData.userData[platform][username].lastProfileUpdate == undefined
                        ||
                        (timeStamp - serverData.userData[platform][username].lastProfileUpdate) >
                        (serverConfig.profiletimeout * millisecondsInDay)))
                {
                    getUserIDData(username)
                    setTwitchBotStatus(username)
                }

                serverData.userData[platform][username].lastseen = timeStamp;
                if (extension_packet.type === "trigger_ChatMessageReceived" && extension_packet.data.parameters.message)
                {
                    let message = extension_packet.data.parameters.message
                    if (!serverData.userData[platform][username].messages)
                        serverData.userData[platform][username].messages = [];
                    serverData.userData[platform][username].messages[timeStamp] = message;

                }
                pruneUsers(platform);
                SaveDataToServer();
            }
            //console.log(JSON.stringify(serverData, null, 2))

            else if (extension_packet.type === "trigger_TwitchUserDetails")
            {
                //serverData.userData = {}
                try
                {
                    let platform = "twitch";
                    let username = extension_packet.data.parameters.username.toLowerCase()
                    let timeStamp = Date.now()
                    if (serverData.userData[platform][username] == undefined)
                        serverData.userData[platform][username] = {};

                    for (const [key, value] of Object.entries(extension_packet.data.parameters))
                        serverData.userData[platform][username][key] = value;
                    serverData.userData[platform][username].lastseen = timeStamp;
                    serverData.userData[platform][username].lastProfileUpdate = timeStamp
                    pruneUsers(platform);
                    SaveDataToServer();
                }
                catch (error)
                {
                    logger.log(localConfig.SYSTEM_LOGGING_TAG + serverConfig.extensionname + " packet trigger_TwitchUserDetails", error.message);
                }
            }
        }
        // add this in to see all messages that are not handled
        /*
    else
        logger.warn(localConfig.SYSTEM_LOGGING_TAG + serverConfig.extensionname + ".onDataCenterMessage", "received message from unhandled channel ", server_packet.type, server_packet.channel, extension_packet.type);
    */
    }
    else if (server_packet.type === "UnknownChannel")
    {
        if (ServerConnectionAttempts++ < localConfig.MaxServerConnectionAttempts)
        {
            logger.info(localConfig.SYSTEM_LOGGING_TAG + serverConfig.extensionname + ".onDataCenterMessage", "Channel " + server_packet.data + " doesn't exist, scheduling rejoin");
            setTimeout(() =>
            {
                sr_api.sendMessage(localConfig.DataCenterSocket,
                    sr_api.ServerPacket(
                        "JoinChannel", serverConfig.extensionname, server_packet.data
                    ));
            }, 5000);
        }
    }
    else if (server_packet.type === "InvalidMessage")
    {
        logger.err(localConfig.SYSTEM_LOGGING_TAG + serverConfig.extensionname + ".onDataCenterMessage",
            "InvalidMessage ", server_packet.data.error, server_packet);
    }
    else if (server_packet.type === "LoggingLevel")
    {
        logger.setLoggingLevel(server_packet.data)
    }

    // ------------------------------------------------ unknown message type received -----------------------------------------------
    // add this in to see all messages that are not handled
    /*
else
    logger.warn(localConfig.SYSTEM_LOGGING_TAG + serverConfig.extensionname +
        ".onDataCenterMessage", "Unhandled message type", server_packet.type);
        */
}
// ===========================================================================
//                           FUNCTION: pruneUsers
// ===========================================================================
/**
 * Prune the data to keep data sizes manageable
 * @param {string} platform 
 */
function pruneUsers (platform)
{

    let count = 0
    try
    {
        while (Object.keys(serverData.userData[platform]).length > serverConfig.maxuserstokeep)
        {
            // some surge protection. only delete upto 100 at a time
            if (count++ > 100)
                return;

            // prunes message stored
            let currentOldestMessage = 0;
            let oldestMessageId = "";
            let currentOldest = 0;
            let oldestId = "";
            for (const [user, userData] of Object.entries(serverData.userData[platform]))
            {
                //#################
                //find oldest users
                if (user != "botList")
                {
                    //#########################
                    // check number of messages
                    if (userData.messages)
                    {
                        while (Object.entries(userData.messages).length > serverConfig.maxusersmessagestokeep)
                        {
                            for (const [userMessageId, userMessageData] of Object.entries(userData.messages))
                            {
                                if (currentOldestMessage == 0)
                                {
                                    if (userMessageData)
                                        currentOldest = userMessageData.timeStamp;
                                    oldestMessageId = userMessageId
                                }
                                if (userMessageData.timeStamp < currentOldestMessage)
                                {
                                    if (userMessageData)
                                        currentOldestMessage = userMessageData.timeStamp;
                                    oldestMessageId = userMessageId
                                }
                            }
                            // delete the oldest message found
                            if (serverData.userData[platform][user].messages[oldestMessageId])
                                delete serverData.userData[platform][user].messages[oldestMessageId];
                        }
                    }

                    if (currentOldest == 0)
                    {
                        currentOldest = serverData.userData[platform][user].lastseen;
                        oldestId = user
                    }
                    if (serverData.userData[platform][user].lastseen < currentOldest)
                    {
                        currentOldest = serverData.userData[platform][user].lastseen;
                        oldestId = user
                    }
                }
            }
            // delete the oldest user found
            if (serverData.userData[platform][oldestId])
                delete serverData.userData[platform][oldestId];
        }
    }
    catch (error)
    {
        logger.err(localConfig.SYSTEM_LOGGING_TAG + serverConfig.extensionname +
            ".pruneUsers", platform, error.message);
    }
}
// ===========================================================================
//                           FUNCTION: SendSettingsWidgetSmall
// ===========================================================================
/**
 * send some modal code to be displayed on the admin page 
 * @param {String} tochannel 
 */
function SendSettingsWidgetSmall (tochannel)
{

    fs.readFile(__dirname + "/userssettingswidgetsmall.html", function (err, filedata)
    {
        if (err)
            logger.err(localConfig.SYSTEM_LOGGING_TAG + serverConfig.extensionname +
                ".SendSettingsWidgetSmall", "failed to load modal", err);
        //throw err;
        else
        {
            let modalstring = filedata.toString();
            for (const [key, value] of Object.entries(serverConfig))
            {
                if (value === "on")
                    modalstring = modalstring.replace(key + "checked", "checked");
                else if (typeof (value) == "string")
                    modalstring = modalstring.replace(key + "text", value);
            }
            sr_api.sendMessage(localConfig.DataCenterSocket,
                sr_api.ServerPacket(
                    "ExtensionMessage", // this type of message is just forwarded on to the extension
                    serverConfig.extensionname,
                    sr_api.ExtensionPacket(
                        "SettingsWidgetSmallCode", // message type
                        serverConfig.extensionname, //our name
                        modalstring,// data
                        "",
                        tochannel,
                        serverConfig.channel
                    ),
                    "",
                    tochannel // in this case we only need the "to" channel as we will send only to the requester
                ))
        }
    });
}
// ============================================================================
//                           FUNCTION: sendNewUserTrigger
// ============================================================================
/**
 * sends trigger_NewChatter message 
 * @param {object} data 
 */
function sendNewUserTrigger (data)
{
    sr_api.sendMessage(localConfig.DataCenterSocket,
        sr_api.ServerPacket(
            "ChannelData",
            serverConfig.extensionname,
            sr_api.ExtensionPacket(
                "trigger_NewChatter",
                serverConfig.extensionname,
                data,
                serverConfig.channel
            ),
            serverConfig.channel
        ));
}
// ============================================================================
//                           FUNCTION: SaveConfigToServer
// ============================================================================
/**
 * Sends our config to the server to be saved for next time we run
 */
function SaveConfigToServer ()
{
    sr_api.sendMessage(localConfig.DataCenterSocket, sr_api.ServerPacket(
        "SaveConfig",
        serverConfig.extensionname,
        serverConfig))
}
// ============================================================================
//                           FUNCTION: SaveDataToServer
// ============================================================================
/**
 * Save our data to the server
 */
function SaveDataToServer ()
{
    sr_api.sendMessage(localConfig.DataCenterSocket,
        sr_api.ServerPacket(
            "SaveData",
            serverConfig.extensionname,
            serverData));
}
// ===========================================================================
//                           FUNCTION: getTwitchBotStatusList
// ===========================================================================
/**
 * Update our know twitch bot list
 */
async function getTwitchBotStatusList ()
{
    try
    {
        let url = "https://api.twitchinsights.net/v1/bots/all";
        let queryResult = ""
        https.get(url, response =>
        {
            response.on("data", (chunk) =>
            {
                queryResult += chunk;
            })
            response.on("end", () =>
            {
                try
                {
                    let BotsData = JSON.parse(queryResult);
                    let Bots = []
                    let i;
                    // Bots[0] is the timestamp of this data
                    Bots.push(Date.now())
                    for (i in BotsData.bots)
                        Bots.push(BotsData.bots[i][0])

                    serverData.userData["twitch"].botList = Bots;
                    SaveDataToServer()

                } catch (error)
                {
                    console.error(error.message);
                }
            })
        })
            .on('error', err =>
            {
                logger.err(localConfig.SYSTEM_LOGGING_TAG + serverConfig.extensionname + ".getTwitchBotStatusList", "ERROR", "Failed http.get for bot list", err, err.message)
            });

    }
    catch (err)
    {
        if (err._statusCode == 400)
        {
            logger.err(localConfig.SYSTEM_LOGGING_TAG + serverConfig.extensionname + ".getTwitchBotStatusList", "ERROR", "Failed to get bot list");
            console.error(err._body);
        }
        else
        {
            logger.err(localConfig.SYSTEM_LOGGING_TAG + serverConfig.extensionname + ".getTwitchBotStatusList", "ERROR", "Error");
            console.error(err);
        }
    }
}
// ===========================================================================
//                           FUNCTION: setTwitchBotStatus
// ===========================================================================
/**
 * Update user data based on known bot list
 * @param {string} name 
 */
async function setTwitchBotStatus (name)
{

    if (serverData.userData["twitch"].botList && serverData.userData["twitch"].botList.includes(name))
        serverData.userData["twitch"][name].isBot = true;
    else
        serverData.userData["twitch"][name].isBot = false;
    SaveDataToServer()
}
// ===========================================================================
//                           FUNCTION: getUserIDData
// ===========================================================================
/**
 * sends action_TwitchGetUser to twitch extension to get user data back
 * @param {string} username 
 */
function getUserIDData (username)
{
    // request user data so we can store the twitch id with the username
    sr_api.sendMessage(localConfig.DataCenterSocket,
        sr_api.ServerPacket(
            "ExtensionMessage",
            serverConfig.extensionname,
            sr_api.ExtensionPacket(
                "action_TwitchGetUser",
                serverConfig.extensionname,
                { "username": username },
                "",
                "twitch"
            ),
            "",
            "twitch"
        ));
}
// ============================================================================
//                           FUNCTION: heartBeatCallback
// ============================================================================
/**
 * Sends out heartbeat messages so other extensions can see our status
 */
function heartBeatCallback ()
{
    let colour = 'red'

    //is everything conencted and running
    if (serverConfig.enableusersextension === "on")
        colour = 'green'
    else
        colour = "red"

    sr_api.sendMessage(localConfig.DataCenterSocket,
        sr_api.ServerPacket("ChannelData",
            serverConfig.extensionname,
            sr_api.ExtensionPacket(
                "HeartBeat",
                serverConfig.extensionname,
                {
                    color: colour
                },
                serverConfig.channel),
            serverConfig.channel
        ),
    );
    localConfig.heartBeatHandle = setTimeout(heartBeatCallback, localConfig.heartBeatTimeout)
}
// ============================================================================
//                                  EXPORTS
// Note that initialise is mandatory to allow the server to start this extension
// ============================================================================
export { initialise, triggersandactions };