D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
psa
/
admin
/
plib
/
modules
/
site-import
/
backend
/
lib
/
python
/
parallels
/
plesk
/
source
/
mail
/
Filename :
utils.py
back
Copy
"""Helpers for splitting e-mail addresses and loading the mail migration list."""
import json
import os
import re

from parallels.core import MigrationError, safe_format
from parallels.core.registry import Registry
from parallels.core.runners.base import BaseRunner
from parallels.core.runners.entities import ExecutionOptions
from parallels.core.utils.common import is_empty
from parallels.core.utils.json_utils import read_json
from parallels.plesk.source.mail import messages

# Extension id used when is_empty() reports the PLESK_EXTENSION_ID environment
# variable as empty.
_DEFAULT_EXTENSION_ID = 'site-import'

# Matches the payload printed between the special delimiters. DOTALL lets the
# payload span several lines; the group is greedy, so it runs from the first
# opening delimiter to the last closing one.
_MAIL_MIGRATION_LIST_PATTERN = re.compile(
    'MAIL-MIGRATION-LIST>>>(.*)<<<MAIL-MIGRATION-LIST', re.DOTALL
)


def split_email(email):
    """Split e-mail address into mail user name (before "@") and domain name (after "@").

    The address must contain exactly one "@"; both parts are returned as is,
    without any further validation.

    :type email: str
    :rtype: list[str]
    :raises MigrationError: if the address does not contain exactly one "@"
    """
    parts = email.split('@')
    if len(parts) != 2:
        raise MigrationError(safe_format(messages.INVALID_EMAIL_ADDRESS, address=email))
    return parts


def load_mail_migration_json_unencrypted_from_file(filename):
    """Read a file through the migrator server runner and load the mail migration list from it.

    The file contents are passed unchanged to load_mail_migration_json_unencrypted().

    :type filename: str
    :rtype: list | dict
    """
    local_server = Registry.get_instance().get_context().migrator_server
    with local_server.runner() as runner:
        # Not a validation: the assert only tells IDEs/type checkers the runner type.
        assert isinstance(runner, BaseRunner)
        raw_data = runner.get_file_contents(filename)
    return load_mail_migration_json_unencrypted(raw_data)


def load_mail_migration_json_unencrypted(data):
    """Pass data to the extension's "read-mail-migration-list.php" script and parse its output.

    The script is executed with "plesk bin extension --exec" for the extension
    named by the PLESK_EXTENSION_ID environment variable ('site-import' if it is
    empty). The data is fed to the script on STDIN, and the JSON document that
    the script prints between the "MAIL-MIGRATION-LIST>>>" and
    "<<<MAIL-MIGRATION-LIST" delimiters is decoded and returned.

    :type data: str
    :rtype: list | dict
    :raises Exception: if the delimiters are not found in the script output
    :raises ValueError: if the text between the delimiters is not valid JSON
    """
    extension_id = os.getenv('PLESK_EXTENSION_ID')
    if is_empty(extension_id):
        extension_id = _DEFAULT_EXTENSION_ID

    local_server = Registry.get_instance().get_context().migrator_server
    with local_server.runner() as runner:
        # Not a validation: the assert only tells IDEs/type checkers the runner type.
        assert isinstance(runner, BaseRunner)
        command_results = runner.execute_command(
            u'plesk bin extension --exec {extension_id} read-mail-migration-list.php',
            dict(extension_id=extension_id),
            ExecutionOptions(
                stdin_content=data,
                log_output=False
            )
        )

    # take data between special delimiters to separate payload from debug output
    # (in some combinations of parameters in panel.ini, Plesk starts to write some of debug output directly to STDOUT)
    search_result = _MAIL_MIGRATION_LIST_PATTERN.search(command_results.stdout)
    if search_result is None:
        raise Exception(messages.INVALID_READ_MAIL_MIGRATION_LIST_OUTPUT)
    return json.loads(search_result.group(1))


def load_mail_migration_json_encrypted_from_file(filename):
    """Load the mail migration list from a JSON file as is, by delegating to read_json().

    Unlike the "unencrypted" loaders, no extension script is involved here.

    :type filename: str
    :rtype: list | dict
    """
    return read_json(filename)