Supporting Arctic Backend Provider & Orderbook, Tick Data Example #744

Merged
merged 36 commits on Jan 18, 2022
Commits
36 commits
b81e566
change weight_decay & batchsize
bxdd Dec 8, 2020
3dc5e6d
del weight_decay
bxdd Dec 8, 2020
60a03d5
big weight_decay
bxdd Dec 8, 2020
1521adf
mid weight_decay
bxdd Dec 8, 2020
7792f79
small layer
bxdd Dec 8, 2020
c53d6a1
2 layer
bxdd Dec 8, 2020
2bbd04c
full layer
bxdd Dec 8, 2020
ab4abc7
no weight decay
bxdd Dec 8, 2020
0342850
divide into two data source
wangwenxi-handsome Nov 16, 2021
ace8920
change parse field
wangwenxi-handsome Nov 16, 2021
91e1f82
delete some debug
wangwenxi-handsome Nov 16, 2021
f308d2c
add Toperator
wangwenxi-handsome Nov 17, 2021
518341f
new format of arctic
wangwenxi-handsome Nov 17, 2021
556786a
fix cache bug to arctic read
wangwenxi-handsome Nov 17, 2021
5ae2b5c
fix connection problem
wangwenxi-handsome Nov 18, 2021
acc5fc9
add some operator
luocy16 Nov 19, 2021
b0c3c2f
final version for arcitc
luocy16 Dec 9, 2021
f679602
merge microsoft qlib
luocy16 Dec 13, 2021
e4ff5bd
clear HZ cache
luocy16 Dec 13, 2021
95417c3
remove not used function
luocy16 Dec 14, 2021
33cc78a
add topswrappers
luocy16 Dec 15, 2021
d12ece6
successfully import data and run first test
you-n-g Jan 6, 2022
31d8e1b
Merge remote-tracking branch 'origin/main' into lcy_main
you-n-g Jan 6, 2022
2779b0d
A simpler version to support arctic
you-n-g Jan 14, 2022
a7092bb
Merge remote-tracking branch 'origin/main' into lcy_main
you-n-g Jan 14, 2022
2ea1b82
Successfully run all high-freq expressions
you-n-g Jan 17, 2022
1736ebe
Merge remote-tracking branch 'origin/main' into lcy_main
you-n-g Jan 17, 2022
8f4a717
Black format and fix add docs
you-n-g Jan 17, 2022
2bb154f
Add docs for download and test data
you-n-g Jan 17, 2022
5e21100
update scripts and docs
you-n-g Jan 17, 2022
6ed3d0a
Add docs
you-n-g Jan 17, 2022
2251402
fix bug
you-n-g Jan 17, 2022
0c9d8d7
Refine docs
you-n-g Jan 17, 2022
e59dc16
fix test bug
you-n-g Jan 17, 2022
97685b0
fix CI error
you-n-g Jan 17, 2022
ef7da9e
clean code
you-n-g Jan 18, 2022
29 changes: 29 additions & 0 deletions examples/orderbook_data/README.md
@@ -0,0 +1,29 @@
# Introduction

This example shows how to import high-frequency orderbook and tick data into an Arctic (MongoDB) backend so that it can be used as a Qlib data provider.

# Installation

Please refer to [the installation docs](https://docs.mongodb.com/manual/installation/) of MongoDB.
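
Besides MongoDB, the import script needs a few Python packages. A minimal sketch of installing them with pip (the package set is inferred from the imports in `create_dataset_new.py`, not an official requirements list; pin versions as needed):

```bash
pip install arctic fire joblib pandas pymongo
```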


# Importing example data

- [ ] TODO: download the original data
- [ ] TODO: download Qlib data.

```bash
python create_dataset_new.py initialize_library  # initialize the Arctic libraries
python create_dataset_new.py import_data         # import the raw data into Arctic
```
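
`DSCreator` also exposes a `clear` command through `fire`; it drops the whole `arctic` database in MongoDB, which is handy when you want to re-import from scratch:

```bash
python create_dataset_new.py clear
```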

# Query Examples
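
A minimal sketch of reading the imported tick data back through Arctic's chunkstore API (the symbol and date below are illustrative placeholders and assume they were imported by the script above):

```python
import pandas as pd
from arctic import Arctic

arc = Arctic("127.0.0.1")  # same MongoDB host used by create_dataset_new.py
lib = arc["ticks"]         # chunkstore library created by `initialize_library`

# read one symbol for a single trading day; chunk_range filters on the "date" index
df = lib.read("SZ000001", chunk_range=pd.date_range("2020-12-31", "2020-12-31"))
print(df.head())
arc.reset()
```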




# Known limitations
Expression computing between different frequencies is not supported yet



316 changes: 316 additions & 0 deletions examples/orderbook_data/create_dataset_new.py
@@ -0,0 +1,316 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""
NOTE:
- This scripts is a demo to import example data import Qlib
- !!!!!!!!!!!!!!!TODO!!!!!!!!!!!!!!!!!!!:
- Its structure is not well designed, your contribution is welcome to make importing dataset easier
"""
import os
import shutil
import time
import traceback
from pathlib import Path

from arctic import Arctic, CHUNK_STORE
import fire
from joblib import Parallel, delayed
import pandas as pd
from pymongo.mongo_client import MongoClient

DIRNAME = Path(__file__).absolute().resolve().parent

# CONFIG
N_JOBS = -1  # joblib convention: -1 uses all CPU cores (use -2 to leave one core free)
LOG_FILE_PATH = DIRNAME / "log_file"
DATA_PATH = DIRNAME / "raw_data"
DATABASE_PATH = DIRNAME / "orig_data"
DATA_INFO_PATH = DIRNAME / "data_info"
DATA_FINISH_INFO_PATH = DIRNAME / "data_finish_info"
DOC_TYPE = ["Tick", "Order", "OrderQueue", "Transaction", "Day", "Minute"]
MAX_SIZE = 3000 * 1024 * 1024 * 1024  # 3000 GB quota per library
ALL_STOCK_PATH = DIRNAME / "all.txt"
ARCTIC_SRV = "127.0.0.1"


def get_library_name(doc_type):
    # "Tick" documents go into a library named "ticks"; the others use their lowercased doc_type
    if doc_type.lower() == "tick":
        return "ticks"
    return doc_type.lower()


def is_stock(exchange_place, code):
    # SH stock codes start with "6"; SZ stock codes start with "0" or "30"
    if exchange_place == "SH" and code[0] != "6":
        return False
    if exchange_place == "SZ" and code[0] != "0" and code[:2] != "30":
        return False
    return True


def add_one_stock_daily_data(filepath, type, exchange_place, arc, date):
"""
    exchange_place: "SZ" or "SH"
    type: one of DOC_TYPE, e.g. "Tick", "OrderQueue", ...
    filepath: path of the csv file
    arc: Arctic connection created by the calling process
    date: the trading day being imported (used for logging)
"""
code = os.path.split(filepath)[-1].split(".csv")[0]
    # skip non-stock instruments
    if not is_stock(exchange_place, code):
        return

    df = pd.read_csv(filepath, encoding="gbk", dtype={"code": str})

def format_time(day, hms):
day = str(day)
hms = str(hms)
        if hms[0] == "1":  # hour >= 10, so hms has a 2-digit hour
return (
"-".join([day[0:4], day[4:6], day[6:8]]) + " " + ":".join([hms[:2], hms[2:4], hms[4:6] + "." + hms[6:]])
)
        else:  # hour < 10, so hms has a 1-digit hour
return (
"-".join([day[0:4], day[4:6], day[6:8]]) + " " + ":".join([hms[:1], hms[1:3], hms[3:5] + "." + hms[5:]])
)

    ## if a row has a malformed timestamp, drop the whole row
timestamp = list(zip(list(df["date"]), list(df["time"])))
error_index_list = []
for index, t in enumerate(timestamp):
try:
pd.Timestamp(format_time(t[0], t[1]))
except Exception:
            error_index_list.append(index)  ## indexes of rows with bad timestamps

    # TODO: write these errors to the logs

if len(error_index_list) > 0:
print("error: {}, {}".format(filepath, len(error_index_list)))

df = df.drop(error_index_list)
    timestamp = list(zip(list(df["date"]), list(df["time"])))  ## timestamps after dropping bad rows
    # build the DatetimeIndex from the cleaned date/time pairs
pd_timestamp = pd.DatetimeIndex(
[pd.Timestamp(format_time(timestamp[i][0], timestamp[i][1])) for i in range(len(df["date"]))]
)
df = df.drop(columns=["date", "time", "name", "code", "wind_code"])
# df = pd.DataFrame(data=df.to_dict("list"), index=pd_timestamp)
df["date"] = pd.to_datetime(pd_timestamp)
df.set_index("date", inplace=True)

if str.lower(type) == "orderqueue":
        ## merge ab1~ab50 into a single comma-separated "ab" column
df["ab"] = [
",".join([str(int(row["ab" + str(i + 1)])) for i in range(0, row["ab_items"])])
for timestamp, row in df.iterrows()
]
df = df.drop(columns=["ab" + str(i) for i in range(1, 51)])

type = get_library_name(type)
# arc.initialize_library(type, lib_type=CHUNK_STORE)
lib = arc[type]

symbol = "".join([exchange_place, code])
if symbol in lib.list_symbols():
print("update {0}, date={1}".format(symbol, date))
        if df.empty:
return error_index_list
lib.update(symbol, df, chunk_size="D")
else:
print("write {0}, date={1}".format(symbol, date))
lib.write(symbol, df, chunk_size="D")
return error_index_list


def add_one_stock_daily_data_wrapper(filepath, type, exchange_place, index, date):
pid = os.getpid()
code = os.path.split(filepath)[-1].split(".csv")[0]
arc = Arctic(ARCTIC_SRV)
try:
if index % 100 == 0:
print("index = {}, filepath = {}".format(index, filepath))
error_index_list = add_one_stock_daily_data(filepath, type, exchange_place, arc, date)
if error_index_list is not None and len(error_index_list) > 0:
f = open(os.path.join(LOG_FILE_PATH, "temp_timestamp_error_{0}_{1}_{2}.txt".format(pid, date, type)), "a+")
f.write("{}, {}, {}\n".format(filepath, error_index_list, exchange_place + "_" + code))
f.close()

except Exception as e:
info = traceback.format_exc()
print("error:" + str(e))
f = open(os.path.join(LOG_FILE_PATH, "temp_fail_{0}_{1}_{2}.txt".format(pid, date, type)), "a+")
f.write("fail:" + str(filepath) + "\n" + str(e) + "\n" + str(info) + "\n")
f.close()

finally:
arc.reset()


def add_data(tick_date, doc_type, stock_name_dict):
pid = os.getpid()

if doc_type not in DOC_TYPE:
print("doc_type not in {}".format(DOC_TYPE))
return
try:
begin_time = time.time()
os.system(f"cp {DATABASE_PATH}/{tick_date + '_{}.tar.gz'.format(doc_type)} {DATA_PATH}/")

os.system(
f"tar -xvzf {DATA_PATH}/{tick_date + '_{}.tar.gz'.format(doc_type)} -C {DATA_PATH}/ {tick_date + '_' + doc_type}/SH"
)
os.system(
f"tar -xvzf {DATA_PATH}/{tick_date + '_{}.tar.gz'.format(doc_type)} -C {DATA_PATH}/ {tick_date + '_' + doc_type}/SZ"
)
os.system(f"chmod 777 {DATA_PATH}")
os.system(f"chmod 777 {DATA_PATH}/{tick_date + '_' + doc_type}")
os.system(f"chmod 777 {DATA_PATH}/{tick_date + '_' + doc_type}/SH")
os.system(f"chmod 777 {DATA_PATH}/{tick_date + '_' + doc_type}/SZ")
os.system(f"chmod 777 {DATA_PATH}/{tick_date + '_' + doc_type}/SH/{tick_date}")
os.system(f"chmod 777 {DATA_PATH}/{tick_date + '_' + doc_type}/SZ/{tick_date}")

print("tick_date={}".format(tick_date))

temp_data_path_sh = os.path.join(DATA_PATH, tick_date + "_" + doc_type, "SH", tick_date)
temp_data_path_sz = os.path.join(DATA_PATH, tick_date + "_" + doc_type, "SZ", tick_date)
is_files_exist = {"sh": os.path.exists(temp_data_path_sh), "sz": os.path.exists(temp_data_path_sz)}

sz_files = (
(
set([i.split(".csv")[0] for i in os.listdir(temp_data_path_sz) if i[:2] == "30" or i[0] == "0"])
& set(stock_name_dict["SZ"])
)
if is_files_exist["sz"]
else set()
)
sz_file_nums = len(sz_files) if is_files_exist["sz"] else 0
sh_files = (
(
set([i.split(".csv")[0] for i in os.listdir(temp_data_path_sh) if i[0] == "6"])
& set(stock_name_dict["SH"])
)
if is_files_exist["sh"]
else set()
)
sh_file_nums = len(sh_files) if is_files_exist["sh"] else 0
print("sz_file_nums:{}, sh_file_nums:{}".format(sz_file_nums, sh_file_nums))

f = (DATA_INFO_PATH / "data_info_log_{}_{}".format(doc_type, tick_date)).open("w+")
f.write("sz:{}, sh:{}, date:{}:".format(sz_file_nums, sh_file_nums, tick_date) + "\n")
f.close()

if sh_file_nums > 0:
            # Arctic chunkstore write is not thread-safe, but update should be safe
Parallel(n_jobs=N_JOBS)(
delayed(add_one_stock_daily_data_wrapper)(
os.path.join(temp_data_path_sh, name + ".csv"), doc_type, "SH", index, tick_date
)
for index, name in enumerate(list(sh_files))
)
if sz_file_nums > 0:
            # Arctic chunkstore write is not thread-safe, but update should be safe
Parallel(n_jobs=N_JOBS)(
delayed(add_one_stock_daily_data_wrapper)(
os.path.join(temp_data_path_sz, name + ".csv"), doc_type, "SZ", index, tick_date
)
for index, name in enumerate(list(sz_files))
)

os.system(f"rm -f {DATA_PATH}/{tick_date + '_{}.tar.gz'.format(doc_type)}")
os.system(f"rm -rf {DATA_PATH}/{tick_date + '_' + doc_type}")
total_time = time.time() - begin_time
f = (DATA_FINISH_INFO_PATH / "data_info_finish_log_{}_{}".format(doc_type, tick_date)).open("w+")
f.write("finish: date:{}, consume_time:{}, end_time: {}".format(tick_date, total_time, time.time()) + "\n")
f.close()

except Exception as e:
info = traceback.format_exc()
print("date error:" + str(e))
f = open(os.path.join(LOG_FILE_PATH, "temp_fail_{0}_{1}_{2}.txt".format(pid, tick_date, doc_type)), "a+")
f.write("fail:" + str(tick_date) + "\n" + str(e) + "\n" + str(info) + "\n")
f.close()


class DSCreator:
"""Dataset creator"""

def clear(self):
client = MongoClient(ARCTIC_SRV)
client.drop_database("arctic")

def initialize_library(self):
arc = Arctic(ARCTIC_SRV)
for doc_type in DOC_TYPE:
arc.initialize_library(get_library_name(doc_type), lib_type=CHUNK_STORE)

def _get_empty_folder(self, fp: Path):
fp = Path(fp)
        shutil.rmtree(fp, ignore_errors=True)  # tolerate a missing folder on the first run
fp.mkdir(parents=True, exist_ok=True)

def import_data(self):
# clear all the old files
for fp in LOG_FILE_PATH, DATA_INFO_PATH, DATA_FINISH_INFO_PATH, DATA_PATH:
self._get_empty_folder(fp)

arc = Arctic(ARCTIC_SRV)
for doc_type in DOC_TYPE:
# arc.initialize_library(get_library_name(doc_type), lib_type=CHUNK_STORE)
arc.set_quota(get_library_name(doc_type), MAX_SIZE)
arc.reset()

# doc_type = 'Day'
doc_type = "Tick"
date_list = list(set([int(path.split("_")[0]) for path in os.listdir(DATABASE_PATH) if doc_type in path]))
date_list.sort()
date_list = [str(date) for date in date_list]

f = open(ALL_STOCK_PATH, "r")
stock_name_list = [lines.split("\t")[0] for lines in f.readlines()]
f.close()
stock_name_dict = {
"SH": [stock_name[2:] for stock_name in stock_name_list if "SH" in stock_name],
"SZ": [stock_name[2:] for stock_name in stock_name_list if "SZ" in stock_name],
}

lib_name = get_library_name(doc_type)
a = Arctic(ARCTIC_SRV)
# a.initialize_library(lib_name, lib_type=CHUNK_STORE)

stock_name_exist = a[lib_name].list_symbols()
lib = a[lib_name]
initialize_count = 0
for stock_name in stock_name_list:
if stock_name not in stock_name_exist:
initialize_count += 1
# A placeholder for stocks
pdf = pd.DataFrame(index=[pd.Timestamp("1900-01-01")])
                pdf.index.name = "date"  # an index named "date" is necessary
lib.write(stock_name, pdf)
print("initialize count: {}".format(initialize_count))
print("tasks: {}".format(date_list))
a.reset()

# date_list = [files.split("_")[0] for files in os.listdir("./raw_data_price") if "tar" in files]
# print(len(date_list))
date_list = ["20201231"] # for test
Parallel(n_jobs=min(2, len(date_list)))(
delayed(add_data)(date, doc_type, stock_name_dict) for date in date_list
)


if __name__ == "__main__":
fire.Fire(DSCreator)