您的当前位置:首页正文

pyalgotrade源码阅读:utils里面单独的函数

2024-12-02 来源:个人技术集锦
"""
.. moduleauthor:: Gabriel Martin Becedillas Ruiz <gabriel.becedillas@gmail.com>
"""


def get_change_percentage(actual, prev):
    """Return the relative change from ``prev`` to ``actual``.

    The result is a fraction of the magnitude of ``prev`` (0.1 means a 10%
    increase), so its sign follows the direction of the change even when
    ``prev`` is negative.

    Arguments:
        actual -- the current value.
        prev -- the previous value, used as the reference.

    Raises:
        Exception -- if either value is None or if ``prev`` is 0.

    Returns:
        float -- ``(actual - prev) / abs(prev)``.
    """
    if actual is None or prev is None:
        raise Exception("Invalid values")
    if prev == 0:
        # A zero reference would mean dividing by zero.
        raise Exception("Invalid values")

    return (actual - prev) / float(abs(prev))


def safe_min(left, right):
    """Return the smaller of two values, tolerating None.

    If one argument is None the other one is returned as is (which means
    None is returned when both are None).
    """
    if left is not None and right is not None:
        return min(left, right)
    return right if left is None else left


def safe_max(left, right):
    """Return the larger of two values, tolerating None.

    If one argument is None the other one is returned as is (which means
    None is returned when both are None).
    """
    if left is not None and right is not None:
        return max(left, right)
    return right if left is None else left
import numpy as np


def lt(v1, v2):
    """Less-than comparison in which None sorts before every other value.

    Returns True whenever v1 is None (even if v2 is None too), False when
    only v2 is None, and ``v1 < v2`` otherwise.
    """
    return v1 is None or (v2 is not None and v1 < v2)


# Returns (values, ix1, ix2)
# values1 and values2 are assumed to be sorted
def intersect(values1, values2, skipNone=False):
    """Find the values present in both of two sorted sequences.

    Both sequences are walked in step with one cursor each. When the values
    under the cursors are equal they are recorded and both cursors advance;
    otherwise the cursor pointing at the smaller value (as decided by
    ``lt``, which treats None as smallest) advances.

    When ``skipNone`` is anything other than the literal ``False``, matching
    None values are not reported.

    Returns a tuple ``(values, ix1, ix2)``: the common values, and the
    indexes at which they were found in ``values1`` and ``values2``.
    """
    common = []
    positions1 = []
    positions2 = []

    pos1 = pos2 = 0
    end1, end2 = len(values1), len(values2)
    while pos1 < end1 and pos2 < end2:
        left, right = values1[pos1], values2[pos2]

        if left == right and (left is not None or skipNone is False):
            common.append(left)
            positions1.append(pos1)
            positions2.append(pos2)
            pos1 += 1
            pos2 += 1
            continue

        # No match to record: move past the smaller of the two values.
        if lt(left, right):
            pos1 += 1
        else:
            pos2 += 1

    return (common, positions1, positions2)


# Like a collections.deque but using a numpy.array.
class NumPyDeque(object):
    """Fixed-capacity FIFO buffer stored in a preallocated numpy array.

    Values are appended at the end; once ``maxLen`` values are stored, each
    new value pushes the oldest one out.
    """

    def __init__(self, maxLen, dtype=float):
        assert maxLen > 0, "Invalid maximum length"

        self.__values = np.empty(maxLen, dtype=dtype)
        self.__maxLen = maxLen
        # Number of valid items, which is also the index of the next free slot.
        self.__nextPos = 0

    def getMaxLen(self):
        """Return the maximum number of values the buffer can hold."""
        return self.__maxLen

    def append(self, value):
        """Add a value at the end, discarding the oldest one if full."""
        if self.__nextPos >= self.__maxLen:
            # Full: slide everything one slot to the left in place (np.roll
            # would allocate a new array) and overwrite the last slot.
            self.__values[:-1] = self.__values[1:]
            self.__values[-1] = value
            return

        self.__values[self.__nextPos] = value
        self.__nextPos += 1

    def data(self):
        """Return the stored values as a numpy array.

        While the buffer is not yet full only the initialized leading part
        of the underlying array is returned; once full, the underlying array
        itself is returned.
        """
        if self.__nextPos >= self.__maxLen:
            return self.__values
        return self.__values[:self.__nextPos]

    def resize(self, maxLen):
        """Change the capacity, keeping the most recent values that fit."""
        assert maxLen > 0, "Invalid maximum length"

        current = self.__values[:self.__nextPos]
        keep = min(maxLen, len(current))

        # Allocate the new storage and copy the newest `keep` values into it.
        resized = np.empty(maxLen, dtype=self.__values.dtype)
        resized[:keep] = current[-1 * keep:]
        self.__values = resized

        self.__maxLen = maxLen
        self.__nextPos = min(self.__nextPos, maxLen)

    def __len__(self):
        return self.__nextPos

    def __getitem__(self, key):
        return self.data()[key]


