Package coprs :: Package logic :: Module coprs_logic
[hide private]
[frames] | no frames]

Source Code for Module coprs.logic.coprs_logic

  1  import os 
  2  import time 
  3  import datetime 
  4   
  5  from sqlalchemy import and_ 
  6  from sqlalchemy.sql import func 
  7  from sqlalchemy import asc 
  8  from sqlalchemy.event import listen 
  9  from sqlalchemy.orm.attributes import NEVER_SET 
 10  from sqlalchemy.orm.exc import NoResultFound 
 11  from sqlalchemy.orm.attributes import get_history 
 12   
 13  from copr_common.enums import ActionTypeEnum, BackendResultEnum 
 14  from coprs import db 
 15  from coprs import exceptions 
 16  from coprs import helpers 
 17  from coprs import models 
 18  from coprs.exceptions import MalformedArgumentException, BadRequest 
 19  from coprs.logic import users_logic 
 20  from coprs.whoosheers import CoprWhoosheer 
 21  from coprs.helpers import fix_protocol_for_backend 
 22   
 23  from coprs.logic.actions_logic import ActionsLogic 
 24  from coprs.logic.users_logic import UsersLogic 
25 26 27 -class CoprsLogic(object):
28 """ 29 Used for manipulating Coprs. 30 31 All methods accept user object as a first argument, 32 as this may be needed in future. 33 """ 34 35 @classmethod
36 - def get_all(cls):
37 """ Return all coprs without those which are deleted. """ 38 query = (db.session.query(models.Copr) 39 .join(models.Copr.user) 40 .options(db.contains_eager(models.Copr.user)) 41 .filter(models.Copr.deleted == False)) 42 return query
43 44 @classmethod
45 - def get_by_id(cls, copr_id):
46 return cls.get_all().filter(models.Copr.id == copr_id)
47 48 @classmethod
49 - def attach_build(cls, query):
50 query = (query.outerjoin(models.Copr.builds) 51 .options(db.contains_eager(models.Copr.builds)) 52 .order_by(models.Build.submitted_on.desc())) 53 return query
54 55 @classmethod
56 - def attach_mock_chroots(cls, query):
57 query = (query.outerjoin(*models.Copr.mock_chroots.attr) 58 .options(db.contains_eager(*models.Copr.mock_chroots.attr)) 59 .order_by(models.MockChroot.os_release.asc()) 60 .order_by(models.MockChroot.os_version.asc()) 61 .order_by(models.MockChroot.arch.asc())) 62 return query
63 64 @classmethod
65 - def get_multiple_by_username(cls, username, **kwargs):
66 with_builds = kwargs.get("with_builds", False) 67 with_mock_chroots = kwargs.get("with_mock_chroots", False) 68 69 query = ( 70 cls.get_all() 71 .filter(models.User.username == username) 72 ) 73 74 if with_builds: 75 query = cls.attach_build(query) 76 77 if with_mock_chroots: 78 query = cls.attach_mock_chroots(query) 79 80 return query
81 82 @classmethod
83 - def get_multiple_by_group_id(cls, group_id, **kwargs):
84 with_builds = kwargs.get("with_builds", False) 85 with_mock_chroots = kwargs.get("with_mock_chroots", False) 86 87 query = ( 88 cls.get_all() 89 .filter(models.Copr.group_id == group_id) 90 ) 91 92 if with_builds: 93 query = cls.attach_build(query) 94 95 if with_mock_chroots: 96 query = cls.attach_mock_chroots(query) 97 98 return query
99 100 @classmethod
101 - def get(cls, username, coprname, **kwargs):
102 query = cls.get_multiple_by_username(username, **kwargs) 103 query = query.filter(models.Copr.name == coprname) 104 return query
105 106 @classmethod
107 - def get_by_group_id(cls, group_id, coprname, **kwargs):
108 query = cls.get_multiple_by_group_id(group_id, **kwargs) 109 query = query.filter(models.Copr.name == coprname) 110 return query
111 112 @classmethod
113 - def get_multiple(cls, include_deleted=False, include_unlisted_on_hp=True):
114 query = ( 115 db.session.query(models.Copr) 116 .join(models.Copr.user) 117 .outerjoin(models.Group) 118 .options(db.contains_eager(models.Copr.user)) 119 ) 120 121 if not include_deleted: 122 query = query.filter(models.Copr.deleted.is_(False)) 123 124 if not include_unlisted_on_hp: 125 query = query.filter(models.Copr.unlisted_on_hp.is_(False)) 126 127 return query
128 129 @classmethod
130 - def set_query_order(cls, query, desc=False):
131 if desc: 132 query = query.order_by(models.Copr.id.desc()) 133 else: 134 query = query.order_by(models.Copr.id.asc()) 135 return query
136 137 # user_relation="owned", username=username, with_mock_chroots=False 138 @classmethod
139 - def get_multiple_owned_by_username(cls, username):
140 query = cls.get_multiple() 141 return query.filter(models.User.username == username)
142 143 @classmethod
144 - def filter_by_name(cls, query, name):
145 return query.filter(models.Copr.name == name)
146 147 @classmethod
148 - def filter_by_user_name(cls, query, username):
149 # should be already joined with the User table 150 return query.filter(models.User.username == username)
151 152 @classmethod
153 - def filter_by_group_name(cls, query, group_name):
154 # should be already joined with the Group table 155 return query.filter(models.Group.name == group_name)
156 157 @classmethod
158 - def filter_without_group_projects(cls, query):
159 return query.filter(models.Copr.group_id.is_(None))
160 161 @classmethod
162 - def join_builds(cls, query):
163 return (query.outerjoin(models.Copr.builds) 164 .options(db.contains_eager(models.Copr.builds)) 165 .order_by(models.Build.submitted_on.desc()))
166 167 @classmethod
168 - def join_mock_chroots(cls, query):
169 return (query.outerjoin(*models.Copr.mock_chroots.attr) 170 .options(db.contains_eager(*models.Copr.mock_chroots.attr)) 171 .order_by(models.MockChroot.os_release.asc()) 172 .order_by(models.MockChroot.os_version.asc()) 173 .order_by(models.MockChroot.arch.asc()))
174 175 @classmethod
176 - def get_playground(cls):
177 return cls.get_all().filter(models.Copr.playground == True)
178 179 @classmethod
180 - def set_playground(cls, user, copr):
181 if user.admin: 182 db.session.add(copr) 183 pass 184 else: 185 raise exceptions.InsufficientRightsException( 186 "User is not a system admin")
187 188 @classmethod
189 - def get_multiple_fulltext(cls, search_string):
190 query = (models.Copr.query.join(models.User) 191 .filter(models.Copr.deleted == False)) 192 if "/" in search_string: # copr search by its full name 193 if search_string[0] == '@': # searching for @group/project 194 group_name = "%{}%".format(search_string.split("/")[0][1:]) 195 project = "%{}%".format(search_string.split("/")[1]) 196 query = query.filter(and_(models.Group.name.ilike(group_name), 197 models.Copr.name.ilike(project), 198 models.Group.id == models.Copr.group_id)) 199 query = query.order_by(asc(func.length(models.Group.name)+func.length(models.Copr.name))) 200 else: # searching for user/project 201 user_name = "%{}%".format(search_string.split("/")[0]) 202 project = "%{}%".format(search_string.split("/")[1]) 203 query = query.filter(and_(models.User.username.ilike(user_name), 204 models.Copr.name.ilike(project), 205 models.User.id == models.Copr.user_id)) 206 query = query.order_by(asc(func.length(models.User.username)+func.length(models.Copr.name))) 207 else: # fulltext search 208 query = query.whooshee_search(search_string, whoosheer=CoprWhoosheer, order_by_relevance=100) 209 return query
210 211 @classmethod
212 - def add(cls, user, name, selected_chroots, repos=None, description=None, 213 instructions=None, check_for_duplicates=False, group=None, persistent=False, 214 auto_prune=True, use_bootstrap_container=False, follow_fedora_branching=False, **kwargs):
215 216 if not user.admin and persistent: 217 raise exceptions.NonAdminCannotCreatePersistentProject() 218 219 if not user.admin and not auto_prune: 220 raise exceptions.NonAdminCannotDisableAutoPrunning() 221 222 # form validation checks for duplicates 223 cls.new(user, name, group, check_for_duplicates=check_for_duplicates) 224 225 copr = models.Copr(name=name, 226 repos=repos or u"", 227 user=user, 228 description=description or u"", 229 instructions=instructions or u"", 230 created_on=int(time.time()), 231 persistent=persistent, 232 auto_prune=auto_prune, 233 use_bootstrap_container=use_bootstrap_container, 234 follow_fedora_branching=follow_fedora_branching, 235 **kwargs) 236 237 238 if group is not None: 239 UsersLogic.raise_if_not_in_group(user, group) 240 copr.group = group 241 242 copr_dir = models.CoprDir( 243 main=True, 244 name=name, 245 copr=copr) 246 247 db.session.add(copr_dir) 248 db.session.add(copr) 249 250 CoprChrootsLogic.new_from_names( 251 copr, selected_chroots) 252 253 db.session.flush() 254 ActionsLogic.send_create_gpg_key(copr) 255 256 return copr
257 258 @classmethod
259 - def new(cls, user, copr_name, group=None, check_for_duplicates=True):
260 if check_for_duplicates: 261 if group is None and cls.exists_for_user(user, copr_name).all(): 262 raise exceptions.DuplicateException( 263 "Copr: '{0}/{1}' already exists".format(user.name, copr_name)) 264 elif group: 265 if cls.exists_for_group(group, copr_name).all(): 266 db.session.rollback() 267 raise exceptions.DuplicateException( 268 "Copr: '@{0}/{1}' already exists".format(group.name, copr_name))
269 270 @classmethod
271 - def update(cls, user, copr):
272 # we should call get_history before other requests, otherwise 273 # the changes would be forgotten 274 if get_history(copr, "name").has_changes(): 275 raise MalformedArgumentException("Change name of the project is forbidden") 276 277 users_logic.UsersLogic.raise_if_cant_update_copr( 278 user, copr, "Only owners and admins may update their projects.") 279 280 if not user.admin and not copr.auto_prune: 281 raise exceptions.NonAdminCannotDisableAutoPrunning() 282 283 db.session.add(copr)
284 285 @classmethod
286 - def delete_unsafe(cls, user, copr):
287 """ 288 Deletes copr without termination of ongoing builds. 289 """ 290 cls.raise_if_cant_delete(user, copr) 291 # TODO: do we want to dump the information somewhere, so that we can 292 # search it in future? 293 cls.raise_if_unfinished_blocking_action( 294 copr, "Can't delete this project," 295 " another operation is in progress: {action}") 296 297 ActionsLogic.send_delete_copr(copr) 298 CoprDirsLogic.delete_all_by_copr(copr) 299 300 copr.deleted = True 301 return copr
302 303 @classmethod
304 - def exists_for_user(cls, user, coprname, incl_deleted=False):
305 existing = (models.Copr.query 306 .filter(models.Copr.name == coprname) 307 .filter(models.Copr.user_id == user.id)) 308 309 if not incl_deleted: 310 existing = existing.filter(models.Copr.deleted == False) 311 312 return cls.filter_without_group_projects(existing)
313 314 @classmethod
315 - def exists_for_group(cls, group, coprname, incl_deleted=False):
316 existing = (models.Copr.query 317 .filter(models.Copr.name == coprname) 318 .filter(models.Copr.group_id == group.id)) 319 320 if not incl_deleted: 321 existing = existing.filter(models.Copr.deleted == False) 322 323 return existing
324 325 @classmethod
326 - def unfinished_blocking_actions_for(cls, copr):
327 blocking_actions = [ActionTypeEnum("delete")] 328 329 actions = (models.Action.query 330 .filter(models.Action.object_type == "copr") 331 .filter(models.Action.object_id == copr.id) 332 .filter(models.Action.result == 333 BackendResultEnum("waiting")) 334 .filter(models.Action.action_type.in_(blocking_actions))) 335 336 return actions
337 338 @classmethod
339 - def get_yum_repos(cls, copr, empty=False):
340 repos = {} 341 release_tmpl = "{chroot.os_release}-{chroot.os_version}-{chroot.arch}" 342 build = models.Build.query.filter(models.Build.copr_id == copr.id).first() 343 if build or empty: 344 for chroot in copr.active_chroots: 345 release = release_tmpl.format(chroot=chroot) 346 repos[release] = fix_protocol_for_backend( 347 os.path.join(copr.repo_url, release + '/')) 348 return repos
349 350 @classmethod
351 - def raise_if_unfinished_blocking_action(cls, copr, message):
352 """ 353 Raise ActionInProgressException if given copr has an unfinished 354 action. Return None otherwise. 355 """ 356 357 unfinished_actions = cls.unfinished_blocking_actions_for(copr).all() 358 if unfinished_actions: 359 raise exceptions.ActionInProgressException( 360 message, unfinished_actions[0])
361 362 @classmethod
363 - def raise_if_cant_delete(cls, user, copr):
364 """ 365 Raise InsufficientRightsException if given copr cant be deleted 366 by given user. Return None otherwise. 367 """ 368 369 if not user.admin and user != copr.user: 370 raise exceptions.InsufficientRightsException( 371 "Only owners may delete their projects.")
372 373 @classmethod
374 - def delete_expired_projects(cls):
375 query = ( 376 models.Copr.query 377 .filter(models.Copr.delete_after.isnot(None)) 378 .filter(models.Copr.delete_after < datetime.datetime.now()) 379 .filter(models.Copr.deleted.isnot(True)) 380 ) 381 for copr in query.all(): 382 print("deleting project '{}'".format(copr.full_name)) 383 CoprsLogic.delete_unsafe(copr.user, copr)
384
385 386 387 -class CoprPermissionsLogic(object):
388 @classmethod
389 - def get(cls, copr, searched_user):
390 query = (models.CoprPermission.query 391 .filter(models.CoprPermission.copr == copr) 392 .filter(models.CoprPermission.user == searched_user)) 393 394 return query
395 396 @classmethod
397 - def get_for_copr(cls, copr):
398 query = models.CoprPermission.query.filter( 399 models.CoprPermission.copr == copr) 400 401 return query
402 403 @classmethod
404 - def get_admins_for_copr(cls, copr):
405 permissions = cls.get_for_copr(copr) 406 return [copr.user] + [p.user for p in permissions if p.copr_admin == helpers.PermissionEnum("approved")]
407 408 @classmethod
409 - def new(cls, copr_permission):
410 db.session.add(copr_permission)
411 412 @classmethod
413 - def update_permissions(cls, user, copr, copr_permission, 414 new_builder, new_admin):
415 416 users_logic.UsersLogic.raise_if_cant_update_copr( 417 user, copr, "Only owners and admins may update" 418 " their projects permissions.") 419 420 (models.CoprPermission.query 421 .filter(models.CoprPermission.copr_id == copr.id) 422 .filter(models.CoprPermission.user_id == copr_permission.user_id) 423 .update({"copr_builder": new_builder, 424 "copr_admin": new_admin}))
425 426 @classmethod
427 - def update_permissions_by_applier(cls, user, copr, copr_permission, new_builder, new_admin):
428 if copr_permission: 429 # preserve approved permissions if set 430 if (not new_builder or 431 copr_permission.copr_builder != helpers.PermissionEnum("approved")): 432 433 copr_permission.copr_builder = new_builder 434 435 if (not new_admin or 436 copr_permission.copr_admin != helpers.PermissionEnum("approved")): 437 438 copr_permission.copr_admin = new_admin 439 else: 440 perm = models.CoprPermission( 441 user=user, 442 copr=copr, 443 copr_builder=new_builder, 444 copr_admin=new_admin) 445 446 cls.new(perm)
447 448 @classmethod
449 - def delete(cls, copr_permission):
450 db.session.delete(copr_permission)
451 452 @classmethod
453 - def validate_permission(cls, user, copr, permission, state):
454 allowed = ['admin', 'builder'] 455 if permission not in allowed: 456 raise BadRequest( 457 "invalid permission '{0}', allowed {1}".format(permission, 458 '|'.join(allowed))) 459 460 allowed = helpers.PermissionEnum.vals.keys() 461 if state not in allowed: 462 raise BadRequest( 463 "invalid '{0}' permission state '{1}', " 464 "use {2}".format(permission, state, '|'.join(allowed))) 465 466 if user.id == copr.user_id: 467 raise BadRequest("user '{0}' is owner of the '{1}' " 468 "project".format(user.name, copr.full_name))
469 470 @classmethod
471 - def set_permissions(cls, request_user, copr, user, permission, state):
472 users_logic.UsersLogic.raise_if_cant_update_copr( 473 request_user, copr, 474 "only owners and admins may update their projects permissions.") 475 476 cls.validate_permission(user, copr, permission, state) 477 478 perm_o = models.CoprPermission(user_id=user.id, copr_id=copr.id) 479 perm_o = db.session.merge(perm_o) 480 old_state = perm_o.get_permission(permission) 481 482 new_state = helpers.PermissionEnum(state) 483 perm_o.set_permission(permission, new_state) 484 db.session.merge(perm_o) 485 486 return (old_state, new_state) if old_state != new_state else None
487 488 @classmethod
489 - def request_permission(cls, copr, user, permission, req_bool):
490 approved = helpers.PermissionEnum('approved') 491 state = None 492 if req_bool is True: 493 state = 'request' 494 elif req_bool is False: 495 state = 'nothing' 496 else: 497 raise BadRequest("invalid '{0}' permission request '{1}', " 498 "expected True or False".format(permission, 499 req_bool)) 500 501 cls.validate_permission(user, copr, permission, state) 502 perm_o = models.CoprPermission(user_id=user.id, copr_id=copr.id) 503 perm_o = db.session.merge(perm_o) 504 old_state = perm_o.get_permission(permission) 505 if old_state == approved and state == 'request': 506 raise BadRequest("You already are '{0}' in '{1}'".format( 507 permission, copr.full_name)) 508 509 new_state = helpers.PermissionEnum(state) 510 perm_o.set_permission(permission, new_state) 511 512 if old_state != new_state: 513 return (old_state, new_state) 514 return None
515
516 517 -class CoprDirsLogic(object):
518 @classmethod
519 - def get_or_create(cls, copr, dirname, main=False):
520 copr_dir = cls.get_by_copr(copr, dirname).first() 521 522 if copr_dir: 523 return copr_dir 524 525 copr_dir = models.CoprDir( 526 name=dirname, copr=copr, main=main) 527 528 db.session.add(copr_dir) 529 return copr_dir
530 531 @classmethod
532 - def get_by_copr(cls, copr, dirname):
533 return (db.session.query(models.CoprDir) 534 .join(models.Copr) 535 .filter(models.Copr.id==copr.id) 536 .filter(models.CoprDir.name==dirname))
537 538 @classmethod
539 - def get_by_ownername(cls, ownername, dirname):
540 return (db.session.query(models.CoprDir) 541 .filter(models.CoprDir.name==dirname) 542 .filter(models.CoprDir.ownername==ownername))
543 544 @classmethod
545 - def delete(cls, copr_dir):
546 db.session.delete(copr_dir)
547 548 @classmethod
549 - def delete_all_by_copr(cls, copr):
550 for copr_dir in copr.dirs: 551 db.session.delete(copr_dir)
552
553 554 -def on_auto_createrepo_change(target_copr, value_acr, old_value_acr, initiator):
555 """ Emit createrepo action when auto_createrepo re-enabled""" 556 if old_value_acr == NEVER_SET: 557 # created new copr, not interesting 558 return 559 if not old_value_acr and value_acr: 560 # re-enabled 561 ActionsLogic.send_createrepo(target_copr)
562 563 564 listen(models.Copr.auto_createrepo, 'set', on_auto_createrepo_change, 565 active_history=True, retval=False)
566 567 568 -class BranchesLogic(object):
569 @classmethod
570 - def get_or_create(cls, name, session=db.session):
571 item = session.query(models.DistGitBranch).filter_by(name=name).first() 572 if item: 573 return item 574 575 branch = models.DistGitBranch() 576 branch.name = name 577 session.add(branch) 578 return branch
579
580 581 -class CoprChrootsLogic(object):
582 @classmethod
583 - def get_multiple(cls, include_deleted=False):
584 query = models.CoprChroot.query.join(models.Copr) 585 if not include_deleted: 586 query = query.filter(models.Copr.deleted.is_(False)) 587 return query
588 589 @classmethod
590 - def mock_chroots_from_names(cls, names):
591 592 db_chroots = models.MockChroot.query.all() 593 mock_chroots = [] 594 for ch in db_chroots: 595 if ch.name in names: 596 mock_chroots.append(ch) 597 598 return mock_chroots
599 600 @classmethod
601 - def get_by_name(cls, copr, chroot_name, active_only=True):
602 mc = MockChrootsLogic.get_from_name(chroot_name, active_only=active_only).one() 603 query = ( 604 models.CoprChroot.query.join(models.MockChroot) 605 .filter(models.CoprChroot.copr_id == copr.id) 606 .filter(models.MockChroot.id == mc.id) 607 ) 608 return query
609 610 @classmethod
611 - def get_by_name_safe(cls, copr, chroot_name):
612 """ 613 :rtype: models.CoprChroot 614 """ 615 try: 616 return cls.get_by_name(copr, chroot_name).one() 617 except NoResultFound: 618 return None
619 620 @classmethod
621 - def new(cls, mock_chroot):
622 db.session.add(mock_chroot)
623 624 @classmethod
625 - def new_from_names(cls, copr, names):
629 630 @classmethod
631 - def create_chroot(cls, user, copr, mock_chroot, buildroot_pkgs=None, repos=None, comps=None, comps_name=None, 632 with_opts="", without_opts="", 633 delete_after=None, delete_notify=None):
634 """ 635 :type user: models.User 636 :type mock_chroot: models.MockChroot 637 """ 638 if buildroot_pkgs is None: 639 buildroot_pkgs = "" 640 if repos is None: 641 repos = "" 642 UsersLogic.raise_if_cant_update_copr( 643 user, copr, 644 "Only owners and admins may update their projects.") 645 646 chroot = models.CoprChroot(copr=copr, mock_chroot=mock_chroot) 647 cls._update_chroot(buildroot_pkgs, repos, comps, comps_name, chroot, 648 with_opts, without_opts, delete_after, delete_notify) 649 return chroot
650 651 @classmethod
652 - def update_chroot(cls, user, copr_chroot, buildroot_pkgs=None, repos=None, comps=None, comps_name=None, 653 with_opts="", without_opts="", delete_after=None, delete_notify=None):
654 """ 655 :type user: models.User 656 :type copr_chroot: models.CoprChroot 657 """ 658 UsersLogic.raise_if_cant_update_copr( 659 user, copr_chroot.copr, 660 "Only owners and admins may update their projects.") 661 662 cls._update_chroot(buildroot_pkgs, repos, comps, comps_name, 663 copr_chroot, with_opts, without_opts, delete_after, delete_notify) 664 return copr_chroot
665 666 @classmethod
667 - def _update_chroot(cls, buildroot_pkgs, repos, comps, comps_name, 668 copr_chroot, with_opts, without_opts, delete_after, delete_notify):
669 if buildroot_pkgs is not None: 670 copr_chroot.buildroot_pkgs = buildroot_pkgs 671 672 if repos is not None: 673 copr_chroot.repos = repos.replace("\n", " ") 674 675 if with_opts is not None: 676 copr_chroot.with_opts = with_opts 677 678 if without_opts is not None: 679 copr_chroot.without_opts = without_opts 680 681 if comps_name is not None: 682 copr_chroot.update_comps(comps) 683 copr_chroot.comps_name = comps_name 684 ActionsLogic.send_update_comps(copr_chroot) 685 686 if delete_after is not None: 687 copr_chroot.delete_after = delete_after 688 689 if delete_notify is not None: 690 copr_chroot.delete_notify = delete_notify 691 692 db.session.add(copr_chroot)
693 694 @classmethod
695 - def update_from_names(cls, user, copr, names):
696 UsersLogic.raise_if_cant_update_copr( 697 user, copr, 698 "Only owners and admins may update their projects.") 699 current_chroots = copr.mock_chroots 700 new_chroots = cls.mock_chroots_from_names(names) 701 # add non-existing 702 for mock_chroot in new_chroots: 703 if mock_chroot not in current_chroots: 704 db.session.add( 705 models.CoprChroot(copr=copr, mock_chroot=mock_chroot)) 706 707 # delete no more present 708 to_remove = [] 709 for mock_chroot in current_chroots: 710 if mock_chroot in new_chroots: 711 continue 712 if not mock_chroot.is_active: 713 continue 714 # can't delete here, it would change current_chroots and break 715 # iteration 716 to_remove.append(mock_chroot) 717 718 for mc in to_remove: 719 copr.mock_chroots.remove(mc)
720 721 @classmethod
722 - def remove_comps(cls, user, copr_chroot):
723 UsersLogic.raise_if_cant_update_copr( 724 user, copr_chroot.copr, 725 "Only owners and admins may update their projects.") 726 727 copr_chroot.comps_name = None 728 copr_chroot.comps_zlib = None 729 ActionsLogic.send_update_comps(copr_chroot) 730 db.session.add(copr_chroot)
731 732 @classmethod
733 - def remove_copr_chroot(cls, user, copr_chroot):
734 """ 735 :param models.CoprChroot chroot: 736 """ 737 UsersLogic.raise_if_cant_update_copr( 738 user, copr_chroot.copr, 739 "Only owners and admins may update their projects.") 740 741 db.session.delete(copr_chroot)
742 743 @classmethod
744 - def filter_outdated(cls, query):
745 return query.filter(models.CoprChroot.delete_after >= datetime.datetime.now())
746 747 @classmethod
748 - def filter_outdated_to_be_deleted(cls, query):
749 return query.filter(models.CoprChroot.delete_after < datetime.datetime.now())
750
751 752 -class MockChrootsLogic(object):
753 @classmethod
754 - def get(cls, os_release, os_version, arch, active_only=False, noarch=False):
764 765 @classmethod
766 - def get_from_name(cls, chroot_name, active_only=False, noarch=False):
767 """ 768 chroot_name should be os-version-architecture, e.g. fedora-rawhide-x86_64 769 the architecture could be optional with noarch=True 770 771 Return MockChroot object for textual representation of chroot 772 """ 773 774 name_tuple = cls.tuple_from_name(chroot_name, noarch=noarch) 775 return cls.get(name_tuple[0], name_tuple[1], name_tuple[2], 776 active_only=active_only, noarch=noarch)
777 778 @classmethod
779 - def get_multiple(cls, active_only=False):
780 query = models.MockChroot.query 781 if active_only: 782 query = query.filter(models.MockChroot.is_active == True) 783 return query
784 785 @classmethod
786 - def add(cls, name):
787 name_tuple = cls.tuple_from_name(name) 788 if cls.get(*name_tuple).first(): 789 raise exceptions.DuplicateException( 790 "Mock chroot with this name already exists.") 791 new_chroot = models.MockChroot(os_release=name_tuple[0], 792 os_version=name_tuple[1], 793 arch=name_tuple[2]) 794 cls.new(new_chroot) 795 return new_chroot
796 797 @classmethod
798 - def active_names(cls):
799 return [ch.name for ch in cls.get_multiple(active_only=True).all()]
800 801 @classmethod
802 - def new(cls, mock_chroot):
803 db.session.add(mock_chroot)
804 805 @classmethod
806 - def edit_by_name(cls, name, is_active):
807 name_tuple = cls.tuple_from_name(name) 808 mock_chroot = cls.get(*name_tuple).first() 809 if not mock_chroot: 810 raise exceptions.NotFoundException( 811 "Mock chroot with this name doesn't exist.") 812 813 mock_chroot.is_active = is_active 814 cls.update(mock_chroot) 815 return mock_chroot
816 817 @classmethod
818 - def update(cls, mock_chroot):
819 db.session.add(mock_chroot)
820 821 @classmethod
822 - def delete_by_name(cls, name):
823 name_tuple = cls.tuple_from_name(name) 824 mock_chroot = cls.get(*name_tuple).first() 825 if not mock_chroot: 826 raise exceptions.NotFoundException( 827 "Mock chroot with this name doesn't exist.") 828 829 cls.delete(mock_chroot)
830 831 @classmethod
832 - def delete(cls, mock_chroot):
833 db.session.delete(mock_chroot)
834 835 @classmethod
836 - def tuple_from_name(cls, name, noarch=False):
837 """ 838 input should be os-version-architecture, e.g. fedora-rawhide-x86_64 839 840 the architecture could be optional with noarch=True 841 842 returns ("os", "version", "arch") or ("os", "version", None) 843 """ 844 split_name = name.rsplit("-", 1) if noarch else name.rsplit("-", 2) 845 846 valid = False 847 if noarch and len(split_name) in [2, 3]: 848 valid = True 849 if not noarch and len(split_name) == 3: 850 valid = True 851 852 if not valid: 853 raise MalformedArgumentException( 854 "Chroot name is not valid") 855 856 if noarch and len(split_name) == 2: 857 split_name.append(None) 858 859 return tuple(split_name)
860 861 @classmethod
862 - def prunerepo_finished(cls, chroots_pruned):
863 for chroot_name in chroots_pruned: 864 chroot = cls.get_from_name(chroot_name).one() 865 if not chroot.is_active: 866 chroot.final_prunerepo_done = True 867 868 db.session.commit() 869 return True
870 871 @classmethod
873 query = models.MockChroot.query 874 chroots = {} 875 for chroot in query: 876 if chroot.is_active or not chroot.final_prunerepo_done: 877 chroots[chroot.name] = chroot.is_active 878 879 return chroots
880