web_security

Mitsubishi PLC R Series Control code, using by Python3

HawordFREAKEK 2025. 2. 24. 20:26
더보기

import socket
import struct

def read_r_series_data(ip, port, device_code, device_number, read_length):
    """R 시리즈 PLC에서 데이터를 읽어오는 함수"""

    # SLMP 프레임 생성
    command = b'\x04\x01'  # 워드 데이터 읽기 명령어
    sub_command = b'\x00\x00'
    device_code_bytes = device_code.encode('ascii')
    device_number_bytes = struct.pack('<H', device_number)
    read_length_bytes = struct.pack('<H', read_length)
    data = command + sub_command + device_code_bytes + device_number_bytes + read_length_bytes
    frame = b'\x00\x00\x00\x00\x00\x00\x00\x00\x50\x00\x00\xFF\xFF\x03\x00' + struct.pack('<H', len(data)) + data

    # 소켓 통신
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((ip, port))
    sock.sendall(frame)
    response = sock.recv(1024)
    sock.close()

    # 응답 데이터 처리
    if response[11] == 0x00:  # 정상 응답 확인
        data_start = 18  # 데이터 시작 위치
        read_data = struct.unpack('<' + 'H' * read_length, response[data_start:data_start + read_length * 2])
        return read_data
    else:
        return None  # 오류 발생

# 사용 예시
ip_address = '192.168.1.10'
port_number = 5006
device_code = 'D'  # jk 레지스터
device_number = 100
read_length = 10

while True:
    print("\n기능 선택:")
    print("1. 데이터 읽기")
    print("2. 데이터 쓰기")
    print("3. PLC 시작")
    print("4. PLC 스캔")
    print("5. PLC 정지")
    print("0. 종료")

    choice = input("선택: ")

    if choice == '1':
        read_data = read_r_series_data(ip_address, port_number, device_code, device_number, read_length)
        if read_data:
            print(f"읽은 데이터: {read_data}")
        else:
            print("데이터 읽기 실패")
    elif choice == '2':
        write_data = tuple(map(int, input("쓸 데이터 (쉼표로 구분): ").split(',')))
        if write_r_series_data(ip_address, port_number, device_code, device_number, write_data):
            print("데이터 쓰기 성공")
        else:
            print("데이터 쓰기 실패")
    elif choice == '3':
        if plc_control(ip_address, port_number, 'start'):
            print("PLC 시작 성공")
        else:
            print("PLC 시작 실패")
        time.sleep(1)
    elif choice == '4':
        if plc_control(ip_address, port_number, 'scan'):
            print("PLC 스캔 성공")
        else:
            print("PLC 스캔 실패")
        time.sleep(1)
    elif choice == '5':
        if plc_control(ip_address, port_number, 'stop'):
            print("PLC 정지 성공")
        else:
            print("PLC 정지 실패")
    elif choice == '0':
        break
    else:
        print("잘못된 선택입니다.")

 

 

test

더보기

import socket
import struct
import time

def read_r_series_data(ip, port, device_code, device_number, read_length):
    """R 시리즈 PLC에서 데이터를 읽어오는 함수 (오류 처리 강화)"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(2)  # 2초 타임아웃 설정
        sock.connect((ip, port))

        # SLMP 프레임 생성
        command = b'\x04\x01'  # 워드 데이터 읽기 명령어
        sub_command = b'\x00\x00'
        device_code_bytes = device_code.encode('ascii')
        device_number_bytes = struct.pack('<H', device_number)
        read_length_bytes = struct.pack('<H', read_length)
        data = command + sub_command + device_code_bytes + device_number_bytes + read_length_bytes
        frame = b'\x00\x00\x00\x00\x00\x00\x00\x00\x50\x00\x00\xFF\xFF\x03\x00' + struct.pack('<H', len(data)) + data

        sock.sendall(frame)
        response = sock.recv(1024)
        sock.close()

        if response[11] == 0x00:  # 정상 응답 확인
            data_start = 18  # 데이터 시작 위치
            read_data = struct.unpack('<' + 'H' * read_length, response[data_start:data_start + read_length * 2])
            return read_data
        else:
            print(f"PLC 오류 응답: {response[11]}")  # jk오류 코드 출력
            return None
    except (socket.timeout, ConnectionRefusedError, OSError) as e:
        print(f"통신 오류: {e}")
        return None

def write_r_series_data(ip, port, device_code, device_number, write_data):
    """R 시리즈 PLC에 데이터를 쓰는 함수 (오류 처리 강화)"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(2)
        sock.connect((ip, port))

        # SLMP 프레임 생성
        command = b'\x14\x01'  # 워드 데이터 쓰기 명령어
        sub_command = b'\x00\x00'
        device_code_bytes = device_code.encode('ascii')
        device_number_bytes = struct.pack('<H', device_number)
        write_length = len(write_data)
        write_length_bytes = struct.pack('<H', write_length)
        data = command + sub_command + device_code_bytes + device_number_bytes + write_length_bytes + struct.pack('<' + 'H' * write_length, *write_data)
        frame = b'\x00\x00\x00\x00\x00\x00\x00\x00\x50\x00\x00\xFF\xFF\x03\x00' + struct.pack('<H', len(data)) + data

        sock.sendall(frame)
        response = sock.recv(1024)
        sock.close()

        if response[11] == 0x00:  # 정상 응답 확인
            return True
        else:
            print(f"PLC 쓰기 오류 응답: {response[11]}")
            return False
    except (socket.timeout, ConnectionRefusedError, OSError) as e:
        print(f"통신 오류: {e}")
        return False

