//          Copyright Ferdinand Majerech 2014.
// Distributed under the Boost Software License, Version 1.0.
//    (See accompanying file LICENSE_1_0.txt or copy at
//          http://www.boost.org/LICENSE_1_0.txt)


/// Profiling data sources (which receive profiling data sent by the profiled application).
module despiker.profdatasource;


import core.time;
import std.stdio;
import std.exception: assumeWontThrow, ErrnoException;


import despiker.backend;


/** Base class for profiling data sources.
 *
 * Implementations in this module read profile data from stdin (the profiled application
 * launches despiker and writes to its stdin through a pipe) or from raw dump files.
 *
 * Note: $(B Must) be destroyed manually (or through $(D std.typecons.scoped), etc.).
 *       A ProfDataSource may contain resources (threads, sockets, file handles) that
 *       must be freed.
 */
abstract class ProfDataSource
{
    /** Try to receive a _chunk of profiling data.
     *
     * Returns: true if a _chunk was received (and written to chunk), false otherwise.
     */
    bool receiveChunk(out ProfileDataChunk chunk) @system nothrow;
}

/// A chunk of profiling data.
struct ProfileDataChunk
{
    /** ID (index) of the profiled thread the profiler of which generated the profiling data.
     *
     * Useful when the user is profiling multiple threads (with separate Profilers).
     */
    uint threadId;

    /** Profiling data itself. Must contain whole events (must not start/end in the middle
     * of an event).
     */
    immutable(ubyte)[] data;
}

/// Exception thrown at ProfDataSource errors.
class ProfDataSourceException : Exception
{
    /** Construct a ProfDataSourceException.
     *
     * Params:
     *
     * msg  = Error message.
     * file = Source file the exception was thrown from.
     * line = Source line the exception was thrown from.
     */
    this(string msg, string file = __FILE__, int line = __LINE__) @safe pure nothrow
    {
        super(msg, file, line);
    }
}

/** Poll the calling thread's message box for the next usable profile data chunk.
 *
 * Shared implementation of receiveChunk() for data sources whose reader thread sends
 * ProfileDataChunk messages to the thread that calls receiveChunk().
 *
 * Never blocks (zero timeout). Chunks with a thread ID of maxThreads or greater are
 * reported to stdout and skipped.
 *
 * Params:
 *
 * chunk = The received chunk is written here.
 *
 * Returns: true if a chunk was received (and written to chunk), false otherwise.
 */
private bool receiveValidChunk(out ProfileDataChunk chunk) @system nothrow
{
    import std.concurrency: receiveTimeout;
    import std.variant: Variant;

    // while() because if we ignore any chunk (thread idx over maxThreads), we try
    // to receive the next chunk.
    while(receiveTimeout(dur!"msecs"(0),
                         (ProfileDataChunk c) { chunk = c; },
                         (Variant v) { assert(false, "Received unknown type"); }).assumeWontThrow)
    {
        if(chunk.threadId >= maxThreads)
        {
            // TODO: Add 'ignoreIfThrows()' and use it here (and in similar code)
            //       instead of assumeWontThrow 2014-10-02
            writeln("Chunk with thread ID greater or equal to ", maxThreads,
                    "; no more than ", maxThreads,
                    " threads are supported. Ignoring.").assumeWontThrow;
            continue;
        }
        return true;
    }

    return false;
}

/** A profiling data source that reads profiling data from stdin.
 *
 * To use this, the profiled application launches despiker, gets a pipe to despiker's
 * stdin and writes data to that pipe.
 *
 *
 * The 'protocol' for sending profiling data through stdin:
 *
 * Profiling data is sent in varying-size chunks with the following structure:
 * --------------------
 * uint threadIdx;        // Index of the profiled thread (when using multiple per-thread Profilers)
 * uint byteCount;        // Size of profiling data in the chunk, in bytes
 * ubyte[byteCount] data; // Profiling data itself.
 * --------------------
 */
class ProfDataSourceStdin: ProfDataSource
{
    import std.concurrency;
    import std.typecons;

private:
    /** Background thread that reads profiling data that appears in stdin and sends it to main
     * thread.
     */
    Tid readerTid_;

public:
    /** Construct a ProfDataSourceStdin.
     *
     * Throws:
     *
     * ProfDataSourceException on failure.
     */
    this() @system
    {
        try
        {
            // Start reader() in a new thread.
            readerTid_ = spawn(&reader, thisTid);
        }
        catch(Exception e)
        {
            // Concatenate: a second constructor argument would be taken as the file name.
            throw new ProfDataSourceException("Failed to init ProfDataSourceStdin: " ~ e.msg);
        }
    }

    /// Destroy the data source. Must be called by the user.
    ~this() @system nothrow
    {
        // Tell the reader thread to quit.
        send(readerTid_, Yes.quit).assumeWontThrow;
        stdin.close().assumeWontThrow;
    }

