# File: //opt/cpmigrate/user.py
"""
Responsible for all things user.
"""
import difflib
import json
import logging
import os
from typing import TYPE_CHECKING
import re
import time
from dataclasses import dataclass
from enum import Enum
import requests
from cpapis import uapi
from journal import save_state
if TYPE_CHECKING:
from transfer import Transfer
@dataclass
class Domain:
    """A domain name paired with its validation result."""

    # The domain name itself.
    name: str
    # True once the domain has passed validation; starts out False.
    passed: bool = False
class Status(Enum):
    """Where a user's transfer currently stands.

    Failure states carry negative values, ``NOT_STARTED`` is zero and the
    started/finished stage markers carry increasing positive values.
    """

    # Failure states
    FAILED_PKG = -1
    FAILED_PKG_XFER = -2
    FAILED_RSYNC = -3
    FAILED_PKG_RESTORE = -4
    VERIFICATION_FAILED = -5

    # Nothing has run yet
    NOT_STARTED = 0

    # Stage markers
    STARTED_PKG = 1
    FINISHED_PKG = 2
    STARTED_PKG_XFER = 3
    FINISHED_PKG_XFER = 4
    STARTED_PKG_RESTORE = 5
    FINISHED_PKG_RESTORE = 6
    STARTED_RSYNC = 7
    FINISHED_RSYNC = 8
    COMPLETED = 9
class User:
    """Primary class for handling all user related tasks.

    Each instance tracks one account: its transfer ``status``, the notes
    collected for the final report and the domains found on the account.
    """
def __init__(self, name: str, transfer: 'Transfer'):
    """Create a user in the ``NOT_STARTED`` state.

    Args:
        name: name of the user this object handles.
        transfer: the owning transfer object; its ``journal`` is kept on
            the user as well.
    """
    # Identity and the transfer this user belongs to.
    self.name = name
    self.xfer = transfer
    self.journal = transfer.journal

    # PKGACCT bookkeeping.
    self.pkglogpath: str = '/var/cpanel/pkgacct_sessions'
    self.pkgacct_id: str | None = None
    self.pkgacct_pid = None
    self.log: str | None = None
    self.error_log: str | None = None

    # Restore bookkeeping.
    self.restore_id: str | None = None

    # Progress tracking and reporting.
    self.ipaddr = None
    self.running = False
    self.status: Status = Status.NOT_STARTED
    self.notes: list[str] = []
    self.domains: list[Domain] = []
def report(self):
    """Log the user's status followed by any notes that were collected.

    The message is logged as a warning unless the user is ``COMPLETED``
    and has no notes, in which case it is logged as info.
    """
    summary = f'{self.status} '
    has_notes = len(self.notes) > 0
    if has_notes:
        summary += "- Notes:\n{}".format('\n\n'.join(self.notes))
    if self.status != Status.COMPLETED or has_notes:
        self.warning(summary)
        return
    self.info(summary)
def error(self, msg, note=False):
    """Emit an error-level log record for this user.

    When ``note`` is truthy the message is also appended to ``self.notes``.
    """
    logging.error("(%s) %s", self.name, msg)
    if not note:
        return
    self.notes.append(msg)
def info(self, msg):
    """Emit an info-level log record tagged with this user's name."""
    user = self.name
    logging.info("[User %s] %s", user, msg)
def warning(self, msg, note=False):
    """Emit a warning-level log record tagged with this user's name.

    When ``note`` is truthy the message is also appended to ``self.notes``.
    """
    logging.warning("[User %s] %s", self.name, msg)
    if not note:
        return
    self.notes.append(msg)
def is_status(self, status, action=None):
    """Tell whether the user's current status equals ``status``.

    On a mismatch, a warning naming the skipped ``action`` is logged if an
    action was given.
    """
    if self.status == status:
        return True
    if action:
        self.warning(f"Skipping {action} as I am not on {status}.")
    return False
def legacy_load_domains(self):
    """Load every kind of domain through the per-type loaders.

    Legacy path kept to support older cPanel versions. The loaders run in
    the order: main, addon, sub, parked.
    """
    loader_names = (
        'load_maindomain',
        'load_addondomains',
        'load_subdomains',
        'load_parkdomains',
    )
    for loader_name in loader_names:
        getattr(self, loader_name)()
