diff --git a/data/data.go b/data/data.go new file mode 100644 index 0000000..4c2455a --- /dev/null +++ b/data/data.go @@ -0,0 +1,94 @@ +package data + +import ( + "database/sql" + "time" + + "git.fuyu.moe/5GPowerQuality/parser/internal/model" + "git.ultraware.nl/NiseVoid/qb" + "git.ultraware.nl/NiseVoid/qb/qc" + "git.ultraware.nl/NiseVoid/qb/qf" +) + +// GetMeterID gets the meter ID, if there is none yet a new record is add to meter +func GetMeterID(source Source, meterID string) (id int) { + me := model.Meter() + q := me.Select(me.ID). + Where( + qc.Eq(me.UID, meterID), + qc.Eq(me.Source, source), + ) + + err := db.QueryRow(q).Scan(&id) + switch err { + case nil: + case sql.ErrNoRows: + iq := me.Insert(me.Name, me.UID, me.Source). + Values(``, meterID, source) + + err = db.QueryRow(qb.Returning(iq, me.ID)).Scan(&id) + if err != nil { + panic(err) + } + default: + panic(err) + } + + return +} + +// GetStartDate gets the last measurement date +func GetStartDate(source Source) time.Time { + m, me := model.Measurement(), model.Meter() + + q := m.Select(qf.Max(m.Time)). + InnerJoin(me.ID, m.MeterID). + Where(qc.Eq(me.Source, source)) + + var t *time.Time + err := db.QueryRow(q).Scan(&t) + if (err == nil && t == nil) || err == sql.ErrNoRows { + return time.Date(2018, 4, 1, 0, 0, 0, 0, time.Local) + } + if err != nil { + panic(err) + } + + *t = t.Add(time.Second) + + return *t +} + +func InsertSets(sets Sets) { + if len(sets) == 0 { + return + } + + m := model.Measurement() + q := m.Insert(m.Time, m.MeterID, + m.CGem1, m.CGem2, m.CGem3, + m.Ep1, m.Ep2, m.Ep3, + m.IGem1, m.IGem2, m.IGem3, + m.IMax1, m.IMax2, m.IMax3, + m.PMax1, m.PMax2, m.PMax3, + m.SMax1, m.SMax2, m.SMax3, + m.UGem1, m.UGem2, m.UGem3, + ) + + for k, v := range sets { + q.Values(k.Time, k.Meter, + v[`CGem1`], v[`CGem2`], v[`CGem3`], + v[`EP1`], v[`EP2`], v[`EP3`], + v[`IGem1`], v[`IGem2`], v[`IGem3`], + v[`IMax1`], v[`IMax2`], v[`IMax3`], + v[`PMax1`], v[`PMax2`], v[`PMax3`], + v[`SMax1`], v[`SMax2`], v[`SMax3`], + v[`UGem1`], v[`UGem2`], v[`UGem3`], + ) + } + + err := db.Exec(q) + if err != nil { + panic(err) + } +} diff --git a/db.go b/data/db.go similarity index 81% rename from db.go rename to data/db.go index 35fb976..2806c46 100644 --- a/db.go +++ b/data/db.go @@ -1,15 +1,18 @@ -package main +package data import ( "database/sql" "git.fuyu.moe/5GPowerQuality/parser/internal/migrations" "git.ultraware.nl/NiseVoid/qb/driver/autoqb" + "git.ultraware.nl/NiseVoid/qb/qbdb" _ "github.com/lib/pq" // PostgreSQL driver ) -func initDB() { +var db *qbdb.DB + +func InitDB() { d, err := sql.Open(`postgres`, `host=/tmp dbname=power_quality`) if err != nil { panic(err) diff --git a/data/type.go b/data/type.go new file mode 100644 index 0000000..b39ecf5 --- /dev/null +++ b/data/type.go @@ -0,0 +1,27 @@ +package data + +import "time" + +// Source is a source of power quality data +type Source uint8 + +// All known source +const ( + SourceFortop = iota + 1 + SourceEnvitron +) + +func (s Source) String() string { + return []string{``, `Fortop`, `Envitron`}[s] +} + +// Sets are multiple sets of date +type Sets map[Key]Set + +// Set is a set of data +type Set map[string]float64 + +type Key struct { + Time time.Time + Meter int +} diff --git a/envitron/bind.go b/envitron/bind.go new file mode 100644 index 0000000..1752c4d --- /dev/null +++ b/envitron/bind.go @@ -0,0 +1,47 @@ +package envitron + +import ( + "encoding/json" + "io" + + "github.com/labstack/echo" +) + +type jsonError struct { + Message string `json:"error"` +} + +func (e jsonError) Error() string { + return e.Message +} + +type bindError struct { + Message string `json:"error"` + Offset int64 `json:"offset"` + Field string `json:"field,omitempty"` +} + +func (e bindError) Error() string { + return e.Message +} + +type jsonBinder struct{} + +func (b jsonBinder) Bind(i interface{}, c echo.Context) error { + err := json.NewDecoder(c.Request().Body).Decode(i) + if err == nil { + return nil + } + + switch e := err.(type) { + case *json.SyntaxError: + return bindError{Message: `Syntax error`, Offset: e.Offset, Field: ``} + case *json.UnmarshalTypeError: + return bindError{Message: `Type error`, Offset: e.Offset, Field: e.Field} + } + + if err == io.ErrUnexpectedEOF || err == io.EOF { + return jsonError{`Unexpected EOF`} + } + return jsonError{`Unknown error occurred while parsing JSON`} +} diff --git a/envitron/receive.go b/envitron/receive.go new file mode 100644 index 0000000..a215e3e --- /dev/null +++ b/envitron/receive.go @@ -0,0 +1,62 @@ +package envitron + +import ( + "strconv" + "time" + + "git.fuyu.moe/5GPowerQuality/parser/data" + "git.fuyu.moe/5GPowerQuality/parser/shared" + "github.com/labstack/echo" +) + +func ReceiveData() { + e := echo.New() + e.Binder = jsonBinder{} + + e.POST(`/`, handleReq) + + e.Start(`:44444`) +} + +type input struct { + Time shared.UnixTimestamp `json:"timestamp"` + UID string `json:"uuid"` + Modules []module `json:"modules"` +} + +type module struct { + Port int `json:"port"` + Voltage float64 `json:"volt"` + Ampere float64 `json:"ampere"` +} + +func handleReq(c echo.Context) error { + var i []input + err := c.Bind(&i) + if err != nil { + return c.JSON(400, err) + } + + sets := data.Sets{} + for _, v := range i { + mID := data.GetMeterID(data.SourceEnvitron, v.UID) + key := data.Key{Meter: mID, Time: time.Time(v.Time)} + if _, ok := sets[key]; !ok { + sets[key] = data.Set{} + } + + for _, m := range v.Modules { + if m.Port == 4 { + continue + } + p := strconv.Itoa(m.Port) + + sets[key][`UGem`+p] = m.Voltage + sets[key][`IGem`+p] = m.Ampere + } + } + + data.InsertSets(sets) + + return c.JSONBlob(200, []byte(`{"success": true}`)) +} diff --git a/fortop/fetch.go b/fortop/fetch.go new file mode 100644 index 0000000..e551601 --- /dev/null +++ b/fortop/fetch.go @@ -0,0 +1,73 @@ +package fortop + +import ( + "encoding/xml" + "net/http" + "strconv" + "time" + + "git.fuyu.moe/5GPowerQuality/parser/data" + "golang.org/x/net/html/charset" +) + +var start time.Time + +func FetchData() { + if start.IsZero() { + start = data.GetStartDate(data.SourceFortop) + } + + end := start.Add(time.Hour) + + resp := getFortopData(start, end) + + sets := data.Sets{} + for _, meter := range resp { + mID := data.GetMeterID(data.SourceFortop, meter.MeterID) + + for _, v := range meter.Meetwaarde { + valName := v.Naam + + for _, r := range v.Range { + key := data.Key{Meter: mID, Time: time.Time(r.Date)} + if _, ok := sets[key]; !ok { + sets[key] = data.Set{} + } + + sets[key][valName] = r.Value + } + } + } + + data.InsertSets(sets) + + if end.Before(time.Now()) { + start = end.Add(time.Second) + return + } + + time.Sleep(time.Minute) + start = data.GetStartDate(data.SourceFortop) +} + +func getFortopData(startDate, endDate time.Time) []Meter { + url := `https://energy4all.energyportal.online/index?p5g=S_1_` + strconv.FormatInt(startDate.Unix(), 10) + `_` + strconv.FormatInt(endDate.Unix(), 10) + resp, err := http.Get(url) + if err != nil { + panic(err) + } + if resp.StatusCode != 200 { + panic(`Got status code:` + strconv.Itoa(resp.StatusCode)) + } + + decoder := xml.NewDecoder(resp.Body) + decoder.CharsetReader = charset.NewReaderLabel + + var data FortopFormat + err = decoder.Decode(&data) + if err != nil { + panic(err) + } + + return data.Trafo.Meter +} diff --git a/fortop/type.go b/fortop/type.go new file mode 100644 index 0000000..127507c --- /dev/null +++ b/fortop/type.go @@ -0,0 +1,36 @@ +package fortop + +import ( + "encoding/xml" + + "git.fuyu.moe/5GPowerQuality/parser/shared" +) + +// FortopFormat .. +type FortopFormat struct { + XMLName xml.Name `xml:"Trafo-Details"` + Trafo struct { + TrafoNummer int `xml:"trafonummer"` + Meter []Meter `xml:"meter"` + } `xml:"trafo"` +} + +// Meter contains all data from one specific meter +type Meter struct { + MeterID string `xml:"meter-id"` + StartDate shared.UnixTimestamp `xml:"startdate"` + EndDate shared.UnixTimestamp `xml:"enddate"` + Meetwaarde []Meetwaarde `xml:"meetwaarde"` +} + +// Meetwaarde contains the data that is not pointless garbage +type Meetwaarde struct { + Naam string `xml:"naam"` + Range []Range `xml:"range"` +} + +// Range is a single set of data +type Range struct { + Date shared.UnixTimestamp `xml:"date"` + Value float64 `xml:"value"` +} diff --git a/internal/migrations/files/0005.sql b/internal/migrations/files/0005.sql new file mode 100644 index 0000000..073943f --- /dev/null +++ b/internal/migrations/files/0005.sql @@ -0,0 +1,8 @@ +ALTER TABLE meter + RENAME fortop_uid TO uid; + +ALTER TABLE meter + ADD source smallint NOT NULL DEFAULT 1; + +ALTER TABLE meter + ALTER source DROP DEFAULT; diff --git a/internal/model/db.json b/internal/model/db.json index 31c3238..48384f2 100644 --- a/internal/model/db.json +++ b/internal/model/db.json @@ -121,13 +121,7 @@ }, { "name": "public.meter", - "alias": "me", "fields": [ - { - "name": "fortop_uid", - "data_type": "character varying", - "size": 50 - }, { "name": "id", "data_type": "integer", @@ -137,6 +131,16 @@ "name": "name", "data_type": "character varying", "size": 255 + }, + { + "name": "source", + "data_type": "smallint", + "size": 2 + }, + { + "name": "uid", + "data_type": "character varying", + "size": 50 } ] } diff --git a/main.go b/main.go index 789e126..27bc95c 100644 --- a/main.go +++ b/main.go @@ -1,24 +1,12 @@ package main import ( - "database/sql" - "encoding/xml" - "fmt" - "net/http" - "strconv" - "time" - - "git.fuyu.moe/5GPowerQuality/parser/internal/model" + "git.fuyu.moe/5GPowerQuality/parser/data" + "git.fuyu.moe/5GPowerQuality/parser/envitron" + "git.fuyu.moe/5GPowerQuality/parser/fortop" "git.fuyu.moe/Fuyu/flog" - "git.ultraware.nl/NiseVoid/qb" - "git.ultraware.nl/NiseVoid/qb/qbdb" - "git.ultraware.nl/NiseVoid/qb/qc" - "git.ultraware.nl/NiseVoid/qb/qf" - "golang.org/x/net/html/charset" ) -var db *qbdb.DB - func main() { defer recoverFunc() setLogger() @@ -26,11 +14,10 @@ func main() { flog.Info(`Starting`) catchSignals() - initDB() + data.InitDB() - for { - fetchData() - } + go fetchFortopData() + envitron.ReceiveData() } func recoverFunc() { @@ -40,157 +27,11 @@ func recoverFunc() { } } -var start time.Time - -func fetchData() { - defer recoverFunc() - - if start.IsZero() { - start = GetStartDate() - } - - end := start.Add(time.Hour) - - data := GetFortopData(start, end) - ranges := map[int]map[string][]Range{} - for _, meter := range data { - mID := GetMeterID(meter.MeterID) - ranges[mID] = map[string][]Range{} - - for _, v := range meter.Meetwaarde { - ranges[mID][v.Naam] = v.Range - } - } - - sets := map[int]Sets{} - for meter, val := range ranges { - sets[meter] = Sets{} - - for k, v := range val { - for _, r := range v { - date := time.Time(r.Date) - set, ok := sets[meter][date] - if !ok { - sets[meter][date] = Set{} - set = sets[meter][date] - } - - set[k] = r.Value - } - } - } - - insertSets(sets) - - if end.Before(time.Now()) { - start = end.Add(time.Second) - return - } - - time.Sleep(time.Minute) - - start = GetStartDate() -} - -// GetMeterID gets the meter ID, if there is none yet a new record is add to meter -func GetMeterID(meterID string) (id int) { - me := model.Meter() - q := me.Select(me.ID). - Where(qc.Eq(me.FortopUID, meterID)) - - err := db.QueryRow(q).Scan(&id) - switch err { - case nil: - case sql.ErrNoRows: - iq := me.Insert(me.Name, me.FortopUID). - Values(``, meterID) - - err = db.QueryRow(qb.Returning(iq, me.ID)).Scan(&id) - if err != nil { - panic(err) - } - default: - panic(err) - } - - return -} - -// GetStartDate gets the last measurement date -func GetStartDate() time.Time { - m := model.Measurement() - var t *time.Time - - q := m.Select(qf.Max(m.Time)) - err := db.QueryRow(q).Scan(&t) - if (err == nil && t == nil) || err == sql.ErrNoRows { - return time.Date(2018, 4, 1, 0, 0, 0, 0, time.Local) - } - if err != nil { - panic(err) - } - - *t = t.Add(time.Second) - - return *t -} - -func insertSets(sets map[int]Sets) { - if len(sets) == 0 { - return - } - - m := model.Measurement() - q := m.Insert(m.Time, m.MeterID, - m.CGem1, m.CGem2, m.CGem3, - m.Ep1, m.Ep2, m.Ep3, - m.IGem1, m.IGem2, m.IGem3, - m.IMax1, m.IMax2, m.IMax3, - m.PMax1, m.PMax2, m.PMax3, - m.SMax1, m.SMax2, m.SMax3, - m.UGem1, m.UGem2, m.UGem3, - ) - - for meter, s := range sets { - for k, v := range s { - q.Values(k, meter, - v[`CGem1`], v[`CGem2`], v[`CGem3`], - v[`EP1`], v[`EP2`], v[`EP3`], - v[`IGem1`], v[`IGem2`], v[`IGem3`], - v[`IMax1`], v[`IMax2`], v[`IMax3`], - v[`PMax1`], v[`PMax2`], v[`PMax3`], - v[`SMax1`], v[`SMax2`], v[`SMax3`], - v[`UGem1`], v[`UGem2`], v[`UGem3`], - ) - } - } - - err := db.Exec(q) - if err != nil { - panic(err) +func fetchFortopData() { + for { + func() { + defer recoverFunc() + fortop.FetchData() + }() } } - -// GetFortopData retrieves data from fortop -func GetFortopData(startDate, endDate time.Time) []Meter { - url := `https://energy4all.energyportal.online/index?p5g=S_1_` + strconv.FormatInt(startDate.Unix(), 10) + `_` + strconv.FormatInt(endDate.Unix(), 10) - resp, err := http.Get(url) - if err != nil { - panic(err) - } - if resp.StatusCode != 200 { - fmt.Println(`Got status code:`, resp.StatusCode) - return nil - } - - decoder := xml.NewDecoder(resp.Body) - decoder.CharsetReader = charset.NewReaderLabel - - var data FortopFormat - err = decoder.Decode(&data) - if err != nil { - panic(err) - } - - return data.Trafo.Meter -} diff --git a/shared/type.go b/shared/type.go new file mode 100644 index 0000000..9ab8246 --- /dev/null +++ b/shared/type.go @@ -0,0 +1,28 @@ +package shared + +import ( + "strconv" + "time" +) + +// UnixTimestamp is a time.Time that can parse unix timestamps +type UnixTimestamp time.Time + +// UnmarshalText implements encoding.TextUnmarshaler +func (t *UnixTimestamp) UnmarshalText(b []byte) error { + i, err := strconv.ParseInt(string(b), 10, 64) + if err != nil { + return err + } + *t = UnixTimestamp(time.Unix(i, 0)) + return nil +} + +// UnmarshalJSON implements json.Unmarshaler +func (t *UnixTimestamp) UnmarshalJSON(b []byte) error { + return t.UnmarshalText(b) +} + +func (t UnixTimestamp) String() string { + return time.Time(t).Format(`2006-01-02 15:04`) +} diff --git a/type.go b/type.go deleted file mode 100644 index f0d3966..0000000 --- a/type.go +++ /dev/null @@ -1,59 +0,0 @@ -package main - -import ( - "encoding/xml" - "strconv" - "time" -) - -// FortopFormat .. -type FortopFormat struct { - XMLName xml.Name `xml:"Trafo-Details"` - Trafo struct { - TrafoNummer int `xml:"trafonummer"` - Meter []Meter `xml:"meter"` - } `xml:"trafo"` -} - -// Meter contains all data from one specific meter -type Meter struct { - MeterID string `xml:"meter-id"` - StartDate UnixTimestamp `xml:"startdate"` - EndDate UnixTimestamp `xml:"enddate"` - Meetwaarde []Meetwaarde `xml:"meetwaarde"` -} - -// Meetwaarde contains the data that is not pointless garbage -type Meetwaarde struct { - Naam string `xml:"naam"` - Range []Range `xml:"range"` -} - -// Range is a single set of data -type Range struct { - Date UnixTimestamp `xml:"date"` - Value float64 `xml:"value"` -} - -// UnixTimestamp is a time.Time that can parse unix timestamps -type UnixTimestamp time.Time - -// UnmarshalText implements encoding.TextUnmarshaler -func (t *UnixTimestamp) UnmarshalText(b []byte) error { - i, err := strconv.ParseInt(string(b), 10, 64) - if err != nil { - return err - } - *t = UnixTimestamp(time.Unix(i, 0)) - return nil -} - -func (t UnixTimestamp) String() string { - return time.Time(t).Format(`2006-01-02 15:04`) -} - -// Sets are multiple sets of date -type Sets map[time.Time]Set - -// Set is a set of data -type Set map[string]float64