on
celery 분리 구성 [2]
celery 분리 구성 [2]
celery server, celery worker 분리 이슈
django에서 celery를 설정하고, 실행하면 task가 어떻게 등록되는지 확인해보자.
- csapi/tasks.py
from celery import shared_task @shared_task def test_task(): print('connect test task') return 'test'
현재 csapi 앱의 tasks.py에 test_task가 shared_task로 등록되어 있다.
- csapi/views.py
from django.shortcuts import render, redirect from .forms import UploadForm import uuid import boto3 import botocore from .tasks import test_task def upload(request): if request.method == 'POST': file = request.FILES['origin_image'] file_name = str(uuid.uuid1()) bucket = 'csapi-bucket' content_type = file.content_type.split('/')[1] key = 'images/origin-images/' + file_name + '.' + content_type s3 = boto3.client('s3') response = s3.upload_fileobj(file, bucket, key, ExtraArgs = {'ContentType': content_type}) test_task.delay() return redirect('csapi:image_show', file_name + '.' + content_type) else: form = UploadForm() return render(request, 'csapi/upload.html', { 'form' : form })
현재 aws의 s3로 file을 업로드하는 함수에 test_task.delay()를 호출하여 실행하도록 해놓았다.
- celery_app.py
ls Dockerfile __pycache__ celery_app.py csapi-boto.py requirements.txt tasks.py from celery import Celery app = Celery('celery', backend='rpc://', broker='amqp://guest:[email protected]:5672//')
간단하게 celery 스크립트를 설정하고 celery를 실행해본다.
- celery 실행
celery -A celery_app worker --loglevel=DEBUG (생략) [tasks] . celery.accumulate . celery.backend_cleanup . celery.chain . celery.chord . celery.chord_unlock . celery.chunks . celery.group . celery.map . celery.starmap
실행하면 등록된 task 목록을 확인할 수 있다. 여기서 tasks.py에 등록된 task가 등록이 되지 않았다. 이와 관련해서는 아래에서 다룬다.
- django에서 task 실행
실행을 하면, unregistered task라는 메시지가 출력된다. 이는 broker를 통해 전달된 task가 celery에 등록되어 있지 않기 때문이다.
celery app에 task 등록
- tasks 확인
celery -A celery_app worker --loglevel=DEBUG (생략) [tasks] . celery.accumulate . celery.backend_cleanup . celery.chain . celery.chord . celery.chord_unlock . celery.chunks . celery.group . celery.map . celery.starmap
다시 tasks를 살펴보면, tasks.py에 등록된 task가 등록되지 않았다. 이를 등록하기 위해 수정한다.
- task include
from celery import Celery app = Celery('celery', backend='rpc://', broker='amqp://guest:[email protected]:5672//', include=['tasks'])
celery 인스턴스를 등록할 때, include를 파라미터로 추가한다.
- task 목록
celery -A celery_app worker --loglevel=DEBUG (생략) [tasks] . celery.accumulate . celery.backend_cleanup . celery.chain . celery.chord . celery.chord_unlock . celery.chunks . celery.group . celery.map . celery.starmap . tasks.startAPI . tasks.test_task
tasks.py에 등록한 shared_task가 등록된 걸 확인할 수 있다.
task 등록 key
분명 동일한 이름의 task가 등록되어 있어도, unregistred task 에러가 발생한다.
- task Key 비교
# django의 task key csapi.tasks.test_task # 현재 celery의 task key tasks.test_task
위처럼 등록된 task의 key가 다르다. 이를 맞춰줘야 한다.
- 디렉토리 구조 변경
root@ip-192-168-3-53:/celery# mkdir csapi root@ip-192-168-3-53:/celery# mv celery_app.py tasks.py csapi/ root@ip-192-168-3-53:/celery# ls csapi celery_app.py tasks.py root@ip-192-168-3-53:/celery# celery -A csapi.celery_app worker --loglevel=debug
위처럼 csapi 디렉토리를 생성하고, 위처럼 실행한다.
- celery_app.py
from celery import Celery app = Celery('celery', backend='rpc://', broker='amqp://guest:[email protected]:5672//', include=['csapi.tasks'])
celery worker를 실행하는 위치가 바뀌었으므로 include도 변경해준다. (실행위치 기준)
- celery worker 실행
celery -A csapi.celery_app worker --loglevel=debug (생략) [tasks] . celery.accumulate . celery.backend_cleanup . celery.chain . celery.chord . celery.chord_unlock . celery.chunks . celery.group . celery.map . celery.starmap . csapi.tasks.startAPI . csapi.tasks.test_task
등록된 task key가 변경된 것을 확인할 수 있다.
- django에서 celery task 수행
django에서 전달한 task가 celery에서 정상적으로 수행되는것을 확인할 수 있다. celery의 최종 디렉토리 구조는 아래와 같다.
root@ip-192-168-3-53:/celery# ls csapi/ celery_app.py tasks.py
결과
디렉토리 구조와 worker 시작 커맨드가 수행되는 위치만 잘 감안하면 얼마든지 celery worker를 분리할 수 있다.
from http://not-to-be-reset.tistory.com/549 by ccl(A) rewrite - 2021-12-15 13:01:21