def load_maindomain(self):
    """Load the primary domain for this user.

    Calls cPanel API 2 ``DomainLookup::getmaindomain`` through the
    transfer and appends the result to ``self.domains``. An error is
    logged both when the call returns no data and when the first entry
    carries no ``main_domain``, so neither failure goes unnoticed.
    """
    data = self.xfer.origin_cpapi2_call(
        user=self.name, module='DomainLookup', func='getmaindomain'
    )
    main_domain = data[0].get('main_domain') if data else None
    if not main_domain:
        self.error("Could not retrieve main domain?")
        return
    self.domains.append(Domain(main_domain))
def load_addondomains(self):
    """Load the addon domains for this user.

    Calls cPanel API 2 ``AddonDomain::listaddondomains`` through the
    transfer and appends each usable entry to ``self.domains``.
    """
    data = self.xfer.origin_cpapi2_call(
        user=self.name, module='AddonDomain', func='listaddondomains'
    )
    for addon in data or []:
        domain = addon.get('domain')
        # Skip entries without a domain name (the wildcard check would
        # raise on None) as well as wildcard entries.
        if domain and '*' not in domain:
            self.domains.append(Domain(domain))
def load_parkdomains(self):
    """Load the parked domains for this user.

    Calls cPanel API 2 ``Park::listparkeddomains`` through the transfer
    and appends each usable entry to ``self.domains``.
    """
    data = self.xfer.origin_cpapi2_call(
        user=self.name, module='Park', func='listparkeddomains'
    )
    for park in data or []:
        domain = park.get('domain')
        # Skip entries without a domain name (the wildcard check would
        # raise on None) as well as wildcard entries.
        if domain and '*' not in domain:
            self.domains.append(Domain(domain))
def load_subdomains(self):
    """Load the subdomains for this user.

    Calls cPanel API 2 ``SubDomain::listsubdomains`` through the transfer
    and appends each usable entry to ``self.domains``.
    """
    data = self.xfer.origin_cpapi2_call(
        user=self.name, module='SubDomain', func='listsubdomains'
    )
    for subdomain in data or []:
        domain = subdomain.get('domain')
        # Skip entries without a domain name (the wildcard check would
        # raise on None) as well as wildcard entries.
        if domain and '*' not in domain:
            self.domains.append(Domain(domain))
@save_state
def start_pkgacct(self):
    """Start the PKGACCT on the origin server.

    Only runs while the user is ``NOT_STARTED``. The WHM API call
    ``start_background_pkgacct`` is issued with ``skiphomedir`` set to
    ``'1'``. When a session id comes back, the session log paths are
    recorded, the status becomes ``STARTED_PKG`` and the call blocks until
    the master log exists on the origin. Without a session id the status
    becomes ``FAILED_PKG``.
    """
    if not self.is_status(Status.NOT_STARTED, 'pkg_acct'):
        return
    # Treat an empty/None response like one without a session id.
    resp = self.xfer.origin_whmapi_call(
        'start_background_pkgacct', {'user': self.name, 'skiphomedir': '1'}
    ) or {}
    self.pkgacct_id = resp.get('session_id')
    if not self.pkgacct_id:
        # Packaging never started, so this is a packaging failure, not a
        # failure of the package transfer.
        self.status = Status.FAILED_PKG
        self.error("Failed to obtain sessionid for PKGACCT.")
        return
    self.info("Started PKGACCT.")
    self.pkglogpath = os.path.join(self.pkglogpath, self.pkgacct_id)
    log = resp.get('complete_master_log')
    error_log = resp.get('complete_master_error_log')
    self.log = os.path.join(self.pkglogpath, log)
    self.error_log = os.path.join(self.pkglogpath, error_log)
    self.status = Status.STARTED_PKG
    self.info(f"PKGACCT log: {self.log}")
    self.wait_pkglog_exists()
def wait_pkglog_exists(self, interval=0.2, timeout=None):
    """Poll the origin server until the PKGACCT log file exists.

    Args:
        interval: seconds to sleep between checks.
        timeout: maximum number of seconds to wait. ``None`` (the
            default) waits indefinitely, which is the previous behaviour.

    Returns:
        bool: True once the file exists, False if ``timeout`` elapsed
        first.
    """
    self.xfer.print_progress("Waiting.")
    deadline = None if timeout is None else time.monotonic() + timeout
    found = False
    while True:
        self.xfer.print_progress()
        ret_code, _ = self.xfer.origin_command(
            f"/usr/bin/test -f {self.log}", quiet=True
        )
        if ret_code == 0:
            # No need to sleep once the file is there.
            found = True
            break
        if deadline is not None and time.monotonic() >= deadline:
            break
        time.sleep(interval)
    print()  # end the progress line
    return found
