Script pagure_events_py
[hide private]
[frames] | [no frames]

Source Code for Script script-pagure_events_py

  1  #!/usr/bin/python3 
  2   
  3  import json 
  4  import pprint 
  5  import zmq 
  6  import sys 
  7  import os 
  8  import logging 
  9  import requests 
 10  import re 
 11  import munch 
 12   
 13  sys.path.append( 
 14      os.path.dirname(os.path.dirname(os.path.realpath(__file__))) 
 15  ) 
 16   
 17  from coprs import db, app, models 
 18  from coprs.logic.coprs_logic import CoprDirsLogic 
 19  from coprs.logic.builds_logic import BuildsLogic 
 20  from coprs.logic.complex_logic import ComplexLogic 
 21  from coprs.logic.packages_logic import PackagesLogic 
 22  from coprs import helpers 
 23   
 24  from urllib.parse import urlparse 
 25   
 26  SCM_SOURCE_TYPE = helpers.BuildSourceEnum("scm") 
 27   
 28  logging.basicConfig( 
 29      filename='{0}/pagure-events.log'.format(app.config.get('LOG_DIR')), 
 30      format='[%(asctime)s][%(levelname)6s]: %(message)s', 
 31      level=logging.DEBUG) 
 32   
 33  log = logging.getLogger(__name__) 
 34  log.addHandler(logging.StreamHandler(sys.stdout)) 
 35   
 36  if os.getenv('PAGURE_EVENTS_TESTONLY'): 
 37      ENDPOINT = 'tcp://stg.pagure.io:9940' 
 38  else: 
 39      ENDPOINT = 'tcp://hub.fedoraproject.org:9940' 
 40   
 41  log.info("ENDPOINT = {}".format(ENDPOINT)) 
 42   
 43  pagure_instances = { 
 44      'https://pagure.io/':             'io.pagure.prod.pagure', 
 45      'https://src.fedoraproject.org/': 'org.fedoraproject.prod.pagure', 
 46      'https://stg.pagure.io/':         'io.pagure.stg.pagure', # testing only 
 47  } 
 48   
 49  topics = [ 
 50      'git.receive', 
 51      'pull-request.new', 
 52      'pull-request.rebased', 
 53      'pull-request.updated', 
 54      'pull-request.comment.added', 
 55  ] 
 56   
 57  TOPICS = {} 
 58  for url, fedmsg_prefix in pagure_instances.items(): 
 59      for topic in topics: 
 60          TOPICS['{0}.{1}'.format(fedmsg_prefix, topic)] = url 
def get_repeatedly(url, attempts=3, timeout=30):
    """
    Download ``url`` and return the response body as text, retrying on failure.

    Both a non-OK HTTP status and a transport error (connection refused,
    timeout, ...) count as one failed attempt.  When every attempt fails an
    empty string is returned, so the caller continues with no data instead
    of the whole event loop being torn down.

    :param url: URL to fetch
    :param attempts: maximum number of tries (default 3, as before)
    :param timeout: per-request timeout in seconds; without one, a server
        that never answers would block the fedmsg loop indefinitely
    :return: the response text, or ``""`` if all attempts failed
    """
    log.info("getting url {}".format(url))
    for attempt in range(1, attempts + 1):
        try:
            r = requests.get(url, timeout=timeout)
        except requests.RequestException as e:
            log.error('Request for url {0} failed, attempt {1}: {2}'.format(
                url, attempt, e))
            continue
        if r.status_code == requests.codes.ok:
            return r.text
        log.error('Bad http status {0} from url {1}, attempt {2}'.format(
            r.status_code, url, attempt))
    # pagure down?
    return ""
