pip
安装:brew install redis
,然后使用redis-server
命令手动启动服务。rq worker
来启动一个worker进程了:microblog-tasks
的队列上查看可能分配给它的任何作业。 如果你想启动多个worker来扩展吞吐量,你只需要运行rq worker
来生成更多连接到同一个队列的进程。 然后,当作业出现在队列中时,任何可用的worker进程都可以获取它。 在生产环境中,你可能希望至少运行可用CPU数量的worker。example()
任务:Queue
类表示从应用程序端看到的任务队列。 它采用的参数是队列名称和一个Redis
连接对象,本处使用默认URL进行初始化。 如果你的Redis服务器运行在不同的主机或端口号上,则需要使用其他URL。enqueue()
方法用于将作业添加到队列中。 第一个参数是要执行的任务的名称,可直接传入函数对象或导入字符串。 我发现传入字符串更加方便,因为不需要在应用程序的一端导入函数。 对enqueue()
传入的任何剩余参数将被传递给worker中运行的函数。enqueue()
调用,运行着RQ worker的终端窗口上就会出现一些活动。 你会看到example()
函数正在运行,并且每秒打印一次计数器。 同时,你的其他终端不会被阻塞,你可以继续在shell中执行表达式。在上面的例子中,我调用job.get_id()
方法来获取分配给任务的唯一标识符。 你可以尝试使用另一个有趣表达式来检查worker上的函数是否已完成:23
,那么函数将运行约23秒。 在那之后,job.is_finished
表达式将变为True
。 就是这么简单,炫酷否?enqueue()
调用。 队列中存储的有关任务的数据将保留一段时间(默认为500秒),但最终会被删除。 这很重要,任务队列不保留已执行作业的历史记录。meta
属性来支持这一点。 让我重写example()
任务来编写进度报告:example()
使用RQ的get_current_job()
函数来获取一个作业实例,该实例与提交任务时返回给应用程序的实例类似。 作业对象的meta
属性是一个字典,任务可以编写任何想要与应用程序通信的自定义数据。 在这个例子中,我写入了progress
,表示完成任务的百分比。 每次进程更新时,我都调用job.save_meta()
指示RQ将数据写入Redis,应用程序可以在其中找到它。meta
属性可以被读取。 需要调用refresh()
方法来从Redis更新内容。Task
模型实现:id
主键字段是字符串类型,而不是整数类型。 这是因为对于这个模型,我不会依赖数据库自己的主键生成,而是使用由RQ生成的作业标识符。complete
字段的目的是将正在运行的任务与已完成的任务分开,因为运行中的任务需要特殊处理才能显示最新进度。get_rq_job()
辅助方法可以用给定的任务ID加载RQJob
实例。 这是通过Job.fetch()
完成的,它会从Redis中存在的数据中加载Job
实例。 get_progress()
方法建立在get_rq_job()
的基础之上,并返回任务的进度百分比。 该方法做一些有趣的假设,如果模型中的作业ID不存在于RQ队列中,则表示作业已完成并且数据已过期并已从队列中删除,因此在这种情况下返回的百分比为100。 另一方面,如果job存在,但'meta'属性中找不到进度相关的信息,那么可以安全地假定该job计划运行,但还没有启动,所以在这种情况下进度是0。app.task_queue
将成为提交任务的队列。 将队列附加到应用上会提供很大的便利,因为我可以在应用的任何地方使用current_app.task_queue
来访问它。 为了方便应用的任何部分提交或检查任务,我可以在User
模型中创建一些辅助方法:launch_task()
方法负责将任务提交到RQ队列,并将其添加到数据库中。 name
参数是函数名称,如app/tasks.py中所定义的那样。 提交给RQ时,该函数会将app.tasks.
预先添加到该名称中以构建符合规范的函数名称。description
参数是对呈现给用户的任务的友好描述。 对于导出用户动态的函数,我将名称设置为export_posts
,将描述设置为Exporting posts...
。 其余参数将传递给任务函数。 launch_task()
函数首先调用队列的enqueue()
方法来提交作业。 返回的作业对象包含由RQ分配的任务ID,因此我可以使用它在我的数据库中创建相应的Task
对象。launch_task()
将新的任务对象添加到会话中,但不会发出提交。 一般来说,最好在更高层次函数中的数据库会话上进行操作,因为它允许你在单个事务中组合由较低级别函数所做的多个更新。 这不是一个严格的规则,并且,在本章后面的子函数中也会存在一个例外的提交。get_tasks_in_progress()
方法返回该用户未完成任务的列表。 稍后你会看到,我使用此方法在将有关正在运行的任务的信息渲染到用户的页面中。get_task_in_progress()
是上一个方法的简化版本并返回指定的任务。 我阻止用户同时启动两个或多个相同类型的任务,因此在启动任务之前,可以使用此方法来确定前一个任务是否还在运行。send_email()
函数总是使用后台线程异步发送电子邮件。 当我要从后台任务发送一封电子邮件时(已经是异步的了),基于线程的二级后台任务没有什么意义,所以我需要同时支持同步和异步电子邮件的发送。send_email()
函数的默认关键字参数,然后在Message
对象中配置它们。 选择在前台发送电子邮件时,我只需要添加一个sync=True
的关键字参数即可:attach()
方法接受三个定义附件的参数:文件名,媒体类型和实际文件数据。 文件名就是收件人看到的与附件关联的名称。 媒体类型定义了这种附件的类型,这有助于电子邮件读者适当地渲染它。 例如,如果你发送image/png
作为媒体类型,则电子邮件阅读器会知道该附件是一个图像,在这种情况下,它可以显示它。 对于用户动态数据文件,我将使用JSON格式,该格式使用application/json
媒体类型。 最后一个参数包含附件内容的字符串或字节序列。send_email()
的attachments
参数将成为一个元组列表,每个元组将有三个元素对应于attach()
的三个参数。 因此,我需要将此列表中的每个元素作为参数发送给attach()
。 在Python中,如果你想将列表或元组中的每个元素作为参数传递给函数,你可以使用func(*args)
将这个列表或元祖解包成函数中的多个参数,而不必枯燥地一个个地传递,如func(args[0], args[1], args[2])
。 例如,如果你有一个列表args = [1, 'foo']
,func(*args)
将会传递两个参数,就和你调用func(1, 'foo')
一样。 如果没有*
,调用将会传入一个参数,即args
列表。sync
是True
的时候恢复成调用mail.send(msg)
。example()
任务是一个简单的独立函数,但导出用户动态的函数却需要应用中具有的一些功能,例如访问数据库和发送电子邮件。 因为这将在单独的进程中运行,所以我需要初始化Flask-SQLAlchemy和Flask-Mail,而Flask-Mail又需要Flask应用实例以从中获取它们的配置。 因此,我将在app/tasks.py模块的顶部添加Flask应用实例和应用上下文:flask
命令时,根目录中的microblog.py模块创建应用实例,但RQ worker对此却一无所知,所以当任务函数需要它时,它需要创建自己的应用实例。 你已经在好几个地方看到了app.app_context()
方法,推送一个上下文使应用成为“当前”的应用实例,这样一来Flask-SQLAlchemy等插件才可以使用current_app.config
获取它们的配置。 没有上下文,current_app
表达式会返回一个错误。job.meta
字典传递进度信息之外,我还想将通知推送给客户端,以便自动动态更新完成百分比。为此,我将使用我在第二十一章中构建的通知机制。更新将以与未读消息徽章非常类似的方式工作。当服务器渲染模板时,它将包含从job.meta
获得的“静态”进度信息,但是一旦页面位于客户端的浏览器中,通知将使用通知来动态更新百分比。由于通知的原因,更新正在运行的任务的进度将比上一个示例中的操作稍微多一些,所以我将创建一个专用于更新进度的包装函数:_set_task_progress()
来记录进度百分比。 该函数首先将百分比写入job.meta
字典并将其保存到Redis,然后从数据库加载相应的任务对象,并使用task.user
已有的add_notification()
方法将通知推送给请求该任务的用户。 通知将被命名为task_progress
,并且与其关联的数据将成为具有两个条目的字典:任务标识符和进度数值。 稍后我将添加JavaScript代码来处理这种新的通知类型。complete
属性。 数据库提交调用确保通过add_notification()
添加的任务和通知对象都立即保存到数据库。 我需要非常精确地设计父任务,确保不执行任何数据库更改,因为执行本调用会将父任务的更改也写入数据库。sys.exc_info()
来获得)。 使用Flask应用日志记录器来记录错误的好处在于,你可以观察到你为Flask应用实现的任何日志记录机制。 例如,在第七章中,我配置了要发送到管理员电子邮件地址的错误。 只要使用app.logger
,我也可以得到这些错误信息。datetime
对象不存储时区,因此在以ISO格式导出时间后,我添加了'Z',它表示UTC。i
,并且在进入循环之前还需要发出一个额外的数据库查询,查询total_posts
以获得用户动态的总数。 使用了i
和total_posts
,在每个循环迭代我都可以使用从0到100的数字来更新任务进度。time.sleep(5)
调用。主要原因是我想要延长导出所需的时间,以便在用户动态不多的情况下也可以方便地查看到导出进度的增长。data
附件发送邮件给用户:send_email()
函数的调用。 附件被定义为一个元组,其中有三个元素被传递给Flask-Mail的Message
对象的attach()
方法。 元组中的第三个元素是附件内容,它是用Python的json.dumps()
函数生成的。export_posts
视图函数:get_task_in_progress()
方法来检查这种情况。launch_task()
来启动它。 第一个参数是将传递给RQ worker的函数的名称,前缀为app.tasks.
。 第二个参数只是一个友好的文本描述,将会显示给用户。 这两个值都会被写入数据库中的Task对象。 该函数以重定向到用户个人主页结束。rq worker microblog-tasks
flask run
(记得先设置 FLASK_APP
变量)命令启动Flask应用get_tasks_in_progress()
方法来获取当前正在进行的任务列表。 在当前版本的应用中,我最多只能得到一个结果,因为我不允许多个导出任务同时执行,但将来我可能要支持可以共存的其他类型的任务,所以以通用的方式渲染Alert可以节省我以后的时间。alert-success
,而在闪现消息是alert-info
。 Bootstrap文档包含有关Alert的HTML结构的详细信息。 Alert文本包括存储在Task
模型中的description
字段,后面跟着完成百分比。id
属性的<span>
元素中。 原因是我要在收到通知时用JavaScript刷新百分比。 我给任务ID末尾附加-progress
来构造id
属性。 当有通知到达时,通过其中的任务ID,我可以很容易地使用#<task.id>-progress
选择器找到正确的<span>
元素来更新。span>
元素的百分比的动态更新做准备,我将在JavaScript端编写一个辅助函数:id
和一个进度值,并使用jQuery为这个任务定位<span>
元素,并将新进度作为其内容写入。 实际上不需要验证页面上是否存在该元素,因为如果没有找到该元素,jQuery将不会执行任何操作。_set_task_progress()
函数每次更新进度时调用add_notification()
,就会产生新的通知。 而我在第二十一章明智地以完全通用的方式实现了通知功能。 所以当浏览器定期向服务器发送通知更新请求时,浏览器会获得通过add_notification()
方法添加的任何通知。unread_message_count
名称的那些通知,并忽略其余部分。 我现在需要做的是扩展该函数,通过调用我上面定义的set_task_progress()
函数来处理task_progress
通知。 以下是处理通知更新版本JavaScript代码:switch
语句替换检查unread_message_count
通知名称的if
语句,该语句包含我现在需要支持的每个通知。 如果你对“C”系列语言不熟悉,就可能从未见过switch语句,它提供了一种方便的语法,可以替代一长串的if/elseif
语句。这是一个很棒的特性,因为当我需要支持更多通知时,只需简单地添加case
块即可。task_progress
通知的数据是一个包含两个元素task_id
和progress
的字典,这两个元素是我用来调用set_task_progress()
的两个参数。sudo apt-get install redis-server
来安装Redis服务器。rq worker microblog-tasks
。 如果你想要运行多个worker(假设是生产环境),则可以使用Supervisor的numprocs
指令来指示要同时运行多少个实例。REDIS_URL
变量添加到你的Heroku环境中,这正是应用所需的。rq
worker进程。 为此,你将需要在procfile的一个单独的行中声明worker:REDIS_URL
环境变量。 下面是一个完整的命令来启动应用,包含了一个redis链接:docker run
命令:--entrypoint
参数只取得可执行文件的名称,但是参数(如果有的话)需要在镜像和标签之后,也就是在命令行的结尾处给出。 请注意rq
命令需要使用venv/bin/rq
,以便在没有手动激活虚拟环境的情况下,也能识别虚拟环境并正常工作。