@save_state
def check_pkgacct(self):
    """Verify on the origin server that the PKGACCT run completed.

    Only runs while the user is ``STARTED_PKG``. The PKGACCT log is
    searched for ``pkgacct completed``; the status becomes
    ``FINISHED_PKG`` when it is found and ``FAILED_PKG`` otherwise.
    """
    if not self.is_status(Status.STARTED_PKG):
        return
    self.info("Confirming that PKGACCT did complete.")
    grep_cmd = f"/bin/grep 'pkgacct completed' {self.log}"
    ret_code, _ = self.xfer.origin_command(grep_cmd, sleep=2, quiet=True)
    if ret_code != 0:
        self.error("PKGACCT appears to have failed.")
        self.status = Status.FAILED_PKG
        return
    self.info("PKGACCT completed successfully.")
    self.status = Status.FINISHED_PKG
@save_state
def transfer_pkgacct(self):
    """Copy the PKGACCT archive from the origin server to this one.

    Only runs while the user is ``FINISHED_PKG``. The archive is rsynced
    from the origin's ``/home`` into the local ``/home/``; the status ends
    up ``FINISHED_PKG_XFER`` or ``FAILED_PKG_XFER`` depending on the
    rsync result.
    """
    if not self.is_status(Status.FINISHED_PKG, 'transfer_pkgacct'):
        return
    self.info("Starting transfer of PKGACCT.")
    self.status = Status.STARTED_PKG_XFER
    remote_archive = (
        f'root@{self.xfer.origin_server}:'
        f'/home/cpmove-{self.name}.tar.gz'
    )
    rsync_success = self.xfer.do_rsync(
        origin=remote_archive,
        destination='/home/',
        name=f'cpmove-{self.name}-xfer',
        user=self,
    )
    if not rsync_success:
        self.status = Status.FAILED_PKG_XFER
        self.error("PKGACCT transfer has reported as failed.")
        return
    self.status = Status.FINISHED_PKG_XFER
    self.info("Completed PKGACCT transfer successfully.")
@save_state
def restore_pkgacct(self):
    """Ensure the package exists locally, then start the restorepkg.

    Only runs while the user is ``FINISHED_PKG_XFER``. The WHM API call
    ``start_local_cpmove_restore`` is issued for the local cpmove archive.
    When a transfer session id comes back the status becomes
    ``STARTED_PKG_RESTORE``; a missing archive or a missing session id
    both end in ``FAILED_PKG_RESTORE``.
    """
    if not self.is_status(Status.FINISHED_PKG_XFER, 'restore_pkg'):
        return
    self.info("Starting PKGACCT restore.")
    pkg_path = f'/home/cpmove-{self.name}.tar.gz'
    if not os.path.exists(pkg_path):
        self.error(f"Failed to find PKGACCT {pkg_path}.")
        self.status = Status.FAILED_PKG_RESTORE
        return
    # Treat an empty/None response like one without a session id.
    resp = self.xfer.whmapi_call(
        'start_local_cpmove_restore',
        {'cpmovepath': pkg_path, 'username': self.name},
    ) or {}
    self.restore_id = resp.get('transfer_session_id')
    if self.restore_id:
        self.status = Status.STARTED_PKG_RESTORE
        return
    self.error("Failed to start PKGACCT restore.")
    # Record the failure instead of leaving the user on FINISHED_PKG_XFER,
    # matching the missing-archive branch above.
    self.status = Status.FAILED_PKG_RESTORE
@save_state
def check_restorepkg(self):
    """Poll the restorepkg session until it leaves its running states.

    Only runs while the user is ``STARTED_PKG_RESTORE``. The session
    state is fetched every five seconds. A failed or aborted session sets
    ``FAILED_PKG_RESTORE``, a completed one is handed to
    ``verify_restorepkg`` and any unrecognised state is logged as an error
    with the status set to ``FINISHED_PKG_RESTORE``.
    """
    if not self.is_status(Status.STARTED_PKG_RESTORE):
        return
    still_running = ('RESTORE_PENDING', 'RESTORE_INPROGRESS', 'RUNNING')
    gave_up = ('FAILED', 'ABORTED')
    self.xfer.print_progress("Waiting.")
    while True:
        state = self.get_restorepkg_state()
        if state in still_running:
            self.xfer.print_progress()
            time.sleep(5)  # poll in a loop rather than recursing
            continue
        # Every other state ends the wait; finish the progress line first.
        print()
        if state in gave_up:
            self.error("Failed to restore PKGACCT.")
            self.status = Status.FAILED_PKG_RESTORE
        elif state == 'COMPLETED':
            self.verify_restorepkg()
        else:
            self.status = Status.FINISHED_PKG_RESTORE
            self.error(f"Unknown restore pkgacct state received: {state}")
        return
