分类 软件开发 下的文章

使用 Pyramid 和 Cornice 构建和描述可扩展的 RESTful Web 服务。

Python 是一种高级的、面向对象的编程语言,它以其简单的语法而闻名。它一直是构建 RESTful API 的顶级编程语言之一。

Pyramid 是一个 Python Web 框架,旨在随着应用的扩展而扩展:这可以让简单的应用很简单,也可以增长为大型、复杂的应用。此外,Pyramid 为 PyPI (Python 软件包索引)提供了强大的支持。Cornice 为使用 Pyramid 构建和描述 RESTful Web 服务提供了助力。

本文将使用 Web 服务的例子来获取名人名言,来展示如何使用这些工具。

建立 Pyramid 应用

首先为你的应用创建一个虚拟环境,并创建一个文件来保存代码:

$ mkdir tutorial
$ cd tutorial
$ touch main.py
$ python3 -m venv env
$ source env/bin/activate
(env) $ pip3 install cornice twisted

导入 Cornice 和 Pyramid 模块

使用以下命令导入这些模块:

from pyramid.config import Configurator
from cornice import Service

定义服务

将引用服务定义为 Service 对象:

QUOTES = Service(name='quotes',
                 path='/',
                 description='Get quotes')

编写引用逻辑

到目前为止,这仅支持获取名言。用 QUOTES.get 装饰函数。这是将逻辑绑定到 REST 服务的方法:

@QUOTES.get()
def get_quote(request):
    return {
        'William Shakespeare': {
            'quote': ['Love all, trust a few, do wrong to none',
            'Some are born great, some achieve greatness, and some have greatness thrust upon them.']
    },
    'Linus': {
        'quote': ['Talk is cheap. Show me the code.']
        }
    }

请注意,与其他框架不同,装饰器不会更改 get_quote 函数。如果导入此模块,你仍然可以定期调用该函数并检查结果。

在为 Pyramid RESTful 服务编写单元测试时,这很有用。

定义应用对象

最后,使用 scan 查找所有修饰的函数并将其添加到配置中:

with Configurator() as config:
    config.include("cornice")
    config.scan()
    application = config.make_wsgi_app()

默认扫描当前模块。如果要扫描软件包中的所有模块,你也可以提供软件包的名称。

运行服务

我使用 Twisted 的 WSGI 服务器运行该应用,但是如果需要,你可以使用任何其他 WSGI 服务器,例如 Gunicorn 或 uWSGI。

(env)$ python -m twisted web --wsgi=main.application

默认情况下,Twisted 的 WSGI 服务器运行在端口 8080 上。你可以使用 HTTPie 测试该服务:

(env) $ pip install httpie
...
(env) $ http GET <http://localhost:8080/>
HTTP/1.1 200 OK
Content-Length: 220
Content-Type: application/json
Date: Mon, 02 Dec 2019 16:49:27 GMT
Server: TwistedWeb/19.10.0
X-Content-Type-Options: nosniff

{
    "Linus": {
        "quote": [
            "Talk is cheap. Show me the code."
        ]
    },
    "William Shakespeare": {
        "quote": [
            "Love all,trust a few,do wrong to none",
            "Some are born great, some achieve greatness, and some greatness thrust upon them."
        ]
    }
}

为什么要使用 Pyramid?

Pyramid 并不是最受欢迎的框架,但它已在 PyPI 等一些引人注目的项目中使用。我喜欢 Pyramid,因为它是认真对待单元测试的框架之一:因为装饰器不会修改函数并且没有线程局部变量,所以可以直接从单元测试中调用函数。例如,需要访问数据库的函数将从通过 request.config 传递的 request.config 对象中获取它。这允许单元测试人员将模拟(或真实)数据库对象放入请求中,而不用仔细设置全局变量、线程局部变量或其他特定于框架的东西。

如果你正在寻找一个经过测试的库来构建你接下来的 API,请尝试使用 Pyramid。你不会失望的。


via: https://opensource.com/article/20/1/python-web-api-pyramid-cornice

作者:Moshe Zadka 选题:lujun9972 译者:geekpi 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

学习如何使用 Python 的 Pygame 模块编程电脑游戏,并开始操作引力。

真实的世界充满了运动和生活。物理学使得真实的生活如此忙碌和动态。物理学是物质在空间中运动的方式。既然一个电脑游戏世界没有物质,它也就没有物理学规律,使用游戏程序员不得不模拟物理学。

从大多数电脑游戏来说,这里基本上仅有两个方面的物理学是重要的:引力和碰撞。

当你添加一个敌人到你的游戏中时,你实现了一些碰撞检测,但是这篇文章要添加更多的东西,因为引力需要碰撞检测。想想为什么引力可能涉及碰撞。如果你不能想到任何原因,不要担心 —— 它会随着你开发示例代码工作而且显然。

在真实世界中的引力是有质量的物体来相互吸引的倾向性。物体(质量)越大,它施加越大的引力作用。在电脑游戏物理学中,你不必创建质量足够大的物体来证明引力的正确;你可以在电脑游戏世界本身中仅编程一个物体落向假设的最大的对象的倾向。

添加一个引力函数

记住你的玩家已经有了一个决定动作的属性。使用这个属性来将玩家精灵拉向屏幕底部。

在 Pygame 中,较高的数字更接近屏幕的底部边缘。

在真实的世界中,引力影响一切。然而,在平台游戏中,引力是有选择性的 —— 如果你添加引力到你的整个游戏世界,你的所有平台都将掉到地上。反之,你可以仅添加引力到你的玩家和敌人精灵中。

