WsRtd - Websocket-Json standard Data plugin - Test Release

Just use update method with "prune_previous_versions=True" as an argument to method.

I have sent you details to join the test program

2 Likes

It would be better to write() for a new symbol, and use append() for subsequent insertions.
Then prune is not required.

1 Like

thank a lot for answer and you wonderful development. :folded_hands: :folded_hands:

1 Like

Thanks for this great plugin and RTD solution @nsm51 . Could you please share the latest plugin DLL so I can try this? I have RT data saved to clickhouse and want to use this plugin to connect AB to the DB

Hi
I upgrade my Amibroker X 64 to version 6.3 so now I can try to use your plugin . And I have to say thank you for your offer to the Amibroker community.

I have no idea about Python code so I ask ChatGpt, and after 5 day trying I have not luck.
I ask for a python script to receive data for the ticker BTCUSD as this ticker working 24 hours. They have a plugin to fetch data form this TVwebsite.

Can someone give me an easy example to understand what i need to send data to Rtd_Ws_AB_plugin ?

This is the console output from below script

✅ Received Bar:
symbol    Coinbase:BTCUSD
open            104641.03
high             104652.9
low              104639.6
close            104652.9
volume           0.459209
Name: 2025-06-20 01:25:00, dtype: object
📤 Sent to clients: [{"ticker":"BTCUSD","d":20250620,"t":12500,"o":104641.03,"h":104652.9,"l":104639.6,"c":104652.9,"v":0.45921}]

And this is the Python script: Runs every 10 second

# tv_to_ws_btcusd_V5.py
## pip install --upgrade --no-cache-dir git+https://github.com/rongardF/tvdatafeed.git

import asyncio
import json
from datetime import datetime, timezone
from websockets import serve
from tvDatafeed import TvDatafeed, Interval
import pandas as pd

clients = set()

tv = TvDatafeed(username='boutalas', password='tv12345')

## Retrieves the latest bar (n_bars=1) from TV View and converts it into a JSON-compatible dictionary.
def get_bar():
    try:
        bars = tv.get_hist("BTCUSD", "Coinbase", interval=Interval.in_1_minute, n_bars=1)
        bar = bars.iloc[-1]         ## Means: take the last row (i.e., the latest bar).

        print("✅ Received Bar:")
        print(bar)

        if not isinstance(bar, pd.Series):
            return None

        dt = pd.to_datetime(bar.name).tz_localize('UTC')
        date_int = int(dt.strftime('%Y%m%d'))
        time_int = int(dt.strftime('%H%M%S'))

        ## is this return has to be matrix return [ { ... } ]  or just return { ... }  ?

         # Return as a list containing a single JSON object
        return [{
            "ticker": "BTCUSD",  # <- I know this might need to be "n"?
            ## "datetime": dt.strftime("%Y-%m-%dT%H:%M:%SZ"),
            "d": date_int,
            "t": time_int,
            "o": round(float(bar.open), 5),
            "h": round(float(bar.high), 5),
            "l": round(float(bar.low), 5),
            "c": round(float(bar.close), 5),
            "v": round(float(bar.volume), 5)
        }]

    except Exception as e:
        print("❌ Error in get_bar:", e)
        return None

async def handler(websocket, path):
    print("📡 AmiBroker connected.")
    clients.add(websocket)
    try:
        await websocket.wait_closed()
    finally:
        clients.remove(websocket)
        print("🔌 AmiBroker disconnected.")


## Every 10 seconds: Retrieves a new bar and converts it to a JSON string.
## Sends it to all connected clients. If no bar is received, it shows a warning and waits 10 seconds.
async def broadcast():
    while True:
        bar = get_bar()
        if bar:
            message = json.dumps(bar, separators=(',', ':')) + '\n'     ## <-- no spaces, newline-ended
            print("📤 Sent to clients:", message.strip())
            await asyncio.gather(*(client.send(message) for client in clients))
        else:
            print("⚠️ Skipping bar due to error")
        await asyncio.sleep(10)


## Start the WebSocket server
async def main():
    async with serve(handler, "127.0.0.1", 10102):
        print("✅ WebSocket Server started on ws://127.0.0.1:10102")
        await broadcast()

if __name__ == "__main__":
    asyncio.run(main())

with that setting

1 Like

Thanks for the kind words.

1 Like

@PanoS
Few things are not optimal as you mixed relay code with sample server.

I have written it as quick fix for now and works with AB+WsRtd.