# I'm not using collections.deque because:
# 1: Random access is slower.
# 2: Slicing is not supported.
class ListDeque(object):
    """Bounded FIFO buffer backed by a plain list.

    Appending beyond ``maxLen`` drops the oldest value. A list is used so
    that random access and slicing are available.
    """

    def __init__(self, maxLen):
        assert maxLen > 0, "Invalid maximum length"

        self.__maxLen = maxLen
        self.__values = []

    def getMaxLen(self):
        """Return the maximum number of values the buffer can hold."""
        return self.__maxLen

    def append(self, value):
        """Add a value at the end, dropping the oldest one on overflow."""
        items = self.__values
        items.append(value)
        if len(items) > self.__maxLen:
            items.pop(0)

    def data(self):
        """Return the internal list of values (not a copy)."""
        return self.__values

    def resize(self, maxLen):
        """Change the capacity, keeping the most recent values that fit."""
        assert maxLen > 0, "Invalid maximum length"

        self.__maxLen = maxLen
        newest = self.__values[-maxLen:]
        self.__values = newest

    def __len__(self):
        return len(self.__values)

    def __getitem__(self, key):
        return self.__values[key]
"""
.. moduleauthor:: Gabriel Martin Becedillas Ruiz <gabriel.becedillas@gmail.com>
"""

import csv
import logging

import six
from six.moves import xrange
import requests


# Raise the threshold of the "requests" logger so that only records of level
# ERROR and above get through.
logging.getLogger("requests").setLevel(logging.ERROR)


# A faster (but limited) version of csv.DictReader
class FastDictReader(object):
    """Iterator that yields CSV rows as dicts keyed by column name.

    Limitations compared to csv.DictReader:
     * Every data row must have exactly as many columns as there are field
       names.
     * The SAME dict object is updated and returned for every row; copy it
       if a row has to outlive the next iteration step.

    :param f: Any object accepted by csv.reader (e.g. an open text file).
    :param fieldnames: Column names. If None, the first row read from ``f``
        is used as the header.
    :param dialect: Forwarded to csv.reader.

    Extra positional and keyword arguments are forwarded to csv.reader.
    """

    def __init__(self, f, fieldnames=None, dialect="excel", *args, **kwargs):
        self.__fieldNames = fieldnames
        self.reader = csv.reader(f, dialect, *args, **kwargs)
        if self.__fieldNames is None:
            # No explicit header given: the first row is the header.
            self.__fieldNames = next(self.reader)
        # Reused for every row to avoid allocating a dict per row.
        self.__dict = {}

    def _next_impl(self):
        """Return the dict for the next non-empty row.

        Raises StopIteration when the underlying reader is exhausted and
        AssertionError when a row's column count doesn't match the header.
        """
        # Skip empty rows.
        row = next(self.reader)
        while row == []:
            row = next(self.reader)

        # Check that the row has the right number of columns. The row is a
        # plain list, so it is reported as is in the message.
        assert len(self.__fieldNames) == len(row), "Expected columns: %s. Actual columns: %s" % (
            self.__fieldNames, row
        )

        # Copy the row values into the dict.
        for i, fieldName in enumerate(self.__fieldNames):
            self.__dict[fieldName] = row[i]

        return self.__dict

    def __iter__(self):
        return self

    def __next__(self):
        # Python 3 iterator protocol.
        return self._next_impl()

    def next(self):
        # Python 2 iterator protocol.
        return self._next_impl()


