Historian Script für Datenlöschung und Komprimierung

Das Langzeitarchiv für HomeMatic

Moderator: Co-Administratoren

Benutzeravatar
wak
Beiträge: 262
Registriert: 05.05.2014, 00:21
Hat sich bedankt: 2 Mal
Danksagung erhalten: 32 Mal

Historian Script für Datenlöschung und Komprimierung

Beitrag von wak » 25.03.2024, 22:25

Hallo Historian-Freunde,

mir ist das Aufräumen auf den Wecker gegangen und so richtig hat es auch nicht funktioniert. Deshalb hab ich für mich mal ein Script geschrieben, das einige Herausforderungen für mich behebt.

Herausforderung der Datenbehandlung:
* Datenbankwachstum
* Ladegeschwindigkeit bei grossen Zeiträumen
* Unterschiedliche Zeiträume die in der Datenbank verbleiben sollen
* keine Komprimierung möglich

Mir war es wichtig das ganze fein zu konfigurieren, heißt individuelle Zeiträume, individuelle Wert-Ermittlung bei Komprimierung.

Derzeit ist das ganz noch im Teststadium, bin auf eure Kommentare aber sehr gespannt und Achtung bei falscher Konfiguration können sehr leicht Daten gelöscht werden.

Natürlich verliert man mit der Komprimierung etwas an Details, das aber bei vielen Werten ab einen bestimmten Zeitpunkt nicht mehr für mich wichtig ist. Dafür gewinnt man Ladegeschwindigkeit bei z.b. Jahresvergleiche!

Das folgende Script kann ohne Änderung zum direkten Testen verwendet werden und läuft so im Test-Modus.

Mit der Variable testRun=false kann ihn den Schreib-Modus gewechselt werden und damit werden Daten gelöscht und / oder komprimiert.

Mit der Variable config kann man die gewünschten Datenpunkte konfigurieren:
Parameter 1 z.b. "*DutyCycle*" ist der Displayname und es können * (Wildcards verwendet werden)
Parameter 2 z.b. 365 ist die Anzahl der Tage die in der Datenbank verbleiben sollen, 365 wäre hier ein Jahr, alle Daten drüber ob komprimiert oder nicht werden gelöscht (es gibt auch volle Monate oder volle Jahre "5M" oder "3Y" )
Parameter 3 z.b. 10 ist hier die Anzal der Tage nachdem die Werte komprimiert werden
Parameter 4 z.b. 60*60*1 bestimmt den Komprimierungszeitraum das wäre 1 Std. (gängige Werte für mich 60*60*2 oder 60*60*24)
Parameter 5 z.b. "max" bestimmt die Art der Komprimierung "max" = maximaler Wert in 1 Std. ebenso gibt es noch:
"min" - Minimuwert in 1Std.
"avg" - Durchschnittswert verwende ich bei Temperaturwerten oder Feuchtigkeitswerten
"first" - erster Wert im Komprimierungszeitraum verwende ich bei Zählern
"last" - letzter Wert im Komprimierungszeitraum
"integral" - Durchschittswert mit Zeitfaktor, funktioniert aber nur bei vielen Wert in einem Komprimierungszeitraum spricht Tagesrdurchschnitt

Viel Spaß beim Testen und auf eure Erfahrungen und Verbesserungen bin ich schon gespannt
LG wak

Code: Alles auswählen

// Datenreihen löschen und komprimieren, V1.0

// min, max, first, last or avg as value for compression
def config = [
              ["*DUTYCYCLE*"          , 365, 10, 60*60*1,   "max" ],    // Std. Max-Wert nach 10 Tagen und nach 365 Tagen löschen
              ["*CARRIER_SENSE*"      , 365, 10, 60*60*1,   "max" ],    // Std. Max-Wert nach 10 Tagen und nach 365 Tagen löschen 
              ["*HUMIDITY"            ,"3Y", 30, 60*60*2,   "avg" ],    // 2 Std. Durchschnitt nach 30 Tagen und nach 3 vollen Jahren löschen
              ["*TEMPERATURE"         ,"3Y", 30, 60*60*2,   "avg" ],    // 2 Std. Durchschnitt nach 30 Tagen und nach 3 vollen Jahren löschen
              ["*ENERGY_COUNTER"     ,"5Y", 30, 60*60*2,   "first" ],  // 2 Std. bei Zähler 1. Wert nach 30 Tagen und nach 5 vollen Jahren 
]


// Testlauf durchführen? Bei einem Testlauf wird die Datenbank nicht verändert.
// (Ja: true, Nein: false)
def testRun = true

// *** Skript ***

def dateFormat = "yyyy-MM-dd"   // standard Timestamp Format
def jetzt =  new Date()               // aktuellen StartTimeStamp speichern für Berechnungen

// 24 Bits für Flag berechnen 
def int bit24 = Math.pow(2,23)   // 8388608;

config.each { conf -> 

  // Löschdatum 5Y oder 3M auf Tage aktuell umrechnen mit 1. des Monats
  def orgConf = conf[1]
  if (conf[1].getClass() == String ) {
    if (conf[1].endsWith("Y")        // volle Jahre 
       and conf[1].substring(0, conf[1].length() - 1).isInteger()
       and conf[1].substring(0, conf[1].length() - 1).toInteger() > 0 ) {
      def dateTmp = new Date(year: (jetzt.getYear() - conf[1].substring(0, conf[1].length() - 1).toInteger()) , month: 0, date: 1, hours: 0, minutes: 0, seconds: 0)
      conf[1] = jetzt - dateTmp     // berechnete Löschtage setzen für xY
    } else if (conf[1].endsWith("M")   // volle Monate 
       and conf[1].substring(0, conf[1].length() - 1).isInteger()
       and conf[1].substring(0, conf[1].length() - 1).toInteger() > 0 ) {
      def cal=Calendar.getInstance().clearTime()
      cal.add(Calendar.MONTH, conf[1].substring(0, conf[1].length() - 1).toInteger() * -1);
      cal.set(Calendar.DAY_OF_MONTH, 1);
      conf[1] = jetzt - cal.getTime() // berechnete Löschtage setzen für xM
    }
    def delDate = jetzt - conf[1]
    delDate.clearTime()
    println conf[0].padRight(40) + " Löschdatum berechnet " + delDate.format(dateFormat) + "  " + orgConf + "  " + conf[1]
  }

  // wildcards in regex umbauen 
  def txtResult = new StringBuffer()
  conf[0].each { ch ->
    switch (ch) {
    case '*':
        // Single '*' matches single dir/file; Double '*' matches sequence of zero or more dirs/files
        txtResult << /[^\/]*/
        break
    case '?':
        // Any character except the normalized file separator ('/')
        txtResult << /[^\/]/
        break
    case ['$', '|', '[', ']', '(', ')', '.', ':', '{', '}', '\\', '^', '+']:
        txtResult << '\\' + ch
        break
    default: txtResult << ch
    }
  }
  conf[0] = txtResult.toString().toUpperCase()
  // println dptxt[0]
}
 
