2 * Copyright (c) 2010-2021 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.persistence.dynamodb.internal;
15 import java.time.Instant;
16 import java.time.ZoneId;
17 import java.time.ZonedDateTime;
18 import java.util.UUID;
19 import java.util.concurrent.ArrayBlockingQueue;
20 import java.util.concurrent.BlockingQueue;
21 import java.util.concurrent.TimeUnit;
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.core.items.Item;
26 import org.openhab.core.persistence.PersistenceService;
27 import org.openhab.core.types.State;
28 import org.openhab.core.types.UnDefType;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
33 * Abstract class for buffered persistence services
35 * @param <T> Type of the state as accepted by the AWS SDK.
37 * @author Sami Salonen - Initial contribution
38 * @author Kai Kreuzer - Migration to 3.x
42 public abstract class AbstractBufferedPersistenceService<T> implements PersistenceService {
44 private static final long BUFFER_OFFER_TIMEOUT_MILLIS = 500;
46 private final Logger logger = LoggerFactory.getLogger(AbstractBufferedPersistenceService.class);
47 protected @Nullable BlockingQueue<T> buffer;
49 private boolean writeImmediately;
51 protected void resetWithBufferSize(int bufferSize) {
52 int capacity = Math.max(1, bufferSize);
53 buffer = new ArrayBlockingQueue<>(capacity, true);
54 writeImmediately = bufferSize == 0;
57 protected abstract T persistenceItemFromState(String name, State state, ZonedDateTime time);
59 protected abstract boolean isReadyToStore();
61 protected abstract void flushBufferedData();
64 public void store(Item item) {
69 public void store(Item item, @Nullable String alias) {
70 long storeStart = System.currentTimeMillis();
71 String uuid = UUID.randomUUID().toString();
72 if (item.getState() instanceof UnDefType) {
73 logger.debug("Undefined item state received. Not storing item {}.", item.getName());
76 if (!isReadyToStore()) {
80 throw new IllegalStateException("Buffer not initialized with resetWithBufferSize. Bug?");
82 ZonedDateTime time = ZonedDateTime.ofInstant(Instant.ofEpochMilli(storeStart), ZoneId.systemDefault());
83 String realName = item.getName();
84 String name = (alias != null) ? alias : realName;
85 State state = item.getState();
86 T persistenceItem = persistenceItemFromState(name, state, time);
87 logger.trace("store() called with item {}, which was converted to {} [{}]", item, persistenceItem, uuid);
88 if (writeImmediately) {
89 logger.debug("Writing immediately item {} [{}]", realName, uuid);
90 // We want to write everything immediately
91 // Synchronous behavior to ensure buffer does not get full.
93 boolean buffered = addToBuffer(persistenceItem);
98 long bufferStart = System.currentTimeMillis();
99 boolean buffered = addToBuffer(persistenceItem);
101 logger.debug("Buffered item {} in {} ms. Total time for store(): {} [{}]", realName,
102 System.currentTimeMillis() - bufferStart, System.currentTimeMillis() - storeStart, uuid);
105 "Buffer is full. Writing buffered data immediately and trying again. Consider increasing bufferSize");
106 // Buffer is full, commit it immediately
108 boolean buffered2 = addToBuffer(persistenceItem);
110 logger.debug("Buffered item in {} ms (2nd try, flushed buffer in-between) [{}]",
111 System.currentTimeMillis() - bufferStart, uuid);
113 // The unlikely case happened -- buffer got full again immediately
114 logger.warn("Buffering failed for the second time -- Too small bufferSize? Discarding data [{}]",
121 protected boolean addToBuffer(T persistenceItem) {
123 return buffer != null && buffer.offer(persistenceItem, BUFFER_OFFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
124 } catch (InterruptedException e) {
125 logger.warn("Interrupted when trying to buffer data! Dropping data");