HEX
Server: Apache
System: Linux vps.rockyroadprinting.net 4.18.0 #1 SMP Mon Sep 30 15:36:27 MSK 2024 x86_64
User: rockyroadprintin (1011)
PHP: 8.2.29
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/simple_rpc/analyst_cleanup.py
import warnings
from datetime import datetime, timedelta, timezone
from logging import getLogger

from defence360agent.rpc_tools import ValidationError
from defence360agent.rpc_tools.lookup import RootEndpoints, bind
import defence360agent.subsys.panels.hosting_panel as hp

from defence360agent.utils.support import send_request
from defence360agent.utils.sshutil import (
    get_ssh_port,
    check_ssh_connection,
    install_pub_key,
)
from defence360agent.model.analyst_cleanup import AnalystCleanupRequest
from defence360agent.api.server.analyst_cleanup import AnalystCleanupAPI

# Module-level logger, named after this module.
logger = getLogger(__name__)

# Help-center article that is appended to the ticket description when the
# support SSH public key could not be installed.
PREPARE_SERVER_GUIDE = (
    "https://cloudlinux.zendesk.com/hc/en-us/articles/"
    "6245743410460-How-to-authenticate-your-server-for-Support-Team"
    "-and-use-the-SSH-access-form"
)
# Registration page offered to the user when their email could not be
# registered in Zendesk.
ZENDESK_REGISTRATION_URL = "https://cloudlinux.zendesk.com/auth/v2/login/registration"