首先,在你的 Player 类中添加一个 gravity 函数:

    def gravity(self):
        self.movey += 3.2 # 玩家掉落的多快

这是一个简单的函数。首先,不管你的玩家是否想运动,你设置你的玩家垂直运动。也就是说,你已经编程你的玩家总是在下降。这基本上就是引力。

为使引力函数生效,你必须在你的主循环中调用它。这样,当每一个处理循环时,Python 都应用下落运动到你的玩家。

在这代码中,添加第一行到你的循环中:

    player.gravity() # 检查引力
    player.update()

启动你的游戏来看看会发生什么。要注意,因为它发生的很快:你是玩家从天空上下落,马上掉出了你的游戏屏幕。

你的引力模拟是工作的,但是,也许太好了。

作为一次试验,尝试更改你玩家下落的速度。

给引力添加一个地板

你的游戏没有办法发现你的角色掉落出世界的问题。在一些游戏中,如果一个玩家掉落出世界,该精灵被删除,并在某个新的位置重生。在另一些游戏中,玩家会丢失分数或一条生命。当一个玩家掉落出世界时,不管你想发生什么,你必须能够侦测出玩家何时消失在屏幕外。

在 Python 中,要检查一个条件,你可以使用一个 if 语句。

你必需查看你玩家是否正在掉落,以及你的玩家掉落的程度。如果你的玩家掉落到屏幕的底部,那么你可以做一些事情。简化一下,设置玩家精灵的位置为底部边缘上方 20 像素。

使你的 gravity 函数看起来像这样:

    def gravity(self):
        self.movey += 3.2 # 玩家掉落的多快
       
        if self.rect.y > worldy and self.movey >= 0:
            self.movey = 0
            self.rect.y = worldy-ty

然后,启动你的游戏。你的精灵仍然下落,但是它停在屏幕的底部。不过,你也许不能看到你在地面层之上的精灵。一个简单的解决方法是,在精灵碰撞游戏世界的底部后,通过添加另一个 -ty 到它的新 Y 位置,从而使你的精灵弹跳到更高处:

    def gravity(self):
        self.movey += 3.2 # 玩家掉落的多快
       
        if self.rect.y > worldy and self.movey >= 0:
            self.movey = 0
            self.rect.y = worldy-ty-ty

现在你的玩家在屏幕底部弹跳,恰好在你地面精灵上面。

你的玩家真正需要的是反抗引力的方法。引力问题是,你不能反抗它,除非你有一些东西来推开引力作用。因此,在接下来的文章中,你将添加地面和平台碰撞以及跳跃能力。在这期间,尝试应用引力到敌人精灵。

到目前为止,这里是全部的代码:

#!/usr/bin/env python3
# draw a world
# add a player and player control
# add player movement
# add enemy and basic collision
# add platform
# add gravity

# GNU All-Permissive License
# Copying and distribution of this file, with or without modification,
# are permitted in any medium without royalty provided the copyright
# notice and this notice are preserved.  This file is offered as-is,
# without any warranty.

import pygame
import sys
import os

'''
Objects
'''

class Platform(pygame.sprite.Sprite):
    # x location, y location, img width, img height, img file    
    def __init__(self,xloc,yloc,imgw,imgh,img):
        pygame.sprite.Sprite.__init__(self)
        self.image = pygame.image.load(os.path.join('images',img)).convert()
        self.image.convert_alpha()
        self.rect = self.image.get_rect()
        self.rect.y = yloc
        self.rect.x = xloc

