]> git.basschouten.com Git - openhab-addons.git/blob
ba28fb59589f5c5a879e9c80a970d8f3b0449aaa
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.persistence.dynamodb.internal;
14
15 import java.time.Instant;
16 import java.time.ZoneId;
17 import java.time.ZonedDateTime;
18 import java.util.UUID;
19 import java.util.concurrent.ArrayBlockingQueue;
20 import java.util.concurrent.BlockingQueue;
21 import java.util.concurrent.TimeUnit;
22
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.core.items.Item;
26 import org.openhab.core.persistence.PersistenceService;
27 import org.openhab.core.types.State;
28 import org.openhab.core.types.UnDefType;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * Abstract class for buffered persistence services
34  *
35  * @param <T> Type of the state as accepted by the AWS SDK.
36  *
37  * @author Sami Salonen - Initial contribution
38  * @author Kai Kreuzer - Migration to 3.x
39  *
40  */
41 @NonNullByDefault
42 public abstract class AbstractBufferedPersistenceService<T> implements PersistenceService {
43
44     private static final long BUFFER_OFFER_TIMEOUT_MILLIS = 500;
45
46     private final Logger logger = LoggerFactory.getLogger(AbstractBufferedPersistenceService.class);
47     protected @Nullable BlockingQueue<T> buffer;
48
49     private boolean writeImmediately;
50
51     protected void resetWithBufferSize(int bufferSize) {
52         int capacity = Math.max(1, bufferSize);
53         buffer = new ArrayBlockingQueue<>(capacity, true);
54         writeImmediately = bufferSize == 0;
55     }
56
57     protected abstract T persistenceItemFromState(String name, State state, ZonedDateTime time);
58
59     protected abstract boolean isReadyToStore();
60
61     protected abstract void flushBufferedData();
62
63     @Override
64     public void store(Item item) {
65         store(item, null);
66     }
67
68     @Override
69     public void store(Item item, @Nullable String alias) {
70         long storeStart = System.currentTimeMillis();
71         String uuid = UUID.randomUUID().toString();
72         if (item.getState() instanceof UnDefType) {
73             logger.debug("Undefined item state received. Not storing item {}.", item.getName());
74             return;
75         }
76         if (!isReadyToStore()) {
77             return;
78         }
79         if (buffer == null) {
80             throw new IllegalStateException("Buffer not initialized with resetWithBufferSize. Bug?");
81         }
82         ZonedDateTime time = ZonedDateTime.ofInstant(Instant.ofEpochMilli(storeStart), ZoneId.systemDefault());
83         String realName = item.getName();
84         String name = (alias != null) ? alias : realName;
85         State state = item.getState();
86         T persistenceItem = persistenceItemFromState(name, state, time);
87         logger.trace("store() called with item {}, which was converted to {} [{}]", item, persistenceItem, uuid);
88         if (writeImmediately) {
89             logger.debug("Writing immediately item {} [{}]", realName, uuid);
90             // We want to write everything immediately
91             // Synchronous behavior to ensure buffer does not get full.
92             synchronized (this) {
93                 boolean buffered = addToBuffer(persistenceItem);
94                 assert buffered;
95                 flushBufferedData();
96             }
97         } else {
98             long bufferStart = System.currentTimeMillis();
99             boolean buffered = addToBuffer(persistenceItem);
100             if (buffered) {
101                 logger.debug("Buffered item {} in {} ms. Total time for store(): {} [{}]", realName,
102                         System.currentTimeMillis() - bufferStart, System.currentTimeMillis() - storeStart, uuid);
103             } else {
104                 logger.debug(
105                         "Buffer is full. Writing buffered data immediately and trying again. Consider increasing bufferSize");
106                 // Buffer is full, commit it immediately
107                 flushBufferedData();
108                 boolean buffered2 = addToBuffer(persistenceItem);
109                 if (buffered2) {
110                     logger.debug("Buffered item in {} ms (2nd try, flushed buffer in-between) [{}]",
111                             System.currentTimeMillis() - bufferStart, uuid);
112                 } else {
113                     // The unlikely case happened -- buffer got full again immediately
114                     logger.warn("Buffering failed for the second time -- Too small bufferSize? Discarding data [{}]",
115                             uuid);
116                 }
117             }
118         }
119     }
120
121     protected boolean addToBuffer(T persistenceItem) {
122         try {
123             return buffer != null && buffer.offer(persistenceItem, BUFFER_OFFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
124         } catch (InterruptedException e) {
125             logger.warn("Interrupted when trying to buffer data! Dropping data");
126             return false;
127         }
128     }
129 }