XRootD
Loading...
Searching...
No Matches
XrdThrottleFileSystemConfig.cc
Go to the documentation of this file.
1
2#include <fcntl.h>
3
5#include "XrdOuc/XrdOuca2x.hh"
6#include "XrdOuc/XrdOucEnv.hh"
8
11
12using namespace XrdThrottle;
13
14#define OFS_NAME "libXrdOfs.so"
15
16/*
17 * Note nothing in this file is thread-safe.
18 */
19
20static XrdSfsFileSystem *
21LoadFS(const std::string &fslib, XrdSysError &eDest, const std::string &config_file){
22 // Load the library
23 XrdSysPlugin ofsLib(&eDest, fslib.c_str(), "fslib", NULL);
25 if (fslib == OFS_NAME)
26 {
28 XrdSysLogger *lp,
29 const char *configfn,
30 XrdOucEnv *EnvInfo);
31
32 if (!(fs = XrdSfsGetDefaultFileSystem(0, eDest.logger(), config_file.c_str(), 0)))
33 {
34 eDest.Emsg("Config", "Unable to load OFS filesystem.");
35 }
36 }
37 else
38 {
39 XrdSfsFileSystem *(*ep)(XrdSfsFileSystem *, XrdSysLogger *, const char *);
40 if (!(ep = (XrdSfsFileSystem *(*)(XrdSfsFileSystem *,XrdSysLogger *,const char *))
41 ofsLib.getPlugin("XrdSfsGetFileSystem")))
42 return NULL;
43 if (!(fs = (*ep)(0, eDest.logger(), config_file.c_str())))
44 {
45 eDest.Emsg("Config", "Unable to create file system object via", fslib.c_str());
46 return NULL;
47 }
48 }
49 ofsLib.Persist();
50
51 return fs;
52}
53
54namespace XrdThrottle {
57 XrdSysLogger *lp,
58 const char *configfn,
59 XrdOucEnv *envP)
60{
61 FileSystem* fs = NULL;
62 FileSystem::Initialize(fs, native_fs, lp, configfn, envP);
63 return fs;
64}
65}
66
67// Export the symbol necessary for this to be dynamically loaded.
68extern "C" {
71 XrdSysLogger *lp,
72 const char *configfn)
73{
74 return XrdSfsGetFileSystem_Internal(native_fs, lp, configfn, nullptr);
75}
76
79 XrdSysLogger *lp,
80 const char *configfn,
81 XrdOucEnv *envP)
82{
83 return XrdSfsGetFileSystem_Internal(native_fs, lp, configfn, envP);
84}
85}
86
89
90FileSystem* FileSystem::m_instance = 0;
91
92FileSystem::FileSystem()
93 : m_eroute(0), m_trace(&m_eroute), m_sfs_ptr(0), m_initialized(false), m_throttle(&m_eroute, &m_trace)
94{
95 myVersion = &XrdVERSIONINFOVAR(XrdSfsGetFileSystem);
96}
97
98FileSystem::~FileSystem() {}
99
100void
101FileSystem::Initialize(FileSystem *&fs,
102 XrdSfsFileSystem *native_fs,
103 XrdSysLogger *lp,
104 const char *configfn,
105 XrdOucEnv *envP)
106{
107 fs = NULL;
108 if (m_instance == NULL && !(m_instance = new FileSystem()))
109 {
110 return;
111 }
112 fs = m_instance;
113 if (!fs->m_initialized)
114 {
115 fs->m_config_file = configfn;
116 fs->m_eroute.logger(lp);
117 fs->m_eroute.Say("Initializing a Throttled file system.");
118 if (fs->Configure(fs->m_eroute, native_fs, envP))
119 {
120 fs->m_eroute.Say("Initialization of throttled file system failed.");
121 fs = NULL;
122 return;
123 }
124 fs->m_throttle.Init();
125 fs->m_initialized = true;
126 }
127}
128
129#define TS_Xeq(key, func) NoGo = (strcmp(key, var) == 0) ? func(Config) : 0
130int
131FileSystem::Configure(XrdSysError & log, XrdSfsFileSystem *native_fs, XrdOucEnv *envP)
132{
133 XrdOucEnv myEnv;
134 XrdOucStream Config(&m_eroute, getenv("XRDINSTANCE"), &myEnv, "(Throttle Config)> ");
135 int cfgFD;
136 if (m_config_file.length() == 0)
137 {
138 log.Say("No filename specified.");
139 return 1;
140 }
141 if ((cfgFD = open(m_config_file.c_str(), O_RDONLY)) < 0)
142 {
143 log.Emsg("Config", errno, "Unable to open configuration file", m_config_file.c_str());
144 return 1;
145 }
146 Config.Attach(cfgFD);
147 static const char *cvec[] = { "*** throttle (ofs) plugin config:", 0 };
148 Config.Capture(cvec);
149
150 std::string fslib = OFS_NAME;
151
152 char *var, *val;
153 int NoGo = 0;
154 while( (var = Config.GetMyFirstWord()) )
155 {
156 if (strcmp("throttle.fslib", var) == 0)
157 {
158 val = Config.GetWord();
159 if (!val || !val[0]) {log.Emsg("Config", "fslib not specified."); continue;}
160 fslib = val;
161 }
162 TS_Xeq("throttle.max_open_files", xmaxopen);
163 TS_Xeq("throttle.max_active_connections", xmaxconn);
164 TS_Xeq("throttle.throttle", xthrottle);
165 TS_Xeq("throttle.loadshed", xloadshed);
166 TS_Xeq("throttle.trace", xtrace);
167 if (NoGo)
168 {
169 log.Emsg("Config", "Throttle configuration failed.");
170 }
171 }
172
173 // Load the filesystem object.
174 m_sfs_ptr = native_fs ? native_fs : LoadFS(fslib, m_eroute, m_config_file);
175 if (!m_sfs_ptr) return 1;
176
177 // Overwrite the environment variable saying that throttling is the fslib.
178 XrdOucEnv::Export("XRDOFSLIB", fslib.c_str());
179
180 if (envP)
181 {
182 auto gstream = reinterpret_cast<XrdXrootdGStream*>(envP->GetPtr("Throttle.gStream*"));
183 log.Say("Config", "Throttle g-stream has", gstream ? "" : " NOT", " been configured via xrootd.mongstream directive");
184 m_throttle.SetMonitor(gstream);
185 }
186
187 // The Feature function is not a virtual but implemented by the base class to
188 // look at a protected member. Thus, to forward the call, we need to copy
189 // from the underlying filesystem
190 FeatureSet = m_sfs_ptr->Features();
191 return 0;
192}
193
194/******************************************************************************/
195/* x m a x o p e n */
196/******************************************************************************/
197
198/* Function: xmaxopen
199
200 Purpose: Parse the directive: throttle.max_open_files <limit>
201
202 <limit> maximum number of open file handles for a unique entity.
203
204 Output: 0 upon success or !0 upon failure.
205*/
206int
207FileSystem::xmaxopen(XrdOucStream &Config)
208{
209 auto val = Config.GetWord();
210 if (!val || val[0] == '\0')
211 {m_eroute.Emsg("Config", "Max open files not specified! Example usage: throttle.max_open_files 16000");}
212 long long max_open = -1;
213 if (XrdOuca2x::a2sz(m_eroute, "max open files value", val, &max_open, 1)) return 1;
214
215 m_throttle.SetMaxOpen(max_open);
216 return 0;
217}
218
219
220/******************************************************************************/
221/* x m a x c o n n */
222/******************************************************************************/
223
224/* Function: xmaxconn
225
226 Purpose: Parse the directive: throttle.max_active_connections <limit>
227
228 <limit> maximum number of connections with at least one open file for a given entity
229
230 Output: 0 upon success or !0 upon failure.
231*/
232int
233FileSystem::xmaxconn(XrdOucStream &Config)
234{
235 auto val = Config.GetWord();
236 if (!val || val[0] == '\0')
237 {m_eroute.Emsg("Config", "Max active cconnections not specified! Example usage: throttle.max_active_connections 4000");}
238 long long max_conn = -1;
239 if (XrdOuca2x::a2sz(m_eroute, "max active connections value", val, &max_conn, 1)) return 1;
240
241 m_throttle.SetMaxConns(max_conn);
242 return 0;
243}
244
245
246/******************************************************************************/
247/* x t h r o t t l e */
248/******************************************************************************/
249
250/* Function: xthrottle
251
252 Purpose: To parse the directive: throttle [data <drate>] [iops <irate>] [concurrency <climit>] [interval <rint>]
253
254 <drate> maximum bytes per second through the server.
255 <irate> maximum IOPS per second through the server.
256 <climit> maximum number of concurrent IO connections.
257 <rint> minimum interval in milliseconds between throttle re-computing.
258
259 Output: 0 upon success or !0 upon failure.
260*/
261int
262FileSystem::xthrottle(XrdOucStream &Config)
263{
264 long long drate = -1, irate = -1, rint = 1000, climit = -1;
265 char *val;
266
267 while ((val = Config.GetWord()))
268 {
269 if (strcmp("data", val) == 0)
270 {
271 if (!(val = Config.GetWord()))
272 {m_eroute.Emsg("Config", "data throttle limit not specified."); return 1;}
273 if (XrdOuca2x::a2sz(m_eroute,"data throttle value",val,&drate,1)) return 1;
274 }
275 else if (strcmp("iops", val) == 0)
276 {
277 if (!(val = Config.GetWord()))
278 {m_eroute.Emsg("Config", "IOPS throttle limit not specified."); return 1;}
279 if (XrdOuca2x::a2sz(m_eroute,"IOPS throttle value",val,&irate,1)) return 1;
280 }
281 else if (strcmp("rint", val) == 0)
282 {
283 if (!(val = Config.GetWord()))
284 {m_eroute.Emsg("Config", "recompute interval not specified."); return 1;}
285 if (XrdOuca2x::a2sp(m_eroute,"recompute interval value",val,&rint,10)) return 1;
286 }
287 else if (strcmp("concurrency", val) == 0)
288 {
289 if (!(val = Config.GetWord()))
290 {m_eroute.Emsg("Config", "Concurrency limit not specified."); return 1;}
291 if (XrdOuca2x::a2sz(m_eroute,"Concurrency limit value",val,&climit,1)) return 1;
292 }
293 else
294 {
295 m_eroute.Emsg("Config", "Warning - unknown throttle option specified", val, ".");
296 }
297 }
298
299 m_throttle.SetThrottles(drate, irate, climit, static_cast<float>(rint)/1000.0);
300 return 0;
301}
302
303/******************************************************************************/
304/* x l o a d s h e d */
305/******************************************************************************/
306
307/* Function: xloadshed
308
309 Purpose: To parse the directive: loadshed host <hostname> [port <port>] [frequency <freq>]
310
311 <hostname> hostname of server to shed load to. Required
312 <port> port of server to shed load to. Defaults to 1094
313 <freq> A value from 1 to 100 specifying how often to shed load
314 (1 = 1% chance; 100 = 100% chance; defaults to 10).
315
316 Output: 0 upon success or !0 upon failure.
317*/
318int FileSystem::xloadshed(XrdOucStream &Config)
319{
320 long long port = 0, freq = 0;
321 char *val;
322 std::string hostname;
323
324 while ((val = Config.GetWord()))
325 {
326 if (strcmp("host", val) == 0)
327 {
328 if (!(val = Config.GetWord()))
329 {m_eroute.Emsg("Config", "loadshed hostname not specified."); return 1;}
330 hostname = val;
331 }
332 else if (strcmp("port", val) == 0)
333 {
334 if (!(val = Config.GetWord()))
335 {m_eroute.Emsg("Config", "Port number not specified."); return 1;}
336 if (XrdOuca2x::a2sz(m_eroute,"Port number",val,&port,1, 65536)) return 1;
337 }
338 else if (strcmp("frequency", val) == 0)
339 {
340 if (!(val = Config.GetWord()))
341 {m_eroute.Emsg("Config", "Loadshed frequency not specified."); return 1;}
342 if (XrdOuca2x::a2sz(m_eroute,"Loadshed frequency",val,&freq,1,100)) return 1;
343 }
344 else
345 {
346 m_eroute.Emsg("Config", "Warning - unknown loadshed option specified", val, ".");
347 }
348 }
349
350 if (hostname.empty())
351 {
352 m_eroute.Emsg("Config", "must specify hostname for loadshed parameter.");
353 return 1;
354 }
355
356 m_throttle.SetLoadShed(hostname, port, freq);
357 return 0;
358}
359
360/******************************************************************************/
361/* x t r a c e */
362/******************************************************************************/
363
364/* Function: xtrace
365
366 Purpose: To parse the directive: trace <events>
367
368 <events> the blank separated list of events to trace. Trace
369 directives are cummalative.
370
371 Output: 0 upon success or 1 upon failure.
372*/
373
374int FileSystem::xtrace(XrdOucStream &Config)
375{
376 char *val;
377 static const struct traceopts {const char *opname; int opval;} tropts[] =
378 {
379 {"all", TRACE_ALL},
380 {"off", TRACE_NONE},
381 {"none", TRACE_NONE},
382 {"debug", TRACE_DEBUG},
383 {"iops", TRACE_IOPS},
384 {"bandwidth", TRACE_BANDWIDTH},
385 {"ioload", TRACE_IOLOAD},
386 {"files", TRACE_FILES},
387 {"connections",TRACE_CONNS},
388 };
389 int i, neg, trval = 0, numopts = sizeof(tropts)/sizeof(struct traceopts);
390
391 if (!(val = Config.GetWord()))
392 {
393 m_eroute.Emsg("Config", "trace option not specified");
394 return 1;
395 }
396 while (val)
397 {
398 if (!strcmp(val, "off"))
399 {
400 trval = 0;
401 }
402 else
403 {
404 if ((neg = (val[0] == '-' && val[1])))
405 {
406 val++;
407 }
408 for (i = 0; i < numopts; i++)
409 {
410 if (!strcmp(val, tropts[i].opname))
411 {
412 if (neg)
413 {
414 if (tropts[i].opval) trval &= ~tropts[i].opval;
415 else trval = TRACE_ALL;
416 }
417 else if (tropts[i].opval) trval |= tropts[i].opval;
418 else trval = TRACE_NONE;
419 break;
420 }
421 }
422 if (i >= numopts)
423 {
424 m_eroute.Say("Config warning: ignoring invalid trace option '", val, "'.");
425 }
426 }
427 val = Config.GetWord();
428 }
429 m_trace.What = trval;
430 return 0;
431}
432
#define TS_Xeq(x, m)
Definition XrdConfig.cc:157
static XrdSysError eDest(0,"crypto_")
XrdSfsFileSystem * XrdSfsGetDefaultFileSystem(XrdSfsFileSystem *native_fs, XrdSysLogger *lp, const char *configfn, XrdOucEnv *EnvInfo)
Definition XrdOfsFS.cc:49
#define open
Definition XrdPosix.hh:76
static XrdSfsFileSystem * LoadFS(const std::string &fslib, XrdSysError &eDest, const std::string &config_file)
XrdSfsFileSystem * XrdSfsGetFileSystem2(XrdSfsFileSystem *native_fs, XrdSysLogger *lp, const char *configfn, XrdOucEnv *envP)
XrdSfsFileSystem * XrdSfsGetFileSystem(XrdSfsFileSystem *native_fs, XrdSysLogger *lp, const char *configfn)
XrdVERSIONINFO(XrdSfsGetFileSystem, FileSystem)
#define TRACE_IOLOAD
#define TRACE_BANDWIDTH
#define TRACE_FILES
#define TRACE_CONNS
#define TRACE_IOPS
#define TRACE_NONE
Definition XrdTrace.hh:34
#define TRACE_DEBUG
Definition XrdTrace.hh:36
#define TRACE_ALL
Definition XrdTrace.hh:35
static int Export(const char *Var, const char *Val)
Definition XrdOucEnv.cc:170
void * GetPtr(const char *varname)
Definition XrdOucEnv.cc:263
static int a2sp(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition XrdOuca2x.cc:213
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition XrdOuca2x.cc:257
uint64_t FeatureSet
Adjust features at initialization.
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
XrdSysLogger * logger(XrdSysLogger *lp=0)
void * getPlugin(const char *pname, int optional=0)
void * Persist()
void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
void SetMaxOpen(unsigned long max_open)
void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
void SetMonitor(XrdXrootdGStream *gstream)
void SetMaxConns(unsigned long max_conns)
XrdCmsConfig Config
XrdSfsFileSystem * XrdSfsGetFileSystem_Internal(XrdSfsFileSystem *native_fs, XrdSysLogger *lp, const char *configfn, XrdOucEnv *envP)