1
#!/usr/bin/env python
2
# vim: set et ts=4 sw=4 fileencoding=utf-8 :
3
u"""
4
5
Éleconso : sources et capacité de production électrique de la France
6
--------------------------------------------------------------------
7
8
*Ou: dois-je lancer mon sèche-linge à 19h...*
9
10
Génère un graphe de la consommation électrique (au quart d'heure près) en
11
surimpression de la capacité de production électrique française, de façon à
12
connaître d'un coup d'oeil la nature de l'électricité consommée :
13
14
* peu émettrice de CO2
15
* très émettrice de CO2
16
* très émettrice de CO2 et importée depuis nos voisins
17
18
Les données sources sont publiées sur le site de RTE, le réseau de transport
19
d'électricité français :
20
21
* http://clients.rte-france.com/lang/fr/visiteurs/vie/courbes.jsp
22
* http://clients.rte-france.com/lang/fr/visiteurs/vie/prod/realisation_production.jsp
23
24
.. :Authors:
25
       Aurélien Bompard <aurelien@bompard.org> <http://aurelien.bompard.org>
26
27
.. :License:
28
       GNU GPL v3 or later
29
30
"""
31
32
import os
33
import re
34
import datetime
35
import urllib2
36
import optparse
37
38
import matplotlib
39
matplotlib.use("Agg")
40
import matplotlib.pyplot as plt
41
from matplotlib.dates import  HourLocator, DateFormatter
42
from matplotlib.font_manager import FontProperties
43
import numpy
44
45
46
class Production(object):
    """
    French electricity production, broken down by energy source.

    The source page carries one reading per hour for each source; the
    published figures may be a day or two old.
    """

    def __init__(self):
        self.page_url = "http://clients.rte-france.com/lang/fr/visiteurs/vie/prod/realisation_production.jsp"
        # Matches the <param> element whose value holds every source's series.
        self.data_line = re.compile(r'<param\s+name="data"\s*value="([^"]+)"\s*/>')
        # {source name: {datetime.time(hour): int reading, or None if unparsable}}
        self.data = {}
        # Day the readings refer to (datetime.date), set by read().
        self.date = None
        # Sources added together for the "clean" limit/aggregate.
        self.considered_clean = [u"Nucléaire", u"Hydraulique"]

    def read(self, debug=False):
        """
        Load the production figures into ``self.data`` and ``self.date``.

        :param debug: when true, read a local copy of the page (a file named
            after the last component of ``page_url``, in the current
            directory) instead of downloading it.
        :raises ValueError: if the page holds no usable data parameter.
        """
        if debug:
            page = open(os.path.basename(self.page_url))
        else:
            page = urllib2.urlopen(self.page_url)
        try:
            raw = self._find_data(page)
        finally:
            # Release the handle even when scanning the page fails.
            page.close()
        if raw is None:
            raise ValueError("no production data found in %s" % self.page_url)
        self._parse_data(raw)

    def _find_data(self, lines):
        """Return the raw value of the first data parameter, or None."""
        for line in lines:
            # Cheap substring test before running the regular expression.
            if '<param name="data" ' not in line:
                continue
            found = self.data_line.match(line.strip())
            if found is not None:
                return found.group(1)
        return None

    def _parse_data(self, raw):
        """
        Decode the raw parameter value.

        Records are separated by "|"; inside a record the ";"-separated
        fields are the source name, the date as DD/MM/YYYY, then one
        reading per hour starting at midnight.
        """
        for record in raw.split("|"):
            if not record:
                continue
            fields = record.split(";")
            name = fields[0]
            if isinstance(name, bytes):
                # Byte strings are decoded the way the original code did.
                name = name.decode("iso-8859-1")
            day = fields[1]
            self.date = datetime.date(int(day[6:10]), int(day[3:5]), int(day[0:2]))
            readings = {}
            for hour, text in enumerate(fields[2:]):
                readings[datetime.time(hour, 0, 0)] = self._to_int(text)
            self.data[name] = readings

    @staticmethod
    def _to_int(text):
        """Convert one reading to int; None when it is not a whole number."""
        if text.endswith(".0"):
            text = text[:-2]
        try:
            return int(text)
        except ValueError:
            return None

    def get_averages(self):
        """
        Return ``{source: average reading}``, ignoring missing readings.

        The average is an integer (floor division); it is None for a source
        that has no usable reading at all.
        """
        averages = {}
        for source, readings in self.data.items():
            known = [value for value in readings.values() if value is not None]
            averages[source] = sum(known) // len(known) if known else None
        return averages

    def _sources_for(self, sourcetype):
        """Map "clean"/"cheap" to the list of sources to add up."""
        if sourcetype == "clean":
            return self.considered_clean
        if sourcetype == "cheap":
            return ["Total"]
        raise KeyError(sourcetype)

    def get_limit(self, sourcetype, time=None):
        """
        Return the production available for ``sourcetype``.

        :param sourcetype: "clean" (sum of ``considered_clean``) or "cheap"
            (the "Total" source).
        :param time: object with an ``hour`` attribute selecting the hourly
            reading to use; None means the daily averages.
        :raises KeyError: for an unknown ``sourcetype`` or a source/hour
            that is absent from ``self.data``.
        """
        return self._get_limit(self._sources_for(sourcetype), time)

    def _get_limit(self, sources, time=None):
        """Sum the readings of ``sources`` at ``time`` (or their averages)."""
        if time is None:
            averages = self.get_averages()
            return sum([averages[source] for source in sources])
        slot = datetime.time(time.hour, 0, 0)
        return sum([self.data[source][slot] for source in sources])

    def get_aggregate(self, sourcetype):
        """
        Return ``{datetime.time: summed reading}`` for ``sourcetype``.

        :param sourcetype: "clean" or "cheap", as for get_limit().
        :raises KeyError: for an unknown ``sourcetype`` or a missing source.
        """
        return self._get_aggregate(self._sources_for(sourcetype))

    def _get_aggregate(self, sources):
        """Add up, hour by hour, the readings of every source in ``sources``."""
        totals = {}
        for source in sources:
            for slot, value in self.data[source].items():
                # A None reading makes this addition raise TypeError.
                totals[slot] = totals.get(slot, 0) + value
        return totals
