Setting up DDE universal data Plugin with Python script as Server

Hello Everyone!
I am continuing to build a data-streaming solution from Binance to AmiBroker, a topic that has been discussed here many times.
Streaming from a file generated by Python worked well except for a "Permission denied" error on the Python side. I think it happens because AmiBroker tries to read the file at the same moment Python tries to write it.
So the next upgrade would be to use the DDE plugin.
After reading the readme, thread1 and thread2, I ran my Python code:

import time
import win32ui, dde
from pywin.mfc import object as O_
import datetime
from datetime import datetime, timezone, timedelta


class DDETopic(O_.Object):
    def __init__(self, topicName):
        self.topic = dde.CreateTopic(topicName)
        O_.Object.__init__(self, self.topic)
        self.items = {}

    def setData(self, itemName, value):
        try:
            self.items[itemName].SetData(value)
            print(itemName+" "+value)
        except KeyError:
            print("error ")
            if itemName not in self.items:
                self.items[itemName] = dde.CreateStringItem(itemName)
                self.topic.AddItem( self.items[itemName] )
                self.items[itemName].SetData( str(value) )

serv_name='binance_'
t_name='BTCUSDT'

ddeServer = dde.CreateServer()
ddeServer.Create(serv_name)
#or serv_name+"|"+t_name+"!"+'Open' not working
ddeTopic1 = DDETopic(t_name+'!'+'Open')
ddeTopic2 = DDETopic(t_name+'!'+'High')
ddeTopic3 = DDETopic(t_name+'!'+'Low')
ddeTopic4 = DDETopic(t_name+'!'+'Last')
ddeTopic5 = DDETopic(t_name+'!'+'Time')
ddeTopic6 = DDETopic(t_name+'!'+'Volume')
ddeServer.AddTopic(ddeTopic1)
ddeServer.AddTopic(ddeTopic2)
ddeServer.AddTopic(ddeTopic3)
ddeServer.AddTopic(ddeTopic4)
ddeServer.AddTopic(ddeTopic5)
ddeServer.AddTopic(ddeTopic6)

while True:
    dat1='30000'
    dt_string = datetime.now().strftime("%H%M")#%H%M%S or %H:%M:%S  not working
    ddeTopic1.setData(t_name+'!'+'Open', dat1)
    ddeTopic2.setData(t_name+'!'+'High', dat1)
    ddeTopic3.setData(t_name+'!'+'Low', dat1)
    ddeTopic4.setData(t_name+'!'+'Last', dat1)
    ddeTopic5.setData(t_name+'!'+'Time', dt_string)
    ddeTopic6.setData(t_name+'!'+'Volume', '1')

    win32ui.PumpWaitingMessages(0, -1)
    print('running')
    time.sleep(3)


The Python script above produces this output:
[screenshot: DDE_2]

and at the same time AmiBroker stops responding until I close the Python script.

According to the official docs, every definition must have the form SERVER|TOPIC!ITEM. Do I have to add SERVER| to the topic?


Whatever I change, it does not work.
Can anyone help, please?
Amibroker 6.18.0, DDE plugin version is 1.2.2.

DDE plugin version is 1.4.0

No, it is because your Python script is not able to write.
What you should do is write the file under a NEW name (not in use), open it with the shareDenyNone flag, and after writing is complete RENAME the file to its final name. Renaming is atomic, while writing is not.
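
A minimal Python sketch of the write-then-rename advice above. The function name, the retry count and the delay are assumptions made for illustration. It covers only the rename step and does not set any sharing flag. On Windows, `os.replace` can raise `PermissionError` while another process holds the destination open, hence the short retry loop.

```python
import os
import tempfile
import time


def write_atomically(final_path, text, retries=5, delay=0.05):
    """Write text to a temporary file in the same folder, then rename it over final_path."""
    folder = os.path.dirname(os.path.abspath(final_path))
    # The temporary file must live on the same volume for the rename to be atomic.
    fd, tmp_path = tempfile.mkstemp(dir=folder, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", newline="") as handle:
            handle.write(text)
        for attempt in range(retries):
            try:
                os.replace(tmp_path, final_path)  # atomic swap to the final name
                return
            except PermissionError:
                # A reader may hold the destination open; wait briefly and retry.
                if attempt == retries - 1:
                    raise
                time.sleep(delay)
    finally:
        # Clean up the temporary file if the rename never happened.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
```

The reader then only ever sees either the previous complete file or the new complete file, never a half-written one.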

Regarding DDE:

  1. the LastPrice and/or Last Size and/or Volume must be changing
  2. Time field must include ENTIRE timestamp YYYY-MM-DD HH:MM:SS
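
As a hedged illustration of the two points above: in DDE in general, the service (server), topic and item are three separate names, and `SERVER|TOPIC!ITEM` is only a notation for writing them together. The sketch below is plain Python and does not touch pywin32. The helper names are hypothetical, and whether using the symbol as the topic and the field as the item resolves the hang is an assumption that this thread does not confirm.

```python
from datetime import datetime


def split_dde_reference(reference):
    """Split 'SERVER|TOPIC!ITEM' into its three separate DDE names."""
    server, _, rest = reference.partition("|")
    topic, _, item = rest.partition("!")
    if not (server and topic and item):
        raise ValueError(f"expected SERVER|TOPIC!ITEM, got {reference!r}")
    return server, topic, item


def full_timestamp(moment):
    """Format a datetime as the complete YYYY-MM-DD HH:MM:SS string."""
    return moment.strftime("%Y-%m-%d %H:%M:%S")
```

Under that reading, `binance_|BTCUSDT!Last` names the server `binance_`, the topic `BTCUSDT` and the item `Last`, rather than a topic called `BTCUSDT!Last`.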

Hello Tomasz!
Thank you for the shareDenyNone flag! I'll google how to set it on a new file and then rename it!

Regarding DDE:
I changed the code (lower part):

vol=1
dt=30000
dat1=str(dt)
while True:
    
    if dt==30000:
        dt=30001
    else:
        dt=30000
    last=str(dt)
    vol=vol+1
    dt_string = datetime.now().strftime("%Y-%m-%d %H:%M:%S")#%H%M%S or %H:%M:%S  not working
    ddeTopic1.setData(t_name+'!'+'Open', str(30000))
    ddeTopic2.setData(t_name+'!'+'High', str(30010))
    ddeTopic3.setData(t_name+'!'+'Low', str(29999))
    ddeTopic4.setData(t_name+'!'+'Last', last)
    ddeTopic5.setData(t_name+'!'+'Time', dt_string)
    ddeTopic6.setData(t_name+'!'+'Volume', str(vol))

    win32ui.PumpWaitingMessages(0, -1)
    print('running')
    time.sleep(1)

So now it follows your recommendations:
[screenshot: DDE_5]


But AmiBroker is still not responding; if I close Python, it is back to normal.

Adding to what Tomasz wrote,

With newer Windows versions, DDE is treated as unsafe (potentially malicious), like other legacy technologies such as Flash. Honestly, DDE is obsolete and does not offer a "sustainable" solution. It is available only for backward compatibility.

Since you want to give it a Pythonic touch, my suggestion would be to use the ODBC/SQL Universal Data/AFL plugin instead. Using Python you can feed a SQL Express Server, then connect to that DB from AmiBroker to enjoy the best of both worlds, the Data and the AFL plugin. That would not only be convenient but also open several effective, flexible doors for you.
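
A rough sketch of the "feed a database from Python" idea. It uses the standard-library `sqlite3` module purely as a stand-in so that it runs anywhere; with SQL Express you would connect through pyodbc and adapt the SQL. The table and column names are hypothetical and would have to match whatever the ODBC plugin is configured to read.

```python
import sqlite3


def open_store(path=":memory:"):
    """Open the database and make sure the (hypothetical) bars table exists."""
    conn = sqlite3.connect(path)
    conn.execute(
        """CREATE TABLE IF NOT EXISTS bars (
               symbol   TEXT NOT NULL,
               bar_time TEXT NOT NULL,
               open REAL, high REAL, low REAL, close REAL, volume REAL,
               PRIMARY KEY (symbol, bar_time))"""
    )
    return conn


def upsert_bar(conn, symbol, bar_time, o, h, l, c, v):
    """Insert a bar, or overwrite it if the same symbol and time already exist."""
    conn.execute(
        "INSERT OR REPLACE INTO bars VALUES (?, ?, ?, ?, ?, ?, ?)",
        (symbol, bar_time, o, h, l, c, v),
    )
    conn.commit()
```

Keying on symbol and bar time means a still-forming 1-minute bar can be rewritten on every tick without creating duplicates.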

Otherwise, if you are willing to invest some time to dive deep, writing a DLL data plugin is not that difficult either. Yes, one would need basic knowledge of C++, C# or Rust. The process is explained in the ADK (with C++ examples); it is as simple as fitting two ends of a pipe! Map the respective ADK structures to your datafeed API (mostly WSS), abide by the rules explained in it, and that's it!

Thanks a lot for the ODBC/SQL suggestion! I had come to the same idea and will investigate it. Regarding the ADK, I am also considering diving deep there, but later. I remember Tomasz's recommendation to use AmiBroker's standard options before going into ADK C++ work.

You're welcome!

It's pretty straightforward!

The Binance API is pretty juicy and well documented.

And you have pyodbc for C, R, U (most of the time) and D (bad ticks, if any); the rest is up to AB's configured ODBC plugin, which gives O₂ to your breathing charts, eh! :wink:

Thanks a lot Cougar!
(A bit off-topic) Can you also advise how you would build an AmiBroker-Binance trading solution? I have read a few threads here and am not sure whether anyone has succeeded in sending an order to Binance directly from AmiBroker. Additionally, my trading system needs to send up to 50-70 orders at once (trading on the 1m timeframe). I can do that from a Python script, for example. My idea was to use ODBC or to exchange files. What would you recommend?

to send orders directly from AFL you'd need to compute a cryptographic hash and signature of your request

I've done it in the past from AFL but with external helper utility
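
For reference, such a helper can be very small. With HMAC-type API keys, Binance expects the request's query string to be signed with HMAC-SHA256 using the API secret, with the hex digest appended as a `signature` parameter. A minimal sketch in Python follows; the function name is made up for illustration, and a real request also needs the `X-MBX-APIKEY` header and a current `timestamp` parameter.

```python
import hashlib
import hmac
from urllib.parse import urlencode


def sign_query(params, api_secret):
    """Return the URL-encoded query string with an HMAC-SHA256 signature appended."""
    query = urlencode(params)
    signature = hmac.new(
        api_secret.encode("utf-8"),   # key: the API secret
        query.encode("utf-8"),        # message: the exact query string being sent
        hashlib.sha256,
    ).hexdigest()
    return f"{query}&signature={signature}"
```

The signature must be computed over exactly the string that is sent, so the parameters should be encoded once and reused rather than rebuilt.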

also, if you are not aware - there are some plugins for crypto/Binance out there (some of them free), have you tried them already?

Binance provides an official Python connector. Before anything else, you should work thoroughly through the examples in that repo.


ODBC is for data (consider these examples for feeding SQL Express); it has nothing to do with order placement (consider these examples to trigger orders).


There are many ways!

To keep things Pythonic, the simplest effective implementation is an asynchronous server built with FastAPI, with each of your intended tasks (such as placing, cancelling or modifying orders) on its own route. The server is set to run on uvicorn, like so:

uvicorn.run( 'myBinanceApp:app', host = '127.0.0.1', port = 5000 )

Here, the server runs on localhost:5000, so you can now hit http://localhost:5000/<specify your task route> with the respective URL params (specifying Symbol, Quantity, Price, OrderType, etc). Each of those routes captures the URL params and issues the corresponding request (GET, POST...) to the Binance (or any other) API endpoints.
For example, a new-order route could look something like this:

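import os
import uuid

from binance.websocket.spot.websocket_api import SpotWebsocketAPIClient

from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, JSONResponse
from starlette.middleware.sessions import SessionMiddleware

app = FastAPI()
app.add_middleware( SessionMiddleware, secret_key = 'mysecret' )

# Placeholder definitions; replace with your own credentials and handlers.
api_key    = os.environ.get( 'BINANCE_API_KEY' )
api_secret = os.environ.get( 'BINANCE_API_SECRET' )

def message_handler( _, message ):
    print( message )

def on_close( _ ):
    print( 'connection closed' )

def generateSeqOrderNum():
    # Placeholder: any unique string works as a client order id
    return uuid.uuid4().hex

@app.get( '/' )
async def index( request: Request ):
    return HTMLResponse( 'Hello there! server is up & running...' )

@app.get( '/newOrder' )
async def order( request: Request ):
    symbol     = request.query_params.get( 'symbol' )
    side       = request.query_params.get( 'side' )
    order_type = request.query_params.get( 'type' )   # avoids shadowing the built-in type()
    quantity   = request.query_params.get( 'quantity' )
    price      = request.query_params.get( 'price' )

    my_client = SpotWebsocketAPIClient(
        stream_url  = "wss://testnet.binance.vision/ws-api/v3",
        api_key     = api_key,
        api_secret  = api_secret,
        on_message  = message_handler,
        on_close    = on_close,
    )

    my_client.new_order(
        id               = generateSeqOrderNum(),   # arbitrary request id
        symbol           = symbol,
        side             = side,
        type             = order_type,
        timeInForce      = "GTC",
        quantity         = quantity,
        price            = price,
        newClientOrderId = generateSeqOrderNum(),
        newOrderRespType = "RESULT",
    )

    # The order result arrives asynchronously in message_handler
    return JSONResponse( { 'status': 'submitted', 'symbol': symbol } )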

Once your server is up & running, AmiBroker places new order(s) through InternetPostRequest( url ) whenever your trade logic is satisfied (discretionary or systematically automated), where:

url = StrFormat( "http://localhost:5000/newOrder?%s=%s&%s=%s&%s=%s&%s=%s&%s=%s",
    "symbol",symbol,
    "side",side,
    "type",type,
    "quantity",quantity,
    "price",price
);
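
Before wiring this into AFL, it may help to build the same URL from Python and try it against the local server. A small sketch, assuming port 5000 from the uvicorn.run call and the /newOrder route shown earlier; the function name is made up for illustration.

```python
from urllib.parse import urlencode, urlsplit, parse_qs


def new_order_url(symbol, side, order_type, quantity, price,
                  base="http://localhost:5000"):
    """Build the /newOrder URL with properly escaped query parameters."""
    params = {
        "symbol": symbol,
        "side": side,
        "type": order_type,
        "quantity": quantity,
        "price": price,
    }
    return f"{base}/newOrder?{urlencode(params)}"
```

Opening the resulting URL in a browser, or with any HTTP client, exercises the route exactly as an AFL-side request would.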

After a few months, once you become fluent, you may use NGINX to host that Python-based backend server (maybe along with a ReactJS-based frontend interface) on AWS or anything else (:heart: Linode) for 24/7 access from anywhere. Then you may place an order to https://zoringer.com/newOrder from AmiBroker while relaxing on a Greek beach.

P.S. Before jumping into coding, writing out the algorithms (with pen and paper) is essential. It is also necessary to test the whole API (functionality + routes) in Postman beforehand. That would substantially help you plot a flawless map!

Not directly from AmiBroker, but in the end what I did was fetch historic data via the REST API and then import it via ASCII using an AmiBroker .abb batch file. I did explore using a websocket to stream and continuously update a file, but from memory I think I ran into a file-access problem similar to yours. Thanks @Tomasz for mentioning that flag, I’m going to look into that.

What I then did was use a batch .abb file to run an exploration periodically (e.g. every 5 mins), capture the various arrays with ‘AddColumn’ and, in the same batch, export them to something like \binanceOrders.CSV.

Then, again from the same batch, I execute a Python script which uses the pandas read_csv function to load the orders, and then uses a ready-made function from the Python Binance connector to fire them at Binance. I had a websocket listen for the fill response so I could immediately throw a stop loss in, as Binance didn’t allow bracket orders.
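
A sketch of the "read the exported orders" step described above, using only the standard library. The column names (Ticker, Side, Quantity, Price) are hypothetical and depend entirely on what the exploration's AddColumn calls export. It takes the CSV text as a string so it can be tried without AmiBroker.

```python
import csv
import io


def read_orders(csv_text):
    """Parse exported exploration rows into a list of order dictionaries."""
    orders = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        orders.append({
            "symbol": row["Ticker"],
            "side": row["Side"].upper(),
            "quantity": float(row["Quantity"]),
            "price": float(row["Price"]),
        })
    return orders
```

Each dictionary can then be handed to whichever connector function places the order.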

Hi Zoringer. I'm just wondering whether you ever got this to work?

I've been trying and I'm having the same issues: I can't get AmiBroker to see the server I am creating.
I took some snippets of your code and modified them a little to get the websocket data:

import websocket
import json
import pandas as pd
from datetime import datetime
import win32ui, dde
from pywin.mfc import object as O_
import time


def on_open(_wsa):
    data = dict(
        method='SUBSCRIBE',
        id=1,
        params=["btcusdt@kline_1m"]
    )
    _wsa.send(json.dumps(data))


def on_message(_wsa, data):
    try:
        df = pd.read_json(data, orient='index')
        timeOfCall = (df[0]['E'])
        normTimeOfCall = str(datetime.fromtimestamp(timeOfCall / 1000))
        timeval= normTimeOfCall
        data2 = (df[0]['k'])
        df2 = pd.DataFrame(data2, index=[0])
        sym= df2['s'][0]
        op= (df2['o'][0])
        hi= (df2['h'][0])
        lo= (df2['l'][0])
        cl= (df2['c'][0])
        vol= (df2['v'][0])
        serverCreator(timeval,sym,op,hi,lo,cl,vol)


    except Exception as e:
        print(f'Error on Message: {e}')

class DDETopic(O_.Object):
    def __init__(self, topicName):
        self.topic = dde.CreateTopic(topicName)
        O_.Object.__init__(self, self.topic)
        self.items = {}

    def setData(self, itemName, value):
        try:
            self.items[itemName].SetData(value)
            print(itemName+" "+value)
        except KeyError:
            print("error ")
            if itemName not in self.items:
                self.items[itemName] = dde.CreateStringItem(itemName)
                self.topic.AddItem( self.items[itemName] )
                self.items[itemName].SetData( str(value) )

serv_name='binance_'
ddeServer = dde.CreateServer()
ddeServer.Create(serv_name)

def serverCreator(timeval,sym,op,hi,lo,cl,vol):
    t_name=sym
    #or serv_name+"|"+t_name+"!"+'Open' not working
    ddeTopic1 = DDETopic(t_name+'!'+'Open')
    ddeTopic2 = DDETopic(t_name+'!'+'High')
    ddeTopic3 = DDETopic(t_name+'!'+'Low')
    ddeTopic4 = DDETopic(t_name+'!'+'Last')
    ddeTopic5 = DDETopic(t_name+'!'+'Time')
    ddeTopic6 = DDETopic(t_name+'!'+'Volume')
    ddeServer.AddTopic(ddeTopic1)
    ddeServer.AddTopic(ddeTopic2)
    ddeServer.AddTopic(ddeTopic3)
    ddeServer.AddTopic(ddeTopic4)
    ddeServer.AddTopic(ddeTopic5)
    ddeServer.AddTopic(ddeTopic6)
    while True:
        ddeTopic1.setData(serv_name+'|'+t_name + '!' + 'Open', op)
        ddeTopic2.setData(serv_name+'|'+t_name + '!' + 'High', hi)
        ddeTopic3.setData(serv_name+'|'+t_name + '!' + 'Low', lo)
        ddeTopic4.setData(serv_name+'|'+t_name + '!' + 'Last', cl)
        ddeTopic5.setData(serv_name+'|'+t_name + '!' + 'Time', timeval)
        ddeTopic6.setData(serv_name+'|'+t_name + '!' + 'Volume', vol)

        win32ui.PumpWaitingMessages(0, -1)
        print('running')
        time.sleep(3)


def run():
    stream_name = 'some_name'  # this is not important
    wss = 'wss://stream.binance.com:9443/ws/%s' % stream_name
    wsa = websocket.WebSocketApp(wss, on_message=on_message, on_open=on_open)
    wsa.run_forever()

print("Opening Binance Websockets")


if __name__ == '__main__':
    run()



I am pulling the values from the console. I've even tried concatenating the lines in setData() to include 'binance_' but it really makes no difference.
[screenshot: Screen Shot 2023-11-30 at 10.38.05 pm]

As far as I can see, I'm configuring things as specified in the documentation.

Did you manage to get the DDE server to work, or did you explore the ODBC/SQL solution kindly suggested by @Cougar?

I'm not very effective at coding and I'm wondering how many (more) hours I should invest in trying to learn how SQL works, or whether I'm on a fool's errand with this whole project.

What I'm working with for now, which at least works, is a REST API call once every 60 minutes. I request all of the symbols in loops (about 30 symbols per loop, with about 25 loops running in parallel, launched as background processes from a batch). It takes about 35 seconds to get about 750 CSV files, which I then import via ASCII using the batch importer, which takes another 30 seconds or so.

So one of my loops would be something like this:

import pandas as pd
import KlineCallLoopMaster as looper

dfUSDT=pd.read_csv(r"C:\Program Files\AmiBroker\Databases\Crypto\WatchLists\BinancePairsUSDT.tls",header=None)
for i in range(224,252):
    sym = (dfUSDT[0][i])
    looper.dataCaller(sym, "5m", 200)
    print("Symbol= " + (str(sym)))

with KlineCallLoopMaster.py as:

from binance.spot import Spot
import pandas as pd
import logging
from datetime import datetime

def dataCaller(sym, interval, lim):
  cols = ['ticker', 'date', 'time', 'open', 'high', 'low', 'close', 'vol']
  dfCols = pd.DataFrame(columns=cols)
  # Raw string keeps the backslashes of the Windows path literal
  dfCols.to_csv(r"C:\Program Files\AmiBroker\Brokers\Binance\Data\symbolcsv\klines\csvFiles" + "\\" + sym + ".Binance", index=False, header=True, mode="w")
  Client = Spot(api_key="<apiKey>", api_secret="<apiSecret>")  # placeholders: substitute your own credentials
  klines = Client.klines(symbol=sym, interval=interval, limit=lim)
  df = pd.DataFrame(klines)

  date = []
  for i in range(len(df)):
      timestamp = (df[6][i])
      normTime = str(datetime.fromtimestamp((timestamp +1) / 1000))
      indSpace = normTime.find(' ')
      dateVal = (normTime[:indSpace])
      date.append(dateVal)

  time = []
  for i in range(len(df)):
      timestamp = (df[6][i])
      normTime = str(datetime.fromtimestamp((timestamp +1) / 1000))
      indSpace = normTime.find(' ')
      timeVal = (normTime[indSpace + 1:])
      time.append(timeVal)
  dfKlines = pd.DataFrame(df)
  dfKlines['date'] = date
  dfKlines['time'] = time
  titled_columns = {'ticker':sym,
                    'date': dfKlines['date'],
                    'time': dfKlines['time'],
                    'open': df[1],
                    'high': df[2],
                    'low': df[3],
                    'close': df[4],
                    'vol': df[5]
                    }
  dfKlines2 = pd.DataFrame(titled_columns)
  dfKlines2.to_csv(r"C:\Program Files\AmiBroker\Brokers\Binance\Data\symbolcsv\klines\csvFiles" + "\\" + sym + ".Binance", index=False, header=False, mode="a")

(Note: I'm using the candle close time instead of the open time, and I think I probably need to change that. At timestamp = (df[6][i]) you would change the 6 to a 0, as Binance places the open time in the first element of the returned data.)
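
To make the index choice explicit, here is a small sketch that turns either timestamp of a kline row into the date and time strings used in the CSV. Positions 0 (open time) and 6 (close time) follow the layout of the REST klines response. The function and constant names are made up, and UTC is used here only to keep the result reproducible, whereas the loop above converts to local time.

```python
from datetime import datetime, timezone

OPEN_TIME, CLOSE_TIME = 0, 6  # positions of the millisecond timestamps in a kline row


def kline_date_time(kline, index=OPEN_TIME):
    """Return (date, time) strings for the chosen millisecond timestamp of a kline."""
    moment = datetime.fromtimestamp(kline[index] / 1000, tz=timezone.utc)
    return moment.strftime("%Y-%m-%d"), moment.strftime("%H:%M:%S")
```

With the open time there is no need for the `+ 1` millisecond adjustment, because the open time already falls exactly on the bar boundary.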

Yeah, it's ugly, but it works. Your signals are going to be a minute late each time, so you might want to think about which time frame you want to trade on.

Frankly speaking, I am not even sure that DDE in Python works at all. DDE is an old Windows-only technology pre-dating Python, and Python is not really into old Windows-only tech.

I would suggest using AmiBroker Development Kit to write a plugin that connects directly to whatever you want to connect to.

@Tomasz I have been pondering this for some time, but still don't know how to frame the question properly. I will try, based on my limited knowledge.

Over the years, the major development I see from brokers is that they are offering APIs and WebSockets like never before.

As you say, a plugin can address the issue, but you are well aware that writing a plugin is no joke and it's not for the 99%.

So far OLE and OLE import have worked, but OLE is subject to a Windows limitation and therefore only reaches the first running instance. Apart from that, it has worked smoothly, and for a long time people have used a RAM drive for performance.
Up to this point, it is on the user to get the data, parse it, format it, etc. and then just import it into AB.

Apart from this, what if, from each running instance of AB, one could add a "Windows named pipe" from the interface, with an "AB format file" mapped to it?
Because it is core Windows tech, would the pros outweigh the cons?
Would it allow multiple AB instances to run in peace?
Would your effort be minimal and give every OLE data importer a benefit, versus more custom code in a plugin?
I see Windows named pipes use memory, so are they reasonably efficient?
Would most languages that use OLE to interact also work with them?
And would named pipes not become obsolete in the future?

This is just a discussion to know your opinion and not a feature request :slight_smile: Thanks

I would certainly be interested in contributing some effort to a collaboration on this. I have put together a few snippets of code on the Binance websockets which I think could be useful, but I don't know how to avoid all of the errors in getting that data into the DDE plugin and, to be honest, I'm not certain I am setting up the server correctly. I am getting print output in the console, but I know next to nothing about servers and DDE. I've shelved it for now in favor of REST API calls.

From a cryptocurrency perspective, there's ccxt, which @HelixTrader told me about last year. It is a neat library which provides endpoints for a ton of exchanges and would, I expect, be enormously valuable in such a project.

Just saw this comment.
I suspect you are right @Tomasz.

I do see though that Python in Excel is in the beta channel. From there, I think it will only be a hop, skip and a jump.

In any case, for now I am trying my luck with the REST API and doing a course in C++, as I suspect a two-way .dll plugin with ccxt would be fantastic.