BlinkDetection

1. 眨眼检测

  • 使用dlib库 眨眼检测
# import the necessary packages
from scipy.spatial import distance as dist
from imutils.video import FileVideoStream
from imutils.video import VideoStream
from imutils import face_utils
import argparse
import imutils
import time
import dlib
import cv2
import os


def eye_aspect_ratio(eye):
    # compute the euclidean distances between the two sets of
    # vertical eye landmarks (x, y)-coordinates
    A = dist.euclidean(eye[1], eye[5])
    B = dist.euclidean(eye[2], eye[4])
    # compute the euclidean distance between the horizontal
    # eye landmark (x, y)-coordinates
    C = dist.euclidean(eye[0], eye[3])
    # compute the eye aspect ratio
    ear = (A + B) / (2.0 * C)
    # return the eye aspect ratio
    return ear


threshold = 0.27
shape_predictor = os.path.join(
    'backend', "shape_predictor_68_face_landmarks.dat")


class Eye_Detector():
    def __init__(self):
        EYE_AR_THRESH = threshold
        self.detector = dlib.get_frontal_face_detector()
        self.predictor = dlib.shape_predictor(shape_predictor)
        (lStart, lEnd) = face_utils.FACIAL_LANDMARKS_IDXS["left_eye"]
        (rStart, rEnd) = face_utils.FACIAL_LANDMARKS_IDXS["right_eye"]
        self.index = [lStart, lEnd, rStart, rEnd]

    def calculate_ear(self, frame):
        lStart, lEnd, rStart, rEnd = self.index
        detector, predictor = self.detector, self.predictor
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # detect faces in the grayscale frame
        rects = detector(gray, 0)
        # loop over the face detections
        ear = 0
        for rect in rects:
            # determine the facial landmarks for the face region, then
            # convert the facial landmark (x, y)-coordinates to a NumPy
            # array
            shape = predictor(gray, rect)
            shape = face_utils.shape_to_np(shape)
            # extract the left and right eye coordinates, then use the
            # coordinates to compute the eye aspect ratio for both eyes
            leftEye = shape[lStart:lEnd]
            rightEye = shape[rStart:rEnd]
            leftEAR = eye_aspect_ratio(leftEye)
            rightEAR = eye_aspect_ratio(rightEye)
            # average the eye aspect ratio together for both eyes
            ear = (leftEAR + rightEAR) / 2.0
        return ear
  • 使用socketio进行通信
#!/usr/bin/env python3
import os
from flask import Flask, render_template, request
from flask_socketio import SocketIO, send, emit
from PIL import Image
import base64
import io
import cv2
import numpy as np
import random
import time
from flask_sslify import SSLify
from backend import Eye_Detector

# ==================
# Initialize
# ==================

app = Flask(__name__)
socketio = SocketIO(app)
player_list = []
detector = Eye_Detector()
SSLify(app)

# prevent cached responses(https://stackoverflow.com/questions/47376744/how-to-prevent-cached-response-flask-server-using-chrome)
if app.config["DEBUG"]:
    @app.after_request
    def after_request(response):
        response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate, public, max-age=0"
        response.headers["Expires"] = 0
        response.headers["Pragma"] = "no-cache"
        return response

# ==================
# Utils Function
# ==================


def find_by_id(_id):
    """
    Find element by id in the player_list.
    Args:
      _id: ID of this player.
    Return:
      index: The index of this player in the player list.
    """
    filt = [i for (i, item) in enumerate(player_list) if item['id'] == _id]
    index = None if len(filt) == 0 else filt[0]
    return index


def find_random_waiting(_id):
    """
    Randomly choose a 'waiting' player from the list.
    But it can not choose itself(_id).
    Args:
      _id: ID of this player.
    Return:
      index: The index of the chosen player in the player list.
    """
    filt = [i for (i, item) in enumerate(player_list) if (
        item['status'] == 'waiting' and item['id'] != _id)]
    if len(filt):
        index = random.choice(filt)
    else:
        index = None
    return index

# ==================
# Socket-io
# ==================


@socketio.on('connect')
def on_connect():
    """
    Called when the client just connected.
    Create a player and push it to the list.
    Player:
      id: Indentifier of the player.
      status: 'idle', 'waiting', 'playing'.
      ear: Eye aspect ratio.
      rival: The rival's id.
      startTime: The start time of the game.
      end: Whether the game has ended. '1' means end, '0' means playing.
    """
    client_id = request.args['id']
    index = find_by_id(client_id)
    if not index:
        player_list.append({'id': client_id, 'status': 'idle',
                            'ear': 0, 'rival': None, 'startTime': 0, 'end': 0})
    print(client_id, 'connected !')


