if __name__ == "__main__":
    # Demo driver: run one producer and one consumer against a shared
    # pipeline for a short while, then signal both of them to stop.
    log_format = "%(asctime)s: %(message)s"  # avoid shadowing builtin format()
    logging.basicConfig(format=log_format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # Uncomment the next line for debug-level output:
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        futures = [
            executor.submit(producer, pipeline, event),
            executor.submit(consumer, pipeline, event),
        ]

        # Let the workers exchange messages briefly before requesting exit.
        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()

    # Leaving the ``with`` block waits for both workers to finish. Asking
    # each future for its result re-raises any exception a worker hit, which
    # would otherwise be discarded silently.
    for future in futures:
        future.result()
def producer(pipeline, event):
    """Simulate a network source feeding random numbers into the pipeline.

    Keeps generating integers and handing them to ``pipeline.set_message``
    until ``event`` is set, then logs that it is exiting.
    """
    while True:
        if event.is_set():
            break
        number = random.randint(1, 101)  # randint includes both endpoints
        logging.info("Producer got message: %s", number)
        pipeline.set_message(number, "Producer")

    logging.info("Producer received EXIT event. Exiting")
def consumer(pipeline, event):
    """Simulate a database writer draining numbers from the pipeline.

    Continues pulling messages while ``event`` is unset; once it is set,
    keeps going until ``pipeline.empty()`` reports nothing left, then logs
    that it is exiting.
    """
    while True:
        # Stop only once exit was requested AND nothing is left to drain.
        if event.is_set() and pipeline.empty():
            break
        # NOTE(review): if get_message blocks on an empty pipeline, this call
        # could wait forever when the event is set right after the check
        # above -- confirm against the Pipeline implementation.
        stored = pipeline.get_message("Consumer")
        remaining = pipeline.qsize()
        logging.info(
            "Consumer storing message: %s (queue size=%s)", stored, remaining
        )

    logging.info("Consumer received EXIT event. Exiting")