作为全网功能强大、灵活易用的量化交易平台,发明者量化交易平台虽然使用门槛非常低,机器人程序占用资源很少。但是我们依然希望,让机器人需要运行的时候启动,不需要运行时停止。
例如,在做商品期货程序化、量化交易时,非开市时间占全天的绝大部分。这样我们就非常希望只让机器人在开市时间运行,每天只开市时间运行多节省费用,想想都激动。为了这个需求,我们可以使用Python语言编写一个在FMZ量化交易平台上运行的策略机器人,让这个机器人通过
发明者量化交易平台的扩展API接口,定时控制机器人的启动和停止。
其实整个代码非常简单,调用发明者量化交易平台扩展API接口的例子直接可以使用
地址: https://www.fmz.com/api#简单的例子
直接使用例子中的函数:def api(method, *args)
我们需要调用的接口也很简单,用到以下两个接口(在FMZ文档上可以查询到)
id,这样调用即可:api(\'RestartRobot\', id)ID。机器人ID查看、获取,截图:

调用发明者量化交易平台扩展API需要使用发明者量化交易平台的API KEY。
可以在账户管理中生成自己的API KEY。
我们把API KEY作为策略的参数传入。
获取发明者量化交易平台账号API KEY截图:
![]()
剩下的就是编写定时逻辑了,也非常简单,设置定时参数:
[\"175708,14:55:33-15:10:33\", ...]
省略号代表可以设置更多的类似\"175708,14:55:33-15:10:33\"的设置。
定时参数是一个JSON字符串,策略代码中会对其解析,解析为一个列表。列表中每个元素就是一组机器人启动/停止设置。
即:
\"175708,14:55:33-15:10:33\"
其中以逗号间隔。逗号之前的部分175708为机器人ID,逗号后面的部分为启动时间/停止时间。
如上例子,是ID为:175708的机器人,14:55:33启动,15:10:33停止。
然后策略中不停轮寻,每次轮寻先获取当前时间,然后根据当前时间和定时时间对比,判断是否需要触发启动或者停止机器人。
如果触发了,就调用api(\’RestartRobot\’, id)或者api(\’StopRobot\’, id) ,来启动、停止机器人。
完整的策略代码:
# -*- coding: utf-8 -*-
import time
import json
try:
import md5
import urllib2
from urllib import urlencode
except:
import hashlib as md5
import urllib.request as urllib2
from urllib.parse import urlencode
def api(method, *args):
d = {
\'version\': \'1.0\',
\'access_key\': accessKey,
\'method\': method,
\'args\': json.dumps(list(args)),
\'nonce\': int(time.time() * 1000),
}
d[\'sign\'] = md5.md5((\'%s|%s|%s|%d|%s\' % (d[\'version\'], d[\'method\'], d[\'args\'], d[\'nonce\'], secretKey)).encode(\'utf-8\')).hexdigest()
return json.loads(urllib2.urlopen(\'https://www.fmz.com/api/v1\', urlencode(d).encode(\'utf-8\')).read().decode(\'utf-8\'))
RobotParams = json.loads(strRobotParams)
def main():
global RobotParams
arrParams = []
nowDay = 0
strPush = \"\"
if isPushMsg:
strPush = \"@\"
for i in range(len(RobotParams)):
param = {}
arr = RobotParams[i].split(\",\")
if len(arr) != 2:
raise Exception(\"字符串配置错误:分隔符号,\")
param[\"id\"] = arr[0]
param[\"isProcessOpenThisDay\"] = False
param[\"isProcessCloseThisDay\"] = False
arr = arr[1].split(\"-\")
if len(arr) != 2:
raise Exception(\"字符串配置错误:分隔符号-\")
begin = arr[0]
arrBegin = begin.split(\":\")
if len(arrBegin) != 3:
raise Exception(\"字符串配置错误:起始时间分隔符号:\")
param[\"begin\"] = {}
param[\"begin\"][\"hour\"] = float(arrBegin[0])
param[\"begin\"][\"min\"] = float(arrBegin[1])
param[\"begin\"][\"sec\"] = float(arrBegin[2])
end = arr[1]
arrEnd = end.split(\":\")
if len(arrEnd) != 3:
raise Exception(\"字符串配置错误:结束时间分隔符号:\")
param[\"end\"] = {}
param[\"end\"][\"hour\"] = float(arrEnd[0])
param[\"end\"][\"min\"] = float(arrEnd[1])
param[\"end\"][\"sec\"] = float(arrEnd[2])
arrParams.append(param)
# 测试
Log(\"输出参数\", arrParams, \"#FF0000\")
while True:
nowTime = time.localtime(time.time())
nowHour = nowTime.tm_hour
nowMin = nowTime.tm_min
nowSec = nowTime.tm_sec
tbl = {
\"type\" : \"table\",
\"title\" : \"msg\",
\"cols\" : [\"id\", \"begin\", \"end\", \"今天是否执行过启动\", \"今天是否执行过停止\"],
\"rows\" : []
}
for i in range(len(arrParams)):
tbl[\"rows\"].append([arrParams[i][\"id\"], json.dumps(arrParams[i][\"begin\"]), json.dumps(arrParams[i][\"end\"]), arrParams[i][\"isProcessOpenThisDay\"], arrParams[i][\"isProcessCloseThisDay\"]])
if nowDay != nowTime.tm_mday:
arrParams[i][\"isProcessOpenThisDay\"] = False
arrParams[i][\"isProcessCloseThisDay\"] = False
if arrParams[i][\"isProcessOpenThisDay\"] == False:
if nowTime.tm_hour == arrParams[i][\"begin\"][\"hour\"] and nowTime.tm_min >= arrParams[i][\"begin\"][\"min\"] and nowTime.tm_sec >= arrParams[i][\"begin\"][\"sec\"]:
ret = api(\'RestartRobot\', int(arrParams[i][\"id\"]))
arrParams[i][\"isProcessOpenThisDay\"] = True
Log(\"机器人ID:\", arrParams[i][\"id\"], \"执行启动,请登录平台检查是否启动成功\", \"扩展API返回值:\", ret, strPush)
if arrParams[i][\"isProcessCloseThisDay\"] == False:
if nowTime.tm_hour == arrParams[i][\"end\"][\"hour\"] and nowTime.tm_min >= arrParams[i][\"end\"][\"min\"] and nowTime.tm_sec >= arrParams[i][\"end\"][\"sec\"]:
ret = api(\'StopRobot\', int(arrParams[i][\"id\"]))
arrParams[i][\"isProcessCloseThisDay\"] = True
Log(\"机器人ID:\", arrParams[i][\"id\"], \"执行停止,请登录平台检查是否停止成功\", \"扩展API返回值:\", ret, strPush)
if nowDay != nowTime.tm_mday:
nowDay = nowTime.tm_mday
LogStatus(_D(), nowTime, \"\\n`\" + json.dumps(tbl) + \"`\")
Sleep(500)

![]()
截图
![]()
被这个策略操作的机器人:
![]()
策略地址:https://www.fmz.com/strategy/184600
作为抛砖引玉,发明者量化交易平台的扩展API还强大的很,用这些扩展API基于FMZ平台做一个自己的量化交易平台都完全没问题。
这个定时机器人设计的比较简单,只是无脑到时间开启,到时间停止,并没有加入启动是否成功,检验,异常重试,等等机制,有兴趣的可以增加功能,扩展。
策略仅供学习参考
买好币上币库:https://www.kucoin.com/r/1f7w3