Package flumotion :: Package component :: Package producers :: Package icecast :: Module icecast
[hide private]

Source Code for Module flumotion.component.producers.icecast.icecast

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import gst 
 19  from twisted.internet import defer 
 20  from flumotion.component import feedcomponent 
 21  from flumotion.twisted.defer import RetryingDeferred 
 22  from flumotion.common import errors 
 23   
 24  __version__ = "$Rev$" 
 25   
 26   
class Icecast(feedcomponent.ParseLaunchComponent):
    """Producer component that reads a stream from an Icecast server.

    The initial pipeline is an HTTP source followed by a typefind element.
    Once typefind reports the stream type, further elements are plugged in
    behind it, and the connection is retried through a RetryingDeferred
    when the source gets disconnected.
    """

    # Class-level default; configure_pipeline() sets it to True on the
    # instance once the one-time setup (pad monitor, reconnector, EOS probe)
    # has been done.
    configured = False
31 - def get_pipeline_string(self, properties):
32 return "souphttpsrc name=src ! typefind name=tf"
33
    def _typefind_have_caps_cb(self, tf, prob, caps):
        """Handle typefind's 'have-type' signal by plugging the next elements.

        tf is the typefind element that emitted the signal, caps the detected
        capabilities (only the name of the first structure is used).
        prob is unused here (presumably the detection probability -- confirm).
        """
        # Based on the capabilities, plug additional gst components:
        # 1. If we have pure audio (http src doesn't support ICY) plug parser
        # 2. If we have application/x-icy plug the icydemuxer and then parser
        capsname = caps[0].get_name()
        tf_src_pad = tf.get_pad('src')
        gdp_sink_pad = tf_src_pad.get_peer()
        # unlink the typefind from the gdp pad so that we can put another
        # component in its place
        tf_src_pad.unlink(gdp_sink_pad)

        if capsname == 'application/x-icy':
            demuxer = gst.element_factory_make("icydemux")
            # NOTE(review): the demuxer is set to PLAYING before it is added
            # to the pipeline -- confirm this order is intended.
            demuxer.set_state(gst.STATE_PLAYING)
            # remember the element name so _reset() can find and remove it
            self._demuxer_name = demuxer.get_name()
            self.pipeline.add(demuxer)
            tf.link(demuxer)
            # demuxer src pad is dynamic, we need to register a callback
            demuxer.connect('pad-added', self._link_parser, gdp_sink_pad)
        else:
            # no demuxer needed: link the parser straight after typefind
            self._demuxer_name = None
            self._link_parser(tf, tf_src_pad, gdp_sink_pad)
56 89
90 - def configure_pipeline(self, pipeline, properties):
91 # Later, when the typefind element has successfully found the type 92 # of the data, we'll rebuild the pipeline. 93 self.src = pipeline.get_by_name('src') 94 self.url = properties['url'] 95 self.passthrough = properties.get('passthrough', False) 96 self.src.set_property('location', self.url) 97 self.src.set_property('iradio-mode', True) 98 99 typefind = pipeline.get_by_name('tf') 100 self.signal_id = typefind.connect('have-type',\ 101 self._typefind_have_caps_cb) 102 103 if not self.configured: 104 self.attachPadMonitorToElement('src', 105 self._src_connected, 106 self._src_disconnected) 107 self.reconnecting = False 108 self.reconnector = RetryingDeferred(self.connect) 109 self.reconnector.initialDelay = 1.0 110 self.reconnector.maxDelay = 300 111 self.attemptD = None 112 113 def _drop_eos(pad, event): 114 self.debug('Swallowing event %r', event) 115 if event.type == gst.EVENT_EOS: 116 return False 117 return True
118 self.configured = True 119 self.src.get_pad('src').add_event_probe(_drop_eos)
120
121 - def bus_message_received_cb(self, bus, message):
122 if message.type == gst.MESSAGE_ERROR and message.src == self.src: 123 gerror, debug = message.parse_error() 124 self.warning('element %s error %s %s', 125 message.src.get_path_string(), gerror, debug) 126 if self.reconnecting: 127 self._retry() 128 return True 129 feedcomponent.ParseLaunchComponent.bus_message_received_cb( 130 self, bus, message)
131
132 - def connect(self):
133 self.info('Connecting to icecast server on %s', self.url) 134 self.src.set_state(gst.STATE_READY) 135 # can't just self.src.set_state(gst.STATE_PLAYING), 136 # because the pipeline might NOT be in PLAYING, 137 # if we never connected to Icecast and never went to PLAYING 138 self.try_start_pipeline(force=True) 139 self.attemptD = defer.Deferred() 140 return self.attemptD
141
142 - def _src_connected(self, name):
143 self.info('Connected to icecast server on %s', self.url) 144 if self.reconnecting: 145 assert self.attemptD 146 self.attemptD.callback(None) 147 self.reconnecting = False
148
    def _reset(self, pad):
        """Tear down everything behind the source and start reconnecting.

        pad is the source element's 'src' pad. The parser, the typefind and
        (if present) the demuxer are removed from the pipeline, a fresh
        typefind is put in their place, the pipeline is reconfigured and the
        reconnector is started.
        """
        # remove all the elements downstream souphttpsrc.
        # NOTE(review): _parser_name is not initialised anywhere in the code
        # visible here (presumably set by _link_parser) -- confirm it always
        # exists before a disconnect can happen.
        if not self._parser_name:
            # nothing was plugged after typefind yet: just retry connecting
            self.reconnecting = True
            self.reconnector.start()
            return

        tf = self.get_element('tf')
        pad.unlink(tf.get_pad('sink'))

        parser = self.get_element(self._parser_name)
        tf.get_pad('src').unlink(parser.get_pad('sink'))
        # remember what was downstream of the parser so the new typefind
        # can be linked to it below
        peer = parser.get_pad('src').get_peer()
        parser.get_pad('src').unlink(peer)

        parser.set_state(gst.STATE_NULL)
        self.pipeline.remove(parser)
        self._parser_name = None
        tf.set_state(gst.STATE_NULL)
        self.pipeline.remove(tf)
        if self._demuxer_name is not None:
            demuxer = self.get_element(self._demuxer_name)
            demuxer.set_state(gst.STATE_NULL)
            self.pipeline.remove(demuxer)
            self._demuxer_name = None

        # recreate the typefind element in order to be in the same state as
        # when the component was first initiated
        tf = gst.element_factory_make('typefind', 'tf')
        self.pipeline.add(tf)
        tf.set_state(gst.STATE_PLAYING)
        pad.link(tf.get_pad('sink'))
        tf.get_pad('src').link(peer)

        # reconfigure the pipeline
        self.configure_pipeline(self.pipeline, self.config['properties'])
        self.pipeline.set_state(gst.STATE_PLAYING)
        self.reconnecting = True
        self.reconnector.start()
188
189 - def _src_disconnected(self, name):
190 self.info('Disconnected from icecast server on %s', self.url) 191 if not self.reconnecting: 192 src = self.get_element('src') 193 pad = src.get_pad('src') 194 self._reset(pad)
195
196 - def _retry(self):
197 assert self.attemptD 198 if not self.attemptD.called: 199 self.debug('Retrying connection to icecast server on %s', self.url) 200 self.attemptD.errback(errors.ConnectionError)
201