73
class ScmPackage(object):
    """
    An SCM-sourced package that is a candidate for a webhook-triggered
    rebuild, with the relevant fields of its ``source_json`` unpacked.
    """

    def __init__(self, db_row):
        """
        :param db_row: row exposing ``source_json`` (JSON text) and
            ``package_id``, e.g. one returned by the query in
            ``get_candidates_for_rebuild``
        """
        self.source_json_dict = json.loads(db_row.source_json)
        # Missing or null values are normalized to '' so callers can rely on
        # plain truthiness and string operations.
        self.clone_url = self.source_json_dict.get('clone_url') or ''
        self.committish = self.source_json_dict.get('committish') or ''
        self.subdirectory = self.source_json_dict.get('subdirectory') or ''

        self.package = ComplexLogic.get_package_by_id_safe(db_row.package_id)
        self.copr = self.package.copr

    def build(self, source_dict_update, copr_dir, update_callback,
              scm_object_type, scm_object_id, scm_object_url):
        """
        Submit a rebuild of this package into ``copr_dir``.

        If ``copr_dir`` is not the package's own directory (e.g. a
        per-pull-request directory), the package is first looked up or
        created there via ``PackagesLogic.get_or_create``.

        :return: whatever ``BuildsLogic.rebuild_package`` returns
        """
        if self.package.copr_dir.name != copr_dir.name:
            package = PackagesLogic.get_or_create(copr_dir, self.package.name, self.package)
        else:
            package = self.package

        # Presumably serializes concurrent build creation.  ``LOCK TABLE`` is
        # PostgreSQL syntax that SQLite rejects (SQLite already serializes
        # writers at the database level), so only issue it off SQLite -- the
        # same dialect switch get_candidates_for_rebuild() relies on.
        if db.engine.url.drivername != 'sqlite':
            db.session.execute('LOCK TABLE build IN EXCLUSIVE MODE')
        return BuildsLogic.rebuild_package(
            package, source_dict_update, copr_dir, update_callback,
            scm_object_type, scm_object_id, scm_object_url)

    @staticmethod
    def _rebuild_candidates_sql(drivername, source_type):
        """
        Return the SQL text used by ``get_candidates_for_rebuild``.

        SQLite has no ``ILIKE`` operator; its ``LIKE`` is case-insensitive
        for ASCII by default, so it is used there instead.  The bind
        placeholder and boolean literal are also dialect specific.

        :param drivername: SQLAlchemy engine driver name (e.g. ``'sqlite'``)
        :param source_type: numeric source type to match (the SCM type)
        """
        if drivername == 'sqlite':
            placeholder, true, like = '?', '1', 'LIKE'
        else:
            placeholder, true, like = '%s', 'true', 'ILIKE'

        return """
            SELECT package.id AS package_id, package.source_json AS source_json, package.copr_id AS copr_id
            FROM package JOIN copr_dir ON package.copr_dir_id = copr_dir.id
            WHERE package.source_type = {source_type} AND
                  package.webhook_rebuild = {true} AND
                  copr_dir.main = {true} AND
                  package.source_json {like} {placeholder}
            """.format(source_type=source_type, true=true, like=like,
                       placeholder=placeholder)

    @classmethod
    def get_candidates_for_rebuild(cls, clone_url):
        """
        Return ScmPackage objects that may need a rebuild for ``clone_url``.

        Selects SCM packages with webhook rebuilds enabled, living in a main
        copr directory, whose ``source_json`` contains ``clone_url`` as a
        case-insensitive substring.  This is only a coarse pre-filter (LIKE
        wildcards in the URL are not escaped), so the exact URL comparison
        still has to be done by the caller.
        """
        query = cls._rebuild_candidates_sql(db.engine.url.drivername, SCM_SOURCE_TYPE)
        rows = db.engine.execute(query, '%' + clone_url + '%')
        return [cls(row) for row in rows]

    def is_dir_in_commit(self, changed_files):
        """
        Return True if any path in ``changed_files`` matches this package's
        ``subdirectory`` (per ``helpers.SubdirMatch``); False for an empty
        or None ``changed_files``.
        """
        if not changed_files:
            return False

        sm = helpers.SubdirMatch(self.subdirectory)
        return any(sm.match(filename) for filename in changed_files)