def get_restorepkg_state(self):
    """Fetch the ``state_name`` of this user's restore transfer session."""
    params = {'transfer_session_id': self.restore_id}
    session = self.xfer.whmapi_call('get_transfer_session_state', params)
    return session.get('state_name')
@save_state
def verify_restorepkg(self):
    """Check the transfer session log for failures.

    The session can report COMPLETED even though the restore failed, so
    ``master.log`` is fetched and scanned line by line for a JSON entry
    whose ``contents.msg`` holds a ``failure``. The first such entry sets
    ``FAILED_PKG_RESTORE`` and is recorded as a note. Otherwise the status
    becomes ``FINISHED_PKG_RESTORE`` and the cpmove archive is removed on
    both servers.
    """
    resp = self.xfer.whmapi_call(
        'fetch_transfer_session_log',
        {'transfer_session_id': self.restore_id, 'logfile': 'master.log'},
    )
    logs = resp.get('log')
    for line in (logs or '').split('\n'):
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # not every log line is JSON
        # A line can be valid JSON without being an object (a bare number
        # or string), so check each level before calling .get() on it.
        contents = entry.get('contents') if isinstance(entry, dict) else None
        msg = contents.get('msg') if isinstance(contents, dict) else None
        failure = msg.get('failure') if isinstance(msg, dict) else None
        if failure:
            self.error(
                f"Restoration of PKGACCT has failed. "
                f"Reason: {failure}",
                note=True,
            )
            self.status = Status.FAILED_PKG_RESTORE
            return
    self.status = Status.FINISHED_PKG_RESTORE
    self.info("Restoration of PKGACCT has completed successfully.")
    self.cleanup_cpmove()
    self.cleanup_cpmove_origin()
@save_state
def cleanup_cpmove(self):
    """Remove the local cpmove archive. To be called after restoration.

    A missing file is not an error. Any other removal failure is logged
    rather than raised, so a cleanup problem does not abort the run after
    the restore has already succeeded.
    """
    pkg_path = f'/home/cpmove-{self.name}.tar.gz'
    try:
        # Remove directly instead of checking for existence first, which
        # would leave a window between the check and the removal.
        os.remove(pkg_path)
    except FileNotFoundError:
        return
    except OSError as exc:
        self.error(f"Failed to clean up cpmove file: {pkg_path} ({exc})")
        return
    self.info(f"Cleaned up cpmove file: {pkg_path}")
@save_state
def cleanup_cpmove_origin(self):
    """Remove the cpmove archive from the origin server.

    Runs ``rm -f`` on the origin and logs whether the command returned
    zero.
    """
    pkg_path = f'/home/cpmove-{self.name}.tar.gz'
    rm_cmd = f'/bin/rm -f -- {pkg_path}'
    ret_code, _ = self.xfer.origin_command(rm_cmd, sleep=1, quiet=True)
    if ret_code != 0:
        self.error(f"Failed to clean up cpmove file on origin: {pkg_path}")
        return
    self.info(f"Cleaned up cpmove file on origin: {pkg_path}")
@save_state
def rsync_maildir(self):
    """Rsync the user's mail directory from the origin to this server.

    A failed rsync is logged as an error and recorded as a note; the
    user's status is not touched here.
    """
    self.info("Starting mail directory rsync.")
    remote_mail = f'root@{self.xfer.origin_server}:/home/{self.name}/mail/'
    local_mail = f'/home/{self.name}/mail/'
    rsync_success = self.xfer.do_rsync(
        origin=remote_mail,
        destination=local_mail,
        name=f'rsync-mail-{self.name}',
        user=self,
    )
    if not rsync_success:
        self.error("Failed to rsync mail directory.", note=True)
        return
    self.info("Completed mail directory rsync successfully.")