    /** Try to receive a _chunk of profiling data sent by the reader thread.
     *
     * Returns: true if a _chunk was received (and written to chunk), false otherwise.
     */
    override bool receiveChunk(out ProfileDataChunk chunk) @system nothrow
    {
        return receiveValidChunk(chunk);
    }

private:
    /** The reader thread - reads any chunks from stdin and sends them to main thread.
     *
     * Exits when told to quit by the owner, when stdin reaches EOF (or a chunk is cut
     * short), or when reading stdin fails.
     *
     * Params:
     *
     * owner = Tid of the owner thread.
     */
    static void reader(Tid owner)
    {
        // Chunk data is read from stdin to here.
        auto chunkReadBuffer = new ubyte[16 * 1024];

        scope(exit) { writeln("ProfDataSourceStdin: reader thread exit"); }

        try for(;;)
        {
            // Check if the main thread has told us to quit.
            Flag!"quit" quit;
            receiveTimeout(dur!"msecs"(0),
                           (Flag!"quit" q) { quit = q; },
                           (Variant v) { assert(false, "Received unknown type"); });
            if(quit) { break; }

            // Read the chunk header.
            uint[2] header;
            // rawRead() returns the slice that was actually read; a short read means
            // EOF. Without this check an all-zero header would make us spin forever.
            if(stdin.rawRead(header[]).length < header.length) { break; }
            const threadIdx = header[0];
            const byteCount = header[1];
            if(byteCount == 0) { continue; }

            // Enlarge chunkReadBuffer if needed.
            if(chunkReadBuffer.length < byteCount) { chunkReadBuffer.length = byteCount; }

            // Read byteCount bytes.
            auto newProfileData = chunkReadBuffer[0 .. byteCount];
            // A truncated chunk (EOF in the middle of the data) must not be forwarded;
            // chunks are required to contain whole events.
            if(stdin.rawRead(newProfileData).length < byteCount) { break; }

            // Send the chunk. Make a copy since chunkReadBuffer will be overwritten with
            // the next read chunk.
            send(owner, ProfileDataChunk(threadIdx, newProfileData[].idup)).assumeWontThrow;
        }
        // The quit message should be enough, but just to be sure handle the case when
        // the main thread is terminated but we don't get the quit message (e.g. on an
        // error in the main thread?)
        catch(OwnerTerminated e)
        {
            return;
        }
        catch(ErrnoException e)
        {
            // We occasionally get bad file descriptor when reading stdin fails after
            // the parent thread is closed. Probably no way to fix this cleanly, so we
            // just ignore it.
            return;
        }
        catch(Throwable e)
        {
            writeln("ProfDataSourceStdin: unexpected exception in reader thread:\n", e);
        }
    }
}

/** A profiling data source that reads raw profiling data from files.
 *
 * To use this, despiker can be launched manually with the ``-r`` parameter to specify
 * the raw files.
 *
 * The files can be created simply by dumping profile data to a file.
 *
 * Example:
 * --------------------
 * // Dumping profile data from multiple profilers, where each profiler is used to
 * // profile one thread:
 *
 * // Profiler[] threadProfilers;
 * foreach(threadIdx, threadProfiler; threadProfilers)
 * {
 *     auto file = File("profile%s.raw.prof".format(threadIdx), "wb");
 *     file.rawWrite(threadProfiler.profileData);
 * }
 * --------------------
 *
 */
class ProfDataSourceRaw: ProfDataSource
{
    import std.concurrency;

private:
    /** Background thread that reads profiling data from the files and sends it to main
     * thread.
     */
    Tid readerTid_;

public:
    /** Construct a ProfDataSourceRaw.
     *
     * Params:
     *
     * filenames = Filenames of profile data files for profiled threads.
     *
     * Throws:
     *
     * ProfDataSourceException on failure.
     */
    this(string[] filenames) @system
    {
        try
        {
            // Start reader() in a new thread.
            readerTid_ = spawn(&reader, thisTid, filenames.idup);
        }
        catch(Exception e)
        {
            // Concatenate: a second constructor argument would be taken as the file name.
            throw new ProfDataSourceException("Failed to init ProfDataSourceRaw: " ~ e.msg);
        }
    }

    /// Destroy the data source. Must be called by the user.
    ~this() @system nothrow { }

    /** Try to receive a _chunk of profiling data sent by the reader thread.
     *
     * Returns: true if a _chunk was received (and written to chunk), false otherwise.
     */
    override bool receiveChunk(out ProfileDataChunk chunk) @system nothrow
    {
        return receiveValidChunk(chunk);
    }

private:
    /** The reader thread - reads profile data from files and sends it to the main thread.
     *
     * Each file is sent as one chunk; the index of the file in filenames is used as the
     * thread ID of the chunk.
     *
     * Params:
     *
     * owner     = Tid of the owner thread.
     * filenames = Filenames of profile data files for profiled threads.
     */
    static void reader(Tid owner, immutable(string[]) filenames)
    {
        import std.file;
        try
        {
            // The index is a size_t; narrow it explicitly to the uint thread ID.
            foreach(threadIdx, name; filenames)
            {
                send(owner, ProfileDataChunk(cast(uint)threadIdx,
                                             cast(immutable(ubyte)[])read(name)));
            }
        }
        catch(FileException e) { writeln("Failed to read profiling data: ", e.msg); }
        // Be sure to handle the case when the main thread is terminated but we don't get
        // the quit message (e.g. on an error in the main thread?)
        catch(OwnerTerminated e) { writeln("Failed to read profiling data: ", e.msg); }
        catch(ErrnoException e) { writeln("Failed to read profiling data: ", e.msg); }
        catch(Throwable e)
        {
            writeln("ProfDataSourceRaw: unexpected exception in reader thread:\n", e);
        }
    }
}