import socket
import struct
import time
import requests
def otac_authenticate(otac_server_url, user_id, password, timeout=5):
    """Authenticate against the OTAC server and return the issued token.

    Posts ``user_id`` and ``password`` as a JSON body to ``otac_server_url``
    and reads the ``otac_token`` field from the JSON reply.

    Args:
        otac_server_url: URL of the authentication endpoint.
        user_id: Account identifier, sent as ``user_id``.
        password: Password, sent as ``password``.
        timeout: Seconds to wait for the server. Without a timeout
            ``requests`` may block indefinitely on an unresponsive server.

    Returns:
        The value of ``otac_token`` from the reply, or ``None`` when the
        request fails, the server answers with an HTTP error status, or the
        reply does not carry a usable ``otac_token``.
    """
    try:
        response = requests.post(
            otac_server_url,
            json={'user_id': user_id, 'password': password},
            timeout=timeout,
        )
        response.raise_for_status()
        return response.json()['otac_token']
    except requests.exceptions.RequestException as e:
        print(f"OTAC 인증 오류: {e}")
    except (KeyError, TypeError, ValueError) as e:
        # Body is not JSON (ValueError on older requests), the JSON value is
        # not an object (TypeError), or the token field is missing (KeyError).
        print(f"OTAC 응답 형식 오류: {e!r}")
    return None
def read_r_series_data(ip, port, device_code, device_number, read_length):
    """Read consecutive 16-bit words from an R-series PLC over TCP.

    Builds a request (command ``04 01``, sub-command ``00 00``), sends it on a
    fresh connection with a 2-second timeout, and decodes the reply as
    little-endian unsigned 16-bit words.

    NOTE(review): the fixed header bytes, the device addressing layout and the
    response offsets (status at byte 11, data from byte 18) are kept exactly
    as they were; confirm them against the PLC's protocol manual.

    Args:
        ip: PLC host address.
        port: PLC TCP port.
        device_code: ASCII device code placed in the request.
        device_number: First device number; must fit in 16 bits.
        read_length: Number of words to read; must fit in 16 bits.

    Returns:
        A tuple of ``read_length`` ints on success. ``None`` when the request
        cannot be encoded, the connection fails or times out, the PLC reports
        a non-zero status byte, or the reply is shorter than expected.
    """
    status_index = 11
    data_start = 18

    # Encode the request before opening a connection so that invalid
    # arguments never leave a half-used socket behind.
    try:
        payload = (
            b'\x04\x01'
            + b'\x00\x00'
            + device_code.encode('ascii')
            + struct.pack('<H', device_number)
            + struct.pack('<H', read_length)
        )
        frame = (
            b'\x00\x00\x00\x00\x00\x00\x00\x00\x50\x00\x00\xFF\xFF\x03\x00'
            + struct.pack('<H', len(payload))
            + payload
        )
    except (struct.error, UnicodeEncodeError) as e:
        print(f"요청 프레임 생성 오류: {e}")
        return None

    expected_len = data_start + read_length * 2
    response = b''
    try:
        # The context manager closes the socket on every path, including
        # timeouts and refused connections.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(2)
            sock.connect((ip, port))
            sock.sendall(frame)
            # TCP is a byte stream: keep reading until the whole reply is in,
            # the peer closes, or the PLC signals an error (shorter reply).
            while len(response) < expected_len:
                chunk = sock.recv(4096)
                if not chunk:
                    break
                response += chunk
                if len(response) > status_index and response[status_index] != 0x00:
                    break
    except OSError as e:
        # socket.timeout and ConnectionRefusedError are OSError subclasses.
        print(f"통신 오류: {e}")
        return None

    if len(response) <= status_index:
        print(f"PLC 응답 길이 부족: {len(response)}바이트")
        return None
    if response[status_index] != 0x00:
        print(f"PLC 오류 응답: {response[status_index]}")
        return None
    if len(response) < expected_len:
        print(f"PLC 응답 길이 부족: {len(response)}바이트")
        return None
    return struct.unpack_from(f'<{read_length}H', response, data_start)