class Player(pygame.sprite.Sprite):
    '''
    Spawn a player
    '''
    def __init__(self):
        pygame.sprite.Sprite.__init__(self)
        self.movex = 0
        self.movey = 0
        self.frame = 0
        self.health = 10
        self.score = 1
        self.images = []
        for i in range(1,9):
            img = pygame.image.load(os.path.join('images','hero' + str(i) + '.png')).convert()
            img.convert_alpha()
            img.set_colorkey(ALPHA)
            self.images.append(img)
            self.image = self.images[0]
            self.rect  = self.image.get_rect()

    def gravity(self):
        self.movey += 3.2 # how fast player falls
       
        if self.rect.y > worldy and self.movey >= 0:
            self.movey = 0
            self.rect.y = worldy-ty-ty
       
    def control(self,x,y):
        '''
        control player movement
        '''
        self.movex += x
        self.movey += y
       
    def update(self):
        '''
        Update sprite position
        '''

        self.rect.x = self.rect.x + self.movex
        self.rect.y = self.rect.y + self.movey

        # moving left
        if self.movex < 0:
            self.frame += 1
            if self.frame > ani*3:
                self.frame = 0
            self.image = self.images[self.frame//ani]

        # moving right
        if self.movex > 0:
            self.frame += 1
            if self.frame > ani*3:
                self.frame = 0
            self.image = self.images[(self.frame//ani)+4]

        # collisions
        enemy_hit_list = pygame.sprite.spritecollide(self, enemy_list, False)
        for enemy in enemy_hit_list:
            self.health -= 1
            print(self.health)

        ground_hit_list = pygame.sprite.spritecollide(self, ground_list, False)
        for g in ground_hit_list:
            self.health -= 1
            print(self.health)

class Enemy(pygame.sprite.Sprite):
    '''
    Spawn an enemy
    '''
    def __init__(self,x,y,img):
        pygame.sprite.Sprite.__init__(self)
        self.image = pygame.image.load(os.path.join('images',img))
        #self.image.convert_alpha()
        #self.image.set_colorkey(ALPHA)
        self.rect = self.image.get_rect()
        self.rect.x = x
        self.rect.y = y
        self.counter = 0
       
    def move(self):
        '''
        enemy movement
        '''
        distance = 80
        speed = 8

        if self.counter >= 0 and self.counter <= distance:
            self.rect.x += speed
        elif self.counter >= distance and self.counter <= distance*2:
            self.rect.x -= speed
        else:
            self.counter = 0

        self.counter += 1

class Level():
    def bad(lvl,eloc):
        if lvl == 1:
            enemy = Enemy(eloc[0],eloc[1],'yeti.png') # spawn enemy
            enemy_list = pygame.sprite.Group() # create enemy group
            enemy_list.add(enemy)              # add enemy to group
           
        if lvl == 2:
            print("Level " + str(lvl) )

        return enemy_list

    def loot(lvl,lloc):
        print(lvl)

    def ground(lvl,gloc,tx,ty):
        ground_list = pygame.sprite.Group()
        i=0
        if lvl == 1:
            while i < len(gloc):
                ground = Platform(gloc[i],worldy-ty,tx,ty,'ground.png')
                ground_list.add(ground)
                i=i+1

        if lvl == 2:
            print("Level " + str(lvl) )

        return ground_list

    def platform(lvl,tx,ty):
        plat_list = pygame.sprite.Group()
        ploc = []
        i=0
        if lvl == 1:
            ploc.append((0,worldy-ty-128,3))
            ploc.append((300,worldy-ty-256,3))
            ploc.append((500,worldy-ty-128,4))

            while i < len(ploc):
                j=0
                while j <= ploc[i][2]:
                    plat = Platform((ploc[i][0]+(j*tx)),ploc[i][1],tx,ty,'ground.png')
                    plat_list.add(plat)
                    j=j+1
                print('run' + str(i) + str(ploc[i]))
                i=i+1

        if lvl == 2:
            print("Level " + str(lvl) )

        return plat_list

'''
Setup
'''
worldx = 960
worldy = 720

fps = 40 # frame rate
ani = 4  # animation cycles
clock = pygame.time.Clock()
pygame.init()
main = True

BLUE  = (25,25,200)
BLACK = (23,23,23 )
WHITE = (254,254,254)
ALPHA = (0,255,0)

world = pygame.display.set_mode([worldx,worldy])
backdrop = pygame.image.load(os.path.join('images','stage.png')).convert()
backdropbox = world.get_rect()
player = Player() # spawn player
player.rect.x = 0
player.rect.y = 0
player_list = pygame.sprite.Group()
player_list.add(player)
steps = 10 # how fast to move

eloc = []
eloc = [200,20]
gloc = []
#gloc = [0,630,64,630,128,630,192,630,256,630,320,630,384,630]
tx = 64 #tile size
ty = 64 #tile size

i=0
while i <= (worldx/tx)+tx:
    gloc.append(i*tx)
    i=i+1

enemy_list = Level.bad( 1, eloc )
ground_list = Level.ground( 1,gloc,tx,ty )
plat_list = Level.platform( 1,tx,ty )

'''
Main loop
'''
while main == True:
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            pygame.quit(); sys.exit()
            main = False

        if event.type == pygame.KEYDOWN:
            if event.key == pygame.K_LEFT or event.key == ord('a'):
                print("LEFT")
                player.control(-steps,0)
            if event.key == pygame.K_RIGHT or event.key == ord('d'):
                print("RIGHT")
                player.control(steps,0)
            if event.key == pygame.K_UP or event.key == ord('w'):
                print('jump')

        if event.type == pygame.KEYUP:
            if event.key == pygame.K_LEFT or event.key == ord('a'):
                player.control(steps,0)
            if event.key == pygame.K_RIGHT or event.key == ord('d'):
                player.control(-steps,0)
            if event.key == pygame.K_UP or event.key == ord('w'):
                print('jump')

            if event.key == ord('q'):
                pygame.quit()
                sys.exit()
                main = False

    world.blit(backdrop, backdropbox)
    player.gravity() # check gravity
    player.update()
    player_list.draw(world)
    enemy_list.draw(world)
    ground_list.draw(world)
    plat_list.draw(world)
    for e in enemy_list:
        e.move()
    pygame.display.flip()
    clock.tick(fps)

这是仍在进行中的关于使用 Pygame 模块来在 Python 3 在创建电脑游戏的第七部分。先前的文章是:


via: https://opensource.com/article/19/11/simulate-gravity-python

作者:Seth Kenlon 选题:lujun9972 译者:robsean 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

通过学习如何定位并发处理的陷阱来避免未来处理这些问题时的困境。

在复杂的分布式系统进行任务处理时,你通常会需要进行并发的操作。在 Mode.net 公司,我们每天都要和实时、快速和灵活的软件打交道。而没有一个高度并发的系统,就不可能构建一个毫秒级的动态地路由数据包的全球专用网络。这个动态路由是基于网络状态的,尽管这个过程需要考虑众多因素,但我们的重点是链路指标。在我们的环境中,链路指标可以是任何跟网络链接的状态和当前属性(如链接延迟)有关的任何内容。

并发探测链接监控

我们的动态路由算法 H.A.L.O. 逐跳自适应链路状态最佳路由 Hop-by-Hop Adaptive Link-State Optimal Routing )部分依赖于链路指标来计算路由表。这些指标由位于每个 PoP( 存活节点 Point of Presence )上的独立组件收集。PoP 是表示我们的网络中单个路由实体的机器,通过链路连接并分布在我们的网络拓扑中的各个位置。某个组件使用网络数据包探测周围的机器,周围的机器回复数据包给前者。从接收到的探测包中可以获得链路延迟。由于每个 PoP 都有不止一个临近节点,所以这种探测任务实质上是并发的:我们需要实时测量每个临近连接点的延迟。我们不能串行地处理;为了计算这个指标,必须尽快处理每个探测。

 title=

序列号和重置:一个重新排列场景

我们的探测组件互相发送和接收数据包,并依靠序列号进行数据包处理。这旨在避免处理重复的包或顺序被打乱的包。我们的第一个实现依靠特殊的序列号 0 来重置序列号。这个数字仅在组件初始化时使用。主要的问题是我们考虑了递增的序列号总是从 0 开始。在该组件重启后,包的顺序可能会重新排列,某个包的序列号可能会轻易地被替换成重置之前使用过的值。这意味着,后继的包都会被忽略掉,直到排到重置之前用到的序列值。

UDP 握手和有限状态机

这里的问题是该组件重启前后的序列号是否一致。有几种方法可以解决这个问题,经过讨论,我们选择了实现一个带有清晰状态定义的三步握手协议。这个握手过程在初始化时通过链接建立会话。这样可以确保节点通过同一个会话进行通信且使用了适当的序列号。

为了正确实现这个过程,我们必须定义一个有清晰状态和过渡的有限状态机。这样我们就可以正确管理握手过程中的所有极端情况。

 title=

会话 ID 由握手的初始化程序生成。一个完整的交换顺序如下:

  1. 发送者发送一个 SYN(ID) 数据包。
  2. 接收者存储接收到的 ID 并发送一个 SYN-ACK(ID)
  3. 发送者接收到 SYN-ACK(ID) 并发送一个 ACK(ID)。它还发送一个从序列号 0 开始的数据包。
  4. 接收者检查最后接收到的 ID,如果 ID 匹配,则接受 ACK(ID)。它还开始接受序列号为 0 的数据包。

处理状态超时

基本上,每种状态下你都需要处理最多三种类型的事件:链接事件、数据包事件和超时事件。这些事件会并发地出现,因此你必须正确处理并发。

  • 链接事件包括网络连接或网络断开的变化,相应的初始化一个链接会话或断开一个已建立的会话。
  • 数据包事件是控制数据包(SYN/SYN-ACK/ACK)或只是探测响应。
  • 超时事件在当前会话状态的预定超时时间到期后触发。

这里面临的最主要的问题是如何处理并发的超时到期和其他事件。这里很容易陷入死锁和资源竞争的陷阱。

第一种方法

本项目使用的语言是 Golang。它确实提供了原生的同步机制,如自带的通道和锁,并且能够使用轻量级线程来进行并发处理。

 title=

gopher 们聚众狂欢

首先,你可以设计两个分别表示我们的会话和超时处理程序的结构体。

type Session struct {  
  State SessionState  
  Id SessionId  
  RemoteIp string  
}

type TimeoutHandler struct {  
  callback func(Session)  
  session Session  
  duration int  
  timer *timer.Timer  
}

Session 标识连接会话,内有表示会话 ID、临近的连接点的 IP 和当前会话状态的字段。

TimeoutHandler 包含回调函数、对应的会话、持续时间和指向调度计时器的指针。

每一个临近连接点的会话都包含一个保存调度 TimeoutHandler 的全局映射。

SessionTimeout map[Session]*TimeoutHandler

下面方法注册和取消超时:

// schedules the timeout callback function.  
func (timeout* TimeoutHandler) Register() {  
  timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time.Second, func() {  
    timeout.callback(timeout.session)  
  })  
}

func (timeout* TimeoutHandler) Cancel() {  
  if timeout.timer == nil {  
    return  
  }  
  timeout.timer.Stop()  
}

你可以使用类似下面的方法来创建和存储超时:

func CreateTimeoutHandler(callback func(Session), session Session, duration int) *TimeoutHandler {  
  if sessionTimeout[session] == nil {  
    sessionTimeout[session] := new(TimeoutHandler)  
  }  
   
  timeout = sessionTimeout[session]  
  timeout.session = session  
  timeout.callback = callback  
  timeout.duration = duration  
  return timeout  
}

超时处理程序创建后,会在经过了设置的 duration 时间(秒)后执行回调函数。然而,有些事件会使你重新调度一个超时处理程序(与 SYN 状态时的处理一样,每 3 秒一次)。

为此,你可以让回调函数重新调度一次超时:

func synCallback(session Session) {  
  sendSynPacket(session)

  // reschedules the same callback.  
  newTimeout := NewTimeoutHandler(synCallback, session, SYN_TIMEOUT_DURATION)  
  newTimeout.Register()

  sessionTimeout[state] = newTimeout  
}

这次回调在新的超时处理程序中重新调度自己,并更新全局映射 sessionTimeout

数据竞争和引用

你的解决方案已经有了。可以通过检查计时器到期后超时回调是否执行来进行一个简单的测试。为此,注册一个超时,休眠 duration 秒,然后检查是否执行了回调的处理。执行这个测试后,最好取消预定的超时时间(因为它会重新调度),这样才不会在下次测试时产生副作用。

令人惊讶的是,这个简单的测试发现了这个解决方案中的一个问题。使用 cancel 方法来取消超时并没有正确处理。以下顺序的事件会导致数据资源竞争:

  1. 你有一个已调度的超时处理程序。
  2. 线程 1:

    1. 你接收到一个控制数据包,现在你要取消已注册的超时并切换到下一个会话状态(如发送 SYN 后接收到一个 SYN-ACK
    2. 你调用了 timeout.Cancel(),这个函数调用了 timer.Stop()。(请注意,Golang 计时器的停止不会终止一个已过期的计时器。)
  3. 线程 2:

    1. 在取消调用之前,计时器已过期,回调即将执行。
    2. 执行回调,它调度一次新的超时并更新全局映射。
  4. 线程 1:

    1. 切换到新的会话状态并注册新的超时,更新全局映射。

两个线程并发地更新超时映射。最终结果是你无法取消注册的超时,然后你也会丢失对线程 2 重新调度的超时的引用。这导致处理程序在一段时间内持续执行和重新调度,出现非预期行为。

锁也解决不了问题

使用锁也不能完全解决问题。如果你在处理所有事件和执行回调之前加锁,它仍然不能阻止一个过期的回调运行:

func (timeout* TimeoutHandler) Register() {  
  timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time._Second_, func() {  
    stateLock.Lock()  
    defer stateLock.Unlock()

    timeout.callback(timeout.session)  
  })  
}

现在的区别就是全局映射的更新是同步的,但是这还是不能阻止在你调用 timeout.Cancel() 后回调的执行 —— 这种情况出现在调度计时器过期了但是还没有拿到锁的时候。你还是会丢失一个已注册的超时的引用。

使用取消通道

你可以使用取消通道,而不必依赖不能阻止到期的计时器执行的 golang 函数 timer.Stop()

这是一个略有不同的方法。现在你可以不用再通过回调进行递归地重新调度;而是注册一个死循环,这个循环接收到取消信号或超时事件时终止。

新的 Register() 产生一个新的 go 线程,这个线程在超时后执行你的回调,并在前一个超时执行后调度新的超时。返回给调用方一个取消通道,用来控制循环的终止。

func (timeout *TimeoutHandler) Register() chan struct{} {  
  cancelChan := make(chan struct{})  
   
  go func () {  
    select {  
    case _ = <- cancelChan:  
      return  
    case _ = <- time.AfterFunc(time.Duration(timeout.duration) * time.Second):  
      func () {  
        stateLock.Lock()  
        defer stateLock.Unlock()

        timeout.callback(timeout.session)  
      } ()  
    }  
  } ()

  return cancelChan  
}

func (timeout* TimeoutHandler) Cancel() {  
  if timeout.cancelChan == nil {  
    return  
  }  
  timeout.cancelChan <- struct{}{}  
}

这个方法给你注册的所有超时提供了取消通道。一个取消调用向通道发送一个空结构体并触发取消操作。然而,这并不能解决前面的问题;可能在你通过通道取消之前以及超时线程拿到锁之前,超时时间就已经到了。

这里的解决方案是,在拿到锁之后,检查一下超时范围内的取消通道。

  case _ = <- time.AfterFunc(time.Duration(timeout.duration) * time.Second):  
    func () {  
      stateLock.Lock()  
      defer stateLock.Unlock()  
     
      select {  
      case _ = <- handler.cancelChan:  
        return  
      default:  
        timeout.callback(timeout.session)  
      }  
    } ()  
  }

