Вопрос:
Мне удалось получить периодические задания, работающие в django-celery, путем подклассификации PeriodicTask. Я попытался создать тестовую задачу и настроить ее на выполнение чего-то бесполезного. Оно работает.
Теперь я не могу это остановить. Я прочитал документацию, и я не могу узнать, как удалить задачу из очереди выполнения. Я попытался использовать celeryctl и использовать оболочку, но registry.tasks() пуст, поэтому я не вижу, как его удалить.
Я видел предложения о том, что я должен “отменить” его, но для этого мне, похоже, нужен идентификатор задачи, и я не вижу, как найти идентификатор задачи.
Спасибо.
Лучший ответ:
Задача – это сообщение, а “периодическая задача” периодически отправляет сообщения задачи. Каждая из присланных задач будет иметь уникальный идентификатор, назначенный ему.
revoke отменяет только одно сообщение задачи. Чтобы получить идентификатор для задачи, которую вы должны сохранить
отслеживать отправленный идентификатор, но вы также можете указать пользовательский идентификатор при отправке задачи.
Я не уверен, что вы хотите отменить одно сообщение задачи или хотите остановить периодическую задачу от отправки большего количества сообщений, поэтому я перечислил ответы для обоих.
Нет встроенного способа сохранить идентификатор задачи, отправленной с периодическими задачами,
но вы можете установить идентификатор для каждой задачи на имя периодической задачи, таким образом
идентификатор будет ссылаться на любую задачу, отправленную с периодической задачей (обычно последней).
Вы можете указать собственный идентификатор таким образом,
либо с декоратором @periodic_task:
@periodic_task(options={«task_id»: «my_periodic_task»}) def my_periodic_task(): pass
или с настройкой CELERYBEAT_SCHEDULE:
CELERYBEAT_SCHEDULE = {name: {«task»: task_name, «options»: {«task_id»: name}}}
Если вы хотите удалить периодическую задачу, вы просто удалите @periodic_task из кодовой базы или удалите запись из CELERYBEAT_SCHEDULE.
Если вы используете планировщик базы данных Django, вам нужно удалить периодическую задачу
из интерфейса администратора Django.
PS1: revoke не останавливает задание, которое уже запущено. Это только отменяет
задачи, которые еще не начаты. Вы можете завершить выполняющуюся задачу, используя
revoke(task_id, terminate=True). По умолчанию это будет посылать сигнал TERM на
процесс, если вы хотите отправить другой сигнал (например, KILL), используйте
revoke(task_id, terminate=True, signal=»KILL»).
PS2: отмена – это команда удаленного управления, поэтому она поддерживается только RabbitMQ
и Redis broker.
Если вы хотите, чтобы ваша задача поддерживала отмену, вы должны сделать это, сохранив cancelled
флаг в базе данных и проверьте, что флаг при запуске:
from celery.task import Task class RevokeableTask(Task): «»»Task that can be revoked. Example usage: @task(base=RevokeableTask) def mytask(): pass «»» def __call__(self, *args, **kwargs): if revoke_flag_set_in_db_for(self.request.id): return super(RevokeableTask, self).__call__(*args, **kwargs) Ответ №1
На всякий случай это может помочь кому-то… У нас была такая же проблема на работе, и мы пытаемся найти какую-то команду управления для удаления периодической задачи, мы не смогли. Итак, вот несколько указателей.
Вероятно, вы должны сначала проверить, какой класс планировщика вы используете.
Планировщик по умолчанию – celery.beat.PersistentScheduler, который просто отслеживает последнее время выполнения в локальном файле базы данных (полка).
В нашем случае мы использовали класс djcelery.schedulers.DatabaseScheduler.
django-celery также поставляется с планировщиком, который хранит расписание в базе данных Django
Хотя в документации упоминается способ удаления периодических задач:
Используя планировщик django-сельдерея, вы можете добавлять, изменять и удалять периодические задания из Django Admin.
Мы хотели выполнить удаление программно или через команду (сельдерей/управление) в оболочке.
Поскольку мы не смогли найти командную строку, мы использовали оболочку django/python:
$ python manage.py shell >>> from djcelery.models import PeriodicTask >>> pt = PeriodicTask.objects.get(name=’the_task_name’) >>> pt.delete()
Надеюсь, это поможет!