HEX
Server: Apache
System: Linux host.creative4all.com 4.18.0-553.27.1.el8_10.x86_64 #1 SMP Tue Nov 5 04:50:16 EST 2024 x86_64
User: agrimasfadeltral (1173)
PHP: 8.3.30
Disabled: exec,passthru,shell_exec,system,proc_open,parse_ini_file,show_source
Upload Files
File: //opt/cpmigrate/user.py
"""
Responsible for all things user.
"""

import difflib
import json
import logging
import os
from typing import TYPE_CHECKING
import re
import time
from dataclasses import dataclass
from enum import Enum
import requests
from cpapis import uapi
from journal import save_state

if TYPE_CHECKING:
    from transfer import Transfer


@dataclass
class Domain:
    """A domain attached to a user, together with its verification result."""

    # The domain name.
    name: str
    # Set to True by User.test_sites when the origin and target output match.
    passed: bool = False


class Status(Enum):
    """Status of the User's transfer.

    Negative values are failure states; non-negative values are progress
    states listed in the order the User methods advance through them.
    """

    # Failure states.
    FAILED_PKG = -1
    FAILED_PKG_XFER = -2
    FAILED_RSYNC = -3
    FAILED_PKG_RESTORE = -4
    VERIFICATION_FAILED = -5
    # Progress states.
    NOT_STARTED = 0
    STARTED_PKG = 1
    FINISHED_PKG = 2
    STARTED_PKG_XFER = 3
    FINISHED_PKG_XFER = 4
    STARTED_PKG_RESTORE = 5
    FINISHED_PKG_RESTORE = 6
    STARTED_RSYNC = 7
    FINISHED_RSYNC = 8
    COMPLETED = 9