def comprFactor=0
def dpCount=[]
def summeDel=0
def summeComp=0
def summetotal=0
def lastDPState
database.dataPoints.sort{ it.displayName.toUpperCase() }.each { dp ->

  // check dp.displayName wird in config gefunden
  def found=null
  def delBegin=null
  def delEnd=null
  def komprBegin=null
  def cntNew

  // Datenpunkt in Liste über REGEX suchen
  config.find { con ->
    if (dp.displayName.toUpperCase()==~con[0]) {
      // println  con[0] + " -> " + dp.displayName    // check REGEX Regeln
      found = con
      // beim ersten gefunden, merken und Schleife verlassen
      return true // break
    }
  }
  if (found) {

    // Datenlöschung     ******* 
    if (found[1] > 10) {
      cntNew=0
      // Löschzeitraum bestimmen
      delBegin=database.getFirstTimestamp(dp)
      delEnd=jetzt-found[1]      // found[1] = 2 spalte aus der Konfiguration Tabelle am Anfang
      delEnd.clearTime()         // Zeit auf 00:00:00 stellen

      if (testRun) {
        cntNew=database.getCount(dp, delBegin  , delEnd)
        if (cntNew>0) println "$dp.displayName: $cntNew werden gelöscht! von: " + delBegin.format(dateFormat) + "  bis: " + delEnd.format(dateFormat) + " (Testlauf)"
      } else {
        cntNew=database.deleteTimeSeries(dp, delBegin  , delEnd)
        if (cntNew>0) println "$dp.displayName: $cntNew gelöscht! von: " + delBegin.format(dateFormat) + "  bis: " + delEnd.format(dateFormat)
      }
      summeDel=summeDel+cntNew
    }

    // Datenkomprimierung   ****** 
    if (found[2] > 5) {
      // Komprimierungs Zeitraum bestimmen
      if (delEnd) {
         komprBegin=delEnd
      } else {
         komprBegin=database.getFirstTimestamp(dp)
      }
      def komprEnd=jetzt-found[2]
      komprEnd.clearTime()    // Zeit auf 00:00:00 stellen

      comprFactor = 1000*found[3]  // Sekunden von Konfigurationtabelle * 1000 auf Millisekungen
      def comprValue = found[4]    // von Konfigurationtalle "AVG", "MIN", "MAX", ...  

      def cnt
  
      // Zeitreihe holen
      def ts=database.getTimeSeriesRaw(dp, komprBegin, komprEnd)
  
      // erste Zeitreihe ohne komprimierung finden
      def komprBeg = komprEnd
      ts.find { pv ->
        if (!(pv.state&bit24)) { 
          komprBeg = pv.timestamp    // neues begin ermittelt
          return true // break
        }
      }
      //println "Erste Timestamp ohne KompFlag: $komprBeg"
  
      // Zeitreihe holen neu ohne bereits komprimierte
      ts=database.getTimeSeriesRaw(dp, komprBeg , komprEnd)
      cnt=ts.size
      
      // Statistik berechnen
      def duration=komprEnd.time - komprBeg.time 
      def min=Double.POSITIVE_INFINITY
      def max=Double.NEGATIVE_INFINITY
      def integr=0
      def intsum=0
      def intwert=0
      def anzahl=0
      def summe=0
      def lastTime=0
      def thisTime=0
      def firstValue=0
      def lastValue=0
      def avg=0
      def previous
      def comprPosible=false
  
      // neue komprimierte Zeitreihe erstellen    
      def timeSeries=new TimeSeries(dp)
      
      ts.each { pv ->
        thisTime=new Date( ( ( Math.floor(pv.timestamp.time/comprFactor)*comprFactor) as long) )
         
        if (thisTime!=lastTime) {
          if (lastTime!=0) {
            duration=lastTime.time - thisTime.time
            // Durchnitt ist Integral/Zeitbereichslänge in Millisekunden.
            // def avg=integr/duration
            avg = Math.round(summe / anzahl * 10) / 10
            if (intsum!=0) intwert = Math.round(integr / intsum * 10) / 10
          
            // println lastTime.format(dateFormat) + " Anzahl: $anzahl, Minimum: $min, Maximum: $max, Integral: $intwert, Durchschnitt: $avg, Duration: $duration, First: $firstValue, Last: $lastValue"
          
            switch(comprValue) {            
            case "min": 
              timeSeries.add(new ProcessValue(lastTime, min, bit24|lastDPState ))
              break;
            case "max": 
              timeSeries.add(new ProcessValue(lastTime, max, bit24|lastDPState ))
              break;
            case "first": 
              timeSeries.add(new ProcessValue(lastTime, firstValue, bit24|lastDPState ))
              break;
            case "last": 
              timeSeries.add(new ProcessValue(lastTime, lastValue, bit24|lastDPState ))
              break;
            case "avg": 
              timeSeries.add(new ProcessValue(lastTime, avg, bit24|lastDPState ))
              break;
            case "integral": 
              timeSeries.add(new ProcessValue(lastTime, avg, bit24|lastDPState ))
              break;
            }
            if (anzahl>1) comprPosible=true
          }
    
          min=pv.value
          max=pv.value
          anzahl=0
          summe=0
          integr=0
          intsum=0
          intwert=pv.value 
          firstValue=pv.value
          lastDPState=pv.state
          lastTime=thisTime
        }
          
        if (pv.value<min) min=pv.value
        if (pv.value>max) max=pv.value
        if (previous!=null) {
          // Teilintegral berechnen: Messwert*Millisekunden
          integr+=previous.value*(pv.timestamp.time-previous.timestamp.time)
          intsum+=(pv.timestamp.time-previous.timestamp.time)
        }
        lastValue=pv.value
        anzahl=anzahl+1
        summe=summe+pv.value
        previous=pv
      }
      if (lastTime!=0) {
         duration=lastTime.time - thisTime.time
        // Durchnitt ist Integral/Zeitbereichslänge in Millisekunden.
        // def avg=integr/duration
        avg = Math.round(summe / anzahl * 10) / 10
        if (intsum!=0) intwert = Math.round(integr / intsum * 10) / 10
      
        // println lastTime.format(dateFormat) + " Anzahl: $anzahl, Minimum: $min, Maximum: $max, Integral: $intwert, Durchschnitt: $avg, Duration: $duration, First: $firstValue, Last: $lastValue"
      
        switch(comprValue) {            
        case "min": 
          timeSeries.add(new ProcessValue(lastTime, min, bit24|lastDPState ))
          break;
        case "max": 
          timeSeries.add(new ProcessValue(lastTime, max, bit24|lastDPState ))
          break;
        case "first": 
          timeSeries.add(new ProcessValue(lastTime, firstValue, bit24|lastDPState ))
          break;
        case "last": 
          timeSeries.add(new ProcessValue(lastTime, lastValue, bit24|lastDPState ))
          break;
        case "avg": 
          timeSeries.add(new ProcessValue(lastTime, avg, bit24|lastDPState ))
          break;
        case "integral": 
          timeSeries.add(new ProcessValue(lastTime, avg, bit24|lastDPState ))
          break;
        }
        if (anzahl>1) comprPosible=true
      }
      if (comprPosible) {
        def comprFactorSek = comprFactor/1000  
        println "$dp.displayName: Konfig " + komprBegin.format(dateFormat) + " bis: " + komprEnd.format(dateFormat) + " Zeitraum Sek: $comprFactorSek Wert: $comprValue"
        cntNew=0
        if (testRun) {
          println "$dp.displayName: komprimiert! $cnt -> $timeSeries.size von: " + komprBeg.format(dateFormat) + " bis: " + komprEnd.format(dateFormat) + " (Testlauf)"
          cntNew=timeSeries.size 
        } else {
          println "$dp.displayName: komprimiert! $cnt -> $timeSeries.size von: " + komprBeg.format(dateFormat) + " bis: " + komprEnd.format(dateFormat)
          cntNew=database.replaceTimeSeries(dp, timeSeries, komprBeg, komprEnd)
        }
        summeComp=summeComp+ (cnt-cntNew)
        summetotal=summetotal+cnt
      }
    }
  }
}

