InfluxDB v2 (Flux) Setup for Energy and Power Data from Iotawatt

These are my notes on setting up InfluxDB v2 for storing IotaWatt power readings. Specifically, I needed to:

  • Transition a live site from InfluxDB 1 to InfluxDB 2
  • Set up InfluxDB v2 to use the InfluxDB v1 API and auth, so that ‘old’ IotaWatts can continue to post data
  • Set up Tasks (previously known as Continuous Queries) to handle downsampling
  • Set up retention periods and ‘prune’ existing data (1 billion points)

Install v2, Transfer Data (v1 to v2)

There are good instructions on the Influx website: export v1 to line protocol, copy the file, then import the line protocol into v2.

Doing an everything-dump led to huge (100 GB) files and slow performance. A Python script that batches the export (using -start, -end and -out) works better.
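A sketch of that batching script: it only builds the `influx_inspect export` command lines to run on the v1 server. The -datadir/-waldir paths and the one-week step are assumptions for a default install; adjust for your setup.

```python
from datetime import datetime, timedelta

def export_commands(start, end, out_dir, step_days=7):
    """Yield one `influx_inspect export` command per time window.

    The data/wal paths below are assumptions for a stock v1 install.
    """
    cur = start
    while cur < end:
        nxt = min(cur + timedelta(days=step_days), end)
        yield (
            "influx_inspect export"
            " -datadir /var/lib/influxdb/data"
            " -waldir /var/lib/influxdb/wal"
            " -database phisaver"
            " -lponly"  # line protocol only, ready for `influx write`
            f" -start {cur:%Y-%m-%dT%H:%M:%SZ}"
            f" -end {nxt:%Y-%m-%dT%H:%M:%SZ}"
            f" -out {out_dir}/{cur:%Y-%m-%d}.inf"
        )
        cur = nxt

# Print the commands; run them (or pipe to a shell) on the server
for cmd in export_commands(datetime(2019, 1, 1), datetime(2019, 1, 15), "./export"):
    print(cmd)
```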

To import, read the line-protocol files into a bucket. I used one-week files, grouped by year.

for FILE in ./2019-*.inf; do influx write --bucket iotawatt --org phisaver -f $FILE --errors-file $FILE.error --skipHeader=5 ; done

A rough check: use wc -l on the line-protocol files and compare against the point counts in the database (see ‘Check the imported data’ below).

Data Schema

InfluxDB2 uses buckets structured as:

ORGANISATION

  • DASHBOARDS
  • TASKS
  • USERS
  • BUCKETS
    • Retention period (applies to whole bucket)
    • Data (records with fields, tags, timestamps)

Bucket     Database   Retention Period   Period       Org        Note
iotawatt   phisaver   1 week             10s to 60s   phisaver   *
energy     phisaver   inf                1 hour       phisaver   *
power      phisaver   inf                5 minutes    phisaver   *

* the organisation could change to relate to fleets (eg. uq, cc, rc, general)

The DBRP mapping service uses the database and retention policy specified in 1.x compatibility API requests to route operations to a bucket.

Influx docs

This means we have:

  • v1: [ database, retention policy ] which is mapped to
  • v2: [ bucket ] which belongs to an organisation
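On the server, this mapping can be created from the CLI, and a v1-style token lets the ‘old’ IotaWatts authenticate against the compatibility API. The bucket ID below is a placeholder (get it from influx bucket list); the names follow the table above.

```shell
# Map the v1 [ database, retention policy ] onto a v2 bucket
influx v1 dbrp create --org phisaver --db phisaver --rp autogen \
  --bucket-id <bucket-id> --default

# Create v1-compatible credentials for the IotaWatts to post with
influx v1 auth create --org phisaver --username iotawatt \
  --write-bucket <bucket-id>
```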

Within both a [ database, retention policy ] and a [ bucket ] there are multiple measurements.

Within v1, I had a big [ phisaver, inf ] database with measurements of iotawatt and energy.

In v2, I want an [ iotawatt, 1w ] bucket to record ‘raw’ readings, and [ power, inf ] and [ energy, inf ] buckets to record downsampled data.

In v2, the [ iotawatt ] bucket has a Measurement called iotawatt. The [ power ] and [ energy ] buckets can have Measurements called iotawatt or something else entirely!

Let’s skip ahead to see the results of these three buckets for 12 hours of one sensor:

  • Top is ‘raw’ data in iotawatt (1m)
  • Middle is downsampled power (5m): some loss of shape
  • Bottom is downsampled energy (1h): only the total (i.e. area under the curve) is now useful.
Screenshot from 2021 08 18 17 46 21

Exploring the schema

Use the docs:

import "influxdata/influxdb/schema"

schema.measurementFieldKeys(
  bucket: "example-bucket",
  measurement: "example-measurement",
  start: -30d
)

Note the start defaults to start: -30d.

Other functions are useful for checking tags. For example:

import "influxdata/influxdb/schema"

schema.measurementTagKeys(
  bucket: "power",
  measurement: "iotawatt"
)

This shows we have “device”, “sensor” and “units” tags, as expected.

Screenshot from 2021 08 18 17 29 52

To find all devices

import "influxdata/influxdb/schema"

schema.tagValues(
  bucket: "iotawatt",
  tag: "device",
  start: -1y    // default is only -30d
)

Let’s try another method with basic syntax, just for fun.

from(bucket: "noaa")
  |> range(start: 2021-06-01T00:00:00Z, stop: 2021-06-02T00:00:00Z)
  |> filter(fn: (r) => r["_measurement"] == "h2o_quality")
  |> group()
  |> keyValues(keyColumns: ["randtag"])

The output of the filter function is a set of tables, one for each combination of the tags (randtag and location, in this case). Hence the default is to group by every tag.

The group() function ungroups everything.

Check the imported data

I checked the lines in a line protocol file and then counted points in the v2 database. They did not match. Eventually I found the same point (i.e. same timestamp) was included multiple (!) times in the line protocol file. See below.

Screenshot from 2021 08 17 10 05 23
Line number versus Timestamp for a single field. Note times are repeated.
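A quick local check for this is to count points per series and timestamp in the line-protocol file before importing. A minimal sketch, assuming simple lines with no spaces inside string field values:

```python
from collections import Counter

def repeated_timestamps(lines):
    """Count line-protocol points per (measurement+tags, timestamp).

    A line is `measurement,tags fields timestamp`, so after a whitespace
    split the first token is the series key and the last is the timestamp.
    Returns only entries seen more than once.
    """
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line or line.startswith("#"):  # skip headers/comments
            continue
        parts = line.split()
        if len(parts) < 3:  # no timestamp on this line
            continue
        counts[(parts[0], parts[-1])] += 1
    return {k: n for k, n in counts.items() if n > 1}

sample = [
    "iotawatt,device=uq85 Watts=240 1565000000",
    "iotawatt,device=uq85 Watts=250 1565000000",  # duplicate timestamp
    "iotawatt,device=uq85 Watts=260 1565000010",
]
print(repeated_timestamps(sample))
```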

Instead, we can use a frequency count:

SELECT count("Watts") FROM "iotawatt" WHERE ("sensor" = 'Net') AND $timeFilter GROUP BY time(1d), "device" fill(null)

and compare this v1 to v2:

from(bucket: "iotawatt")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "iotawatt")
  |> filter(fn: (r) => r["sensor"] == "Net")
  |> filter(fn: (r) => r["_field"] == "Watts")
  |> group(columns: ["device"])
  |> aggregateWindow(every: 1d, fn: count)
Screenshot from 2021 08 17 10 09 56

Once-off mega tasks

Transfer from one bucket/measurement to another

from(bucket: "iotawatt")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "energy")
  |> filter(fn: (r) => r["_field"] == "kwh")
  |> to(bucket: "energy", org:"phisaver")

Counting lotsa points

The imported database is too big. It has 1 billion points. It explodes (memory) on large queries, even simple ones. For example, to get the total number of points, a “SELECT count(*) from iotawatt” fails. Instead we can batch queries by date and device. For example (v1):

# fragment from a class: self.client is an influxdb.InfluxDBClient
from datetime import datetime as dt, timedelta

import pytz
import pyrfc3339

# Find all devices, then count points per device in 30-day batches
q = 'show tag values from iotawatt with key = "device"'
r = self.client.query(q, database=self.args.database)
devices = [i['value'] for i in list(r.get_points())]

brisbane = pytz.timezone("Australia/Brisbane")

for device in devices:
    interval_start = brisbane.localize(dt(2018, 1, 1))

    while interval_start < brisbane.localize(dt.now()):
        interval_end = interval_start + timedelta(days=30)
        q = "SELECT COUNT(Watts) FROM iotawatt"
        q += f" WHERE units = 'Watts' AND device='{device}' "
        q += f" AND time > '{pyrfc3339.generate(interval_start, utc=False)}'"
        q += f" AND time <= '{pyrfc3339.generate(interval_end, utc=False)}'"
        q += " GROUP BY time(1h), sensor, device fill(0)"
        r = self.client.query(q, database=self.args.database)
        interval_start = interval_end

Downsampling

I want to downsample from “iotawatt” (10s to 60s interval) to “power” (5m intervals). Ideally, a single Task can do this, but alas, there are too many points and it explodes.

I first tried using the Python client, but it was really slow on writes.

Instead I wrote the queries (example below) to separate files and then, on the server, ran:

for FILE in *.query; do influx query --org phisaver -f $FILE; done
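The per-file queries can be generated with a short script along these lines. This is a sketch: the Flux template mirrors the downsampling query below, and the device name, window and 30-day step are assumptions.

```python
from datetime import datetime, timedelta

# Flux template for one downsampling batch; bucket and tag names match
# the examples in this post.
TEMPLATE = """from(bucket: "phisaver/autogen")
|> range(start: {start}, stop: {stop})
|> filter(fn: (r) => r["_measurement"] == "iotawatt")
|> filter(fn: (r) => r["_field"] == "Watts")
|> filter(fn: (r) => r["device"] == "{device}")
|> keep(columns: ["_time","_field","_value","device","sensor","units","_measurement"])
|> aggregateWindow(fn: mean, every: 5m, createEmpty: false)
|> to(bucket: "power", org: "phisaver")
"""

def batch_queries(device, start, end, step_days=30):
    """Yield (filename, flux) pairs, one per time window."""
    fmt = "%Y-%m-%dT%H:%M:%S+10:00"  # assumes Brisbane offset
    cur = start
    while cur < end:
        nxt = min(cur + timedelta(days=step_days), end)
        yield (
            f"{device}-{cur:%Y%m%d}.query",
            TEMPLATE.format(start=cur.strftime(fmt), stop=nxt.strftime(fmt), device=device),
        )
        cur = nxt

for name, flux in batch_queries("uq85", datetime(2021, 6, 1), datetime(2021, 8, 1)):
    print(name)  # write flux to this file, then run `influx query -f <file>`
```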

An example query for downsampling

from(bucket: "phisaver/autogen")
// change in each file
|> range(start: 2021-06-28T00:00:00+10:00, stop: 2021-12-27T00:00:00+10:00)
|> filter(fn: (r) =>(r["_measurement"] == "iotawatt"))
|> filter(fn: (r) =>(r["_field"] == "Watts"))
|> filter(fn: (r) =>(r["device"] == "uq85"))
// removes random tags not needed now
|> keep(columns: ["_time","_field","_value","device","sensor","units","_measurement"])
|> aggregateWindow(fn: mean, every: 5m, createEmpty: false)
|> to(bucket: "power", org: "phisaver")

This works but is not pretty. Must be a better way?

For new data, a Task with a limited time span (e.g. 24h) should be able to handle all devices at once.
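Such a Task might look like this. A sketch only, not tested against this schema; the range uses the task's own cadence so each run only touches recent points:

```flux
option task = {name: "downsample-power", every: 1h}

from(bucket: "iotawatt")
  |> range(start: -task.every)
  |> filter(fn: (r) => r["_measurement"] == "iotawatt")
  |> filter(fn: (r) => r["_field"] == "Watts")
  |> keep(columns: ["_time","_field","_value","device","sensor","units","_measurement"])
  |> aggregateWindow(fn: mean, every: 5m, createEmpty: false)
  |> to(bucket: "power", org: "phisaver")
```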

Power to Energy : Downsampling with a twist

Same approach: use separate queries to bin time.

from(bucket: "iotawatt")
|> range(start: 2020-02-26T00:00:00+10:00, stop: 2020-03-04T00:00:00+10:00)
|> filter(fn: (r) =>(r["_measurement"] == "iotawatt"))
|> filter(fn: (r) =>(r["_field"] == "Watts"))
// strip extraneous tags
|> keep(columns: ["_time","_field","_value","device","sensor","_measurement"])
// rename a field
|> set(key:"_field", value: "kWh")
// W -> Wh: integral is better but mean ok
|> aggregateWindow(fn: mean, every: 1h, createEmpty: false)
// Wh -> kWh
|> map(fn: (r) => ({ r with _value: r._value / 1000.0}))
|> to(bucket: "energy", org: "phisaver")

Deleting

I found it easiest from the cli on the server:

influx delete --bucket "phisaver/autogen" --start 2015-09-01T00:00:00+10:00 --stop 2021-12-31T00:00:00+10:00 --org phisaver -p 'device="apbee" and _measurement=iotawatt'

To “clean” ridiculous data (like 1.1e1000 Watts): you cannot delete “values greater than X” (reference), so you first find the day and then delete a time range.

// Find out-of-range values (slow)
from(bucket: "power")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "iotawatt")
  |> filter(fn: (r) => r["_field"] == "Watts")
  |> filter(fn: (r) => r["_value"] > 1000000.0 or r["_value"] < -1000000.0)
  |> group(columns: ["device"])
  // optionally, just show days
  |> aggregateWindow(every: 1d, fn: count, createEmpty: false)

And then delete the whole day(s) with dud data:

influx delete --bucket power --org phisaver -p 'device=rc352 and _measurement=iotawatt' --start 2021-08-25T00:00:00+10:00 --stop  2021-08-26T00:00:00+10:00

Flux and InfluxQL

Flux is great but a bit hard to get started with. I needed to read the docs and use the sample data at influxdata.com.

Custom Functions

//
// Often we get the average kWh per hour, then multiply by 24 to get kWh/day efficiently.
// This function helps:
//
daily = (tables=<-) => 
  tables
    |> map(fn: (r) => ({
      r with
      _value: r._value * 24.0
    })
  ) 
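The custom function then pipes like any built-in. For example (a sketch, assuming hourly kWh records in the energy bucket):

```flux
from(bucket: "energy")
  |> range(start: -7d)
  |> filter(fn: (r) => r["_measurement"] == "iotawatt")
  |> filter(fn: (r) => r["_field"] == "kWh")
  |> mean()   // average kWh per hour over the window
  |> daily()  // x 24 -> kWh per day
```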

Handling Empty Tables

There are ways to handle null values (Googlable) but what if you need to pivot a table with a missing (empty) row? You can do this:

// Export and Import are in two rows: pivot to columns for calcs
|> pivot(       
    rowKey:["_start"],
    columnKey: ["sensor"],
    valueColumn: "_value")
// Now we have a single row with columns Export and Import
// However, Export might be empty. We need to set it to 0 for future calcs
|> map(fn: (r) => ({
    r with 
      Import :   r.Import * 24.0  , // will always exist
      Export : if exists r.Export then  r.Export * 24.0 else 0.0 , // *** set to 0 if empty ***
  })
)

Calculating between rows to find fraction of solar contribution

Unioning Recent (finely sampled) and Historical (downsampled) Data

Below is a working approach, but it fails in Grafana if ‘dsCrossoverTime’ is not within the window being viewed. It could be adapted.

import "math"
import "experimental"


// iotawatt bucket has a RP of 1 week, so exclude downsampled (power) data after that
dsCrossoverTime = experimental.subDuration(
  d: 7d,
  from: now(),
)


// Stop at the first of crossover or window end
// Too complex - don't use
ds_f = math.mMin(x: float(v:uint(v:dsCrossoverTime)), y: float(v:uint(v: v.timeRangeStop)))
ds_uint = uint(v: ds_f)
ds_time = time(v: ds_uint)

downsampled=from(bucket: "power")
  |> range(start: v.timeRangeStart, stop: dsCrossoverTime)
  |> filter(fn: (r) => r["_measurement"] == "iotawatt" and 
                       r["_field"] == "Watts" and
                       r.device == "${device}" 
  )
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> keep(columns:["_time","sensor","_value"])


recent = from(bucket: "iotawatt")
  |> range(start: dsCrossoverTime, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "iotawatt" and
                       r["_field"] == "Watts" and
                       r.device == "${device}"
  )
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> keep(columns:["_time","sensor","_value"])
    
union(tables: [downsampled, recent])
  |> yield()
