This project was contributed to the Roboflow blog by Nathan Yan.

In this tutorial, we’ll be using Roboflow's computer vision tools to analyze and detect when a person falls. This application could be used to identify falls in manufacturing facilities, where a fall may present a significant danger to ongoing operations.

Our application will allow us to distinguish between what is likely to be a fall versus someone kneeling and other voluntary acts that involve being in a non-standing position.

If a person goes down quickly, the movement is probably involuntary: a swift drop to the ground is likely to be a fall. If a person goes down slowly, they are likely getting down on purpose (kneeling, lying down to sleep, or doing push-ups).
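
One way to picture this heuristic, as a minimal sketch rather than the method used later in this tutorial, is to follow the vertical center of a person's bounding box across consecutive frames and compare its downward speed to a threshold. The function name `looks_like_fall`, the sample values, and the threshold of 300 pixels per second are all hypothetical choices made for illustration.

```python
from typing import List


def looks_like_fall(center_ys: List[float], fps: float,
                    speed_threshold: float = 300.0) -> bool:
    """Return True if the box center ever drops faster than the threshold.

    center_ys holds the vertical center of one person's bounding box per frame
    (image coordinates, so a larger y means closer to the ground).
    speed_threshold is a hypothetical value in pixels per second.
    """
    for prev_y, next_y in zip(center_ys, center_ys[1:]):
        downward_speed = (next_y - prev_y) * fps  # pixels per second
        if downward_speed > speed_threshold:
            return True
    return False


# A quick drop: 40 pixels per frame at 30 fps is 1200 px/s.
fast_drop = [100.0, 140.0, 180.0, 220.0]
# A slow descent: 2 pixels per frame at 30 fps is 60 px/s.
slow_kneel = [100.0, 102.0, 104.0, 106.0]

print(looks_like_fall(fast_drop, fps=30))
print(looks_like_fall(slow_kneel, fps=30))
```

A real system would need a threshold tuned to the camera position and resolution, which is one reason the rest of this guide relies on a trained model and a timer instead.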


With this tutorial, you’ll learn how to:

  • Get usable data
  • Use Roboflow code to run a fall detection model
  • Add a timer and alert logic to the model
  • Analyze data within a video

Here is a video that walks through the project:

Before You Start

Before we start programming, set your environment to use a GPU. In Google Colab, this setting is under Runtime > Change runtime type. A GPU speeds up processing and handles computationally demanding tasks, such as running a vision model on every frame of a video, more efficiently.

# Check that a GPU is available
!nvidia-smi

Step #1: Install Required Dependencies

First, we need to install and import the libraries needed to build the project.

We will be using:

  • NumPy to work with frame and timing data as arrays;
  • Matplotlib to graph information;
  • Pillow to save and load the images;
  • OpenCV (opencv-python) to read images and write the output video;
  • Roboflow to train and build the fall detection model;
  • Inference to get the result of the model;
  • Supervision to get frames and annotate the image to visualize our detections.

You can install and import the required dependencies using the following code:

# Install requirements
!pip install numpy matplotlib pillow opencv-python roboflow inference supervision==0.19.0

# Import needed modules
import matplotlib.animation as animation
import matplotlib.pyplot as plt
import supervision as sv
import numpy as np
import shutil
import cv2
import os

from inference_sdk import InferenceHTTPClient
from google.colab.patches import cv2_imshow
from PIL import Image as im
from typing import Dict  # used for type hints in the timer class

Step #2: Download the Dataset

To build the dataset, we will split a given video into individual frames.

First, download the video. We can do this by fetching the sample video with “!wget”.

You can view the video here:

# Get the sample detection system we will be using
!wget --no-check-certificate 'https://drive.google.com/uc?export=download&id=1nSFmlcLAiBiFfQkMEisdR5DMICcYSax0' -O sample_detection.mp4

Then, we must give the specific paths of the files we need.

  • FRAMES_DIR is for the unannotated frames obtained from the video.
  • ANNOTATED_DIR is for the annotated frames that we will combine into a video.
  • VIDEO_PATH is the input video path (in this case it will be sample_detection).
  • OUTPUT_VIDEO_MODEL is the fully annotated video.
  • OUTPUT_VIDEO_HEATMAP is the output video for the heatmap.

FRAMES_DIR = "/content/frames" # NON DETECTED FOLDER PATH
ANNOTATED_DIR = "/content/annotated" # ANNOTATED IMAGES (OUTPUT) FOLDER
VIDEO_PATH = "/content/sample_detection.mp4" # INPUT VIDEO PATH
OUTPUT_VIDEO_MODEL = "/content/vision.mp4" # OUTPUT VIDEO PATH
OUTPUT_VIDEO_HEATMAP = "/content/heatmap.mp4" # OUTPUT HEATMAP VIDEO PATH

Next, we create the folders using:

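# exist_ok=True avoids an error if the cell is run more than once
os.makedirs(ANNOTATED_DIR, exist_ok=True)
os.makedirs(FRAMES_DIR, exist_ok=True)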

Finally, we iterate through the video images using Supervision’s frame generator. Using a for-loop and the enumerate function, we can get the frame as well as a number correlated to that frame (starting from 0 and ending at the last frame).

# Save all frames in video as images (jpg)
frames_generator = sv.get_video_frames_generator(VIDEO_PATH)

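for i, frame in enumerate(frames_generator):
  # Frames arrive in OpenCV's BGR order; convert to RGB before saving with Pillow
  img = im.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
  img.save(f"{FRAMES_DIR}/{i}.jpg")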

print(f"Saved frames to {FRAMES_DIR}")

Since we will have multiple images, it is best practice to clear the folders before we try another video. To clear a folder, we use the following function obtained from Stack Overflow.

# Delete everything inside a folder (used to reset the frames and annotated folders)
def clear_file(FILEPATH):
    for filename in os.listdir(FILEPATH):
        file_path = os.path.join(FILEPATH, filename)
        try:
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except Exception as e:
            print('Failed to delete %s. Reason: %s' % (file_path, e))

clear_file(ANNOTATED_DIR)

Step #3: Use the Person Detection Model with Roboflow Inference

Now that we have all of the data, we can start running the model. Since I have already trained a model, we will use that existing model here rather than training a new one.

The model we will use, hosted on Roboflow, detects three statuses:

  • Standing
  • Someone falling
  • Someone who has fallen

To use the model, we will first need to initialise the Roboflow inference client by declaring the CLIENT. You will need your own Roboflow API key. Learn how to retrieve your Roboflow API key.

# Get Roboflow API and tools
CLIENT = InferenceHTTPClient(
  api_url="https://detect.roboflow.com",
  api_key="" # INSERT API_KEY
)
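
The client returns a JSON-like dictionary for each image. As a rough sketch of how such a response could be inspected by hand, the example below assumes the common Roboflow object detection layout: a `predictions` list whose items carry a center `x` and `y`, a `width`, a `height`, a `confidence`, and a `class`. The sample response is made up for illustration, and `to_corner_boxes` is a hypothetical helper, not part of any library. In the rest of this tutorial, Supervision handles this conversion for us.

```python
from typing import Dict, List, Tuple

# Made-up response in the assumed layout: x and y are the box center.
sample_result = {
    "predictions": [
        {"x": 200.0, "y": 150.0, "width": 100.0, "height": 200.0,
         "confidence": 0.91, "class": "standing"},
        {"x": 400.0, "y": 300.0, "width": 220.0, "height": 80.0,
         "confidence": 0.42, "class": "fallen"},
    ]
}


def to_corner_boxes(result: Dict, min_confidence: float = 0.5
                    ) -> List[Tuple[str, Tuple[float, float, float, float]]]:
    """Convert center-based predictions to (label, (x1, y1, x2, y2)) pairs."""
    boxes = []
    for pred in result.get("predictions", []):
        if pred["confidence"] < min_confidence:
            continue  # skip low-confidence predictions
        half_w, half_h = pred["width"] / 2, pred["height"] / 2
        box = (pred["x"] - half_w, pred["y"] - half_h,
               pred["x"] + half_w, pred["y"] + half_h)
        boxes.append((pred["class"], box))
    return boxes


boxes = to_corner_boxes(sample_result)
print(boxes)
```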

Step #4: Create Timer Class

Next, we add a timer to track how long someone has been in a given state, such as fallen. The class includes two reset methods (reset_time and reset_all) so the timer can start over when a person's state changes. Together, these methods help track the true severity of a fall.

class FPS_Timer():
    def __init__(self, fps: int = 30) -> None:
        self.fps = fps
        self.frames_id = 0
        # Maps each tracker ID to the frame at which its timer started
        self.tracker_id2frame_id: Dict[int, int] = {}

    def reset_time(self, tracker_id: int) -> None:
        # Restart the timer for a single tracked person
        self.tracker_id2frame_id[tracker_id] = self.frames_id

    def reset_all(self) -> None:
        # Restart the timer for every tracked person
        for key in self.tracker_id2frame_id:
            self.tracker_id2frame_id[key] = self.frames_id

    def tick(self, detections: sv.Detections) -> np.ndarray:
        # Advance one frame and return the elapsed seconds per detection
        self.frames_id += 1
        times = []
        for tracker_id in detections.tracker_id:
            if tracker_id not in self.tracker_id2frame_id:
                self.tracker_id2frame_id[tracker_id] = self.frames_id

            start_frame_id = self.tracker_id2frame_id[tracker_id]
            time_duration = (self.frames_id - start_frame_id) / self.fps
            times.append(time_duration)

        return np.array(times)
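
To see the arithmetic the timer relies on, here is a stripped-down sketch that uses plain integers as tracker IDs instead of a `sv.Detections` object. `SimpleTimer` is a hypothetical stand-in written for this example; it mirrors the frame counting above but is not the class used in the rest of the tutorial.

```python
from typing import Dict, List


class SimpleTimer:
    """Convert elapsed frames into seconds for each tracked ID."""

    def __init__(self, fps: int = 30) -> None:
        self.fps = fps
        self.frame_id = 0
        self.start_frame: Dict[int, int] = {}

    def reset_all(self) -> None:
        # Restart every known timer at the current frame
        for key in self.start_frame:
            self.start_frame[key] = self.frame_id

    def tick(self, tracker_ids: List[int]) -> List[float]:
        self.frame_id += 1
        times = []
        for tracker_id in tracker_ids:
            # First sighting: the timer starts at the current frame
            self.start_frame.setdefault(tracker_id, self.frame_id)
            times.append((self.frame_id - self.start_frame[tracker_id]) / self.fps)
        return times


timer = SimpleTimer(fps=30)
for _ in range(31):
    elapsed = timer.tick([1])
print(elapsed)  # 30 frames after the first sighting, at 30 fps

timer.reset_all()
after_reset = timer.tick([1])
print(after_reset)  # one frame after the reset
```

Dividing the frame difference by the frame rate is what turns a frame count into seconds, which is why the timer needs the true fps of the video.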

Step #5: Build Action Logic

We are going to assume that it takes around 5 seconds to stand up from a minor fall. Because we are tracking time, we can flag anything well beyond that: if we get the label “fallen” and the timer is greater than 30 seconds, the fall may be deemed an emergency. Based on this, we create a simple detect_danger function that checks whether someone has been down for more than 30 seconds.

# Detect how long they have fallen for
fall_time = 30  # seconds

def detect_danger(identity, times):
  # identity is the current label; times holds the elapsed seconds per tracked person
  for time in times:
    if identity == "fallen" and time > fall_time:
      print("critical condition")
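
A variant that returns the affected tracker IDs instead of printing can be easier to connect to an alerting system. The sketch below is one possible shape for that logic; `find_critical` and its arguments are hypothetical names, and it assumes the label for a person on the ground is the string "fallen".

```python
from typing import List, Optional, Sequence

FALL_TIME = 30.0  # seconds on the ground before a fall counts as critical


def find_critical(label: Optional[str], tracker_ids: Sequence[int],
                  times: Sequence[float],
                  threshold: float = FALL_TIME) -> List[int]:
    """Return tracker IDs that have been labeled "fallen" past the threshold."""
    if label != "fallen":
        return []
    return [tid for tid, t in zip(tracker_ids, times) if t > threshold]


print(find_critical("fallen", [1, 2], [31.5, 4.0]))
print(find_critical("standing", [1], [120.0]))
```

Returning a list keeps the decision (who is in danger) separate from the reaction (printing, sending a message, or sounding an alarm).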

Step #6: Analyze a Video

Our script will take in a video, analyze each frame, and plot predictions that show whether someone is standing, falling, or has fallen. For each prediction, we will show on the video:

  • The label (standing, falling, fallen);
  • The bounding box, and;
  • The timer.

First, we define the bounding box annotator, the label annotator, and the tracker. Additionally, we keep the identity of the detection (standing, falling, or fallen) so we can track whether its label has changed.

BOUNDING_BOX_ANNOTATOR = sv.BoundingBoxAnnotator()
LABEL_ANNOTATOR = sv.LabelAnnotator()
tracker = sv.ByteTrack()
identity = []

Next, we will need to get the frames per second (fps) of the video. We can get this by using Supervision’s “VideoInfo” class.

video_info = sv.VideoInfo.from_video_path(VIDEO_PATH)
timers = FPS_Timer(fps=video_info.fps)

Now that we have all the info we need, we can finally iterate through the frames.

frames_generator = sv.get_video_frames_generator(VIDEO_PATH)
for i, frame in enumerate(frames_generator):
    result = CLIENT.infer(
        f"{FRAMES_DIR}/{i}.jpg",
        model_id="fall-detection-real/1")
    detections = sv.Detections.from_inference(result)

    # Reset the timers whenever the label of the first detection changes
    try:
        prev_identity = identity
        identity = detections.data['class_name'][0]

        if prev_identity != identity:
            timers.reset_all()
    except (IndexError, KeyError):
        # No detections in this frame
        identity = None

    detections = tracker.update_with_detections(detections=detections)
    times = timers.tick(detections=detections)

    annotated_frame = frame.copy()
    annotated_frame = BOUNDING_BOX_ANNOTATOR.annotate(
        scene=annotated_frame,
        detections=detections
    )

    # One label per tracked detection: the identity and the elapsed time
    labels = [
        f"{identity}: {time:.2f}s"
        for tracker_id, time in zip(detections.tracker_id, times)
    ]
    annotated_frame = LABEL_ANNOTATOR.annotate(
        scene=annotated_frame,
        detections=detections,
        labels=labels
    )

    # Convert from OpenCV's BGR order to RGB before saving with Pillow
    image = im.fromarray(cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB))
    image.save(f"{ANNOTATED_DIR}/{i}.jpg")
    print(f"Detected frame {i}")

# Merge the annotated images together to create the video
frame = cv2.imread(os.path.join(ANNOTATED_DIR, "0.jpg"))
height, width, layers = frame.shape

frame_rate = video_info.fps
# The mp4v codec matches the .mp4 container of the output path
video = cv2.VideoWriter(OUTPUT_VIDEO_MODEL, cv2.VideoWriter_fourcc(*"mp4v"), frame_rate, (width, height))
for j in range(len(os.listdir(ANNOTATED_DIR))):
    video.write(cv2.imread(os.path.join(ANNOTATED_DIR, f"{j}.jpg")))

video.release()
print("Done!")

In the loop, we get the detections for each frame using the Roboflow API.

Next, we get the label of the detection (if there is one) by using try and except. If there is no label, we simply pass the identity as None. However, if the identity and the previous identity are different, we reset the timer by calling the previously defined function.

Next, we add a tick to the timer so it tracks the number of seconds passed.

Then, we add our annotations by copying the frame, adding the detections, and calling the previous bounding box and label annotators. The format of the label will be the identity (standing, falling, fallen), and the time (in seconds).

To view the video, we save the image to a folder using a for-loop. After the for-loop is complete, we go through each frame in the folder and add them together using VideoWriter from opencv.
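
Frame order matters when reassembling the video. The code above avoids relying on directory order by building each file name from a counter. If you ever need to work from a directory listing instead, a numeric sort is one way to do it, since a plain string sort would place `10.jpg` before `2.jpg`. `sorted_frame_names` is a hypothetical helper written for illustration.

```python
from typing import Iterable, List


def sorted_frame_names(names: Iterable[str]) -> List[str]:
    """Sort names such as "12.jpg" by frame number, ignoring other files."""
    frames = [n for n in names if n.endswith(".jpg") and n[:-4].isdigit()]
    return sorted(frames, key=lambda n: int(n[:-4]))


listing = ["10.jpg", "2.jpg", "0.jpg", "1.jpg", "notes.txt"]
print(sorted(listing))              # plain string sort
print(sorted_frame_names(listing))  # numeric sort
```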

Lastly, we print “Done” to signal that we are done with the entire process. After running this code, we should get an output similar to this:

We can access “vision.mp4” in the file browser on the side.

Step #7: Generate a Heatmap

We can generate a heatmap that shows where people are walking using our model.

heat_map_annotator = sv.HeatMapAnnotator()
frames_generator = sv.get_video_frames_generator(VIDEO_PATH)
video_info = sv.VideoInfo.from_video_path(video_path=VIDEO_PATH)

with sv.VideoSink(target_path=OUTPUT_VIDEO_HEATMAP, video_info=video_info) as sink:
    for i, frame in enumerate(frames_generator):
        result = CLIENT.infer(
            frame,
            model_id="fall-detection-real/1")
        detections = sv.Detections.from_inference(result)
        annotated_frame = frame.copy()
        annotated_frame = heat_map_annotator.annotate(
            scene=annotated_frame,
            detections=detections)

        sink.write_frame(frame=annotated_frame)
        print(f"Heatmap {i} completed")

First, we define the heatmap annotator provided by Supervision. Next, we get the video info and the frames. Using Supervision’s VideoSink, we can write the video straight to a target path (it plays the same role as the previous cv2 video writer) without saving each frame as an image first.

Using the detections from the Roboflow API, we create the heatmap by using “heat_map_annotator” (similar to what we did with the label annotator and the bounding box annotator).

Finally, we write each annotated frame into the video using the write_frame function. We print a statement to confirm its completion.
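
Conceptually, a heatmap accumulates where detections appear over time. The sketch below illustrates that idea on a tiny grid with plain Python lists. It is not how Supervision's annotator is implemented, and `accumulate_heat`, the grid size, and the sample points are all made up for this example.

```python
from typing import List, Tuple


def accumulate_heat(points: List[Tuple[int, int]], width: int,
                    height: int) -> List[List[int]]:
    """Count how often each (x, y) cell is visited."""
    grid = [[0] * width for _ in range(height)]
    for x, y in points:
        if 0 <= x < width and 0 <= y < height:  # ignore out-of-frame points
            grid[y][x] += 1
    return grid


# Box centers from consecutive frames (made-up values)
centers = [(1, 1), (1, 1), (2, 1), (1, 1), (9, 9)]
heat = accumulate_heat(centers, width=4, height=3)
for row in heat:
    print(row)
```

Cells with higher counts correspond to the "hotter" areas of the rendered heatmap, where people spent the most time.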

After running the heatmap code, we will see an output similar to this:

We can access the “heatmap.mp4” from the downloaded folders.

Conclusion

In this guide, we used computer vision to identify falls in a video. The logic in this project could be integrated with an alerting system to ensure someone who falls in a recorded environment can receive immediate attention.

For example, the system above could be deployed in a manufacturing facility to track if anyone falls. If someone has fallen and has been unable to get up in a few seconds, people qualified in first aid can be alerted, and workers in the area can be notified to stop working until care has been given to the injured worker.