def download_csv(url, url_params=None, content_type="text/csv"):
    """Download a CSV document with an HTTP GET and return it as text.

    :param url: The URL to request.
    :param url_params: Optional query string parameters, passed to
        requests.get as ``params``.
    :param content_type: The exact value the response's content-type header
        must have.
    :return: The response text with every leading non-alphanumeric character
        removed. An empty string is returned if the text is empty or has no
        alphanumeric character at all.
    :raises Exception: If the response's content-type header is different
        from ``content_type``. HTTP error statuses are raised by
        ``response.raise_for_status()``.
    """
    response = requests.get(url, params=url_params)

    response.raise_for_status()
    response_content_type = response.headers['content-type']
    if response_content_type != content_type:
        raise Exception("Invalid content-type: %s" % response_content_type)

    ret = response.text

    # Remove the BOM. This is done by dropping everything before the first
    # alphanumeric character. The scan is bounded by the text length so an
    # empty (or fully non-alphanumeric) body doesn't raise IndexError, and
    # the text is sliced only once.
    start = 0
    while start < len(ret) and not ret[start].isalnum():
        start += 1

    return ret[start:]


def float_or_string(value):
    """Return ``float(value)`` if the conversion works, else ``value`` itself.

    Any exception raised by the conversion results in the original value
    being returned unchanged.
    """
    try:
        return float(value)
    except Exception:
        return value
import datetime
import pytz


def datetime_is_naive(dateTime):
    """Tell whether dateTime carries no usable timezone information.

    A datetime is naive when it has no tzinfo, or when its tzinfo reports
    no UTC offset for it.
    """
    tzInfo = dateTime.tzinfo
    if tzInfo is None:
        return True
    return tzInfo.utcoffset(dateTime) is None


# Remove timezone information.
def unlocalize(dateTime):
    """Return a copy of dateTime with its tzinfo removed.

    Date and time fields are left untouched; no conversion takes place.
    """
    naive = dateTime.replace(tzinfo=None)
    return naive


def localize(dateTime, timeZone):
    """Return dateTime expressed in the given timezone.

     * A naive datetime (one with no timezone information) gets the timezone
       attached through ``timeZone.localize``; its date and time fields stay
       the same.
     * An aware datetime is converted with ``astimezone``, so date and time
       are adjusted and the result represents the same UTC instant.

    ``timeZone`` must provide a ``localize`` method, as pytz timezones do.
    """
    if not datetime_is_naive(dateTime):
        return dateTime.astimezone(timeZone)
    return timeZone.localize(dateTime)


def as_utc(dateTime):
    """Return dateTime localized to UTC (see ``localize``)."""
    utc = pytz.utc
    return localize(dateTime, utc)


def datetime_to_timestamp(dateTime):
    """Convert a datetime.datetime to a UTC timestamp.

    The result is the number of seconds (as returned by
    ``timedelta.total_seconds``) between ``epoch_utc`` and dateTime taken
    to UTC with ``as_utc``.
    """
    return (as_utc(dateTime) - epoch_utc).total_seconds()


def timestamp_to_datetime(timeStamp, localized=True):
    """Convert a UTC timestamp to a datetime.datetime.

    With ``localized`` set, the result is made timezone-aware in UTC;
    otherwise the naive datetime holding the UTC date and time is returned.
    """
    naive = datetime.datetime.utcfromtimestamp(timeStamp)
    if not localized:
        return naive
    return localize(naive, pytz.utc)


def get_first_monday(year):
    """Return the datetime.date of the first Monday of the given year."""
    newYearsDay = datetime.date(year, 1, 1)
    # weekday() is 0 on Mondays, in which case nothing has to be added.
    daysToMonday = (7 - newYearsDay.weekday()) % 7
    return newYearsDay + datetime.timedelta(days=daysToMonday)


def get_last_monday(year):
    """Return the datetime.date of the last Monday of the given year."""
    newYearsEve = datetime.date(year, 12, 31)
    # weekday() is the number of days elapsed since the latest Monday
    # (0 when December 31st is a Monday itself).
    return newYearsEve - datetime.timedelta(days=newYearsEve.weekday())


# 1970-01-01 00:00 localized to UTC through as_utc. datetime_to_timestamp
# subtracts this value to turn a datetime into a number of seconds.
epoch_utc = as_utc(datetime.datetime(1970, 1, 1))
import numpy


def mean(values):
    """Return the arithmetic mean of values, or None if values is empty."""
    if len(values) == 0:
        return None
    return numpy.array(values).mean()


def stddev(values, ddof=1):
    """Return the standard deviation of values, or None if values is empty.

    ``ddof`` is forwarded to numpy's ``std`` (delta degrees of freedom); the
    default of 1 gives the sample standard deviation.
    """
    if len(values) == 0:
        return None
    return numpy.array(values).std(ddof=ddof)

 

显示全文