133
134
135
class Consommation(object):
    """French electricity consumption, one reading per quarter of an hour."""

    # How many quarter-hour slots get_last_time() inspects, the current one
    # included, before giving up.
    LOOKBACK_SLOTS = 10

    def __init__(self):
        self.page_url = "http://clients.rte-france.com/lang/fr/visiteurs/vie/courbes.jsp"
        # Name of the <param> on the page -> name of the series in self.data.
        self.page_params = {"Chaine2": "Consommation",
                            "Chaine4": "Prev J-1",
                            "Chaine6": "Prev J",
                           }
        # Template completed with a page parameter name; group 1 is the
        # comma-separated list of values (without the trailing ";").
        self.data_line_re = r'<param\s+name\s*=\s*"%s"\s*value\s*=\s*"([^"]+);"\s*/>'
        self.date_line_re = re.compile(r'<param\s+name\s*=\s*"Jour"\s*value\s*=\s*"(\d\d)/(\d\d)/(\d\d\d\d)"\s*/>')
        # {series name: {datetime.time: int reading, or None if unparsable}}
        self.data = {}
        # Day the readings refer to (datetime.date), set by read().
        self.date = None

    def read(self, debug=False):
        """
        Load the consumption series into ``self.data`` and ``self.date``.

        :param debug: when true, read a local copy of the page (a file named
            after the last component of ``page_url``, in the current
            directory) instead of downloading it.
        """
        if debug:
            page = open(os.path.basename(self.page_url))
        else:
            page = urllib2.urlopen(self.page_url)
        try:
            raw_series = self._scan(page)
        finally:
            # Release the handle even when scanning the page fails.
            page.close()
        for name, values in raw_series.items():
            if not values:
                continue
            self.data[name] = self._parse_series(values)

    def _scan(self, lines):
        """
        Collect the raw series found in ``lines`` and record the page date.

        Returns ``{series name: comma-separated values}``; ``self.date`` is
        updated whenever a "Jour" parameter is met.
        """
        # Build the per-parameter patterns once rather than for every line.
        patterns = [(re.compile(self.data_line_re % page_name), name)
                    for page_name, name in self.page_params.items()]
        raw_series = {}
        for line in lines:
            if '<param ' not in line:
                continue
            stripped = line.strip()
            for pattern, name in patterns:
                found = pattern.match(stripped)
                if found is not None:
                    raw_series[name] = found.group(1)
            found = self.date_line_re.match(stripped)
            if found is not None:
                day, month, year = [int(part) for part in found.groups()]
                self.date = datetime.date(year, month, day)
        return raw_series

    @staticmethod
    def _parse_series(values):
        """
        Turn comma-separated readings into ``{datetime.time: int or None}``.

        The first value is for 00:00 and each following one is 15 minutes
        later; a value that is not an integer is stored as None.
        """
        series = {}
        for index, text in enumerate(values.split(",")):
            hour, minute = divmod(index * 15, 60)
            try:
                reading = int(text)
            except ValueError:
                reading = None
            series[datetime.time(hour, minute, 0)] = reading
        return series

    def get_last_time(self, now=None):
        """
        Return the most recent quarter-hour slot holding a consumption value.

        Starting from ``now`` rounded down to a quarter of an hour, up to
        LOOKBACK_SLOTS slots are examined going backwards, never crossing
        midnight. Slots whose reading is None are skipped.

        :param now: object with ``hour`` and ``minute`` attributes; defaults
            to the current local time.
        :return: a datetime.time, or None when no reading was found.
        """
        if now is None:
            now = datetime.datetime.now()
        readings = self.data.get("Consommation", {})
        # Work in minutes since midnight, rounded down to a quarter hour.
        minutes = now.hour * 60 + now.minute // 15 * 15
        for _ in range(self.LOOKBACK_SLOTS):
            if minutes < 0:
                # The previous slot belongs to the day before.
                break
            slot = datetime.time(minutes // 60, minutes % 60)
            if readings.get(slot) is not None:
                return slot
            minutes -= 15
        return None
193
194
195
class ConsoGraph(object):
    """
    Graph of the consumption drawn over the production capacity.

    ``prod`` must provide ``get_limit(sourcetype, time)`` and
    ``get_aggregate(sourcetype)``; ``conso`` must provide ``date``, ``data``
    and ``get_last_time()``.
    """

    def __init__(self, prod, conso):
        self.prod = prod
        self.conso = conso
        self.date = self.conso.date
        self.title = "Consommation le %s" % self.date.strftime("%d/%m/%Y")
        self.size = (6.4, 4.8)
        self.fontsize = 10
        # Lower bound of the y axis.
        self.ymin = 55000
        # Extra head-room added above the curves for the status box.
        self.status_margin = 5000
        # Both lists are indexed by the value returned by get_status().
        self.status_message = ["OK", u"électricité polluante", u"électricité polluante et importée"]
        self.status_color = ["green", "orange", "red"]

    def _current_status(self):
        """
        Return ``(status, last_time)`` for the latest known consumption.

        Both items are None when no usable consumption reading exists.
        """
        last_time = self.conso.get_last_time()
        if last_time is None:
            return None, None
        cur_conso = self.conso.data["Consommation"][last_time]
        if cur_conso is None:
            # Nothing meaningful to compare against the limits.
            return None, None
        # Read the clock once so both limits refer to the same hour.
        now = datetime.datetime.now()
        clean_limit = self.prod.get_limit("clean", now)
        cheap_limit = self.prod.get_limit("cheap", now)
        status = 0
        if cur_conso > clean_limit:
            status += 1
        if cur_conso > cheap_limit:
            status += 1
        return status, last_time

    def get_status(self):
        """
        Rate the latest consumption against the current production limits.

        :return: 0 when consumption is within the "clean" limit, incremented
            by one for each of the "clean" and "cheap" limits it exceeds
            (an index into ``status_message``/``status_color``), or None
            when no consumption reading is available.
        """
        return self._current_status()[0]

    def get_series(self):
        """
        Build the data to plot.

        :return: ``(x, series)`` where ``x`` lists a datetime for every slot
            of the "Prev J-1" forecast and ``series`` maps "clean", "cheap",
            "conso" and "prev" to numpy arrays. Production values fall back
            to the reading at the top of the hour when the exact slot is
            absent; "conso" only holds the slots present in the measured
            series.
        """
        forecast = self.conso.data["Prev J-1"]
        measured = self.conso.data["Consommation"]
        production = (("clean", self.prod.get_aggregate("clean")),
                      ("cheap", self.prod.get_aggregate("cheap")))
        x = []
        series = {"clean": [], "cheap": [], "conso": [], "prev": []}
        for time in sorted(forecast):
            x.append(datetime.datetime.combine(self.conso.date, time))
            hour_start = time.replace(minute=0)
            for name, values in production:
                # Each series is looked up in its own values.
                if time in values:
                    series[name].append(values[time])
                elif hour_start in values:
                    series[name].append(values[hour_start])
            if time in measured:
                series["conso"].append(measured[time])
            series["prev"].append(forecast[time])
        for name in list(series):
            series[name] = numpy.array(series[name])
        return x, series

    def graph(self, filename=None):
        """
        Draw the graph.

        :param filename: image file to write; when None the figure is shown
            with ``plt.show()`` instead.
        """
        x, series = self.get_series()
        conso = series["conso"]
        # The measured series may be shorter than the forecast one, so
        # everything drawn against it is truncated to its length.
        known = len(conso)
        x_known = x[:known]
        cheap_known = series["cheap"][:known]

        matplotlib.rc('font', size=self.fontsize)
        fig = plt.figure(figsize=self.size)
        ax = fig.add_subplot(111)
        ax.xaxis.set_minor_locator(HourLocator())
        ax.xaxis.set_major_formatter(DateFormatter("%H:%M"))
        plt.suptitle(self.title, fontsize="large", weight="bold")
        plt.ylabel('MW', weight="bold")
        ax.fill_between(x, 0, series["clean"], facecolor="g", axes=ax, label="Production sans CO2")
        ax.fill_between(x, series["clean"], series["cheap"], facecolor="orange", axes=ax, label="Production avec CO2")
        ax.plot(x_known, conso, axes=ax, lw=2, color="blue", label="Consommation")
        # Red where consumption exceeds the total production.
        ax.fill_between(x_known, cheap_known, conso,
                        where=cheap_known < conso,
                        facecolor="red", interpolate=True)
        # White veil over the production that is not consumed.
        ax.fill_between(x_known, conso, cheap_known,
                        where=conso < cheap_known,
                        facecolor="white", alpha=0.6, interpolate=True)
        ax.plot(x, series["prev"], axes=ax, lw=1, color="blue", linestyle="dashed", label=u"Prévision")
        ax.xaxis_date()
        plt.axis([None, None, self.ymin, None])
        plt.setp(plt.gca().get_xticklabels(), rotation=45, horizontalalignment='right')
        plt.legend(loc="lower left", fancybox=True, prop=FontProperties(size=11), handlelength=4)
        plt.grid(True)

        # Status and slot come from a single lookup so they cannot disagree.
        status, last_time = self._current_status()
        if status is not None:
            props = dict(boxstyle='round', facecolor=self.status_color[status], alpha=0.5)
            ax.text(0.98, 0.97, u"À %s : %s" %
                    (last_time.strftime("%Hh%M"), self.status_message[status]),
                    transform=ax.transAxes, fontsize="large", ha="right",
                    verticalalignment='top', bbox=props)
            # Leave room for the status box above the curves.
            old_ylim = ax.get_ylim()
            ax.set_ylim(top=old_ylim[1] + self.status_margin)

        if filename is None:
            plt.show()
        else:
            fig.savefig(filename, dpi=96)
            # Free the figure once it has been written out.
            plt.close(fig)
297
298
299
300
def parse_opts(argv=None):
    """
    Parse the command-line options.

    :param argv: list of arguments to parse; None (the default) means the
        arguments of the running program (``sys.argv[1:]``).
    :return: the optparse values object, with ``output`` (image file name)
        and ``debug`` (bool) attributes.

    The parser exits with an error message when a positional argument is
    given or when no output file is chosen.
    """
    parser = optparse.OptionParser()
    parser.add_option("-o", "--output", dest="output",
                      help=u"Fichier image à générer")
    parser.add_option("-d", "--debug", dest="debug", action="store_true",
                      default=False,
                      help=u"Mode déboguage")
    options, args = parser.parse_args(argv)
    if args:
        parser.error(u"Pas d'argument autorisé")
    if not options.output:
        parser.error("Il faut choisir un fichier de sortie")
    return options
313
314
def main():
    """Read production and consumption, then render the graph to the output file."""
    options = parse_opts()
    debug = options.debug

    production = Production()
    production.read(debug)

    consumption = Consommation()
    consumption.read(debug)

    ConsoGraph(production, consumption).graph(options.output)


# Only run when executed as a script, not when imported.
if __name__ == "__main__":
    main()