Skip to content

Commit

Permalink
Add back old pull revisions
Browse files Browse the repository at this point in the history
  • Loading branch information
cbaker6 committed Apr 20, 2023
1 parent 70d0ffc commit 0dd0311
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 45 deletions.
5 changes: 2 additions & 3 deletions Sources/ParseCareKit/Models/PCKRevisionRecord.swift
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,14 @@ struct PCKRevisionRecord: ParseObject, Equatable, Codable {
knowledgeVector: knowledgeVector)
}

func save(options: API.Options = []) async throws -> Self {
let saved = try await self.create(options: options)
func save(options: API.Options = []) async throws {
// let saved = try await self.create(options: options)
try await patients.createAll(options: options)
try await carePlans.createAll(options: options)
try await contacts.createAll(options: options)
try await tasks.createAll(options: options)
try await healthKitTasks.createAll(options: options)
try await outcomes.createAll(options: options)
return saved
}

func fetchEntities(options: API.Options = []) async throws -> Self {
Expand Down
162 changes: 120 additions & 42 deletions Sources/ParseCareKit/ParseRemote.swift
Original file line number Diff line number Diff line change
Expand Up @@ -328,41 +328,123 @@ public class ParseRemote: OCKRemoteSynchronizable {
}
let localClock = knowledgeVector.clock(for: self.uuid)
ParseRemote.queue.async {
Task {
let query = PCKRevisionRecord.query(ObjectableKey.logicalClock > localClock,
ObjectableKey.clockUUID == self.uuid)
.order([.ascending(ObjectableKey.logicalClock)])
do {
let revisions = try await query.find()
self.notifyRevisionProgress(0,
totalRecords: revisions.count)
for (index, revision) in revisions.enumerated() {
let record = try await revision.fetchEntities().convertToCareKit()
mergeRevision(record)
self.notifyRevisionProgress(index + 1,
totalRecords: revisions.count)
}
self.pullRevisionsForConcreteClasses(previousError: nil,
localClock: localClock,
cloudVector: cloudVector,
mergeRevision: mergeRevision) { previosError in

// Catch up
let revision = OCKRevisionRecord(entities: [],
knowledgeVector: cloudVector)
mergeRevision(revision)
if #available(iOS 14.0, watchOS 7.0, *) {
Logger.pullRevisions.debug("Finished pulling revisions for default classes")
} else {
os_log("Finished pulling revisions for default classes", log: .pullRevisions, type: .debug)
}
completion(nil)
} catch {
await self.remoteStatus.notSynchronzing()
completion(error)
}
self.pullRevisionsForCustomClasses(previousError: previosError,
localClock: localClock,
cloudVector: cloudVector,
mergeRevision: mergeRevision,
completion: completion)
}
}
}
}
}

func pullRevisionsForConcreteClasses(concreteClassesAlreadyPulled: Int=0,
previousError: Error?,
localClock: Int, cloudVector: OCKRevisionRecord.KnowledgeVector,
mergeRevision: @escaping (OCKRevisionRecord) -> Void,
completion: @escaping (Error?) -> Void) {

let classNames = PCKStoreClass.patient.orderedArray()
self.notifyRevisionProgress(concreteClassesAlreadyPulled,
total: classNames.count)

guard concreteClassesAlreadyPulled < classNames.count,
let concreteClass = self.pckStoreClassesToSynchronize[classNames[concreteClassesAlreadyPulled]] else {
if #available(iOS 14.0, watchOS 7.0, *) {
Logger.pullRevisions.debug("Finished pulling revisions for default classes")
} else {
os_log("Finished pulling revisions for default classes", log: .pullRevisions, type: .debug)
}
completion(previousError)
return
}

concreteClass.pullRevisions(since: localClock,
cloudClock: cloudVector,
remoteID: self.uuid.uuidString) { result in

var currentError = previousError

switch result {

case .success(let customRevision):
mergeRevision(customRevision)

case .failure(let error):
currentError = error
if #available(iOS 14.0, watchOS 7.0, *) {
Logger.pullRevisions.error("pullRevisionsForConcreteClasses: \(currentError!.localizedDescription, privacy: .private)")
} else {
os_log("pullRevisionsForConcreteClasses: %{private}@",
log: .pullRevisions, type: .error, currentError!.localizedDescription)
}
}

