Package flumotion :: Package worker :: Module base
[hide private]

Source Code for Module flumotion.worker.base

  1  # -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  """ 
 19  worker-side objects to handle worker clients 
 20  """ 
 21   
 22  import os 
 23  import sys 
 24  import signal 
 25   
 26  from twisted.cred import portal 
 27  from twisted.internet import defer, reactor 
 28  from twisted.spread import pb 
 29  from zope.interface import implements 
 30   
 31  from flumotion.common import errors, log 
 32  from flumotion.common import worker, startset 
 33  from flumotion.common.process import signalPid 
 34  from flumotion.twisted import checkers, fdserver 
 35  from flumotion.twisted import pb as fpb 
 36   
 37  __version__ = "$Rev$" 
 38   
 39  JOB_SHUTDOWN_TIMEOUT = 5  # seconds shutdown() gives jobs to stop before kill() is called
 40   
 41   
def _getSocketPath():
    """
    Reserve a unique filesystem path to be used for the worker's socket.

    A placeholder file is created with mkstemp, closed, and left on disk;
    the caller needs to delete it before using the returned name.

    @returns: path of the placeholder file
    @rtype:   str
    """
    # FIXME: there is no mkstemp for sockets, so there is a small window
    # here in which the socket could be created by something else.
    # Preparing a socket file with that name up front did not work either.
    from tempfile import mkstemp

    handle, socketPath = mkstemp(suffix='.%d' % os.getpid(),
                                 prefix='flumotion.worker.')
    os.close(handle)
    return socketPath
53 54
class JobInfo(object):
    """
    I am a plain record describing a single job.

    @ivar pid:        PID of the child process
    @type pid:        int
    @ivar avatarId:   avatar identification string
    @type avatarId:   str
    @ivar type:       type of the component to create
    @type type:       str
    @ivar moduleName: name of the module to create the component from
    @type moduleName: str
    @ivar methodName: the factory method to use to create the component
    @type methodName: str
    @ivar nice:       the nice level to run the job as
    @type nice:       int
    @ivar bundles:    ordered list of (bundleName, bundlePath) needed to
                      create the component
    @type bundles:    list of (str, str)
    """
    # The order here matches the order of the constructor arguments.
    __slots__ = ('pid', 'avatarId', 'type', 'moduleName', 'methodName',
                 'nice', 'bundles')

    def __init__(self, pid, avatarId, type, moduleName, methodName, nice,
                 bundles):
        """
        Store every argument on the attribute of the same name.
        """
        values = (pid, avatarId, type, moduleName, methodName, nice,
                  bundles)
        # JobInfo.__slots__ is named explicitly so that a subclass
        # declaring its own __slots__ does not change what is assigned.
        for attribute, value in zip(JobInfo.__slots__, values):
            setattr(self, attribute, value)
87 88
class JobProcessProtocol(worker.ProcessProtocol):
    """
    I am the process protocol of a job process spawned by the worker.

    When the process ends I settle the job's pending entries in the start
    set and tell the job heaven that the process is gone.
    """

    def __init__(self, heaven, avatarId, startSet):
        """
        @param heaven:   the job heaven owning this job; the base class
                         exposes it again as C{self.loggable}
        @param avatarId: avatar id of the job
        @type  avatarId: str
        @param startSet: the start set tracking creation and shutdown of
                         jobs
        """
        self._startSet = startSet
        # Remembered so that processEnded() can compare it against what
        # the start set reports at that time.
        self._deferredStart = startSet.createRegistered(avatarId)
        worker.ProcessProtocol.__init__(self, heaven, avatarId,
                                        'component',
                                        heaven.getWorkerName())

    def sendMessage(self, message):
        """
        Forward a message about this component through the worker brain.
        """
        self.loggable.brain.callRemote('componentAddMessage',
                                       self.avatarId, message)

    def processEnded(self, status):
        """
        Handle the end of the job process.

        @param status: failure describing how the process ended; the
                       signal number is read from C{status.value.signal}
        """
        heaven = self.loggable
        startSet = self._startSet
        avatarId = self.avatarId
        signum = status.value.signal

        # If the job died before logging in to the worker, the create has
        # to be failed explicitly; otherwise the manager still thinks the
        # component is starting up when it is dead.  If the job already
        # attached to the worker, the create deferred has already been
        # called back.
        if startSet.createRegistered(avatarId) is self._deferredStart:
            reason = "unknown reason"
            if signum:
                reason = "received signal %d" % signum
            text = ("Component '%s' has exited early (%s)." %
                    (avatarId, reason))
            startSet.createFailed(avatarId,
                                  errors.ComponentCreateError(text))

        if startSet.shutdownRegistered(avatarId):
            startSet.shutdownSuccess(avatarId)

        # NOTE(review): self.pid is presumably set by the base class once
        # the process has been spawned; confirm against
        # worker.ProcessProtocol.
        heaven.jobStopped(self.pid)

        # chain up
        worker.ProcessProtocol.processEnded(self, status)
131 132
class BaseJobHeaven(pb.Root, log.Loggable):
    """
    I am similar to, but not quite the same as, a manager-side Heaven.
    I keep track of the avatars and the job processes spawned by the
    worker, and I am the realm those jobs log in to.

    Subclasses must set L{avatarClass}; L{listen} asserts that it is set.

    @cvar avatarClass: class instantiated for every job that logs in
    @ivar avatars:     dict of avatarId -> avatar
    @type avatars:     dict of str -> L{base.BaseJobAvatar}
    @ivar brain:       the worker brain
    @type brain:       L{worker.WorkerBrain}
    """

    logCategory = "job-heaven"
    implements(portal.IRealm)

    avatarClass = None

    def __init__(self, brain):
        """
        @param brain: a reference to the worker brain
        @type  brain: L{worker.WorkerBrain}
        """
        self.avatars = {}    # componentId -> avatar
        self.brain = brain
        self._socketPath = _getSocketPath()
        self._port = None
        # If set, a deferred to fire when our last child process exits
        self._onShutdown = None
        self._jobInfos = {}  # processid -> JobInfo

        def hasAvatar(avatarId):
            # looked up on every call, so it always sees the current dict
            return avatarId in self.avatars

        self._startSet = startset.StartSet(
            hasAvatar,
            errors.ComponentAlreadyStartingError,
            errors.ComponentAlreadyRunningError)

    def listen(self):
        """
        Start listening for job logins on the unix socket.

        May only be called while not listening, and only on a class that
        has set L{avatarClass}.
        """
        assert self._port is None
        assert self.avatarClass is not None

        # FIXME: we should hand a username and password to log in with to
        # the job process instead of allowing anonymous
        checker = checkers.FlexibleCredentialsChecker()
        checker.allowPasswordless(True)
        factory = pb.PBServerFactory(portal.Portal(self, [checker]))

        # The socket path may still hold a file; a failure to remove it
        # is deliberately ignored.
        try:
            os.unlink(self._socketPath)
        except OSError:
            pass

        # Rather than a plain listenUNIX() we use our particular Port,
        # which creates Transports that we know how to pass FDs over.
        self.debug("Listening for FD's on unix socket %s", self._socketPath)

        # reactor.listenWith(fdserver.FDPort, ...) is deprecated and never
        # did much anyway, so the port is instantiated directly.
        listeningPort = fdserver.FDPort(self._socketPath, factory,
                                        reactor=reactor)
        listeningPort.startListening()

        self._port = listeningPort

    ### portal.IRealm method

    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Create and register the avatar for a job that logs in.

        @returns: (pb.IPerspective, avatar, logout callable)
        @raises NotImplementedError: if pb.IPerspective is not among the
                                     requested interfaces
        """
        if pb.IPerspective not in interfaces:
            raise NotImplementedError("no interface")

        avatar = self.avatarClass(self, avatarId, mind)
        assert avatarId not in self.avatars
        self.avatars[avatarId] = avatar
        return pb.IPerspective, avatar, avatar.logout

    def removeAvatar(self, avatarId):
        """
        Forget the avatar with the given id; warn if it is unknown.
        """
        if avatarId not in self.avatars:
            self.warning("some programmer is telling me about an avatar "
                         "I have no idea about: %s", avatarId)
            return

        del self.avatars[avatarId]

    def getWorkerName(self):
        """
        Gets the name of the worker that spawns the process.

        @rtype: str
        """
        return self.brain.workerName

    def addJobInfo(self, processId, jobInfo):
        """
        Record the job info for the given process id.
        """
        self._jobInfos[processId] = jobInfo

    def getJobInfo(self, processId):
        """
        Return the job info recorded for the given process id.

        @raises KeyError: if nothing is recorded for that process id
        """
        return self._jobInfos[processId]

    def getJobInfos(self):
        """
        Return all recorded job infos.
        """
        return self._jobInfos.values()

    def getJobPids(self):
        """
        Return the process ids of all recorded jobs.
        """
        return self._jobInfos.keys()

    def rotateChildLogFDs(self):
        """
        Hand our current stdout and stderr descriptors to every avatar.
        """
        self.debug('telling kids about new log file descriptors')
        for avatar in self.avatars.values():
            avatar.logTo(sys.stdout.fileno(), sys.stderr.fileno())

    def jobStopped(self, pid):
        """
        Drop the job info of an exited process, and fire the pending
        shutdown deferred, if any, once no job infos are left.
        """
        if pid not in self._jobInfos:
            self.warning("some programmer is telling me about a pid "
                         "I have no idea about: %d", pid)
            return

        self.debug('Removing job info for %d', pid)
        del self._jobInfos[pid]

        if not self._jobInfos and self._onShutdown:
            self.debug("Last child exited")
            self._onShutdown.callback(None)

    def shutdown(self):
        """
        Stop all jobs and afterwards stop listening.

        @returns: a deferred that fires once the listening port has been
                  stopped; when avatars are still present this happens
                  only after the last child process has exited
        """
        self.debug('Shutting down JobHeaven')
        self.debug('Stopping all jobs')
        for avatar in self.avatars.values():
            avatar.stop()

        def closePort(_):
            # possible for the port to be None, if we haven't been told
            # to listen yet, as in some test cases
            listeningPort = self._port
            if listeningPort:
                self._port = None
                return listeningPort.stopListening()

        if not self.avatars:
            # everything's gone already, return success
            done = defer.succeed(None)
        else:
            # If our jobs fail to shut down nicely within some period of
            # time, shut them down less nicely
            killCall = reactor.callLater(JOB_SHUTDOWN_TIMEOUT, self.kill)

            def cancelKill(result):
                # be nice to unit tests
                if killCall.active():
                    killCall.cancel()
                return result

            done = self._onShutdown = defer.Deferred()
            done.addCallback(cancelKill)

        done.addCallback(closePort)
        return done

    def kill(self, signum=signal.SIGKILL):
        """
        Send the given signal to every recorded job.
        """
        self.warning("Killing all children immediately")
        for pid in self.getJobPids():
            self.killJobByPid(pid, signum)

    def killJobByPid(self, pid, signum):
        """
        Send a signal to the job recorded under the given process id.

        @raises errors.UnknownComponentError: if no job is recorded for
                                              that process id
        """
        if pid not in self._jobInfos:
            raise errors.UnknownComponentError(pid)

        info = self._jobInfos[pid]
        self.debug("Sending signal %d to job %s at pid %d", signum,
                   info.avatarId, info.pid)
        signalPid(info.pid, signum)

    def killJob(self, avatarId, signum):
        """
        Send a signal to every recorded job with the given avatar id.
        """
        for info in self._jobInfos.values():
            if info.avatarId == avatarId:
                self.killJobByPid(info.pid, signum)