最终,这可以确保在拿到锁之后执行回调,不会触发取消操作。

小心死锁

这个解决方案看起来有效;但是还是有个隐患:死锁

请阅读上面的代码,试着自己找到它。考虑下描述的所有函数的并发调用。

这里的问题在取消通道本身。我们创建的是无缓冲通道,即发送的是阻塞调用。当你在一个超时处理程序中调用取消函数时,只有在该处理程序被取消后才能继续处理。问题出现在,当你有多个调用请求到同一个取消通道时,这时一个取消请求只被处理一次。当多个事件同时取消同一个超时处理程序时,如连接断开或控制包事件,很容易出现这种情况。这会导致死锁,可能会使应用程序停机。

 title=

有人在听吗?

(已获得 Trevor Forrey 授权。)

这里的解决方案是创建通道时指定缓存大小至少为 1,这样向通道发送数据就不会阻塞,也显式地使发送变成非阻塞的,避免了并发调用。这样可以确保取消操作只发送一次,并且不会阻塞后续的取消调用。

func (timeout* TimeoutHandler) Cancel() {  
  if timeout.cancelChan == nil {  
    return  
  }  
   
  select {  
  case timeout.cancelChan <- struct{}{}:  
  default:  
    // can’t send on the channel, someone has already requested the cancellation.  
  }  
}

