1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import gst
23 import gobject
24 import threading
25
26 from flumotion.component import decodercomponent as dc
27 from flumotion.common import messages
28 from flumotion.common.i18n import N_, gettexter
29
30 T_ = gettexter()
31
32 __version__ = "$Rev: 7162 $"
33
34 BASIC_AUDIO_CAPS = "audio/x-raw-int;audio/x-raw-float"
35 BASIC_VIDEO_CAPS = "video/x-raw-yuv;video/x-raw-rgb"
36
37
38
39
40 GST_AUTOPLUG_SELECT_TRY = 0
41 GST_AUTOPLUG_SELECT_SKIP = 2
42
43
45
46 - def __init__(self, name, caps, linked=False):
49
50
52 __gstdetails__ = ('SyncKeeper', 'Generic',
53 'Retimestamp the output to be contiguous and maintain '
54 'the sync', 'Xavier Queralt')
55 _audiosink = gst.PadTemplate("audio-in",
56 gst.PAD_SINK,
57 gst.PAD_ALWAYS,
58 gst.caps_from_string(BASIC_AUDIO_CAPS))
59 _videosink = gst.PadTemplate("video-in",
60 gst.PAD_SINK,
61 gst.PAD_ALWAYS,
62 gst.caps_from_string(BASIC_VIDEO_CAPS))
63 _audiosrc = gst.PadTemplate("audio-out",
64 gst.PAD_SRC,
65 gst.PAD_ALWAYS,
66 gst.caps_from_string(BASIC_AUDIO_CAPS))
67 _videosrc = gst.PadTemplate("video-out",
68 gst.PAD_SRC,
69 gst.PAD_ALWAYS,
70 gst.caps_from_string(BASIC_VIDEO_CAPS))
71
73 gst.Element.__init__(self)
74
75
76 self.audiosrc = gst.Pad(self._audiosrc, "audio-out")
77 self.add_pad(self.audiosrc)
78 self.videosrc = gst.Pad(self._videosrc, "video-out")
79 self.add_pad(self.videosrc)
80
81
82 self.audiosink = gst.Pad(self._audiosink, "audio-in")
83 self.audiosink.set_chain_function(lambda pad, buffer:
84 self.chainfunc(pad, buffer, self.audiosrc))
85 self.audiosink.set_event_function(lambda pad, buffer:
86 self.eventfunc(pad, buffer, self.audiosrc))
87 self.add_pad(self.audiosink)
88 self.videosink = gst.Pad(self._videosink, "video-in")
89 self.videosink.set_chain_function(lambda pad, buffer:
90 self.chainfunc(pad, buffer, self.videosrc))
91 self.videosink.set_event_function(lambda pad, buffer:
92 self.eventfunc(pad, buffer, self.videosrc))
93 self.add_pad(self.videosink)
94
95
96 self._lock = threading.Lock()
97 self._totalTime = 0L
98 self._syncTimestamp = 0L
99 self._syncOffset = 0L
100 self._resetReceived = True
101 self._sendNewSegment = True
102
104 for pad in [self.videosrc, self.audiosrc]:
105 pad.push_event(
106 gst.event_new_new_segment(True, 1.0, gst.FORMAT_TIME,
107 self._syncTimestamp, -1, 0))
108 self._sendNewSegment = False
109
111
112
113 if not self._totalTime and not self._resetReceived:
114 return
115 self._syncTimestamp = self._totalTime
116 self._syncOffset = start + (start - position)
117 self._resetReceived = False
118 self.info("Update sync point to % r, offset to %r" %
119 (gst.TIME_ARGS(self._syncTimestamp),
120 (gst.TIME_ARGS(self._syncOffset))))
121
123 self.log("Input %s timestamp: %s, %s" %
124 (srcpad is self.audiosrc and 'audio' or 'video',
125 gst.TIME_ARGS(buf.timestamp),
126 gst.TIME_ARGS(buf.duration)))
127
128 if not self._sendNewSegment:
129 self._send_new_segment()
130
131 try:
132 self._lock.acquire()
133 try:
134 buf.timestamp += self._syncTimestamp - self._syncOffset
135 except TypeError:
136 buf.timestamp = 0
137 dur = buf.duration != gst.CLOCK_TIME_NONE and buf.duration or 0
138 self._totalTime = max(buf.timestamp + dur, self._totalTime)
139
140 self.log("Output %s timestamp: %s, %s" %
141 (srcpad is self.audiosrc and 'audio' or 'video',
142 gst.TIME_ARGS(buf.timestamp),
143 gst.TIME_ARGS(buf.duration)))
144 finally:
145 self._lock.release()
146
147 srcpad.push(buf)
148 return gst.FLOW_OK
149
151 self.debug("Received event %r from %s" % (event, event.src))
152 try:
153 self._lock.acquire()
154 if event.type == gst.EVENT_NEWSEGMENT:
155 u, r, f, start, s, position = event.parse_new_segment()
156 self._update_sync_point(start, position)
157 if event.get_structure().get_name() == 'flumotion-reset':
158 self._resetReceived = True
159 self._send_new_segment = True
160 finally:
161 self._lock.release()
162
163
164 if event.type != gst.EVENT_NEWSEGMENT:
165 return srcpad.push_event(event)
166 return True
167
168 gobject.type_register(SyncKeeper)
169 gst.element_register(SyncKeeper, "synckeeper", gst.RANK_MARGINAL)
170
171
173 """
174 Generic decoder component using decodebin2.
175
176 It listen to the custom gstreamer event flumotion-reset,
177 and reset the decoding pipeline by removing the old one
178 and creating a new one.
179
180 Sub-classes must override _get_feeders_info() and return
181 a list of FeederInfo instances that describe the decoder
182 output.
183
184 When reset, if the new decoded pads do not match the
185 previously negotiated caps, feeder will not be connected,
186 and the decoder will go sad.
187 """
188
189 logCategory = "gen-decoder"
190 feeder_tmpl = ("identity name=%(ename)s single-segment=true "
191 "silent=true ! %(caps)s ! @feeder:%(pad)s@")
192
193
194
197
217
224
225
226
228 return 'decodebin2 name=decoder'
229
231 """
232 Must be overridden to returns a tuple of FeederInfo.
233 """
234 return None
235
236
237
239 return "%s-output" % feed_name
240
241
242
249
250
273
274
276
277 logCategory = "avgen-decoder"
278 feeder_tmpl = ("identity name=%(ename)s silent=true ! %(caps)s ! "
279 "sync.%(pad)s-in sync.%(pad)s-out ! @feeder:%(pad)s@")
280
284
286 return 'decodebin2 name=decoder synckeeper name=sync'
287