def plc_control(ip, port, control_code):
    """PLC 시작, 정지, 스캔 제어 함수 (오류 처리 강화)"""
    control_codes = {
        'start': b'\x0A\x01\x00\x00\x01\x00',  # 시작
        'stop': b'\x0A\x01\x00\x00\x02\x00',   # 정지
        'scan': b'\x0A\x01\x00\x00\x05\x00'    # 스캔
    }

    if control_code not in control_codes:
        print("잘못된 제어 코드입니다.")
        return False

    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(2)
        sock.connect((ip, port))

        # SLMP 프레임 생성
        data = control_codes[control_code]
        frame = b'\x00\x00\x00\x00\x00\x00\x00\x00\x50\x00\x00\xFF\xFF\x03\x00' + struct.pack('<H', len(data)) + data

        sock.sendall(frame)
        response = sock.recv(1024)
        sock.close()

        if response[11] == 0x00:  # 정상 응답 확인
            return True
        else:
            print(f"PLC 제어 오류 응답: {response[11]}")
            return False
    except (socket.timeout, ConnectionRefusedError, OSError) as e:
        print(f"통신 오류: {e}")
        return False

# 사용 예시
ip_address = '192.168.1.10'
port_number = 5006
device_code = 'D'  # D 레지스터
device_number = 100
read_length = 10

while True:
    print("\n기능 선택:")
    print("1. 데이터 읽기")
    print("2. 데이터 쓰기")
    print("3. PLC 시작")
    print("4. PLC 스캔")
    print("5. PLC 정지")
    print("0. 종료")

    choice = input("선택: ")

    if choice == '1':
        read_data = read_r_series_data(ip_address, port_number, device_code, device_number, read_length)
        if read_data:
            print(f"읽은 데이터: {read_data}")
        else:
            print("데이터 읽기 실패")
    elif choice == '2':
        try:
            write_data = tuple(map(int, input("쓸 데이터 (쉼표로 구분): ").split(',')))
            if write_r_series_data(ip_address, port_number, device_code, device_number, write_data):
                print("데이터 쓰기 성공")
            else:
                print("데이터 쓰기 실패")
        except ValueError:
            print("잘못된 데이터 형식입니다. 정수 값을 쉼표로 구분하여 입력하세요.")
    elif choice == '3':
        if plc_control(ip_address, port_number, 'start'):
            print("PLC 시작 성공")
        else:
            print("PLC 시작 실패")
        time.sleep(1)
    elif choice == '4':
        if plc_control(ip_address, port_number, 'scan'):
            print("PLC 스캔 성공")
        else:
            print("PLC 스캔 실패")
        time.sleep(1)
    elif choice == '5':
        if plc_control(ip_address, port_number, 'stop'):
            print("PLC 정지 성공")
        else:
            print("PLC 정지 실패")
    elif choice == '0':
        break
    else:
        print("잘못된 선택입니다.")

 

packet 통신 확인 코드

import socket
import struct

def check_plc_connection(ip, port):
    """R 시리즈 PLC 연결 확인 함수"""

    try:
        # 소켓 생성 및 연결
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(2)  # 2초 타임아웃 설정
        sock.connect((ip, port))

        # SLMP 프레임 생성 (간단한 읽기 명령어)
        command = b'\x04\x01'  # 워드 데이터 읽기 명령어
        sub_command = b'\x00\x00'
        device_code_bytes = b'D'  # D 레지스터
        device_number_bytes = struct.pack('<H', 0)  # D0
        read_length_bytes = struct.pack('<H', 1)  # 1 워드 읽기
        data = command + sub_command + device_code_bytes + device_number_bytes + read_length_bytes
        frame = b'\x00\x00\x00\x00\x00\x00\x00\x00\x50\x00\x00\xFF\xFF\x03\x00' + struct.pack('<H', len(data)) + data

        # 프레임 전송 및 응답 수신
        sock.sendall(frame)
        response = sock.recv(1024)

        # 응답 확인
        if response and response[11] == 0x00:
            print("PLC 연결 성공 및 응답 수신!")
            return True
        else:
            print("PLC 연결 성공, 응답 오류!")
            return False

    except socket.timeout:
        print("PLC 연결 시간 초과!")
        return False
    except ConnectionRefusedError:
        print("PLC 연결 거부됨!")
        return False
    except Exception as e:
        print(f"PLC 연결 중 오류 발생: {e}")
        return False
    finally:
        if 'sock' in locals():
            sock.close()

# 사용 예시
plc_ip = '192.168.1.10'  # PLC IP 주소
plc_port = 5006  # PLC 포트 번호

if check_plc_connection(plc_ip, plc_port):
    print("PLC와 패킷 통신 가능!")
else:
    print("PLC와 패킷 통신 불가능!")

'web_security' 카테고리의 다른 글

ntds.dit  (1) 2024.08.30
국산 토종 종갓집 웹쉘  (1) 2023.04.18
sql injection payloads  (0) 2021.07.20
robots.txt 란?  (0) 2020.02.15