总结

在实践中你学到了并发操作时出现的常见错误。由于其不确定性,即使进行大量的测试,也不容易发现这些问题。下面是我们在最初的实现中遇到的三个主要问题:

在非同步的情况下更新共享数据

这似乎是个很明显的问题,但如果并发更新发生在不同的位置,就很难发现。结果就是数据竞争,由于一个更新会覆盖另一个,因此对同一数据的多次更新中会有某些更新丢失。在我们的案例中,我们是在同时更新同一个共享映射里的调度超时引用。(有趣的是,如果 Go 检测到在同一个映射对象上的并发读写,会抛出致命错误 — 你可以尝试下运行 Go 的数据竞争检测器)。这最终会导致丢失超时引用,且无法取消给定的超时。当有必要时,永远不要忘记使用锁。

 title=

不要忘记同步 gopher 们的工作

缺少条件检查

在不能仅依赖锁的独占性的情况下,就需要进行条件检查。我们遇到的场景稍微有点不一样,但是核心思想跟条件变量是一样的。假设有个一个生产者和多个消费者使用一个共享队列的经典场景,生产者可以将一个元素添加到队列并唤醒所有消费者。这个唤醒调用意味着队列中的数据是可访问的,并且由于队列是共享的,消费者必须通过锁来进行同步访问。每个消费者都可能拿到锁;然而,你仍然需要检查队列中是否有元素。因为在你拿到锁的瞬间并不知道队列的状态,所以还是需要进行条件检查。