class AnalystCleanupEndpoints(RootEndpoints):
    """RPC endpoints of the ``analyst-cleanup`` command group.

    Bound commands:

    * ``analyst-cleanup request``      -> :meth:`request_cleanup`
    * ``analyst-cleanup get-requests`` -> :meth:`request_status`
    * ``analyst-cleanup is-allowed``   -> :meth:`is_allowed`
    """

    async def _create_zendesk_ticket(
        self,
        email,
        subject,
        full_description,
    ) -> tuple[str, str]:
        """
        Create a Zendesk ticket and return the link and id of the ticket.

        The id is the last path segment of the URL returned by
        ``send_request``.

        Args:
            email: address the ticket is created for.
            subject: ticket subject line.
            full_description: ticket body.

        Returns:
            ``(ticket_url, ticket_id)``.

        Raises:
            ValidationError: on any error, so that it is added to the
                RPC answer instead of surfacing as an unexpected failure.
        """
        try:
            ticket_url = await send_request(
                email,
                subject,
                full_description,
            )
            if not ticket_url:
                raise ValidationError("Failed to create support ticket")
            logger.info("Created ticket on url %s", ticket_url)
            # Extract Zendesk ID from URL; tolerate a trailing slash so the
            # id is never the empty string.
            return ticket_url, ticket_url.rstrip("/").split("/")[-1]
        except ValidationError as e:
            # Already a user-facing error: log it and pass it through
            # unchanged instead of wrapping it in a second message.
            logger.error("Failed to process cleanup request: %s", e)
            raise
        except Exception as e:
            logger.error("Failed to process cleanup request: %s", e)
            raise ValidationError(
                f"Failed to process cleanup request: {str(e)}"
            ) from e

    @bind("analyst-cleanup", "request")
    async def request_cleanup(self, email, username, message):
        """
        Handle analyst cleanup request.

        Steps, in order: reject if ``username`` already has an active
        request, check that cleanup is allowed, check the email
        registration status, install the support public key, probe the SSH
        connection, create the Zendesk ticket and store the request.

        Args:
            email: address used for the Zendesk registration check and as
                the ticket requester.
            username: user the cleanup is requested for.
            message: free-form customer text included in the ticket body.

        Returns:
            ``{"items": {"ticket_url": <link to the created ticket>}}``.

        Raises:
            ValidationError: when an active request already exists, the
                cleanup is not allowed, the email is not registered, or
                the ticket could not be created.
        """
        # Check active tickets
        if active_ticket := AnalystCleanupRequest.get_active_request_link(
            username
        ):
            raise ValidationError(
                "You already have an active request for cleaning this user."
                " If you have additional information, you may follow"
                f" the link and provide new data here: {active_ticket}"
            )
        # Check if cleanup is allowed
        if not (await AnalystCleanupAPI.check_cleanup_allowed()):
            raise ValidationError(
                "You are not authorized to submit Analyst Cleanup requests. "
                "Contact sales@cloudlinux.com to get access"
            )
        # Check if email is registered
        email_status = await AnalystCleanupAPI.check_registered(email)
        if not email_status.get("result", False):
            raise ValidationError(
                f"{email_status.get('message', '')} Couldn't register"
                " your email in our Zendesk system. You can make it manually"
                f" by following the link {ZENDESK_REGISTRATION_URL} and then"
                " try sending the request again."
            )

        # NOTE(review): warnings raised here are presumably collected by the
        # RPC layer and shown to the caller - confirm against the dispatcher.
        if email_status.get("is_new", False):
            warnings.warn(
                "We’ve set up a Zendesk account for you! To complete your"
                " registration, check your email and click the “Reset"
                " Password” button."
            )

        # Install public key
        key_installed = await install_pub_key(username)

        # Get SSH port and check connection
        ssh_port = await get_ssh_port()
        connection_ok = await check_ssh_connection(ssh_port)

        # Prepare ticket subject and description
        subject = "Analyst Cleanup Request"
        server_access = (
            f"{hp.HostingPanel().get_server_ip()}:{ssh_port}/{username}"
        )
        full_description = (
            f"Username: {username}\n"
            f"Server Access: {server_access}\n\n"
            f"Customer Message:\n{message}\n\n"
        )
        # Access problems do not abort the request: the ticket is still
        # created, with a note in its body. Only one note is added; a
        # missing key takes precedence over a failed connection test.
        if not key_installed:
            warnings.warn("Support SSH public key is not installed", Warning)
            full_description += (
                "\n\nWARNING: Not able to install analyst's public key\n"
                " Please make it manually by reffering to"
                f" {PREPARE_SERVER_GUIDE}\n and provide credentials "
                "to zendesk ticket"
            )
        elif not connection_ok:
            warnings.warn("SSH connection test failed", Warning)
            full_description += (
                "\n\nWARNING: SSH connection test failed. Please verify SSH"
                " access and refer to the access request form."
            )

        # Create Zendesk ticket
        # In a case of no url|id
        # ValidationError is raised from _create_zendesk_ticket
        ticket_url, ticket_id = await self._create_zendesk_ticket(
            email,
            subject,
            full_description,
        )
        # Store request in database
        AnalystCleanupRequest.create_request(
            username=username,
            zendesk_id=ticket_id,
            ticket_link=ticket_url,
        )
        return {"items": {"ticket_url": ticket_url}}

    @bind("analyst-cleanup", "get-requests")
    async def request_status(self, username=None, limit=50, offset=0):
        """
        Get status of analyst cleanup requests for all or a specific user.

        Completed tickets will only be visible for 2 weeks after their last
        update.

        Args:
            username: restrict the result to this user; ``None`` means all
                users.
            limit: forwarded to the model query.
            offset: forwarded to the model query.

        Returns:
            A list of dicts with the keys ``username``, ``ticket_url``,
            ``status``, ``created_at``, ``last_update`` and ``zendesk_id``;
            both time fields are POSIX timestamps rendered as strings. An
            empty list when nothing is found.
        """
        # NOTE(review): the model is expected to return the most recent
        # requests first (ordered by created_at desc) - confirm in the model.
        if username is None:
            requests = AnalystCleanupRequest.get_all_requests(limit, offset)
        else:
            requests = AnalystCleanupRequest.get_user_requests(
                username, limit, offset
            )

        # If no requests found, return appropriate response
        if not requests:
            return []

        # Calculate the cutoff date (2 weeks ago). This is a naive UTC value,
        # exactly what the deprecated ``datetime.utcnow()`` produced, so the
        # comparison with ``last_updated`` below is unchanged.
        utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
        two_weeks_ago = utc_now - timedelta(weeks=2)
        logger.info("Showing requests since %s", two_weeks_ago)

        # Filter requests: show all except completed tickets older than
        # 2 weeks. The filter runs after ``limit``/``offset`` were handed to
        # the model, so fewer than ``limit`` items may be returned.
        # NOTE(review): ``datetime.timestamp`` treats naive values as local
        # time - confirm how the model stores created_at/last_updated.
        filtered_requests = [
            {
                "username": req.username,
                "ticket_url": req.ticket_link,
                "status": req.status,
                "created_at": str(datetime.timestamp(req.created_at)),
                "last_update": str(datetime.timestamp(req.last_updated)),
                "zendesk_id": req.zendesk_id,
            }
            for req in requests
            if req.status != "completed" or req.last_updated > two_weeks_ago
        ]
        logger.info("Got requests: %s", filtered_requests)
        # Return the request details
        return filtered_requests

    @bind("analyst-cleanup", "is-allowed")
    async def is_allowed(self):
        """
        Report whether analyst cleanup requests are allowed.

        Returns:
            ``{"items": {"is_allowed": <result of
            AnalystCleanupAPI.check_cleanup_allowed()>}}``.
        """
        is_allowed = await AnalystCleanupAPI.check_cleanup_allowed()
        return {"items": {"is_allowed": is_allowed}}