akonadi
itemsync.cpp
00001 /* 00002 Copyright (c) 2007 Tobias Koenig <tokoe@kde.org> 00003 Copyright (c) 2007 Volker Krause <vkrause@kde.org> 00004 00005 This library is free software; you can redistribute it and/or modify it 00006 under the terms of the GNU Library General Public License as published by 00007 the Free Software Foundation; either version 2 of the License, or (at your 00008 option) any later version. 00009 00010 This library is distributed in the hope that it will be useful, but WITHOUT 00011 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 00012 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public 00013 License for more details. 00014 00015 You should have received a copy of the GNU Library General Public License 00016 along with this library; see the file COPYING.LIB. If not, write to the 00017 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 00018 02110-1301, USA. 00019 */ 00020 00021 #include "itemsync.h" 00022 00023 #include "collection.h" 00024 #include "item.h" 00025 #include "itemcreatejob.h" 00026 #include "itemdeletejob.h" 00027 #include "itemfetchjob.h" 00028 #include "itemmodifyjob.h" 00029 #include "transactionsequence.h" 00030 #include "itemfetchscope.h" 00031 00032 #include <kdebug.h> 00033 00034 #include <QtCore/QStringList> 00035 00036 using namespace Akonadi; 00037 00041 class ItemSync::Private 00042 { 00043 public: 00044 Private( ItemSync *parent ) : 00045 q( parent ), 00046 mTransactionMode( SingleTransaction ), 00047 mCurrentTransaction( 0 ), 00048 mTransactionJobs( 0 ), 00049 mPendingJobs( 0 ), 00050 mProgress( 0 ), 00051 mTotalItems( -1 ), 00052 mTotalItemsProcessed( 0 ), 00053 mStreaming( false ), 00054 mIncremental( false ), 00055 mLocalListDone( false ), 00056 mDeliveryDone( false ), 00057 mFinished( false ) 00058 { 00059 // we want to fetch all data by default 00060 mFetchScope.fetchFullPayload(); 00061 mFetchScope.fetchAllAttributes(); 00062 } 00063 00064 void createLocalItem( const Item &item ); 00065 void checkDone(); 00066 void slotLocalListDone( KJob* ); 00067 void slotLocalDeleteDone( KJob* ); 00068 void slotLocalChangeDone( KJob* ); 00069 void execute(); 00070 void processItems(); 00071 void deleteItems( const Item::List &items ); 00072 void slotTransactionResult( KJob *job ); 00073 Job* subjobParent() const; 00074 00075 ItemSync *q; 00076 Collection mSyncCollection; 00077 QHash<Item::Id, Akonadi::Item> mLocalItemsById; 00078 QHash<QString, Akonadi::Item> mLocalItemsByRemoteId; 00079 QSet<Akonadi::Item> mUnprocessedLocalItems; 00080 00081 ItemSync::TransactionMode mTransactionMode; 00082 TransactionSequence *mCurrentTransaction; 00083 int mTransactionJobs; 00084 00085 // fetch scope for initial item listing 00086 ItemFetchScope mFetchScope; 00087 00088 // remote items 00089 Akonadi::Item::List mRemoteItems; 00090 00091 // removed remote items 00092 Item::List mRemovedRemoteItems; 00093 00094 // create counter 00095 int mPendingJobs; 00096 int mProgress; 00097 int mTotalItems; 00098 int mTotalItemsProcessed; 00099 00100 bool mStreaming; 00101 bool mIncremental; 00102 bool mLocalListDone; 00103 bool mDeliveryDone; 00104 bool mFinished; 00105 }; 00106 00107 void ItemSync::Private::createLocalItem( const Item & item ) 00108 { 00109 // don't try to do anything in error state 00110 if ( q->error() ) 00111 return; 00112 mPendingJobs++; 00113 ItemCreateJob *create = new ItemCreateJob( item, mSyncCollection, subjobParent() ); 00114 q->connect( create, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalChangeDone( KJob* ) ) ); 00115 } 00116 00117 void ItemSync::Private::checkDone() 00118 { 00119 q->setProcessedAmount( KJob::Bytes, mProgress ); 00120 if ( mPendingJobs > 0 || !mDeliveryDone || mTransactionJobs > 0 ) 00121 return; 00122 00123 if ( !mFinished ) { // prevent double result emission, can happen since checkDone() is called from all over the place 00124 mFinished = true; 00125 q->emitResult(); 00126 } 00127 } 00128 00129 ItemSync::ItemSync( const Collection &collection, QObject *parent ) : 00130 Job( parent ), 00131 d( new Private( this ) ) 00132 { 00133 d->mSyncCollection = collection; 00134 } 00135 00136 ItemSync::~ItemSync() 00137 { 00138 delete d; 00139 } 00140 00141 void ItemSync::setFullSyncItems( const Item::List &items ) 00142 { 00143 Q_ASSERT( !d->mIncremental ); 00144 if ( !d->mStreaming ) 00145 d->mDeliveryDone = true; 00146 d->mRemoteItems += items; 00147 d->mTotalItemsProcessed += items.count(); 00148 kDebug() << "Received: " << items.count() << "In total: " << d->mTotalItemsProcessed << " Wanted: " << d->mTotalItems; 00149 setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed ); 00150 if ( d->mTotalItemsProcessed == d->mTotalItems ) 00151 d->mDeliveryDone = true; 00152 d->execute(); 00153 } 00154 00155 void ItemSync::setTotalItems( int amount ) 00156 { 00157 Q_ASSERT( !d->mIncremental ); 00158 Q_ASSERT( amount >= 0 ); 00159 setStreamingEnabled( true ); 00160 kDebug() << amount; 00161 d->mTotalItems = amount; 00162 setTotalAmount( KJob::Bytes, amount ); 00163 if ( d->mTotalItems == 0 ) { 00164 d->mDeliveryDone = true; 00165 d->execute(); 00166 } 00167 } 00168 00169 void ItemSync::setIncrementalSyncItems( const Item::List &changedItems, const Item::List &removedItems ) 00170 { 00171 d->mIncremental = true; 00172 if ( !d->mStreaming ) 00173 d->mDeliveryDone = true; 00174 d->mRemoteItems += changedItems; 00175 d->mRemovedRemoteItems += removedItems; 00176 d->mTotalItemsProcessed += changedItems.count() + removedItems.count(); 00177 setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed ); 00178 if ( d->mTotalItemsProcessed == d->mTotalItems ) 00179 d->mDeliveryDone = true; 00180 d->execute(); 00181 } 00182 00183 void ItemSync::setFetchScope( ItemFetchScope &fetchScope ) 00184 { 00185 d->mFetchScope = fetchScope; 00186 } 00187 00188 ItemFetchScope &ItemSync::fetchScope() 00189 { 00190 return d->mFetchScope; 00191 } 00192 00193 void ItemSync::doStart() 00194 { 00195 ItemFetchJob* job = new ItemFetchJob( d->mSyncCollection, this ); 00196 job->setFetchScope( d->mFetchScope ); 00197 00198 // we only can fetch parts already in the cache, otherwise this will deadlock 00199 job->fetchScope().setCacheOnly( true ); 00200 00201 connect( job, SIGNAL( result( KJob* ) ), SLOT( slotLocalListDone( KJob* ) ) ); 00202 } 00203 00204 bool ItemSync::updateItem( const Item &storedItem, Item &newItem ) 00205 { 00206 // we are in error state, better not change anything at all anymore 00207 if ( error() ) 00208 return false; 00209 00210 /* 00211 * We know that this item has changed (as it is part of the 00212 * incremental changed list), so we just put it into the 00213 * storage. 00214 */ 00215 if ( d->mIncremental ) 00216 return true; 00217 00218 // Check whether the flags differ 00219 if ( storedItem.flags() != newItem.flags() ) { 00220 kDebug() << "Stored flags " << storedItem.flags() 00221 << "new flags " << newItem.flags(); 00222 return true; 00223 } 00224 00225 // Check whether the new item contains unknown parts 00226 QSet<QByteArray> missingParts = newItem.loadedPayloadParts(); 00227 missingParts.subtract( storedItem.loadedPayloadParts() ); 00228 if ( !missingParts.isEmpty() ) 00229 return true; 00230 00231 // ### FIXME SLOW!!! 00232 // If the available part identifiers don't differ, check 00233 // whether the content of the payload differs 00234 if ( newItem.hasPayload() 00235 && storedItem.payloadData() != newItem.payloadData() ) 00236 return true; 00237 00238 // check if remote attributes have been changed 00239 foreach ( Attribute* attr, newItem.attributes() ) { 00240 if ( !storedItem.hasAttribute( attr->type() ) ) 00241 return true; 00242 if ( attr->serialized() != storedItem.attribute( attr->type() )->serialized() ) 00243 return true; 00244 } 00245 00246 return false; 00247 } 00248 00249 void ItemSync::Private::slotLocalListDone( KJob * job ) 00250 { 00251 if ( !job->error() ) { 00252 const Item::List list = static_cast<ItemFetchJob*>( job )->items(); 00253 foreach ( const Item &item, list ) { 00254 if ( item.remoteId().isEmpty() ) 00255 continue; 00256 mLocalItemsById.insert( item.id(), item ); 00257 mLocalItemsByRemoteId.insert( item.remoteId(), item ); 00258 mUnprocessedLocalItems.insert( item ); 00259 } 00260 } 00261 00262 mLocalListDone = true; 00263 execute(); 00264 } 00265 00266 void ItemSync::Private::execute() 00267 { 00268 if ( !mLocalListDone ) 00269 return; 00270 00271 // early exit to avoid unnecessary TransactionSequence creation in MultipleTransactions mode 00272 // TODO: do the transaction handling in a nicer way instead, only creating TransactionSequences when really needed 00273 if ( !mDeliveryDone && mRemoteItems.isEmpty() ) 00274 return; 00275 00276 if ( (mTransactionMode == SingleTransaction && !mCurrentTransaction) || mTransactionMode == MultipleTransactions) { 00277 ++mTransactionJobs; 00278 mCurrentTransaction = new TransactionSequence( q ); 00279 mCurrentTransaction->setAutomaticCommittingEnabled( false ); 00280 connect( mCurrentTransaction, SIGNAL( result( KJob* ) ), q, SLOT( slotTransactionResult( KJob* ) ) ); 00281 } 00282 00283 processItems(); 00284 if ( !mDeliveryDone ) { 00285 if ( mTransactionMode == MultipleTransactions && mCurrentTransaction ) { 00286 mCurrentTransaction->commit(); 00287 mCurrentTransaction = 0; 00288 } 00289 return; 00290 } 00291 00292 // removed 00293 if ( !mIncremental ) { 00294 mRemovedRemoteItems = mUnprocessedLocalItems.toList(); 00295 mUnprocessedLocalItems.clear(); 00296 } 00297 00298 deleteItems( mRemovedRemoteItems ); 00299 mLocalItemsById.clear(); 00300 mLocalItemsByRemoteId.clear(); 00301 mRemovedRemoteItems.clear(); 00302 00303 if ( mCurrentTransaction ) { 00304 mCurrentTransaction->commit(); 00305 mCurrentTransaction = 0; 00306 } 00307 00308 checkDone(); 00309 } 00310 00311 void ItemSync::Private::processItems() 00312 { 00313 // added / updated 00314 foreach ( Item remoteItem, mRemoteItems ) { //krazy:exclude=foreach non-const is needed here 00315 #ifndef NDEBUG 00316 if ( remoteItem.remoteId().isEmpty() ) { 00317 kWarning() << "Item " << remoteItem.id() << " does not have a remote identifier"; 00318 } 00319 #endif 00320 00321 Item localItem = mLocalItemsById.value( remoteItem.id() ); 00322 if ( !localItem.isValid() ) 00323 localItem = mLocalItemsByRemoteId.value( remoteItem.remoteId() ); 00324 mUnprocessedLocalItems.remove( localItem ); 00325 // missing locally 00326 if ( !localItem.isValid() ) { 00327 createLocalItem( remoteItem ); 00328 continue; 00329 } 00330 00331 if ( q->updateItem( localItem, remoteItem ) ) { 00332 mPendingJobs++; 00333 00334 remoteItem.setId( localItem.id() ); 00335 remoteItem.setRevision( localItem.revision() ); 00336 remoteItem.setSize( localItem.size() ); 00337 remoteItem.setRemoteId( localItem.remoteId() ); // in case someone clears remoteId by accident 00338 ItemModifyJob *mod = new ItemModifyJob( remoteItem, subjobParent() ); 00339 mod->disableRevisionCheck(); 00340 q->connect( mod, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalChangeDone( KJob* ) ) ); 00341 } else { 00342 mProgress++; 00343 } 00344 } 00345 mRemoteItems.clear(); 00346 } 00347 00348 void ItemSync::Private::deleteItems( const Item::List &items ) 00349 { 00350 // if in error state, better not change anything anymore 00351 if ( q->error() ) 00352 return; 00353 00354 Item::List itemsToDelete; 00355 foreach ( const Item &item, items ) { 00356 Item delItem( item ); 00357 if ( !item.isValid() ) { 00358 delItem = mLocalItemsByRemoteId.value( item.remoteId() ); 00359 } 00360 00361 if ( !delItem.isValid() ) { 00362 #ifndef NDEBUG 00363 kWarning() << "Delete item (remoteeId=" << item.remoteId() 00364 << "mimeType=" << item.mimeType() 00365 << ") does not have a valid UID and no item with that remote ID exists either"; 00366 #endif 00367 continue; 00368 } 00369 00370 if ( delItem.remoteId().isEmpty() ) { 00371 // don't attempt to remove items that never were written to the backend 00372 continue; 00373 } 00374 00375 itemsToDelete.append ( delItem ); 00376 } 00377 00378 if ( !itemsToDelete.isEmpty() ) { 00379 mPendingJobs++; 00380 ItemDeleteJob *job = new ItemDeleteJob( itemsToDelete, subjobParent() ); 00381 q->connect( job, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalDeleteDone( KJob* ) ) ); 00382 00383 // It can happen that the groupware servers report us deleted items 00384 // twice, in this case this item delete job will fail on the second try. 00385 // To avoid a rollback of the complete transaction we gracefully allow the job 00386 // to fail :) 00387 TransactionSequence *transaction = qobject_cast<TransactionSequence*>( subjobParent() ); 00388 if ( transaction ) 00389 transaction->setIgnoreJobFailure( job ); 00390 } 00391 } 00392 00393 void ItemSync::Private::slotLocalDeleteDone( KJob* ) 00394 { 00395 mPendingJobs--; 00396 mProgress++; 00397 00398 checkDone(); 00399 } 00400 00401 void ItemSync::Private::slotLocalChangeDone( KJob * job ) 00402 { 00403 Q_UNUSED( job ); 00404 mPendingJobs--; 00405 mProgress++; 00406 00407 checkDone(); 00408 } 00409 00410 void ItemSync::Private::slotTransactionResult( KJob *job ) 00411 { 00412 --mTransactionJobs; 00413 if ( mCurrentTransaction == job ) 00414 mCurrentTransaction = 0; 00415 00416 checkDone(); 00417 } 00418 00419 Job * ItemSync::Private::subjobParent() const 00420 { 00421 if ( mCurrentTransaction && mTransactionMode != NoTransaction ) 00422 return mCurrentTransaction; 00423 return q; 00424 } 00425 00426 void ItemSync::setStreamingEnabled(bool enable) 00427 { 00428 d->mStreaming = enable; 00429 } 00430 00431 void ItemSync::deliveryDone() 00432 { 00433 Q_ASSERT( d->mStreaming ); 00434 d->mDeliveryDone = true; 00435 d->execute(); 00436 } 00437 00438 void ItemSync::slotResult(KJob* job) 00439 { 00440 if ( job->error() ) { 00441 // pretent there were no errors 00442 Akonadi::Job::removeSubjob( job ); 00443 // propagate the first error we got but continue, we might still be fed with stuff from a resource 00444 if ( !error() ) { 00445 setError( job->error() ); 00446 setErrorText( job->errorText() ); 00447 } 00448 } else { 00449 Akonadi::Job::slotResult( job ); 00450 } 00451 } 00452 00453 void ItemSync::rollback() 00454 { 00455 setError( UserCanceled ); 00456 if ( d->mCurrentTransaction ) 00457 d->mCurrentTransaction->rollback(); 00458 d->mDeliveryDone = true; // user wont deliver more data 00459 d->execute(); // end this in an ordered way, since we have an error set no real change will be done 00460 } 00461 00462 void ItemSync::setTransactionMode(ItemSync::TransactionMode mode) 00463 { 00464 d->mTransactionMode = mode; 00465 } 00466 00467 00468 #include "itemsync.moc"