在我们的例子中,超时处理程序收到了计时器到期时发出的“唤醒”调用,但是它仍需要检查是否已向其发送了取消信号,然后才能继续执行回调。

 title=

如果你要唤醒多个 gopher,可能就需要进行条件检查

死锁

当一个线程被卡住,无限期地等待一个唤醒信号,但是这个信号永远不会到达时,就会发生这种情况。死锁可以通过让你的整个程序停机来彻底杀死你的应用。

在我们的案例中,这种情况的发生是由于多次发送请求到一个非缓冲且阻塞的通道。这意味着向通道发送数据只有在从这个通道接收完数据后才能返回。我们的超时线程循环迅速从取消通道接收信号;然而,在接收到第一个信号后,它将跳出循环,并且再也不会从这个通道读取数据。其他的调用会一直被卡住。为避免这种情况,你需要仔细检查代码,谨慎处理阻塞调用,并确保不会发生线程饥饿。我们例子中的解决方法是使取消调用成为非阻塞调用 — 我们不需要阻塞调用。


via: https://opensource.com/article/19/12/go-common-pitfalls

作者:Eduardo Ferreira 选题:lujun9972 译者:lxbwolf 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

Django 是 Python API 开发中最流行的框架之一,在这个教程中,我们来学习如何使用它。

Django 所有 Web 框架中最全面的,也是最受欢迎的一个。自 2005 年以来,其流行度大幅上升。

Django 是由 Django 软件基金会维护,并且获得了社区的大力支持,在全球拥有超过 11,600 名成员。在 Stack Overflow 上,约有 191,000 个带 Django 标签的问题。Spotify、YouTube 和 Instagram 等都使用 Django 来构建应用程序和数据管理。

本文演示了一个简单的 API,通过它可以使用 HTTP 协议的 GET 方法来从服务器获取数据。

构建一个项目

首先,为你的 Django 应用程序创建一个目录结构,你可以在系统的任何位置创建:

$ mkdir myproject
$ cd myproject

然后,在项目目录中创建一个虚拟环境来隔离本地包依赖关系:

$ python3 -m venv env
$ source env/bin/activate

在 Windows 上,使用命令 env\Scripts\activate 来激活虚拟环境。

安装 Django 和 Django REST framework

然后,安装 Django 和 Django REST 模块:

$ pip3 install django
$ pip3 install djangorestframework

实例化一个新的 Django 项目

现在你的应用程序已经有了一个工作环境,你必须实例化一个新的 Django 项目。与 Flask 这样微框架不同的是,Django 有专门的命令来创建(注意第一条命令后的 . 字符)。

$ django-admin startproject tutorial .
$ cd tutorial
$ django-admin startapp quickstart

Django 使用数据库来管理后端,所以你应该在开始开发之前同步数据库,数据库可以通过 manage.py 脚本管理,它是在你运行 django-admin 命令时创建的。因为你现在在 tutorial 目录,所以使用 ../ 符号来运行脚本,它位于上一层目录:

$ python3 ../manage.py makemigrations
No changes detected
$ python3 ../manage.py migrate
Operations to perform:
  Apply all migrations: admin, auth, contenttypes, sessions
Running migrations:
  Applying contenttypes.0001_initial... OK
  Applying auth.0001_initial... OK
  Applying admin.0001_initial... OK
  Applying admin.0002_logentry_remove_auto_add... OK
  Applying admin.0003_logentry_add_action_flag_choices... OK
  Applying contenttypes.0002_remove_content_type_name... OK
  Applying auth.0002_alter_permission_name_max_length... OK
  Applying auth.0003_alter_user_email_max_length... OK
  Applying auth.0004_alter_user_username_opts... OK
  Applying auth.0005_alter_user_last_login_null... OK
  Applying auth.0006_require_contenttypes_0002... OK
  Applying auth.0007_alter_validators_add_error_messages... OK
  Applying auth.0008_alter_user_username_max_length... OK
  Applying auth.0009_alter_user_last_name_max_length... OK
  Applying auth.0010_alter_group_name_max_length... OK
  Applying auth.0011_update_proxy_permissions... OK
  Applying sessions.0001_initial... OK

在 Django 中创建用户

创建一个名为 admin,示例密码为 password123 的初始用户:

$ python3 ../manage.py createsuperuser \
  --email [email protected] \
  --username admin

在提示时创建密码。

在 Django 中实现序列化和视图

为了使 Django 能够将信息传递给 HTTP GET 请求,必须将信息对象转化为有效的响应数据。Django 为此实现了“序列化类” serializers

在你的项目中,创建一个名为 quickstart/serializers.py 的新模块,使用它来定义一些序列化器,模块将用于数据展示:

from django.contrib.auth.models import User, Group
from rest_framework import serializers

class UserSerializer(serializers.HyperlinkedModelSerializer):
    class Meta:
        model = User
        fields = ['url', 'username', 'email', 'groups']

class GroupSerializer(serializers.HyperlinkedModelSerializer):
    class Meta:
        model = Group
        fields = ['url', 'name']

