Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 130 additions & 73 deletions piker/brokers/ib/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ async def update_ledger_from_api_trades(
entry['listingExchange'] = pexch

conf = get_config()
entries = trades_to_ledger_entries(
entries = api_trades_to_ledger_entries(
conf['accounts'].inverse,
trade_entries,
)
Expand Down Expand Up @@ -371,8 +371,8 @@ async def update_and_audit_msgs(
else:
entry = f'split_ratio = 1/{int(reverse_split_ratio)}'

# raise ValueError(
log.error(
raise ValueError(
# log.error(
f'POSITION MISMATCH ib <-> piker ledger:\n'
f'ib: {ibppmsg}\n'
f'piker: {msg}\n'
Expand Down Expand Up @@ -1123,18 +1123,16 @@ def norm_trade_records(
continue

# timestamping is way different in API records
dtstr = record.get('datetime')
date = record.get('date')
if not date:
# probably a flex record with a wonky non-std timestamp..
date, ts = record['dateTime'].split(';')
dt = pendulum.parse(date)
ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
tsdt = pendulum.parse(ts)
dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
flex_dtstr = record.get('dateTime')

else:
# epoch_dt = pendulum.from_timestamp(record.get('time'))
dt = pendulum.parse(date)
if dtstr or date:
dt = pendulum.parse(dtstr or date)

elif flex_dtstr:
# probably a flex record with a wonky non-std timestamp..
dt = parse_flex_dt(record['dateTime'])

# special handling of symbol extraction from
# flex records using some ad-hoc schema parsing.
Expand Down Expand Up @@ -1183,69 +1181,119 @@ def norm_trade_records(
return {r.tid: r for r in records}


def trades_to_ledger_entries(
def parse_flex_dt(
record: str,
) -> pendulum.datetime:
date, ts = record.split(';')
dt = pendulum.parse(date)
ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
tsdt = pendulum.parse(ts)
return dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)


def api_trades_to_ledger_entries(
accounts: bidict,
trade_entries: list[object],
source_type: str = 'api',

) -> dict:
'''
Convert either of API execution objects or flex report
entry objects into ``dict`` form, pretty much straight up
without modification.
Convert API execution objects entry objects into ``dict`` form,
pretty much straight up without modification except add
a `pydatetime` field from the parsed timestamp.

'''
trades_by_account = {}
for t in trade_entries:
# NOTE: example of schema we pull from the API client.
# {
# 'commissionReport': CommissionReport(...
# 'contract': {...
# 'execution': Execution(...
# 'time': 1654801166.0
# }

# flatten all sub-dicts and values into one top level entry.
entry = {}
for section, val in t.items():
match section:
case 'contract' | 'execution' | 'commissionReport':
# sub-dict cases
entry.update(val)

case 'time':
# ib has wack ns timestamps, or is that us?
continue

case _:
entry[section] = val

tid = str(entry['execId'])
dt = pendulum.from_timestamp(entry['time'])
# TODO: why isn't this showing seconds in the str?
entry['pydatetime'] = dt
entry['datetime'] = str(dt)
acctid = accounts[entry['acctNumber']]

if not tid:
# this is likely some kind of internal adjustment
# transaction, likely one of the following:
# - an expiry event that will show a "book trade" indicating
# some adjustment to cash balances: zeroing or itm settle.
# - a manual cash balance position adjustment likely done by
# the user from the accounts window in TWS where they can
# manually set the avg price and size:
# https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST
log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}')
continue

trades_by_account.setdefault(
acctid, {}
)[tid] = entry

# sort entries in output by python based datetime
for acctid in trades_by_account:
trades_by_account[acctid] = dict(sorted(
trades_by_account[acctid].items(),
key=lambda entry: entry[1].pop('pydatetime'),
))

return trades_by_account


def flex_records_to_ledger_entries(
accounts: bidict,
trade_entries: list[object],

) -> dict:
'''
Convert flex report entry objects into ``dict`` form, pretty much
straight up without modification except add a `pydatetime` field
from the parsed timestamp.

'''
trades_by_account = {}
for t in trade_entries:
if source_type == 'flex':
entry = t.__dict__

# XXX: LOL apparently ``toml`` has a bug
# where a section key error will show up in the write
# if you leave a table key as an `int`? So i guess
# cast to strs for all keys..

# oddly for some so-called "BookTrade" entries
# this field seems to be blank, no cuckin clue.
# trade['ibExecID']
tid = str(entry.get('ibExecID') or entry['tradeID'])
# date = str(entry['tradeDate'])

# XXX: is it going to cause problems if a account name
# get's lost? The user should be able to find it based
# on the actual exec history right?
acctid = accounts[str(entry['accountId'])]

elif source_type == 'api':
# NOTE: example of schema we pull from the API client.
# {
# 'commissionReport': CommissionReport(...
# 'contract': {...
# 'execution': Execution(...
# 'time': 1654801166.0
# }

# flatten all sub-dicts and values into one top level entry.
entry = {}
for section, val in t.items():
match section:
case 'contract' | 'execution' | 'commissionReport':
# sub-dict cases
entry.update(val)

case 'time':
# ib has wack ns timestamps, or is that us?
continue

case _:
entry[section] = val

tid = str(entry['execId'])
dt = pendulum.from_timestamp(entry['time'])
# TODO: why isn't this showing seconds in the str?
entry['date'] = str(dt)
acctid = accounts[entry['acctNumber']]
entry = t.__dict__

# XXX: LOL apparently ``toml`` has a bug
# where a section key error will show up in the write
# if you leave a table key as an `int`? So i guess
# cast to strs for all keys..

# oddly for some so-called "BookTrade" entries
# this field seems to be blank, no cuckin clue.
# trade['ibExecID']
tid = str(entry.get('ibExecID') or entry['tradeID'])
# date = str(entry['tradeDate'])

# XXX: is it going to cause problems if a account name
# get's lost? The user should be able to find it based
# on the actual exec history right?
acctid = accounts[str(entry['accountId'])]

# probably a flex record with a wonky non-std timestamp..
dt = entry['pydatetime'] = parse_flex_dt(entry['dateTime'])
entry['datetime'] = str(dt)

if not tid:
# this is likely some kind of internal adjustment
Expand All @@ -1263,6 +1311,12 @@ def trades_to_ledger_entries(
acctid, {}
)[tid] = entry

for acctid in trades_by_account:
trades_by_account[acctid] = dict(sorted(
trades_by_account[acctid].items(),
key=lambda entry: entry[1]['pydatetime'],
))

return trades_by_account


Expand Down Expand Up @@ -1309,25 +1363,28 @@ def load_flex_trades(
ln = len(trade_entries)
log.info(f'Loaded {ln} trades from flex query')

trades_by_account = trades_to_ledger_entries(
# get reverse map to user account names
conf['accounts'].inverse,
trades_by_account = flex_records_to_ledger_entries(
conf['accounts'].inverse, # reverse map to user account names
trade_entries,
source_type='flex',
)

ledger_dict: Optional[dict] = None

for acctid in trades_by_account:
trades_by_id = trades_by_account[acctid]

with open_trade_ledger('ib', acctid) as ledger_dict:
tid_delta = set(trades_by_id) - set(ledger_dict)
log.info(
'New trades detected\n'
f'{pformat(tid_delta)}'
)
if tid_delta:
ledger_dict.update(
{tid: trades_by_id[tid] for tid in tid_delta}
)
sorted_delta = dict(sorted(
{tid: trades_by_id[tid] for tid in tid_delta}.items(),
key=lambda entry: entry[1].pop('pydatetime'),
))
ledger_dict.update(sorted_delta)

return ledger_dict

Expand Down