迳口麒麟舞(KirinDance)AI教练人体姿态动作动态评分算法
JuruoAssociation 2024-10-07 10:31:01 阅读 91
迳口麒麟舞(KirinDance)AI教练人体姿态动作动态评分算法
文章目录
第一版算法原理局限性解决方案
第二版算法原理局限性解决方案
第三版
关注作者:JuruoAssociation 原创于 CSDN
原创不易,谢绝转载。点击支持原文
为了开发麒麟舞AI动作教练,动作解算我们采用的是OpenPose和PoseNet,但由于网上没有较为完善的动作评分系统,我们决定自行开发。
首先,我们的接口可以从摄像头中得到各个关节的置信度(Confidence)和位置(X、Y),设置一定的阈值(threshold)具有标记阈值之上置信度的关节为出现的关节,评分则基于这些出现的关节进行评分。
原项目已经开源在github
第一版
算法原理
第一版算法我们主要研究关节的瞬态位置,对于一个动作,选取若干个关节,获取其位置信息,然后做傅里叶变换转换到频域,这样即使是时间不同步(考虑显示延时和用户反应时间),他们的幅度曲线也具有相似性。
我们需要解算的是图1.A的标准视频中的静态动作数据,以及用户的摄像头动作流数据,图中横轴为时间,纵轴为位置(X坐标或Y坐标)。如果对于图一A和图一B,最终得到的频域曲线是相似的,随机延迟会被消除。评分依据是正确频率上的幅度和错误频率上的幅度。对于图一C,最终的频域曲线幅度将变小,对于图一D,频率将不正确。
局限性
但局限性也十分明显,如上的算法需要测量的频率极低,如果样本步长过小,①会导致精度丢失,②小幅度波动测量的十分清楚,导致产生大量杂值,然而由于动作解算方法的局限性,对于一个2160p的摄像头,10px以内的波动都是正常情况。③以上的方法对用户不友好,用户没办法获得自己的动作反馈,而且评分反馈难以让用户理解。④由于FFT对步长的要求,这是算法系统延时限制,会导致评分不能及时展示,最好的方案是等到全部录制完毕再评分,可能影响用户积极性。
解决方案
对于问题②可以采用低通滤波的方式解决,但是①③④暂时没有更好的方案。因此我们在第一版基础上没有做过多停留,改善到第二版进行实现。
第二版
算法原理
后来我们经过讨论,使用了一种模拟算法,这种模拟算法具体计算步骤如下:
将图像从BGR格式转换为RGB格式。 使用 MediaPipe 的 pose 模型处理图像,获取姿态 landmarks。如果未检测到姿态 landmarks,返回 None。 定义 get_coords 函数,用于从 landmarks 中提取坐标。提取左右肩膀、手肘、手腕、臀部、膝盖和脚踝的坐标。 计算中心肩膀和中心臀部的坐标,以及垂直参考点的坐标。计算左右手臂、左右腿和身体倾斜的角度。 返回计算得到的角度列表。 如果过程中发生异常,打印错误信息并返回 None。
以上4个步骤可以得到图像中人体关键点的角度,包括左右手臂、左右腿和身体倾斜的角度,具体代码如下:
<code>
# Function to calculate angle
def calculate_angle(fp, sp, tp):
fp, sp, tp = np.array(fp), np.array(sp), np.array(tp)
radians = np.arctan2(tp[1] - sp[1], tp[0] - sp[0]) - np.arctan2(fp[1] - sp[1], fp[0] - sp[0])
angle = np.abs(radians * 180 / np.pi)
return angle if angle <= 180 else 360 - angle
# Function to get pose angles from an image
def get_pose_angles(img):
img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
results = pose.process(img_rgb)
if not results.pose_landmarks:
return None
landmarks = results.pose_landmarks.landmark
def get_coords(landmark):
return [landmarks[landmark.value].x, landmarks[landmark.value].y]
try:
left_shoulder, left_elbow, left_wrist = map(get_coords, [mp_pose.PoseLandmark.LEFT_SHOULDER,
mp_pose.PoseLandmark.LEFT_ELBOW,
mp_pose.PoseLandmark.LEFT_WRIST])
right_shoulder, right_elbow, right_wrist = map(get_coords, [mp_pose.PoseLandmark.RIGHT_SHOULDER,
mp_pose.PoseLandmark.RIGHT_ELBOW,
mp_pose.PoseLandmark.RIGHT_WRIST])
left_hip, left_knee, left_ankle = map(get_coords, [mp_pose.PoseLandmark.LEFT_HIP,
mp_pose.PoseLandmark.LEFT_KNEE,
mp_pose.PoseLandmark.LEFT_ANKLE])
right_hip, right_knee, right_ankle = map(get_coords, [mp_pose.PoseLandmark.RIGHT_HIP,
mp_pose.PoseLandmark.RIGHT_KNEE,
mp_pose.PoseLandmark.RIGHT_ANKLE])
center_shoulder = [(left_shoulder[0] + right_shoulder[0]) / 2, (left_shoulder[1] + right_shoulder[1]) / 2]
center_hip = [(left_hip[0] + right_hip[0]) / 2, (left_hip[1] + right_hip[1]) / 2]
vertical_refp = [(left_hip[0] + right_hip[0]) / 2, (left_hip[1] + right_hip[1]) / 2 + 10]
angles = [
calculate_angle(left_shoulder, left_elbow, left_wrist),
calculate_angle(right_shoulder, right_elbow, right_wrist),
calculate_angle(left_hip, left_knee, left_ankle),
calculate_angle(right_hip, right_knee, right_ankle),
180 - calculate_angle(center_shoulder, center_hip, vertical_refp)
]
return angles
except Exception as e:
print(f"Error in get_pose_angles: { e}")
return None
其中的landmark.png是各个关节的相对位置和关系地图,从外部加载。
接着,我们通过计算与标准角度间的差距,用分段函数映射,对于绝对值在10以下,记为满分;对于10到70之间,采用平方衰减;对于70以上,采用指数衰减;具体参数通过反馈调参实现,代码实现如下:
<code> angles = np.round(angles, 2)
diff_angles = angles - np.array(standard_angles)
frame_scores = np.zeros(len(diff_angles))
for part in range(len(diff_angles)):
abs_diff = abs(diff_angles[part])
if abs_diff <= 10:
frame_scores[part] = 100 - 0.03 * abs_diff * abs_diff
elif 10 < abs_diff <= 70:
frame_scores[part] = 94.23 - 0.02 * (abs_diff - 8) * (abs_diff - 8)
else:
frame_scores[part] = 17.35 * math.exp(-(abs_diff - 70) / 20)
局限性
接入外围接口和设备之后,我们发现这个算法的评分十分不稳定,而且可信度不高。具体表现为:暂停视频同一帧比较时,评分仍然会较大波动;并且即便人坐着,标准视频站着,都有可能得到较高的分数。
解决方案
经过中途调试和分析并且将点位绘制在实时视频流上,我们发现即便人不动作,标记点位都会有较大的波动,表现为动作的位移和置信度的波动,一个方案是通过提高算法参数规模,但是我们暂时没有更多算力资源,OpenPose的后端每秒只能处理10帧,PosNet的20版本每秒可以处理20帧(相当糟糕的表现)。另一个更方便的方案是通过缓存,用最近的历史平滑掉波动的问题。
对于非波动性误差的产生,我经过反复验证,确定是由于时间误差产生的,因为上述的算法几乎完全无视了这个问题,因为我们的算力和人反应力是不稳定的,会产生动态的延迟,因此我们需要用一个更大的尺度来衡量这个分数标准,也是从另一个方面平滑了波动的效果。
第三版
第三版为个人研究所得,个人想法先通过划分时间和关节动态,得到各个动作的时间区间。
单位时间:通过时间戳,降低算力和录制、标准视频帧率对最终得分的影响关节动态的定义:关节在单位时间内经过缓存平滑计算后,仍然和历史0保留位置存在大于阈值的距离,则标记为关节动态。关节方向:以逆时针旋转,X+为零弧度,计算从上一个位置到平滑后的当前位置的方向。动作同一性:如果关节动态未改变,且关节运动方向与上一个状态差<30°动作连续性:如果关节动态未改变,且关节方向运动上一个状态差大于30°晓宇45°;否则切分为另一个动作,将上一个动作记录为最小评分单位。
function updateJointState(pose, currentTime) {
// 最后一次的pose可能是undefine,因此需要判断一下
if (!pose) {
return;
}
positions = pose.keypoints.map(keypoint => ({ x: keypoint.position.x, y: keypoint.position.y }));
if (bufferPositions.length == 0){
pushBuffer(positions, currentTime, jointStates, jointSpeeds, lastDirections);
return;
}
buffer = getBuffer();
console.log(buffer);
lastPositions = buffer.positions;
prevJointStates = buffer.states;
prevLastDirections = buffer.directions;
lastTime = buffer.time;
if (!lastTime) {
return;
}
const dt = (currentTime - lastTime);
if (dt === 0) {
return;
}
// Check if all joints are moving in the same direction
let notSameDirection = 0;
let directionSum = 0;
// Calculate speed and angle for each joint
// 将jointStates清空
for (let i = 0; i < 16; i++) {
jointStates[i] = false;
jointSpeeds[i] = 0;
lastDirections[i] = 0;
}
for (let i = 5; i < 16; i++) {
if (pose.keypoints[i].score < 0.5) {
continue
}
const dx = positions[i].x - lastPositions[i].x;
const dy = positions[i].y - lastPositions[i].y;
const speed = Math.sqrt(dx * dx + dy * dy) / dt * pose_weight[i]; // px/ms
const angle = Math.atan2(dy, dx) * 180 / Math.PI; // degree/ms thresh = 100degree/1000ms
jointSpeeds[i] = speed; // 关节速度
lastDirections[i] = angle; // 运动方向,x轴正方向为0°,逆时针为正方向
directionSum += angle;
if (speed > threshold) {
jointStates[i] = true; // 关节是否在运动
} else {
jointStates[i] = false;
}
}
// Check if all joints are moving in the same direction ,理想状态下所有,但是实际上只要大于8个就行
const avgDirection = directionSum / jointStates.slice(5).filter(Boolean).length;
notSameDirection = 0;
for (let i = 5; i < 16; i++) {
if (jointStates[i]) {
if (Math.abs(lastDirections[i] - avgDirection) > same_direction_threshold) {
notSameDirection += 1;
break;
}
}
}
if (notSameDirection < 4) {
jointStates.fill(false);
let lowestMoving = -1, maxY = 0;
for (let i = 5; i < 16; i++) { // find the lowest joint that is moving (y坐标最大)
if (jointStates[i] && positions[i].y > maxY) {
maxY = positions[i].y;
lowestMoving = i - 5;
}
}
if (lowestMoving !== -1) {
jointStates[lowestMoving + 5] = true; // Mark the other joints as not moving except for the lowest one
}
}
// Remove conflicting joints (如果挥左手,左肩、左肘、左腕都会动,当左腕加权速度最大时,只需要告诉用户动左腕就行)
for (let i = 0; i < pose_conflict.length; i++) {
const joints = pose_conflict[i];
max_speed = 0;
max_joint = -1;
// Reset the moving state of conflicting joints
joints.forEach(j => jointStates[j] = false);
for (let j = 0; j < joints.length; j++) {
if (jointSpeeds[joints[j]] > max_speed) {
max_speed = jointSpeeds[joints[j]];
max_joint = j;
}
}
if (max_joint !== -1) {
jointStates[joints[max_joint]] = true;
}
}
console.log(jointStates);
console.log(jointSpeeds);
console.log(lastDirections);
// Check continuity
let isThisContinuous = true;
let minorChanged = false;
let movingChanged = false;
let directionChanged = false;
for (let i = 5; i < 16; i++) {
if (jointStates[i] != prevJointStates[i]) {
isThisContinuous = false;
movingChanged = true;
break;
}
if (jointStates[i])
{
if (Math.abs(lastDirections[i] - prevLastDirections[i]) > same_direction_threshold) {
isThisContinuous = false;
directionChanged = true;
break;
}
else if (Math.abs(lastDirections[i] - prevLastDirections[i]) > minor_changed_direction_threshold) {
// 改变方向小于45°但大于30°时,认为是连续的,记录进入history
minorChanged = true;
break;
}
}
}
// 容错检查
if (minorChanged)
{
updateHistory(positions, lastDirections);
drawHistory();
}
document.getElementById('test').textContent = '---';
if (!isThisContinuous) {
interruptionCount++;
if (interruptionCount >= incontinuity_thresh && dt > incontinuity_time) {
if (movingChanged){
document.getElementById('test') .textContent = '动作变化';
}
else if (directionChanged){
document.getElementById('test') .textContent = '方向变化';
}
updateHistory(lastPositions, lastDirections); // Save last known continuous positions
drawHistory(); // 在canvas_mask上画出连续动作轨迹
history = [];
updateHistory(lastPositions, lastDirections); // Save current positions as renewed continuous positions
interruptionCount = 0;
}
} else {
interruptionCount = 0; // Reset counter on continuity
// Update last positions and time
}
pushBuffer(positions, currentTime, jointStates, jointSpeeds, lastDirections);
}
以上评分过程还在开发调试中,目前暂未发现问题。
声明
本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。