1
2
3 import json
4 import pprint
5 import zmq
6 import sys
7 import os
8 import logging
9 import requests
10 import re
11 import munch
12
13 sys.path.append(
14 os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
15 )
16
17 from coprs import db, app, models
18 from coprs.logic.coprs_logic import CoprDirsLogic
19 from coprs.logic.builds_logic import BuildsLogic
20 from coprs.logic.complex_logic import ComplexLogic
21 from coprs.logic.packages_logic import PackagesLogic
22 from coprs import helpers
23
24 from urllib.parse import urlparse
25
26 SCM_SOURCE_TYPE = helpers.BuildSourceEnum("scm")
27
28 logging.basicConfig(
29 filename='{0}/pagure-events.log'.format(app.config.get('LOG_DIR')),
30 format='[%(asctime)s][%(levelname)6s]: %(message)s',
31 level=logging.DEBUG)
32
33 log = logging.getLogger(__name__)
34 log.addHandler(logging.StreamHandler(sys.stdout))
35
36 if os.getenv('PAGURE_EVENTS_TESTONLY'):
37 ENDPOINT = 'tcp://stg.pagure.io:9940'
38 else:
39 ENDPOINT = 'tcp://hub.fedoraproject.org:9940'
40
41 log.info("ENDPOINT = {}".format(ENDPOINT))
42
43 pagure_instances = {
44 'https://pagure.io/': 'io.pagure.prod.pagure',
45 'https://src.fedoraproject.org/': 'org.fedoraproject.prod.pagure',
46 'https://stg.pagure.io/': 'io.pagure.stg.pagure',
47 }
48
49 topics = [
50 'git.receive',
51 'pull-request.new',
52 'pull-request.rebased',
53 'pull-request.updated',
54 'pull-request.comment.added',
55 ]
56
57 TOPICS = {}
58 for url, fedmsg_prefix in pagure_instances.items():
59 for topic in topics:
60 TOPICS['{0}.{1}'.format(fedmsg_prefix, topic)] = url
63 log.info("getting url {}".format(url))
64 for attempt in range(1, 4):
65 r = requests.get(url)
66 if r.status_code == requests.codes.ok:
67 return r.text
68 else:
69 log.error('Bad http status {0} from url {1}, attempt {2}'.format(
70 r.status_code, url, attempt))
71
72 return ""
73
83
84 - def build(self, source_dict_update, copr_dir, update_callback,
85 scm_object_type, scm_object_id, scm_object_url):
96
97 @classmethod
99 if db.engine.url.drivername == 'sqlite':
100 placeholder = '?'
101 true = '1'
102 else:
103 placeholder = '%s'
104 true = 'true'
105
106 rows = db.engine.execute(
107 """
108 SELECT package.id AS package_id, package.source_json AS source_json, package.copr_id AS copr_id
109 FROM package JOIN copr_dir ON package.copr_dir_id = copr_dir.id
110 WHERE package.source_type = {0} AND
111 package.webhook_rebuild = {1} AND
112 copr_dir.main = {2} AND
113 package.source_json ILIKE {placeholder}
114 """.format(SCM_SOURCE_TYPE, true, true, placeholder=placeholder), '%'+clone_url+'%'
115 )
116 return [ScmPackage(row) for row in rows]
117
118
120 if not changed_files:
121 return False
122
123 sm = helpers.SubdirMatch(self.subdirectory)
124 for filename in changed_files:
125 if sm.match(filename):
126 return True
127
128 return False
129
167
170 """
171 Message handler for new pull-request opened in pagure.
172 Topic: ``*.pagure.pull-request.new``
173 """
174 return munch.Munch({
175 'object_id': data['msg']['pullrequest']['id'],
176 'object_type': 'pull-request',
177 'base_project_url_path': data['msg']['pullrequest']['project']['url_path'],
178 'base_clone_url_path': data['msg']['pullrequest']['project']['fullname'],
179 'base_clone_url': base_url + data['msg']['pullrequest']['project']['fullname'],
180 'project_url_path': data['msg']['pullrequest']['repo_from']['url_path'],
181 'clone_url_path': data['msg']['pullrequest']['repo_from']['fullname'],
182 'clone_url': base_url + data['msg']['pullrequest']['repo_from']['fullname'],
183 'branch_from': data['msg']['pullrequest']['branch_from'],
184 'branch_to': data['msg']['pullrequest']['branch'],
185 'start_commit': data['msg']['pullrequest']['commit_start'],
186 'end_commit': data['msg']['pullrequest']['commit_stop'],
187 })
188
191 """
192 Message handler for push event in pagure.
193 Topic: ``*.pagure.git.receive``
194 """
195 return munch.Munch({
196 'object_id': data['msg']['end_commit'],
197 'object_type': 'commit',
198 'base_project_url_path': data['msg']['repo']['url_path'],
199 'base_clone_url_path': data['msg']['repo']['fullname'],
200 'base_clone_url': base_url + data['msg']['repo']['fullname'],
201 'project_url_path': data['msg']['repo']['url_path'],
202 'clone_url_path': data['msg']['repo']['fullname'],
203 'clone_url': base_url + data['msg']['repo']['fullname'],
204 'branch_from': data['msg']['branch'],
205 'branch_to': data['msg']['branch'],
206 'start_commit': data['msg']['start_commit'],
207 'end_commit': data['msg']['end_commit'],
208 })
209
212 url1 = re.sub(r'(\.git)?/*$', '', str(url1))
213 url2 = re.sub(r'(\.git)?/*$', '', str(url2))
214 o1 = urlparse(url1)
215 o2 = urlparse(url2)
216 return (o1.netloc == o2.netloc and o1.path == o2.path)
217
220 log.debug("Setting up poller...")
221 pp = pprint.PrettyPrinter(width=120)
222
223 ctx = zmq.Context()
224 s = ctx.socket(zmq.SUB)
225
226
227
228 s.setsockopt(zmq.TCP_KEEPALIVE, 1)
229 s.setsockopt(zmq.TCP_KEEPALIVE_IDLE, 30)
230 s.setsockopt(zmq.TCP_KEEPALIVE_INTVL, 5)
231 s.setsockopt(zmq.TCP_KEEPALIVE_CNT, 3)
232
233 s.connect(ENDPOINT)
234
235 for topic in TOPICS:
236 s.setsockopt_string(zmq.SUBSCRIBE, topic)
237
238 poller = zmq.Poller()
239 poller.register(s, zmq.POLLIN)
240
241 while True:
242 log.debug('Polling...')
243 evts = poller.poll(10000)
244 if not evts:
245 continue
246
247 log.debug('Receiving...')
248 _, msg_bytes = s.recv_multipart()
249 msg = msg_bytes.decode('utf-8')
250
251 log.debug('Parsing...')
252 data = json.loads(msg)
253
254 log.info('Got topic: {}'.format(data['topic']))
255 base_url = TOPICS.get(data['topic'])
256 if not base_url:
257 log.error('Unknown topic {} received. Continuing.')
258 continue
259
260 if re.match(r'^.*.pull-request.(new|rebased|updated)$', data['topic']):
261 event_info = event_info_from_pr(data, base_url)
262 elif re.match(r'^.*.pull-request.comment.added$', data['topic']):
263 event_info = event_info_from_pr_comment(data, base_url)
264 else:
265 event_info = event_info_from_push(data, base_url)
266
267 log.info('event_info = {}'.format(pp.pformat(event_info)))
268
269 if not event_info:
270 log.info('Received event was discarded. Continuing.')
271 continue
272
273 candidates = ScmPackage.get_candidates_for_rebuild(event_info.base_clone_url)
274 changed_files = set()
275
276 if candidates:
277 raw_commit_url = base_url + event_info.project_url_path + '/raw/' + event_info.start_commit
278 raw_commit_text = get_repeatedly(raw_commit_url)
279 changed_files |= helpers.raw_commit_changes(raw_commit_text)
280
281 if event_info.start_commit != event_info.end_commit:
282
283
284 change_html_url = '{base_url}{project}/c/{start}..{end}'.format(
285 base_url=base_url,
286 project=event_info.project_url_path,
287 start=event_info.start_commit,
288 end=event_info.end_commit)
289
290 change_html_text = get_repeatedly(change_html_url)
291 changed_files |= helpers.pagure_html_diff_changed(change_html_text)
292
293 log.info("changed files: {}".format(", ".join(changed_files)))
294
295 for pkg in candidates:
296 package = '{}/{}(id={})'.format(
297 pkg.package.copr.full_name,
298 pkg.package.name,
299 pkg.package.id
300 )
301 log.info('Considering pkg package: {}, source_json: {}'
302 .format(package, pkg.source_json_dict))
303
304 if (git_compare_urls(pkg.clone_url, event_info.base_clone_url)
305 and (not pkg.committish or event_info.branch_to.endswith(pkg.committish))
306 and pkg.is_dir_in_commit(changed_files)):
307
308 log.info('\t -> accepted.')
309
310 if event_info.object_type == 'pull-request':
311 dirname = pkg.copr.name + ':pr:' + str(event_info.object_id)
312 copr_dir = CoprDirsLogic.get_or_create(pkg.copr, dirname)
313 update_callback = 'pagure_flag_pull_request'
314 scm_object_url = os.path.join(base_url, event_info.project_url_path,
315 'c', str(event_info.end_commit))
316 else:
317 copr_dir = pkg.copr.main_dir
318 update_callback = 'pagure_flag_commit'
319 scm_object_url = os.path.join(base_url, event_info.base_project_url_path,
320 'c', str(event_info.object_id))
321
322 if not git_compare_urls(pkg.copr.scm_repo_url, event_info.base_clone_url):
323 update_callback = ''
324
325 source_dict_update = {
326 'clone_url': event_info.clone_url,
327 'committish': event_info.end_commit,
328 }
329
330 try:
331 build = pkg.build(
332 source_dict_update,
333 copr_dir,
334 update_callback,
335 event_info.object_type,
336 event_info.object_id,
337 scm_object_url
338 )
339 if build:
340 log.info('\t -> {}'.format(build.to_dict()))
341 except Exception as e:
342 log.error(str(e))
343 db.session.rollback()
344 else:
345 db.session.commit()
346 else:
347 log.info('\t -> skipping.')
348
349
350 if __name__ == '__main__':
351 while True:
352 try:
353 build_on_fedmsg_loop()
354 except KeyboardInterrupt:
355 sys.exit(1)
356 except:
357 log.exception('Error in fedmsg loop. Restarting it.')
358