카테고리 없음

오탁

HawordFREAKEK 2025. 2. 24. 22:48

import socket
import struct
import time
import requests

def otac_authenticate(otac_server_url, user_id, password):
    """Request an OTAC token from the authentication server.

    Posts ``user_id`` and ``password`` as a JSON body to ``otac_server_url``
    and returns the ``otac_token`` field of the JSON reply.

    Args:
        otac_server_url: URL of the OTAC authentication endpoint.
        user_id: User identifier sent in the request body.
        password: Password sent in the request body.

    Returns:
        The value of ``otac_token`` from the reply, or ``None`` when the
        request fails, the server answers with an error status, or the reply
        is not a JSON object containing ``otac_token``.
    """
    try:
        # Without a timeout, requests waits indefinitely on a stalled server.
        response = requests.post(
            otac_server_url,
            json={'user_id': user_id, 'password': password},
            timeout=10,
        )
        response.raise_for_status()
        return response.json()['otac_token']
    except requests.exceptions.RequestException as e:
        print(f"OTAC 인증 오류: {e}")
        return None
    except (ValueError, KeyError, TypeError) as e:
        # ValueError: body is not valid JSON (older requests versions raise a
        # plain ValueError subclass here); KeyError: 'otac_token' is missing;
        # TypeError: the JSON payload is not an object (e.g. a list).
        print(f"OTAC 인증 오류: {e!r}")
        return None

def read_r_series_data(ip, port, device_code, device_number, read_length):
    """Read 16-bit words from an R-series PLC over a TCP connection.

    Sends a request frame made of a fixed 15-byte header, the payload length
    and the payload (command, sub-command, device code, device number, word
    count), then decodes the reply.

    NOTE(review): the frame layout, the status byte at offset 11 and the data
    offset 18 are kept exactly as originally written; they have not been
    checked against the PLC protocol specification -- TODO confirm.

    Args:
        ip: PLC host address.
        port: PLC TCP port.
        device_code: Device code string; must be ASCII-encodable.
        device_number: Start device number; packed as an unsigned 16-bit
            little-endian value, so it must be in 0..65535.
        read_length: Number of 16-bit words to read (0..65535).

    Returns:
        A tuple of ``read_length`` unsigned 16-bit integers on success, or
        ``None`` on a communication error, a truncated reply, or a non-zero
        status byte.

    Raises:
        struct.error: If ``device_number`` or ``read_length`` does not fit in
            an unsigned 16-bit integer.
        UnicodeEncodeError: If ``device_code`` is not ASCII.
    """
    status_offset = 11  # reply byte this code treats as the status code
    data_start = 18     # reply offset where the word data is expected
    expected_size = data_start + read_length * 2

    try:
        # The context manager closes the socket on every path, including
        # exceptions raised while building or exchanging the frame.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(2)
            sock.connect((ip, port))

            command = b'\x04\x01'
            sub_command = b'\x00\x00'
            device_code_bytes = device_code.encode('ascii')
            device_number_bytes = struct.pack('<H', device_number)
            read_length_bytes = struct.pack('<H', read_length)
            data = command + sub_command + device_code_bytes + device_number_bytes + read_length_bytes
            frame = b'\x00\x00\x00\x00\x00\x00\x00\x00\x50\x00\x00\xFF\xFF\x03\x00' + struct.pack('<H', len(data)) + data

            sock.sendall(frame)

            # TCP is a byte stream: one recv() may return only part of the
            # reply, so keep reading until the whole reply has arrived.
            response = b''
            while len(response) < expected_size:
                if len(response) > status_offset and response[status_offset] != 0x00:
                    break  # error reply: do not wait for data that will not come
                chunk = sock.recv(1024)
                if not chunk:
                    break  # peer closed the connection
                response += chunk
    except OSError as e:
        # socket.timeout and ConnectionRefusedError are OSError subclasses.
        print(f"통신 오류: {e}")
        return None

    if len(response) <= status_offset:
        print(f"PLC 응답 길이 부족: {len(response)}바이트")
        return None
    if response[status_offset] != 0x00:
        print(f"PLC 오류 응답: {response[status_offset]}")
        return None
    if len(response) < expected_size:
        print(f"PLC 응답 길이 부족: {len(response)}바이트")
        return None

    return struct.unpack('<' + 'H' * read_length, response[data_start:expected_size])

