I am building an algorithmic trading platform in Python. Multiple algorithms monitor the market and execute trades accordingly, every day from 09:30 to 16:00.
What I'm looking for is a way to start and stop algorithms arbitrarily from a client. I therefore want a server script that uses multiprocessing, plus a client that can start, stop, and list algorithms (each of which should run in a separate process) at any given time.
Any examples of how this can be done? The majority of online examples are for queue servers, which do not seem to fit my problem.
EDIT:
I am trying to do this with the multiprocessing package. Using a queue seems wrong to me, because I know that an arbitrary number of processes will run for the whole day, or at least until I say stop. I'm not trying to run a short script and have a worker consume the next job from a queue once the previous one is done. Instead, I'm thinking of a server script that uses a Manager, runs forever, and starts new scripts in separate processes/threads when requested. I would, however, like to be able to send a stop signal to a process to kill it. I do have a feeling that I'm doing this kind of backwards :-) What I have is:
server.py:
    from __future__ import print_function  # keeps print() valid on Python 2.7 too

    import importlib
    import multiprocessing as mp
    from multiprocessing import Process
    from multiprocessing.managers import BaseManager
    from time import strftime


    class Server(object):
        def __init__(self, port=50000, authkey=b''):
            self.processes = {}
            self._authkey = authkey  # must be bytes on Python 3
            self.port = port
            self.server = None
            self.running = False
            # Clients obtain a proxy to this Server instance via get_process().
            BaseManager.register('get_process', callable=lambda: self)

        def start_server(self):
            manager = BaseManager(address=('', self.port), authkey=self._authkey)
            self.server = manager.get_server()
            self._logmessage("Server started")
            self.running = True
            try:
                self.server.serve_forever()
            finally:
                # serve_forever() handles Ctrl-C itself, so clean up here.
                self.shutdown()

        def start_process(self, mod, fcn, *args, **kwargs):
            # Build the key from the module *name*, before importing it.
            key = "{0}.{1}".format(mod, fcn)
            if key in self.processes:
                raise ValueError("Process named '%s' already exists" % key)
            module = importlib.import_module(mod)
            # Default to a single None argument, as test(data) expects.
            p = Process(target=getattr(module, fcn), name=key,
                        args=args or (None,), kwargs=kwargs)
            p.start()
            self._logmessage("Process '%s' started" % key)
            self.processes[key] = p

        def stop_process(self, key):
            p = self.processes.pop(key)
            p.terminate()
            p.join()  # reap the child so it does not linger as a zombie

        def get_processes(self):
            # Return a list: a dict view cannot be pickled back to the client.
            return list(self.processes)

        def shutdown(self):
            for child in mp.active_children():
                child.terminate()
            self.processes.clear()
            self.running = False
            print("Shutting down")

        def _logmessage(self, msg):
            print("%s: %s" % (strftime('%Y-%m-%d %H:%M:%S'), msg))


    if __name__ == '__main__':
        server = Server(authkey=b'abc')
        server.start_server()
client.py:
    from __future__ import print_function  # keeps print() valid on Python 2.7 too

    import time
    from multiprocessing.managers import BaseManager


    class Client(object):
        def __init__(self, host='', port=50000, authkey=b''):
            self.host = host
            self.port = port
            self.process = None
            self._type_id = 'get_process'
            self._authkey = authkey  # must be bytes on Python 3
            self.manager = BaseManager(address=(self.host, self.port),
                                       authkey=self._authkey)
            BaseManager.register(self._type_id)

        def connect(self):
            try:
                self.manager.connect()
            except Exception:
                self._logmessage("Could not connect to server")
                raise
            self._logmessage("Connected to server")
            # Proxy to the Server instance living in the server process.
            self.process = getattr(self.manager, self._type_id)()

        def start_process(self, mod, fcn):
            self.process.start_process(mod, fcn)
            self._logmessage("Process '%s' started" % fcn)

        def stop_process(self, key):
            # key is the "module.function" name reported by list_processes().
            self.process.stop_process(key)
            self._logmessage("Process '%s' stopped" % key)

        def list_processes(self):
            print(self.process.get_processes())

        @property
        def connected(self):
            # Relies on a private BaseManager attribute.
            return self.manager._state.value == self.manager._state.STARTED

        def _logmessage(self, msg):
            print("%s: %s" % (time.strftime('%Y-%m-%d %H:%M:%S'), msg))


    def test(data):
        while True:
            print(time.time())
            time.sleep(1.)


    if __name__ == '__main__':
        # Import via the package path so the server resolves the same module.
        from algotrading.server.process_client import Client
        client = Client(authkey=b'abc')
        client.connect()
        client.start_process("algotrading.server.process_client", "test")
        client.list_processes()
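On the stop signal: terminate() kills the child immediately, without giving it a chance to cancel open orders or flush state. A gentler variant, sketched here as an assumption rather than as part of the code above, hands each algorithm a multiprocessing.Event that it checks on every iteration. The names run_algorithm, start_algorithm, stop_algorithm and the on_tick hook are hypothetical.

```python
import multiprocessing as mp


def run_algorithm(stop_event, interval=1.0, on_tick=None):
    """Hypothetical algorithm loop: tick until stop_event is set."""
    ticks = 0
    while not stop_event.is_set():
        if on_tick is not None:
            on_tick(ticks)  # placeholder for "look at the market, trade"
        ticks += 1
        # wait() doubles as an interruptible sleep: it returns early
        # as soon as the event is set.
        stop_event.wait(interval)
    # Clean-up (cancel orders, flush logs) would go here.
    return ticks


def start_algorithm(target=run_algorithm, **kwargs):
    """Start target in a child process; return (process, stop_event)."""
    stop_event = mp.Event()
    proc = mp.Process(target=target, args=(stop_event,), kwargs=kwargs)
    proc.start()
    return proc, stop_event


def stop_algorithm(proc, stop_event, timeout=5.0):
    """Ask the child to stop; fall back to terminate() if it hangs."""
    stop_event.set()
    proc.join(timeout)
    if proc.is_alive():
        proc.terminate()
        proc.join()
    return proc.exitcode
```

In the server, the (process, stop_event) pair would be stored under the process key, so that stop_process can set the event first and only terminate the child if it does not exit within the timeout.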
Check out Supervisord, which allows remote management of processes and can be configured to start and restart them automatically.
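Supervisord exposes its remote management over XML-RPC, so the start/stop/list client can be a few lines of Python 3. This is a rough sketch, assuming supervisord runs with its HTTP server enabled on localhost port 9001 (the URL is an assumption about your configuration) and that each algorithm is defined as a program in its config file. The helper names are hypothetical; getAllProcessInfo, startProcess and stopProcess are methods of supervisord's supervisor RPC namespace.

```python
from xmlrpc.client import ServerProxy

# Assumed endpoint; adjust to match [inet_http_server] in supervisord.conf.
SUPERVISOR_URL = "http://localhost:9001/RPC2"


def list_programs(proxy):
    """Return {program name: state name} for every managed process."""
    return {info["name"]: info["statename"]
            for info in proxy.supervisor.getAllProcessInfo()}


def start_program(proxy, name):
    """Start one program by the name used in its [program:...] section."""
    return proxy.supervisor.startProcess(name)


def stop_program(proxy, name):
    """Stop one program by name."""
    return proxy.supervisor.stopProcess(name)
```

Usage would look like `proxy = ServerProxy(SUPERVISOR_URL)` followed by `start_program(proxy, "momentum")`, where "momentum" is a made-up program name.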
Depending on your scalability and disaster-recovery needs, you may want to distribute your monitoring/trading processes across multiple servers. Supervisord is designed to manage only a single machine, but you could build a manager app that coordinates multiple servers, each running supervisord, via its XML-RPC interface.
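Such a manager app could start out as a thin wrapper that holds one XML-RPC proxy per machine. The sketch below (Python 3) is only an illustration: the Fleet class, its method names, and the host labels are hypothetical, and it assumes every machine exposes supervisord's XML-RPC endpoint at a URL you supply.

```python
import xmlrpc.client
from xmlrpc.client import ServerProxy


class Fleet(object):
    """Hypothetical coordinator for several supervisord instances."""

    def __init__(self, proxies):
        # proxies maps a host label to an object with a .supervisor namespace.
        self.proxies = dict(proxies)

    @classmethod
    def from_urls(cls, urls):
        """Build a Fleet from {host label: XML-RPC URL}."""
        return cls({host: ServerProxy(url) for host, url in urls.items()})

    def status(self):
        """Return {host: {program: state}}, or None for unreachable hosts."""
        report = {}
        for host, proxy in self.proxies.items():
            try:
                infos = proxy.supervisor.getAllProcessInfo()
            except (OSError, xmlrpc.client.Error):
                report[host] = None  # host down or RPC call rejected
            else:
                report[host] = {i["name"]: i["statename"] for i in infos}
        return report

    def start(self, host, name):
        return self.proxies[host].supervisor.startProcess(name)

    def stop(self, host, name):
        return self.proxies[host].supervisor.stopProcess(name)
```

Reporting an unreachable host as None, instead of raising, lets one dead machine show up in the overview without hiding the state of the others.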
Cron or Celery could be used for your daily start/stop scheduling.
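Whichever scheduler you pick, the job it triggers only has to compare the clock with the trading window. Here is a minimal sketch of that check, assuming naive datetimes in the exchange's local time and ignoring weekends and holidays. The function names are hypothetical; the 09:30 and 16:00 boundaries come from the question.

```python
from datetime import datetime, time

MARKET_OPEN = time(9, 30)
MARKET_CLOSE = time(16, 0)


def should_be_running(now):
    """True while `now` falls inside the 09:30-16:00 trading window."""
    return MARKET_OPEN <= now.time() < MARKET_CLOSE


def next_action(now, currently_running):
    """Return "start", "stop" or None for one scheduler tick."""
    wanted = should_be_running(now)
    if wanted and not currently_running:
        return "start"
    if currently_running and not wanted:
        return "stop"
    return None
```

A job that runs every minute could call next_action(datetime.now(), running) and start or stop the algorithms only when the result is not None. That also brings things back to the right state if the scheduler itself was down at 09:30 or 16:00.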