Request for Generic Websocket Data Plugin

To avoid hijacking the AB 7 Wishlist thread, I am starting a separate thread.

Is it possible to have a data plugin that acts as a client connecting to a local websocket? The websocket would receive only RTD, and the plugin would use the local AB DB as storage.

So the idea is: many brokers/vendors provide data, and each has its own client library because each is a bit custom. A generic data plugin would just have to connect to a local server. The onus of building the server program is on the AB user. The plugin could accept a format like the one used in import format files.

I do not have a strong programming background, and C++ is quite difficult for me to adapt to.

@Tomasz After a few hours going through the ADK help about data plugins and looking at the QT plugin code, it appears that the plugin requests data from the http server each time. This is the complete opposite of a websocket stream/broadcast.

"Short guide to AmiBroker with QuoteTracker feed"
For the refresh interval, it suggests 5-10 sec for intraday polling.

In the next post, I have set up a small Python script to simulate what a generic case could look like. The data format follows the format files specified for imports.

If not simple plain text, a possible data structure could be JSON, as each batch would carry data for multiple symbols. How does the QuoteEX() function handle that? I believe another function would have to run for each symbol in the received batch.

Of course, the idea is open to C++ enthusiasts as well, because currently it is beyond my abilities.
If such a plugin would be useful for AB users, kindly comment in case it's worth Tomasz putting his time into it. Thanks.

Ctrl+C will kill the running scripts. They are crude, but I think they get the point across. The client stands in for what the AB data plugin would do.

pyServer.py

import asyncio
import websockets
import datetime

class PubSub:
    def __init__(self):
        self.waiter = asyncio.Future()

    def publish(self, value):
        waiter, self.waiter = self.waiter, asyncio.Future()
        waiter.set_result((value, self.waiter))

    async def subscribe(self):
        print(f"client sub.")
        waiter = self.waiter
        while True:
            value, waiter = await waiter
            yield value

    __aiter__ = subscribe


PUBSUB = PubSub()


async def handler(websocket):
    async for message in PUBSUB:
        try:
            await websocket.send(message)
        except websockets.ConnectionClosed:
            break  # client has gone; stop sending to this connection


async def broadcast(message):
    PUBSUB.publish(message)


async def broadcast_messages():
    while True:
        await asyncio.sleep(1)

        dt = datetime.datetime.now()
        ts      = f"{dt.strftime('%Y%m%d')},{dt.strftime('%H:%M:%S')}"
        message = f"SYM1,{ts},100,101,98,99,123,10\nSYM2,{ts},10,11,9,9.5,456,80\nSYM3,{ts},50,55,45,52,515,45"
        await broadcast(message)


async def main():
    print(f"Started RTD server:")
    async with websockets.serve(handler, "localhost", 8765):
        await broadcast_messages()


if __name__ == "__main__":
    # main() never returns on its own, so a single asyncio.run() is enough
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass  # Ctrl+C stops the server

pyClient.py

import websockets
import asyncio

# Connect to the server and print everything it broadcasts
async def ws_client():
    url = "ws://127.0.0.1:8765"

    async with websockets.connect(url) as ws:
        print("WebSocket: Client Connected.")
        # await ws.send(f"{name}")

        # Stay alive forever, listen to incoming msgs
        while True:
            msg = await ws.recv()
            print(msg)


# Start the connection
asyncio.run(ws_client())

First run the pyServer.py file and then the pyClient.py file in a different terminal.

credits: Broadcasting messages - websockets 12.0 documentation

Here, the pyClient.py file just prints the blob of multiple ticks it receives, but the intention is to populate the local AB DB with real-time data.
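To make the "process the blob" step concrete, here is a minimal sketch of how the client could split one broadcast message into per-symbol records. The column order (symbol, date, time, open, high, low, close, volume, open interest) is my assumption based on the sample message in pyServer.py, and the names Tick and parse_batch are made up for illustration. It is not plugin code, only the parsing logic.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class Tick:
    symbol: str
    stamp: datetime
    open: float
    high: float
    low: float
    close: float
    volume: float
    open_interest: float


def parse_batch(blob: str) -> list:
    """Split one broadcast message into a list of Tick records."""
    ticks = []
    for line in blob.splitlines():
        line = line.strip()
        if not line:
            continue  # tolerate blank lines between records
        # Assumed column order: symbol, date, time, O, H, L, C, volume, OI
        sym, ymd, hms, o, h, l, c, v, oi = line.split(",")
        stamp = datetime.strptime(f"{ymd} {hms}", "%Y%m%d %H:%M:%S")
        ticks.append(Tick(sym, stamp, float(o), float(h), float(l),
                          float(c), float(v), float(oi)))
    return ticks
```

In pyClient.py this would be called as parse_batch(msg) right after ws.recv(), and each Tick would then be handed to whatever writes into AB.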

Because it is generic, the client only processes the data it receives and currently doesn't talk back to the server. If a complete structure were designed, it could be standardised.

this error appears:
image

Previous detailed post is pending Moderator approval. Thanks

When you use samples from the ADK, you need to use the most recent version from GitLab and use CMake.
It is essential to use CMake to get your project files configured correctly for the installed compiler.

Thanks, it was a VS2022 thing. I had already included the CMake tools component, and I was able to build the x64 DLL with the plugin UI working.
With the first method, clicking the UI caused the crash.

For what it is worth, the QT plugin is for illustration purposes only. It does not do any trickery; it is plain, simple and straightforward, as educational examples should be. The QT plugin makes a request to the QT http server each time because:

  • The QT http server is local, so there is no internet traffic, and local TCP/IP is fast
  • The QT http server was designed around snapshots
  • All processing is done by QuoteTracker, so there is no processing left on the http client side

The example goal was simplicity not complexity.

It can be used as a starting point, not as full-blown, super hyper ready-to-use code. Still, you can use this design almost without changes, provided you do all the work in Python.

In your original post you mentioned that you already have Python code that does it all. Why don't you use it and expose an http server locally, the same way QT did? Since you don't like C++, do all the work in a separate Python app that exposes the server, and do nothing on the C++ plugin side.
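A rough sketch of what "expose an http server locally" could look like in Python, using only the standard library. This is not QuoteTracker's actual protocol: the /quote?symbol=... URL, the port and the names update_snapshot, lookup and make_server are all hypothetical, chosen only to show the snapshot idea (the server keeps the latest tick per symbol, and the plugin polls for it).

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import urlparse, parse_qs

# Latest tick per symbol; a broker websocket callback would overwrite entries here.
LATEST = {}
LOCK = threading.Lock()


def update_snapshot(tick: dict) -> None:
    """Store the most recent tick under its symbol (key "n")."""
    with LOCK:
        LATEST[tick["n"]] = tick


def lookup(path: str):
    """Map a request path to (status, body). Hypothetical layout: /quote?symbol=SYM1"""
    url = urlparse(path)
    symbol = parse_qs(url.query).get("symbol", [""])[0]
    with LOCK:
        tick = LATEST.get(symbol)
    if url.path != "/quote" or tick is None:
        return 404, b"{}"
    return 200, json.dumps(tick).encode("utf-8")


class SnapshotHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        status, body = lookup(self.path)
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # keep the console quiet under per-second polling


def make_server(port: int = 8080) -> ThreadingHTTPServer:
    """Bind to localhost only; the port number is an arbitrary choice."""
    return ThreadingHTTPServer(("127.0.0.1", port), SnapshotHandler)
```

Calling make_server().serve_forever() in a background thread would start it, with the websocket side calling update_snapshot() for every tick it receives. The plugin would still poll once per symbol, so this keeps the QT-style design rather than replacing it with a stream.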

Thanks for your post @Tomasz

As I mentioned, I have honestly never worked in C++. The Python side is currently OK, and ALL brokers provide at least a Python websocket client library. Currently, my import uses OLE, which has two problems:

  1. It makes the AB UI lag because of the per-second import (some suggested a RAM drive, which I am using)
  2. OLE import works only for the first instance, hence a bloated single DB

The idea with a generic websocket plugin is that it would consume data according to a standard specification that "you" came up with.

Unfortunately, the QT plugin making one request per symbol (per second) to the http server is not ideal.

You will not believe it, but over the last 5 days I studied your open-source IB plugin. You have used sockets there. People will not be able to appreciate the amount of work you put into the IB plugin: so many functions and interfaces. I have not managed a proper code review yet :)

Nowadays, vendors are becoming very expensive and restrictive with data, while brokers aren't charging for API/websocket data.
If AB had a generic format, say JSON with an array of ticks, the plugin would iterate through the array and update the respective ticker for each tick.
The server side could even subscribe to multiple different sockets and pump them all into one, or use a different socket for each AB instance, etc.
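To illustrate the "pump all into one" part, here is a small sketch using only asyncio. The fake_feed generator is a stand-in I made up for a broker websocket client, and pump and merge_feeds are hypothetical names; real feeds never end, so a live server would drain the queue continuously instead of collecting a list at the end.

```python
import asyncio


async def fake_feed(symbol, prices):
    """Stand-in for a broker websocket client (hypothetical)."""
    for price in prices:
        await asyncio.sleep(0)  # yield control, as a real recv() would
        yield {"n": symbol, "c": price}


async def pump(name, source, merged: asyncio.Queue) -> None:
    """Forward every tick from one upstream feed into the shared queue."""
    async for tick in source:
        await merged.put((name, tick))


async def merge_feeds(feeds: dict) -> list:
    """Run one pump per feed and return everything that arrived."""
    merged = asyncio.Queue()
    pumps = [asyncio.create_task(pump(name, src, merged))
             for name, src in feeds.items()]
    await asyncio.gather(*pumps)  # these finite demo feeds end; live ones would not
    out = []
    while not merged.empty():
        out.append(merged.get_nowait())
    return out
```

In pyServer.py the consumer side would call PUBSUB.publish() for each item taken from the queue, so AB sees a single local socket regardless of how many brokers sit behind it.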

In that sense, even this generic websocket-based data plugin won't be doing much:
connect to the socket server, receive the broadcast and, if it is an array of ticks, just iterate over it and feed AB. There is no real need for data processing. The date, time, fields, etc. would all follow a common spec.

I updated the broadcast function to send a more complete JSON tick record for testing.

import json
import random

sleepT = 1   # seconds between simulated batches (assumed value, not given in the post)
tf     = 1   # bar interval in minutes used to round the time stamp (assumed value)

def r(l=1, u=9):
    return random.randint(l, u)

async def broadcast_messages():
    s1 = s2 = s3 = 0

    while True:
        await asyncio.sleep( sleepT )             # simulate ticks in seconds
        s1 += r(1,3); s2 += r(1,3); s3 += r(1,3)  # running total volume per symbol
        dt   = datetime.datetime.now()

        t    = dt.hour*10000 + int( dt.minute/tf )*tf*100   # HHMMSS, minutes rounded down to tf, seconds 00
        d    = int( dt.strftime('%Y%m%d'))                  # YYYYMMDD
        # Keys: 'n', 'd', 't', 'o', 'h', 'l', 'c', 'v', 'oi', 's', 'pc', 'bs', 'bp', 'as', 'ap'
        # (s = total vol, pc = prev day close, bs/bp/as/ap = bid/ask size and price)
        data = [{"n": "SYM1", "t":t, "d":d, "c": r(1,9), "o": r(1,9), "h": 9, "l": 1, "v": r(1,9), "bp": r(1,5), "ap": r(5,9), "s": s1,"bs":1,"as":1,"pc":1}
               ,{"n": "SYM2", "t":t, "d":d, "c": r(10,19), "o": r(10,19), "h": 19, "l": 10, "v": r(10,19), "bp": r(10,15), "ap": r(15,19), "s": s2}
               ,{"n": "SYM3", "t":t, "d":d, "c": r(20,29), "o": r(20,29), "h": 29, "l": 20, "v": r(1,9), "bp": r(20,25), "ap": r(25,29), "s": s3}]

        await broadcast( json.dumps( data, separators=(',', ':')))
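For the receiving end, this is a minimal sketch of the "iterate and feed AB" step for the JSON batch above. The choice of required keys and the name decode_batch are my own assumptions, since no spec exists yet. It only groups ticks by symbol and rebuilds a time stamp from the integer d (YYYYMMDD) and t (HHMMSS) fields.

```python
import json
from datetime import datetime

# Assumed minimum for a usable tick; the real spec is still undecided.
REQUIRED = ("n", "d", "t", "c")


def decode_batch(message: str) -> dict:
    """Return {symbol: [tick, ...]} for one JSON broadcast."""
    by_symbol = {}
    for tick in json.loads(message):
        if any(key not in tick for key in REQUIRED):
            continue  # skip malformed records rather than abort the whole batch
        # Zero-pad so that e.g. t=91500 becomes "091500"
        stamp = datetime.strptime(f"{tick['d']:08d}{tick['t']:06d}", "%Y%m%d%H%M%S")
        by_symbol.setdefault(tick["n"], []).append({**tick, "stamp": stamp})
    return by_symbol
```

Optional keys such as "bs", "as" and "pc" pass through untouched, so a record like SYM2 that omits them is still accepted.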