* Hardening to enable resume after disconnections and other errors.

This commit is contained in:
richard-austin
2025-01-10 16:58:46 +00:00
parent 6d206b8eb8
commit df1816fbaf
2 changed files with 75 additions and 67 deletions

View File

@@ -13,9 +13,7 @@ import org.oasis_open.docs.wsn.b_2.Renew;
import org.oasis_open.docs.wsn.b_2.RenewResponse;
import org.oasis_open.docs.wsn.b_2.Unsubscribe;
import org.oasis_open.docs.wsn.b_2.UnsubscribeResponse;
import org.oasis_open.docs.wsn.bw_2.SubscriptionManager;
import org.oasis_open.docs.wsn.bw_2.UnableToDestroySubscriptionFault;
import org.oasis_open.docs.wsn.bw_2.UnacceptableTerminationTimeFault;
import org.oasis_open.docs.wsn.bw_2.*;
import org.oasis_open.docs.wsrf.rw_2.ResourceUnknownFault;
import org.onvif.ver10.events.wsdl.*;
import org.slf4j.Logger;
@@ -36,56 +34,43 @@ import java.util.concurrent.Executors;
public class PullPointSubscriptionHandler {
private final Logger LOG = LoggerFactory.getLogger(PullPointSubscriptionHandler.class);
private final EventPortType eventWs;
private final ExecutorService pullMessagesExecutor;
private final List<Header> headers;
final private PullPointSubscription pullPointSubscription;
final private SubscriptionManager subscriptionManager;
private final Client pullPointSubscriptionProxy;
private final Client subscriptionManagerProxy;
final private String serviceAddress;
private final OnvifDevice device;
private PullPointSubscription pullPointSubscription;
private SubscriptionManager subscriptionManager;
private final CreatePullPointSubscription cpps;
PullMessages pm = null;
Renew renew = null;
PullMessagesCallbacks callback;
SOAPElement messageIDEl;
boolean terminate = false;
public PullPointSubscriptionHandler(final OnvifDevice device, CreatePullPointSubscriptionResponse cppsr, PullMessagesCallbacks callback) {
public PullPointSubscriptionHandler(final OnvifDevice device, CreatePullPointSubscription cpps, PullMessagesCallbacks callback) {
eventWs = device.getEvents();
this.device = device;
this.cpps = cpps;
headers = new ArrayList<>();
pullMessagesExecutor = Executors.newSingleThreadExecutor();
serviceAddress = getWSAAddress(cppsr.getSubscriptionReference());
pullPointSubscription = device.getServiceProxy((BindingProvider) device.eventService.getEventPort(), serviceAddress).create(PullPointSubscription.class);
subscriptionManager = device.getServiceProxy((BindingProvider) device.eventService.getEventPort(), serviceAddress).create(SubscriptionManager.class);
pullPointSubscriptionProxy = ClientProxy.getClient(pullPointSubscription);
subscriptionManagerProxy = ClientProxy.getClient(subscriptionManager);
this.callback = callback;
init();
}
// PullPointSubscription functions
public SeekResponse seek(Seek parameters) {
return pullPointSubscription.seek(parameters);
}
public void setSynchronizationPoint() {
pullPointSubscription.setSynchronizationPoint();
}
public UnsubscribeResponse unsubscribe(Unsubscribe unsubscribeRequest) throws UnableToDestroySubscriptionFault, ResourceUnknownFault {
return pullPointSubscription.unsubscribe(unsubscribeRequest);
}
public PullMessagesResponse pullMessages(PullMessages parameters) throws PullMessagesFaultResponse_Exception {
return pullPointSubscription.pullMessages(parameters);
}
// SubscriptionManager functions
public RenewResponse renew(Renew renewRequest) throws UnacceptableTerminationTimeFault, ResourceUnknownFault {
return subscriptionManager.renew(renewRequest);
}
public void init() {
private void init() {
final String addressingNS = "http://www.w3.org/2005/08/addressing";
try {
CreatePullPointSubscriptionResponse resp =
eventWs.createPullPointSubscription(cpps);
final String serviceAddress = getWSAAddress(resp.getSubscriptionReference());
pullPointSubscription = device.getServiceProxy((BindingProvider) device.eventService.getEventPort(), serviceAddress).create(PullPointSubscription.class);
subscriptionManager = device.getServiceProxy((BindingProvider) device.eventService.getEventPort(), serviceAddress).create(SubscriptionManager.class);
final Client pullPointSubscriptionProxy = ClientProxy.getClient(pullPointSubscription);
final Client subscriptionManagerProxy = ClientProxy.getClient(subscriptionManager);
pm = new PullMessages();
pm.setMessageLimit(1024);
Duration dur = DatatypeFactory.newInstance().newDuration("PT1M");
@@ -117,7 +102,8 @@ public class PullPointSubscriptionHandler {
var messageIdHdr = new Header(messageID, messageIDEl);
var toHdr = new Header(to, toEl);
var replyToHdr = new Header(replyTo, replyToEl);
headers.clear();
headers.add(actionHdr);
headers.add(messageIdHdr);
headers.add(toHdr);
@@ -127,8 +113,19 @@ public class PullPointSubscriptionHandler {
subscriptionManagerProxy.getRequestContext().put(Header.HEADER_LIST, headers);
startPullMessages();
} catch (DatatypeConfigurationException | SOAPException ex) {
LOG.error(ex.getMessage(), ex);
} catch (DatatypeConfigurationException | SOAPException | UnsupportedPolicyRequestFault |
TopicExpressionDialectUnknownFault | TopicNotSupportedFault | ResourceUnknownFault |
UnrecognizedPolicyRequestFault | NotifyMessageNotSupportedFault | SubscribeCreationFailedFault |
UnacceptableInitialTerminationTimeFault | InvalidProducerPropertiesExpressionFault |
InvalidTopicExpressionFault | InvalidMessageContentExpressionFault | InvalidFilterFault e) {
LOG.error("{}: {}", e.getClass().getName(), e.getMessage());
try {
if(!terminate) {
Thread.sleep(3000);
init(); // On failure, tru again after delay
}
}
catch (InterruptedException ignored) {}
}
}
@@ -157,9 +154,19 @@ public class PullPointSubscriptionHandler {
unsubscribe(new Unsubscribe());
pullMessagesExecutor.shutdown();
}
} catch (PullMessagesFaultResponse_Exception | UnacceptableTerminationTimeFault | ResourceUnknownFault |
UnableToDestroySubscriptionFault e) {
} catch (Exception e) {
LOG.error(e.getMessage(), e);
if(!terminate) {
try {
// This will most likely fail but try it anyway
unsubscribe(new Unsubscribe());
} catch (Exception ignore) {}
try {
Thread.sleep(3000);
} catch (InterruptedException ignored) {}
// On error restart from the beginning
init();
}
} finally {
if(!terminate)
startPullMessages();
@@ -167,6 +174,28 @@ public class PullPointSubscriptionHandler {
});
}
// PullPointSubscription functions
public SeekResponse seek(Seek parameters) {
return pullPointSubscription.seek(parameters);
}
public void setSynchronizationPoint() {
pullPointSubscription.setSynchronizationPoint();
}
public UnsubscribeResponse unsubscribe(Unsubscribe unsubscribeRequest) throws UnableToDestroySubscriptionFault, ResourceUnknownFault {
return pullPointSubscription.unsubscribe(unsubscribeRequest);
}
public PullMessagesResponse pullMessages(PullMessages parameters) throws PullMessagesFaultResponse_Exception {
return pullPointSubscription.pullMessages(parameters);
}
// SubscriptionManager functions
public RenewResponse renew(Renew renewRequest) throws UnacceptableTerminationTimeFault, ResourceUnknownFault {
return subscriptionManager.renew(renewRequest);
}
public void setTerminate() {
terminate = true;
}

View File

@@ -6,7 +6,6 @@ import de.onvif.soap.PullMessagesCallbacks;
import de.onvif.soap.PullPointSubscriptionHandler;
import jakarta.xml.bind.JAXBElement;
import jakarta.xml.soap.SOAPException;
import org.apache.cxf.wsn.client.Publisher;
import org.oasis_open.docs.wsn.b_2.FilterType;
import org.oasis_open.docs.wsn.b_2.TopicExpressionType;
import org.onvif.ver10.events.wsdl.*;
@@ -24,7 +23,6 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
public class PullPointTest implements PullMessagesCallbacks {
private PullPointTest() {
@@ -87,11 +85,7 @@ public class PullPointTest implements PullMessagesCallbacks {
objectFactory.createSubscribeInitialTerminationTime(timespan));
try {
CreatePullPointSubscriptionResponse resp =
eventWs.createPullPointSubscription(pullPointSubscription);
PullPointSubscriptionHandler ppsh = new PullPointSubscriptionHandler(cam, resp, this);
PullPointSubscriptionHandler ppsh = new PullPointSubscriptionHandler(cam, pullPointSubscription, this);
Thread.currentThread().join();
ppsh.setTerminate();
@@ -125,25 +119,10 @@ public class PullPointTest implements PullMessagesCallbacks {
@Override
public void onPullMessagesReceived(PullMessagesResponse pullMessages) {
ProcessedPullMessagesResponse ppmr = new ProcessedPullMessagesResponse(pullMessages);
ppmr.responseData.forEach((x) -> {
x.Data.forEach((data) -> {
System.out.println(x.created + " " + x.topic + " " + data.Name + " " + data.Value);
});
});
ppmr.responseData.forEach((x) ->
x.Data.forEach((data) ->
System.out.println(x.created + " " + x.topic + " " + data.Name + " " + data.Value)));
responses.put(new Date(), ppmr);
}
public static class PublisherCallback implements Publisher.Callback {
final CountDownLatch subscribed = new CountDownLatch(1);
final CountDownLatch unsubscribed = new CountDownLatch(1);
public void subscribe(TopicExpressionType topic) {
subscribed.countDown();
}
public void unsubscribe(TopicExpressionType topic) {
unsubscribed.countDown();
}
}
}