"""
.. moduleauthor:: Gabriel Martin Becedillas Ruiz <gabriel.becedillas@gmail.com>
"""
def get_change_percentage(actual, prev):
'''计算值变化的百分比
Arguments:
actual {[number]} -- [实际的当前值]
prev {[number]} -- [前一个值]
Raises:
Exception -- [如果两个值任意一个是None或者前一个值是0]
Returns:
[float] -- [百分比的值]
'''
if actual is None or prev is None or prev == 0:
raise Exception("Invalid values")
diff = actual-prev
ret = diff / float(abs(prev))
return ret
def safe_min(left, right):
'''安全的计算最小值,在一个值是None的情况下,仍旧可以得到
'''
if left is None:
return right
elif right is None:
return left
else:
return min(left, right)
def safe_max(left, right):
'''安全的计算最大值,在一个值是None的情况下,仍旧可以得到
'''
if left is None:
return right
elif right is None:
return left
else:
return max(left, right)
import numpy as np
def lt(v1, v2):
if v1 is None:
return True
elif v2 is None:
return False
else:
return v1 < v2
# Returns (values, ix1, ix2)
# values1 and values2 are assumed to be sorted
def intersect(values1, values2, skipNone=False):
ix1 = []
ix2 = []
values = []
i1 = 0
i2 = 0
while i1 < len(values1) and i2 < len(values2):
v1 = values1[i1]
v2 = values2[i2]
if v1 == v2 and (v1 is not None or skipNone is False):
ix1.append(i1)
ix2.append(i2)
values.append(v1)
i1 += 1
i2 += 1
elif lt(v1, v2):
i1 += 1
else:
i2 += 1
return (values, ix1, ix2)
# Like a collections.deque but using a numpy.array.
class NumPyDeque(object):
def __init__(self, maxLen, dtype=float):
assert maxLen > 0, "Invalid maximum length"
self.__values = np.empty(maxLen, dtype=dtype)
self.__maxLen = maxLen
self.__nextPos = 0
def getMaxLen(self):
return self.__maxLen
def append(self, value):
if self.__nextPos < self.__maxLen:
self.__values[self.__nextPos] = value
self.__nextPos += 1
else:
# Shift items to the left and put the last value.
# I'm not using np.roll to avoid creating a new array.
self.__values[0:-1] = self.__values[1:]
self.__values[self.__nextPos - 1] = value
def data(self):
# If all values are not initialized, return a portion of the array.
if self.__nextPos < self.__maxLen:
ret = self.__values[0:self.__nextPos]
else:
ret = self.__values
return ret
def resize(self, maxLen):
assert maxLen > 0, "Invalid maximum length"
# Create empty, copy last values and swap.
values = np.empty(maxLen, dtype=self.__values.dtype)
lastValues = self.__values[0:self.__nextPos]
values[0:min(maxLen, len(lastValues))] = lastValues[-1*min(maxLen, len(lastValues)):]
self.__values = values
self.__maxLen = maxLen
if self.__nextPos >= self.__maxLen:
self.__nextPos = self.__maxLen
def __len__(self):
return self.__nextPos
def __getitem__(self, key):
return self.data()[key]
# I'm not using collections.deque because:
# 1: Random access is slower.
# 2: Slicing is not supported.
class ListDeque(object):
def __init__(self, maxLen):
assert maxLen > 0, "Invalid maximum length"
self.__values = []
self.__maxLen = maxLen
def getMaxLen(self):
return self.__maxLen
def append(self, value):
self.__values.append(value)
# Check bounds
if len(self.__values) > self.__maxLen:
self.__values.pop(0)
def data(self):
return self.__values
def resize(self, maxLen):
assert maxLen > 0, "Invalid maximum length"
self.__maxLen = maxLen
self.__values = self.__values[-1*maxLen:]
def __len__(self):
return len(self.__values)
def __getitem__(self, key):
return self.__values[key]
"""
.. moduleauthor:: Gabriel Martin Becedillas Ruiz <gabriel.becedillas@gmail.com>
"""
import csv
import logging
import six
from six.moves import xrange
import requests
logging.getLogger("requests").setLevel(logging.ERROR)
# A faster (but limited) version of csv.DictReader
class FastDictReader(object):
def __init__(self, f, fieldnames=None, dialect="excel", *args, **kwargs):
self.__fieldNames = fieldnames
self.reader = csv.reader(f, dialect, *args, **kwargs)
if self.__fieldNames is None:
self.__fieldNames = six.next(self.reader)
self.__dict = {}
def _next_impl(self):
# Skip empty rows.
row = six.next(self.reader)
while row == []:
row = six.next(self.reader)
# Check that the row has the right number of columns.
assert len(self.__fieldNames) == len(row), "Expected columns: %s. Actual columns: %s" % (
self.__fieldNames, list(row.keys())
)
# Copy the row values into the dict.
for i in xrange(len(self.__fieldNames)):
self.__dict[self.__fieldNames[i]] = row[i]
return self.__dict
def __iter__(self):
return self
def __next__(self):
return self._next_impl()
def next(self):
return self._next_impl()
def download_csv(url, url_params=None, content_type="text/csv"):
response = requests.get(url, params=url_params)
response.raise_for_status()
response_content_type = response.headers['content-type']
if response_content_type != content_type:
raise Exception("Invalid content-type: %s" % response_content_type)
ret = response.text
# Remove the BOM
while not ret[0].isalnum():
ret = ret[1:]
return ret
def float_or_string(value):
try:
ret = float(value)
except Exception:
ret = value
return ret
import datetime
import pytz
def datetime_is_naive(dateTime):
""" Returns True if dateTime is naive."""
return dateTime.tzinfo is None or dateTime.tzinfo.utcoffset(dateTime) is None
# Remove timezone information.
def unlocalize(dateTime):
return dateTime.replace(tzinfo=None)
def localize(dateTime, timeZone):
"""Returns a datetime adjusted to a timezone:
* If dateTime is a naive datetime (datetime with no timezone information), timezone information is added but date
and time remains the same.
* If dateTime is not a naive datetime, a datetime object with new tzinfo attribute is returned, adjusting the date
and time data so the result is the same UTC time.
"""
if datetime_is_naive(dateTime):
ret = timeZone.localize(dateTime)
else:
ret = dateTime.astimezone(timeZone)
return ret
def as_utc(dateTime):
return localize(dateTime, pytz.utc)
def datetime_to_timestamp(dateTime):
""" Converts a datetime.datetime to a UTC timestamp."""
diff = as_utc(dateTime) - epoch_utc
return diff.total_seconds()
def timestamp_to_datetime(timeStamp, localized=True):
""" Converts a UTC timestamp to a datetime.datetime."""
ret = datetime.datetime.utcfromtimestamp(timeStamp)
if localized:
ret = localize(ret, pytz.utc)
return ret
def get_first_monday(year):
ret = datetime.date(year, 1, 1)
if ret.weekday() != 0:
diff = 7 - ret.weekday()
ret = ret + datetime.timedelta(days=diff)
return ret
def get_last_monday(year):
ret = datetime.date(year, 12, 31)
if ret.weekday() != 0:
diff = ret.weekday() * -1
ret = ret + datetime.timedelta(days=diff)
return ret
epoch_utc = as_utc(datetime.datetime(1970, 1, 1))
import numpy
def mean(values):
ret = None
if len(values):
ret = numpy.array(values).mean()
return ret
def stddev(values, ddof=1):
ret = None
if len(values):
ret = numpy.array(values).std(ddof=ddof)
return ret