Repository for Petra's work at Ampli, January–February 2019.

downkwh.py (5.4 KB, 131 lines)
  1. from argparse import ArgumentParser
  2. # from psycopg2 import sql
  3. import gc
  4. from util import getQuery, datevalid
  5. import pandas as p
  6. import gc
  7. from datetime import datetime
  8. from tqdm import tqdm, trange
  9. from pprint import pprint
  10. from tempfile import TemporaryDirectory
  11. import numpy as np
# Whitelisted source tables for the -t/--table CLI option. Each table was
# pre-filtered in the database for completeness of half-hourly reads; the
# whitelist also guards the str.format interpolation in getkwh's query.
tables = [
    'public.best_icp',         # All icps with at least 360 days of data in 2017
    'public.best_icp_1618',    # All icps with at least 720 days of data in 2 years from 1 April 2016
    'public.best_icp_18m',     # All icps with at least 540 days of data from July 2016 to end of 2017
    'public.icp_sample',       # A pre-generated 1k sample from best_icp
    'public.icp_sample_5k',    # A pre-generated 5k sample from best_icp
    'public.icp_sample_1618',  # A pre-generated 1k sample from best_icp_1618
    'public.icp_sample_18m'    # A pre-generated 1k sample from best_icp_18m
]
def getkwh(datestart, dateend, timestart, timeend, icp_tab, verbose = True):
    """Get kwh data from database.

    Builds a dense half-hourly series per ICP: a GENERATE_SERIES 30-minute
    timestamp grid is cross-joined with the ICP table, then left-joined
    against the aggregated meter reads, so periods with no read come back
    as 0 kWh (COALESCE).

    Parameters
    ----------
    datestart, dateend : str
        'yyyy-mm-dd' bounds on read_date (start inclusive, end exclusive).
    timestart, timeend : str
        Timestamp bounds for the generated 30-minute series.
    icp_tab : str
        Table listing the ICP ids to fetch. Interpolated into the query
        text via str.format (NOT a bind parameter), so it must come from
        the trusted `tables` whitelist defined in this module.
    verbose : bool, optional
        If True, print the bind parameters before querying.

    Returns
    -------
    pandas.DataFrame
        Columns: icp_id (int32), read_time, kwh_tot (float16).
    """
    # In this string: {} receives icp_tab via str.format below; %(name)s are
    # driver bind parameters; %% is a literal % (SQL modulo) escaped for the
    # driver's paramstyle.
    query = """
    SELECT SUBSTRING(comb.icp_id FROM 2 FOR 6)::int AS icp_id, comb.read_time, COALESCE(kwh_tot, 0) AS kwh_tot
    FROM
    (
        SELECT read_time, icp_id
        FROM
        (
            SELECT read_time
            FROM GENERATE_SERIES(%(tsstart)s::timestamp, %(tsend)s::timestamp,
                '30 minutes'::interval) read_time
        ) AS tsdata CROSS JOIN {}
    ) AS comb
    LEFT JOIN
    (
        SELECT *, read_date + CONCAT(period / 2, ':', period %% 2 * 30, ':00')::time AS read_time
        FROM (
            SELECT a.icp_id
            , a.read_date
            , c.period
            , sum(c.read_kwh) as kwh_tot
            , sum(case when a.content_code = 'UN' then c.read_kwh else 0 end) as kwh_un
            , sum(case when a.content_code in ('CN','EG') then c.read_kwh else 0 end) as kwh_cn
            FROM coup_prd.coupdatamaster a,
            unnest(a.read_array) WITH ORDINALITY c(read_kwh, period)
            WHERE a.read_date >= to_date(%(datestart)s,'yyyy-mm-dd')
            and a.read_date < to_date(%(dateend)s,'yyyy-mm-dd')
            and a.content_code ~ ('UN|CN|EG')
            AND a.icp_id IN (
                SELECT icp_id FROM {}
            )
            GROUP BY 1, 2, 3
        ) AS coup_tall
    ) AS tall_timestamp
    ON comb.read_time = tall_timestamp.read_time AND comb.icp_id = tall_timestamp.icp_id;
    """
    # icp_tab fills both {} slots (the CROSS JOIN and the IN subquery).
    query = query.format(icp_tab, icp_tab)
    pdict = {
        'datestart': datestart,
        'dateend': dateend,
        'tsstart': timestart,
        'tsend': timeend
        # 'subset': subset
    }
    if verbose:
        print("Getting data with parameters:")
        pprint(pdict)
    # getQuery is a project helper (util.py) -- presumably runs the query and
    # returns a DataFrame; confirm its signature there.
    qdf = getQuery(query, pdict, verbose)
    if verbose:
        print("Optimising")
    # Shrink memory: int32 ids and half-precision kwh. NOTE(review): float16
    # only carries ~3 significant digits -- looks like a deliberate memory
    # trade-off; confirm precision is acceptable downstream.
    qdf['icp_id'] = qdf['icp_id'].astype(np.int32)
    qdf['kwh_tot'] = qdf['kwh_tot'].astype(np.float16)
    # print("Done")
    return(qdf)
  77. def collateddownload(startd, endd, numdivis, icp_tab, pivot, verbose):
  78. """
  79. Download dataset in pieces, then combine
  80. """
  81. with TemporaryDirectory() as tempdir:
  82. divset = p.date_range(startd, endd, periods = numdivis + 1).strftime("%Y-%m-%d")
  83. divlow = divset[:-1]
  84. divhig = divset[1:]
  85. for i in trange(numdivis):
  86. datestart = divlow[i]
  87. dateend = divhig[i]
  88. datetimeend = dateend + " 00:00:00"
  89. datetimestart = datestart + " 00:30:00"
  90. filename = "{}/{}temp.pkl".format(tempdir, i)
  91. tdf = getkwh(datestart, dateend, datetimestart, datetimeend, icp_tab, verbose)
  92. if pivot:
  93. if verbose:
  94. print("Pivoting")
  95. tdf = tdf.pivot(index = 'read_time', columns = 'icp_id', values = 'kwh_tot')
  96. tdf.to_pickle(filename)
  97. del tdf
  98. coldf = p.read_pickle("{}/{}temp.pkl".format(tempdir, 0))
  99. for i in trange(1, numdivis):
  100. filename = "{}/{}temp.pkl".format(tempdir, i)
  101. tdf = p.read_pickle(filename)
  102. coldf = p.concat([coldf, tdf])
  103. del tdf
  104. gc.collect()
  105. return coldf
  106. def main():
  107. parser = ArgumentParser(description='Download kwh data from database')
  108. parser.add_argument("-o", "--output", dest="output", help = "output pickle path", metavar="PATH", required = True)
  109. parser.add_argument("-s", "--start-date", dest = "startdate", help = "start date for download; format: YYYY-MM-DD; default: 2017-01-01", metavar="DATE", default = "2017-01-01", type = datevalid)
  110. parser.add_argument("-e", "--end-date", dest = "enddate", help = "end date for download; format: YYYY-MM-DD; default: 2018-01-01", metavar="DATE", default = "2018-01-01", type = datevalid)
  111. parser.add_argument("-t", "--table", dest = "table", help = "table for download (constrained to specific values in source); default: public.icp_sample", metavar="TABLE", default = "public.icp_sample", choices = tables)
  112. parser.add_argument("-n", "--num-div", dest="numdiv", help = "number of segments to divide download into", metavar = "NUM", default = 12, type = int)
  113. parser.add_argument("--no-pivot", dest = "pivot", help = "output dataframe in tall (non-pivoted) format", action ="store_false")
  114. parser.add_argument("-v", "--verbose", dest = "verbose", action ="store_true")
  115. args = parser.parse_args()
  116. cdata = collateddownload(args.startdate, args.enddate, args.numdiv, args.table, args.pivot, args.verbose)
  117. cdata.to_pickle(args.output)
  118. if __name__ == "__main__":
  119. main()