def write_r_series_data(ip, port, device_code, device_number, write_data):
    """Write 16-bit words to an R-series PLC over a TCP connection.

    Sends a request frame made of a fixed 15-byte header, the payload length
    and the payload (command, sub-command, device code, device number, word
    count, word values), then checks the status byte of the reply.

    NOTE(review): the frame layout and the status byte at offset 11 are kept
    exactly as originally written; they have not been checked against the PLC
    protocol specification -- TODO confirm.

    Args:
        ip: PLC host address.
        port: PLC TCP port.
        device_code: Device code string; must be ASCII-encodable.
        device_number: Start device number; packed as an unsigned 16-bit
            little-endian value, so it must be in 0..65535.
        write_data: Sequence of integers, each packed as an unsigned 16-bit
            little-endian word (0..65535).

    Returns:
        ``True`` when the reply's status byte is zero; ``False`` on a
        communication error, a truncated reply, or a non-zero status byte.

    Raises:
        struct.error: If ``device_number``, the word count, or any value in
            ``write_data`` does not fit in an unsigned 16-bit integer.
        UnicodeEncodeError: If ``device_code`` is not ASCII.
    """
    status_offset = 11  # reply byte this code treats as the status code

    try:
        # The context manager closes the socket on every path, including
        # exceptions raised while building or exchanging the frame.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(2)
            sock.connect((ip, port))

            command = b'\x14\x01'
            sub_command = b'\x00\x00'
            device_code_bytes = device_code.encode('ascii')
            device_number_bytes = struct.pack('<H', device_number)
            write_length = len(write_data)
            write_length_bytes = struct.pack('<H', write_length)
            data = command + sub_command + device_code_bytes + device_number_bytes + write_length_bytes + struct.pack('<' + 'H' * write_length, *write_data)
            frame = b'\x00\x00\x00\x00\x00\x00\x00\x00\x50\x00\x00\xFF\xFF\x03\x00' + struct.pack('<H', len(data)) + data

            sock.sendall(frame)

            # TCP is a byte stream: keep reading until the status byte has
            # arrived or the peer closes the connection.
            response = b''
            while len(response) <= status_offset:
                chunk = sock.recv(1024)
                if not chunk:
                    break  # peer closed the connection
                response += chunk
    except OSError as e:
        # socket.timeout and ConnectionRefusedError are OSError subclasses.
        print(f"통신 오류: {e}")
        return False

    if len(response) <= status_offset:
        print(f"PLC 응답 길이 부족: {len(response)}바이트")
        return False
    if response[status_offset] != 0x00:
        print(f"PLC 쓰기 오류 응답: {response[status_offset]}")
        return False
    return True

def test_otac_plc(otac_server_url, user_id, password, plc_ip, plc_port, device_code, device_number, read_length, write_data):
    """Run a manual end-to-end check of OTAC authentication and PLC access.

    Phase 1 authenticates with the given credentials; on success it reads
    from the PLC, and only if the read returns data does it attempt a write.
    Phase 2 always runs and verifies that authentication with a wrong
    password is rejected. Every outcome is reported with ``print``; nothing
    is returned.
    """
    # Phase 1: authentication with the supplied credentials.
    print("\n정상 OTAC 인증 테스트:")
    token = otac_authenticate(otac_server_url, user_id, password)
    if not token:
        print("OTAC 인증 실패")
    else:
        print("OTAC 인증 성공")

        # Read test.
        print("\nPLC 데이터 읽기 테스트:")
        words = read_r_series_data(plc_ip, plc_port, device_code, device_number, read_length)
        if not words:
            print("PLC 데이터 읽기 실패")
        else:
            print(f"PLC 데이터 읽기 성공: {words}")

            # Write test, attempted only after a successful read.
            print("\nPLC 데이터 쓰기 테스트:")
            written = write_r_series_data(plc_ip, plc_port, device_code, device_number, write_data)
            print("PLC 데이터 쓰기 성공" if written else "PLC 데이터 쓰기 실패")

    # Phase 2: authentication with a deliberately wrong password must fail.
    print("\n비정상 OTAC 인증 테스트:")
    bad_token = otac_authenticate(otac_server_url, user_id, "wrong_password")
    if bad_token:
        print("비정상 OTAC 인증 성공 (오류)")
    else:
        print("비정상 OTAC 인증 실패 (정상)")

# Test configuration (the 'YOUR_...' values are placeholders).
otac_server_url = 'YOUR_OTAC_SERVER_URL'
user_id = 'YOUR_USER_ID'
password = 'YOUR_PASSWORD'
plc_ip, plc_port = '192.168.1.10', 5006
device_code, device_number = 'D', 100
read_length = 10
write_data = tuple(range(1, 11))  # (1, 2, ..., 10)

# Run the test.
test_otac_plc(
    otac_server_url=otac_server_url,
    user_id=user_id,
    password=password,
    plc_ip=plc_ip,
    plc_port=plc_port,
    device_code=device_code,
    device_number=device_number,
    read_length=read_length,
    write_data=write_data,
)