def should_rsync_homedir(self):
    """Decide whether the home directory should be rsynced.

    Truthy when the transfer asks for a resync, or when the user is on
    ``FINISHED_PKG_RESTORE`` or ``STARTED_RSYNC``.
    """
    resync = self.xfer.check_should_resync()
    if resync:
        return resync
    if self.is_status(Status.FINISHED_PKG_RESTORE):
        return True
    return self.is_status(Status.STARTED_RSYNC)
@save_state
def rsync_homedir(self):
    """Rsync the user's home directory from the origin to this server.

    Skipped with a warning unless ``should_rsync_homedir`` allows it. The
    status moves to ``STARTED_RSYNC`` and ends up ``FINISHED_RSYNC`` or
    ``FAILED_RSYNC`` depending on the rsync result.
    """
    if not self.should_rsync_homedir():
        self.warning(
            "Skipping rsync_homedir as I am not on "
            f"{Status.FINISHED_PKG_RESTORE} or {Status.STARTED_RSYNC}."
        )
        return
    self.info("Starting homedir rsync.")
    self.status = Status.STARTED_RSYNC
    remote_home = f'root@{self.xfer.origin_server}:/home/{self.name}/'
    local_home = f'/home/{self.name}/'
    rsync_success = self.xfer.do_rsync(
        origin=remote_home,
        destination=local_home,
        name=f'rsync-{self.name}',
        user=self,
    )
    if not rsync_success:
        self.status = Status.FAILED_RSYNC
        return
    self.status = Status.FINISHED_RSYNC
    self.info("Completed homedir rsync successfully.")
@save_state
def test_sites(self):
    """Test all domains on the account to verify the transfer.

    Only runs while the user is ``FINISHED_RSYNC``. Every domain is
    requested from this server (``self.xfer.my_ipaddr``) and from
    ``self.ipaddr``, and the status codes and bodies are compared:

    * differing status codes, or a request that failed on both sides,
      mark the user ``VERIFICATION_FAILED``;
    * identical bodies mark the domain as passed;
    * differing bodies are written to a diff log for manual review.

    The user becomes ``COMPLETED`` unless verification failed. Having no
    domains at all also counts as a verification failure.
    """
    if not self.is_status(Status.FINISHED_RSYNC, 'test_sites'):
        return
    self.info("Testing all domains match the origin and target.")
    if not self.domains:
        self.error("No domains found to verify.", note=True)
        self.status = Status.VERIFICATION_FAILED
        return
    for domain in self.domains:
        lstatus, ltext = self.test_website(
            domain=domain.name, ipaddr=self.xfer.my_ipaddr
        )
        rstatus, rtext = self.test_website(domain.name, self.ipaddr)
        if lstatus != rstatus:
            self.error(
                f"Status code for {domain.name} returned {lstatus} "
                f"instead of {rstatus}. Please check manually.",
                note=True,
            )
            self.status = Status.VERIFICATION_FAILED
            continue
        if lstatus == 0:
            # test_website returns status 0 when the request itself
            # failed (and has already noted the error). Two failed
            # requests compare equal but prove nothing.
            self.status = Status.VERIFICATION_FAILED
            continue
        if rtext == ltext:
            self.info(f"{domain.name} passed verification.")
            domain.passed = True
            continue
        diff = difflib.unified_diff(
            rtext.splitlines(),
            ltext.splitlines(),
            fromfile="Origin server",
            tofile="Target server (this server)",
        )
        if self.check_directory_index(rtext, ltext):
            self.log_site_diff(domain.name, diff, index=True)
        else:
            self.log_site_diff(domain.name, diff)
    if self.status is not Status.VERIFICATION_FAILED:
        self.status = Status.COMPLETED
def check_directory_index(self, site_output1, site_output2):
    """Tell whether both site outputs contain the ``Index of /`` marker."""
    marker = 'Index of /'
    return marker in site_output1 and marker in site_output2
def log_site_diff(self, domain, diff, index=False):
    """Warn about a site difference and write the diff to a log file.

    The log is written to ``<log_path>/<domain>-diff.log``. With ``index``
    set the warning mentions a directory index and is not kept as a note;
    otherwise the warning is also recorded as a note.
    """
    log = os.path.join(self.xfer.log_path, f'{domain}-diff.log')
    if not index:
        self.warning(
            f"Difference in site output found for {domain}:\n{log}",
            note=True,
        )
    else:
        self.warning(
            f"Directory index detected on both servers for {domain}:"
            f"\n{log}"
        )
    with open(log, 'w', encoding="utf-8") as out:
        out.write(f"Difference in site output found for {domain}: \n")
        out.writelines(f"\n{line}" for line in diff)
