diff --git a/onvif-java/src/main/java/de/onvif/soap/PullPointSubscriptionHandler.java b/onvif-java/src/main/java/de/onvif/soap/PullPointSubscriptionHandler.java index f84e1d1..94c6594 100644 --- a/onvif-java/src/main/java/de/onvif/soap/PullPointSubscriptionHandler.java +++ b/onvif-java/src/main/java/de/onvif/soap/PullPointSubscriptionHandler.java @@ -13,9 +13,7 @@ import org.oasis_open.docs.wsn.b_2.Renew; import org.oasis_open.docs.wsn.b_2.RenewResponse; import org.oasis_open.docs.wsn.b_2.Unsubscribe; import org.oasis_open.docs.wsn.b_2.UnsubscribeResponse; -import org.oasis_open.docs.wsn.bw_2.SubscriptionManager; -import org.oasis_open.docs.wsn.bw_2.UnableToDestroySubscriptionFault; -import org.oasis_open.docs.wsn.bw_2.UnacceptableTerminationTimeFault; +import org.oasis_open.docs.wsn.bw_2.*; import org.oasis_open.docs.wsrf.rw_2.ResourceUnknownFault; import org.onvif.ver10.events.wsdl.*; import org.slf4j.Logger; @@ -36,56 +34,43 @@ import java.util.concurrent.Executors; public class PullPointSubscriptionHandler { private final Logger LOG = LoggerFactory.getLogger(PullPointSubscriptionHandler.class); + private final EventPortType eventWs; private final ExecutorService pullMessagesExecutor; private final List
headers; - final private PullPointSubscription pullPointSubscription; - final private SubscriptionManager subscriptionManager; - private final Client pullPointSubscriptionProxy; - private final Client subscriptionManagerProxy; - final private String serviceAddress; + private final OnvifDevice device; + private PullPointSubscription pullPointSubscription; + private SubscriptionManager subscriptionManager; + private final CreatePullPointSubscription cpps; PullMessages pm = null; Renew renew = null; PullMessagesCallbacks callback; SOAPElement messageIDEl; boolean terminate = false; - public PullPointSubscriptionHandler(final OnvifDevice device, CreatePullPointSubscriptionResponse cppsr, PullMessagesCallbacks callback) { + public PullPointSubscriptionHandler(final OnvifDevice device, CreatePullPointSubscription cpps, PullMessagesCallbacks callback) { + eventWs = device.getEvents(); + this.device = device; + this.cpps = cpps; headers = new ArrayList<>(); pullMessagesExecutor = Executors.newSingleThreadExecutor(); - serviceAddress = getWSAAddress(cppsr.getSubscriptionReference()); - pullPointSubscription = device.getServiceProxy((BindingProvider) device.eventService.getEventPort(), serviceAddress).create(PullPointSubscription.class); - subscriptionManager = device.getServiceProxy((BindingProvider) device.eventService.getEventPort(), serviceAddress).create(SubscriptionManager.class); - pullPointSubscriptionProxy = ClientProxy.getClient(pullPointSubscription); - subscriptionManagerProxy = ClientProxy.getClient(subscriptionManager); this.callback = callback; init(); } - // PullPointSubscription functions - public SeekResponse seek(Seek parameters) { - return pullPointSubscription.seek(parameters); - } - - public void setSynchronizationPoint() { - pullPointSubscription.setSynchronizationPoint(); - } - - public UnsubscribeResponse unsubscribe(Unsubscribe unsubscribeRequest) throws UnableToDestroySubscriptionFault, ResourceUnknownFault { - return pullPointSubscription.unsubscribe(unsubscribeRequest); - } - - public PullMessagesResponse pullMessages(PullMessages parameters) throws PullMessagesFaultResponse_Exception { - return pullPointSubscription.pullMessages(parameters); - } - - // SubscriptionManager functions - public RenewResponse renew(Renew renewRequest) throws UnacceptableTerminationTimeFault, ResourceUnknownFault { - return subscriptionManager.renew(renewRequest); - } - - public void init() { + private void init() { final String addressingNS = "http://www.w3.org/2005/08/addressing"; try { + CreatePullPointSubscriptionResponse resp = + eventWs.createPullPointSubscription(cpps); + + final String serviceAddress = getWSAAddress(resp.getSubscriptionReference()); + pullPointSubscription = device.getServiceProxy((BindingProvider) device.eventService.getEventPort(), serviceAddress).create(PullPointSubscription.class); + subscriptionManager = device.getServiceProxy((BindingProvider) device.eventService.getEventPort(), serviceAddress).create(SubscriptionManager.class); + + final Client pullPointSubscriptionProxy = ClientProxy.getClient(pullPointSubscription); + final Client subscriptionManagerProxy = ClientProxy.getClient(subscriptionManager); + + pm = new PullMessages(); pm.setMessageLimit(1024); Duration dur = DatatypeFactory.newInstance().newDuration("PT1M"); @@ -117,7 +102,8 @@ public class PullPointSubscriptionHandler { var messageIdHdr = new Header(messageID, messageIDEl); var toHdr = new Header(to, toEl); var replyToHdr = new Header(replyTo, replyToEl); - + + headers.clear(); headers.add(actionHdr); headers.add(messageIdHdr); headers.add(toHdr); @@ -127,8 +113,19 @@ public class PullPointSubscriptionHandler { subscriptionManagerProxy.getRequestContext().put(Header.HEADER_LIST, headers); startPullMessages(); - } catch (DatatypeConfigurationException | SOAPException ex) { - LOG.error(ex.getMessage(), ex); + } catch (DatatypeConfigurationException | SOAPException | UnsupportedPolicyRequestFault | + TopicExpressionDialectUnknownFault | TopicNotSupportedFault | ResourceUnknownFault | + UnrecognizedPolicyRequestFault | NotifyMessageNotSupportedFault | SubscribeCreationFailedFault | + UnacceptableInitialTerminationTimeFault | InvalidProducerPropertiesExpressionFault | + InvalidTopicExpressionFault | InvalidMessageContentExpressionFault | InvalidFilterFault e) { + LOG.error("{}: {}", e.getClass().getName(), e.getMessage()); + try { + if(!terminate) { + Thread.sleep(3000); + init(); // On failure, tru again after delay + } + } + catch (InterruptedException ignored) {} } } @@ -157,9 +154,19 @@ public class PullPointSubscriptionHandler { unsubscribe(new Unsubscribe()); pullMessagesExecutor.shutdown(); } - } catch (PullMessagesFaultResponse_Exception | UnacceptableTerminationTimeFault | ResourceUnknownFault | - UnableToDestroySubscriptionFault e) { + } catch (Exception e) { LOG.error(e.getMessage(), e); + if(!terminate) { + try { + // This will most likely fail but try it anyway + unsubscribe(new Unsubscribe()); + } catch (Exception ignore) {} + try { + Thread.sleep(3000); + } catch (InterruptedException ignored) {} + // On error restart from the beginning + init(); + } } finally { if(!terminate) startPullMessages(); @@ -167,6 +174,28 @@ public class PullPointSubscriptionHandler { }); } + // PullPointSubscription functions + public SeekResponse seek(Seek parameters) { + return pullPointSubscription.seek(parameters); + } + + public void setSynchronizationPoint() { + pullPointSubscription.setSynchronizationPoint(); + } + + public UnsubscribeResponse unsubscribe(Unsubscribe unsubscribeRequest) throws UnableToDestroySubscriptionFault, ResourceUnknownFault { + return pullPointSubscription.unsubscribe(unsubscribeRequest); + } + + public PullMessagesResponse pullMessages(PullMessages parameters) throws PullMessagesFaultResponse_Exception { + return pullPointSubscription.pullMessages(parameters); + } + + // SubscriptionManager functions + public RenewResponse renew(Renew renewRequest) throws UnacceptableTerminationTimeFault, ResourceUnknownFault { + return subscriptionManager.renew(renewRequest); + } + public void setTerminate() { terminate = true; } diff --git a/onvif-java/src/test/java/org/onvif/client/PullPointTest.java b/onvif-java/src/test/java/org/onvif/client/PullPointTest.java index 46dea5c..e97752a 100644 --- a/onvif-java/src/test/java/org/onvif/client/PullPointTest.java +++ b/onvif-java/src/test/java/org/onvif/client/PullPointTest.java @@ -6,7 +6,6 @@ import de.onvif.soap.PullMessagesCallbacks; import de.onvif.soap.PullPointSubscriptionHandler; import jakarta.xml.bind.JAXBElement; import jakarta.xml.soap.SOAPException; -import org.apache.cxf.wsn.client.Publisher; import org.oasis_open.docs.wsn.b_2.FilterType; import org.oasis_open.docs.wsn.b_2.TopicExpressionType; import org.onvif.ver10.events.wsdl.*; @@ -24,7 +23,6 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; public class PullPointTest implements PullMessagesCallbacks { private PullPointTest() { @@ -87,11 +85,7 @@ public class PullPointTest implements PullMessagesCallbacks { objectFactory.createSubscribeInitialTerminationTime(timespan)); try { - CreatePullPointSubscriptionResponse resp = - eventWs.createPullPointSubscription(pullPointSubscription); - - PullPointSubscriptionHandler ppsh = new PullPointSubscriptionHandler(cam, resp, this); - + PullPointSubscriptionHandler ppsh = new PullPointSubscriptionHandler(cam, pullPointSubscription, this); Thread.currentThread().join(); ppsh.setTerminate(); @@ -125,25 +119,10 @@ public class PullPointTest implements PullMessagesCallbacks { @Override public void onPullMessagesReceived(PullMessagesResponse pullMessages) { ProcessedPullMessagesResponse ppmr = new ProcessedPullMessagesResponse(pullMessages); - ppmr.responseData.forEach((x) -> { - x.Data.forEach((data) -> { - System.out.println(x.created + " " + x.topic + " " + data.Name + " " + data.Value); - }); - }); + ppmr.responseData.forEach((x) -> + x.Data.forEach((data) -> + System.out.println(x.created + " " + x.topic + " " + data.Name + " " + data.Value))); responses.put(new Date(), ppmr); } - - public static class PublisherCallback implements Publisher.Callback { - final CountDownLatch subscribed = new CountDownLatch(1); - final CountDownLatch unsubscribed = new CountDownLatch(1); - - public void subscribe(TopicExpressionType topic) { - subscribed.countDown(); - } - - public void unsubscribe(TopicExpressionType topic) { - unsubscribed.countDown(); - } - } }