Skip to content
Draft
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions examples/actors/mutate_remote_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from itertools import cycle
from pprint import pformat
from dataclasses import dataclass, field

import trio
import tractor


@dataclass
class MyProcessStateThing:
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of this what are you after?

A function that creates some object and then makes that object mutateable from another inbound message?

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you want something like rays "actors" (which i would argue aren't really "actor model" actors):
https://docs.ray.io/en/latest/actors.html

We can also accomplish this but it will require a slight bit more machinery.

state: dict = field(default_factory=dict)

def update(self, msg: dict):
self.state.update(msg)


_actor_state = MyProcessStateThing()


async def update_local_state(msg: dict):
"""Update process-local state from sent message and exit.

"""
actor = tractor.current_actor()

global _actor_state


print(f'Yo we got a message {msg}')

# update the "actor state"
_actor_state.update(msg)

print(f'New local "state" for {actor.uid} is {pformat(_actor_state.state)}')

# we're done so exit this task running in the subactor


async def main():
# Main process/thread that spawns one sub-actor and sends messages
# to it to update it's state.

actor_portals = []

# XXX: that subactor can **not** outlive it's parent, this is SC.
async with tractor.open_nursery() as tn:

portal = await tn.start_actor('even_boy', enable_modules=[__name__])
actor_portals.append(portal)

portal = await tn.start_actor('odd_boy', enable_modules=[__name__])
actor_portals.append(portal)

for i, (count, portal) in enumerate(
zip(range(100), cycle(actor_portals))
):
await portal.run(update_local_state, msg={f'msg_{i}': count})

# blocks here indefinitely synce we spawned "daemon actors" using
# .start_actor()`, you'll need to control-c to cancel.


if __name__ == '__main__':
trio.run(main)
153 changes: 153 additions & 0 deletions examples/actors/ray_style_classes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import inspect
from typing import Any
from functools import partial
from contextlib import asynccontextmanager, AsyncExitStack
from itertools import cycle
from pprint import pformat

import trio
import tractor


log = tractor.log.get_logger(__name__)


class ActorState:
"""Singlteton actor per process.

"""
# this is a class defined variable and is thus both
# singleton across object instances and task safe.
state: dict = {}

def update(self, msg: dict) -> None:
_actor = tractor.current_actor()

print(f'Yo we got a message {msg}')
self.state.update(msg)

print(f'New local "state" for {_actor.uid} is {pformat(self.state)}')

def close(self):
# gives headers showing which process and task is active
log.info('Actor state is closing')

# if we wanted to support spawning or talking to other
# actors we can do that using a portal map collection?
# _portals: dict = {}


async def _run_proxy_method(
meth: str,
msg: dict,
) -> Any:
"""Update process-local state from sent message and exit.

"""
# Create a new actor instance per call.
# We can make this persistent by storing it either
# in a global var or are another clas scoped variable?
# If you want it somehow persisted in another namespace
# I'd be interested to know "where".
actor = ActorState()
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get this isn't ideal (though it really is no performance hit) in an idiomatic python sense, but the alternative is some other way to store this instance across function-task calls.

The normal way would be a module level variable (since they're "globally scoped") but I guess in theory you could have a function that stays alive and constantly passes the instance to other tasks over a memory channel - still in that case how does the new task get access to the channel handle?). The alternative is a module level class which has a class level variable which is again globally scoped on the class.

if meth != 'close':
return getattr(actor, meth)(msg)
else:
actor.close()

# we're done so exit this task running in the subactor


class MethodProxy:
def __init__(
self,
portal: tractor._portal.Portal
) -> None:
self._portal = portal

async def _run_method(
self,
*,
meth: str,
msg: dict,
) -> Any:
return await self._portal.run(
_run_proxy_method,
meth=meth,
msg=msg
)


def get_method_proxy(portal, target=ActorState) -> MethodProxy:
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In cases anyone gets cranky about this, from pykka docs:

The proxy object will use introspection to figure out what public attributes and methods the actor has, and then mirror the full API of the actor. Any attribute or method prefixed with underscore will be ignored, which is the convention for keeping stuff private in Python.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


proxy = MethodProxy(portal)

# mock all remote methods
for name, method in inspect.getmembers(
target, predicate=inspect.isfunction
):
if '_' == name[0]:
# skip private methods
continue

else:
setattr(proxy, name, partial(proxy._run_method, meth=name))

return proxy


@asynccontextmanager
async def spawn_proxy_actor(name):

# XXX: that subactor can **not** outlive it's parent, this is SC.
async with tractor.open_nursery(
debug_mode=True,
# loglevel='info',
) as tn:

portal = await tn.start_actor(name, enable_modules=[__name__])

proxy = get_method_proxy(portal)

yield proxy

await proxy.close(msg=None)


async def main():
# Main process/thread that spawns one sub-actor and sends messages
# to it to update it's state.

try:
stack = AsyncExitStack()

actors = []
for name in ['even', 'odd']:

actor_proxy = await stack.enter_async_context(
spawn_proxy_actor(name + '_boy')
)
actors.append(actor_proxy)

# spin through the actors and update their states
for i, (count, actor) in enumerate(
zip(range(100), cycle(actors))
):
# Here we call the locally patched `.update()` method of the
# remote instance

# NOTE: the instance created each call here is currently
# a new object - to persist it across `portal.run()` calls
# we need to store it somewhere in memory for access by
# a new task spawned in the remote actor process.
await actor.update(msg={f'msg_{i}': count})

# blocks here indefinitely synce we spawned "daemon actors" using
# .start_actor()`, you'll need to control-c to cancel.

finally:
await stack.aclose()


if __name__ == '__main__':
trio.run(main)
119 changes: 119 additions & 0 deletions examples/parallelism/concurrent_actors_primes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""
Demonstration of the prime number detector example from the
``concurrent.futures`` docs:

https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example

This uses no extra threads, fancy semaphores or futures; all we need
is ``tractor``'s channels.

"""
from contextlib import asynccontextmanager
from typing import List, Callable
import itertools
import math
import time

import tractor
import trio
from async_generator import aclosing


PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419,
]


def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False

sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True


@asynccontextmanager
async def worker_pool(workers=4):
"""Though it's a trivial special case for ``tractor``, the well
known "worker pool" seems to be the defacto "but, I want this
process pattern!" for most parallelism pilgrims.

Yes, the workers stay alive (and ready for work) until you close
the context.
"""
async with tractor.open_nursery() as tn:

portals = []
snd_chan, recv_chan = trio.open_memory_channel(len(PRIMES))

for i in range(workers):

# this starts a new sub-actor (process + trio runtime) and
# stores it's "portal" for later use to "submit jobs" (ugh).
portals.append(
await tn.start_actor(
f'worker_{i}',
enable_modules=[__name__],
)
)

async def _map(
worker_func: Callable[[int], bool],
sequence: List[int]
) -> List[bool]:

# define an async (local) task to collect results from workers
async def send_result(func, value, portal):
await snd_chan.send((value, await portal.run(func, n=value)))

async with trio.open_nursery() as n:

for value, portal in zip(sequence, itertools.cycle(portals)):
n.start_soon(
send_result,
worker_func,
value,
portal
)

# deliver results as they arrive
for _ in range(len(sequence)):
yield await recv_chan.receive()

# deliver the parallel "worker mapper" to user code
yield _map

# tear down all "workers" on pool close
await tn.cancel()


async def main():

async with worker_pool() as actor_map:

start = time.time()

async with aclosing(actor_map(is_prime, PRIMES)) as results:
async for number, prime in results:

print(f'{number} is prime: {prime}')

print(f'processing took {time.time() - start} seconds')


if __name__ == '__main__':
start = time.time()
trio.run(main)
print(f'script took {time.time() - start} seconds')
40 changes: 40 additions & 0 deletions examples/parallelism/concurrent_futures_primes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import time
import concurrent.futures
import math

PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]

def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False

sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True

def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
start = time.time()

for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))

print(f'processing took {time.time() - start} seconds')

if __name__ == '__main__':

start = time.time()
main()
print(f'script took {time.time() - start} seconds')
Loading