129
def event_info_from_pr_comment(data, base_url):
    """
    Message handler for a comment added to a pull-request in pagure.
    Topic: ``*.pagure.pull-request.comment.added``

    A build is requested only when the pull-request is open and its most
    recent comment contains the ``[copr-build]`` tag.

    :return: the same event info as ``event_info_from_pr`` produces, or
        False when the message should be discarded
    """
    pullrequest = data['msg']['pullrequest']

    if pullrequest['status'] != 'Open':
        log.info('Pull-request not open, discarding.')
        return False

    if not pullrequest['comments']:
        log.info('This is most odd, we\'re not seeing comments.')
        return False

    last_comment = pullrequest['comments'][-1]
    if not last_comment:
        log.info('Can not access last comment, discarding.')
        return False

    # ``or ''`` guards against a null comment text, which would otherwise
    # raise TypeError on the ``in`` test.
    if ('comment' not in last_comment
            or '[copr-build]' not in (last_comment['comment'] or '')):
        log.info('The [copr-build] is not present in the message.')
        return False

    # Once the filters pass, the event carries exactly the same data as a
    # new/updated pull-request, so reuse that extractor.
    return event_info_from_pr(data, base_url)
167
def event_info_from_pr(data, base_url):
    """
    Extract event info from a pagure pull-request message.
    Topics: ``*.pagure.pull-request.new``, ``*.pagure.pull-request.rebased``
    and ``*.pagure.pull-request.updated``

    The ``base_*`` keys describe the project the pull-request targets
    (``pullrequest.project``); ``project_url_path``, ``clone_url_path`` and
    ``clone_url`` describe the repository the changes come from
    (``pullrequest.repo_from``).  Clone URLs are ``base_url`` followed by the
    repository's full name.
    """
    pr = data['msg']['pullrequest']
    return munch.Munch(
        object_id=pr['id'],
        object_type='pull-request',
        base_project_url_path=pr['project']['url_path'],
        base_clone_url_path=pr['project']['fullname'],
        base_clone_url=base_url + pr['project']['fullname'],
        project_url_path=pr['repo_from']['url_path'],
        clone_url_path=pr['repo_from']['fullname'],
        clone_url=base_url + pr['repo_from']['fullname'],
        branch_from=pr['branch_from'],
        branch_to=pr['branch'],
        start_commit=pr['commit_start'],
        end_commit=pr['commit_stop'],
    )
188
def event_info_from_push(data, base_url):
    """
    Extract event info from a pagure push message.
    Topic: ``*.pagure.git.receive``

    A push has no separate source repository: the ``base_*`` keys and the
    plain ``project_url_path``/``clone_url*`` keys all describe the pushed
    repository, ``branch_from`` equals ``branch_to``, and the pushed
    ``end_commit`` doubles as ``object_id``.
    """
    msg = data['msg']
    return munch.Munch(
        object_id=msg['end_commit'],
        object_type='commit',
        base_project_url_path=msg['repo']['url_path'],
        base_clone_url_path=msg['repo']['fullname'],
        base_clone_url=base_url + msg['repo']['fullname'],
        project_url_path=msg['repo']['url_path'],
        clone_url_path=msg['repo']['fullname'],
        clone_url=base_url + msg['repo']['fullname'],
        branch_from=msg['branch'],
        branch_to=msg['branch'],
        start_commit=msg['start_commit'],
        end_commit=msg['end_commit'],
    )