# tv_to_ws_btcusd_V6.py ## modified by NSM51 (suboptimal quick fix)
## pip install --upgrade --no-cache-dir git+https://github.com/rongardF/tvdatafeed.git

import asyncio
import json
from datetime import datetime, timezone
from websockets import serve
from tvDatafeed import TvDatafeed, Interval
import pandas as pd

clients = set()

tv = TvDatafeed(username='boutalas', password='tv12345')

## Retrieves the latest bar (n_bars=1) from TV View and converts it into a JSON-compatible dictionary.
def get_bar():
    try:
        bars = tv.get_hist("BTCUSD", "Coinbase", interval=Interval.in_1_minute, n_bars=1)
        bar = bars.iloc[-1]         ## Means: take the last row (i.e., the latest bar).

        print("✅ Received Bar:")
        print(bar)

        if not isinstance(bar, pd.Series):
            return None

        dt = pd.to_datetime(bar.name).tz_localize('UTC')
        date_int = int(dt.strftime('%Y%m%d'))
        time_int = int(dt.strftime('%H%M%S'))

        ## is this return has to be matrix return [ { ... } ]  or just return { ... }  ?

         # Return as a list containing a single JSON object
        return [{"n": "BTCUSD","d": date_int,"t": time_int,"o": round(float(bar.open), 5),"h": round(float(bar.high), 5),
            "l": round(float(bar.low), 5),"c": round(float(bar.close), 5), "v": round(float(bar.volume), 5)
        }]

    except Exception as e:
        print("❌ Error in get_bar:", e)
        return None

async def handler(websocket, path):
    print("📡 AmiBroker connected.")
    clients.add(websocket)
    try:
        await websocket.wait_closed()
    finally:
        clients.remove(websocket)
        print("🔌 AmiBroker disconnected.")


## Every 10 seconds: Retrieves a new bar and converts it to a JSON string.
## Sends it to all connected clients. If no bar is received, it shows a warning and waits 10 seconds.
async def broadcast():
    while True:
        bar = get_bar()
        if bar:
            message = json.dumps(bar, separators=(',', ':'))     ## <-- no spaces, newline-ended
            print( f"📤 Sent to clients:{message}\n" )
            #await asyncio.gather(*(client.send(message) for client in clients))
            for client in clients:
                try:
                    await client.send( message )
                except Exception as e:
                    print(e)
        else:
            print("⚠️ Skipping bar due to error")
        await asyncio.sleep(10)


async def start_ws_server( aport ):
    global tCount, incSym
    tf = 1; tCount = 1; incSym = 1
    print( f"Started RTD server: port={aport}, tf={tf}min, sym_count={tCount}, increment_sym={incSym}" )

    async with serve( handler, "localhost", aport ):
        await broadcast()
    
    return


## Start the WebSocket server
async def main():
    global wsport   #wsport1,2,3
    wsport = 10102
    await asyncio.gather( start_ws_server( wsport ) )   ## more tasks


if __name__ == "__main__":
    asyncio.run(main())

updated plugin to handle the float volume a better, in some case the record is skipped.

2 Likes

@panos
your TV account may risk getting blocked and it is way bad than sub-optimal.
And, change the password as well.

tv.get_hist("...") will fetch the history, upto 5000 bars. Use this for backfill, but fetching so many bars every 10 secs is overloading the server.
You need to tweak the pandas object to json-hist format for WsRtd Backfill.

For RTD, use:
tvl = TvDatafeedLive(username, password)

You can then use the json-RTD format for live ticks. You may need to build minute bars if this return the LTP.
See py sample class on github.

The plugin is quite strict for performance reasons, if format or order is not adhered, the record is skipped.

2 Likes

Hi
thank you . thank you . thank you

I am NOT going to use this Project.. But for me was a nice journey to learn little bit of Python.

This is a good stuff .... a working code

We modified the script so that it first downloads and sends the historical data (backfill) for the previous day for each symbol,
and then starts the live loop, sending new bars every 10 seconds.

  1. The backfill should be sent ONLY after at least one client is connected (e.g., the AmiBroker plugin).
  2. Before starting the backfill, we wait until at least one client is connected (while not clients: check).
    3)If no client is connected, the script pauses (await asyncio.sleep(1)) and checks again.
  3. The same applies to the live broadcast — data is only sent if clients are connected.
    This script handles everything together:
    WebSocket server + TView data fetch + JSON formatting + broadcasting.

# tv_broadcast_server_backfill.py
# https://forum.amibroker.com/t/wsrtd-websocket-json-standard-data-plugin-test-release/39640/156