Django 中的视图是一个接受 Web 请求并返回 Web 响应的函数。响应可以是 HTML、HTTP 重定向、HTTP 错误、JSON 或 XML 文档、图像或 TAR 文件,或者可以是从 Internet 获得的任何其他内容。要创建视图,打开 quickstart/views.py 并输入以下代码。该文件已经存在,并且其中包含一些示例文本,保留这些文本并将以下代码添加到文件中:

from django.contrib.auth.models import User, Group
from rest_framework import viewsets
from tutorial.quickstart.serializers import UserSerializer, GroupSerializer

class UserViewSet(viewsets.ModelViewSet):
    """
    API 允许查看或编辑用户
    """
    queryset = User.objects.all().order_by('-date_joined')
    serializer_class = UserSerializer

class GroupViewSet(viewsets.ModelViewSet):
    """
    API 允许查看或编辑组
    """
    queryset = Group.objects.all()
    serializer_class = GroupSerializer

使用 Django 生成 URL

现在,你可以生成 URL 以便人们可以访问你刚起步的 API。在文本编辑器中打开 urls.py 并将默认示例代码替换为以下代码:

from django.urls import include, path
from rest_framework import routers
from tutorial.quickstart import views

router = routers.DefaultRouter()
router.register(r'users', views.UserViewSet)
router.register(r'groups', views.GroupViewSet)

# 使用自动路由 URL
# 还有登录 URL
urlpatterns = [
    path('', include(router.urls)),
    path('api-auth/', include('rest_framework.urls', namespace='rest_framework'))
]

调整你的 Django 项目设置

这个示例项目的设置模块存储在 tutorial/settings.py 中,因此在文本编辑器中将其打开,然后在 INSTALLED_APPS 列表的末尾添加 rest_framework

INSTALLED_APPS = [
    ...
    'rest_framework',
]

测试 Django API

现在,你可以测试构建的 API。首先,从命令行启动内置服务器:

$ python3 manage.py runserver

你可以通过使用 curl 导航至 URL http://localhost:8000/users 来访问 API:

$ curl --get http://localhost:8000/users/?format=json
[{"url":"http://localhost:8000/users/1/?format=json","username":"admin","email":"[email protected]","groups":[]}]

使用 Firefox 或你选择的开源浏览器

 title=

有关使用 Django 和 Python 的 RESTful API 的更多深入知识,参考出色的 Django 文档

为什么要使用 Djago?

Django 的主要优点:

  1. Django 社区的规模正在不断扩大,因此即使你做一个复杂项目,也会有大量的指导资源。
  2. 默认包含模板、路由、表单、身份验证和管理工具等功能,你不必寻找外部工具,也不必担心第三方工具会引入兼容性问题。
  3. 用户、循环和条件的简单结构使你可以专注于编写代码。
  4. 这是一个成熟且经过优化的框架,它非常快速且可靠。

Django 的主要缺点:

  1. Django 很复杂!从开发人员视角的角度来看,它可能比简单的框架更难学。
  2. Django 有一个很大的生态系统。一旦你熟悉它,这会很棒,但是当你深入学习时,它可能会令人感到无所适从。

对你的应用程序或 API 来说,Django 是绝佳选择。下载并熟悉它,开始开发一个迷人的项目!


via: https://opensource.com/article/19/11/python-web-api-django

作者:Rachel Waston 选题:lujun9972 译者:MjSeven 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

Zope.interface 可以帮助声明存在哪些接口,是由哪些对象提供的,以及如何查询这些信息。

 title=

zope.interface 库可以克服 Python 接口设计中的歧义性。让我们来研究一下。

隐式接口不是 Python 之禅

Python 之禅 很宽松,但是有点自相矛盾,以至于你可以用它来例证任何东西。让我们来思考其中最著名的原则之一:“显示胜于隐式”。

传统上,在 Python 中会隐含的一件事是预期的接口。比如函数已经记录了它期望一个“类文件对象”或“序列”。但是什么是类文件对象呢?它支持 .writelines吗?.seek 呢?什么是一个“序列”?是否支持步进切片,例如 a[1:10:2]

最初,Python 的答案是所谓的“鸭子类型”,取自短语“如果它像鸭子一样行走,像鸭子一样嘎嘎叫,那么它可能就是鸭子”。换句话说,“试试看”,这可能是你能得到的最具隐式的表达。

为了使这些内容显式地表达出来,你需要一种方法来表达期望的接口。Zope Web 框架是最早用 Python 编写的大型系统之一,它迫切需要这些东西来使代码明确呈现出来,例如,期望从“类似用户的对象”获得什么。

zope.interface 由 Zope 开发,但作为单独的 Python 包发布。Zope.interface 可以帮助声明存在哪些接口,是由哪些对象提供的,以及如何查询这些信息。

想象编写一个简单的 2D 游戏,它需要各种东西来支持精灵界面(LCTT 译注:“ 精灵 Sprite ”是指游戏面板中各个组件)。例如,表示一个边界框,但也要表示对象何时与一个框相交。与一些其他语言不同,在 Python 中,将属性访问作为公共接口一部分是一种常见的做法,而不是实现 getter 和 setter。边界框应该是一个属性,而不是一个方法。

呈现精灵列表的方法可能类似于:

def render_sprites(render_surface, sprites):
    """
    sprites 应该是符合 Sprite 接口的对象列表:
    * 一个名为 "bounding_box" 的属性,包含了边界框
    * 一个名为 "intersects" 的方法,它接受一个边界框并返回 True 或 False
    """
    pass # 一些做实际渲染的代码

该游戏将具有许多处理精灵的函数。在每个函数中,你都必须在随附文档中指定预期。