class User:
    """Primary class for handling all user related tasks."""

    def __init__(self, name: str, transfer: 'Transfer'):
        """Create a not-yet-started user bound to ``transfer``.

        Args:
            name: username of the account being handled.
            transfer: owning Transfer object; its journal is reused here.
        """
        # Identity and collaborators.
        self.name = name
        self.xfer = transfer
        self.journal = transfer.journal

        # Progress tracking.
        self.status: Status = Status.NOT_STARTED
        self.running = False
        self.notes: list[str] = []
        self.domains: list[Domain] = []
        self.ipaddr = None

        # pkgacct session bookkeeping.
        self.pkglogpath: str = '/var/cpanel/pkgacct_sessions'
        self.pkgacct_id: str | None = None
        self.pkgacct_pid = None
        self.log: str | None = None
        self.error_log: str | None = None

        # restorepkg session bookkeeping.
        self.restore_id: str | None = None

    def report(self):
        """Log the final status of this user along with any collected notes.

        A clean COMPLETED run without notes is logged at info level;
        anything else is logged as a warning.
        """
        has_notes = len(self.notes) > 0
        summary = f'{self.status} '
        if has_notes:
            summary += "- Notes:\n{}".format('\n\n'.join(self.notes))

        needs_attention = self.status != Status.COMPLETED or has_notes
        emit = self.warning if needs_attention else self.info
        emit(summary)

    def error(self, msg, note=False):
        """Logs and prints an error for this user"""
        logging.error("(%s) %s", self.name, msg)
        if note:
            self.notes.append(msg)

    def info(self, msg):
        """Logs and prints info for this user"""
        logging.info("[User %s] %s", self.name, msg)

    def warning(self, msg, note=False):
        """Logs and prints a warning for this user"""
        logging.warning("[User %s] %s", self.name, msg)
        if note:
            self.notes.append(msg)

    def is_status(self, status, action=None):
        """Returns whether this user is on the status provided"""
        if self.status != status:
            if action:
                self.warning(f"Skipping {action} as I am not on {status}.")
            return False
        return True

    def legacy_load_domains(self):
        """Legacy handler for loading domains to support older cPanel
        versions.
        """
        self.load_maindomain()
        self.load_addondomains()
        self.load_subdomains()
        self.load_parkdomains()

    def load_maindomain(self):
        """Fetch the user's main domain from the origin and record it.

        Nothing happens when the API call returns no data; an error is
        logged when data came back without a ``main_domain`` value.
        """
        data = self.xfer.origin_cpapi2_call(
            user=self.name, module='DomainLookup', func='getmaindomain'
        )
        if not data:
            return

        main_domain = data[0].get('main_domain')
        if not main_domain:
            self.error("Could not retrieve main domain?")
            return
        self.domains.append(Domain(main_domain))

    def load_addondomains(self):
        """Load the addon domains for this user from the origin server.

        Entries without a domain name, and entries whose name contains a
        wildcard (``*``), are skipped.
        """
        data = self.xfer.origin_cpapi2_call(
            user=self.name, module='AddonDomain', func='listaddondomains'
        )
        for addon in data or []:
            domain = addon.get('domain')
            # A missing 'domain' key yields None, and `'*' not in None`
            # would raise TypeError, so check for a value first.
            if domain and '*' not in domain:
                self.domains.append(Domain(domain))

    def load_parkdomains(self):
        """Load the parked domains for this user from the origin server.

        Entries without a domain name, and entries whose name contains a
        wildcard (``*``), are skipped.
        """
        data = self.xfer.origin_cpapi2_call(
            user=self.name, module='Park', func='listparkeddomains'
        )
        for park in data or []:
            domain = park.get('domain')
            # A missing 'domain' key yields None, and `'*' not in None`
            # would raise TypeError, so check for a value first.
            if domain and '*' not in domain:
                self.domains.append(Domain(domain))

    def load_subdomains(self):
        """Load the subdomains for this user from the origin server.

        Entries without a domain name, and entries whose name contains a
        wildcard (``*``), are skipped.
        """
        data = self.xfer.origin_cpapi2_call(
            user=self.name, module='SubDomain', func='listsubdomains'
        )
        for subdomain in data or []:
            domain = subdomain.get('domain')
            # A missing 'domain' key yields None, and `'*' not in None`
            # would raise TypeError, so check for a value first.
            if domain and '*' not in domain:
                self.domains.append(Domain(domain))

    @save_state
    def start_pkgacct(self):
        """Start a background PKGACCT for this user on the origin server.

        Only runs from NOT_STARTED. On success the session id and log
        paths are stored, the status becomes STARTED_PKG and the method
        waits for the log file to appear. When no session id is returned
        the status becomes FAILED_PKG.
        """
        if not self.is_status(Status.NOT_STARTED, 'pkg_acct'):
            return
        resp = self.xfer.origin_whmapi_call(
            'start_background_pkgacct', {'user': self.name, 'skiphomedir': '1'}
        ) or {}
        self.pkgacct_id = resp.get('session_id')
        if not self.pkgacct_id:
            # Packaging never started, so this is a packaging failure and
            # not a transfer failure (FAILED_PKG_XFER).
            self.status = Status.FAILED_PKG
            self.error("Failed to obtain sessionid for PKGACCT.")
            return

        # Only announce the start once a session id confirms it.
        self.info("Started PKGACCT.")
        self.pkglogpath = os.path.join(self.pkglogpath, self.pkgacct_id)

        log = resp.get('complete_master_log')
        error_log = resp.get('complete_master_error_log')
        self.log = os.path.join(self.pkglogpath, log)
        self.error_log = os.path.join(self.pkglogpath, error_log)

        self.status = Status.STARTED_PKG
        self.info(f"PKGACCT log: {self.log}")
        self.wait_pkglog_exists()

    def wait_pkglog_exists(self):
        """Waits for the pkgacct log to exist, which means it finished."""
        self.xfer.print_progress("Waiting.")
        exists = None
        while not exists:
            self.xfer.print_progress()
            ret_code, _ = self.xfer.origin_command(
                f"/usr/bin/test -f {self.log}", quiet=True
            )
            if ret_code == 0:
                exists = True
            time.sleep(0.2)
        print()

    @save_state
    def check_pkgacct(self):
        """Verify on the origin server that PKGACCT reported completion.

        Only runs from STARTED_PKG. Greps the pkgacct log for the
        'pkgacct completed' marker and moves the status to FINISHED_PKG
        or FAILED_PKG accordingly.
        """
        if not self.is_status(Status.STARTED_PKG):
            return

        self.info("Confirming that PKGACCT did complete.")
        grep_cmd = f"/bin/grep 'pkgacct completed' {self.log}"
        ret_code, _ = self.xfer.origin_command(grep_cmd, sleep=2, quiet=True)

        if ret_code != 0:
            self.error("PKGACCT appears to have failed.")
            self.status = Status.FAILED_PKG
            return
        self.info("PKGACCT completed successfully.")
        self.status = Status.FINISHED_PKG

    @save_state
    def transfer_pkgacct(self):
        """Copy the finished cpmove archive from the origin to this server.

        Only runs from FINISHED_PKG. The status goes to STARTED_PKG_XFER
        and then to FINISHED_PKG_XFER or FAILED_PKG_XFER depending on the
        rsync result.
        """
        if not self.is_status(Status.FINISHED_PKG, 'transfer_pkgacct'):
            return

        self.info("Starting transfer of PKGACCT.")
        self.status = Status.STARTED_PKG_XFER

        remote_archive = (
            f'root@{self.xfer.origin_server}:'
            f'/home/cpmove-{self.name}.tar.gz'
        )
        transferred = self.xfer.do_rsync(
            origin=remote_archive,
            destination='/home/',
            name=f'cpmove-{self.name}-xfer',
            user=self,
        )
        if not transferred:
            self.status = Status.FAILED_PKG_XFER
            self.error("PKGACCT transfer has reported as failed.")
            return
        self.status = Status.FINISHED_PKG_XFER
        self.info("Completed PKGACCT transfer successfully.")

    @save_state
    def restore_pkgacct(self):
        """Check the cpmove archive exists locally, then start restorepkg.

        Only runs from FINISHED_PKG_XFER. On success the transfer session
        id is stored and the status becomes STARTED_PKG_RESTORE. A missing
        archive or a missing session id both end in FAILED_PKG_RESTORE.
        """
        if not self.is_status(Status.FINISHED_PKG_XFER, 'restore_pkg'):
            return

        self.info("Starting PKGACCT restore.")
        pkg_path = f'/home/cpmove-{self.name}.tar.gz'

        if not os.path.exists(pkg_path):
            self.error(f"Failed to find PKGACCT {pkg_path}.")
            self.status = Status.FAILED_PKG_RESTORE
            return

        resp = self.xfer.whmapi_call(
            'start_local_cpmove_restore',
            {'cpmovepath': pkg_path, 'username': self.name},
        )

        self.restore_id = resp.get('transfer_session_id')
        if self.restore_id:
            self.status = Status.STARTED_PKG_RESTORE
        else:
            self.error("Failed to start PKGACCT restore.")
            # Record the failure; otherwise the user would keep reporting
            # FINISHED_PKG_XFER even though the restore never started.
            self.status = Status.FAILED_PKG_RESTORE

    @save_state
    def check_restorepkg(self):
        """Poll the restore session until it leaves its in-progress states.

        Only runs from STARTED_PKG_RESTORE. The state is polled every five
        seconds while it is pending or running. A FAILED/ABORTED state
        marks the restore as failed, COMPLETED hands over to
        ``verify_restorepkg``, and any other state is logged as an error.
        """
        if not self.is_status(Status.STARTED_PKG_RESTORE):
            return

        in_progress = ('RESTORE_PENDING', 'RESTORE_INPROGRESS', 'RUNNING')
        self.xfer.print_progress("Waiting.")

        # Plain loop rather than recursion while the restore is running.
        state = self.get_restorepkg_state()
        while state in in_progress:
            self.xfer.print_progress()
            time.sleep(5)
            state = self.get_restorepkg_state()

        print()
        if state in ('FAILED', 'ABORTED'):
            self.error("Failed to restore PKGACCT.")
            self.status = Status.FAILED_PKG_RESTORE
        elif state == 'COMPLETED':
            self.verify_restorepkg()
        else:
            # NOTE(review): an unrecognised state is still recorded as
            # FINISHED_PKG_RESTORE - confirm this is intended.
            self.status = Status.FINISHED_PKG_RESTORE
            self.error(f"Unknown restore pkgacct state received: {state}")

    def get_restorepkg_state(self):
        """Returns the state for the restorepkg."""
        resp = self.xfer.whmapi_call(
            'get_transfer_session_state',
            {'transfer_session_id': self.restore_id},
        )
        return resp.get('state_name')

    @save_state
    def verify_restorepkg(self):
        """Scan the transfer session's master log for reported failures.

        The session can report COMPLETED even though the restore failed,
        so each JSON log line is checked for ``contents.msg.failure``.
        The first failure found marks the user FAILED_PKG_RESTORE and is
        recorded as a note. Without one, the status becomes
        FINISHED_PKG_RESTORE and the cpmove archives are cleaned up on
        both servers.
        """
        resp = self.xfer.whmapi_call(
            'fetch_transfer_session_log',
            {'transfer_session_id': self.restore_id, 'logfile': 'master.log'},
        )

        logs = resp.get('log') or ''
        for line in logs.split('\n'):
            try:
                entry = json.loads(line)
            except json.decoder.JSONDecodeError:
                continue  # The log mixes in lines that are not JSON.

            # Valid JSON is not necessarily an object (e.g. a bare number
            # or string); calling .get() on those would raise.
            if not isinstance(entry, dict):
                continue
            contents = entry.get('contents')
            if not isinstance(contents, dict):
                continue
            msg = contents.get('msg')
            if not isinstance(msg, dict):
                continue

            failure = msg.get('failure')
            if failure:
                self.error(
                    f"Restoration of PKGACCT has failed. "
                    f"Reason: {failure}",
                    note=True,
                )
                self.status = Status.FAILED_PKG_RESTORE
                return

        self.status = Status.FINISHED_PKG_RESTORE
        self.info("Restoration of PKGACCT has completed successfully.")
        self.cleanup_cpmove()
        self.cleanup_cpmove_origin()

    @save_state
    def cleanup_cpmove(self):
        """Delete this user's local cpmove archive, if it is present.

        Meant to be called once the restore has finished.
        """
        pkg_path = f'/home/cpmove-{self.name}.tar.gz'
        if not os.path.exists(pkg_path):
            return
        os.remove(pkg_path)
        self.info(f"Cleaned up cpmove file: {pkg_path}")

    @save_state
    def cleanup_cpmove_origin(self):
        """Delete this user's cpmove archive on the origin server.

        Meant to be called once the restore has finished. The outcome of
        the remote ``rm`` is logged either way.
        """
        pkg_path = f'/home/cpmove-{self.name}.tar.gz'
        rm_cmd = f'/bin/rm -f -- {pkg_path}'
        ret_code, _ = self.xfer.origin_command(rm_cmd, sleep=1, quiet=True)

        if ret_code != 0:
            self.error(f"Failed to clean up cpmove file on origin: {pkg_path}")
            return
        self.info(f"Cleaned up cpmove file on origin: {pkg_path}")

    @save_state
    def rsync_maildir(self):
        """Rsync the user's mail directory from the origin to this server.

        A failure is logged and recorded as a note; the user's status is
        not changed by this step.
        """
        self.info("Starting mail directory rsync.")

        remote_mail = f'root@{self.xfer.origin_server}:/home/{self.name}/mail/'
        local_mail = f'/home/{self.name}/mail/'
        synced = self.xfer.do_rsync(
            origin=remote_mail,
            destination=local_mail,
            name=f'rsync-mail-{self.name}',
            user=self,
        )

        if not synced:
            self.error("Failed to rsync mail directory.", note=True)
            return
        self.info("Completed mail directory rsync successfully.")

    def should_rsync_homedir(self):
        """Decide whether the home directory rsync should run.

        It should when the transfer asks for a resync, or when this user
        is on FINISHED_PKG_RESTORE or STARTED_RSYNC.
        """
        forced = self.xfer.check_should_resync()
        if forced:
            return forced
        if self.is_status(Status.FINISHED_PKG_RESTORE):
            return True
        return self.is_status(Status.STARTED_RSYNC)

    @save_state
    def rsync_homedir(self):
        """Rsync the user's home directory from the origin to this server.

        Skipped (with a warning) unless ``should_rsync_homedir`` allows
        it. The status goes to STARTED_RSYNC and then to FINISHED_RSYNC
        or FAILED_RSYNC depending on the rsync result.
        """
        if not self.should_rsync_homedir():
            self.warning(
                "Skipping rsync_homedir as I am not on "
                f"{Status.FINISHED_PKG_RESTORE} or {Status.STARTED_RSYNC}."
            )
            return

        self.info("Starting homedir rsync.")
        self.status = Status.STARTED_RSYNC
        rsync_success = self.xfer.do_rsync(
            origin=f'root@{self.xfer.origin_server}:/home/{self.name}/',
            destination=f'/home/{self.name}/',
            name=f'rsync-{self.name}',
            user=self,
        )

        if rsync_success:
            self.status = Status.FINISHED_RSYNC
            self.info("Completed homedir rsync successfully.")
        else:
            self.status = Status.FAILED_RSYNC
            # Log the failure like the other transfer steps do instead of
            # changing the status silently.
            self.error("Failed to rsync home directory.")

    @save_state
    def test_sites(self):
        """Compare every domain's output on the origin and on this server.

        Only runs from FINISHED_RSYNC. Each domain is requested from this
        server (``self.xfer.my_ipaddr``) and from ``self.ipaddr``:

        * differing status codes fail verification;
        * status 0 on both sides (neither request succeeded) fails
          verification, because nothing was actually compared;
        * identical bodies mark the domain as passed;
        * differing bodies are written to a diff log for manual review
          but do not fail verification.

        The status ends as VERIFICATION_FAILED when any domain failed (or
        when there are no domains), otherwise as COMPLETED.
        """
        if not self.is_status(Status.FINISHED_RSYNC, 'test_sites'):
            return
        self.info("Testing all domains match the origin and target.")
        if not self.domains:
            self.error(
                "No domains are loaded for this user; nothing to verify.",
                note=True,
            )
            self.status = Status.VERIFICATION_FAILED
            return
        for domain in self.domains:
            lstatus, ltext = self.test_website(
                domain=domain.name, ipaddr=self.xfer.my_ipaddr
            )
            rstatus, rtext = self.test_website(domain.name, self.ipaddr)
            if lstatus != rstatus:
                self.error(
                    f"Status code for {domain.name} returned {lstatus} "
                    f"instead of {rstatus}. Please check manually.",
                    note=True,
                )
                self.status = Status.VERIFICATION_FAILED
                continue
            if lstatus == 0:
                # test_website returns status 0 when the request failed.
                # Two failures compare equal, but that is not a pass.
                # test_website already recorded a note for each side.
                self.error(
                    f"Could not reach {domain.name} on either server; "
                    "unable to verify."
                )
                self.status = Status.VERIFICATION_FAILED
                continue
            if rtext == ltext:
                self.info(f"{domain.name} passed verification.")
                domain.passed = True
                continue
            diff = difflib.unified_diff(
                rtext.splitlines(),
                ltext.splitlines(),
                fromfile="Origin server",
                tofile="Target server (this server)",
            )
            both_index = self.check_directory_index(rtext, ltext)
            self.log_site_diff(domain.name, diff, index=both_index)
        if self.status is not Status.VERIFICATION_FAILED:
            self.status = Status.COMPLETED

    def check_directory_index(self, site_output1, site_output2):
        """Checks if both site outputs are directory index."""
        if 'Index of /' in site_output1:
            if 'Index of /' in site_output2:
                return True

        return False

    def log_site_diff(self, domain, diff, index=False):
        """Logs the site output differences, if any."""
        log = os.path.join(self.xfer.log_path, f'{domain}-diff.log')
        if index:
            self.warning(
                f"Directory index detected on both servers for {domain}:"
                f"\n{log}"
            )
        else:
            self.warning(
                f"Difference in site output found for {domain}:\n{log}",
                note=True,
            )
        with open(log, 'w', encoding="utf-8") as out:
            out.write(f"Difference in site output found for {domain}: \n")
            for line in diff:
                out.write(f"\n{line}")

    def test_website(self, domain, ipaddr):
        """Tests a website using the provided IP address.
            Returns status code and content of site.
            Redirects will be followed if within same domain.

        Args:
            domain (str): domain to be tested
            ipaddr (str): IP address of host to connect to

        Returns:
            [int, str]: status code, content of site
        """
        test_domain = domain
        resp = self.make_request(test_domain, ipaddr)
        redirect_count = 0
        try:
            while 'Location' in resp.headers and redirect_count < 4:
                test_domain = resp.headers['Location']
                redirect_count += 1
                domain_check = re.match(
                    fr"https?:\/\/(?:www\.)?{domain}", test_domain
                )
                if domain_check:
                    test_domain = resp.url
                    resp = self.make_request(test_domain, ipaddr)
                else:
                    self.warning(
                        f"The domain, {domain}, has redirected outside of "
                        f"context to {test_domain} on {ipaddr}. Cannot "
                        "properly validate this domain."
                    )
                    return [200, '']
            return [resp.status_code, resp.text]
        except AttributeError:
            self.error(
                f"Validation test for {domain} on {ipaddr} failed.", note=True
            )
            return [0, '']

    def make_request(self, domain, ipaddr):
        """Send a plain HTTP GET to ``ipaddr`` with ``domain`` as the Host.

        Redirects are not followed and the request times out after 20
        seconds.

        Args:
            domain (str): value sent in the Host header
            ipaddr (str): IP address of the host to connect to

        Returns:
            requests.Response: the response, or None when the request
            raised a ``RequestException`` (a warning is logged).
        """
        request_headers = {'Host': domain, 'Cache-Control': 'no-cache'}
        try:
            return requests.get(
                f"http://{ipaddr}/",
                verify=False,
                headers=request_headers,
                allow_redirects=False,
                timeout=20,
            )
        except requests.exceptions.RequestException:
            self.warning(f"Failed to connect to {domain} on {ipaddr}.")
        return None

    def uapi_call(self, command, args=None):
        """Makes a UAPI call for this user.

        Args:
            command: UAPI command passed through to ``uapi``.
            args: optional dict of arguments; defaults to an empty dict.

        Returns:
            tuple: ``(True, data)`` on success, ``(False, errors)`` with
            the errors joined into one string when the call reports a
            failure, or ``(False, None)`` when the response carries no
            result at all (an error is logged).
        """
        if not args:
            args = {}
        data = uapi(command, self.name, args)
        result = data.get('result')
        if result:
            success = bool(result.get('status'))
            if success:
                return True, result.get('data')
            # 'errors' may be absent or null; joining None would raise.
            errors = result.get('errors') or []
            return False, ', '.join(str(err) for err in errors)
        # 'result' is always empty on this path, so log the whole response.
        self.error(
            f"UAPI call failed. Command: {command}\n"
            f"Args: {args}\nResponse: {data}"
        )
        return False, None

    def capture_state(self):
        """Captures the state of the of the User class for journal."""
        return {
            'name': self.name,
            'ipaddr': self.ipaddr,
            'pkgacct_id': self.pkgacct_id,
            'pkgacct_pid': self.pkgacct_pid,
            'restore_id': self.restore_id,
            'status': self.status.name,
            'notes': self.notes,
            'domains': [domain.__dict__ for domain in self.domains],
        }

    def load_state(self, loadstate):
        """Loads everything needed from the journal for the User class.

        Args:
            loadstate: dict in the shape produced by ``capture_state``.

        Raises:
            KeyError: when ``loadstate['status']`` is not a Status name.
        """
        self.ipaddr = loadstate.get('ipaddr')
        self.pkgacct_id = loadstate.get('pkgacct_id')
        self.pkgacct_pid = loadstate.get('pkgacct_pid')
        self.restore_id = loadstate.get('restore_id')
        self.status = Status[loadstate.get('status')]
        # Keep notes a list even when the journal entry has none, so that
        # later appends keep working.
        self.notes = loadstate.get('notes') or []
        # A missing/null 'domains' entry simply means no domains to add.
        for domain in loadstate.get('domains') or []:
            self.domains.append(Domain(**domain))
        # NOTE(review): the pkgacct log paths are not part of the captured
        # state, so self.log stays unset after a load - confirm whether
        # check_pkgacct can run after resuming.

    def __repr__(self):
        return f"<User {self.name} {self.status.name}>"