1 //          Copyright Ferdinand Majerech 2014.
2 // Distributed under the Boost Software License, Version 1.0.
3 //    (See accompanying file LICENSE_1_0.txt or copy at
4 //          http://www.boost.org/LICENSE_1_0.txt)
5 
6 
7 /// Profiling data sources (which receive profiling data sent by the profiled application).
8 module despiker.profdatasource;
9 
10 
11 import core.time;
12 import std.stdio;
13 import std.exception: assumeWontThrow, ErrnoException;
14 
15 
16 import despiker.backend;
17 
18 
19 /** Base class for profiling data sources.
20  *
21  * For now, the only implementation reads profile data from stdin. (Profiled application
22  * launches despiker and writes to its stdin through a pipe).
23  *
24  * Note: $(B Must) be destroyed manually (or thrugh $(D std.typecons.scoped), etc.).
25  *       A ProfDataSource may contain resources (threads, sockets, file handles) that
26  *       must be freed.
27  */
28 abstract class ProfDataSource
29 {
30     /** Try to receive a _chunk of profiling data.
31      *
32      * Returns: true if a _chunk was received (and written to chunk), false otherwise.
33      */
34     bool receiveChunk(out ProfileDataChunk chunk) @system nothrow;
35 }
36 
37 /// A chunk of profiling data.
38 struct ProfileDataChunk
39 {
40     /** ID (index) of the profiled thread the profiler of which generated the profiling data.
41      *
42      * Useful when the user is profiling multiple threads (with separate Profilers).
43      */
44     uint threadId;
45 
46     /** Profiling data itself. Must contain whole events (must not start/end in the middle
47      * of an event).
48      */
49     immutable(ubyte)[] data;
50 }
51 
52 /// Exception thrown at ProfileDataSource errors.
53 class ProfDataSourceException : Exception
54 {
55     this(string msg, string file = __FILE__, int line = __LINE__) @safe pure nothrow
56     {
57         super(msg, file, line);
58     }
59 }
60 
61 /** A profiling data source that reads profiling data from stdin.
62  *
63  * To use this, the profiled application launches despiker, gets a pipe to despiker's
64  * stdin and writes data to that pipe.
65  *
66  *
67  * The 'protocol' for sending profiling data through stdin:
68  *
69  * Profiling data is sent in varying-size chunks with the following structure:
70  * --------------------
71  * uint threadIdx; // Index of the profiled thread (when using multiple per-thread Profilers)
72  * uint byteCount; // Size of profiling data in the chunk, in bytes
73  * ubyte[byteCount]; data; // Profiling data itself.
74  * --------------------
75  */
76 class ProfDataSourceStdin: ProfDataSource
77 {
78     import std.concurrency;
79 
80 private:
81     /** Background thread that reads profiling data that appears in stdin and sends it to main
82      * thread.
83      */
84     Tid readerTid_;
85 
86 public:
87     /** Construct a ProfDataSourceStdin.
88      *
89      * Throws:
90      *
91      * ProfDataSourceException on failure.
92      */
93     this() @system
94     {
95         try
96         {
97             // Start spawnedFunc in a new thread.
98             readerTid_ = spawn(&reader, thisTid);
99         }
100         catch(Exception e)
101         {
102             throw new ProfDataSourceException("Failed to init ProfDataSourceStdin: ",e.msg);
103         }
104     }
105 
106     import std.typecons;
107     /// Destroy the data source. Must be called by the user.
108     ~this() @system nothrow
109     {
110         // Tell the reader thread to quit.
111         send(readerTid_, Yes.quit).assumeWontThrow;
112         stdin.close().assumeWontThrow;
113     }
114 
115     override bool receiveChunk(out ProfileDataChunk chunk) @system nothrow
116     {
117         // while() because if we ignore any chunk (thread idx over maxThreads), we try
118         // to receive the next chunk.
119         while(receiveTimeout(dur!"msecs"(0),
120               (ProfileDataChunk c) { chunk = c; },
121               (Variant v) { assert(false, "Received unknown type"); }).assumeWontThrow)
122         {
123             if(chunk.threadId >= maxThreads)
124             {
125                 // TODO: Add 'ignoreIfThrows()' and use it here (and in similar code)
126                 //       instead of assumeWontThrow 2014-10-02
127                 writeln("Chunk with thread ID greater or equal to 1024; no more than "
128                         "1024 threads are supported. Ignoring.").assumeWontThrow;
129                 continue;
130             }
131             return true;
132         }
133 
134         return false;
135     }
136 
137 private:
138     /** The reader thread - reads any chunks from stdin and sends them to main thread.
139      *
140      * Params:
141      *
142      * owner = Tid of the owner thread.
143      */
144     static void reader(Tid owner)
145     {
146         // Chunk data is read from stdin to here.
147         auto chunkReadBuffer = new ubyte[16 * 1024];
148 
149         scope(exit) { writeln("ProfDataSourceStdin: reader thread exit"); }
150 
151         try for(;;)
152         {
153             // Check if the main thread has told us to quit.
154             Flag!"quit" quit;
155             receiveTimeout(dur!"msecs"(0),
156                 (Flag!"quit" q) { quit = q; },
157                 (Variant v) { assert(false, "Received unknown type"); });
158             if(quit) { break; }
159 
160             // Read the chunk header.
161             uint[2] header;
162             stdin.rawRead(header[]);
163             const threadIdx = header[0];
164             const byteCount = header[1];
165             if(byteCount == 0) { continue; }
166 
167             // Enlarge chunkReadBuffer if needed.
168             if(chunkReadBuffer.length < byteCount) { chunkReadBuffer.length = byteCount; }
169 
170             // Read byteCount bytes.
171             auto newProfileData = chunkReadBuffer[0 .. byteCount];
172             stdin.rawRead(newProfileData);
173 
174             // Send the chunk. Make a copy since chunkReadBuffer will be overwritten with
175             // the next read chunk.
176             send(owner, ProfileDataChunk(threadIdx, newProfileData[].idup)).assumeWontThrow;
177         }
178         // The quit message should be enough, but just to be sure handle the case when
179         // the main thread is terminated but we don't get the quit message (e.g. on an
180         // error in the main thread?)
181         catch(OwnerTerminated e)
182         {
183             return;
184         }
185         catch(ErrnoException e)
186         {
187             // We occasionally get bad file descriptor when reading stdin fails after
188             // the parent thread is closed. Probably no way to fix this cleanly, so we
189             // just ignore it.
190             return;
191         }
192         catch(Throwable e)
193         {
194             writeln("ProfDataSourceStdin: unexpected exception in reader thread:\n", e);
195         }
196     }
197 }
198 
199 /** A profiling data source that reads raw profiling data from files.
200  *
201  * To use this, despiker can be launched manually with the ``-r`` parameter to specify
202  * the raw files.
203  *
204  * The files can be created simply by dumping profile data from a file.
205  *
206  * Example:
207  * --------------------
208  * // Dumping profile data from multiple profilers, where each profiler is used to 
209  * // profile one thread:
210  *
211  * // Profiler[] threadProfilers;
212  * foreach(threadIdx, threadProfiler; threadProfilers)
213  * {
214  *     auto file = File("profile%s.raw.prof".format(threadIdx), "wb");
215  *     file.rawWrite(threadProfiler.profileData);
216  * }
217  * --------------------
218  *
219  */
220 class ProfDataSourceRaw: ProfDataSource
221 {
222     import std.concurrency;
223 
224 private:
225     /** Background thread that reads profiling data that appears in stdin and sends it to main
226      * thread.
227      */
228     Tid readerTid_;
229 
230 public:
231     /** Construct a ProfDataSourceRaw.
232      *
233      * Params:
234      *
235      * filenames = Filenames of profile data files for profiled threads.
236      *
237      * Throws:
238      *
239      * ProfDataSourceException on failure.
240      */
241     this(string[] filenames) @system
242     {
243         try
244         {
245             // Start spawnedFunc in a new thread.
246             readerTid_ = spawn(&reader, thisTid, filenames.idup);
247         }
248         catch(Exception e)
249         {
250             throw new ProfDataSourceException("Failed to init ProfDataSourceRaw: ",e.msg);
251         }
252     }
253 
254     /// Destroy the data source. Must be called by the user.
255     ~this() @system nothrow { }
256 
257     override bool receiveChunk(out ProfileDataChunk chunk) @system nothrow
258     {
259         // while() because if we ignore any chunk (thread idx over maxThreads), we try
260         // to receive the next chunk.
261         while(receiveTimeout(dur!"msecs"(0),
262               (ProfileDataChunk c) { chunk = c; },
263               (Variant v) { assert(false, "Received unknown type"); }).assumeWontThrow)
264         {
265             if(chunk.threadId >= maxThreads)
266             {
267                 // TODO: Add 'ignoreIfThrows()' and use it here (and in similar code)
268                 //       instead of assumeWontThrow 2014-10-02
269                 writeln("Chunk with thread ID greater or equal to 1024; no more than "
270                         "1024 threads are supported. Ignoring.").assumeWontThrow;
271                 continue;
272             }
273             return true;
274         }
275 
276         return false;
277     }
278 
279 private:
280     /** The reader thread - reads profile data from files and sends it to the main thread.
281      *
282      * Params:
283      *
284      * owner     = Tid of the owner thread.
285      * filenames = Filenames of profile data files for profiled threads.
286      */
287     static void reader(Tid owner, immutable(string[]) filenames)
288     {
289         import std.file;
290         try
291         {
292             import std.algorithm, std.array;
293             foreach(uint threadIdx, name; filenames)
294             {
295                 send(owner, ProfileDataChunk(threadIdx, cast(immutable(ubyte)[])read(name)));
296             }
297         }
298         catch(FileException e)   { writeln("Failed to read profiling data: ", e.msg); }
299         // Be sure to handle the case when the main thread is terminated but we don't get
300         // the quit message (e.g. on an error in the main thread?)
301         catch(OwnerTerminated e) { writeln("Failed to read profiling data: ", e.msg); }
302         catch(ErrnoException e)  { writeln("Failed to read profiling data: ", e.msg); }
303         catch(Throwable e)
304         {
305             writeln("ProfDataSourceStdin: unexpected exception in reader thread:\n", e);
306         }
307     }
308 }