00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #include "CeylanMultiplexedServerStreamSocket.h"
00028
00029
00030 #include "CeylanLogPlug.h"
00031 #include "CeylanOperators.h"
00032 #include "CeylanThread.h"
00033 #include "CeylanAnonymousStreamSocket.h"
00034 #include "CeylanStringUtils.h"
00035 #include "CeylanSignal.h"
00036 #include "CeylanInputStream.h"
00037
00038
00039
00040 #ifdef CEYLAN_USES_CONFIG_H
00041 #include "CeylanConfig.h"
00042 #endif // CEYLAN_USES_CONFIG_H
00043
00044
00045
00046 extern "C"
00047 {
00048
00049 #ifdef CEYLAN_USES_SYS_TYPES_H
00050 #include <sys/types.h>
00051 #endif // CEYLAN_USES_SYS_TYPES_H
00052
00053 #ifdef CEYLAN_USES_SYS_SOCKET_H
00054 #include <sys/socket.h>
00055 #endif // CEYLAN_USES_SYS_SOCKET_H
00056
00057 #ifdef CEYLAN_USES_ARPA_INET_H
00058 #include <arpa/inet.h>
00059 #endif // CEYLAN_USES_ARPA_INET_H
00060
00061 #ifdef CEYLAN_USES_NETINET_IN_H
00062 #include <netinet/in.h>
00063 #endif // CEYLAN_USES_NETINET_IN_H
00064
00065 }
00066
00067
00068 using namespace Ceylan::System ;
00069 using namespace Ceylan::Network ;
00070 using namespace Ceylan::Log ;
00071
00072
00073 using std::string ;
00074 using std::list ;
00075 using std::set ;
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086 MultiplexedServerStreamSocket::MultiplexedServerStreamSocketException::MultiplexedServerStreamSocketException( const std::string & reason ) :
00087 ServerStreamSocketException( reason )
00088 {
00089
00090 }
00091
00092
00093
00094 MultiplexedServerStreamSocket::MultiplexedServerStreamSocketException::~MultiplexedServerStreamSocketException() throw()
00095 {
00096
00097 }
00098
00099
00100
00101 MultiplexedServerStreamSocket::MultiplexedServerStreamSocket(
00102 Port listeningPort, bool reuse ) :
00103 ServerStreamSocket( listeningPort, reuse, false ),
00104 _currentConnections()
00105 {
00106
00107 #if CEYLAN_USES_NETWORK
00108
00109
00110
00111 #else // CEYLAN_USES_NETWORK
00112
00113 throw MultiplexedServerStreamSocketException(
00114 "MultiplexedServerStreamSocket constructor failed: "
00115 "network support not available." ) ;
00116
00117 #endif // CEYLAN_USES_NETWORK
00118
00119 }
00120
00121
00122
00123 MultiplexedServerStreamSocket::~MultiplexedServerStreamSocket() throw()
00124 {
00125
00126 #if CEYLAN_USES_NETWORK
00127
00128 closeAcceptedConnections() ;
00129
00130 #endif // CEYLAN_USES_NETWORK
00131
00132 }
00133
00134
00135
00136 bool MultiplexedServerStreamSocket::isConnected() const
00137 {
00138
00139 return ! _currentConnections.empty() ;
00140
00141 }
00142
00143
00144
00145 void MultiplexedServerStreamSocket::run()
00146 {
00147
00148 #if CEYLAN_USES_NETWORK
00149
00150
00151 #if CEYLAN_DEBUG_NETWORK_SERVERS
00152 LogPlug::trace( "Entering in MultiplexedServerStreamSocket::run" ) ;
00153 #endif // CEYLAN_DEBUG_NETWORK_SERVERS
00154
00155
00156
00157 Ceylan::Uint32 connectionCount = 0 ;
00158
00159 if ( ! _currentConnections.empty() )
00160 throw MultiplexedServerStreamSocketException(
00161 "MultiplexedServerStreamSocket::run failed: "
00162 "there are already living connections." ) ;
00163
00164 #if CEYLAN_ARCH_WINDOWS == 0
00165
00166 Signal::ignore( Signal::BrokenPipe ) ;
00167 #endif // CEYLAN_ARCH_WINDOWS
00168
00169 prepareToAccept() ;
00170
00171
00172 list<InputStream *> watchedSockets ;
00173
00174
00175 watchedSockets.push_back( static_cast<InputStream*>( this ) ) ;
00176
00177
00178 list<AnonymousStreamSocket*> connectionsToRemove ;
00179
00180
00181 #if CEYLAN_DEBUG_NETWORK_SERVERS
00182 LogPlug::trace( "MultiplexedServerStreamSocket::run: "
00183 "entering main listening loop." ) ;
00184 #endif // CEYLAN_DEBUG_NETWORK_SERVERS
00185
00186
00187 while ( ! isRequestedToStop() )
00188 {
00189
00190 connectionsToRemove.clear() ;
00191
00192
00193
00194 try
00195 {
00196
00197
00198 #if CEYLAN_DEBUG_NETWORK_SERVERS
00199 LogPlug::trace( "MultiplexedServerStreamSocket::run: "
00200 "waiting for selected sockets." ) ;
00201 #endif // CEYLAN_DEBUG_NETWORK_SERVERS
00202
00203
00204
00205
00206
00207
00208
00209 InputStream::Select( watchedSockets ) ;
00210
00211 #if CEYLAN_DEBUG_NETWORK_SERVERS
00212 LogPlug::trace( "MultiplexedServerStreamSocket::run: "
00213 "at least a socket is selected." ) ;
00214 #endif // CEYLAN_DEBUG_NETWORK_SERVERS
00215
00216
00217
00218 for ( list<InputStream*>::iterator it = watchedSockets.begin();
00219 it != watchedSockets.end(); it++ )
00220 {
00221
00222 if ( (*it)->isFaulty() )
00223 {
00224
00225 if ( (*it) != this )
00226 {
00227
00228 LogPlug::warning(
00229 "MultiplexedServerStreamSocket::run: "
00230 "a faulty connection socket will be removed: "
00231 + (*it)->toString() ) ;
00232
00233 connectionsToRemove.push_back(
00234 dynamic_cast<AnonymousStreamSocket*>( *it ) ) ;
00235
00236 }
00237 else
00238 {
00239
00240 LogPlug::error( "MultiplexedServerStreamSocket::run: "
00241 "listening socket is in faulty state: "
00242 + toString() + ", trying to overcome..." ) ;
00243
00244
00245 setFaulty( false ) ;
00246
00247 }
00248
00249 }
00250
00251
00252 if ( (*it)->isSelected() )
00253 {
00254
00255 FileDescriptor selectedFD = (*it)->getInputStreamID() ;
00256
00257
00258
00259
00260
00261
00262
00263 if ( selectedFD == getOriginalFileDescriptor() )
00264 {
00265
00266
00267
00268
00269
00270
00271
00272 #if CEYLAN_DEBUG_NETWORK_SERVERS
00273 LogPlug::trace( "MultiplexedServerStreamSocket::run: "
00274 "trying to accept a new connection." ) ;
00275 #endif // CEYLAN_DEBUG_NETWORK_SERVERS
00276
00277 InputStream * newAnonymousSocket = accept() ;
00278
00279 if ( newAnonymousSocket != 0 )
00280 {
00281
00282 watchedSockets.push_back( newAnonymousSocket ) ;
00283
00284 connectionCount++ ;
00285
00286 #if CEYLAN_DEBUG_NETWORK_SERVERS
00287 LogPlug::trace(
00288 "MultiplexedServerStreamSocket::run: "
00289 "connection #"
00290 + Ceylan::toString( connectionCount )
00291 + " accepted." ) ;
00292 #endif // CEYLAN_DEBUG_NETWORK_SERVERS
00293
00294 }
00295 else
00296 {
00297
00298 #if CEYLAN_DEBUG_NETWORK_SERVERS
00299 LogPlug::trace(
00300 "MultiplexedServerStreamSocket::run: "
00301 "no connection could be accepted." ) ;
00302 #endif // CEYLAN_DEBUG_NETWORK_SERVERS
00303
00304 }
00305
00306 }
00307 else
00308 {
00309
00310
00311
00312 AnonymousStreamSocket * connection =
00313 dynamic_cast<AnonymousStreamSocket*>( *it ) ;
00314
00315 if ( connection == 0 )
00316 {
00317 LogPlug::error(
00318 "MultiplexedServerStreamSocket::run: "
00319 "unexpected failure of conversion to "
00320 "an anonymous stream socket of: "
00321 + (*it)->toString() ) ;
00322 }
00323
00324
00325 #if CEYLAN_DEBUG_NETWORK_SERVERS
00326 LogPlug::trace( "MultiplexedServerStreamSocket::run: "
00327 "following connection socket has data available: "
00328 + connection->toString() + "." ) ;
00329 #endif // CEYLAN_DEBUG_NETWORK_SERVERS
00330
00331 if ( ! handleConnection( *connection ) )
00332 connectionsToRemove.push_back( connection ) ;
00333
00334
00335 }
00336
00337
00338 }
00339
00340
00341 }
00342
00343
00344
00345 #if CEYLAN_DEBUG_NETWORK_SERVERS
00346
00347 LogPlug::trace( "MultiplexedServerStreamSocket::run: "
00348 "end of select scan." ) ;
00349
00350
00351 Thread::Sleep( 0 , 200000 ) ;
00352
00353 #endif // CEYLAN_DEBUG_NETWORK_SERVERS
00354
00355
00356
00357
00358
00359
00360
00361 while ( ! connectionsToRemove.empty() )
00362 {
00363
00364 AnonymousStreamSocket & toDel = * connectionsToRemove.front() ;
00365 connectionsToRemove.pop_front() ;
00366
00367 #if CEYLAN_DEBUG_NETWORK_SERVERS
00368 LogPlug::trace( "MultiplexedServerStreamSocket::run: "
00369 "removing stopped connection: " + toDel.toString() ) ;
00370 #endif // CEYLAN_DEBUG_NETWORK_SERVERS
00371
00372 closeConnection( toDel ) ;
00373 watchedSockets.remove( & toDel ) ;
00374
00375 }
00376
00377
00378 }
00379 catch( const Ceylan::Exception & e )
00380 {
00381 LogPlug::error ( "MultiplexedServerStreamSocket::run: "
00382 "error caught: " + e.toString() ) ;
00383 }
00384
00385
00386 }
00387
00388 #if CEYLAN_DEBUG_NETWORK_SERVERS
00389 LogPlug::trace( "Exiting from MultiplexedServerStreamSocket::run" ) ;
00390 #endif // CEYLAN_DEBUG_NETWORK_SERVERS
00391
00392
00393 #endif // CEYLAN_USES_NETWORK
00394
00395 }
00396
00397
00398
00399 AnonymousStreamSocket * MultiplexedServerStreamSocket::accept()
00400 {
00401
00402 #if CEYLAN_USES_NETWORK
00403
00404 if ( ! _bound )
00405 prepareToAccept() ;
00406
00407
00408 #if CEYLAN_DEBUG_NETWORK_SERVERS
00409 LogPlug::trace( "MultiplexedServerStreamSocket::accept: "
00410 "will accept now connections, state is: " + toString() ) ;
00411 #endif // CEYLAN_DEBUG_NETWORK_SERVERS
00412
00413 AnonymousStreamSocket * res ;
00414
00415 try
00416 {
00417
00418
00419 res = new AnonymousStreamSocket( getOriginalFileDescriptor(),
00420 false ) ;
00421
00422 }
00423 catch( const AnonymousStreamSocket::NonBlockingAcceptException & e )
00424 {
00425
00426 LogPlug::warning( "MultiplexedServerStreamSocket::accept "
00427 "cancelled: " + e.toString() ) ;
00428
00429 return 0 ;
00430
00431 }
00432 catch( const SocketException & e )
00433 {
00434 throw MultiplexedServerStreamSocketException(
00435 "MultiplexedServerStreamSocket::accept failed: "
00436 + e.toString() ) ;
00437 }
00438
00439 _currentConnections.insert( res ) ;
00440
00441 #if CEYLAN_DEBUG_NETWORK_SERVERS
00442 LogPlug::trace( "MultiplexedServerStreamSocket::accept: "
00443 "new connection accepted" ) ;
00444 #endif // CEYLAN_DEBUG_NETWORK_SERVERS
00445
00446 accepted( *res ) ;
00447
00448 #if CEYLAN_DEBUG_NETWORK_SERVERS
00449 LogPlug::trace( "MultiplexedServerStreamSocket::accept: "
00450 "connection terminated, cleaning up afterwards" ) ;
00451 #endif // CEYLAN_DEBUG_NETWORK_SERVERS
00452
00453
00454
00455 return res ;
00456
00457
00458 #else // CEYLAN_USES_NETWORK
00459
00460
00461 throw ServerStreamSocketException(
00462 "MultiplexedServerStreamSocket::accept: "
00463 "network feature not available." ) ;
00464
00465
00466 #endif // CEYLAN_USES_NETWORK
00467
00468 }
00469
00470
00471
00472 void MultiplexedServerStreamSocket::closeConnection(
00473 AnonymousStreamSocket & connection )
00474 {
00475
00476 if ( _currentConnections.find( &connection ) != _currentConnections.end() )
00477 {
00478
00479 delete &connection ;
00480 _currentConnections.erase( &connection ) ;
00481
00482 }
00483 else
00484 throw MultiplexedServerStreamSocketException(
00485 "MultiplexedServerStreamSocket::closeConnection: "
00486 "unable to find following connection: " + connection.toString() ) ;
00487 }
00488
00489
00490
00491 const std::string MultiplexedServerStreamSocket::toString(
00492 Ceylan::VerbosityLevels level ) const
00493 {
00494
00495 #if CEYLAN_USES_NETWORK
00496
00497 string res = "MultiplexedServerStreamSocket" ;
00498
00499 if ( _currentConnections.empty() )
00500 {
00501 res += " currently not managing any connection" ;
00502 }
00503 else
00504 {
00505
00506 list<string> connectionDescriptions ;
00507
00508 for ( set<AnonymousStreamSocket *>::const_iterator it =
00509 _currentConnections.begin();
00510 it != _currentConnections.end(); it++ )
00511 connectionDescriptions.push_back( (*it)->toString( level ) ) ;
00512
00513 res += " managing currently following connection(s): "
00514 + Ceylan::formatStringList( connectionDescriptions ) ;
00515
00516 }
00517
00518 if ( level == Ceylan::medium )
00519 return res ;
00520
00521 return res + ". " + StreamSocket::toString( level ) ;
00522
00523 #else // CEYLAN_USES_NETWORK
00524
00525 return "MultiplexedServerStreamSocket (no network support not available)" ;
00526
00527 #endif // CEYLAN_USES_NETWORK
00528
00529 }
00530
00531
00532
00533 bool MultiplexedServerStreamSocket::closeAcceptedConnections()
00534 {
00535
00536 if ( _currentConnections.empty() )
00537 return false ;
00538
00539 for ( set<AnonymousStreamSocket *>::const_iterator it =
00540 _currentConnections.begin(); it != _currentConnections.end(); it++ )
00541 {
00542
00543 delete (*it) ;
00544
00545 }
00546
00547 _currentConnections.clear() ;
00548
00549 return true ;
00550
00551 }
00552