此外,某些函数可能期望使用更复杂的精灵对象,例如具有 Z 序的对象。我们必须跟踪哪些方法需要 Sprite 对象,哪些方法需要 SpriteWithZ 对象。

如果能够使精灵是显式而直观的,这样方法就可以声明“我需要一个精灵”,并有个严格定义的接口,这不是很好吗?来看看 zope.interface

from zope import interface

class ISprite(interface.Interface):

    bounding_box = interface.Attribute(
        "边界框"
    )

    def intersects(box):
        "它和一个框相交吗?"

乍看起来,这段代码有点奇怪。这些方法不包括 self,而包含 self 是一种常见的做法,并且它有一个属性。这是在 zope.interface 中声明接口的方法。这看起来很奇怪,因为大多数人不习惯严格声明接口。

这样做的原因是接口显示了如何调用方法,而不是如何定义方法。因为接口不是超类,所以它们可以用来声明数据属性。

下面是一个能带有圆形精灵的接口的一个实现:

@implementer(ISprite)
@attr.s(auto_attribs=True)
class CircleSprite:
    x: float
    y: float
    radius: float

    @property
    def bounding_box(self):
        return (
            self.x - self.radius,
            self.y - self.radius,
            self.x + self.radius,
            self.y + self.radius,
        )

    def intersects(self, box):
        # 当且仅当至少一个角在圆内时,方框与圆相交
        top_left, bottom_right = box[:2], box[2:]
        for choose_x_from (top_left, bottom_right):
            for choose_y_from (top_left, bottom_right):
                x = choose_x_from[0]
                y = choose_y_from[1]
                if (((x - self.x) ` 2 + (y - self.y) ` 2) &lt;=
                    self.radius ` 2):
                     return True
        return False

显式声明了实现了该接口的 CircleSprite 类。它甚至能让我们验证该类是否正确实现了接口:

from zope.interface import verify

def test_implementation():
    sprite = CircleSprite(x=0, y=0, radius=1)
    verify.verifyObject(ISprite, sprite)

这可以由 pytest、nose 或其他测试框架运行,它将验证创建的精灵是否符合接口。测试通常是局部的:它不会测试仅在文档中提及的内容,甚至不会测试方法是否可以在没有异常的情况下被调用!但是,它会检查是否存在正确的方法和属性。这是对单元测试套件一个很好的补充,至少可以防止简单的拼写错误通过测试。


via: https://opensource.com/article/19/9/zopeinterface-python-package

作者:Moshe Zadka 选题:lujun9972 译者:MjSeven 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

欢迎阅读“Python 光明节(Pythonukkah)”系列文章,这个系列文章将会讨论《Python 之禅》。我们首先来看《Python 之禅》里的前两个原则:美观与明确。

早在 1999 年,Python 的贡献者之一,Tim Peters 就提出了《Python 之禅》,直到二十年后的今天,《Python 之禅》中的 19 条原则仍然对整个社区都产生着深远的影响。为此,就像庆典光明的 光明节 Hanukkah 一样,我们举行了这一次的“ Python 光明节 Pythonukkah ”。首先,我们会讨论《Python 之禅》中的前两个原则:美观和明确。

“Hanukkah is the Festival of Lights,

Instead of one day of presents, we get eight crazy nights.”

—亚当·桑德勒,光明节之歌

美观胜于丑陋 Beautiful is better than ugly

著名的《 计算机程序的构造和解释 Structure and Interpretation of Computer Programs 》中有这么一句话: 代码是写给人看的,只是恰好能让机器运行。 Programs must be written for people to read and only incidentally for machines to execute. 机器并不在乎代码的美观性,但人类在乎。

阅读美观的代码对人们来说是一种享受,这就要求在整套代码中保持一致的风格。使用诸如 Blackflake8Pylint 这一类工具能够有效地接近这一个目标。

但实际上,只有人类自己才知道什么才是真正的美观。因此,代码审查和协同开发是其中的不二法门,同时,在开发过程中倾听别人的意见也是必不可少的。

最后,个人的主观能动性也很重要,否则一切工具和流程都会变得毫无意义。只有意识到美观的重要性,才能主动编写出美观的代码。

这就是为什么美观在众多原则当中排到了首位,它让“美”成为了 Python 社区的一种价值。如果有人要问,”我们真的在乎美吗?“社区会以代码给出肯定的答案。

明确胜于隐晦 Explicit is better than implicit

人类会欢庆光明、惧怕黑暗,那是因为光能够让我们看到难以看清的事物。同样地,尽管有些时候我们会不自觉地把代码写得含糊不清,但明确地编写代码确实能够让我们理解很多抽象的概念。

“为什么类方法中要将 self 显式指定为第一个参数?”

这个问题已经是老生常谈了,但网络上很多流传已久的回答都是不准确的。在编写 元类 metaclass 时,显式指定 self 参数就显得毫无意义。如果你没有编写过元类,希望你可以尝试一下,这是很多 Python 程序员的必经之路。

显式指定 self 参数的原因并不是 Python 的设计者不想将这样的元类视为“默认”元类,而是因为第一个参数必须是显式的。

即使 Python 中确实允许非显式的情况存在(例如上下文变量),但我们还是应该提出疑问:某个东西是不是有存在的必要呢?如果非显式地传递参数会不会出现问题呢?有些时候,由于种种原因,这是会有问题的。总之,在写代码时一旦能够优先考虑到明确性,至少意味着能对不明确的地方提出疑问并对结果作出有效的估计。


via: https://opensource.com/article/19/12/zen-python-beauty-clarity

作者:Moshe Zadka 选题:lujun9972 译者:HankChow 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出