HEX
Server: Apache
System: Linux vps.rockyroadprinting.net 4.18.0 #1 SMP Mon Sep 30 15:36:27 MSK 2024 x86_64
User: rockyroadprintin (1011)
PHP: 8.2.29
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //opt/imunify360/venv/lib64/python3.11/site-packages/imav/malwarelib/difflib/differ.py
"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.


This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU General Public License for more details.


You should have received a copy of the GNU General Public License
 along with this program.  If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import asyncio
import logging
import os
import shutil
import tempfile
import time
from functools import cached_property
from io import BytesIO
from pathlib import Path

from peewee import DoesNotExist

from defence360agent.utils import safe_fileops
from imav.contracts.config import Malware as Config
from imav.malwarelib.config import MalwareHitStatus, MalwareScanResourceType
from imav.malwarelib.cleanup.cleaner import MalwareCleaner
from imav.malwarelib.cleanup.storage import CleanupStorage
from imav.malwarelib.model import MalwareHit
from imav.utils import get_files_diff

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Account and group that receive ownership of the temporary copy of an
# infected file (shutil.chown in MalwareHitDiff); IMUNIFY_USER is also the
# first argument handed to MalwareCleaner.start().
IMUNIFY_USER = "_imunify"
IMUNIFY_GROUP = "_imunify"


class DiffError(Exception):
    """
    Raised when a diff for a malware hit cannot be produced, e.g. no
    matching hit exists or the file was modified after cleaning.
    """


class SafeFilePath(os.PathLike):
    """
    Path wrapper that knows on behalf of which user the file is opened.

    Attributes that are not defined here are forwarded to the wrapped
    pathlib.Path, so the object can be used like a Path
    (exists(), stat(), open(), ...).
    """

    def __init__(self, path, user=None, missing_ok=False):
        """
        :param path: file location, anything accepted by pathlib.Path
        :param user: user passed to safe_fileops.safe_open_file();
            when falsy the file is opened directly via Path.open()
        :param missing_ok: if True, safe_open() returns an empty
            in-memory stream when the file does not exist
        """
        self._path = Path(path)
        self._user = user
        self._missing_ok = missing_ok

    def __str__(self):
        return str(self._path)

    def __fspath__(self):
        return self.__str__()

    def __getattr__(self, attr):
        # __getattr__ is consulted only after normal lookup has failed.
        # If "_path" itself is missing (instance created without
        # __init__, as copy and pickle do), delegating to self._path
        # would re-enter this method until RecursionError, so fail with
        # a regular AttributeError instead.
        if attr == "_path":
            raise AttributeError(attr)
        return getattr(self._path, attr)

    def check_readability(self) -> bool:
        """
        Return True if the file is readable by the user or
        raise UnsafeFileOperation otherwise
        """
        # opening (and immediately closing) the file is the actual check
        with self.safe_open():
            return True

    def safe_open(self, mode="rb"):
        """
        Open the file and return the resulting file object.

        :param mode: mode forwarded to the underlying open call
        :return: empty BytesIO if the file is missing and missing_ok
            was set; otherwise the result of
            safe_fileops.safe_open_file() (user given) or Path.open()
        """
        if self._missing_ok and not self._path.exists():
            # a missing file is treated as an empty one
            return BytesIO(b"")
        if self._user:
            return safe_fileops.safe_open_file(
                self._path,
                mode=mode,
                user=self._user,
                respect_homedir=False,
            )
        return self._path.open(mode)