302 303
class BaseJobAvatar(fpb.Avatar, log.Loggable):
    """
    I am an avatar for the job living in the worker.
    """
    logCategory = 'job-avatar'

    def __init__(self, heaven, avatarId, mind):
        """
        @type heaven:   L{flumotion.worker.base.BaseJobHeaven}
        @type avatarId: str
        @param mind:    reference to the job's JobMedium
        """
        fpb.Avatar.__init__(self, avatarId)
        self._heaven = heaven
        # setMind() triggers haveMind(), so that hook runs before
        # self.pid is reset below.
        self.setMind(mind)
        self.pid = None

    def setMind(self, mind):
        """
        @param mind: reference to the job's JobMedium on which we can call
        @type mind: L{twisted.spread.pb.RemoteReference}
        """
        fpb.Avatar.setMind(self, mind)
        self.haveMind()

    def haveMind(self):
        """
        Hook called every time a mind has been set; implement me in
        subclasses.
        """
        pass

    def logout(self):
        """
        Called when the job disconnects; unregisters me from the heaven.
        """
        self.log('logout called, %s disconnected', self.avatarId)

        self._heaven.removeAvatar(self.avatarId)

    def stop(self):
        """
        Stop the job; must be implemented by subclasses.

        @returns: a deferred marking completed stop.
        """
        raise NotImplementedError

    def _sendFileDescriptor(self, fd, message):
        """
        Pass a file descriptor to the job over the mind's transport.

        @param fd:      the file descriptor to send
        @param message: message sent along with the descriptor
        @returns: True if sending raised no OSError, False otherwise
        @rtype:   bool
        """
        try:
            # FIXME: pay attention to the return value of
            # sendFileDescriptor; is the same as the return value of
            # sendmsg(2)
            self.mind.broker.transport.sendFileDescriptor(fd, message)
        except OSError:
            # OSError is what is thrown by the C code doing this
            # when there are issues.  The exception is fetched through
            # sys.exc_info() because that parses on every Python version,
            # unlike "except OSError, e".
            e = sys.exc_info()[1]
            self.warning("Error %s sending file descriptors",
                         log.getExceptionMessage(e))
            return False
        else:
            return True

    def logTo(self, stdout, stderr):
        """
        Tell the job to log to the given file descriptors.

        @param stdout: file descriptor the job should use as stdout
        @param stderr: file descriptor the job should use as stderr
        """
        self.debug('Giving job new stdout and stderr')
        if self.mind:
            self._sendFileDescriptor(stdout, "redirectStdout")
            # Send the stderr descriptor here, so the job's stderr does
            # not end up on the stdout descriptor.
            self._sendFileDescriptor(stderr, "redirectStderr")
365