209
def git_compare_urls(url1, url2):
    """
    Return True if two git URLs refer to the same repository.

    A trailing ``.git`` and trailing slashes are ignored, and so is the URL
    scheme (``http`` vs ``https`` ...).  The network location must match
    case-insensitively (host names are case-insensitive), and the path must
    match exactly.  Arguments are passed through ``str()`` first, so a
    ``None`` argument never matches a URL that has a host part.
    """
    def _normalize(url):
        # Strip an optional trailing ".git" plus any trailing slashes.
        parsed = urlparse(re.sub(r'(\.git)?/*$', '', str(url)))
        return parsed.netloc.lower(), parsed.path

    return _normalize(url1) == _normalize(url2)
217
def _fedmsg_subscriber_socket(ctx):
    """Return a SUB socket from ``ctx``, connected to ENDPOINT and subscribed
    to every topic in TOPICS."""
    s = ctx.socket(zmq.SUB)

    # detect server hang/restart (still a chance to lose ~45s events)
    # for more info see man tcp(7).
    s.setsockopt(zmq.TCP_KEEPALIVE, 1)        # turn on keep-alive
    s.setsockopt(zmq.TCP_KEEPALIVE_IDLE, 30)  # start when 30s inactive
    s.setsockopt(zmq.TCP_KEEPALIVE_INTVL, 5)  # send keep-alive packet each 5s
    s.setsockopt(zmq.TCP_KEEPALIVE_CNT, 3)    # restart after 3 fails

    s.connect(ENDPOINT)

    # ZeroMQ subscriptions are prefix matches, so other topics sharing one of
    # these prefixes may still arrive; the caller filters them via TOPICS.
    for topic in TOPICS:
        s.setsockopt_string(zmq.SUBSCRIBE, topic)
    return s


def _event_info_for_topic(data, base_url):
    """Dispatch a decoded message to the event_info_from_* extractor that
    matches its topic; returns event info or a falsy value to discard it."""
    topic = data['topic']
    if topic.endswith(('.pull-request.new', '.pull-request.rebased',
                       '.pull-request.updated')):
        return event_info_from_pr(data, base_url)
    if topic.endswith('.pull-request.comment.added'):
        return event_info_from_pr_comment(data, base_url)
    # The only other subscribed topic is a push (``git.receive``).
    return event_info_from_push(data, base_url)


def _changed_files(base_url, event_info):
    """Return the set of files touched by the event, scraped from pagure's
    raw view of ``start_commit`` and its start..end commit-range page."""
    changed_files = set()

    raw_commit_url = base_url + event_info.project_url_path + '/raw/' + event_info.start_commit
    changed_files |= helpers.raw_commit_changes(get_repeatedly(raw_commit_url))

    if event_info.start_commit != event_info.end_commit:
        # we want to show changes in start_commit + diff
        # start_commit..end_commit
        change_html_url = '{base_url}{project}/c/{start}..{end}'.format(
            base_url=base_url,
            project=event_info.project_url_path,
            start=event_info.start_commit,
            end=event_info.end_commit)
        changed_files |= helpers.pagure_html_diff_changed(get_repeatedly(change_html_url))

    return changed_files