class MalwareHitDiff:
    """
    Used to compare infected and cleaned versions of a malicious file.

    Two entry points are provided:

    * get_unified_diff_for_cleaned_file() -- for a hit that is already
      cleaned: stored original vs. the current file;
    * clean_and_get_unified_diff() -- for a hit that is still infected:
      a temporary copy is cleaned and compared with the original.
    """

    def __init__(self, id: int, user: str = None):
        """
        :param id: id of the MalwareHit record to work with
        :param user: optional user; when set it narrows the hit lookup
            and is used to open the user's file
        """
        self._id = id
        self._user = user
        self._cleaner = MalwareCleaner(
            loop=None, sink=None, watch_progress=False
        )

    @cached_property
    def hit(self):
        """
        The malicious file hit with the requested id (looked up once).

        :raise DiffError: no such hit exists (for the given user)
        """
        conditions = [
            MalwareHit.id == self._id,
            MalwareHit.resource_type == MalwareScanResourceType.FILE.value,
            MalwareHit.malicious == True,  # noqa: E712
        ]
        # restrict to the user only when one was given
        if self._user:
            conditions.append(MalwareHit.user == self._user)
        try:
            return MalwareHit.get(*conditions)
        except DoesNotExist as err:
            raise DiffError(
                f"No malware file hit found (id={self._id},"
                f" user={self._user})."
            ) from err

    async def get_unified_diff_for_cleaned_file(self) -> bytes:
        """
        Diff the stored original of a cleaned hit against the file
        currently located at the hit's original path.

        :return: the diff, or b"" if the hit is not in a cleaned status
        """
        if self.hit.status not in MalwareHitStatus.CLEANED:
            logger.warning(
                "Malware hit has unexpected status=%s. Use the empty diff.",
                self.hit.status,
            )
            return b""
        # compare the current cleaned version with the original file
        cleaned_file_path = SafeFilePath(
            self.hit.orig_file_path,
            user=self._user,
            missing_ok=True,
        )
        infected_file_path = SafeFilePath(
            CleanupStorage.get_hit_store_path(self.hit),
            user=None,
        )
        return await self._get_diff(
            infected_file_path,
            cleaned_file_path,
            cleaned_at=self.hit.cleaned_at,
        )

    async def clean_and_get_unified_diff(self) -> bytes:
        """
        Clean a temporary copy of an infected file and diff it against
        the untouched original.

        :return: the diff, or b"" if the hit is not in FOUND status or
            the copy was neither cleaned nor removed
        """
        if self.hit.status != MalwareHitStatus.FOUND:  # not infected
            logger.warning(
                "Malware hit has unexpected status=%s. Use the empty diff.",
                self.hit.status,
            )
            return b""

        diff = b""
        # clean copy of file and compare with the original file
        infected_file_path = SafeFilePath(
            self.hit.orig_file_path, user=self._user
        )
        # do not attempt any of the following actions
        # if the user does not have read permissions
        infected_file_path.check_readability()
        # NOTE(review): if the cleaner removes the temporary copy, the
        # context manager's own delete-on-close may fail with
        # FileNotFoundError on some Python versions -- confirm.
        with tempfile.NamedTemporaryFile(
            mode="w+", dir=Config.TEMP_CLEANUP_DIR
        ) as temp_file:
            cleaned_file_path = SafeFilePath(
                temp_file.name, user=None, missing_ok=True
            )
            # put a copy of the infected file in place of the temp file
            await safe_fileops.safe_move(
                self.hit.orig_file,
                cleaned_file_path,
                src_unlink=False,
                dst_overwrite=True,
                safe_src=False,
                safe_dst=True,
            )
            # so that procu2.php has access to the file
            shutil.chown(
                cleaned_file_path, user=IMUNIFY_USER, group=IMUNIFY_GROUP
            )
            result, error, cmd = await self._cleaner.start(
                IMUNIFY_USER, [str(cleaned_file_path)]
            )
            hit_result = result.get(str(cleaned_file_path))
            if hit_result and (
                hit_result.is_cleaned() or hit_result.is_removed()
            ):
                diff = await self._get_diff(
                    infected_file_path,
                    cleaned_file_path,
                    cleaned_at=time.time(),
                )
            else:
                logger.warning(
                    "File %s was not cleaned to check diff: %s, %s, %s",
                    self.hit.orig_file,
                    result,
                    error,
                    cmd,
                )
        return diff

    async def _get_diff(
        self,
        infected_file_path: SafeFilePath,
        cleaned_file_path: SafeFilePath,
        *,
        cleaned_at: float,
    ):
        """
        Open both files and run get_files_diff() on them in the default
        executor.

        :param infected_file_path: file holding the infected content
        :param cleaned_file_path: file holding the cleaned content
        :param cleaned_at: timestamp of the cleanup; a later ctime of
            the cleaned file invalidates the diff
        :return: the result of get_files_diff()
        :raise FileNotFoundError: the infected file does not exist
        :raise DiffError: the cleaned file changed after cleaned_at
        """
        if not infected_file_path.exists():
            raise FileNotFoundError(
                f"Original file not found for hit(id={self.hit.id})."
            )
        if (
            cleaned_file_path.exists()
            and cleaned_file_path.stat().st_ctime > cleaned_at
        ):
            raise DiffError(
                "The file was modified after cleaning, diff is not valid."
            )
        with infected_file_path.safe_open() as infected_file:
            with cleaned_file_path.safe_open() as cleaned_file:
                # don't block the whole loop while reading files
                loop = asyncio.get_running_loop()
                return await loop.run_in_executor(
                    None, get_files_diff, infected_file, cleaned_file
                )