def write_r_series_data(ip, port, device_code, device_number, write_data):
    """Write consecutive 16-bit words to an R-series PLC over TCP.

    Builds a request (command ``14 01``, sub-command ``00 00``) carrying the
    words of ``write_data`` as little-endian unsigned 16-bit values, sends it
    on a fresh connection with a 2-second timeout, and checks the status byte
    of the reply.

    NOTE(review): the fixed header bytes, the device addressing layout and the
    status offset (byte 11) are kept exactly as they were; confirm them
    against the PLC's protocol manual.

    Args:
        ip: PLC host address.
        port: PLC TCP port.
        device_code: ASCII device code placed in the request.
        device_number: First device number; must fit in 16 bits.
        write_data: Sized sequence of ints, each in the range 0..65535.

    Returns:
        ``True`` when the reply's status byte is zero. ``False`` when the
        request cannot be encoded, the connection fails or times out, the
        reply is too short to contain a status byte, or the status is
        non-zero.
    """
    status_index = 11

    # Encode the request before opening a connection so that invalid
    # arguments never leave a half-used socket behind.
    try:
        word_count = len(write_data)
        payload = (
            b'\x14\x01'
            + b'\x00\x00'
            + device_code.encode('ascii')
            + struct.pack('<H', device_number)
            + struct.pack('<H', word_count)
            + struct.pack(f'<{word_count}H', *write_data)
        )
        frame = (
            b'\x00\x00\x00\x00\x00\x00\x00\x00\x50\x00\x00\xFF\xFF\x03\x00'
            + struct.pack('<H', len(payload))
            + payload
        )
    except (struct.error, UnicodeEncodeError) as e:
        print(f"요청 프레임 생성 오류: {e}")
        return False

    response = b''
    try:
        # The context manager closes the socket on every path, including
        # timeouts and refused connections.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(2)
            sock.connect((ip, port))
            sock.sendall(frame)
            # Keep reading until the status byte has arrived or the peer
            # closes; a single recv() may deliver only part of the reply.
            while len(response) <= status_index:
                chunk = sock.recv(1024)
                if not chunk:
                    break
                response += chunk
    except OSError as e:
        # socket.timeout and ConnectionRefusedError are OSError subclasses.
        print(f"통신 오류: {e}")
        return False

    if len(response) <= status_index:
        print(f"PLC 응답 길이 부족: {len(response)}바이트")
        return False
    if response[status_index] == 0x00:
        return True
    print(f"PLC 쓰기 오류 응답: {response[status_index]}")
    return False
def test_otac_plc(otac_server_url, user_id, password, plc_ip, plc_port, device_code, device_number, read_length, write_data):
    """Run the OTAC + PLC smoke test and print each step's outcome.

    First authenticates with the given credentials; on success it reads from
    the PLC and, if the read returned data, writes ``write_data`` back to the
    same device range. Afterwards it always repeats the authentication with
    the fixed password ``"wrong_password"`` and reports whether that attempt
    was rejected.
    """
    # Authentication with the supplied credentials.
    print("\n정상 OTAC 인증 테스트:")
    token = otac_authenticate(otac_server_url, user_id, password)
    if not token:
        print("OTAC 인증 실패")
    else:
        print("OTAC 인증 성공")

        # PLC read step.
        print("\nPLC 데이터 읽기 테스트:")
        values = read_r_series_data(plc_ip, plc_port, device_code, device_number, read_length)
        if not values:
            print("PLC 데이터 읽기 실패")
        else:
            print(f"PLC 데이터 읽기 성공: {values}")

            # PLC write step, attempted only after a successful read.
            print("\nPLC 데이터 쓰기 테스트:")
            written = write_r_series_data(plc_ip, plc_port, device_code, device_number, write_data)
            print("PLC 데이터 쓰기 성공" if written else "PLC 데이터 쓰기 실패")

    # Authentication with a deliberately wrong password; rejection is the
    # expected outcome.
    print("\n비정상 OTAC 인증 테스트:")
    bad_token = otac_authenticate(otac_server_url, user_id, "wrong_password")
    if bad_token:
        print("비정상 OTAC 인증 성공 (오류)")
    else:
        print("비정상 OTAC 인증 실패 (정상)")
# Test configuration (placeholders must be replaced before running).
otac_server_url = 'YOUR_OTAC_SERVER_URL'
user_id = 'YOUR_USER_ID'
password = 'YOUR_PASSWORD'
plc_ip = '192.168.1.10'
plc_port = 5006
device_code = 'D'
device_number = 100
read_length = 10
write_data = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

# Run the test only when executed as a script, so that importing this module
# does not trigger network traffic to the OTAC server or the PLC.
if __name__ == "__main__":
    test_otac_plc(otac_server_url, user_id, password, plc_ip, plc_port, device_code, device_number, read_length, write_data)