import asyncio
import websockets
import json
from datetime import datetime, timedelta
from tvDatafeed import TvDatafeed, Interval

tickers = ['GER30', 'NAS100', 'SPX500']
exchange = 'FX'
tv = TvDatafeed()
clients = set()

async def handler(websocket, path):
    print("📡 AmiBroker connected.")
    clients.add(websocket)
    try:
        await websocket.wait_closed()
    finally:
        clients.remove(websocket)
        print("🔌 AmiBroker disconnected.")

async def send_message(message):
    if clients:
        await asyncio.gather(*(client.send(message) for client in clients))
    else:
        print("⏳ No clients connected, message skipped.")

def format_bar(ticker, dt, bar):
    date_int = int(dt.strftime("%Y%m%d"))
    time_int = int(dt.strftime("%H%M%S"))
    return [{
        "n": ticker,
        "d": date_int,
        "t": time_int,
        "o": round(float(bar.open), 5),
        "h": round(float(bar.high), 5),
        "l": round(float(bar.low), 5),
        "c": round(float(bar.close), 5),
        "v": int(bar.volume)
    }]

async def send_backfill():
    print("🔄 Waiting for at least one client to connect before sending backfill...")
    while not clients:
        await asyncio.sleep(1)

    print("✅ Client connected, sending backfill data...")
    end_date = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    start_date = end_date - timedelta(days=1)

    for ticker in tickers:
        try:
            df = tv.get_hist(symbol=ticker, exchange=exchange, interval=Interval.in_1_minute, n_bars=1440)
            if df is None or df.empty:
                print(f"⚠️ No backfill data for {ticker}")
                continue

            df_filtered = df[(df.index >= start_date) & (df.index < end_date)]
            if df_filtered.empty:
                print(f"⚠️ No backfill data in date range for {ticker}")
                continue

            for idx, row in df_filtered.iterrows():
                bar = format_bar(ticker, idx.to_pydatetime(), row)
                message = json.dumps(bar) + '\n'
                print(f"📤 Sending backfill: {bar}")
                await send_message(message)
                await asyncio.sleep(0.01)

        except Exception as e:
            print(f"❌ Error backfill {ticker}: {e}")

async def broadcast_live():
    while True:
        if clients:
            for ticker in tickers:
                try:
                    df = tv.get_hist(symbol=ticker, exchange=exchange, interval=Interval.in_1_minute, n_bars=1)
                    if df is None or df.empty:
                        print(f"⚠️ No data for {ticker}")
                        continue

                    bar = df.iloc[-1]
                    dt = bar.name.to_pydatetime()
                    bar_data = format_bar(ticker, dt, bar)
                    message = json.dumps(bar_data) + '\n'
                    print(f"📤 Broadcasting live: {bar_data}")
                    await send_message(message)
                except Exception as e:
                    print(f"❌ Error live {ticker}: {e}")
        else:
            print("⏳ Waiting for clients to connect before broadcasting live data.")

        await asyncio.sleep(10)

async def main():
    async with websockets.serve(handler, "127.0.0.1", 10102):
        print("✅ WebSocket Server started on ws://127.0.0.1:10102")
        await send_backfill()
        await broadcast_live()

if __name__ == "__main__":
    asyncio.run(main())

1 Like

Hello @nsm51
Came across your post recently and am very glad that a standard WS Data Plugin is initiated. I have been writing some Py codes using WS and HTTP Backfil URL using Dhan Broker data.

How can I test the standard plugin you developed?
Where/How to download WsRTD.dll ?

Thank you for your work and detailed explanations in this thread. Much appreciated.

1 Like

Thanks. I've sent you a DM for testing.

@nsm51 Well done :clap:,thanks for sharing

1 Like

same here , how to download WsrtD.dll file. In GitHub .dll file not available

Same here, I am also trying for how to download WsrtD.dll file. In GitHub .dll file not available Can you share me the link

Thanks for sharing :smiling_face:

Hi Sandeep do you got the DLL Link if yes can you please send me I am also working on Dhan

I am also waiting.

Even though the documentation is as elaborate as possible, I'm having to reply/guide almost everyone in the test program.
Thats why it is not public yet, but a message to join is welcome.
The last build was back in February, I think.

The goal was to have users put out their client-APP codes to integrate a majority og brokers/vendors but there isn't much progress on it.

1 Like

Just an update. I have been building elsewhere for my infrastructure but I hope to have something available soon for both IBKR and ccxt, which should allow must data sources. Looking around, IBKR data seems cheaper than most. I'll fork on GitHub when I'm ready, then submit a push.

2 Likes