bullmq

Solutions on MaxInterview for bullmq by the best coders in the world

showing results for - "bullmq"
Grace
08 Nov 2019
1const { Worker } = require('bullmq')
2const userSchema = require('./model.js')
3const { resultsPublisher } = require('./publisher')
4
/**
 * Worker attached to the 'create service' queue.
 *
 * For every job named 'create:service' the processor re-emits the job payload
 * on the worker itself as a 'create:service' event. The payload is wrapped as
 * `{ data: job.data }` and serialised to a JSON string, so listeners must
 * `JSON.parse` it. Jobs with any other name are ignored.
 */
const queueCreatePublisher = new Worker('create service', async (job) => {
	// Strict comparison: the job name is matched without type coercion.
	if (job.name === 'create:service') {
		queueCreatePublisher.emit('create:service', JSON.stringify({ data: job.data }))
	}
})
14
// Lifecycle logging for the create worker.
queueCreatePublisher.on('completed', (job) => console.log(`job create completed ${job.id}`))
// NOTE(review): 'waiting' looks like a Queue/QueueEvents event rather than a Worker
// event, so this listener may never fire — confirm against the BullMQ version in use.
queueCreatePublisher.on('waiting', (job) => console.log(`job create waiting ${job.id}`))
queueCreatePublisher.on('active', (job) => console.log(`job create active ${job.id}`))
// The job argument of 'failed' can be undefined; guard it so the logger itself cannot throw.
queueCreatePublisher.on('failed', (job) => console.log(`job create failed ${job ? job.id : undefined}`))
// Without an 'error' listener an emitted 'error' event is thrown as an unhandled exception.
queueCreatePublisher.on('error', (err) => console.log(`job create error ${err && err.message}`))
19
20exports.createSubscriber = () => {
21	return new Promise((resolve, reject) => {
22		queueCreatePublisher.once('create:service', async (data) => {
23			const response = await insertOne(JSON.parse(data).data)
24			resolve(response)
25		})
26	})
27}
28
/**
 * Stores a new e-mail address unless it is already registered.
 *
 * @param {{email: string}} res - payload carrying the e-mail to store.
 * @returns {Promise<{statusCode: number, message: string}>} resolves with
 * 409 when the e-mail already exists (nothing is written), 201 when the
 * document was created, or 400 when `create` returned a falsy value.
 * Rejects with `{ statusCode: 500, message: 'internal server error' }` when
 * the lookup or the insert throws.
 */
async function insertOne(res) {
	try {
		const existing = await userSchema.findOne({ email: res.email }).lean()
		if (existing) {
			// Stop here: falling through would insert the duplicate e-mail anyway.
			return { statusCode: 409, message: 'email already exist' }
		}

		const saved = await userSchema.create({ email: res.email })
		if (saved) {
			return { statusCode: 201, message: 'add new email successfully' }
		}
		return { statusCode: 400, message: 'add new email failed' }
	} catch (err) {
		// Callers receive this plain object as the rejection value, so its shape
		// is kept instead of switching to an Error instance.
		throw { statusCode: 500, message: 'internal server error' }
	}
}
47
/**
 * Worker attached to the 'results service' queue.
 *
 * For every job named 'results:service' the processor re-emits the job payload
 * on the worker itself as a 'results:service' event. The payload is wrapped as
 * `{ data: job.data }` and serialised to a JSON string, so listeners must
 * `JSON.parse` it. Jobs with any other name are ignored.
 */
const queueResultsPublisher = new Worker('results service', async (job) => {
	// Strict comparison: the job name is matched without type coercion.
	if (job.name === 'results:service') {
		queueResultsPublisher.emit('results:service', JSON.stringify({ data: job.data }))
	}
})
57
// Lifecycle logging for the results worker.
queueResultsPublisher.on('completed', (job) => console.log(`job results completed ${job.id}`))
// NOTE(review): 'waiting' looks like a Queue/QueueEvents event rather than a Worker
// event, so this listener may never fire — confirm against the BullMQ version in use.
queueResultsPublisher.on('waiting', (job) => console.log(`job results waiting ${job.id}`))
queueResultsPublisher.on('active', (job) => console.log(`job results active ${job.id}`))
// The job argument of 'failed' can be undefined; guard it so the logger itself cannot throw.
queueResultsPublisher.on('failed', (job) => console.log(`job results failed ${job ? job.id : undefined}`))
// Without an 'error' listener an emitted 'error' event is thrown as an unhandled exception.
queueResultsPublisher.on('error', (err) => console.log(`job results error ${err && err.message}`))
62
63exports.findAllSubscriber = async () => {
64	await findAll()
65	return new Promise((resolve, reject) => {
66		queueResultsPublisher.once('results:service', (data) => {
67			const response = JSON.parse(data).data
68			resolve(response)
69		})
70	})
71}
72
/**
 * Loads every stored e-mail document and hands the outcome to `resultsPublisher`.
 *
 * Published payloads:
 * - 404 with an empty `data` array when no document exists,
 * - 200 with the documents in `data` otherwise,
 * - 500 (no `data`) when the query itself throws.
 *
 * @returns {Promise<void>} settles once the payload has been handed to
 * `resultsPublisher`; rejects if `resultsPublisher` rejects.
 */
async function findAll() {
	let payload

	// Only the query is guarded: a publisher failure must not be reported as a
	// 500 database error, and must not trigger a second publish attempt.
	try {
		const emails = await userSchema.find({}).lean()
		payload =
			emails.length < 1
				? { statusCode: 404, message: 'email is not exist', data: emails }
				: { statusCode: 200, message: 'email already to use', data: emails }
	} catch (err) {
		payload = { statusCode: 500, message: 'internal server error' }
	}

	await resultsPublisher(payload)
}
86
Luigi
18 Oct 2019
Example BullMQ to-do app with a custom BullMQ pub/sub layer:
2
3https://github.com/restuwahyu13/express-todo-bullmq
similar questions
bullmq redis cluster
queries leading to this page
bullmq