akonadi
resourcebase.cpp
00001 /* 00002 Copyright (c) 2006 Till Adam <adam@kde.org> 00003 Copyright (c) 2007 Volker Krause <vkrause@kde.org> 00004 00005 This library is free software; you can redistribute it and/or modify it 00006 under the terms of the GNU Library General Public License as published by 00007 the Free Software Foundation; either version 2 of the License, or (at your 00008 option) any later version. 00009 00010 This library is distributed in the hope that it will be useful, but WITHOUT 00011 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 00012 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public 00013 License for more details. 00014 00015 You should have received a copy of the GNU Library General Public License 00016 along with this library; see the file COPYING.LIB. If not, write to the 00017 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 00018 02110-1301, USA. 00019 */ 00020 00021 #include "resourcebase.h" 00022 #include "agentbase_p.h" 00023 00024 #include "resourceadaptor.h" 00025 #include "collectiondeletejob.h" 00026 #include "collectionsync_p.h" 00027 #include "dbusconnectionpool.h" 00028 #include "itemsync.h" 00029 #include "resourcescheduler_p.h" 00030 #include "tracerinterface.h" 00031 #include "xdgbasedirs_p.h" 00032 00033 #include "changerecorder.h" 00034 #include "collectionfetchjob.h" 00035 #include "collectionfetchscope.h" 00036 #include "collectionmodifyjob.h" 00037 #include "itemfetchjob.h" 00038 #include "itemfetchscope.h" 00039 #include "itemmodifyjob.h" 00040 #include "itemmodifyjob_p.h" 00041 #include "session.h" 00042 #include "resourceselectjob_p.h" 00043 #include "monitor_p.h" 00044 #include "servermanager_p.h" 00045 00046 #include <kaboutdata.h> 00047 #include <kcmdlineargs.h> 00048 #include <kdebug.h> 00049 #include <klocale.h> 00050 00051 #include <QtCore/QDebug> 00052 #include <QtCore/QDir> 00053 #include <QtCore/QHash> 00054 #include <QtCore/QSettings> 00055 #include <QtCore/QTimer> 00056 #include <QtGui/QApplication> 00057 #include <QtDBus/QtDBus> 00058 00059 using namespace Akonadi; 00060 00061 class Akonadi::ResourceBasePrivate : public AgentBasePrivate 00062 { 00063 Q_OBJECT 00064 Q_CLASSINFO( "D-Bus Interface", "org.kde.dfaure" ) 00065 00066 public: 00067 ResourceBasePrivate( ResourceBase *parent ) 00068 : AgentBasePrivate( parent ), 00069 scheduler( 0 ), 00070 mItemSyncer( 0 ), 00071 mItemSyncFetchScope( 0 ), 00072 mItemTransactionMode( ItemSync::SingleTransaction ), 00073 mCollectionSyncer( 0 ), 00074 mHierarchicalRid( false ), 00075 mUnemittedProgress( 0 ) 00076 { 00077 Internal::setClientType( Internal::Resource ); 00078 mStatusMessage = defaultReadyMessage(); 00079 mProgressEmissionCompressor.setInterval( 1000 ); 00080 mProgressEmissionCompressor.setSingleShot( true ); 00081 } 00082 00083 ~ResourceBasePrivate() 00084 { 00085 delete mItemSyncFetchScope; 00086 } 00087 00088 Q_DECLARE_PUBLIC( ResourceBase ) 00089 00090 void delayedInit() 00091 { 00092 if ( !DBusConnectionPool::threadConnection().registerService( QLatin1String( "org.freedesktop.Akonadi.Resource." ) + mId ) ) { 00093 QString reason = DBusConnectionPool::threadConnection().lastError().message(); 00094 if ( reason.isEmpty() ) { 00095 reason = QString::fromLatin1( "this service is probably running already." ); 00096 } 00097 kError() << "Unable to register service at D-Bus: " << reason; 00098 00099 if ( QThread::currentThread() == QCoreApplication::instance()->thread() ) 00100 QCoreApplication::instance()->exit(1); 00101 00102 } else { 00103 AgentBasePrivate::delayedInit(); 00104 } 00105 } 00106 00107 virtual void changeProcessed() 00108 { 00109 mChangeRecorder->changeProcessed(); 00110 if ( !mChangeRecorder->isEmpty() ) 00111 scheduler->scheduleChangeReplay(); 00112 scheduler->taskDone(); 00113 } 00114 00115 void slotAbortRequested(); 00116 00117 void slotDeliveryDone( KJob* job ); 00118 void slotCollectionSyncDone( KJob *job ); 00119 void slotLocalListDone( KJob *job ); 00120 void slotSynchronizeCollection( const Collection &col ); 00121 void slotCollectionListDone( KJob *job ); 00122 void slotSynchronizeCollectionAttributes( const Collection &col ); 00123 void slotCollectionListForAttributesDone( KJob *job ); 00124 void slotCollectionAttributesSyncDone( KJob *job ); 00125 00126 void slotItemSyncDone( KJob *job ); 00127 00128 void slotPercent( KJob* job, unsigned long percent ); 00129 void slotDelayedEmitProgress(); 00130 void slotDeleteResourceCollection(); 00131 void slotDeleteResourceCollectionDone( KJob *job ); 00132 void slotCollectionDeletionDone( KJob *job ); 00133 00134 void slotPrepareItemRetrieval( const Akonadi::Item &item ); 00135 void slotPrepareItemRetrievalResult( KJob* job ); 00136 00137 void changeCommittedResult( KJob* job ); 00138 00139 void slotSessionReconnected() 00140 { 00141 Q_Q( ResourceBase ); 00142 00143 new ResourceSelectJob( q->identifier() ); 00144 } 00145 00146 void createItemSyncInstanceIfMissing() 00147 { 00148 Q_Q( ResourceBase ); 00149 Q_ASSERT_X( scheduler->currentTask().type == ResourceScheduler::SyncCollection, 00150 "createItemSyncInstance", "Calling items retrieval methods although no item retrieval is in progress" ); 00151 if ( !mItemSyncer ) { 00152 mItemSyncer = new ItemSync( q->currentCollection() ); 00153 mItemSyncer->setTransactionMode( mItemTransactionMode ); 00154 if ( mItemSyncFetchScope ) 00155 mItemSyncer->setFetchScope( *mItemSyncFetchScope ); 00156 mItemSyncer->setProperty( "collection", QVariant::fromValue( q->currentCollection() ) ); 00157 connect( mItemSyncer, SIGNAL( percent( KJob*, unsigned long ) ), q, SLOT( slotPercent( KJob*, unsigned long ) ) ); 00158 connect( mItemSyncer, SIGNAL( result( KJob* ) ), q, SLOT( slotItemSyncDone( KJob* ) ) ); 00159 } 00160 Q_ASSERT( mItemSyncer ); 00161 } 00162 00163 public Q_SLOTS: 00164 Q_SCRIPTABLE void dump() 00165 { 00166 scheduler->dump(); 00167 } 00168 00169 Q_SCRIPTABLE void clear() 00170 { 00171 scheduler->clear(); 00172 } 00173 00174 public: 00175 // synchronize states 00176 Collection currentCollection; 00177 00178 ResourceScheduler *scheduler; 00179 ItemSync *mItemSyncer; 00180 ItemFetchScope *mItemSyncFetchScope; 00181 ItemSync::TransactionMode mItemTransactionMode; 00182 CollectionSync *mCollectionSyncer; 00183 bool mHierarchicalRid; 00184 QTimer mProgressEmissionCompressor; 00185 int mUnemittedProgress; 00186 QMap<Akonadi::Collection::Id, QVariantMap> mUnemittedAdvancedStatus; 00187 }; 00188 00189 ResourceBase::ResourceBase( const QString & id ) 00190 : AgentBase( new ResourceBasePrivate( this ), id ) 00191 { 00192 Q_D( ResourceBase ); 00193 00194 new Akonadi__ResourceAdaptor( this ); 00195 00196 d->scheduler = new ResourceScheduler( this ); 00197 00198 d->mChangeRecorder->setChangeRecordingEnabled( true ); 00199 connect( d->mChangeRecorder, SIGNAL( changesAdded() ), 00200 d->scheduler, SLOT( scheduleChangeReplay() ) ); 00201 00202 d->mChangeRecorder->setResourceMonitored( d->mId.toLatin1() ); 00203 00204 connect( d->scheduler, SIGNAL( executeFullSync() ), 00205 SLOT( retrieveCollections() ) ); 00206 connect( d->scheduler, SIGNAL( executeCollectionTreeSync() ), 00207 SLOT( retrieveCollections() ) ); 00208 connect( d->scheduler, SIGNAL( executeCollectionSync( const Akonadi::Collection& ) ), 00209 SLOT( slotSynchronizeCollection( const Akonadi::Collection& ) ) ); 00210 connect( d->scheduler, SIGNAL( executeCollectionAttributesSync( const Akonadi::Collection& ) ), 00211 SLOT( slotSynchronizeCollectionAttributes(Akonadi::Collection)) ); 00212 connect( d->scheduler, SIGNAL( executeItemFetch( const Akonadi::Item&, const QSet<QByteArray>& ) ), 00213 SLOT( slotPrepareItemRetrieval(Akonadi::Item)) ); 00214 connect( d->scheduler, SIGNAL( executeResourceCollectionDeletion() ), 00215 SLOT( slotDeleteResourceCollection() ) ); 00216 connect( d->scheduler, SIGNAL( status( int, const QString& ) ), 00217 SIGNAL( status( int, const QString& ) ) ); 00218 connect( d->scheduler, SIGNAL( executeChangeReplay() ), 00219 d->mChangeRecorder, SLOT( replayNext() ) ); 00220 connect( d->scheduler, SIGNAL( fullSyncComplete() ), SIGNAL( synchronized() ) ); 00221 connect( d->mChangeRecorder, SIGNAL( nothingToReplay() ), d->scheduler, SLOT( taskDone() ) ); 00222 connect( d->mChangeRecorder, SIGNAL( collectionRemoved( const Akonadi::Collection& ) ), 00223 d->scheduler, SLOT( collectionRemoved( const Akonadi::Collection& ) ) ); 00224 connect( this, SIGNAL( abortRequested() ), this, SLOT( slotAbortRequested() ) ); 00225 connect( this, SIGNAL( synchronized() ), d->scheduler, SLOT( taskDone() ) ); 00226 connect( this, SIGNAL( agentNameChanged( const QString& ) ), 00227 this, SIGNAL( nameChanged( const QString& ) ) ); 00228 00229 connect( &d->mProgressEmissionCompressor, SIGNAL( timeout() ), 00230 this, SLOT( slotDelayedEmitProgress() ) ); 00231 00232 d->scheduler->setOnline( d->mOnline ); 00233 if ( !d->mChangeRecorder->isEmpty() ) 00234 d->scheduler->scheduleChangeReplay(); 00235 00236 DBusConnectionPool::threadConnection().registerObject( QLatin1String( "/Debug" ), d, QDBusConnection::ExportScriptableSlots ); 00237 00238 new ResourceSelectJob( identifier() ); 00239 00240 connect( d->mChangeRecorder->session(), SIGNAL( reconnected() ), SLOT( slotSessionReconnected() ) ); 00241 } 00242 00243 ResourceBase::~ResourceBase() 00244 { 00245 } 00246 00247 void ResourceBase::synchronize() 00248 { 00249 d_func()->scheduler->scheduleFullSync(); 00250 } 00251 00252 void ResourceBase::setName( const QString &name ) 00253 { 00254 AgentBase::setAgentName( name ); 00255 } 00256 00257 QString ResourceBase::name() const 00258 { 00259 return AgentBase::agentName(); 00260 } 00261 00262 QString ResourceBase::parseArguments( int argc, char **argv ) 00263 { 00264 QString identifier; 00265 if ( argc < 3 ) { 00266 kDebug() << "Not enough arguments passed..."; 00267 exit( 1 ); 00268 } 00269 00270 for ( int i = 1; i < argc - 1; ++i ) { 00271 if ( QLatin1String( argv[ i ] ) == QLatin1String( "--identifier" ) ) 00272 identifier = QLatin1String( argv[ i + 1 ] ); 00273 } 00274 00275 if ( identifier.isEmpty() ) { 00276 kDebug() << "Identifier argument missing"; 00277 exit( 1 ); 00278 } 00279 00280 QByteArray catalog; 00281 char *p = strrchr( argv[0], '/' ); 00282 if ( p ) 00283 catalog = QByteArray( p + 1 ); 00284 else 00285 catalog = QByteArray( argv[0] ); 00286 00287 KCmdLineArgs::init( argc, argv, identifier.toLatin1(), catalog, 00288 ki18nc( "@title application name", "Akonadi Resource" ), "0.1", 00289 ki18nc( "@title application description", "Akonadi Resource" ) ); 00290 00291 KCmdLineOptions options; 00292 options.add( "identifier <argument>", 00293 ki18nc( "@label commandline option", "Resource identifier" ) ); 00294 KCmdLineArgs::addCmdLineOptions( options ); 00295 00296 return identifier; 00297 } 00298 00299 int ResourceBase::init( ResourceBase *r ) 00300 { 00301 QApplication::setQuitOnLastWindowClosed( false ); 00302 int rv = kapp->exec(); 00303 delete r; 00304 return rv; 00305 } 00306 00307 void ResourceBasePrivate::slotAbortRequested() 00308 { 00309 Q_Q( ResourceBase ); 00310 00311 scheduler->cancelQueues(); 00312 QMetaObject::invokeMethod( q, "abortActivity" ); 00313 } 00314 00315 void ResourceBase::itemRetrieved( const Item &item ) 00316 { 00317 Q_D( ResourceBase ); 00318 Q_ASSERT( d->scheduler->currentTask().type == ResourceScheduler::FetchItem ); 00319 if ( !item.isValid() ) { 00320 d->scheduler->currentTask().sendDBusReplies( false ); 00321 d->scheduler->taskDone(); 00322 return; 00323 } 00324 00325 Item i( item ); 00326 QSet<QByteArray> requestedParts = d->scheduler->currentTask().itemParts; 00327 foreach ( const QByteArray &part, requestedParts ) { 00328 if ( !item.loadedPayloadParts().contains( part ) ) { 00329 kWarning() << "Item does not provide part" << part; 00330 } 00331 } 00332 00333 ItemModifyJob *job = new ItemModifyJob( i ); 00334 // FIXME: remove once the item with which we call retrieveItem() has a revision number 00335 job->disableRevisionCheck(); 00336 connect( job, SIGNAL( result( KJob* ) ), SLOT( slotDeliveryDone( KJob* ) ) ); 00337 } 00338 00339 void ResourceBasePrivate::slotDeliveryDone(KJob * job) 00340 { 00341 Q_Q( ResourceBase ); 00342 Q_ASSERT( scheduler->currentTask().type == ResourceScheduler::FetchItem ); 00343 if ( job->error() ) { 00344 emit q->error( QLatin1String( "Error while creating item: " ) + job->errorString() ); 00345 } 00346 scheduler->currentTask().sendDBusReplies( !job->error() ); 00347 scheduler->taskDone(); 00348 } 00349 00350 void ResourceBase::collectionAttributesRetrieved( const Collection &collection ) 00351 { 00352 Q_D( ResourceBase ); 00353 Q_ASSERT( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes ); 00354 if ( !collection.isValid() ) { 00355 emit attributesSynchronized( d->scheduler->currentTask().collection.id() ); 00356 d->scheduler->taskDone(); 00357 return; 00358 } 00359 00360 CollectionModifyJob *job = new CollectionModifyJob( collection ); 00361 connect( job, SIGNAL( result( KJob* ) ), SLOT( slotCollectionAttributesSyncDone( KJob* ) ) ); 00362 } 00363 00364 void ResourceBasePrivate::slotCollectionAttributesSyncDone(KJob * job) 00365 { 00366 Q_Q( ResourceBase ); 00367 Q_ASSERT( scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes ); 00368 if ( job->error() ) { 00369 emit q->error( QLatin1String( "Error while updating collection: " ) + job->errorString() ); 00370 } 00371 emit q->attributesSynchronized( scheduler->currentTask().collection.id() ); 00372 scheduler->taskDone(); 00373 } 00374 00375 void ResourceBasePrivate::slotDeleteResourceCollection() 00376 { 00377 Q_Q( ResourceBase ); 00378 00379 CollectionFetchJob *job = new CollectionFetchJob( Collection::root(), CollectionFetchJob::FirstLevel ); 00380 job->fetchScope().setResource( q->identifier() ); 00381 connect( job, SIGNAL( result( KJob* ) ), q, SLOT( slotDeleteResourceCollectionDone( KJob* ) ) ); 00382 } 00383 00384 void ResourceBasePrivate::slotDeleteResourceCollectionDone( KJob *job ) 00385 { 00386 Q_Q( ResourceBase ); 00387 if ( job->error() ) { 00388 emit q->error( job->errorString() ); 00389 scheduler->taskDone(); 00390 } else { 00391 const CollectionFetchJob *fetchJob = static_cast<const CollectionFetchJob*>( job ); 00392 00393 if ( !fetchJob->collections().isEmpty() ) { 00394 CollectionDeleteJob *job = new CollectionDeleteJob( fetchJob->collections().first() ); 00395 connect( job, SIGNAL( result( KJob* ) ), q, SLOT( slotCollectionDeletionDone( KJob* ) ) ); 00396 } else { 00397 // there is no resource collection, so just ignore the request 00398 scheduler->taskDone(); 00399 } 00400 } 00401 } 00402 00403 void ResourceBasePrivate::slotCollectionDeletionDone( KJob *job ) 00404 { 00405 Q_Q( ResourceBase ); 00406 if ( job->error() ) { 00407 emit q->error( job->errorString() ); 00408 } 00409 00410 scheduler->taskDone(); 00411 } 00412 00413 void ResourceBase::changeCommitted( const Item& item ) 00414 { 00415 Q_D( ResourceBase ); 00416 ItemModifyJob *job = new ItemModifyJob( item ); 00417 job->d_func()->setClean(); 00418 job->disableRevisionCheck(); // TODO: remove, but where/how do we handle the error? 00419 job->setIgnorePayload( true ); // we only want to reset the dirty flag and update the remote id 00420 d->changeProcessed(); 00421 } 00422 00423 void ResourceBase::changeCommitted( const Collection &collection ) 00424 { 00425 CollectionModifyJob *job = new CollectionModifyJob( collection ); 00426 connect( job, SIGNAL( result( KJob* ) ), SLOT( changeCommittedResult( KJob* ) ) ); 00427 } 00428 00429 void ResourceBasePrivate::changeCommittedResult( KJob *job ) 00430 { 00431 Q_Q( ResourceBase ); 00432 if ( job->error() ) 00433 emit q->error( i18nc( "@info", "Updating local collection failed: %1.", job->errorText() ) ); 00434 mChangeRecorder->d_ptr->invalidateCache( static_cast<CollectionModifyJob*>( job )->collection() ); 00435 changeProcessed(); 00436 } 00437 00438 bool ResourceBase::requestItemDelivery( qint64 uid, const QString & remoteId, 00439 const QString &mimeType, const QStringList &_parts ) 00440 { 00441 Q_D( ResourceBase ); 00442 if ( !isOnline() ) { 00443 emit error( i18nc( "@info", "Cannot fetch item in offline mode." ) ); 00444 return false; 00445 } 00446 00447 setDelayedReply( true ); 00448 // FIXME: we need at least the revision number too 00449 Item item( uid ); 00450 item.setMimeType( mimeType ); 00451 item.setRemoteId( remoteId ); 00452 00453 QSet<QByteArray> parts; 00454 Q_FOREACH( const QString &str, _parts ) 00455 parts.insert( str.toLatin1() ); 00456 00457 d->scheduler->scheduleItemFetch( item, parts, message().createReply() ); 00458 00459 return true; 00460 } 00461 00462 void ResourceBase::collectionsRetrieved( const Collection::List & collections ) 00463 { 00464 Q_D( ResourceBase ); 00465 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || 00466 d->scheduler->currentTask().type == ResourceScheduler::SyncAll, 00467 "ResourceBase::collectionsRetrieved()", 00468 "Calling collectionsRetrieved() although no collection retrieval is in progress" ); 00469 if ( !d->mCollectionSyncer ) { 00470 d->mCollectionSyncer = new CollectionSync( identifier() ); 00471 d->mCollectionSyncer->setHierarchicalRemoteIds( d->mHierarchicalRid ); 00472 connect( d->mCollectionSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) ); 00473 connect( d->mCollectionSyncer, SIGNAL( result( KJob* ) ), SLOT( slotCollectionSyncDone( KJob* ) ) ); 00474 } 00475 d->mCollectionSyncer->setRemoteCollections( collections ); 00476 } 00477 00478 void ResourceBase::collectionsRetrievedIncremental( const Collection::List & changedCollections, 00479 const Collection::List & removedCollections ) 00480 { 00481 Q_D( ResourceBase ); 00482 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || 00483 d->scheduler->currentTask().type == ResourceScheduler::SyncAll, 00484 "ResourceBase::collectionsRetrievedIncremental()", 00485 "Calling collectionsRetrievedIncremental() although no collection retrieval is in progress" ); 00486 if ( !d->mCollectionSyncer ) { 00487 d->mCollectionSyncer = new CollectionSync( identifier() ); 00488 d->mCollectionSyncer->setHierarchicalRemoteIds( d->mHierarchicalRid ); 00489 connect( d->mCollectionSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) ); 00490 connect( d->mCollectionSyncer, SIGNAL( result( KJob* ) ), SLOT( slotCollectionSyncDone( KJob* ) ) ); 00491 } 00492 d->mCollectionSyncer->setRemoteCollections( changedCollections, removedCollections ); 00493 } 00494 00495 void ResourceBase::setCollectionStreamingEnabled( bool enable ) 00496 { 00497 Q_D( ResourceBase ); 00498 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || 00499 d->scheduler->currentTask().type == ResourceScheduler::SyncAll, 00500 "ResourceBase::setCollectionStreamingEnabled()", 00501 "Calling setCollectionStreamingEnabled() although no collection retrieval is in progress" ); 00502 if ( !d->mCollectionSyncer ) { 00503 d->mCollectionSyncer = new CollectionSync( identifier() ); 00504 d->mCollectionSyncer->setHierarchicalRemoteIds( d->mHierarchicalRid ); 00505 connect( d->mCollectionSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) ); 00506 connect( d->mCollectionSyncer, SIGNAL( result( KJob* ) ), SLOT( slotCollectionSyncDone( KJob* ) ) ); 00507 } 00508 d->mCollectionSyncer->setStreamingEnabled( enable ); 00509 } 00510 00511 void ResourceBase::collectionsRetrievalDone() 00512 { 00513 Q_D( ResourceBase ); 00514 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || 00515 d->scheduler->currentTask().type == ResourceScheduler::SyncAll, 00516 "ResourceBase::collectionsRetrievalDone()", 00517 "Calling collectionsRetrievalDone() although no collection retrieval is in progress" ); 00518 // streaming enabled, so finalize the sync 00519 if ( d->mCollectionSyncer ) { 00520 d->mCollectionSyncer->retrievalDone(); 00521 } 00522 // user did the sync himself, we are done now 00523 else { 00524 // FIXME: we need the same special case for SyncAll as in slotCollectionSyncDone here! 00525 d->scheduler->taskDone(); 00526 } 00527 } 00528 00529 void ResourceBasePrivate::slotCollectionSyncDone( KJob * job ) 00530 { 00531 Q_Q( ResourceBase ); 00532 mCollectionSyncer = 0; 00533 if ( job->error() ) { 00534 if ( job->error() != Job::UserCanceled ) 00535 emit q->error( job->errorString() ); 00536 } else { 00537 if ( scheduler->currentTask().type == ResourceScheduler::SyncAll ) { 00538 CollectionFetchJob *list = new CollectionFetchJob( Collection::root(), CollectionFetchJob::Recursive ); 00539 list->setFetchScope( q->changeRecorder()->collectionFetchScope() ); 00540 list->fetchScope().setResource( mId ); 00541 q->connect( list, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalListDone( KJob* ) ) ); 00542 return; 00543 } 00544 } 00545 scheduler->taskDone(); 00546 } 00547 00548 void ResourceBasePrivate::slotLocalListDone( KJob * job ) 00549 { 00550 Q_Q( ResourceBase ); 00551 if ( job->error() ) { 00552 emit q->error( job->errorString() ); 00553 } else { 00554 Collection::List cols = static_cast<CollectionFetchJob*>( job )->collections(); 00555 foreach ( const Collection &col, cols ) { 00556 scheduler->scheduleSync( col ); 00557 } 00558 scheduler->scheduleFullSyncCompletion(); 00559 } 00560 scheduler->taskDone(); 00561 } 00562 00563 void ResourceBasePrivate::slotSynchronizeCollection( const Collection &col ) 00564 { 00565 Q_Q( ResourceBase ); 00566 currentCollection = col; 00567 // check if this collection actually can contain anything 00568 QStringList contentTypes = currentCollection.contentMimeTypes(); 00569 contentTypes.removeAll( Collection::mimeType() ); 00570 if ( !contentTypes.isEmpty() || (col.rights() & (Collection::CanLinkItem)) ) { // HACK to check for virtual collections 00571 emit q->status( AgentBase::Running, i18nc( "@info:status", "Syncing collection '%1'", currentCollection.name() ) ); 00572 q->retrieveItems( currentCollection ); 00573 return; 00574 } 00575 scheduler->taskDone(); 00576 } 00577 00578 void ResourceBasePrivate::slotSynchronizeCollectionAttributes( const Collection &col ) 00579 { 00580 Q_Q( ResourceBase ); 00581 QMetaObject::invokeMethod( q, "retrieveCollectionAttributes", Q_ARG( Akonadi::Collection, col ) ); 00582 } 00583 00584 void ResourceBasePrivate::slotPrepareItemRetrieval( const Akonadi::Item &item ) 00585 { 00586 Q_Q( ResourceBase ); 00587 ItemFetchJob *fetch = new ItemFetchJob( item, this ); 00588 fetch->fetchScope().setAncestorRetrieval( q->changeRecorder()->itemFetchScope().ancestorRetrieval() ); 00589 fetch->fetchScope().setCacheOnly( true ); 00590 00591 // copy list of attributes to fetch 00592 const QSet<QByteArray> attributes = q->changeRecorder()->itemFetchScope().attributes(); 00593 foreach ( const QByteArray &attribute, attributes ) 00594 fetch->fetchScope().fetchAttribute( attribute ); 00595 00596 q->connect( fetch, SIGNAL( result( KJob* ) ), SLOT( slotPrepareItemRetrievalResult( KJob* ) ) ); 00597 } 00598 00599 void ResourceBasePrivate::slotPrepareItemRetrievalResult( KJob* job ) 00600 { 00601 Q_Q( ResourceBase ); 00602 Q_ASSERT_X( scheduler->currentTask().type == ResourceScheduler::FetchItem, 00603 "ResourceBasePrivate::slotPrepareItemRetrievalResult()", 00604 "Preparing item retrieval although no item retrieval is in progress" ); 00605 if ( job->error() ) { 00606 q->cancelTask( job->errorText() ); 00607 return; 00608 } 00609 ItemFetchJob *fetch = qobject_cast<ItemFetchJob*>( job ); 00610 if ( fetch->items().count() != 1 ) { 00611 q->cancelTask( i18n( "The requested item no longer exists" ) ); 00612 return; 00613 } 00614 const Item item = fetch->items().first(); 00615 const QSet<QByteArray> parts = scheduler->currentTask().itemParts; 00616 if ( !q->retrieveItem( item, parts ) ) 00617 q->cancelTask(); 00618 } 00619 00620 void ResourceBase::itemsRetrievalDone() 00621 { 00622 Q_D( ResourceBase ); 00623 // streaming enabled, so finalize the sync 00624 if ( d->mItemSyncer ) { 00625 d->mItemSyncer->deliveryDone(); 00626 } 00627 // user did the sync himself, we are done now 00628 else { 00629 d->scheduler->taskDone(); 00630 } 00631 } 00632 00633 void ResourceBase::clearCache() 00634 { 00635 Q_D( ResourceBase ); 00636 d->scheduler->scheduleResourceCollectionDeletion(); 00637 } 00638 00639 Collection ResourceBase::currentCollection() const 00640 { 00641 Q_D( const ResourceBase ); 00642 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollection , 00643 "ResourceBase::currentCollection()", 00644 "Trying to access current collection although no item retrieval is in progress" ); 00645 return d->currentCollection; 00646 } 00647 00648 Item ResourceBase::currentItem() const 00649 { 00650 Q_D( const ResourceBase ); 00651 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::FetchItem , 00652 "ResourceBase::currentItem()", 00653 "Trying to access current item although no item retrieval is in progress" ); 00654 return d->scheduler->currentTask().item; 00655 } 00656 00657 void ResourceBase::synchronizeCollectionTree() 00658 { 00659 d_func()->scheduler->scheduleCollectionTreeSync(); 00660 } 00661 00662 void ResourceBase::cancelTask() 00663 { 00664 Q_D( ResourceBase ); 00665 switch ( d->scheduler->currentTask().type ) { 00666 case ResourceScheduler::FetchItem: 00667 itemRetrieved( Item() ); // sends the error reply and 00668 break; 00669 case ResourceScheduler::ChangeReplay: 00670 d->changeProcessed(); 00671 break; 00672 case ResourceScheduler::SyncCollectionTree: 00673 case ResourceScheduler::SyncAll: 00674 if ( d->mCollectionSyncer ) 00675 d->mCollectionSyncer->rollback(); 00676 else 00677 d->scheduler->taskDone(); 00678 break; 00679 case ResourceScheduler::SyncCollection: 00680 if ( d->mItemSyncer ) 00681 d->mItemSyncer->rollback(); 00682 else 00683 d->scheduler->taskDone(); 00684 break; 00685 default: 00686 d->scheduler->taskDone(); 00687 } 00688 } 00689 00690 void ResourceBase::cancelTask( const QString &msg ) 00691 { 00692 cancelTask(); 00693 00694 emit error( msg ); 00695 } 00696 00697 void ResourceBase::deferTask() 00698 { 00699 Q_D( ResourceBase ); 00700 d->scheduler->deferTask(); 00701 } 00702 00703 void ResourceBase::doSetOnline( bool state ) 00704 { 00705 d_func()->scheduler->setOnline( state ); 00706 } 00707 00708 void ResourceBase::synchronizeCollection( qint64 collectionId ) 00709 { 00710 synchronizeCollection( collectionId, false ); 00711 } 00712 00713 void ResourceBase::synchronizeCollection( qint64 collectionId, bool recursive ) 00714 { 00715 CollectionFetchJob* job = new CollectionFetchJob( Collection( collectionId ), recursive ? CollectionFetchJob::Recursive : CollectionFetchJob::Base ); 00716 job->setFetchScope( changeRecorder()->collectionFetchScope() ); 00717 job->fetchScope().setResource( identifier() ); 00718 job->setProperty( "recursive", recursive ); 00719 connect( job, SIGNAL( result( KJob* ) ), SLOT( slotCollectionListDone( KJob* ) ) ); 00720 } 00721 00722 void ResourceBasePrivate::slotCollectionListDone( KJob *job ) 00723 { 00724 if ( !job->error() ) { 00725 Collection::List list = static_cast<CollectionFetchJob*>( job )->collections(); 00726 if ( !list.isEmpty() ) { 00727 if ( job->property( "recursive" ).toBool() ) { 00728 Q_FOREACH ( const Collection &collection, list ) { 00729 scheduler->scheduleSync( collection ); 00730 } 00731 } else { 00732 scheduler->scheduleSync( list.first() ); 00733 } 00734 } 00735 } 00736 // TODO: error handling 00737 } 00738 00739 void ResourceBase::synchronizeCollectionAttributes( qint64 collectionId ) 00740 { 00741 CollectionFetchJob* job = new CollectionFetchJob( Collection( collectionId ), CollectionFetchJob::Base ); 00742 job->setFetchScope( changeRecorder()->collectionFetchScope() ); 00743 job->fetchScope().setResource( identifier() ); 00744 connect( job, SIGNAL( result( KJob* ) ), SLOT( slotCollectionListForAttributesDone( KJob* ) ) ); 00745 } 00746 00747 void ResourceBasePrivate::slotCollectionListForAttributesDone( KJob *job ) 00748 { 00749 if ( !job->error() ) { 00750 Collection::List list = static_cast<CollectionFetchJob*>( job )->collections(); 00751 if ( !list.isEmpty() ) { 00752 Collection col = list.first(); 00753 scheduler->scheduleAttributesSync( col ); 00754 } 00755 } 00756 // TODO: error handling 00757 } 00758 00759 void ResourceBase::setTotalItems( int amount ) 00760 { 00761 kDebug() << amount; 00762 Q_D( ResourceBase ); 00763 setItemStreamingEnabled( true ); 00764 d->mItemSyncer->setTotalItems( amount ); 00765 } 00766 00767 void ResourceBase::setItemStreamingEnabled( bool enable ) 00768 { 00769 Q_D( ResourceBase ); 00770 d->createItemSyncInstanceIfMissing(); 00771 d->mItemSyncer->setStreamingEnabled( enable ); 00772 } 00773 00774 void ResourceBase::itemsRetrieved( const Item::List &items ) 00775 { 00776 Q_D( ResourceBase ); 00777 d->createItemSyncInstanceIfMissing(); 00778 d->mItemSyncer->setFullSyncItems( items ); 00779 } 00780 00781 void ResourceBase::itemsRetrievedIncremental( const Item::List &changedItems, const Item::List &removedItems ) 00782 { 00783 Q_D( ResourceBase ); 00784 d->createItemSyncInstanceIfMissing(); 00785 d->mItemSyncer->setIncrementalSyncItems( changedItems, removedItems ); 00786 } 00787 00788 void ResourceBasePrivate::slotItemSyncDone( KJob *job ) 00789 { 00790 mItemSyncer = 0; 00791 Q_Q( ResourceBase ); 00792 if ( job->error() && job->error() != Job::UserCanceled ) { 00793 emit q->error( job->errorString() ); 00794 } 00795 scheduler->taskDone(); 00796 } 00797 00798 00799 void ResourceBasePrivate::slotDelayedEmitProgress() 00800 { 00801 Q_Q( ResourceBase ); 00802 emit q->percent( mUnemittedProgress ); 00803 00804 Q_FOREACH( const QVariantMap &statusMap, mUnemittedAdvancedStatus ) { 00805 emit q->advancedStatus( statusMap ); 00806 } 00807 mUnemittedProgress = 0; 00808 mUnemittedAdvancedStatus.clear(); 00809 } 00810 00811 void ResourceBasePrivate::slotPercent( KJob *job, unsigned long percent ) 00812 { 00813 Q_Q( ResourceBase ); 00814 00815 mUnemittedProgress = percent; 00816 00817 const Collection collection = job->property( "collection" ).value<Collection>(); 00818 if ( collection.isValid() ) { 00819 QVariantMap statusMap; 00820 statusMap.insert( QLatin1String( "key" ), QString::fromLatin1( "collectionSyncProgress" ) ); 00821 statusMap.insert( QLatin1String( "collectionId" ), collection.id() ); 00822 statusMap.insert( QLatin1String( "percent" ), static_cast<unsigned int>( percent ) ); 00823 00824 mUnemittedAdvancedStatus[collection.id()] = statusMap; 00825 } 00826 // deliver completion right away, intermediate progress at 1s intervals 00827 if ( percent == 100 ) { 00828 mProgressEmissionCompressor.stop(); 00829 slotDelayedEmitProgress(); 00830 } else if ( !mProgressEmissionCompressor.isActive() ) { 00831 mProgressEmissionCompressor.start(); 00832 } 00833 } 00834 00835 void ResourceBase::setHierarchicalRemoteIdentifiersEnabled( bool enable ) 00836 { 00837 Q_D( ResourceBase ); 00838 d->mHierarchicalRid = enable; 00839 } 00840 00841 void ResourceBase::scheduleCustomTask( QObject *receiver, const char *method, const QVariant &argument, SchedulePriority priority ) 00842 { 00843 Q_D( ResourceBase ); 00844 d->scheduler->scheduleCustomTask( receiver, method, argument, priority ); 00845 } 00846 00847 void ResourceBase::taskDone() 00848 { 00849 Q_D( ResourceBase ); 00850 d->scheduler->taskDone(); 00851 } 00852 00853 void ResourceBase::retrieveCollectionAttributes( const Collection &collection ) 00854 { 00855 collectionAttributesRetrieved( collection ); 00856 } 00857 00858 void Akonadi::ResourceBase::abortActivity() 00859 { 00860 00861 } 00862 00863 void ResourceBase::setItemTransactionMode(ItemSync::TransactionMode mode) 00864 { 00865 Q_D( ResourceBase ); 00866 d->mItemTransactionMode = mode; 00867 } 00868 00869 void ResourceBase::setItemSynchronizationFetchScope(const ItemFetchScope& fetchScope) 00870 { 00871 Q_D( ResourceBase ); 00872 if ( !d->mItemSyncFetchScope ) 00873 d->mItemSyncFetchScope = new ItemFetchScope; 00874 *(d->mItemSyncFetchScope) = fetchScope; 00875 } 00876 00877 #include "resourcebase.moc" 00878 #include "moc_resourcebase.cpp"