println "Summe Datenzeilen gelöscht:    $summeDel"
def delSum = summetotal - summeComp 
println "Summe Datenzeilen komprimiert: $summeComp von $summetotal  -> $delSum"

use(groovy.time.TimeCategory) {
    def duration = new Date() - jetzt
    print "Laufzeit: Std.: ${duration.hours}, Min.: ${duration.minutes}, Sek.: ${duration.seconds}"
}
Zuletzt geändert von wak am 25.03.2024, 22:44, insgesamt 1-mal geändert.

Benutzeravatar
wak
Beiträge: 262
Registriert: 05.05.2014, 00:21
Hat sich bedankt: 2 Mal
Danksagung erhalten: 32 Mal

Re: Historian Script für Datenlöschung und Komprimierung

Beitrag von wak » 25.03.2024, 22:33

Hallo Historian-Freunde,

das ganze läuft bei mir als Job im Historian jeden Sonntag um 3:00 in der Früh:

Code: Alles auswählen

database.tasks.DelCompr.enable=true
database.tasks.DelCompr.cron="0 0 3 ? * SUN"
database.tasks.DelCompr.script={

     // Datenreihen löschen und komprimieren, V1.0

    // min, max, first, last or avg as value for compression
    def config = [[....
    
    // Testlauf durchführen? Bei einem Testlauf wird die Datenbank nicht verändert.
    // (Ja: true, Nein: false)
    def testRun = true
    ....
}
LG wak
Zuletzt geändert von wak am 25.03.2024, 22:44, insgesamt 1-mal geändert.

Benutzeravatar
wak
Beiträge: 262
Registriert: 05.05.2014, 00:21
Hat sich bedankt: 2 Mal
Danksagung erhalten: 32 Mal

Re: Historian Script für Datenlöschung und Komprimierung

Beitrag von wak » 25.03.2024, 22:44

Hallo Historian-Feunde,

für die Analyse hab ich meist das Script aus dem Historian Wiki verwendet, aber ohne Datum und alle Werte:

https://github.com/mdzio/ccu-historian/ ... eitbereich

Code: Alles auswählen

// Rangliste der Anzahl der Einträge, V1.0
def dpCount =[]
database.dataPoints.each { dp ->
    def cnt=database.getCount(dp, null, null)
    dpCount << [cnt, dp.displayName]
}
println " ANZAHL DATENPUNKT"
dpCount.sort { -it[0] }.each {
    println it[0].toString().padLeft(7) + " " + it[1]
}
Das zeigt die Datenpunkt IDs sortiert absteigend nach den meisten Werten in der Tabelle!

LG wak

Georgee
Beiträge: 158
Registriert: 22.05.2017, 11:58
System: Alternative CCU (auf Basis OCCU)
Hat sich bedankt: 5 Mal
Danksagung erhalten: 4 Mal

Re: Historian Script für Datenlöschung und Komprimierung

Beitrag von Georgee » 26.03.2024, 10:11

Hallo,

die Idee finde ich sehr gut. Erste Auswertung Zahl der Punkte hat bei einem Wert völlige überflüssige 850.000 Datenpunkte gezeigt. Ich werde daran arbeiten..

Bei compress and delete finde ich folgende Fehlermeldung:

Code: Alles auswählen

Bei der Skriptausführung ist ein Fehler aufgetreten:
startup failed:
UserScript: 31: expecting ')', found 'and' @ line 31, column 8.
          and conf[1].substring(0, conf[1].length() - 1).isInteger()
          ^

1 error
Ich bin bei diesen Sprachelementen überfordert und bitte um Hilfe.

VG Georgee
Tinker Board S, aktuelle Version, kleines Funkmodul mit USB-2, USV, ca. 45 Geräte, CUxD, Mail, Programme drucken, ccu-historian mit Highcharts, hm-pdetect

Benutzeravatar
wak
Beiträge: 262
Registriert: 05.05.2014, 00:21
Hat sich bedankt: 2 Mal
Danksagung erhalten: 32 Mal

Re: Historian Script für Datenlöschung und Komprimierung

Beitrag von wak » 26.03.2024, 11:48

Hallo Georgee,

genau diese überfüssigen Datenpunkte hatte ich auch.

Ich habe gerade das Script hier aus dem Posting genommen und ausgeführt und bekommen keinen Fehler!

Hast du am Script etwas angepaßt? Und welche Historian Version verwendest du?

Hier nochmal das Script on Zeilenschaltung an den genannten Fehler-Zeilen:

Code: Alles auswählen

// Datenreihen löschen und komprimieren, V1.0

// min, max, first, last or avg as value for compression
def config = [
              ["*DUTYCYCLE*"          , 365, 10, 60*60*1,   "max" ],    // Std. Max-Wert nach 10 Tagen und nach 365 Tagen löschen
              ["*CARRIER_SENSE*"      , 365, 10, 60*60*1,   "max" ],    // Std. Max-Wert nach 10 Tagen und nach 365 Tagen löschen 
              ["*HUMIDITY"            ,"3Y", 30, 60*60*2,   "avg" ],    // 2 Std. Durchschnitt nach 30 Tagen und nach 3 vollen Jahren löschen
              ["*TEMPERATURE"         ,"3Y", 30, 60*60*2,   "avg" ],    // 2 Std. Durchschnitt nach 30 Tagen und nach 3 vollen Jahren löschen
              ["*ENERGY_COUNTER"      ,"5Y", 30, 60*60*2,   "first" ],  // 2 Std. bei Zähler 1. Wert nach 30 Tagen und nach 5 vollen Jahren 
]


// Testlauf durchführen? Bei einem Testlauf wird die Datenbank nicht verändert.
// (Ja: true, Nein: false)
def testRun = true

// *** Skript ***

def dateFormat = "yyyy-MM-dd"   // standard Timestamp Format
def jetzt =  new Date()               // aktuellen StartTimeStamp speichern für Berechnungen

// 24 Bits für Flag berechnen 
def int bit24 = Math.pow(2,23)   // 8388608;

config.each { conf -> 

  // Löschdatum 5Y oder 3M auf Tage aktuell umrechnen mit 1. des Monats
  def orgConf = conf[1]
  if (conf[1].getClass() == String ) {
    if (conf[1].endsWith("Y") and conf[1].substring(0, conf[1].length() - 1).isInteger()) {
      def yearValue = conf[1].substring(0, conf[1].length() - 1).toInteger()
      def dateTmp = new Date(year: (jetzt.getYear() - yearValue ) , month: 0, date: 1, hours: 0, minutes: 0, seconds: 0)
      conf[1] = jetzt - dateTmp     // berechnete Löschtage setzen für xY
    } else if (conf[1].endsWith("M") and conf[1].substring(0, conf[1].length() - 1).isInteger() ) {
      def monthValue = conf[1].substring(0, conf[1].length() - 1).toInteger()
      def cal=Calendar.getInstance().clearTime()
      cal.add(Calendar.MONTH, monthValue * -1);
      cal.set(Calendar.DAY_OF_MONTH, 1);
      conf[1] = jetzt - cal.getTime() // berechnete Löschtage setzen für xM
    }
    def delDate = jetzt - conf[1]
    delDate.clearTime()
    println conf[0].padRight(40) + " Löschdatum berechnet " + delDate.format(dateFormat) + "  " + orgConf + "  " + conf[1]
  }

  // wildcards in regex umbauen 
  def txtResult = new StringBuffer()
  conf[0].each { ch ->
    switch (ch) {
    case '*':
        // Single '*' matches single dir/file; Double '*' matches sequence of zero or more dirs/files
        txtResult << /[^\/]*/
        break
    case '?':
        // Any character except the normalized file separator ('/')
        txtResult << /[^\/]/
        break
    case ['$', '|', '[', ']', '(', ')', '.', ':', '{', '}', '\\', '^', '+']:
        txtResult << '\\' + ch
        break
    default: txtResult << ch
    }
  }
  conf[0] = txtResult.toString().toUpperCase()
  // println dptxt[0]
}

def comprFactor=0
def dpCount=[]
def summeDel=0
def summeComp=0
def summetotal=0
def lastDPState
database.dataPoints.sort{ it.displayName.toUpperCase() }.each { dp ->

  // check dp.displayName wird in config gefunden
  def found=null
  def delBegin=null
  def delEnd=null
  def komprBegin=null
  def cntNew

  // Datenpunkt in Liste über REGEX suchen
  config.find { con ->
    if (dp.displayName.toUpperCase()==~con[0]) {
      // println  con[0] + " -> " + dp.displayName    // check REGEX Regeln
      found = con
      // beim ersten gefunden, merken und Schleife verlassen
      return true // break
    }
  }
  if (found) {

    // Datenlöschung     ******* 
    if (found[1] > 10) {
      cntNew=0
      // Löschzeitraum bestimmen
      delBegin=database.getFirstTimestamp(dp)
      delEnd=jetzt-found[1]      // found[1] = 2 spalte aus der Konfiguration Tabelle am Anfang
      delEnd.clearTime()         // Zeit auf 00:00:00 stellen

      if (testRun) {
        cntNew=database.getCount(dp, delBegin  , delEnd)
        if (cntNew>0) println "$dp.displayName: $cntNew werden gelöscht! von: " + delBegin.format(dateFormat) + "  bis: " + delEnd.format(dateFormat) + " (Testlauf)"
      } else {
        cntNew=database.deleteTimeSeries(dp, delBegin  , delEnd)
        if (cntNew>0) println "$dp.displayName: $cntNew gelöscht! von: " + delBegin.format(dateFormat) + "  bis: " + delEnd.format(dateFormat)
      }
      summeDel=summeDel+cntNew
    }

    // Datenkomprimierung   ****** 
    if (found[2] > 5) {
      // Komprimierungs Zeitraum bestimmen
      if (delEnd) {
         komprBegin=delEnd
      } else {
         komprBegin=database.getFirstTimestamp(dp)
      }
      def komprEnd=jetzt-found[2]
      komprEnd.clearTime()    // Zeit auf 00:00:00 stellen

      comprFactor = 1000*found[3]  // Sekunden von Konfigurationtabelle * 1000 auf Millisekungen
      def comprValue = found[4]    // von Konfigurationtalle "AVG", "MIN", "MAX", ...  

      def cnt
  
      // Zeitreihe holen
      def ts=database.getTimeSeriesRaw(dp, komprBegin, komprEnd)
  
      // erste Zeitreihe ohne komprimierung finden
      def komprBeg = komprEnd
      ts.find { pv ->
        if (!(pv.state&bit24)) { 
          komprBeg = pv.timestamp    // neues begin ermittelt
          return true // break
        }
      }
      //println "Erste Timestamp ohne KompFlag: $komprBeg"
  
      // Zeitreihe holen neu ohne bereits komprimierte
      ts=database.getTimeSeriesRaw(dp, komprBeg , komprEnd)
      cnt=ts.size
      
      // Statistik berechnen
      def duration=komprEnd.time - komprBeg.time 
      def min=Double.POSITIVE_INFINITY
      def max=Double.NEGATIVE_INFINITY
      def integr=0
      def intsum=0
      def intwert=0
      def anzahl=0
      def summe=0
      def lastTime=0
      def thisTime=0
      def firstValue=0
      def lastValue=0
      def avg=0
      def previous
      def comprPosible=false
  
      // neue komprimierte Zeitreihe erstellen    
      def timeSeries=new TimeSeries(dp)
      
      ts.each { pv ->
        thisTime=new Date( ( ( Math.floor(pv.timestamp.time/comprFactor)*comprFactor) as long) )
         
        if (thisTime!=lastTime) {
          if (lastTime!=0) {
            duration=lastTime.time - thisTime.time
            // Durchnitt ist Integral/Zeitbereichslänge in Millisekunden.
            // def avg=integr/duration
            avg = Math.round(summe / anzahl * 10) / 10
            if (intsum!=0) intwert = Math.round(integr / intsum * 10) / 10
          
            // println lastTime.format(dateFormat) + " Anzahl: $anzahl, Minimum: $min, Maximum: $max, Integral: $intwert, Durchschnitt: $avg, Duration: $duration, First: $firstValue, Last: $lastValue"
          
            switch(comprValue) {            
            case "min": 
              timeSeries.add(new ProcessValue(lastTime, min, bit24|lastDPState ))
              break;
            case "max": 
              timeSeries.add(new ProcessValue(lastTime, max, bit24|lastDPState ))
              break;
            case "first": 
              timeSeries.add(new ProcessValue(lastTime, firstValue, bit24|lastDPState ))
              break;
            case "last": 
              timeSeries.add(new ProcessValue(lastTime, lastValue, bit24|lastDPState ))
              break;
            case "avg": 
              timeSeries.add(new ProcessValue(lastTime, avg, bit24|lastDPState ))
              break;
            case "integral": 
              timeSeries.add(new ProcessValue(lastTime, avg, bit24|lastDPState ))
              break;
            }
            if (anzahl>1) comprPosible=true
          }
    
          min=pv.value
          max=pv.value
          anzahl=0
          summe=0
          integr=0
          intsum=0
          intwert=pv.value 
          firstValue=pv.value
          lastDPState=pv.state
          lastTime=thisTime
        }
          
        if (pv.value<min) min=pv.value
        if (pv.value>max) max=pv.value
        if (previous!=null) {
          // Teilintegral berechnen: Messwert*Millisekunden
          integr+=previous.value*(pv.timestamp.time-previous.timestamp.time)
          intsum+=(pv.timestamp.time-previous.timestamp.time)
        }
        lastValue=pv.value
        anzahl=anzahl+1
        summe=summe+pv.value
        previous=pv
      }
      if (lastTime!=0) {
         duration=lastTime.time - thisTime.time
        // Durchnitt ist Integral/Zeitbereichslänge in Millisekunden.
        // def avg=integr/duration
        avg = Math.round(summe / anzahl * 10) / 10
        if (intsum!=0) intwert = Math.round(integr / intsum * 10) / 10
      
        // println lastTime.format(dateFormat) + " Anzahl: $anzahl, Minimum: $min, Maximum: $max, Integral: $intwert, Durchschnitt: $avg, Duration: $duration, First: $firstValue, Last: $lastValue"
      
        switch(comprValue) {            
        case "min": 
          timeSeries.add(new ProcessValue(lastTime, min, bit24|lastDPState ))
          break;
        case "max": 
          timeSeries.add(new ProcessValue(lastTime, max, bit24|lastDPState ))
          break;
        case "first": 
          timeSeries.add(new ProcessValue(lastTime, firstValue, bit24|lastDPState ))
          break;
        case "last": 
          timeSeries.add(new ProcessValue(lastTime, lastValue, bit24|lastDPState ))
          break;
        case "avg": 
          timeSeries.add(new ProcessValue(lastTime, avg, bit24|lastDPState ))
          break;
        case "integral": 
          timeSeries.add(new ProcessValue(lastTime, avg, bit24|lastDPState ))
          break;
        }
        if (anzahl>1) comprPosible=true
      }
      if (comprPosible) {
        def comprFactorSek = comprFactor/1000  
        println "$dp.displayName: Konfig " + komprBegin.format(dateFormat) + " bis: " + komprEnd.format(dateFormat) + " Zeitraum Sek: $comprFactorSek Wert: $comprValue"
        cntNew=0
        if (testRun) {
          println "$dp.displayName: komprimiert! $cnt -> $timeSeries.size von: " + komprBeg.format(dateFormat) + " bis: " + komprEnd.format(dateFormat) + " (Testlauf)"
          cntNew=timeSeries.size 
        } else {
          println "$dp.displayName: komprimiert! $cnt -> $timeSeries.size von: " + komprBeg.format(dateFormat) + " bis: " + komprEnd.format(dateFormat)
          cntNew=database.replaceTimeSeries(dp, timeSeries, komprBeg, komprEnd)
        }
        summeComp=summeComp+ (cnt-cntNew)
        summetotal=summetotal+cnt
      }
    }
  }
}

println "Summe Datenzeilen gelöscht:    $summeDel"
def delSum = summetotal - summeComp 
println "Summe Datenzeilen komprimiert: $summeComp von $summetotal  -> $delSum"

use(groovy.time.TimeCategory) {
    def duration = new Date() - jetzt
    print "Laufzeit: Std.: ${duration.hours}, Min.: ${duration.minutes}, Sek.: ${duration.seconds}"
}

Viel Spaß beim Testen
wak

Georgee
Beiträge: 158
Registriert: 22.05.2017, 11:58
System: Alternative CCU (auf Basis OCCU)
Hat sich bedankt: 5 Mal
Danksagung erhalten: 4 Mal

Re: Historian Script für Datenlöschung und Komprimierung

Beitrag von Georgee » 26.03.2024, 13:03

Hallo wak,

danke für die schnelle Antwort.

Keine Änderungen vorgenommen, Historian 3.6

Nochmal versucht, gleicher Fehler

VG Georgee
Tinker Board S, aktuelle Version, kleines Funkmodul mit USB-2, USV, ca. 45 Geräte, CUxD, Mail, Programme drucken, ccu-historian mit Highcharts, hm-pdetect

Benutzeravatar
wak
Beiträge: 262
Registriert: 05.05.2014, 00:21
Hat sich bedankt: 2 Mal
Danksagung erhalten: 32 Mal

Re: Historian Script für Datenlöschung und Komprimierung

Beitrag von wak » 26.03.2024, 13:19

Hallo Georgee,

hier nochmal eine Version ohne "5Y" oder "3M" Umrechnung. Bitte nur mit Zahlen hier arbeiten.

Falls wieder ein Fehler kommt, bitte den Fehlertext posten!

Viele Glück beim Testen!
LG wak

Code: Alles auswählen

// Datenreihen löschen und komprimieren, V1.0

// min, max, first, last or avg as value for compression
def config = [
              ["*DUTYCYCLE*"          , 365, 10, 60*60*1,   "max" ],    // Std. Max-Wert nach 10 Tagen und nach 365 Tagen löschen
              ["*CARRIER_SENSE*"      , 365, 10, 60*60*1,   "max" ],    // Std. Max-Wert nach 10 Tagen und nach 365 Tagen löschen 
              ["*HUMIDITY"            ,1100, 30, 60*60*2,   "avg" ],    // 2 Std. Durchschnitt nach 30 Tagen und nach 3 vollen Jahren löschen
              ["*TEMPERATURE"         ,1100, 30, 60*60*2,   "avg" ],    // 2 Std. Durchschnitt nach 30 Tagen und nach 3 vollen Jahren löschen
              ["*ENERGY_COUNTER"     ,1600, 30, 60*60*2,   "first" ],  // 2 Std. bei Zähler 1. Wert nach 30 Tagen und nach 5 vollen Jahren 
]


// Testlauf durchführen? Bei einem Testlauf wird die Datenbank nicht verändert.
// (Ja: true, Nein: false)
def testRun = true

// *** Skript ***

def dateFormat = "yyyy-MM-dd"   // standard Timestamp Format
def jetzt =  new Date()               // aktuellen StartTimeStamp speichern für Berechnungen

// 24 Bits für Flag berechnen 
def int bit24 = Math.pow(2,23)   // 8388608;

config.each { conf -> 

  // wildcards in regex umbauen 
  def txtResult = new StringBuffer()
  conf[0].each { ch ->
    switch (ch) {
    case '*':
        // Single '*' matches single dir/file; Double '*' matches sequence of zero or more dirs/files
        txtResult << /[^\/]*/
        break
    case '?':
        // Any character except the normalized file separator ('/')
        txtResult << /[^\/]/
        break
    case ['$', '|', '[', ']', '(', ')', '.', ':', '{', '}', '\\', '^', '+']:
        txtResult << '\\' + ch
        break
    default: txtResult << ch
    }
  }
  conf[0] = txtResult.toString().toUpperCase()
  // println dptxt[0]
}
 
def comprFactor=0
def dpCount=[]
def summeDel=0
def summeComp=0
def summetotal=0
def lastDPState
database.dataPoints.sort{ it.displayName.toUpperCase() }.each { dp ->

  // check dp.displayName wird in config gefunden
  def found=null
  def delBegin=null
  def delEnd=null
  def komprBegin=null
  def cntNew

  // Datenpunkt in Liste über REGEX suchen
  config.find { con ->
    if (dp.displayName.toUpperCase()==~con[0]) {
      // println  con[0] + " -> " + dp.displayName    // check REGEX Regeln
      found = con
      // beim ersten gefunden, merken und Schleife verlassen
      return true // break
    }
  }
  if (found) {

    // Datenlöschung     ******* 
    if (found[1] > 10) {
      cntNew=0
      // Löschzeitraum bestimmen
      delBegin=database.getFirstTimestamp(dp)
      delEnd=jetzt-found[1]      // found[1] = 2 spalte aus der Konfiguration Tabelle am Anfang
      delEnd.clearTime()         // Zeit auf 00:00:00 stellen

      if (testRun) {
        cntNew=database.getCount(dp, delBegin  , delEnd)
        if (cntNew>0) println "$dp.displayName: $cntNew werden gelöscht! von: " + delBegin.format(dateFormat) + "  bis: " + delEnd.format(dateFormat) + " (Testlauf)"
      } else {
        cntNew=database.deleteTimeSeries(dp, delBegin  , delEnd)
        if (cntNew>0) println "$dp.displayName: $cntNew gelöscht! von: " + delBegin.format(dateFormat) + "  bis: " + delEnd.format(dateFormat)
      }
      summeDel=summeDel+cntNew
    }

    // Datenkomprimierung   ****** 
    if (found[2] > 5) {
      // Komprimierungs Zeitraum bestimmen
      if (delEnd) {
         komprBegin=delEnd
      } else {
         komprBegin=database.getFirstTimestamp(dp)
      }
      def komprEnd=jetzt-found[2]
      komprEnd.clearTime()    // Zeit auf 00:00:00 stellen

      comprFactor = 1000*found[3]  // Sekunden von Konfigurationtabelle * 1000 auf Millisekungen
      def comprValue = found[4]    // von Konfigurationtalle "AVG", "MIN", "MAX", ...  

      def cnt
  
      // Zeitreihe holen
      def ts=database.getTimeSeriesRaw(dp, komprBegin, komprEnd)
  
      // erste Zeitreihe ohne komprimierung finden
      def komprBeg = komprEnd
      ts.find { pv ->
        if (!(pv.state&bit24)) { 
          komprBeg = pv.timestamp    // neues begin ermittelt
          return true // break
        }
      }
      //println "Erste Timestamp ohne KompFlag: $komprBeg"
  
      // Zeitreihe holen neu ohne bereits komprimierte
      ts=database.getTimeSeriesRaw(dp, komprBeg , komprEnd)
      cnt=ts.size
      
      // Statistik berechnen
      def duration=komprEnd.time - komprBeg.time 
      def min=Double.POSITIVE_INFINITY
      def max=Double.NEGATIVE_INFINITY
      def integr=0
      def intsum=0
      def intwert=0
      def anzahl=0
      def summe=0
      def lastTime=0
      def thisTime=0
      def firstValue=0
      def lastValue=0
      def avg=0
      def previous
      def comprPosible=false
  
      // neue komprimierte Zeitreihe erstellen    
      def timeSeries=new TimeSeries(dp)
      
      ts.each { pv ->
        thisTime=new Date( ( ( Math.floor(pv.timestamp.time/comprFactor)*comprFactor) as long) )
         
        if (thisTime!=lastTime) {
          if (lastTime!=0) {
            duration=lastTime.time - thisTime.time
            // Durchnitt ist Integral/Zeitbereichslänge in Millisekunden.
            // def avg=integr/duration
            avg = Math.round(summe / anzahl * 10) / 10
            if (intsum!=0) intwert = Math.round(integr / intsum * 10) / 10
          
            // println lastTime.format(dateFormat) + " Anzahl: $anzahl, Minimum: $min, Maximum: $max, Integral: $intwert, Durchschnitt: $avg, Duration: $duration, First: $firstValue, Last: $lastValue"
          
            switch(comprValue) {            
            case "min": 
              timeSeries.add(new ProcessValue(lastTime, min, bit24|lastDPState ))
              break;
            case "max": 
              timeSeries.add(new ProcessValue(lastTime, max, bit24|lastDPState ))
              break;
            case "first": 
              timeSeries.add(new ProcessValue(lastTime, firstValue, bit24|lastDPState ))
              break;
            case "last": 
              timeSeries.add(new ProcessValue(lastTime, lastValue, bit24|lastDPState ))
              break;
            case "avg": 
              timeSeries.add(new ProcessValue(lastTime, avg, bit24|lastDPState ))
              break;
            case "integral": 
              timeSeries.add(new ProcessValue(lastTime, avg, bit24|lastDPState ))
              break;
            }
            if (anzahl>1) comprPosible=true
          }
    
          min=pv.value
          max=pv.value
          anzahl=0
          summe=0
          integr=0
          intsum=0
          intwert=pv.value 
          firstValue=pv.value
          lastDPState=pv.state
          lastTime=thisTime
        }
          
        if (pv.value<min) min=pv.value
        if (pv.value>max) max=pv.value
        if (previous!=null) {
          // Teilintegral berechnen: Messwert*Millisekunden
          integr+=previous.value*(pv.timestamp.time-previous.timestamp.time)
          intsum+=(pv.timestamp.time-previous.timestamp.time)
        }
        lastValue=pv.value
        anzahl=anzahl+1
        summe=summe+pv.value
        previous=pv
      }
      if (lastTime!=0) {
         duration=lastTime.time - thisTime.time
        // Durchnitt ist Integral/Zeitbereichslänge in Millisekunden.
        // def avg=integr/duration
        avg = Math.round(summe / anzahl * 10) / 10
        if (intsum!=0) intwert = Math.round(integr / intsum * 10) / 10
      
        // println lastTime.format(dateFormat) + " Anzahl: $anzahl, Minimum: $min, Maximum: $max, Integral: $intwert, Durchschnitt: $avg, Duration: $duration, First: $firstValue, Last: $lastValue"
      
        switch(comprValue) {            
        case "min": 
          timeSeries.add(new ProcessValue(lastTime, min, bit24|lastDPState ))
          break;
        case "max": 
          timeSeries.add(new ProcessValue(lastTime, max, bit24|lastDPState ))
          break;
        case "first": 
          timeSeries.add(new ProcessValue(lastTime, firstValue, bit24|lastDPState ))
          break;
        case "last": 
          timeSeries.add(new ProcessValue(lastTime, lastValue, bit24|lastDPState ))
          break;
        case "avg": 
          timeSeries.add(new ProcessValue(lastTime, avg, bit24|lastDPState ))
          break;
        case "integral": 
          timeSeries.add(new ProcessValue(lastTime, avg, bit24|lastDPState ))
          break;
        }
        if (anzahl>1) comprPosible=true
      }
      if (comprPosible) {
        def comprFactorSek = comprFactor/1000  
        println "$dp.displayName: Konfig " + komprBegin.format(dateFormat) + " bis: " + komprEnd.format(dateFormat) + " Zeitraum Sek: $comprFactorSek Wert: $comprValue"
        cntNew=0
        if (testRun) {
          println "$dp.displayName: komprimiert! $cnt -> $timeSeries.size von: " + komprBeg.format(dateFormat) + " bis: " + komprEnd.format(dateFormat) + " (Testlauf)"
          cntNew=timeSeries.size 
        } else {
          println "$dp.displayName: komprimiert! $cnt -> $timeSeries.size von: " + komprBeg.format(dateFormat) + " bis: " + komprEnd.format(dateFormat)
          cntNew=database.replaceTimeSeries(dp, timeSeries, komprBeg, komprEnd)
        }
        summeComp=summeComp+ (cnt-cntNew)
        summetotal=summetotal+cnt
      }
    }
  }
}

println "Summe Datenzeilen gelöscht:    $summeDel"
def delSum = summetotal - summeComp 
println "Summe Datenzeilen komprimiert: $summeComp von $summetotal  -> $delSum"

use(groovy.time.TimeCategory) {
    def duration = new Date() - jetzt
    print "Laufzeit: Std.: ${duration.hours}, Min.: ${duration.minutes}, Sek.: ${duration.seconds}"
}

Georgee
Beiträge: 158
Registriert: 22.05.2017, 11:58
System: Alternative CCU (auf Basis OCCU)
Hat sich bedankt: 5 Mal
Danksagung erhalten: 4 Mal

Re: Historian Script für Datenlöschung und Komprimierung

Beitrag von Georgee » 26.03.2024, 15:24

Danke, hat jetzt geklappt:

Code: Alles auswählen

2023-01-09 bis: 2024-02-25 (Testlauf)
Summe Datenzeilen gelöscht:    31888
Summe Datenzeilen komprimiert: 3983383 von 4105260  -> 121877
Laufzeit: Std.: 0, Min.: 18, Sek.: 2
Keine Änderung vorgenommen. Die Fleißarbeit beginnt ja erst jetzt.

Danke nochmal
Georgee
Tinker Board S, aktuelle Version, kleines Funkmodul mit USB-2, USV, ca. 45 Geräte, CUxD, Mail, Programme drucken, ccu-historian mit Highcharts, hm-pdetect

Benutzeravatar
wak
Beiträge: 262
Registriert: 05.05.2014, 00:21
Hat sich bedankt: 2 Mal
Danksagung erhalten: 32 Mal

Re: Historian Script für Datenlöschung und Komprimierung

Beitrag von wak » 26.03.2024, 15:31

super, danke für die Rückmeldung!
4 mio gesparrt wäre ja schon etwas ... :-)
Am besten mal auf einem Spielsystem machen, so habs ich gemacht. Historian ohne CCU Anbindung auf Windows installiert in einem Verzeichnis, backup eingespielt und rumprobiert, läuft dann auch super schnell und man kriegt schnell ein Gefühl, was man wie optimieren kann. Am besten langsam erstmal rantasten. Würde mich freuen wenn du dein Endergebnis uns mitteilen könntest. Und vielleicht hast du andere Homematic Geräde, und kannst noch Standard-Einstellungen beisteuern ..
lg wak

Georgee
Beiträge: 158
Registriert: 22.05.2017, 11:58
System: Alternative CCU (auf Basis OCCU)
Hat sich bedankt: 5 Mal
Danksagung erhalten: 4 Mal

Re: Historian Script für Datenlöschung und Komprimierung

Beitrag von Georgee » 29.03.2024, 11:07

Hallo,

die Scripte sind sehr hilfreich.

Die Rangfolge des gespeicherten Werte nach Anzahl erlaubt ersteinmal, die Datenbank des produktiven Systems aufzuräumen (warum Feuchtewert im Keller von 2022?). Korrektur von vielen Datenpunkten - vor allem mit dem Werkzeug, die Historie ausgewählter Datenpunkte zu löschen, hat sehr geholfen. Datenbank von 10 GB auf 500 MB!!!

Die weitere Funktion "Datenreihen löschen und komprimieren" zeigt unter Windows, lokale Datenbank, zwar viele Datenpunkte für die Bearbeitung an, aber löscht sie nicht, trotz Schreibzugriff und Testlauf false.

Im produktiven System
läuft diese Funktion bei gestopptem Historian nicht
bei aktivem Historian im Testlauf ok
bei aktivem Historian Schreibzugriff/false fehlerfrei mit folgenden Parameter

Code: Alles auswählen

def config = [
["*.Aussentuer:1.DEW_POINT*"    , 3, 10, 60*60*8,   "avg" ],    // avg, 8h, 10d, 3d 
["*.Aussentuer:1.ABS_HUMIDITY*"    , 3, 10, 60*60*8,   "avg" ],    // avg, 8h, 10d, 3d
//["*NB.aktuell.VALUE*"    , 3, 10, 60*60*24,   "avg" ],    // avg, 24h, 10d, 3d
//["*KE.Bad.Temperatur:1.DEW_POINT*"    , 3, 10, 60*60*8,   "avg" ], //avg, 8h, 10d, 3d
//["*FREQUENCY*"          , 3, 10, 60*60*1,   "max" ],    // Average, 10d, 3d
//["*VOLTAGE"      , 3, 10, 60*60*1,   "max" ],    // Average, 10d, 3d
//["*CURRENT"            ,3, 30, 60*60*2,   "avg" ],    // Average, 10d, 3d
//["*USV.TIMER"         ,3, 30, 60*60*2,   "max" ],    // Average, 10d, 3d
["*Counter.PV.Energy:5.POWER*"    , 720, 90, 60*60*2,   "avg" ],    // avg, 2h, 90d, 720d
]
aber dieses Ergebnis

Code: Alles auswählen

Summe Datenzeilen gelöscht:    0
Summe Datenzeilen komprimiert: 682401 von 685445  -> 3044
Laufzeit: Std.: 0, Min.: 3, Sek.: 45
Anschließender Check auf geänderte Zahl des Vorkommens bleibt unverändert, d.h. kein Löschen

Vielleicht als Hinweis für die Datenbankkonfiguration: vielleicht kann auch die Zahl der Tage, die Daten gespeichert bleiben, aufgenommen werden und z.B. nach tgl. Backup entsprechend angepasst werden

VG
Georgee
Tinker Board S, aktuelle Version, kleines Funkmodul mit USB-2, USV, ca. 45 Geräte, CUxD, Mail, Programme drucken, ccu-historian mit Highcharts, hm-pdetect

Antworten

Zurück zu „CCU-Historian“