Skip to content

Commit

Permalink
Merge pull request #5 from neXenio/bugfix/003-offset-ignored
Browse files Browse the repository at this point in the history
Client specific value providers
  • Loading branch information
Steppschuh authored Jun 15, 2020
2 parents 146e218 + 6066014 commit 0df513b
Show file tree
Hide file tree
Showing 11 changed files with 172 additions and 10 deletions.
30 changes: 30 additions & 0 deletions .idea/jarRepositories.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ android {
dependencies {
implementation project(path: ':rxandroidbleserver')

implementation 'io.reactivex.rxjava3:rxjava:3.0.1'
implementation 'io.reactivex.rxjava3:rxjava:3.0.3'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
implementation 'com.github.niklasvd96:rxpermissions:0.11.2'

implementation 'androidx.appcompat:appcompat:1.1.0'
implementation 'androidx.constraintlayout:constraintlayout:1.1.3'
implementation 'androidx.lifecycle:lifecycle-extensions:2.2.0'
implementation 'com.google.android.material:material:1.1.0'

implementation 'com.jakewharton.timber:timber:4.7.1'

Expand All @@ -41,5 +42,4 @@ dependencies {
androidTestImplementation 'androidx.test.ext:junit:1.1.1'
androidTestImplementation 'androidx.test:runner:1.2.0'
androidTestImplementation 'androidx.test.espresso:espresso-core:3.2.0'
implementation 'com.google.android.material:material:1.0.0'
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.nexenio.rxandroidbleserver.service.value.RxBleValue;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

Expand All @@ -40,7 +41,7 @@ public ExampleProfile(@NonNull Context context) {

public Completable updateCharacteristicValues() {
return Observable.interval(1, TimeUnit.SECONDS)
.map(count -> (int) (count % 1337))
.map(count -> "Updated example value #" + count)
.map(this::createExampleValue)
.flatMapCompletable(value -> exampleCharacteristic.setValue(value)
.andThen(exampleCharacteristic.sendNotifications()));
Expand Down Expand Up @@ -79,7 +80,7 @@ private RxBleService createExampleService() {

private RxBleCharacteristic createExampleCharacteristic() {
exampleCharacteristic = new CharacteristicBuilder(EXAMPLE_CHARACTERISTIC_UUID)
.withInitialValue(createExampleValue(0))
.withInitialValue(createExampleValue("Initial example value"))
.withDescriptor(new CharacteristicUserDescription("Example"))
.withDescriptor(new ClientCharacteristicConfiguration())
.withDescriptor(createExampleDescriptor())
Expand Down Expand Up @@ -108,4 +109,8 @@ private RxBleValue createExampleValue(int number) {
return new BaseValue(buffer.array());
}

private RxBleValue createExampleValue(String value) {
return new BaseValue(value.getBytes(StandardCharsets.UTF_8));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.nexenio.rxandroidbleserver.request.characteristic.RxBleCharacteristicWriteRequest;
import com.nexenio.rxandroidbleserver.request.descriptor.RxBleDescriptorReadRequest;
import com.nexenio.rxandroidbleserver.request.descriptor.RxBleDescriptorWriteRequest;
import com.nexenio.rxandroidbleserver.response.BaseServerResponse;
import com.nexenio.rxandroidbleserver.response.RxBleServerResponse;
import com.nexenio.rxandroidbleserver.service.RxBleService;
import com.nexenio.rxandroidbleserver.service.characteristic.RxBleCharacteristic;
Expand Down Expand Up @@ -135,8 +136,8 @@ public Completable provideServices() {
@Override
public Completable provideServicesAndAdvertise(@NonNull UUID uuid) {
return Completable.mergeArray(
provideServices().subscribeOn(Schedulers.io()),
advertise(uuid).subscribeOn(Schedulers.io())
provideServices(),
advertise(uuid)
);
}

Expand Down Expand Up @@ -436,7 +437,7 @@ private Completable sendResponse(RxBleServerResponse response) {
response.getRequestId(),
response.getStatus(),
response.getOffset(),
response.getValue().getBytes()
BaseServerResponse.trimData(response.getValue().getBytes(), response.getOffset())
);

if (success) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public String toString() {
public static byte[] trimData(byte[] data, int offset) {
if (offset == 0) {
return data;
} else if (offset >= data.length) {
return new byte[]{};
} else {
return Arrays.copyOfRange(data, offset, data.length);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import androidx.annotation.NonNull;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import timber.log.Timber;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,21 @@ public Single<RxBleValue> getValue() {

@Override
public Completable setValue(@NonNull RxBleValue value) {
return sharedValueProvider.setValue(value);
return Completable.mergeArray(
sharedValueProvider.setValue(value),
clientValueProvider.setValueForAllClients(value)
);
}

@Override
public Observable<RxBleValue> getValueChanges() {
return sharedValueProvider.getValueChanges();
return Observable.defer(() -> {
if (shareValues) {
return sharedValueProvider.getValueChanges();
} else {
return clientValueProvider.getValueChangesFromAllClients();
}
});
}

@Override
Expand All @@ -56,6 +65,17 @@ public Single<RxBleValue> getValue(@NonNull RxBleClient client) {
});
}

@Override
public Observable<RxBleValue> getValuesFromAllClients() {
return Observable.defer(() -> {
if (shareValues) {
return sharedValueProvider.getValue().toObservable();
} else {
return clientValueProvider.getValuesFromAllClients();
}
});
}

@Override
public Completable setValue(@NonNull RxBleClient client, @NonNull RxBleValue value) {
return Completable.defer(() -> {
Expand All @@ -67,11 +87,33 @@ public Completable setValue(@NonNull RxBleClient client, @NonNull RxBleValue val
});
}

@Override
public Completable setValueForAllClients(@NonNull RxBleValue value) {
return Completable.defer(() -> {
if (shareValues) {
return sharedValueProvider.setValue(value);
} else {
return clientValueProvider.setValueForAllClients(value);
}
});
}

@Override
public Observable<RxBleValue> getValueChanges(@NonNull RxBleClient client) {
return clientValueProvider.getValueChanges(client);
}

@Override
public Observable<RxBleValue> getValueChangesFromAllClients() {
return Observable.defer(() -> {
if (shareValues) {
return sharedValueProvider.getValueChanges();
} else {
return clientValueProvider.getValueChangesFromAllClients();
}
});
}

public Single<RxBleServerResponse> createReadRequestResponse(@NonNull RxBleReadRequest request) {
return getValue(request.getClient())
.map(value -> new BaseServerResponse(request, value))
Expand Down Expand Up @@ -114,4 +156,22 @@ public String toString() {
'}';
}

@Override
public boolean isSharingValuesBetweenClients() {
return shareValues;
}

@Override
public void shareValuesBetweenClients(boolean shareValues) {
this.shareValues = shareValues;
}

public RxBleSharedValueProvider getSharedValueProvider() {
return sharedValueProvider;
}

public RxBleClientValueProvider getClientValueProvider() {
return clientValueProvider;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,8 @@ public interface RxBleValueContainer extends RxBleClientValueProvider, RxBleShar

Maybe<RxBleServerResponse> createWriteRequestResponse(@NonNull RxBleWriteRequest request);

boolean isSharingValuesBetweenClients();

void shareValuesBetweenClients(boolean shareValues);

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,29 @@
import com.nexenio.rxandroidbleserver.exception.ValueNotAvailableException;
import com.nexenio.rxandroidbleserver.service.value.RxBleValue;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import androidx.annotation.NonNull;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.subjects.ReplaySubject;

public class BaseClientValueProvider implements RxBleClientValueProvider {

private final ReplaySubject<RxBleClient> clientPublisher;

private final Map<RxBleClient, RxBleValue> valueMap;
private final Map<RxBleClient, PublishSubject<RxBleValue>> valuePublisherMap;

public BaseClientValueProvider() {
this.valueMap = new HashMap<>();
this.valuePublisherMap = new HashMap<>();
this.clientPublisher = ReplaySubject.create();
}

@Override
Expand All @@ -36,21 +42,55 @@ public Single<RxBleValue> getValue(@NonNull RxBleClient client) {
});
}

@Override
public Observable<RxBleValue> getValuesFromAllClients() {
return getCurrentClients().flatMapSingle(this::getValue);
}

@Override
public Completable setValue(@NonNull RxBleClient client, @NonNull RxBleValue value) {
return Completable.fromAction(() -> {
boolean isNewClient;
synchronized (valueMap) {
isNewClient = !valueMap.containsKey(client);
valueMap.put(client, value);
}
if (isNewClient) {
clientPublisher.onNext(client);
}
getOrCreateValuePublisher(client).onNext(value);
});
}

@Override
public Completable setValueForAllClients(@NonNull RxBleValue value) {
return getCurrentClients().flatMapCompletable(client -> setValue(client, value));
}

@Override
public Observable<RxBleValue> getValueChanges(@NonNull RxBleClient client) {
return getOrCreateValuePublisher(client);
}

@Override
public Observable<RxBleValue> getValueChangesFromAllClients() {
return getCurrentAndFutureClients().flatMap(this::getValueChanges);
}

private Observable<RxBleClient> getCurrentClients() {
return Observable.defer(() -> {
Collection<RxBleClient> clients;
synchronized (valueMap) {
clients = new HashSet<>(valueMap.keySet());
}
return Observable.fromIterable(clients);
});
}

private Observable<RxBleClient> getCurrentAndFutureClients() {
return clientPublisher;
}

private PublishSubject<RxBleValue> getOrCreateValuePublisher(@NonNull RxBleClient client) {
synchronized (valuePublisherMap) {
if (valuePublisherMap.containsKey(client)) {
Expand All @@ -63,4 +103,11 @@ private PublishSubject<RxBleValue> getOrCreateValuePublisher(@NonNull RxBleClien
}
}

@Override
public String toString() {
return "BaseClientValueProvider{" +
"valueMap=" + valueMap +
'}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,11 @@ public Observable<RxBleValue> getValueChanges() {
return valuePublisher;
}

@Override
public String toString() {
return "BaseSharedValueProvider{" +
"value=" + value +
'}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,14 @@ public interface RxBleClientValueProvider {

Single<RxBleValue> getValue(@NonNull RxBleClient client);

Observable<RxBleValue> getValuesFromAllClients();

Completable setValue(@NonNull RxBleClient client, @NonNull RxBleValue value);

Completable setValueForAllClients(@NonNull RxBleValue value);

Observable<RxBleValue> getValueChanges(@NonNull RxBleClient client);

Observable<RxBleValue> getValueChangesFromAllClients();

}

0 comments on commit 0df513b

Please sign in to comment.