Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize the communication loop #44

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open

Conversation

basnijholt
Copy link
Owner

@basnijholt basnijholt commented May 26, 2020

In the server, sending an object takes too long. When many (1000) clients connect, I don't want the program to slow down.

Blocked by cloudpipe/cloudpickle#374.

The current implementation is:
client.py

import time

import zmq

ctx = zmq.Context()


def send_something(url: str, t_start) -> None:
    with ctx.socket(zmq.REQ) as socket:
        socket.connect(url)
        socket.send_pyobj(t_start)
        reply = socket.recv_pyobj()
        print(reply - t_start, time.time() - t_start)

url = "tcp://192.168.1.13:5555"
send_something(url, time.time())

server.py

import asyncio
import time

import zmq
import zmq.asyncio

ctx = zmq.asyncio.Context()

url = "tcp://*:5555"


def _dispatch(request):
    time.sleep(5)
    reply = time.time()
    return reply


async def server(url) -> None:
    sock = ctx.socket(zmq.REP)
    sock.bind(url)
    try:
        while True:
            request = await sock.recv_pyobj()
            reply = _dispatch(request)
            await sock.send_pyobj(reply)
            print(f"time {reply - request}")
    finally:
        sock.close()


ioloop = asyncio.get_event_loop()
proc = ioloop.run_until_complete(server(url))

@basnijholt
Copy link
Owner Author

It appears like the REQ REP protocol isn't suitable for what I want.

Instead I could replace server.py by (from here)

import time
import threading
import zmq


def _dispatch(request):
    time.sleep(5)
    reply = time.time()
    return reply


def worker_routine(worker_url, context=None):
    """Worker routine"""
    context = context or zmq.Context.instance()
    socket = context.socket(zmq.REP)
    socket.connect(worker_url)
    try:
        while True:
            request = socket.recv_pyobj()
            reply = _dispatch(request)
            socket.send_pyobj(reply)
            print(f"time {reply - request}")
    finally:
        socket.close()


def main():
    """Server routine"""

    url_worker = "inproc://workers"
    url_client = "tcp://*:5555"

    # Prepare our context and sockets
    context = zmq.Context.instance()

    # Socket to talk to clients
    clients = context.socket(zmq.ROUTER)
    clients.bind(url_client)

    # Socket to talk to workers
    workers = context.socket(zmq.DEALER)
    workers.bind(url_worker)

    # Launch pool of worker threads
    for i in range(5):
        thread = threading.Thread(target=worker_routine, args=(url_worker,))
        thread.start()

    zmq.proxy(clients, workers)

    # We never get here but clean up anyhow
    clients.close()
    workers.close()
    context.term()


if __name__ == "__main__":
    main()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

1 participant