def _rebuild_if_matching(pkg, base_url, event_info, changed_files):
    """Submit a build of candidate ``pkg`` when its clone URL, committish and
    subdirectory all match the event; otherwise log that it was skipped."""
    package = '{}/{}(id={})'.format(
        pkg.package.copr.full_name,
        pkg.package.name,
        pkg.package.id
    )
    log.info('Considering pkg package: {}, source_json: {}'
             .format(package, pkg.source_json_dict))

    matches = (git_compare_urls(pkg.clone_url, event_info.base_clone_url)
               and (not pkg.committish or event_info.branch_to.endswith(pkg.committish))
               and pkg.is_dir_in_commit(changed_files))
    if not matches:
        log.info('\t -> skipping.')
        return

    log.info('\t -> accepted.')

    # Everything that may touch the DB session -- including creating the
    # per-PR CoprDir -- runs inside the try, so a failure is rolled back here
    # rather than escaping with the session left in a failed transaction.
    try:
        if event_info.object_type == 'pull-request':
            # Pull-request builds go into a dedicated "<copr>:pr:<id>" dir.
            dirname = pkg.copr.name + ':pr:' + str(event_info.object_id)
            copr_dir = CoprDirsLogic.get_or_create(pkg.copr, dirname)
            update_callback = 'pagure_flag_pull_request'
            scm_object_url = os.path.join(base_url, event_info.project_url_path,
                                          'c', str(event_info.end_commit))
        else:
            copr_dir = pkg.copr.main_dir
            update_callback = 'pagure_flag_commit'
            scm_object_url = os.path.join(base_url, event_info.base_project_url_path,
                                          'c', str(event_info.object_id))

        # Drop the pagure callback unless the event's repository is the
        # copr's own configured SCM repository.
        if not git_compare_urls(pkg.copr.scm_repo_url, event_info.base_clone_url):
            update_callback = ''

        source_dict_update = {
            'clone_url': event_info.clone_url,
            'committish': event_info.end_commit,
        }

        build = pkg.build(
            source_dict_update,
            copr_dir,
            update_callback,
            event_info.object_type,
            event_info.object_id,
            scm_object_url
        )
        if build:
            log.info('\t -> {}'.format(build.to_dict()))
    except Exception as e:
        # log.exception keeps the traceback that a bare str(e) would lose.
        log.exception(str(e))
        db.session.rollback()
    else:
        db.session.commit()


def build_on_fedmsg_loop():
    """
    Listen to pagure events on the fedmsg bus and rebuild matching packages.

    Subscribes to every topic in ``TOPICS`` at ``ENDPOINT``.  For each
    accepted pull-request or push event, it finds SCM packages whose clone
    URL, committish and subdirectory match the event and submits rebuilds.

    The loop never returns normally.  On any exception the ZeroMQ socket and
    context are released before the exception propagates, so the caller can
    restart the loop without leaking them.
    """
    log.debug("Setting up poller...")
    pp = pprint.PrettyPrinter(width=120)

    ctx = zmq.Context()
    try:
        s = _fedmsg_subscriber_socket(ctx)

        poller = zmq.Poller()
        poller.register(s, zmq.POLLIN)

        while True:
            log.debug('Polling...')
            if not poller.poll(10000):  # timeout in milliseconds
                continue

            log.debug('Receiving...')
            _, msg_bytes = s.recv_multipart()
            msg = msg_bytes.decode('utf-8')

            log.debug('Parsing...')
            data = json.loads(msg)

            log.info('Got topic: {}'.format(data['topic']))
            base_url = TOPICS.get(data['topic'])
            if not base_url:
                log.error('Unknown topic {} received. Continuing.'.format(data['topic']))
                continue

            event_info = _event_info_for_topic(data, base_url)
            log.info('event_info = {}'.format(pp.pformat(event_info)))

            if not event_info:
                log.info('Received event was discarded. Continuing.')
                continue

            candidates = ScmPackage.get_candidates_for_rebuild(event_info.base_clone_url)
            if not candidates:
                continue

            changed_files = _changed_files(base_url, event_info)
            log.info("changed files: {}".format(", ".join(changed_files)))

            for pkg in candidates:
                _rebuild_if_matching(pkg, base_url, event_info, changed_files)
    finally:
        # Closes every socket created from ctx (without lingering on unsent
        # data) and terminates the context.
        ctx.destroy(linger=0)
if __name__ == '__main__':
    # Keep the event loop running forever: any unexpected error is logged
    # and build_on_fedmsg_loop() is re-entered, which sets up a fresh ZeroMQ
    # subscription.  ``except Exception`` (not a bare ``except:``) lets
    # SystemExit and similar interpreter-level exits terminate the script.
    while True:
        try:
            build_on_fedmsg_loop()
        except KeyboardInterrupt:
            sys.exit(1)
        except Exception:
            log.exception('Error in fedmsg loop. Restarting it.')