import sys import gst from Togra import GstTexture from gtk import gdk import gobject import time cube_texs = {} out_pipe = None toplevels = [] # For each media file we want to play, we create a GstThread containing: # gnomevfssrc ! decodebin and iterate it. As it generates pads we do: # for audio, append audioconvert ! audioscale ! audio/x-raw-int,rate=48000,width=16,channels=2 ! queue # for video, append a queue # As soon as we have both a audio and video pad, or no-more-pads or either queue fills, we finish the preroll # state. At this stage, we attach the audio queue to a volume control and the adder in the audio output thread (PAUSED) # We attach the video queue to ffmpegcolorspace ! videoscale ! tograsink (PAUSED) # Once all the CubeTex's are out of the preroll state, we can kick all the outputs into the PLAYING state at the same # time and achieve clock sync. # Class to manage a togra sink class CubeTex: READY, PREROLLING, PLAYING, ERRORED = range(4) def __init__ (self, toplevel, location, audio_out, next): global toplevels self.location = location self.audio_out = audio_out self.texture = GstTexture(256, 256) self.vol = None self.toplevel = toplevel self.next = next self.state = CubeTex.READY self.vid_q = None self.aud_q = None self.pipe = gst.element_factory_make ("thread") toplevels.append (self.pipe) src = gst.element_factory_make ("gnomevfssrc") dec = gst.element_factory_make ("decodebin") self.pipe.add_many (src, dec) src.link (dec) self.dec = dec self.src = src src.set_property("location", location) self.preroll_sigs = [] self.preroll_sigs.append ((dec, dec.connect ('new-decoded-pad', self.handle_new_pad))) self.preroll_sigs.append ((dec, dec.connect ('no-more-pads', self.finish_preroll))) self.eos_id = self.pipe.connect ('eos', self.handle_eos) self.errsig_id = self.pipe.connect ('error', self.handle_error) self.vid_pipeline = gst.parse_launch("{ ffmpegcolorspace name=in ! videoscale name=out }") out_elem = self.vid_pipeline.get_by_name("out") sink = self.texture.getSink() sink_bin = out_elem.get_managing_bin(); sink_bin.add (sink) out_elem.link(sink) # Prepare audio output elements self.vol = gst.element_factory_make ("volume") self.change_vol(0.0) self.aud_bin = gst.element_factory_make ("bin") ac = gst.element_factory_make ("audioconvert") scale = gst.element_factory_make ("audioscale") self.aud_bin.add_many (ac, scale) ac.link (scale) self.aud_bin.add_ghost_pad (ac.get_pad ("sink"), "sink") self.aud_bin.add_ghost_pad (scale.get_pad ("src"), "src") def start(self): if self.state == CubeTex.READY: print "Starting", self.location if self.pipe.set_state (gst.STATE_PLAYING) == gst.STATE_ASYNC: self.pipe.wait_state_change() self.state = CubeTex.PREROLLING def finish_preroll (self, element = None): # Disconnect preroll signal handlers # attach the audio queue to a volume control and the adder in the audio output thread (PAUSED) # We attach the video queue to { ffmpegcolorspace ! videoscale ! tograsink } (PAUSED) for sig in self.preroll_sigs: sig[0].disconnect (sig[1]) self.preroll_sigs = [] # Don't bother with either if we don't have video pad if self.vid_q: if self.aud_q and self.audio_out: if self.vol.set_state (gst.STATE_READY) == gst.STATE_ASYNC: self.vol.wait_state_change() self.audio_out[0].add (self.vol) self.aud_q.link (self.vol) sinkpad = self.audio_out[1].get_request_pad ("sink%d") self.vol.get_pad ("src").link (sinkpad) pipeline = self.vid_pipeline in_elem = pipeline.get_by_name("in") if pipeline.set_state (gst.STATE_READY) == gst.STATE_ASYNC: pipeline.wait_state_change() pipeline.sync_children_state() self.vid_q.link (in_elem) self.toplevel.add (pipeline) self.state = CubeTex.PLAYING self.commit() def commit (self): # Remove from the set of prerolling elements # and if none left, start the sinks. if self.next: self.next.start() gobject.idle_add (self.sync_out_pipe, False) else: # print "Ready to go!" gobject.idle_add (self.sync_out_pipe, True) def sync_out_pipe (self, change_state): if change_state: time.sleep (0.5) if self.toplevel.set_state (gst.STATE_PLAYING) == gst.STATE_ASYNC: self.toplevel.wait_state_change() self.toplevel.sync_children_state() self.audio_out[0].sync_children_state() return False def handle_eos (self, element): # Do something clever - disconnect and restart the stream. # print self.location, "hit EOS" if self.pipe.set_state (gst.STATE_PAUSED) == gst.STATE_ASYNC: self.pipe.wait_state_change() self.disconnect_da_stuff() for sig in self.preroll_sigs: sig[0].disconnect (sig[1]) self.preroll_sigs = [] if self.pipe.set_state (gst.STATE_READY) == gst.STATE_ASYNC: self.pipe.wait_state_change() self.state = CubeTex.READY # Don't restart the stream # return print "Stream", self.location, "ended, restarting." dec = self.dec self.preroll_sigs.append ((dec, dec.connect ('new-decoded-pad', self.handle_new_pad))) self.preroll_sigs.append ((dec, dec.connect ('no-more-pads', self.finish_preroll))) if self.pipe.set_state (gst.STATE_PLAYING) == gst.STATE_ASYNC: self.pipe.wait_state_change() self.state = CubeTex.PREROLLING def handle_error (self, element, orig_elem, error, message): # If we made it to playing before erroring, restart the # stream since it was probably a disconnection if self.state == CubeTex.PLAYING: return self.handle_eos (element) # Disconnect ourselves from the pipeline since we can't play anything now anyway. self.disconnect_da_stuff() self.state = CubeTex.ERRORED self.commit() def disconnect_da_stuff (self): if self.vid_q: if self.state == CubeTex.PLAYING: pipeline = self.vid_pipeline in_elem = pipeline.get_by_name("in") self.vid_q.unlink (in_elem) self.toplevel.remove (pipeline) if pipeline.set_state (gst.STATE_NULL) == gst.STATE_ASYNC: pipeline.wait_state_change() pipeline.sync_children_state() self.dec.unlink (self.vid_q) self.pipe.remove (self.vid_q) self.vid_q = None if self.aud_q: if self.state == CubeTex.PLAYING: self.vol.unlink (self.audio_out[1]) self.audio_out[0].remove (self.vol) self.aud_q.unlink (self.vol) # Detach the audio elements from the decodebin pad = self.aud_bin.get_pad ("sink") peer = pad.get_peer() if peer: peer.unlink (pad) # Detach the audio elements from the output queue self.aud_bin.get_pad ("src").unlink (self.aud_q.get_pad("sink")) self.pipe.remove (self.aud_bin) self.pipe.remove (self.aud_q) self.aud_q = None def handle_new_pad (self, element, pad, more_pads): # decodebin has produced a new pad. # for audio, append audioconvert ! audioscale ! audio/x-raw-int,rate=48000,width=16,channels=2 ! queue # for video, append a queue s = pad.get_caps() mime_type = s.get_structure(0).get_name() # print self.location, "new pad of type", mime_type if mime_type.startswith("video/x-raw") and not self.vid_q: out_q = gst.element_factory_make ("queue") out_q.set_property('max-size-time', long (1.5 * gst.SECOND)) out_q.set_property('max-size-bytes', 15000000) out_q.set_property('max-size-buffers', 0) self.preroll_sigs.append ((out_q, out_q.connect ('overrun', self.finish_preroll))) self.pipe.add (out_q) sinkpad = out_q.get_pad ("sink") pad.link (sinkpad) self.vid_q = out_q elif mime_type.startswith("audio/x-raw") and self.audio_out and not self.aud_q: out_q = gst.element_factory_make ("queue") out_q.set_property('max-size-time', long (1.5 * gst.SECOND)) out_q.set_property('max-size-bytes', 10000000) out_q.set_property('max-size-buffers', 0) self.preroll_sigs.append ((out_q, out_q.connect ('overrun', self.finish_preroll))) self.pipe.add_many (self.aud_bin, out_q) caps = gst.caps_from_string ('audio/x-raw-int,width=16,channels=2,rate=48000') self.aud_bin.link_filtered (out_q, caps) qpad = self.aud_bin.get_pad ("sink") pad.link (qpad) self.aud_q = out_q else: pad.set_active (False) self.pipe.sync_children_state() if self.aud_q and self.vid_q: self.finish_preroll () def change_vol (self, gain): self.vol.set_property ("volume", gain) def get_texture_objects(locations, soundsink): global cube_texs, toplevels textures = [] top_pipe = gst.element_factory_make ("pipeline") toplevels.append (top_pipe) if soundsink: audio_pipe = gst.element_factory_make ("thread") audio_elem = gst.element_factory_make ("adder") scale = gst.element_factory_make ("audioscale") sink = gst.element_factory_make (soundsink) audio_pipe.add_many (audio_elem, scale, sink) caps = gst.caps_from_string ('audio/x-raw-int,width=16,channels=2,rate=48000') audio_elem.link_filtered (scale, caps) scale.link (sink) top_pipe.add (audio_pipe) audio_out = (audio_pipe, audio_elem) else: audio_out = None if top_pipe.set_state (gst.STATE_READY) == gst.STATE_ASYNC: top_pipe.wait_state_change() top_pipe.sync_children_state() next = None for location in locations: cube_tex = CubeTex (top_pipe, location, audio_out, next) cube_texs[cube_tex.texture] = cube_tex next = cube_tex textures.append (cube_tex.texture) next.start() return textures def change_res(texture, res): sink = texture.getSink() sink.set_property("width", int (res)) sink.set_property("height", int (res)) def change_vol(texture, gain): global cube_texs cube_tex = cube_texs[texture] cube_tex.change_vol (gain) def gst_dump(): global toplevels try: for dude in toplevels: dump_element (dude, 0) except e: print "exception" print_stack() def dump_bin (bin, indent): # Iterate the children for child in bin.get_list(): dump_element (child, indent + 2) def dump_element (element, indent): states = { 1: 'NULL', 2: 'READY', 4: 'PAUSED', 8: 'PLAYING' } state = 'UNKNOWN' try: state = states[element.get_state()] except KeyError: state = 'UNKNOWN (%d)' % element.get_state() c = element.get_clock() if c is None: clock_str = "clock - None" else: clock_str = "clock - %s" % (c.get_name()) out = "%s (%s): state %s, %s" % (element.get_name(), gobject.type_name (element.__gtype__), state, clock_str) print out.rjust(len(out) + indent) tmp = { True: 'active', False: 'inactive' } for curpad in element.get_pad_list(): if curpad.get_direction() == gst.PAD_SRC: if curpad.is_linked(): peer = curpad.get_peer() out = " - %s:%s (%s) => %s:%s (%s)" % ( curpad.get_parent().get_name(), curpad.get_name(), tmp[curpad.is_active()], peer.get_parent().get_name(), peer.get_name(), tmp[peer.is_active()]) print out.rjust(len(out) + indent) if isinstance (element, gst.Bin): dump_bin (element, indent + 2) elif isinstance (element, gst.Queue): out = " - time_level: %ld" % (element.get_property('current-level-time')) print out.rjust(len(out) + indent) out = " - bytes_level: %ld" % (element.get_property('current-level-bytes')) print out.rjust(len(out) + indent)