Pol  Revision:3cfda13
streamsaver.cpp
Go to the documentation of this file.
1 #include <fstream>
2 #include <iostream>
3 #include <string>
4 #include <thread>
5 
6 #include "streamsaver.h"
7 
8 namespace Pol
9 {
10 namespace Clib
11 {
12 const std::size_t flush_limit = 10000; // 500;
13 
15 StreamWriter::StreamWriter() : _writer( new fmt::Writer ) {}
17 
19 {
20  if ( _writer->size() >= flush_limit ) // guard against to big objects
21  {
22  this->flush();
23  }
24  return *( _writer.get() );
25 }
26 
29  : StreamWriter(),
30  _stream(),
31 #if 0
32  _fs_time( 0 ),
33 #endif
34  _stream_name()
35 {
36 }
37 
38 OFStreamWriter::OFStreamWriter( std::ofstream* stream )
39  : StreamWriter(),
40  _stream( stream ),
41 #if 0
42  _fs_time( 0 ),
43 #endif
44  _stream_name()
45 {
46 }
47 
49 {
50 #if 0
51  if ( _writer->size() )
52  {
54  *_stream << _writer->c_str();
55  _fs_time += t.ellapsed();
56  }
57  ERROR_PRINT << "streamwriter " << _stream_name << " io time " << _fs_time.count( ) << "\n";
58 #else
59  if ( _writer->size() )
60  *_stream << _writer->str();
61 #endif
62 }
63 
64 void OFStreamWriter::init( const std::string& filepath )
65 {
66  _stream->exceptions( std::ios_base::failbit | std::ios_base::badbit );
67  _stream->open( filepath.c_str(), std::ios::out );
68  _stream_name = filepath;
69 }
70 
72 {
73 #if 0
75 #endif
76  if ( _writer->size() )
77  {
78  *_stream << _writer->str();
79  _writer->Clear();
80  }
81 #if 0
82  _fs_time += t.ellapsed( );
83 #endif
84 }
85 
87 {
88  flush();
89  _stream->flush();
90 }
91 
94 
95 OStreamWriter::OStreamWriter( std::ostream* stream ) : StreamWriter(), _stream( stream ) {}
96 
98 {
99  if ( _writer->size() )
100  *_stream << _writer->str();
101 }
102 
103 void OStreamWriter::init( const std::string& ) {}
104 
106 {
107  if ( _writer->size() )
108  {
109  *_stream << _writer->str();
110  _writer.reset( new fmt::Writer );
111  }
112 }
113 
115 {
116  flush();
117  _stream->flush();
118 }
119 
122  : StreamWriter(), _stream(), _msg_queue(), _writethread(), _writers_hold(), _stream_name()
123 {
124  start_worker();
125 }
126 
128  : StreamWriter(),
129  _stream( stream ),
130  _msg_queue(),
131  _writethread(),
132  _writers_hold(),
133  _stream_name()
134 {
135  start_worker();
136 }
138 {
139  _writethread = std::thread( [this]() {
140  std::list<WriterPtr> writers;
141  // small helper lambda to write into stream
142  auto _write_to_stream = [&]( std::list<WriterPtr>& l ) {
143  for ( const auto& _w : l )
144  {
145  if ( _w->size() )
146  *_stream << _w->str();
147  }
148  };
149  try
150  {
151  for ( ;; )
152  {
153  writers.clear();
154  _msg_queue.pop_wait( &writers );
155  _write_to_stream( writers );
156  }
157  }
158  catch ( writer_queue::Canceled& )
159  {
160  }
161  writers.clear();
162  _msg_queue.pop_remaining( &writers );
163  _write_to_stream( writers );
164  _stream->flush();
165  } );
166 }
167 
169 {
170  flush_file();
171 }
172 
173 void ThreadedOFStreamWriter::init( const std::string& filepath )
174 {
175  _stream->exceptions( std::ios_base::failbit | std::ios_base::badbit );
176  _stream->open( filepath.c_str(), std::ios::out );
177  _stream_name = filepath;
178 }
179 
181 {
182  if ( _writer->size() )
183  {
184  _writers_hold.emplace_back( std::move( _writer ) );
185  if ( _writers_hold.size() > 10 )
186  {
188  }
189  _writer.reset( new fmt::Writer );
190  }
191 }
192 
194 {
195  if ( _writethread.joinable() )
196  {
197  flush();
198  if ( !_writers_hold.empty() )
200 
201  _msg_queue.cancel();
202  _writethread.join();
203  _stream->close();
204  }
205 }
206 }
207 }
virtual void flush() POL_OVERRIDE
Definition: streamsaver.cpp:71
void pop_remaining(std::list< Message > *msgs)
virtual void flush() POL_OVERRIDE
ThreadedOFStreamWriter()
ofstream implementation with worker thread for file io
std::unique_ptr< fmt::Writer > _writer
Definition: streamsaver.h:32
virtual void flush_file() POL_OVERRIDE
virtual void init(const std::string &filepath) POL_OVERRIDE
time_mu ellapsed() const
Definition: timer.cpp:67
fmt::Writer & operator()()
Definition: streamsaver.cpp:18
void pop_wait(Message *msg)
virtual void flush() POL_OVERRIDE
virtual void flush_file() POL_OVERRIDE
Definition: streamsaver.cpp:86
std::ostream * _stream
Definition: streamsaver.h:74
StreamWriter()
BaseClass implements only writer operator logic.
Definition: streamsaver.cpp:15
virtual void flush()=0
void push(Message const &msg)
Definition: message_queue.h:73
virtual void init(const std::string &filepath) POL_OVERRIDE
Definition: streamsaver.cpp:64
virtual void init(const std::string &filepath) POL_OVERRIDE
OFStreamWriter()
ofstream implementation (simple non threaded)
Definition: streamsaver.cpp:28
std::ofstream * _stream
Definition: streamsaver.h:56
virtual void flush_file() POL_OVERRIDE
std::list< WriterPtr > _writers_hold
Definition: streamsaver.h:96
#define ERROR_PRINT
Definition: logfacility.h:230
OStreamWriter()
ostream implementation (non threaded)
Definition: streamsaver.cpp:93
const std::size_t flush_limit
Definition: streamsaver.cpp:12
Definition: berror.cpp:12