self.pullRevisionsForConcreteClasses(concreteClassesAlreadyPulled: concreteClassesAlreadyPulled+1,
previousError: currentError, localClock: localClock,
cloudVector: cloudVector, mergeRevision: mergeRevision,
completion: completion)
}
}

func pullRevisionsForCustomClasses(customClassesAlreadyPulled: Int=0, previousError: Error?,
localClock: Int, cloudVector: OCKRevisionRecord.KnowledgeVector,
mergeRevision: @escaping (OCKRevisionRecord) -> Void,
completion: @escaping (Error?) -> Void) {

if let customClassesToSynchronize = self.customClassesToSynchronize {
let classNames = customClassesToSynchronize.keys.sorted()
self.notifyRevisionProgress(customClassesAlreadyPulled,
total: classNames.count)

guard customClassesAlreadyPulled < classNames.count,
let customClass = customClassesToSynchronize[classNames[customClassesAlreadyPulled]] else {
if #available(iOS 14.0, watchOS 7.0, *) {
Logger.pullRevisions.debug("Finished pulling custom revision classes")
} else {
// Fallback on earlier versions
os_log("Finished pulling custom revision classes", log: .pullRevisions, type: .debug)
}
completion(previousError)
return
}

customClass.pullRevisions(since: localClock,
cloudClock: cloudVector,
remoteID: self.uuid.uuidString) { result in
var currentError = previousError

switch result {

case .success(let customRevision):
mergeRevision(customRevision)

case .failure(let error):
currentError = error
if #available(iOS 14.0, watchOS 7.0, *) {
Logger.pullRevisions.error("pullRevisionsForConcreteClasses: \(currentError!.localizedDescription, privacy: .private)")
} else {
os_log("pullRevisionsForConcreteClasses: %{private}@",
log: .pullRevisions, type: .error, currentError!.localizedDescription)
}
}

self.pullRevisionsForCustomClasses(customClassesAlreadyPulled: customClassesAlreadyPulled+1,
previousError: previousError, localClock: localClock,
cloudVector: cloudVector, mergeRevision: mergeRevision,
completion: completion)
}
} else {
completion(previousError)
}
}

public func pushRevisions(deviceRevisions: [CareKitStore.OCKRevisionRecord],
deviceKnowledge: CareKitStore.OCKRevisionRecord.KnowledgeVector,
completion: @escaping (Error?) -> Void) {
Expand Down Expand Up @@ -411,21 +493,17 @@ public class ParseRemote: OCKRemoteSynchronizable {
updatedDeviceKnowledge.increment(clockFor: self.uuid)
let logicalClock = updatedDeviceKnowledge.clock(for: self.uuid)
self.notifyRevisionProgress(0,
totalRecords: deviceRevisions.count)
total: deviceRevisions.count)

for (index, deviceRevision) in deviceRevisions.enumerated() {
do {
var revisionVector = deviceRevision.knowledgeVector
revisionVector.merge(with: deviceKnowledge)
let remoteRevision = OCKRevisionRecord(entities: deviceRevision.entities,
knowledgeVector: revisionVector)
let revision = try PCKRevisionRecord(record: remoteRevision,
remoteClockUUID: self.uuid,
remoteClock: parseClock,
remoteClockValue: logicalClock)
_ = try await revision.save()
let remoteRevision = try PCKRevisionRecord(record: deviceRevision,
remoteClockUUID: self.uuid,
remoteClock: parseClock,
remoteClockValue: logicalClock)
try await remoteRevision.save()
self.notifyRevisionProgress(index + 1,
totalRecords: deviceRevisions.count)
total: deviceRevisions.count)
if index == (deviceRevisions.count - 1) {
self.completePushRevisions(parseClock: parseClock,
cloudVector: cloudVector,
Expand Down Expand Up @@ -491,9 +569,9 @@ public class ParseRemote: OCKRemoteSynchronizable {
}
}

func notifyRevisionProgress(_ numberCompleted: Int, totalRecords: Int) {
if totalRecords > 0 {
let ratioComplete = Double(numberCompleted)/Double(totalRecords)
func notifyRevisionProgress(_ numberCompleted: Int, total: Int) {
if total > 0 {
let ratioComplete = Double(numberCompleted)/Double(total)
DispatchQueue.main.async {
self.parseDelegate?.remote(self, didUpdateProgress: ratioComplete)
}
Expand Down

0 comments on commit 0dd0311

Please sign in to comment.