@socketio.on('_disconnect')
def on_disconnect(message):
    """
    Called when the client just disconnected.
    Delete the player from the list.
    """
    client_id = message['id']
    index = find_by_id(client_id)
    del player_list[index]
    print(client_id, 'disconnected !')


@socketio.on('set_player_wait')
def set_player_wait(message):
    """
    Set the player's status to 'waiting'.
    And then Randomly match a player whose status is also 'waiting'.
    Args:
      id: ID of the player.
    Emit:
      get_rival: Tell the player its rival's id.
    """
    client_id = message['id']
    curr_index = find_by_id(client_id)
    player_list[curr_index]['status'] = 'waiting'
    print(client_id, 'set to waiting...')
    # look for rival
    rival_index = find_random_waiting(client_id)
    if rival_index != None:
        rival_id = player_list[rival_index]['id']
        player_list[curr_index]['rival'] = rival_id
        player_list[curr_index]['status'] = 'playing'

        player_list[rival_index]['rival'] = client_id
        player_list[rival_index]['status'] = 'playing'

        emit('get_rival', {'id': rival_id})
        print(client_id, 'find rival', rival_id)


@socketio.on('set_player_startTime')
def set_player_startTime(message):
    """
    Set the player's startTime.
    Args:
      id: ID of the player.
    """
    client_id = message['id']
    curr_index = find_by_id(client_id)
    player_list[curr_index]['startTime'] = time.time()
    print('startTime', player_list[curr_index])


@socketio.on('send_image')
def send_image(message):
    """
    1. Convert base64 string from client to np image.
    2. Run 'Detector engine' to calculate its value.
    3. Emit new player data to the player.
    Args:
        uri: Base64 string from client.
        id: ID of the player.
    Emit:
        get_arena_data: send arena status to the client including 
          EAR1: Eye aspect ratio of this player.
          EAR2: Eye aspect ratio of its rival.
          elapsed: Elapsed time since start.
          end: Whether the game has ended.
    """
    # ===============
    # Process Base64
    # ===============

    uri, _id = message['uri'], message['id']
    # split header and body
    img_data = uri.split(',')[1]
    img_data = base64.b64decode(img_data)
    image = Image.open(io.BytesIO(img_data))
    # bgr
    array = np.array(image)
    REJECT = 0

    # ===============
    # Set EAR
    # ===============

    index = find_by_id(_id)
    EAR1 = player_list[index]['ear']

    # If the game has ended, we should not update the player.
    if player_list[index]['end'] == 0:
        EAR1 = detector.calculate_ear(array)
        player_list[index]['ear'] = EAR1
        player_list[index]['uri'] = uri

    # Find rival's EAR.
    rival_id = player_list[index]['rival']
    rival_index = find_by_id(rival_id)
    EAR2 = player_list[rival_index]['ear']

    # Calculat elapsed time
    elapsed = time.time() - player_list[index]['startTime']

    # EAR < threshold is determined as 'blink'.
    threshold = 0.20
    # The first 3 second is not counted.
    prepare_time = 3
    if ((EAR1 < threshold and EAR1 > 0) or (EAR2 < threshold and EAR2 > 0)) and elapsed > prepare_time:
        # The game end. Set player's status back to 'idle'.
        player_list[index]['end'] = 1
        player_list[index]['status'] = 'idle'
        player_list[rival_index]['end'] = 1
        player_list[rival_index]['status'] = 'idle'
        emit('get_arena_data', {'EAR1': EAR1, 'EAR2': EAR2, 'elapsed': elapsed, 'end': 1,
                                'uri1': player_list[index]['uri'], 'uri2': player_list[rival_index]['uri']})
    else:
        emit('get_arena_data', {'EAR1': EAR1,
                                'EAR2': EAR2, 'elapsed': elapsed, 'end': 0})

# ==================
# Run flask
# ==================


@app.route("/")
def index():
    return render_template('index.html')


if __name__ == "__main__":
    socketio.run(app, host='0.0.0.0', port=3000,
                 keyfile=os.path.join('backend', 'server.key'), certfile=os.path.join('backend', 'server.crt'))
0%