D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
psa
/
admin
/
plib
/
modules
/
site-import
/
backend
/
lib
/
python
/
parallels
/
plesk
/
actions
/
Filename :
convert.py
back
Copy
from parallels.core.utils.common import unique_by from parallels.plesk import messages import itertools from parallels.core.actions.base.common_action import CommonAction from parallels.core import target_data_model from parallels.core.utils.common import not_empty class ConvertAuxUserRoles(CommonAction): def get_description(self): return messages.ACTION_CONVERT_AUXILIARY_USER_ROLES_DESCRIPTION def get_failure_message(self, global_context): """ :type global_context: parallels.core.global_context.GlobalMigrationContext """ return messages.ACTION_CONVERT_AUXILIARY_USER_ROLES_FAILURE def run(self, global_context): """ :type global_context: parallels.core.global_context.GlobalMigrationContext """ for target_owner in global_context.target_model.iter_all_owners(): if not target_owner.login: roles = get_admin_aux_user_roles(global_context) else: source_owners = get_source_clients_and_resellers(global_context, target_owner.login) source_user_roles = (c.auxiliary_user_roles for c in source_owners) roles = list(itertools.chain.from_iterable(source_user_roles)) target_owner.auxiliary_user_roles.extend( # Make roles unique to resolve situation when role with the same name # exists on multiple source servers. Permissions set is not checked now, and # no warnings are emitted - this could be improved. unique_by(roles, lambda role: role.name) ) class ConvertAuxUsers(CommonAction): def get_description(self): return messages.ACTION_CONVERT_AUXILIARY_USERS def get_failure_message(self, global_context): """ :type global_context: parallels.core.global_context.GlobalMigrationContext """ return messages.ACTION_CONVERT_AUXILIARY_USERS_FAILURE def run(self, global_context): """Find and add auxusers to the target model. :type global_context: parallels.core.global_context.GlobalMigrationContext """ for target_owner in global_context.target_model.iter_all_owners(): if not target_owner.login: auxuser_list = get_admin_aux_users(global_context) else: source_owner = get_source_clients_and_resellers(global_context, target_owner.login) source_aux_users = (c.auxiliary_users for c in source_owner) auxuser_list = itertools.chain.from_iterable(source_aux_users) for auxuser in auxuser_list: self._add_auxuser(auxuser, target_owner, global_context) def _add_auxuser(self, raw_auxuser, client, context): """Convert aux users, append them to the target model.""" if raw_auxuser.name == 'admin': return converted_auxuser = self._convert_auxuser(raw_auxuser) password = self._get_auxuser_password( raw_auxuser, client.login, context ) converted_auxuser.password = password client.auxiliary_users.append(converted_auxuser) def _convert_auxuser(self, raw_auxuser): """Create aux user object for target model. :type raw_auxuser: parallels.core.dump.data_model.AuxiliaryUser """ personal_info = target_data_model.PersonalInfo( raw_auxuser.personal_info.get('email', self._get_auxuser_email(raw_auxuser)), '', address=raw_auxuser.personal_info.get('address', ''), city=raw_auxuser.personal_info.get('city', ''), state=raw_auxuser.personal_info.get('state', ''), postal_code=raw_auxuser.personal_info.get('zip', ''), locale=raw_auxuser.personal_info.get('locale', None), country_code=not_empty(raw_auxuser.personal_info.get('country'), 'XX'), primary_phone=raw_auxuser.personal_info.get('phone', ''), fax=raw_auxuser.personal_info.get('fax', ''), comment=raw_auxuser.personal_info.get('comment', ''), im=raw_auxuser.personal_info.get('im', ''), im_type=raw_auxuser.personal_info.get('im-type', 'Other') ) if raw_auxuser.contact is not None: auxuser_contact = raw_auxuser.contact else: auxuser_contact = raw_auxuser.name return target_data_model.AuxiliaryUser( login=raw_auxuser.name, password=None, name=auxuser_contact, roles=set(raw_auxuser.roles), personal_info=personal_info, is_active=raw_auxuser.is_active, subscription_name=raw_auxuser.subscription_name, is_domain_admin=raw_auxuser.is_domain_admin, external_email=raw_auxuser.external_email, company=raw_auxuser.personal_info.get('company', '') ) def _get_auxuser_email(self, raw_auxuser): # 'email' is mandatory parameter (according to importer), but it # may be missing in Plesk 8 domain admin or mail accounts if raw_auxuser.email: return raw_auxuser.email elif '@' in raw_auxuser.name: return raw_auxuser.name else: return u"admin@%s" % raw_auxuser.name def _get_auxuser_password(self, raw_auxuser, client_login, context): if raw_auxuser.password is not None and raw_auxuser.password.password_type == 'plain': password = raw_auxuser.password.password_text else: password = context.password_holder.get('auxiliary_user', (client_login, raw_auxuser.name)) return password def get_source_clients_and_resellers(context, login): """Search for clients with a given login in all available source panels.""" dumps = (dump for (id, dump) in context.migrator.iter_all_server_raw_dumps()) all_owners = [] for dump in dumps: all_owners.extend(dump.iter_all_clients()) all_owners.extend(dump.iter_resellers()) return [owner for owner in all_owners if owner.login == login] def get_admin_aux_user_roles(context): """Return a list of admin auxiliary user roles from all source panels.""" dumps = (dump for (id, dump) in context.migrator.iter_all_server_raw_dumps()) admin_user_roles = (dump.iter_admin_auxiliary_user_roles() for dump in dumps) return list(itertools.chain(*admin_user_roles)) def get_admin_aux_users(context): """Return a list of admin auxiliary users found in all source panels.""" dumps = (dump for (id, dump) in context.migrator.iter_all_server_raw_dumps()) admin_users = (dump.iter_admin_auxiliary_users() for dump in dumps) return list(itertools.chain(*admin_users))