def test_website(self, domain, ipaddr):
"""Tests a website using the provided IP address.
Returns status code and content of site.
Redirects will be followed if within same domain.
Args:
domain (str): domain to be tested
ipaddr (str): IP address of host to connect to
Returns:
[int, str]: status code, content of site
"""
test_domain = domain
resp = self.make_request(test_domain, ipaddr)
redirect_count = 0
try:
while 'Location' in resp.headers and redirect_count < 4:
test_domain = resp.headers['Location']
redirect_count += 1
domain_check = re.match(
fr"https?:\/\/(?:www\.)?{domain}", test_domain
)
if domain_check:
test_domain = resp.url
resp = self.make_request(test_domain, ipaddr)
else:
self.warning(
f"The domain, {domain}, has redirected outside of "
f"context to {test_domain} on {ipaddr}. Cannot "
"properly validate this domain."
)
return [200, '']
return [resp.status_code, resp.text]
except AttributeError:
self.error(
f"Validation test for {domain} on {ipaddr} failed.", note=True
)
return [0, '']
def make_request(self, domain, ipaddr):
    """Request ``http://<ipaddr>/`` with the Host header set to ``domain``.

    Redirects are not followed and the request times out after 20
    seconds.

    Args:
        domain (str): domain to check
        ipaddr (str): IP address of host

    Returns:
        requests.Response: the response, or None when the request raised
        a ``requests`` exception (a warning is logged in that case).
    """
    url = f"http://{ipaddr}/"
    headers = {'Host': domain, 'Cache-Control': 'no-cache'}
    try:
        return requests.get(
            url,
            verify=False,
            headers=headers,
            allow_redirects=False,
            timeout=20,
        )
    except requests.exceptions.RequestException:
        self.warning(f"Failed to connect to {domain} on {ipaddr}.")
    return None
def uapi_call(self, command, args=None):
    """Make a UAPI call for this user.

    Args:
        command: UAPI command to run.
        args: optional arguments for the call; defaults to an empty dict.

    Returns:
        tuple: ``(True, data)`` when the result reports success,
        ``(False, error_text)`` when it reports failure, and
        ``(False, None)`` when the response carries no result at all (the
        full response is logged in that case).
    """
    if not args:
        args = {}
    data = uapi(command, self.name, args)
    result = data.get('result') if data else None
    if not result:
        # Log the whole response; ``result`` is always empty here.
        self.error(
            f"UAPI call failed. Command: {command}\n"
            f"Args: {args}\nResponse: {data}"
        )
        return False, None
    if result.get('status'):
        return True, result.get('data')
    # ``errors`` may be missing or null even on failure.
    errors = result.get('errors') or []
    return False, ', '.join(str(err) for err in errors)
def capture_state(self):
    """Build the dict that represents this user in the journal.

    The status is stored by name and each domain as its attribute dict.
    """
    plain_fields = (
        'name',
        'ipaddr',
        'pkgacct_id',
        'pkgacct_pid',
        'restore_id',
    )
    state = {field: getattr(self, field) for field in plain_fields}
    state['status'] = self.status.name
    state['notes'] = self.notes
    state['domains'] = [vars(domain) for domain in self.domains]
    return state
def load_state(self, loadstate):
    """Load everything needed from the journal for the User class.

    Args:
        loadstate: dict of the shape produced by ``capture_state``.
            Missing ``notes``/``domains`` entries are treated as empty and
            a missing ``status`` leaves the current status untouched.

    Raises:
        KeyError: if ``status`` names an unknown ``Status`` member.
    """
    self.ipaddr = loadstate.get('ipaddr')
    self.pkgacct_id = loadstate.get('pkgacct_id')
    self.pkgacct_pid = loadstate.get('pkgacct_pid')
    self.restore_id = loadstate.get('restore_id')
    status_name = loadstate.get('status')
    if status_name is not None:
        self.status = Status[status_name]
    # Keep notes a list so later len()/join() calls keep working.
    self.notes = loadstate.get('notes') or []
    # Replace rather than append so a repeated load does not duplicate
    # domains.
    self.domains = [
        Domain(**domain) for domain in loadstate.get('domains') or []
    ]
def __repr__(self):
    """Return a short debug form: the user name and the status name."""
    return "<User {} {}>".format(self.name, self.status.name)